<a href="https://colab.research.google.com/github/bgmseo/MotGen/blob/main/Motility_Prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import axes3d

import os

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Print tensors in fixed-point rather than scientific notation.
torch.set_printoptions(sci_mode=False)

# Batch size and worker count (not referenced by the cells visible in this notebook).
batch_size, workers = 128, 0

In [None]:
### class definitions needed before the saved models are loaded / called
latent_dim = 100  # width of the random-noise part of the generator input
output_dim = 10   # number of values the generator emits per sample


class Generator(nn.Module):
    """Fully connected generator network.

    The input is reshaped to ``(-1, latent_dim + 2)``; in the cells below the
    two extra columns are the normalized pH/temperature pair placed in front
    of the noise vector. The output has ``output_dim`` columns.

    Architecture: nine linear layers (``l1`` .. ``l9``) with hidden width 300.
    LeakyReLU(0.1) follows layers 1-8, and layers 2-8 are additionally
    followed by ``BatchNorm1d`` (``bn2`` .. ``bn8``) applied after the
    activation. ``l9`` is a plain linear read-out.
    """

    def __init__(self):
        super().__init__()
        width = 300

        # Layers are registered under the names l1..l9 / bn2..bn8, in this
        # order, so that state-dict keys and pickled models keep matching.
        layer_shapes = (
            [(latent_dim + 2, width)]
            + [(width, width)] * 7
            + [(width, output_dim)]
        )
        for index, (n_in, n_out) in enumerate(layer_shapes, start=1):
            setattr(self, f"l{index}", nn.Linear(n_in, n_out))
        self.activation = nn.LeakyReLU(negative_slope=0.1)

        for index in range(2, 9):
            setattr(self, f"bn{index}", nn.BatchNorm1d(width))

    def forward(self, x):
        """Return the generator output for ``x`` (any shape that reshapes to ``(-1, latent_dim + 2)``)."""
        x = x.view(-1, latent_dim + 2)
        hidden = self.activation(self.l1(x))
        # Hidden layers 2..8: linear -> LeakyReLU -> batch norm.
        for index in range(2, 9):
            linear = getattr(self, f"l{index}")
            norm = getattr(self, f"bn{index}")
            hidden = norm(self.activation(linear(hidden)))
        return self.l9(hidden)

class Discriminator(nn.Module):
    """Fully connected discriminator network.

    Takes rows of width ``output_dim + 2`` (presumably a sample concatenated
    with its two condition values -- confirm against the training code) and
    returns one sigmoid score in (0, 1) per row.

    Architecture: nine linear layers (``l1`` .. ``l9``) with hidden width 300;
    LeakyReLU(0.1) follows layers 1-8 and a sigmoid follows ``l9``.
    """

    def __init__(self):
        super().__init__()
        width = 300
        # Layer names l1..l9 and their creation order are kept so state-dict
        # keys and pickled models keep matching.
        self.l1 = nn.Linear(output_dim + 2, width)
        for index in range(2, 9):
            setattr(self, f"l{index}", nn.Linear(width, width))
        self.l9 = nn.Linear(width, 1)
        self.activation = nn.LeakyReLU(negative_slope=0.1)

    def forward(self, x):
        """Return the sigmoid score for each row of ``x``."""
        hidden = x
        for index in range(1, 9):
            hidden = self.activation(getattr(self, f"l{index}")(hidden))
        return torch.sigmoid(self.l9(hidden))


Load trained models

In [None]:

# Load the fully pickled generator / discriminator (the Generator and Discriminator
# classes above must already be defined for unpickling to succeed).
#
# map_location=device: lets checkpoints that were saved on a GPU load on a CPU-only
#   machine, and places both networks on the same device the later cells move
#   their inputs to.
# weights_only=False: these files hold whole nn.Module objects, which recent PyTorch
#   releases refuse to unpickle under their weights_only=True default
#   (the keyword requires PyTorch >= 1.13).
# NOTE(security): with weights_only=False, torch.load can execute arbitrary code
#   embedded in the pickle -- only load checkpoints from a trusted source.
G = torch.load('./models/GG_final', map_location=device, weights_only=False)
D = torch.load('./models/DD_final', map_location=device, weights_only=False)

