In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from json import encoder
import math
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as f
import torch.optim as opt
from torch.utils.data import DataLoader,TensorDataset
from torch.nn.utils.rnn import pad_packed_sequence,pack_padded_sequence
from sklearn import metrics
from sklearn.model_selection import train_test_split
import scipy.io as sio
from functools import partial
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
blosum62 = {
        'A': [4, -1, -2, -2, 0,  -1, -1, 0, -2,  -1, -1, -1, -1, -2, -1, 1,  0,  -3, -2, 0],  # A
        'R': [-1, 5,  0, -2, -3, 1,  0,  -2, 0,  -3, -2, 2,  -1, -3, -2, -1, -1, -3, -2, -3], # R
        'N': [-2, 0,  6,  1,  -3, 0,  0,  0,  1,  -3, -3, 0,  -2, -3, -2, 1,  0,  -4, -2, -3], # N
        'D': [-2, -2, 1,  6,  -3, 0,  2,  -1, -1, -3, -4, -1, -3, -3, -1, 0,  -1, -4, -3, -3], # D
        'C': [0,  -3, -3, -3, 9,  -3, -4, -3, -3, -1, -1, -3, -1, -2, -3, -1, -1, -2, -2, -1], # C
        'Q': [-1, 1,  0,  0,  -3, 5,  2,  -2, 0,  -3, -2, 1,  0,  -3, -1, 0,  -1, -2, -1, -2], # Q
        'E': [-1, 0,  0,  2,  -4, 2,  5,  -2, 0,  -3, -3, 1,  -2, -3, -1, 0,  -1, -3, -2, -2], # E
        'G': [0,  -2, 0,  -1, -3, -2, -2, 6,  -2, -4, -4, -2, -3, -3, -2, 0,  -2, -2, -3, -3], # G
        'H': [-2, 0,  1,  -1, -3, 0,  0,  -2, 8,  -3, -3, -1, -2, -1, -2, -1, -2, -2, 2,  -3], # H
        'I': [-1, -3, -3, -3, -1, -3, -3, -4, -3, 4,  2,  -3, 1,  0,  -3, -2, -1, -3, -1, 3],  # I
        'L': [-1, -2, -3, -4, -1, -2, -3, -4, -3, 2,  4,  -2, 2,  0,  -3, -2, -1, -2, -1, 1],  # L
        'K': [-1, 2,  0,  -1, -3, 1,  1,  -2, -1, -3, -2, 5,  -1, -3, -1, 0,  -1, -3, -2, -2], # K
        'M': [-1, -1, -2, -3, -1, 0,  -2, -3, -2, 1,  2,  -1, 5,  0,  -2, -1, -1, -1, -1, 1],  # M
        'F': [-2, -3, -3, -3, -2, -3, -3, -3, -1, 0,  0,  -3, 0,  6,  -4, -2, -2, 1,  3,  -1], # F
        'P': [-1, -2, -2, -1, -3, -1, -1, -2, -2, -3, -3, -1, -2, -4, 7,  -1, -1, -4, -3, -2], # P
        'S': [1,  -1, 1,  0,  -1, 0,  0,  0,  -1, -2, -2, 0,  -1, -2, -1, 4,  1,  -3, -2, -2], # S
        'T': [0,  -1, 0,  -1, -1, -1, -1, -2, -2, -1, -1, -1, -1, -2, -1, 1,  5,  -2, -2, 0],  # T
        'W': [-3, -3, -4, -4, -2, -2, -3, -2, -2, -3, -2, -3, -1, 1,  -4, -3, -2, 11, 2,  -3], # W
        'Y': [-2, -2, -2, -3, -2, -1, -2, -3, 2,  -1, -1, -2, -1, 3,  -3, -2, -2, 2,  7,  -1], # Y
        'V': [0,  -3, -3, -3, -1, -2, -2, -3, -3, 3,  1,  -2, 1,  -1, -2, -2, 0,  -3, -1, 4],  # V
        '-': [0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0],  # -
    }

def normalization(dataset):
    min = dataset.min(axis=0)
    max = dataset.max(axis=0)
    dataset = (dataset - min) / (max - min)
    return dataset

def get_blosum62(seq):
    blosum_list = []
    for i in seq: 
        blosum_list.append(blosum62[i])
    blosum = np.array(blosum_list)
#     blosum = normalization(blosum)
    feature = np.zeros((1002,20))
    idx = blosum.shape[0]
    feature[0:idx,:] = blosum
    return feature

