In [1]:
import pandas as pd
import numpy as np
import torch 
from torch import nn
from torch.nn import functional as F
from keras.preprocessing.text import Tokenizer
from keras.utils import to_categorical
from numpy import random
import math

Using TensorFlow backend.


In [3]:
def one_hot_encode_inputs(df, input_cols, seq_len=107): 
    """One-hot encode inputs"""
    results = np.empty(shape=(df.shape[0],seq_len,0))
    tokenizer = Tokenizer(char_level=True)
    for col in input_cols: 
        tokenizer.fit_on_texts(df['sequence'])
        int_sequences = tokenizer.texts_to_sequences(df['sequence'])
        results = np.append(results, to_categorical(int_sequences), axis=2)
    trend_mat = np.tile(np.reshape(np.arange(seq_len)/100, (1,seq_len,1)),(df.shape[0],1,1))
    results = np.append(results, trend_mat, axis=2)
    return results

#from Kaggle user
def pandas_list_to_array(df):
    """
    Input: dataframe of shape (x, y), containing list of length l
    Return: np.array of shape (x, l, y)
    """
    
    return np.transpose(
        np.array(df.values.tolist()),
        (0, 2, 1)
    )

In [18]:
#modified code from d2l.ai to allow for different types of layer
def get_params(num_hiddens, layer_type, num_inputs=None, num_outputs=None): 

    def normal(shape): 
        return torch.randn(size=shape) * 0.01

    params = []
    if layer_type == 'base':
        params.append(normal((num_inputs, num_hiddens)))
    
    if layer_type != 'output':
        params.append(normal((num_hiddens, num_hiddens)))
        params.append(torch.zeros(num_hiddens))
    
    else:
        params.append(normal((num_hiddens, num_outputs)))
        params.append(torch.zeros(num_outputs))

    # Attach gradients
    for param in params: 
        param.requires_grad_(True)
    
    return params

def init_rnn_state(batch_size, num_hiddens): 
    return torch.zeros((batch_size, num_hiddens))

def relu(x): 
    return x*(x>0)

def identity(x): 
    return x

#from Chapter 8 of d2l.ai
def rnn(inputs, state, params, len_pred=68): 
    #X dim: (num_steps, batch_size, num_inputs)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    
    for X in inputs: 
        H = torch.tanh(torch.mm(X, W_xh)+torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)

#define layer class
class layer: 
    def __init__(self, layer_type, num_inputs = None, num_hiddens = None, num_outputs = None,  activation = identity): 
        self.activation = activation
        self.num_inputs, self.num_hiddens, self.num_outputs = num_inputs, num_hiddens, num_outputs
        self.layer_type = layer_type
        
        if self.layer_type == 'base':
            self.params = get_params(num_hiddens, num_inputs=num_inputs, layer_type='base')
            self.W_xh, self.W_hh, self.b_h = self.params
        elif self.layer_type == 'intermediate': 
            self.params = get_params(num_hiddens, layer_type='intermediate')
            self.W_hh, self.b_h = self.params 
        elif self.layer_type == 'output': 
            self.params = get_params(num_hiddens, num_outputs=num_outputs, 
            layer_type='output')
            self.W_hq, self.b_q = self.params
            
    def __call__(self, X =None, H=None): 
        return self.forward(X, H)
        
    def forward(self, X=None, H=None):
        
        if self.layer_type == 'base':
            
            return self.activation(torch.mm(X, self.W_xh)+torch.mm(H, self.W_hh) + self.b_h)
        elif self.layer_type == 'intermediate':
            return self.activation(torch.mm(H, self.W_hh)+self.b_h)
        elif self.layer_type == 'output': 
            return self.activation(torch.mm(H, self.W_hq)+self.b_q)
    
    def begin_state(self, batch_size): 
        self.params = self.init_state(batch_size, self.num_hiddens)

#define recurrent network (adapted from Chapter 8 of d2l.ai)
class RNNModelScratch: 
    def __init__(self,get_params, init_state): 
        self.init_state = init_state
        self.layers = []
        self.params = []
    
    def __call__(self, X, state): 
        return self.forward(X, state)
    
    def add_layer(self, layer): 
        self.layers.append(layer)
        self.params += layer.params
    
    def forward(self, XX, state): 
        #H = self.begin_state(batch_size)
        outputs = []
        H = state
        for X in XX:
            H = self.layers[0].forward(X, H)
            for l in self.layers[1:-1]: 
                H = l(H=H)
            outputs.append(self.layers[-1](H=H)) 
        return torch.cat(outputs, dim=0)
        
    def begin_state(self, batch_size): 
        return self.init_state(batch_size, self.layers[-2].num_hiddens)