# Inference mode: BatchNorm layers use their stored running statistics.
G.eval()
D.eval()

Discriminator(
  (l1): Linear(in_features=12, out_features=300, bias=True)
  (l2): Linear(in_features=300, out_features=300, bias=True)
  (l3): Linear(in_features=300, out_features=300, bias=True)
  (l4): Linear(in_features=300, out_features=300, bias=True)
  (l5): Linear(in_features=300, out_features=300, bias=True)
  (l6): Linear(in_features=300, out_features=300, bias=True)
  (l7): Linear(in_features=300, out_features=300, bias=True)
  (l8): Linear(in_features=300, out_features=300, bias=True)
  (l9): Linear(in_features=300, out_features=1, bias=True)
  (activation): LeakyReLU(negative_slope=0.1)
)

In [None]:
# Labels used for training.
# The cells below read column 0 of this object as the per-feature mean and
# column 1 as the per-feature standard deviation for (de)normalization.
env_map2 = torch.load('./env_map2')

In [None]:
# normalize
def solvex(x, u, s):
    """Standardize ``x``: subtract the mean ``u`` and divide by the scale ``s``."""
    return (x - u) / s

#  denormalization
def invsolve(x, u, s):
    """Map a standardized value back to its original scale: ``x * s + u``."""
    return x * s + u

########### user parameters for desired environments #############
- You must set the current motility values and the desired motility features to obtain appropriate environmental settings!


In [None]:
########### user parameters for desired environments #############

# Current environmental settings: pH and temperature.
PH0, temp0 = 6.9, 27.0

# Indices of the motility parameters to control.
control_motility_idx = [4, 8]

# Current experimental motility values, one per controlled parameter.
motility0 = [0.2787, 37.7251]

# Desired motility values: reference values scaled by 1.2 and 1.1 respectively.
motility_d = [
    0.2503734 * 1.2,
    30.6101 * 1.1,
]
print(motility_d)

[0.30044808, 33.67111]


In [None]:
### normalize inputs (PH & temp.)
# Column 0 of env_map2 is used as the mean and column 1 as the standard deviation.
mean, std = env_map2[:, 0], env_map2[:, 1]

# Entries 0 and 1 of the statistics are the ones applied to pH and temperature.
PH0_normalized = solvex(PH0, mean[0], std[0])
temp0_normalized = solvex(temp0, mean[1], std[1])

# Wrapped in nn.Parameter so that backward() stores d(output)/d(x0) in x0.grad.
x0 = nn.Parameter(torch.Tensor([PH0_normalized, temp0_normalized]))

In [None]:
## define y0
# The statistics for motility parameter k are read at position k + 2
# (positions 0 and 1 are the ones used for pH and temperature).
_stat_pos0 = control_motility_idx[0] + 2
_stat_pos1 = control_motility_idx[1] + 2

# Current motility, normalized.
y00 = solvex(motility0[0], mean[_stat_pos0], std[_stat_pos0])
y01 = solvex(motility0[1], mean[_stat_pos1], std[_stat_pos1])
y0 = torch.Tensor([y00, y01])

## define y_d
# Desired motility, normalized with the same statistics.
y_d0 = solvex(motility_d[0], mean[_stat_pos0], std[_stat_pos0])
y_d1 = solvex(motility_d[1], mean[_stat_pos1], std[_stat_pos1])
y_d = torch.Tensor([y_d0, y_d1])

# Current target, desired target, and the change that has to be achieved.
print(y0, y_d, y_d - y0)

tensor([0.2082, 0.3239]) tensor([0.3220, 0.1295]) tensor([ 0.1138, -0.1944])