def make_tensor(path):
    data = pd.read_csv(path)
    sequences = data['sequence'].values
    labels = data['label'].values
    evolution = torch.zeros(len(sequences),1002,20)
    lengths = []
    for i in range(len(sequences)):
        lengths.append((len(sequences[i])))
        temp = get_blosum62(sequences[i])
        evolution[i,:,:] = torch.Tensor(temp)

    return evolution,torch.Tensor(lengths),torch.Tensor(labels) 

In [None]:
def make_tensor2(S_train,y_train):
    sequences = S_train
    labels = y_train
    evolution = torch.zeros(len(sequences),1002,20)
    lengths = []
    for i in range(len(sequences)):
        lengths.append((len(sequences[i])))
        temp = get_blosum62(sequences[i])
        evolution[i,:,:] = torch.Tensor(temp)

    return evolution,torch.Tensor(lengths),torch.Tensor(labels) 

In [None]:
train_path = '/content/drive/MyDrive/Capstone Project/datafile/protein_train1002.csv'
test_path = '/content/drive/MyDrive/Capstone Project/datafile/protein_test1002.csv'

train_data = sio.loadmat('/content/drive/MyDrive/Capstone Project/datafile/protein_train.mat')
train_FEGS = torch.Tensor(normalization(train_data['FV']))

test_data = sio.loadmat('/content/drive/MyDrive/Capstone Project/datafile/protein_test.mat')
test_FEGS = torch.Tensor(normalization(test_data['FV']))

train_pssm, train_len,train_label = make_tensor(train_path)
test_pssm, test_len,test_label = make_tensor(test_path)

train_data = DataLoader(TensorDataset(train_pssm, train_len,train_FEGS,train_label), batch_size=35, shuffle=True)
test_data = DataLoader(TensorDataset(test_pssm, test_len,test_FEGS, test_label), batch_size=35)

print("data done")

In [None]:
# Transformer
def drop_path(x, drop_prob: float = 0., training: bool = False):
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1) 
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # binarize
    output = x.div(keep_prob) * random_tensor
    return output

class DropPath(nn.Module):
    def __init__(self, drop_prob=None):
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training)
from itertools import repeat
import collections.abc

def _ntuple(n):
    def parse(x):
        if isinstance(x, collections.abc.Iterable):
            return x
        return tuple(repeat(x, n))
    return parse
to_1tuple = _ntuple(1)
to_2tuple = _ntuple(2)
to_3tuple = _ntuple(3)
to_4tuple = _ntuple(4)
to_ntuple = _ntuple

class Mlp(nn.Module):
    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        drop_probs = to_2tuple(drop)
        # hidden_features = 4 * in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()
        self.drop1 = nn.Dropout(drop_probs[0])
        # out_features = in_features
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop2 = nn.Dropout(drop_probs[1])

    def forward(self, x):
        # [B,N,C]
        x = self.fc1(x)
        # [B,N,4C]
        x = self.act(x)
        x = self.drop1(x)
        x = self.fc2(x)
        # [B,N,C]
        x = self.drop2(x)
        return x
    
class Attention(nn.Module):
    def __init__(self, dim, num_heads, qkv_bias=False, attn_drop=0., proj_drop=0.):
        super().__init__()
        self.num_heads = num_heads
        head_dim = dim // num_heads
        self.scale = head_dim ** -0.5

        self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias)
        self.attn_drop = nn.Dropout(attn_drop)
        self.proj = nn.Linear(dim, dim)
        self.proj_drop = nn.Dropout(proj_drop)

    def forward(self, x):
        # B:Batch_size N:序列长度 C:每个序列的维度dim
        B, N, C = x.shape
        # 得到qkv矩阵
        qkv = self.qkv(x).reshape(B, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4)
        q, k, v = qkv.unbind(0)   # make torchscript happy (cannot use tensor as tuple)

        # 计算qkv之间的self attention
        attn = (q @ k.transpose(-2, -1)) * self.scale
        attn = attn.softmax(dim=-1)
        attn = self.attn_drop(attn)

        x = (attn @ v).transpose(1, 2).reshape(B, N, C)
        x = self.proj(x)
        x = self.proj_drop(x)
        return x

