In [19]:
import time
start_time = time.time()
import random
from collections import deque
import torch
import torch.nn as nn
import torch.nn.functional as F
from tqdm import tqdm
import gzip
from Bio.SeqIO import parse
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pickle
import os


In [24]:

with open('./database/test.fa', "rt") as handle:
    database = []
    for record in parse(handle, "fasta"):
        # print(type(record))
        database.append(record.seq)

dictionary = ['A','T','G','C']
dict_for_cmap = ['PAD','A','T','G','C']

len_cutoff = 80


sample_I = np.eye(len(dictionary))
tokenized = np.array([np.array([np.zeros(len(dictionary))]+[sample_I[dictionary.index(i)] for i in database[j][:min(len(database[j]),len_cutoff)]]+[np.zeros(len(dictionary))]*(len_cutoff-len(database[j])+1)) for j in range((len(database)))])
test_data = torch.tensor(tokenized,dtype=float).to('cuda')
test_data_CNN = torch.permute(test_data, (0,2,1)).to('cuda')
print(test_data.shape)
print(test_data_CNN.shape)

torch.Size([1000, 82, 4])
torch.Size([1000, 4, 82])


In [25]:


class LSTMModel(nn.Module):
    def __init__(self,input_dims=4,hidden_dims=1,num_output_neurons=1,num_layers = 1,bidirectionality = False):
        super().__init__()
        self.inp = input_dims
        self.num_layers = num_layers
        self.hid = hidden_dims
        self.out = num_output_neurons
        self.bidir = bidirectionality
        self.recording = True
        self.record = {}
        D = 4 if self.bidir else 1
        self.RNN = nn.LSTM(self.inp,self.hid,self.num_layers,batch_first = True,bidirectional=self.bidir)
        # self.DNN1 = nn.Linear(D*self.hid,D*self.hid)
        # self.bn1 = nn.BatchNorm1d(D*self.hid)
        # self.DNN2 = nn.Linear(D*self.hid,D*self.hid)
        # self.bn2 = nn.BatchNorm1d(D*self.hid)
        # self.DNN3 = nn.Linear(D*self.hid,D*self.hid)
        # self.bn3 = nn.BatchNorm1d(D*self.hid)
        self.DNN4 = nn.Linear(D*self.hid,self.out)
    def start_recording(self):
        self.recording = True
    def stop_recording(self):
        self.recording = False
    def forward(self,x):
        '''
        Note: x should be ending with a $ sign, the encoding of which is [0,0,0,0]. This will be passed to the DNN
        if is bidirectional, then should start and end with $ signs
        '''

        out,_ = self.RNN(x)
        # print(out,type(out))
        if not self.bidir:
            last_token = out[:,-1,:]
            # print(last_token.shape)
            # out = F.dropout(F.relu(self.bn1(self.DNN1(last_token))),p=0.1)
            # out = F.dropout(F.relu(self.bn2(self.DNN2(out))),p=0.1)
            # out = F.dropout(F.relu(self.bn3(self.DNN3(out))),p=0.1)
            # out = (self.DNN4(out))
            out = (self.DNN4(last_token))
            # print(out.shape)
            if self.recording:
                self.record = {'last_token':last_token,'out1':out}
        else:
            # NOTE: This does not function correctly DO NOT USE
            last_token = out[:,-1,:]
            first_token = out[:,0,:]
            # out = F.dropout(F.relu(self.bn1(self.DNN1(torch.concatenate((first_token,last_token),dims=-1)))),p=0.1)
            # out = F.dropout(F.relu(self.bn2(self.DNN2(out))),p=0.1)
            # out = F.dropout(F.relu(self.bn3(self.DNN3(out))),p=0.1)
            out = (self.DNN4(out))
        out = F.sigmoid(out)
        # print(out.shape)
        if self.recording:
            self.record['out'] = out

        return torch.reshape(out,(-1,self.out))