In [None]:
## generator input samples
nSample = 10000

# Every row holds the same normalized (pH, temperature) pair followed by its own
# standard-normal latent vector, i.e. rows of width 2 + latent_dim.
_env_rows = x0.repeat(nSample, 1)
_noise_rows = torch.randn(nSample, latent_dim)
ginput_test = torch.cat((_env_rows, _noise_rows), dim=1).float().to(device)


In [None]:
## generator output calculation
goutput = G(ginput_test)

# Average the controlled output columns over all noise samples.
y0_est = goutput[:, control_motility_idx].mean(dim=0)
print(y0_est)

# Map the normalized estimates back to the original scale for inspection.
_est_pos0 = control_motility_idx[0] + 2
_est_pos1 = control_motility_idx[1] + 2
y0_est0 = invsolve(y0_est[0], mean[_est_pos0], std[_est_pos0])
y0_est1 = invsolve(y0_est[1], mean[_est_pos1], std[_est_pos1])
print([y0_est0.item(), y0_est1.item()])

tensor([0.2032, 0.1649], device='cuda:0', grad_fn=<MeanBackward1>)
[0.27775353563711125, 34.40982092541972]


In [None]:
optimizer = optim.Adam([x0], lr=0.00001, weight_decay=0.000001)
optimizer.zero_grad()

y0_est[0].backward()

print(x0.grad)
J0 = x0.grad.clone() # Jacobian row 1

tensor([-0.1197, -0.0007])


In [None]:
## generator output calculation
# The forward pass is repeated so that a fresh autograd graph exists for the second
# backward() call (presumably because the previous backward() released the earlier
# graph -- confirm).
goutput = G(ginput_test)

# Average the controlled output columns over all noise samples.
y0_est = goutput[:, control_motility_idx].mean(dim=0)
print(y0_est)

tensor([0.2032, 0.1649], device='cuda:0', grad_fn=<MeanBackward1>)


In [None]:
# The optimizer is created only so that zero_grad() can clear the gradient left on
# x0 by the previous backward() call; no optimizer.step() is taken.
optimizer = optim.Adam([x0], lr=0.00001, weight_decay=0.000001)
optimizer.zero_grad()

# Backpropagate the second controlled output; afterwards x0.grad holds
# d(y0_est[1]) / d(x0).
y0_est[1].backward()

print(x0.grad)
# Clone (as is done for the first row): x0.grad is the live gradient buffer, so
# keeping a bare reference would let any later zero_grad()/backward() on x0 --
# e.g. re-running a cell -- silently change this Jacobian row.
J1 = x0.grad.clone() # Jacobian row 2

tensor([-0.0934, -0.1156])


In [None]:
## Jacobian and its inverse
# Stack the two gradient rows into the 2x2 Jacobian d(y_est) / d(x0).
J = torch.cat([J0.view(1,2), J1.view(1,2)], dim=0)

# regularization
# A condition number is always >= 1 (and inf for a singular matrix), so an
# ill-conditioned Jacobian has to be detected with an upper threshold; a
# "< 1e-3" test can never trigger. cond(J) > 1e3 is equivalent to the reciprocal
# condition number dropping below 1e-3.
if torch.linalg.cond(J) > 1e3:
    # Shift the diagonal slightly so the inverse stays finite and well-scaled.
    J[0,0] += 1e-3
    J[1,1] += 1e-3
    print('Regularized!')
invJ = torch.linalg.inv(J)

Final environmental settings!

In [None]:
## compute target PH and temp
stepsize = 0.
x = x0.detach() + stepsize * torch.matmul(invJ, (y_d - y0).view(2,1)).view(2)

PH1 = (x[0] * std[0] + mean[0]).item()
temp1 = (x[1] * std[1] + mean[1]).item()

print([PH1, temp1])

[6.899999989736983, 27.00000010098188]