# transformer encoder
class Block(nn.Module):
    def __init__(self, dim, num_heads, mlp_ratio=4., qkv_bias=False, drop=0., attn_drop=0.,
                 drop_path=0., act_layer=nn.GELU, norm_layer=nn.LayerNorm):
        super().__init__()
        self.norm1 = norm_layer(dim)
        self.attn = Attention(dim, num_heads=num_heads, qkv_bias=qkv_bias, attn_drop=attn_drop, proj_drop=drop)
        # NOTE: drop path for stochastic depth, we shall see if this is better than dropout here
        self.drop_path = DropPath(drop_path) if drop_path > 0. else nn.Identity()
        self.norm2 = norm_layer(dim)
        mlp_hidden_dim = int(dim * mlp_ratio)
        self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop)
    # Multi-head Attention + MLP
    def forward(self, x):
        # x + :表示残差连接
        x = x + self.drop_path(self.attn(self.norm1(x)))
        x = x + self.drop_path(self.mlp(self.norm2(x)))
        return x

class Transformer(nn.Module):
    def __init__(self,embed_dim,depth,num_heads,mlp_ratio,qkv_bias, numstride, drop_rate=0.,attn_drop_rate=0.,drop_path_rate=0.):
        super().__init__()
        self.num_features = self.embed_dim = embed_dim  # num_features for consistency with other models
    
        norm_layer = partial(nn.LayerNorm, eps=1e-6)
        act_layer = nn.GELU
        # self.patch_embed = nn.Embedding(1002, embed_dim)
        self.patch_embed = torch.nn.Conv2d(in_channels=1,
                            out_channels = embed_dim,
                            kernel_size = (2,20),
                            stride=(numstride,1),
                            padding=(0,0))
        num_embed = math.floor(1000/numstride)+1
        self.pos_embed = nn.Parameter(torch.zeros(1, num_embed, embed_dim))
        self.pos_drop = nn.Dropout(p=drop_rate)

        dpr = [x.item() for x in torch.linspace(0, drop_path_rate, depth)]  # stochastic depth decay rule
        self.blocks = nn.Sequential(*[
            Block(
                dim=embed_dim, num_heads=num_heads, mlp_ratio=mlp_ratio, qkv_bias=qkv_bias, drop=drop_rate,
                attn_drop=attn_drop_rate, drop_path=dpr[i], norm_layer=norm_layer, act_layer=act_layer)
            for i in range(depth)])
        self.norm = norm_layer(embed_dim)
        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, nn.Linear):
            nn.init.xavier_normal_(m.weight)
            nn.init.normal_(m.bias, std=1e-6)
        elif isinstance(m, nn.LayerNorm):
            nn.init.constant_(m.bias, 0)
            nn.init.constant_(m.weight, 1.0)

    @torch.jit.ignore
    def no_weight_decay(self):
        return {'pos_embed'}

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.patch_embed(x)
        x = torch.nn.ReLU()(x) 
        x = x.squeeze(3) 
        x = x.permute(0,2,1)
        x = self.pos_drop(x + self.pos_embed) 
        x = self.blocks(x)
        x = self.norm(x) 
        return x

class dvib(nn.Module):
    def __init__(self,k,out_channels, hidden_size, numstride): 
        super(dvib, self).__init__()
        
        self.transformer = Transformer(embed_dim=out_channels,
                                       depth=6, 
                                       num_heads=8,
                                       mlp_ratio=4.,
                                       qkv_bias=True,
                                       drop_rate=0.,
                                       attn_drop_rate=0.,
                                       drop_path_rate=0.1,
                                       numstride = numstride)
        
        self.fc1 = nn.Linear(out_channels, hidden_size)
#         self.fc2 = nn.Linear(1024,1024)
        self.enc_mean = nn.Linear(hidden_size+578,k)
        self.enc_std = nn.Linear(hidden_size+578,k)
        self.dec = nn.Linear(k, 2)
        
        self.drop_layer = torch.nn.Dropout(0.5)
        
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.constant_(self.fc1.bias, 0.0)
#         nn.init.xavier_uniform_(self.fc2.weight)
#         nn.init.constant_(self.fc2.bias, 0.0)
        nn.init.xavier_uniform_(self.enc_mean.weight)
        nn.init.constant_(self.enc_mean.bias, 0.0)
        nn.init.xavier_uniform_(self.enc_std.weight)
        nn.init.constant_(self.enc_std.bias, 0.0)
        nn.init.xavier_uniform_(self.dec.weight)
        nn.init.constant_(self.dec.bias, 0.0)

    def forward(self, pssm, lengths, FEGS): 
        encoder_feat = self.transformer(pssm)
        encoder_feat= torch.mean(encoder_feat[:, :] , dim=1)
        encoder_feat = self.fc1(encoder_feat)
        feature_vec = torch.cat([encoder_feat,FEGS],dim = 1) 
        #information bottleneck
        enc_mean, enc_std = self.enc_mean(feature_vec), f.softplus(self.enc_std(feature_vec)-5)
        eps = torch.randn_like(enc_std) 
        latent = enc_mean + enc_std*eps 
        outputs = torch.sigmoid(self.dec(latent)) 
        return outputs,enc_mean, enc_std,latent