class LSTMModelWithSkip(nn.Module):
    def __init__(self,input_dims=4,hidden_dims=64,num_output_neurons=1,num_layers = 1,bidirectionality = False):
        super().__init__()
        self.inp = input_dims
        self.num_layers = num_layers
        self.hid = hidden_dims
        self.out = num_output_neurons
        self.bidir = bidirectionality
        self.recording = True
        self.record={}
        D = 4 if self.bidir else 1
        self.RNN = nn.LSTM(self.inp,self.hid,self.num_layers,batch_first = True,bidirectional=self.bidir)

        self.DNN1 = nn.Linear(D*self.hid,D*self.hid)
        self.bn1 = nn.BatchNorm1d(D*self.hid)
        self.DNN2 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn2 = nn.BatchNorm1d(D*self.hid)
        self.DNN3 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn3 = nn.BatchNorm1d(D*self.hid)
        self.DNN4 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn4 = nn.BatchNorm1d(D*self.hid)
        self.DNN5 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn5 = nn.BatchNorm1d(D*self.hid)
        self.DNN6 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn6 = nn.BatchNorm1d(D*self.hid)
        self.DNN7 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn7 = nn.BatchNorm1d(D*self.hid)
        self.DNN8 = nn.Linear(2*D*self.hid,D*self.hid)
        self.bn8 = nn.BatchNorm1d(D*self.hid)
        self.DNN9 = nn.Linear(2*D*self.hid,self.out)
    def start_recording(self):
        self.recording = True
    def stop_recording(self):
        self.recording = False
    def forward(self,x):
        '''
        Note: x should be ending with a $ sign, the encoding of which is [0,0,0,0]. This will be passed to the DNN
        if is bidirectional, then should start and end with $ signs
        '''

        out,_ = self.RNN(x)
        # print(out,type(out))
        if not self.bidir:
            last_token = out[:,-1,:]
            out1 = F.dropout(F.relu(self.bn1(self.DNN1(last_token))),p=0.1)
            out2 = F.dropout(F.relu(self.bn2(self.DNN2(torch.concatenate((last_token,out1),dim=-1)))),p=0.1)
            out3 = F.dropout(F.relu(self.bn3(self.DNN3(torch.concatenate((out1,out2),dim=-1)))),p=0.1)
            out4 = F.dropout(F.relu(self.bn4(self.DNN4(torch.concatenate((out2,out3),dim=-1)))),p=0.1)
            out5 = F.dropout(F.relu(self.bn3(self.DNN5(torch.concatenate((out3,out4),dim=-1)))),p=0.1)
            out6 = F.dropout(F.relu(self.bn4(self.DNN6(torch.concatenate((out4,out5),dim=-1)))),p=0.1)
            out7 = F.dropout(F.relu(self.bn3(self.DNN7(torch.concatenate((out5,out6),dim=-1)))),p=0.1)
            out8 = F.dropout(F.relu(self.bn4(self.DNN8(torch.concatenate((out6,out7),dim=-1)))),p=0.1)
            out = self.DNN9(torch.concatenate((out7,out8),dim=-1))
            if self.recording:
                self.record = {'last_token':last_token,'out1':out1,'out2':out2,'out3':out3,'out4':out4,'out5':out5,'out6':out6,'out7':out7,'out8':out8,'out9':out}
        else:
            last_token = out[:,-1,:]
            first_token = out[:,0,:]
            out = F.dropout(F.relu(self.bn1(self.DNN1(torch.concatenate((first_token,last_token),dims=-1)))),p=0.1)
            out = F.dropout(F.relu(self.bn2(self.DNN2(out))),p=0.1)
            out = F.dropout(F.relu(self.bn3(self.DNN3(out))),p=0.1)
            out = (self.DNN4(out))
        out = F.sigmoid(out)
        out = torch.reshape(out,(-1,self.out))
        if self.recording:
            self.record['out'] = out
        return out


class PyramidalCNN(nn.Module):
    def __init__(self,num_heads=1,output_size=4,kernel_size=2):
        super().__init__()
        '''
        Takes a batch or an unbatched input and assimilated information from the surrounding bases, until only one vector remians
        input shape = (N,1,82) 80 LENGTH OF SEQUENCE + 1 START CODE +1 END CODE
        '''
        self.output_size = output_size
        self.kernel_size = kernel_size
        self.cnn_list = []
        for i in range(81):
            if i==0:
                self.cnn_list.append(nn.Conv1d(4,num_heads,kernel_size=kernel_size,padding=0))
            elif i==80:
                self.cnn_list.append(nn.Conv1d(num_heads,output_size,kernel_size=kernel_size,padding=0))
            else:
                self.cnn_list.append(nn.Conv1d(num_heads,num_heads,kernel_size=kernel_size,padding=0))
        for i,c in enumerate(self.cnn_list):
            self.add_module(name=str(c)[:6]+'_'+str(i),module=c)
    def forward(self,x):
        # the shape of x must be N,1,82
        for c in self.cnn_list:
            x = F.relu(c(F.pad(x,(self.kernel_size-2,0))))
        return torch.reshape(x,(-1,self.output_size))
class PyramidalClassifier(nn.Module):
    def __init__(self,num_heads,output_size,kernel_size):
        super().__init__()
        self.PyCNN = PyramidalCNN(num_heads=num_heads,output_size=output_size,kernel_size=kernel_size)
        self.DNN = nn.Linear(output_size,1)
    def forward(self,x):
        x = self.PyCNN(x)
        x = F.sigmoid(self.DNN(x))
        return x


