In [108]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as opt
from torch.utils.data import DataLoader,TensorDataset
from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from sklearn import metrics
from sklearn.model_selection import train_test_split
import scipy.io as sio
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [109]:
blosum62 = {
        'A': [4, -1, -2, -2, 0,  -1, -1, 0, -2,  -1, -1, -1, -1, -2, -1, 1,  0,  -3, -2, 0],  # A
        'R': [-1, 5,  0, -2, -3, 1,  0,  -2, 0,  -3, -2, 2,  -1, -3, -2, -1, -1, -3, -2, -3], # R
        'N': [-2, 0,  6,  1,  -3, 0,  0,  0,  1,  -3, -3, 0,  -2, -3, -2, 1,  0,  -4, -2, -3], # N
        'D': [-2, -2, 1,  6,  -3, 0,  2,  -1, -1, -3, -4, -1, -3, -3, -1, 0,  -1, -4, -3, -3], # D
        'C': [0,  -3, -3, -3, 9,  -3, -4, -3, -3, -1, -1, -3, -1, -2, -3, -1, -1, -2, -2, -1], # C
        'Q': [-1, 1,  0,  0,  -3, 5,  2,  -2, 0,  -3, -2, 1,  0,  -3, -1, 0,  -1, -2, -1, -2], # Q
        'E': [-1, 0,  0,  2,  -4, 2,  5,  -2, 0,  -3, -3, 1,  -2, -3, -1, 0,  -1, -3, -2, -2], # E
        'G': [0,  -2, 0,  -1, -3, -2, -2, 6,  -2, -4, -4, -2, -3, -3, -2, 0,  -2, -2, -3, -3], # G
        'H': [-2, 0,  1,  -1, -3, 0,  0,  -2, 8,  -3, -3, -1, -2, -1, -2, -1, -2, -2, 2,  -3], # H
        'I': [-1, -3, -3, -3, -1, -3, -3, -4, -3, 4,  2,  -3, 1,  0,  -3, -2, -1, -3, -1, 3],  # I
        'L': [-1, -2, -3, -4, -1, -2, -3, -4, -3, 2,  4,  -2, 2,  0,  -3, -2, -1, -2, -1, 1],  # L
        'K': [-1, 2,  0,  -1, -3, 1,  1,  -2, -1, -3, -2, 5,  -1, -3, -1, 0,  -1, -3, -2, -2], # K
        'M': [-1, -1, -2, -3, -1, 0,  -2, -3, -2, 1,  2,  -1, 5,  0,  -2, -1, -1, -1, -1, 1],  # M
        'F': [-2, -3, -3, -3, -2, -3, -3, -3, -1, 0,  0,  -3, 0,  6,  -4, -2, -2, 1,  3,  -1], # F
        'P': [-1, -2, -2, -1, -3, -1, -1, -2, -2, -3, -3, -1, -2, -4, 7,  -1, -1, -4, -3, -2], # P
        'S': [1,  -1, 1,  0,  -1, 0,  0,  0,  -1, -2, -2, 0,  -1, -2, -1, 4,  1,  -3, -2, -2], # S
        'T': [0,  -1, 0,  -1, -1, -1, -1, -2, -2, -1, -1, -1, -1, -2, -1, 1,  5,  -2, -2, 0],  # T
        'W': [-3, -3, -4, -4, -2, -2, -3, -2, -2, -3, -2, -3, -1, 1,  -4, -3, -2, 11, 2,  -3], # W
        'Y': [-2, -2, -2, -3, -2, -1, -2, -3, 2,  -1, -1, -2, -1, 3,  -3, -2, -2, 2,  7,  -1], # Y
        'V': [0,  -3, -3, -3, -1, -2, -2, -3, -3, 3,  1,  -2, 1,  -1, -2, -2, 0,  -3, -1, 4],  # V
        '-': [0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],  # -
    }

def normalization(dataset):
    min = dataset.min(axis=0)
    max = dataset.max(axis=0)
    dataset = (dataset - min) / (max - min)
    return dataset

def get_blosum62(seq):
    blosum_list = []
    for i in seq:
        if i in blosum62:
            blosum_list.append(blosum62[i])
        else:
            blosum_list.append(blosum62['-'])
    blosum = np.array(blosum_list)
#     blosum = normalization(blosum)  # why did they get ride of normalization?
    feature = np.zeros((1002,20))
    idx = blosum.shape[0]
    feature[0:idx,:] = blosum
    return feature


def make_tensor(path):
    data = pd.read_csv(path)
    sequences = data['sequence'].values
    labels = data['label'].values
    evolution = torch.zeros(len(sequences),1002,20)
    lengths = []
    for i in range(len(sequences)):
        lengths.append((len(sequences[i])))
        temp = get_blosum62(sequences[i])
        evolution[i,:,:] = torch.Tensor(temp)

    return evolution, torch.Tensor(lengths), torch.Tensor(labels) 

In [110]:
class dvib(nn.Module):
    def __init__(self,k,out_channels, hidden_size):
        super(dvib, self).__init__()
        
        self.conv = torch.nn.Conv2d(in_channels=1,
                            out_channels = out_channels,
                            kernel_size = (1,20),
                            stride=(1,1),
                            padding=(0,0),
                            )
        
        self.rnn = torch.nn.GRU(input_size = out_channels,  
                                hidden_size = hidden_size,
                                num_layers = 2,
                                bidirectional = True,
                                batch_first = True,
                                dropout = 0.2
                              )
        
        self.fc1 = nn.Linear(hidden_size*4, hidden_size*4)