In [None]:
CE = nn.CrossEntropyLoss(reduction='sum')
betas = [1e-1, 1e-2, 1e-3, 1e-4, 1e-5, 1e-6,1e-7]

def calc_loss(y_pred, labels, enc_mean, enc_std, beta):
    """    
    y_pred : [batch_size,2]
    label : [batch_size,1]    
    enc_mean : [batch_size,z_dim]  
    enc_std: [batch_size,z_dim] 
    """   
    
    ce = CE(y_pred,labels)
    KL = 0.5 * torch.sum(enc_mean.pow(2) + enc_std.pow(2) - 2*enc_std.log() - 1)
    
    return (ce + beta * KL)/y_pred.shape[0]

In [None]:
SAVE_PATH = '/content/drive/MyDrive/Capstone Project/datafile/'
out_channels = 512
hidden_size = 2048
beta = 2 #0，1，3，4，5，6
k = 1024
numstride = 5   

model = dvib(k,out_channels, hidden_size, numstride).to(device)
optimizer = opt.AdamW(model.parameters(),lr = 0.0001,betas=(0.9,0.999),eps = 1e-08,weight_decay=0) # ++++
# optimizer = opt.Adam(model.parameters(),lr = 0.0001)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.95)
num_epochs = 500
best_accuracy_test = 0 # ++++

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    train_correct = 0
    if epoch % 10 == 0 and epoch > 0:
         scheduler.step()

    for batch_idx, (sequences, lengths, FEGS, labels) in enumerate(train_data):
        seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
        seq_tensor = sequences[perm_idx].to(device)
        FEGS_tensor = FEGS[perm_idx].to(device)
        label = labels[perm_idx].long().to(device)
#                 seq_lengths = seq_lengths.to(device)

#                 label = torch.LongTensor(label).to(device)

        y_pred,end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
        loss = calc_loss(y_pred, label, end_means, enc_stds, betas[beta])
#             loss = calc_loss(y_pred, label, end_means, enc_stds, betas[2])
#         train_pred = outputs.argmax(dim=1)
        _, train_pred = torch.max(y_pred, 1)
        train_correct += train_pred.eq(label).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item() 
    print("Train Epoch:{} Average loss: {:.6f} ACC:{}/{} ({:.4f}%)".format(
                epoch,
                train_loss,
                train_correct, len(train_data.dataset),
                100. * train_correct / len(train_data.dataset)
            ))

    correct = 0
    y_pre = []
    y_test = []
    with torch.no_grad():
        for batch_idx, (sequences, lengths,FEGS, labels) in enumerate(test_data):
            seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
            seq_tensor = sequences[perm_idx].to(device)
            FEGS_tensor = FEGS[perm_idx].to(device)
            label = labels[perm_idx].long().to(device)
#                     seq_lengths = seq_lengths.to(device)
            y_test.extend(label.cpu().detach().numpy())


            y_pred, end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
            y_pre.extend(y_pred.argmax(dim=1).cpu().detach().numpy())

#             pred = outputs.argmax(dim=1)
#             print(output.shape,label.shape)
            _, pred = torch.max(y_pred, 1) 

            correct += pred.eq(label).sum().item()
#             print(output,label.data)
#             correct += (output == label.data).sum()

        print('\nTest: Accuracy:{}/{} ({:.4f}%) f1:({:.4f}%) mcc:({:.4f}%)\n'.format(
            correct, len(test_data.dataset),
            100. * correct / len(test_data.dataset),
            metrics.f1_score(y_test,y_pre),
            metrics.matthews_corrcoef(y_test,y_pre)
        ))
    if correct >= best_accuracy_test: # ++++
        best_accuracy_test = correct # ++++
        torch.save(model.state_dict(), SAVE_PATH + 'protein_best_model_weights_test_transformer.pth') # ++++

In [None]:
# Load peptide dataset
data = pd.read_csv('/content/drive/MyDrive/Capstone Project/datafile/peptide.csv')
labels = data['label'].values
sequences = data['sequence'].values
fegs = sio.loadmat('/content/drive/MyDrive/Capstone Project/datafile/peptide3864.mat')
X_train, X_test, y_train1, y_test1 = train_test_split(
     fegs['FV'], labels, test_size=0.15, random_state=66, stratify=labels)

train_FEGS = torch.Tensor(X_train)
test_FEGS = torch.Tensor(X_test)