class CNNBreathing(nn.Module):
    def __init__(self,num_heads=1,output_size=4,kernel_size=2,total=5):
        super().__init__()
        '''
        Takes a batch or an unbatched input and assimilated information from the surrounding bases, until only one vector remians
        input shape = (N,1,82) 80 LENGTH OF SEQUENCE + 1 START CODE +1 END CODE
        '''
        self.output_size = output_size
        self.kernel_size = kernel_size
        self.cnn_list = []
        for i in range(total):
            if i==0:
                self.cnn_list.append(nn.Conv1d(4,num_heads,kernel_size=kernel_size))
            elif i==total-1:
                self.cnn_list.append(nn.ConvTranspose1d(num_heads,num_heads,kernel_size=kernel_size))
                self.cnn_list.append(nn.Conv1d(num_heads,output_size,kernel_size=kernel_size))
                self.cnn_list.append(nn.ConvTranspose1d(output_size,output_size,kernel_size=kernel_size))
            else:
                self.cnn_list.append(nn.ConvTranspose1d(num_heads,num_heads,kernel_size=kernel_size))
                self.cnn_list.append(nn.Conv1d(num_heads,num_heads,kernel_size=kernel_size))
        for i,c in enumerate(self.cnn_list):
            self.add_module(name=str(c)[:6]+'_'+str(i),module=c)
    def forward(self,x):
        # the shape of x must be N,1,82
        for c in self.cnn_list:
            # print(c)
            x = F.relu(c(x))
        #     print(x.shape)
        # print(x.shape)
        return torch.reshape(x,(-1,self.output_size))
class NormalCNNClassifier(nn.Module):
    def __init__(self,num_heads=4,output_size=4,kernel_size=2,total=4):
        super().__init__()
        self.output_size = output_size
        self.PyCNN = CNNBreathing(num_heads=num_heads,output_size=output_size,kernel_size=kernel_size,total=total)
        self.DNN = nn.Sequential(nn.Linear(output_size*82,512),nn.BatchNorm1d(512),nn.ReLU(),nn.Dropout(p=0.1),nn.Linear(512,1))
        
        # self.DNN = nn.Sequential(nn.Linear(output_size*82,512),nn.BatchNorm1d(512),nn.ReLU(),nn.Dropout(p=0.1),nn.Linear(512,512),nn.BatchNorm1d(512),nn.ReLU(),nn.Dropout(p=0.1),nn.Linear(512,512),nn.BatchNorm1d(512),nn.ReLU(),nn.Dropout(p=0.1),nn.Linear(512,1))
    
    def forward(self,x):
        x = self.PyCNN(x)
        x = F.sigmoid(self.DNN(torch.reshape(x,(-1,self.output_size*(82)))))
        return x


In [26]:


model_LSTM = torch.load('/storage/madhu/deep/CFG/Project/models/model_LSTMModel_0.9070017773549953.pt')
model_LSTM_skip = torch.load('/storage/madhu/deep/CFG/Project/models/model_LSTMModelWithSkip_0.8882519338123013.pt')
model_CNN_Breather = torch.load('/storage/madhu/deep/CFG/Project/models/model_NormalCNNClassifier_0.8619170400781034.pt')
model_pyramidal = torch.load('/storage/madhu/deep/CFG/Project/models/model_PyramidalClassifier_0.497459133351691.pt')

  model_LSTM = torch.load('/storage/madhu/deep/CFG/Project/models/model_LSTMModel_0.9070017773549953.pt')
  model_LSTM_skip = torch.load('/storage/madhu/deep/CFG/Project/models/model_LSTMModelWithSkip_0.8882519338123013.pt')
  model_CNN_Breather = torch.load('/storage/madhu/deep/CFG/Project/models/model_NormalCNNClassifier_0.8619170400781034.pt')
  model_pyramidal = torch.load('/storage/madhu/deep/CFG/Project/models/model_PyramidalClassifier_0.497459133351691.pt')


In [None]:
with torch.no_grad():

    model_LSTM.eval()
    model_LSTM_skip.eval()
    model_CNN_Breather.eval()
    model_pyramidal.eval()
    # print(test_data)
    y_pred_LSTM = model_LSTM(test_data).detach().cpu().numpy()
    y_pred_LSTM_skip = model_LSTM_skip(test_data).detach().cpu().numpy()
    y_pred_CNN_Breather = model_CNN_Breather(test_data_CNN).detach().cpu().numpy()
    y_pred_pyramidal = model_pyramidal(test_data_CNN).detach().cpu().numpy()
    print(y_pred_LSTM.shape)
    for i in range(len(y_pred_LSTM)):
        with open('./results/LSTM_OUT.txt','a') as f:
            f.write(f'{y_pred_LSTM[i][0]},\n')
        with open('./results/LSTM_SKIP_OUT.txt','a') as f:
            f.write(f'{y_pred_LSTM_skip[i][0]},\n')
        with open('./results/CNN_Breather_OUT.txt','a') as f:
            f.write(f'{y_pred_CNN_Breather[i][0]},\n')
        with open('./results/pyramidal_OUT.txt','a') as f:
            f.write(f'{y_pred_pyramidal[i][0]},\n')
    
    

(1000, 1)
