File created by Jinghao Chen (jinghc2@uci.edu).

Make sure file "gfp.pkl" is in the directory.

In [1]:
import pickle

In [2]:
pickle_in = open("gfp.pkl","rb")
gfp = pickle.load(pickle_in)

Check the first 5 reads from the bin #0.

In [3]:
gfp[0][:5]

['AATTAATGTGAGTTAGCTCACTCATTAGGCCCCCCAGGGTTTACACTTTTTGCTACCGTCTCGTATGTTGTTTGC',
 'AATTAAGTAGAGATCGCTCACTCATTAGGCACCCAAGCCGTTACATTTTATGCTTCAGGCCCGTACGCTGTGTGT',
 'GCTTAATGGGAGTGAGCTCACTCATTAGGCACCCCAGTCTTTAGACTTTATGCTTCCGGCTCGTATCTTGCGTGG',
 'GCCCACTCTGAGTTAGCTCAACCACTATGCACCCCAGGCTCTACGCTATTTGATTCCGGCCTGATTGTTGTGTGA',
 'AATTAATGTGAGTTAGCTCACTCATTTGGCAACCCAGTCTTTACCCTTTATGCTTCCGACTCGTATGTTGTTTGG']

The effective area is [-75,-1] so the length should be 75.

In [4]:
len(gfp[0][0])

75

Define 2 functions for future use.

In [5]:
def bp2bi(sq):
    '''base pairs to binary data'''
    l = []
    for bp in sq:
        if bp == 'A':
            l += [1,0,0,0]
        elif bp == 'C':
            l += [0,1,0,0]
        elif bp == 'G':
            l += [0,0,1,0]
        elif bp == 'T':
            l += [0,0,0,1]
        else:
            print('Error: please ensure only ACGT nucliotides contained.')
            return
    return l

In [6]:
def gfp2data(gfp,bin_min,bin_max,pt_min,pt_max):
    '''gfp to data in the standard form: feature values + label value'''
    data = []
    for k in range(bin_min,bin_max+1):
        for sq in gfp[k]:
            l = bp2bi(sq[pt_min:pt_max+1]) if pt_max != -1 else bp2bi(sq[pt_min:])
            l.append(k)
            data.append(l)
    return data

Import numpy package.

In [7]:
import numpy as np
np.random.seed(114514)

Generate standard-form data, basically feature values + label (target) value.

In [8]:
data = gfp2data(gfp,1,9,-74,-49) # CRP binding sites: [-74:-49]
data = np.array(data,np.float32)

In [9]:
data1 = gfp2data(gfp,1,9,-41,-1) # RNAP binding sites: [-41:-1]
data1 = np.array(data1,np.float32)

Check the data we obtain.

In [10]:
M, N = data.shape

In [11]:
M, N

(46054, 105)

In [12]:
M1, N1 = data1.shape

In [13]:
M1, N1

(46054, 165)

In [14]:
data_comb = np.hstack((data[:,:-1],data1))

In [15]:
M_comb, N_comb = data_comb.shape

In [16]:
M_comb, N_comb

(46054, 269)

Import packages for machine learning model:

In [17]:
from __future__ import division
import torch
torch.manual_seed(1919810)
import sys
sys.path.append('code')
import mltools as ml # make sure this is installed or already in the directory
import matplotlib.pyplot as plt   # use matplotlib for plotting with inline plots

In [18]:
X, Y = data_comb[:,:-1], data_comb[:,-1]   # get features & target
X, Y = ml.shuffleData(X,Y)       # reorder randomly rather than by class label

Usually we can rescale to improve numerical stability, speed convergence. But in this context, we have to keep minimal model in order to reserve its physical meaning, i.e., explanable from the first principle.

In [19]:
# X,_  = ml.transforms.rescale(X)

We only use batches 1-4 and 6-9 (without 5) for our initial test, and make binary labels.

In [20]:
XA, YA = X[Y!=5,:], Y[Y!=5]
YA = np.heaviside(YA-5,-1)

In [21]:
class logisticClassify(ml.classifier):     # trivial shell class to contain whatever we want
    def predict(self,X): pass               # we will replace the shell later as we extend the class
    def train(self,X): pass

In [22]:
def myPredict(self,X):
    '''Predict model on data X; return nparray of class predictions'''
    
    N1 = 104 # length of CRP binding sites
    
    X1 = X[:,:N1] # X1 is a CRP sequence
    X2 = X[:,N1:] # X2 is a RNAP sequence
    
    R = 1.98e-3 # gas const
    T = 310 # temperature at which cells were induced
    RT = R*T

    eps_c = torch.Tensor(X1) @ self.theta[4:4+N1]; # CRP binding energy
    eps_r = torch.Tensor(X2) @ self.theta[4+N1:]; # RNAP binding energy
    wi = self.theta[2]*(torch.exp(-eps_r/RT)+self.theta[1]*torch.exp(-(eps_c+eps_r+self.theta[3])/RT))
    ri = wi/(1+self.theta[1]*torch.exp(-eps_c/RT)+wi)-self.theta[0]

    Y01 = 1*(ri>0)                   # binary classification threshold; convert to integers
    Y = self.classes[Y01]           # use lookup to convert back to class values if given
    return Y                        # NOTE: returns as numpy, not torch! (b/c classes is a nparray)
                                    # (This is necessary for mltools plot to work)