S_train, S_test, y_train2, y_test2 = train_test_split(
     sequences, labels, test_size=0.15, random_state=66, stratify=labels)

train_pssm, train_len,train_label = make_tensor2(S_train,y_train2)
test_pssm, test_len,test_label = make_tensor2(S_test,y_test2)

train_data = DataLoader(TensorDataset(train_pssm, train_len,train_FEGS,train_label), batch_size=100, shuffle=True)
test_data = DataLoader(TensorDataset(test_pssm, test_len,test_FEGS, test_label), batch_size=100)

In [None]:
SAVE_PATH = '/content/drive/MyDrive/Capstone Project/datafile/'
out_channels = 512
hidden_size = 2048
beta = 2 #0，1，3，4，5，6
k = 1024
                
model = dvib(k,out_channels, hidden_size).to(device)
optimizer = opt.AdamW(model.parameters(),lr = 0.0001,betas=(0.9,0.999),eps = 1e-08,weight_decay=0) # ++++
#optimizer = opt.Adam(model.parameters(),lr = 0.0001)
scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.95)

In [None]:
# Load pre-tained model weights
model.load_state_dict(torch.load('/content/drive/MyDrive/Capstone Project/datafile/protein_best_model_weights_test_transformer.pth'))

<All keys matched successfully>

In [None]:
# Fine-tuning on peptide trainset
num_epochs = 150
best_accuracy_test = 0 # ++++

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    train_correct = 0
    if epoch % 10 == 0 and epoch > 0:
         scheduler.step()

    for batch_idx, (sequences, lengths, FEGS, labels) in enumerate(train_data):
        seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
        seq_tensor = sequences[perm_idx].to(device)
        FEGS_tensor = FEGS[perm_idx].to(device)
        label = labels[perm_idx].long().to(device)
#                 seq_lengths = seq_lengths.to(device)

#                 label = torch.LongTensor(label).to(device)

        y_pred,end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
        loss = calc_loss(y_pred, label, end_means, enc_stds, betas[beta])
#             loss = calc_loss(y_pred, label, end_means, enc_stds, betas[2])
#         train_pred = outputs.argmax(dim=1)
        _, train_pred = torch.max(y_pred, 1)
        train_correct += train_pred.eq(label).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss += loss.item() 
    print("Train Epoch:{} Average loss: {:.6f} ACC:{}/{} ({:.4f}%)".format(
                epoch,
                train_loss,
                train_correct, len(train_data.dataset),
                100. * train_correct / len(train_data.dataset)
            ))

    correct = 0
    y_pre = []
    y_test = []
    with torch.no_grad():
        for batch_idx, (sequences, lengths,FEGS, labels) in enumerate(test_data):
            seq_lengths, perm_idx = lengths.sort(dim=0,descending=True)
            seq_tensor = sequences[perm_idx].to(device)
            FEGS_tensor = FEGS[perm_idx].to(device)
            label = labels[perm_idx].long().to(device)
#                     seq_lengths = seq_lengths.to(device)
            y_test.extend(label.cpu().detach().numpy())


            y_pred, end_means, enc_stds,latent = model(seq_tensor,seq_lengths,FEGS_tensor)
            y_pre.extend(y_pred.argmax(dim=1).cpu().detach().numpy())

#             pred = outputs.argmax(dim=1)
#             print(output.shape,label.shape)
            _, pred = torch.max(y_pred, 1) 

            correct += pred.eq(label).sum().item()
#             print(output,label.data)
#             correct += (output == label.data).sum()
        cm1 = metrics.confusion_matrix(y_test,y_pre) # ++++
        TP = cm1[1,1] # true positive # ++++
        TN = cm1[0,0] # true negatives # ++++
        FP = cm1[0,1] # false positives # ++++
        FN = cm1[1,0] # false negatives # ++++
        SN = cm1[1,1]/(cm1[1,0]+cm1[1,1]) # ++++
        print('\nTest: Accuracy:{}/{} ({:.4f}%) f1:({:.4f}%) mcc:({:.4f}%)\n'.format(
            correct, len(test_data.dataset),
            100. * correct / len(test_data.dataset),
            metrics.f1_score(y_test,y_pre),
            metrics.matthews_corrcoef(y_test,y_pre)
        ))
        print('SN:{}'.format(SN))
    if correct > best_accuracy_test: # ++++
        best_accuracy_test = correct # ++++
        torch.save(model.state_dict(), SAVE_PATH + 'peptide_best_model_weights_test_transformer.pth') # ++++