#         self.fc2 = nn.Linear(1024,1024)
        self.enc_mean = nn.Linear(hidden_size*4+578,k)
        self.enc_std = nn.Linear(hidden_size*4+578,k)
        self.dec = nn.Linear(k, 2)
        
        self.drop_layer = torch.nn.Dropout(0.5)
        
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.constant_(self.fc1.bias, 0.0)
#         nn.init.xavier_uniform_(self.fc2.weight)
#         nn.init.constant_(self.fc2.bias, 0.0)
        nn.init.xavier_uniform_(self.enc_mean.weight)
        nn.init.constant_(self.enc_mean.bias, 0.0)
        nn.init.xavier_uniform_(self.enc_std.weight)
        nn.init.constant_(self.enc_std.bias, 0.0)
        nn.init.xavier_uniform_(self.dec.weight)
        nn.init.constant_(self.dec.bias, 0.0)
   
        
        
    def cnn_gru(self,x,lens):
        x = x.unsqueeze(1)
#         print(x.shape)
        x = self.conv(x)
#         print(x.shape)   
        x = torch.nn.ReLU()(x)
#         print(x.shape,type(x))
        x = x.squeeze(3)
#         x = x.view(x.size(0),-1)
        x = x.permute(0,2,1)
#         print(x.shape)
#         print(type(lens))
        gru_input = pack_padded_sequence(x,lens,batch_first=True)
        output, hidden = self.rnn(gru_input)
#         print(hidden.shape)
        output_all = torch.cat([hidden[-1],hidden[-2],hidden[-3],hidden[-4]],dim=1)
#         print("output_all.shape:",output_all.shape)    
        return output_all
    
        
    def forward(self, pssm, lengths, FEGS): 
        cnn_vectors = self.cnn_gru(pssm, lengths)
        feature_vec = torch.cat([cnn_vectors,FEGS], dim = 1)
      
        enc_mean, enc_std = self.enc_mean(feature_vec), f.softplus(self.enc_std(feature_vec)-5)
        eps = torch.randn_like(enc_std)
        latent = enc_mean + enc_std*eps
        
        outputs = f.sigmoid(self.dec(latent))
#         print(outputs.shape)

        return outputs,enc_mean, enc_std,latent

In [34]:
# initialize model object
device = torch.device('cpu')
out_channels = 128
hidden_size = 512
beta = 2#0，1，3，4，5，6
k = 1024
model = dvib(k, out_channels, hidden_size).to(device)

In [None]:
class DBUnpickler(pickle.Unpickler):

    def __init__(self, file):
        super().__init__(file)

    def persistent_load(self, pid):
        # This method is invoked whenever a persistent ID is encountered.
        # Here, pid is the tuple returned by DBPickler.
        lib, className, file, device, unknown = pid
        return className()

In [None]:
with open('../Model/archive/data.pkl', 'rb') as file:
    pretrainedModel = DBUnpickler(file).load()

In [56]:
# initialize model weights with weights from ToxIBTL paper
for param, weights in zip(model.parameters(), pretrainedModel['model'].values()):
    param.data = weights

In [102]:
# retrieve FEGS embeddings for test set
test_fegs = []
for i in range(4108):
    test_data = sio.loadmat('../data/testBatches/testBatch' + str(i) + '.mat')['mat']
    if (len(test_data) != 10):
        print(i)
    test_fegs.append(test_data)
    
test_FEGS = torch.Tensor(normalization(np.vstack(test_fegs)))

4107


In [104]:
# note: takes a hot second to run, try to run ONCE
test_path = '../data/test1002.csv'
test_pssm, test_len, test_label = make_tensor(test_path)

In [106]:
test_data = DataLoader(TensorDataset(test_pssm, test_len, test_FEGS, test_label), batch_size=100)

In [113]:
# evaluate model
correct = 0
y_pre = []
y_test = []
with torch.no_grad():
    for batch_idx, (sequences, lengths,FEGS, labels) in enumerate(test_data):
        print(batch_idx + ' of ' + len(test_data))
        seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
        seq_tensor = sequences[perm_idx].to(device)
        FEGS_tensor = FEGS[perm_idx].to(device)
        label = labels[perm_idx].long().to(device)
#                     seq_lengths = seq_lengths.to(device)
        y_test.extend(label.cpu().detach().numpy())


        y_pred, end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
        y_pre.extend(y_pred.argmax(dim=1).cpu().detach().numpy())

#             pred = outputs.argmax(dim=1)
#             print(output.shape,label.shape)
        _, pred = torch.max(y_pred, 1) 

        correct += pred.eq(label).sum().item()
#             print(output,label.data)
#             correct += (output == label.data).sum()

    print('\nTest: Accuracy:{}/{} ({:.4f}%) f1:({:.4f}%) mcc:({:.4f}%)\n'.format(
        correct, len(test_data.dataset),
        100. * correct / len(test_data.dataset),
        metrics.f1_score(y_test,y_pre),
        metrics.matthews_corrcoef(y_test,y_pre)
    ))




Test: Accuracy:20029/41077 (48.7596%) f1:(0.0000%) mcc:(0.0000%)