#from Chapter 8 of d2l.ai, clip gradient to avoid exploding gradient issues
def grad_clipping(model, theta): 
    if isinstance(model, nn.Module): 
        params = [p for p in model.parameters() if p.requires_grad]
    else: 
        params = model.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta: 
        for param in params: 
            param.grad[:] *= theta / norm

#adapted from Chapter 8 of d2l.ai. One full epoch of training
def train_epoch(model, train_iter, loss, updater, 
               use_random_iter, batch_size=50, len_pred=68): 
    metric = np.zeros(2)
    state = model.begin_state(batch_size=batch_size)
    for X, Y in train_iter(train_inputs[:,0:68,:], train_labels, batch_size): 
#         if state is None or use_random_iter: 
#             state = model.begin_state(batch_size=X.shape[1])
#         else: 
#             if isinstance(model, nn.Module) and not isinstance(state, tuple): 
#                 state.detach_()
#             else: 
#                 for s in state: 
#                     s.detach_()
        y = Y.reshape((-1, Y.shape[2]))
        y_hat = model(X, state)
        l = loss(y_hat, y).mean()    #.long() converts to long data type
        if isinstance(updater, torch.optim.Optimizer): 
            updater.zero_grad()
            l.backward()
            grad_clipping(model,1)
            updater.step()
        else: 
            l.backward()
            grad_clipping(model,1)
            updater(batch_size=1)
        
        metric += np.array([l.data, 1])
    return metric[0] / metric[1], metric[1]

# loss function for mRNA vaccine application
def MCRMSE(y_hat, y): 
    mse_col = torch.mean(torch.square(y_hat - y),axis=0)
    result = torch.mean(torch.sqrt(mse_col), axis=0)
    return result

#from Chapter 8 of d2l.ai. define updater for gradient descent, and apply train_epoch
def train(model, train_iter, lr, num_epochs, use_random_iter=False): 
    loss = MCRMSE 
    updater = torch.optim.SGD(model.params, lr)
    for epoch in range(num_epochs): 
        loss_, speed = train_epoch(model, train_iter, loss, updater, 
                                  use_random_iter)
        print(loss_)

# generator for reading data
def data_iter(train_inputs, train_labels, batch_size): 
    n = train_inputs.shape[0]
    d = train_inputs.shape[2]
    q = train_labels.shape[2]
    initial_index = list(range(train_inputs.shape[0]))
    random.shuffle(initial_index)
    
    for i in range(n//batch_size):
        idx = initial_index[(i*batch_size):((i+1)*batch_size)]
        XX = train_inputs[idx]   
        YY = train_labels[idx]
        
        X = XX.transpose((1,0,2))
        Y = YY.transpose((1,0,2))
        yield torch.Tensor(X), torch.Tensor(Y)

Apply to vaccine data

In [13]:
#import data
data_dir = '/Users/carlgreen/Downloads/stanford-covid-vaccine/'
train_df = pd.read_json(data_dir + 'train.json', lines=True)
test_df = pd.read_json(data_dir + 'test.json', lines=True)
train_df = train_df.query("signal_to_noise >= 1")

In [14]:
input_cols = ['sequence', 'structure', 'predicted_loop_type']
pred_cols = ['reactivity', 'deg_Mg_pH10', 'deg_Mg_50C', 'deg_pH10', 'deg_50C']
train_inputs = one_hot_encode_inputs(train_df, input_cols)
train_inputs.shape

(2097, 107, 16)

In [15]:
train_labels = pandas_list_to_array(train_df[pred_cols])
X_train = torch.Tensor(train_inputs)

In [16]:
#build model
model = RNNModelScratch(get_params = get_params, 
                        init_state = init_rnn_state)
model.add_layer(layer(layer_type='base', num_inputs=16, num_hiddens=128, activation=relu))
model.add_layer(layer(layer_type='intermediate', num_hiddens=128, activation=relu))
model.add_layer(layer(layer_type='intermediate', num_hiddens=128, activation=relu))
model.add_layer(layer(layer_type='output', num_hiddens=128, num_outputs=5, activation=identity))

In [19]:
#train model
train(model, data_iter, lr=0.1, num_epochs=8)

0.5636317918940288
0.5078650989183565
0.5052482318587419
0.504944598529397
0.5055699835463268
0.5050962421952224
0.5050553265141278
0.5053050699757367


Not fitting well, not sure whether it's a problem with the implementation in this code, or whether a simple RNN like this simply cannot perform well. Should try to implement GRU or LSTM to see whether performance improves. 