In [10]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as opt
from torch.utils.data import DataLoader,TensorDataset
from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence
from torch.nn import TransformerEncoder, TransformerEncoderLayer
from sklearn import metrics
from sklearn.model_selection import train_test_split
import scipy.io as sio
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [11]:
blosum62 = {
        'A': [4, -1, -2, -2, 0,  -1, -1, 0, -2,  -1, -1, -1, -1, -2, -1, 1,  0,  -3, -2, 0],  # A
        'R': [-1, 5,  0, -2, -3, 1,  0,  -2, 0,  -3, -2, 2,  -1, -3, -2, -1, -1, -3, -2, -3], # R
        'N': [-2, 0,  6,  1,  -3, 0,  0,  0,  1,  -3, -3, 0,  -2, -3, -2, 1,  0,  -4, -2, -3], # N
        'D': [-2, -2, 1,  6,  -3, 0,  2,  -1, -1, -3, -4, -1, -3, -3, -1, 0,  -1, -4, -3, -3], # D
        'C': [0,  -3, -3, -3, 9,  -3, -4, -3, -3, -1, -1, -3, -1, -2, -3, -1, -1, -2, -2, -1], # C
        'Q': [-1, 1,  0,  0,  -3, 5,  2,  -2, 0,  -3, -2, 1,  0,  -3, -1, 0,  -1, -2, -1, -2], # Q
        'E': [-1, 0,  0,  2,  -4, 2,  5,  -2, 0,  -3, -3, 1,  -2, -3, -1, 0,  -1, -3, -2, -2], # E
        'G': [0,  -2, 0,  -1, -3, -2, -2, 6,  -2, -4, -4, -2, -3, -3, -2, 0,  -2, -2, -3, -3], # G
        'H': [-2, 0,  1,  -1, -3, 0,  0,  -2, 8,  -3, -3, -1, -2, -1, -2, -1, -2, -2, 2,  -3], # H
        'I': [-1, -3, -3, -3, -1, -3, -3, -4, -3, 4,  2,  -3, 1,  0,  -3, -2, -1, -3, -1, 3],  # I
        'L': [-1, -2, -3, -4, -1, -2, -3, -4, -3, 2,  4,  -2, 2,  0,  -3, -2, -1, -2, -1, 1],  # L
        'K': [-1, 2,  0,  -1, -3, 1,  1,  -2, -1, -3, -2, 5,  -1, -3, -1, 0,  -1, -3, -2, -2], # K
        'M': [-1, -1, -2, -3, -1, 0,  -2, -3, -2, 1,  2,  -1, 5,  0,  -2, -1, -1, -1, -1, 1],  # M
        'F': [-2, -3, -3, -3, -2, -3, -3, -3, -1, 0,  0,  -3, 0,  6,  -4, -2, -2, 1,  3,  -1], # F
        'P': [-1, -2, -2, -1, -3, -1, -1, -2, -2, -3, -3, -1, -2, -4, 7,  -1, -1, -4, -3, -2], # P
        'S': [1,  -1, 1,  0,  -1, 0,  0,  0,  -1, -2, -2, 0,  -1, -2, -1, 4,  1,  -3, -2, -2], # S
        'T': [0,  -1, 0,  -1, -1, -1, -1, -2, -2, -1, -1, -1, -1, -2, -1, 1,  5,  -2, -2, 0],  # T
        'W': [-3, -3, -4, -4, -2, -2, -3, -2, -2, -3, -2, -3, -1, 1,  -4, -3, -2, 11, 2,  -3], # W
        'Y': [-2, -2, -2, -3, -2, -1, -2, -3, 2,  -1, -1, -2, -1, 3,  -3, -2, -2, 2,  7,  -1], # Y
        'V': [0,  -3, -3, -3, -1, -2, -2, -3, -3, 3,  1,  -2, 1,  -1, -2, -2, 0,  -3, -1, 4],  # V
        '-': [0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],  # -
    }

def normalization(dataset):
    min = dataset.min(axis=0)
    max = dataset.max(axis=0)
    dataset = (dataset - min) / (max - min)
    return dataset

def get_blosum62(seq):
    blosum_list = []
    for i in seq: 
        blosum_list.append(blosum62[i])
    blosum = np.array(blosum_list)