In [23]:
# Create a shell classifier
class logisticClassify(ml.classifier):
    predict = myPredict              #
    train = None                     # this function will be implemented later

In [67]:
from IPython import display

def myTrain(self,X,Y,initStep=1.,stopTol=1e-4,stopEpochs=5000,alpha=0,plot=None):
    """ Train the logistic regression using stochastic gradient descent """
    
    M,N = X.shape;                     # initialize the model if necessary:
    N1 = 104 # length of CRP binding sites
    # N2 = N-N1;
    X1 = X[:,:N1] # X1 is a CRP sequence
    X2 = X[:,N1:] # X2 is a RNAP sequence
    self.classes = np.unique(Y);       # Y may have two classes, any values
    # Y01 = ml.toIndex(Y,self.classes);  # Y01 is Y, but with canonical values 0 or 1
    Y01 = Y
    
    R = 1.98e-3 # gas const
    T = 310 # temperature at which cells were induced
    RT = R*T
    
    N_theta = 4+N # the number of unknown parameters
    # if the shape of initial theta is wrong, randomly generate a correct one
    if len(self.theta)!=N_theta: self.theta=torch.randn((N_theta,1),requires_grad = True);
    
        
    # init loop variables:
    epoch=0; done=False; Jnll=[]; J01=[];            # initialize loop variables
    myrate = lambda epoch: initStep*2.0/(2.0+epoch)  # step size as a f'n of epoch
    
    opt = torch.optim.SGD([self.theta], initStep)
    sched = torch.optim.lr_scheduler.LambdaLR(opt, myrate)
    
    
    while not done:
        # Do an SGD pass through the entire data set:
        Jnll.append(0.)
        for i in np.random.permutation(M):
            # Compute predictions and loss for *just* data X[i]:
            # Note: @ operation requires Python 3.5+
            eps_c = torch.tensor(X1[i],dtype=torch.float32) @ self.theta[4:4+N1]; # CRP binding energy
            eps_r = torch.tensor(X2[i],dtype=torch.float32) @ self.theta[4+N1:]; # RNAP binding energy
            wi = self.theta[2]*(torch.exp(-eps_r/RT)+self.theta[1]*torch.exp(-(eps_c+eps_r+self.theta[3])/RT))
            ri = wi/(1+self.theta[1]*torch.exp(-eps_c/RT)+wi)-self.theta[0]
            si = 1/(1+torch.exp(-ri)); # logistic (probability) prediction of the class
            Ji_pre = -Y01[i]*torch.log(si)-(1-Y01[i])*torch.log(1-si); # torch.Tensor shape [] (scalar)
            # Ji_pre = torch.maximum( 1-(2*Y01[i]-1)*ri, 0*ri); # Hinge loss
            
            # add penalty to constrain the range of parameters
            Ji0 = Ji_pre + alpha*torch.maximum( self.theta[0]*(self.theta[0]-1), 0*self.theta[0]); 
            Ji1 = Ji0 + alpha*torch.maximum( self.theta[1]*(self.theta[1]-1), 0*self.theta[1]);
            Ji = Ji1 + alpha*torch.maximum( self.theta[2]*(self.theta[2]-1), 0*self.theta[2]);
            
            Jnll[-1] += float(Ji)/M             # find running average of surrogate loss
            opt.zero_grad()                     # Ji should be a torch.tensor of shape []
            Ji.backward()
            opt.step()
        sched.step()        

        epoch += 1

        J01.append( self.err(X,Y) )  # evaluate the current actual error rate 
        
        theta1 = self.theta.detach().numpy()
        eps_c = X1 @ theta1[4:4+N1]; # CRP binding energy
        eps_r = X2 @ theta1[4+N1:]; # RNAP binding energy
        wi = theta1[2]*(np.exp(-eps_r/RT)+theta1[1]*np.exp(-(eps_c+eps_r+theta1[3])/RT))
        tau = wi/(1+theta1[1]*np.exp(-eps_c/RT)+wi)
        
        print('at epoch '+ str(epoch) +', big: '+str(sum(tau>theta1[0]))+', small: '+str(sum(tau<theta1[0]))+ ', error rate: '+str(J01[-1]))

        ## For debugging: you may want to print current parameters & losses
        # print(self.theta, ' => ', Jnll, ' / ', J01[-1] )
        # input()   # pause for keystroke

        # check stopping criteria: exit if exceeded # of epochs ( > stopEpochs)
        done = epoch > stopEpochs or abs(Jnll[-1]) < stopTol;   # or if Jnll not changing between epochs ( < stopTol )


In [68]:
N_theta = 4+N_comb-1 # the number of unknown parameters

In [69]:
theta_ini = np.array([0.5,0.06,0.5,-3.26]+[0]*(N_comb-1)).reshape(N_theta,1) # + 0.1*np.random.randn(N_theta,1)