#     blosum = normalization(blosum)
    feature = np.zeros((1002,20))
    idx = blosum.shape[0]
    feature[0:idx,:] = blosum
    return feature


def make_tensor(path):
    data = pd.read_csv(path)
    sequences = data['sequence'].values
    labels = data['label'].values
    evolution = torch.zeros(len(sequences),1002,20)
    lengths = []
    for i in range(len(sequences)):
        lengths.append((len(sequences[i])))
        temp = get_blosum62(sequences[i])
        evolution[i,:,:] = torch.Tensor(temp)

    return evolution,torch.Tensor(lengths),torch.Tensor(labels) 

In [13]:
train_path = 'data/protein_train1002.csv'
test_path = 'data/protein_test1002.csv'

train_data = sio.loadmat('data/benchmark/protein_train.mat')
train_FEGS = torch.Tensor(normalization(train_data['FV']))

test_data = sio.loadmat('data/benchmark/protein_test.mat')
test_FEGS = torch.Tensor(normalization(test_data['FV']))

train_pssm, train_len,train_label = make_tensor(train_path)
test_pssm, test_len,test_label = make_tensor(test_path)

train_data = DataLoader(TensorDataset(train_pssm, train_len,train_FEGS,train_label), batch_size=100, shuffle=True)
test_data = DataLoader(TensorDataset(test_pssm, test_len,test_FEGS, test_label), batch_size=100)

print("data done") 

data done


In [None]:
# data = pd.read_csv('data/peptide.csv')
# labels = data['label'].values
# sequences = data['sequence'].values
# fegs = sio.loadmat('data/peptide3864.mat')
# X_train, X_test, y_train1, y_test1 = train_test_split(
#     fegs['FV'], labels, test_size=0.15, random_state=66, stratify=labels)

# train_FEGS = torch.Tensor(X_train)
# test_FEGS = torch.Tensor(X_test)

# S_train, S_test, y_train2, y_test2 = train_test_split(
#     sequences, labels, test_size=0.15, random_state=66, stratify=labels)

# train_pssm, train_len,train_label = make_tensor(S_train,y_train2)
# test_pssm, test_len,test_label = make_tensor(S_test,y_test2)

# train_data = DataLoader(TensorDataset(train_pssm, train_len,train_FEGS,train_label), batch_size=100, shuffle=True)
# test_data = DataLoader(TensorDataset(test_pssm, test_len,test_FEGS, test_label), batch_size=100)

In [14]:
class dvib(nn.Module):
    def __init__(self,k,out_channels, hidden_size):
        super(dvib, self).__init__()
        
        self.conv = torch.nn.Conv2d(in_channels=1,
                            out_channels = out_channels,
                            kernel_size = (1,20),
                            stride=(1,1),
                            padding=(0,0),
                            )
        
        self.rnn = torch.nn.GRU(input_size = out_channels,  
                                hidden_size = hidden_size,
                                num_layers = 2,
                                bidirectional = True,
                                batch_first = True,
                                dropout = 0.2
                              )
        
        self.fc1 = nn.Linear(hidden_size*4, hidden_size*4)
#         self.fc2 = nn.Linear(1024,1024)
        self.enc_mean = nn.Linear(hidden_size*4+578,k)
        self.enc_std = nn.Linear(hidden_size*4+578,k)
        self.dec = nn.Linear(k, 2)
        
        self.drop_layer = torch.nn.Dropout(0.5)
        
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.constant_(self.fc1.bias, 0.0)
#         nn.init.xavier_uniform_(self.fc2.weight)
#         nn.init.constant_(self.fc2.bias, 0.0)
        nn.init.xavier_uniform_(self.enc_mean.weight)
        nn.init.constant_(self.enc_mean.bias, 0.0)
        nn.init.xavier_uniform_(self.enc_std.weight)
        nn.init.constant_(self.enc_std.bias, 0.0)
        nn.init.xavier_uniform_(self.dec.weight)
        nn.init.constant_(self.dec.bias, 0.0)
   
        
        
    def cnn_gru(self,x,lens):
        x = x.unsqueeze(1)
#         print(x.shape)
        x = self.conv(x)
#         print(x.shape)   
        x = torch.nn.ReLU()(x)
#         print(x.shape,type(x))
        x = x.squeeze(3)
#         x = x.view(x.size(0),-1)
        x = x.permute(0,2,1)
#         print(x.shape)
#         print(type(lens))
        gru_input = pack_padded_sequence(x,lens,batch_first=True)
        output, hidden = self.rnn(gru_input)
#         print(hidden.shape)
        output_all = torch.cat([hidden[-1],hidden[-2],hidden[-3],hidden[-4]],dim=1)
#         print("output_all.shape:",output_all.shape)    
        return output_all
    
        
    def forward(self, pssm, lengths, FEGS): 
        cnn_vectors = self.cnn_gru(pssm,lengths)
        feature_vec = torch.cat([cnn_vectors,FEGS],dim = 1)
      
        enc_mean, enc_std = self.enc_mean(feature_vec), f.softplus(self.enc_std(feature_vec)-5)
        eps = torch.randn_like(enc_std)
        latent = enc_mean + enc_std*eps
        
        outputs = f.sigmoid(self.dec(latent))
#         print(outputs.shape)

        return outputs,enc_mean, enc_std,latent

In [15]:
CE = nn.CrossEntropyLoss(reduction='sum')
betas = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6,1e-7]

def calc_loss(y_pred, labels, enc_mean, enc_std, beta=1e-3):
    """    
    y_pred : [batch_size,2]
    label : [batch_size,1]    
    enc_mean : [batch_size,z_dim]  
    enc_std: [batch_size,z_dim] 
    """   
    
    ce = CE(y_pred,labels)
    KL = 0.5 * torch.sum(enc_mean.pow(2) + enc_std.pow(2) - 2*enc_std.log() - 1)
    
    return (ce + beta * KL)/y_pred.shape[0]

In [None]:
out_channels = 128
hidden_size = 512
beta = 2#0，1，3，4，5，6
k = 1024
                
model = dvib(k,out_channels, hidden_size).to(device)
optimizer = opt.Adam(model.parameters(),lr = 0.0001)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.95)
num_epochs = 500
for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    train_correct = 0
    if epoch % 10 == 0 and epoch > 0:
         scheduler.step()

    for batch_idx, (sequences, lengths, FEGS, labels) in enumerate(train_data):
        seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
        seq_tensor = sequences[perm_idx].to(device)
        FEGS_tensor = FEGS[perm_idx].to(device)
        label = labels[perm_idx].long().to(device)
#                 seq_lengths = seq_lengths.to(device)

#                 label = torch.LongTensor(label).to(device)

        y_pred,end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
        loss = calc_loss(y_pred, label, end_means, enc_stds, betas[beta])
#             loss = calc_loss(y_pred, label, end_means, enc_stds, betas[2])
#         train_pred = outputs.argmax(dim=1)
        _, train_pred = torch.max(y_pred, 1)
        train_correct += train_pred.eq(label).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item() 
    print("Train Epoch:{} Average loss: {:.6f} ACC:{}/{} ({:.4f}%)".format(
                epoch,
                train_loss,
                train_correct, len(train_data.dataset),
                100. * train_correct / len(train_data.dataset)
            ))

    correct = 0
    y_pre = []
    y_test = []
    with torch.no_grad():
        for batch_idx, (sequences, lengths,FEGS, labels) in enumerate(test_data):
            seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
            seq_tensor = sequences[perm_idx].to(device)
            FEGS_tensor = FEGS[perm_idx].to(device)
            label = labels[perm_idx].long().to(device)
#                     seq_lengths = seq_lengths.to(device)
            y_test.extend(label.cpu().detach().numpy())


            y_pred, end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
            y_pre.extend(y_pred.argmax(dim=1).cpu().detach().numpy())

#             pred = outputs.argmax(dim=1)
#             print(output.shape,label.shape)
            _, pred = torch.max(y_pred, 1) 

            correct += pred.eq(label).sum().item()
#             print(output,label.data)
#             correct += (output == label.data).sum()

        print('\nTest: Accuracy:{}/{} ({:.4f}%) f1:({:.4f}%) mcc:({:.4f}%)\n'.format(
            correct, len(test_data.dataset),
            100. * correct / len(test_data.dataset),
            metrics.f1_score(y_test,y_pre),
            metrics.matthews_corrcoef(y_test,y_pre)
        ))