In [85]:
# Update our shell classifier definition
class logisticClassify(ml.classifier):
    predict = myPredict              # Now all parts are implemented
    train = myTrain

learnerA = logisticClassify()
learnerA.classes = np.unique(YA)
learnerA.theta = torch.tensor(theta_ini,requires_grad = True,dtype=torch.float32) # TODO
learnerA.train(XA,YA,initStep=1e-1,stopEpochs=100,stopTol=1e-5,alpha=10);

at epoch 1, big: [22461], small: [18215], error rate: 0.17917199331301012
at epoch 2, big: [19769], small: [20907], error rate: 0.14740879142491886
at epoch 3, big: [21107], small: [19569], error rate: 0.1554233454616973
at epoch 4, big: [21378], small: [19298], error rate: 0.15579211328547546
at epoch 5, big: [18576], small: [22100], error rate: 0.13455108663585408
at epoch 6, big: [19515], small: [21161], error rate: 0.13786999704985742
at epoch 7, big: [19022], small: [21654], error rate: 0.1339118890746386
at epoch 8, big: [19226], small: [21450], error rate: 0.13445274854951322
at epoch 9, big: [19541], small: [21135], error rate: 0.13580489723669978
at epoch 10, big: [19458], small: [21218], error rate: 0.13499360802438784
at epoch 11, big: [20416], small: [20260], error rate: 0.14202478119775788
at epoch 12, big: [19140], small: [21536], error rate: 0.13268266299537812
at epoch 13, big: [18941], small: [21735], error rate: 0.1306913167469761
at epoch 14, big: [18642], small: [22

Change back into numpy format for visualization and analysis, and save it if necessary.

In [86]:
theta_np = learnerA.theta.detach().numpy()

# pickle_out = open("data/theta230803joint.pkl","wb")
# pickle.dump(theta_np, pickle_out)
# pickle_out.close()

In [87]:
X, Y = data_comb[:,:-1], data_comb[:,-1]   # get features & target

In [88]:
N1 = 104
R = 1.98e-3 # gas const
T = 310 # temperature at which cells were induced
RT = R*T

In [89]:
X1 = X[:,:N1] # X1 is a CRP sequence
X2 = X[:,N1:] # X2 is a RNAP sequence

In [90]:
theta = theta_np

In [91]:
eps_c = X1 @ theta[4:4+N1]; # CRP binding energy
eps_r = X2 @ theta[4+N1:]; # RNAP binding energy
wi = theta[2]*(np.exp(-eps_r/RT)+theta[1]*np.exp(-(eps_c+eps_r+theta[3])/RT))
tau = wi/(1+theta[1]*np.exp(-eps_c/RT)+wi)

In [92]:
sum(tau < theta[0])

array([24665])

In [93]:
sum(tau > theta[0])

array([21389])

In [None]:
theta_np[:4] # value after 500 epochs

In [None]:
theta_np[:4] # value after 100 epochs

In [None]:
theta_np[:4] # value after 30 epochs

In [None]:
# pickle_out = open("data/theta230727joint272param.pkl","wb")
# pickle.dump(theta_np, pickle_out)
# pickle_out.close()

In [None]:
def energy_matrix(theta_raw, plot = False):
    '''generate energy matrix, the normalized energy vector and related constants'''
    '''input theta_raw has NO intercept term (pure energy vector with no shifting term)'''
    n = int((theta_raw.shape[0])/4)
    theta_shift = 0 # initialize energy shift
    theta = np.transpose(np.reshape(theta_raw,(n,4)))
    for i in range(n):
        theta_shift += min(theta[:,i]) # cummulate energy shift
        theta[:,i] -= min(theta[:,i])       
    theta_scale = theta.max() # calculate the unit/scaling factor of energy
    theta = theta/theta_scale
    theta_long = np.reshape(np.transpose(theta),theta_raw.shape)
    if plot:
        plt.matshow(theta)
        plt.colorbar(label='arbitrary unit')
        plt.style.use('classic')
        plt.show()
    return theta, theta_long, theta_shift, theta_scale

Plot the energy matrix:

In [None]:
theta_c,_,shift_c,scale_c = energy_matrix(theta_np[4:4+104],plot = True)

In [None]:
theta_c,_,shift_c,scale_c = energy_matrix(theta_np[4:4+104],plot = True)

In [None]:
theta_c,_,shift_c,scale_c = energy_matrix(theta_np[4:4+104],plot = True)

Now we turn to RNAP.

In [None]:
theta_r,_,shift_r,scale_r = energy_matrix(theta_np[4+104:],plot = True)

In [None]:
theta_r,_,shift_r,scale_r = energy_matrix(theta_np[4+104:],plot = True)

In [None]:
theta_r,_,shift_r,scale_r = energy_matrix(theta_np[4+104:],plot = True)

In [None]:
shift_c, scale_c

In [None]:
shift_c, scale_c

In [None]:
shift_c, scale_c

In [None]:
shift_r, scale_r

In [None]:
shift_r, scale_r

In [None]:
shift_r, scale_r