### load and explore 

In [8]:
# Input files: the training table (read below as tab-separated text) and the
# vocabulary definition. Note that get_vocab_map() is later called without
# arguments, so it uses its own default path rather than `vocab_path`.
data_path = './data/Training_data.txt'
vocab_path = './data/vocab.txt'

In [9]:
import pandas as pd
# Load the raw training table (tab-separated).
# NOTE(review): read_csv is called with its default header handling; confirm
# the file's first line is a header row, because the column names are
# overwritten with ['seq', 'label'] in the split cell.
df = pd.read_csv(data_path,sep='\t')

### Stratified Split (train,val,test: 0.6, 0.2, 0.2) 

In [40]:
# split
from matplotlib import pyplot as plt
# Name the two columns so the label column can drive the stratification.
df.columns = ['seq','label']

from sklearn.model_selection import train_test_split as tts
# Step 1: hold out 20% of all rows as the test set, stratified on the label.
a = tts(df,test_size=0.2,shuffle=True,stratify=df['label'],random_state=1004)

# Step 2: take 25% of the remaining 80% as validation (0.25 * 0.8 = 0.2 of the
# total), which leaves 60% of the rows for training.
tr,val = tts(a[0],test_size=0.25,shuffle=True,stratify=a[0]['label'],random_state=1004)
te = a[1]

In [83]:
len(tr),len(val),len(te)

(21834, 7278, 7279)

In [89]:
import os

# to_csv does not create missing directories, so make sure the target folder
# exists before writing the three splits.
os.makedirs('./data/split', exist_ok=True)

# The DataFrame index is written as an extra column named 'index'.
tr.to_csv('./data/split/train.csv',index_label='index')
val.to_csv('./data/split/val.csv',index_label='index')
te.to_csv('./data/split/test.csv',index_label='index')

### Dataset & DataLoader for pytorch 

In [12]:
def get_vocab_map(vocab_path='./data/vocab.txt'):
    """Build a token -> integer index mapping from a vocabulary file.

    The file is expected to contain one assignment whose right-hand side is a
    Python literal sequence, e.g. ``vocab = ['A', 'C', ...]``; the literal may
    be wrapped over several lines.

    Args:
        vocab_path: Path of the vocabulary text file.

    Returns:
        dict mapping every element of the literal to its 0-based position.

    Raises:
        IndexError: If the file contains no ``=`` sign.
        ValueError, SyntaxError: If the right-hand side is not a valid literal.
    """
    import ast
    import re

    with open(vocab_path, 'r') as f:
        text = f.read()

    # Join wrapped lines, then collapse any remaining whitespace runs.
    text = text.replace('\n', '')
    text = re.sub(r'\s+', ' ', text)

    # Split on the FIRST '=' only, so a vocabulary token that is itself '='
    # does not truncate the literal.
    literal = text.split('=', 1)[1].strip()
    vocab = ast.literal_eval(literal)

    return dict(zip(vocab, range(len(vocab))))
vocab_map =get_vocab_map()

import torch
from torch.nn import functional as F
import pandas as pd
class ProteinDataset(torch.utils.data.Dataset):
    """Dataset over one split CSV, yielding (one-hot sequence, label) pairs.

    The CSV is loaded with pandas and is indexed through its ``seq`` and
    ``label`` columns. Sequence characters are looked up in the module-level
    ``vocab_map``.
    """

    def __init__(self, path):
        # The whole split is read into memory once.
        self.df = pd.read_csv(path)

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        sequence = row['seq']
        target = row['label']

        # The label is returned as a plain class-index tensor (not one-hot).
        encoded = self.seq2oneHot(sequence)
        target = torch.tensor(target)
        return encoded, target

    def seq2oneHot(self, seq):
        """One-hot encode ``seq``: one row per character, one column per vocab entry."""
        indices = torch.tensor([vocab_map[ch] for ch in seq])
        return F.one_hot(indices, num_classes=len(vocab_map))

    def label2oneHot(self, label):
        """One-hot encode a binary class label into a length-2 tensor."""
        as_tensor = torch.tensor(label)
        return F.one_hot(as_tensor, num_classes=2)
    
# Use the GPU when one is available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# One dataset per split file written by the split cell.
trdt  = ProteinDataset('./data/split/train.csv')
valdt = ProteinDataset('./data/split/val.csv')
tedt  = ProteinDataset('./data/split/test.csv')

# Reshuffle the training set every epoch so batches differ between epochs;
# validation and test keep a fixed order.
trdl  = torch.utils.data.DataLoader(trdt, batch_size=64, shuffle=True, num_workers=4)
valdl  = torch.utils.data.DataLoader(valdt, batch_size=64, num_workers=4)
tedl  = torch.utils.data.DataLoader(tedt, batch_size=64, num_workers=4)

### Model 

|i|model | used|
|---|--------| ----|
|0  |ResNet| ✔|
|1  | ResNext   |x|
|2  | MaxFilterCNN|✔ |
|3  | LSTM|     x|
|4  |Self-attention|✔ |
<!-- |5  ||✔ | -->


#### ResNet 

In [2]:
import torchvision

### x: b,10,20
from torch import nn
class ResNet(nn.Module):
    """Pretrained torchvision ResNet-18 whose final layer is a 2-way linear classifier."""

    def __init__(self):
        super().__init__()
        backbone = torchvision.models.resnet18(pretrained=True)
        # Swap the stock classifier for a 2-output layer of matching input width.
        backbone.fc = nn.Linear(backbone.fc.in_features, 2)
        self.backbone = backbone

    def forward(self, x):
        # Cast to float and repeat the input three times along a new dim 1
        # (the backbone's channel dimension).
        as_float = x.to(torch.float)
        three_channel = torch.stack([as_float] * 3, dim=1)
        return self.backbone(three_channel)
        

#### ResNext 

In [2]:
import torchvision

### x: b,10,20
from torch import nn
class ResNext(nn.Module):
    """Pretrained torchvision ResNeXt-50 (32x4d) whose final layer is a 2-way linear classifier."""

    def __init__(self):
        super().__init__()
        backbone = torchvision.models.resnext50_32x4d(pretrained=True)
        # Swap the stock classifier for a 2-output layer of matching input width.
        backbone.fc = nn.Linear(backbone.fc.in_features, 2)
        self.backbone = backbone

    def forward(self, x):
        # Cast to float and repeat the input three times along a new dim 1
        # (the backbone's channel dimension).
        as_float = x.to(torch.float)
        three_channel = torch.stack([as_float] * 3, dim=1)
        return self.backbone(three_channel)
        

#### MaxFilterCNN 

In [2]:
### x: b,10,20
from torch import nn
class MaxFilterCNN(nn.Module):
    """Small CNN: one full-width 2-D convolution followed by two 1-D convolutions.

    The first convolution spans the whole vocabulary axis, so its output has a
    trailing dimension of size 1 that is squeezed away before the 1-D stages.
    A final linear layer maps the flattened features to 2 logits.

    Args:
        seq_len: Length of the input sequences (default 10, the original value).
        vocab_size: Size of the one-hot axis (default 20, the original value).

    Input to ``forward``: tensor of shape (batch, seq_len, vocab_size).
    Output: tensor of shape (batch, 2).
    """

    def __init__(self, seq_len=10, vocab_size=20):
        super(MaxFilterCNN,self).__init__()
        self.maxFconv = nn.Sequential(
            nn.Conv2d(1,8,kernel_size=(3,vocab_size) ),
            nn.BatchNorm2d(8),
            nn.ReLU()
        )

        class squeeze(nn.Module):
            """Drop the trailing size-1 dimension left by the full-width convolution."""
            def __init__(self):
                super(squeeze,self).__init__()
            def forward(self,x):
                x = x.squeeze(-1)
                return x
        self.sq = squeeze()
        self.conv1d0 = nn.Sequential(
                    nn.Conv1d(8,8,kernel_size=3),
                    nn.BatchNorm1d(8),
                    nn.ReLU()
                )
        self.conv1d1 = nn.Sequential(
                    nn.Conv1d(8,8,kernel_size=2),
                    nn.BatchNorm1d(8),
                    nn.ReLU()
                )

        # Probe the feature extractor with a dummy sample to size the final
        # linear layer. The probe runs in eval mode and without autograd so
        # that the random data does not leak into the BatchNorm running
        # statistics; the layers are switched back to train mode afterwards.
        feature_layers = (self.maxFconv,self.sq,self.conv1d0,self.conv1d1)
        for layer in feature_layers:
            layer.eval()
        with torch.no_grad():
            sample = torch.rand(1,1,seq_len,vocab_size)
            for layer in feature_layers:
                sample = layer(sample)
        for layer in feature_layers:
            layer.train()
        _,c,l = sample.shape
        num_node  = c*l

        self.last = nn.Sequential(
                    nn.Flatten(),
                    nn.Linear(num_node,2)
                )

    def forward(self,x):
        x = x.to(torch.float)
        # Add the single input channel expected by Conv2d.
        x = x.unsqueeze(1)
        x = self.maxFconv(x)
        x = self.sq(x)
        x = self.conv1d0(x)
        x = self.conv1d1(x)
        x = self.last(x)
        return x
        

####  LSTM

In [2]:
import torchvision

### x: b,10,20
from torch import nn
class lstm(nn.Module):
    """Two stacked single-layer LSTMs followed by a 2-way linear classifier.

    Input to ``forward``: tensor of shape (batch, seq_len, 20).
    Output: tensor of shape (batch, 2), including when batch == 1.
    """

    def __init__(self):
        super(lstm,self).__init__()
        self.lstm0 = nn.LSTM(input_size = 20, hidden_size = 20,num_layers=1, batch_first=True)
        self.lstm1 = nn.LSTM(input_size = 20, hidden_size = 20,num_layers=1, batch_first=True)
        self.fc = nn.Linear(20,2)

    def forward(self,x):
        x = x.to(torch.float)
        seq_out, _ = self.lstm0(x)
        _, (h_n, _) = self.lstm1(seq_out)
        # h_n is (num_layers, batch, hidden). Index the last layer explicitly
        # instead of using a bare squeeze(), which would also drop the batch
        # dimension for a batch of size 1.
        last_hidden = h_n[-1]
        return self.fc(last_hidden)
        

####  Self-attention

In [2]:
import torchvision

### x: b,10,20
from torch import nn
class attns(nn.Module):
    """Two Transformer encoder layers (each with an extra skip connection) and an MLP head.

    The flattened encoder output feeds Linear(200, 100) -> ReLU -> Linear(100, 2),
    so the input must flatten to 200 features per sample.
    """

    def __init__(self):
        super().__init__()
        self.attn0 = nn.TransformerEncoderLayer(d_model=20, nhead=4, batch_first=True)
        self.attn1 = nn.TransformerEncoderLayer(d_model=20, nhead=4, batch_first=True)
        self.flat = nn.Flatten()
        head_layers = [nn.Linear(200, 100), nn.ReLU(), nn.Linear(100, 2)]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        hidden = x.to(torch.float)
        # Each encoder output is added back onto that encoder's input.
        for encoder in (self.attn0, self.attn1):
            hidden = encoder(hidden) + hidden
        return self.fc(self.flat(hidden))
        

#### Vision transformer 

In [20]:
import torchvision

### x: b,10,20
import timm
from torch import nn
class Vit(nn.Module):
    """Pretrained timm ``visformer_small`` backbone with a replacement 2-way head.

    The new head is Linear(in_features, 50) followed directly by Linear(50, 2),
    with no activation between the two layers.
    """

    def __init__(self):
        super().__init__()
        backbone = timm.create_model('visformer_small', pretrained=True)
        backbone.head = nn.Sequential(
            nn.Linear(backbone.head.in_features, 50),
            nn.Linear(50, 2),
        )
        self.backbone = backbone

    def forward(self, x):
        # Cast to float and repeat the input three times along a new dim 1
        # (the backbone's channel dimension).
        as_float = x.to(torch.float)
        three_channel = torch.stack([as_float] * 3, dim=1)
        return self.backbone(three_channel)
        

### settings 

In [8]:
# Pull a single batch from the training loader (x: inputs, y: labels).
it = iter(trdl)
x,y = next(it)

In [21]:
# Name of the architecture to train; matched case-insensitively by prefix.
model = 'vit'

model = model.lower()
if model.startswith('maxfil'):
    model = MaxFilterCNN().to(device)
elif model.startswith('resnet'):
    model = ResNet().to(device)
elif model.startswith('resnext'):
    model = ResNext().to(device)
elif model.startswith('attn'):
    model = attns().to(device)
elif model.startswith('lstm'):
    model = lstm().to(device)
elif model.startswith('vit'):
    model = Vit().to(device)
else:
    # Fail here with a clear message instead of leaving `model` as a str,
    # which would only blow up later at model.parameters().
    raise ValueError(f'unknown model name: {model!r}')

loss = nn.CrossEntropyLoss()
# Only optimise parameters that require gradients.
params = [p for p in model.parameters() if p.requires_grad]
opt  = torch.optim.Adam(params)

Downloading: "https://github.com/rwightman/pytorch-image-models/releases/download/v0.1-vt3p-weights/visformer_small-839e1f5b.pth" to /home/yp/.cache/torch/hub/checkpoints/visformer_small-839e1f5b.pth


### train/val/test 

In [22]:
def train(dl,model,lossf,opt):
    """Run one optimisation pass over every batch yielded by ``dl``.

    Args:
        dl: Iterable of (inputs, targets) batches.
        model: Module being trained; switched to train mode first.
        lossf: Callable computing the loss from (predictions, targets).
        opt: Optimizer stepped once per batch.
    """
    model.train()
    for inputs, targets in dl:
        inputs = inputs.to(device)
        targets = targets.to(device)
        batch_loss = lossf(model(inputs), targets)

        # Clear stale gradients, backpropagate, then update the weights.
        opt.zero_grad()
        batch_loss.backward()
        opt.step()

def test(dl,model,lossf,epoch=None):
    """Evaluate ``model`` on ``dl`` and print/return accuracy and loss.

    Args:
        dl: DataLoader of (inputs, targets) batches; ``len(dl.dataset)`` is
            used as the sample count.
        model: Module to evaluate; switched to eval mode.
        lossf: Callable computing the loss from (predictions, targets).
        epoch: Label printed in front of the metrics (None by default).

    Returns:
        (accuracy, val_loss): accuracy rounded to 4 decimals, and the sum of
        the per-batch ``lossf`` values divided by the number of samples,
        rounded to 6 decimals.
        NOTE(review): if ``lossf`` already averages within a batch, this loss
        is smaller than the per-sample mean by roughly the batch size.
    """
    model.eval()
    n_samples = len(dl.dataset)
    n_correct = 0
    loss_sum = 0
    with torch.no_grad():
        for inputs, targets in dl:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            batch_loss = lossf(logits, targets)

            hits = logits.argmax(1) == targets
            n_correct += hits.float().sum().item()
            loss_sum += batch_loss.item()
    accuracy = round(n_correct / n_samples, 4)
    val_loss = round(loss_sum / n_samples, 6)
    print(f'[{epoch}] acc/loss: {accuracy}/{val_loss}')
    return accuracy,val_loss

import copy
# Early stopping: stop once `patience` epochs have passed since the best one.
patience = 5
# epoch index -> validation loss, recorded only for epochs that improved on
# the best loss so far. Starting empty (instead of seeding a sentinel loss)
# guarantees the first epoch is always recorded, so `best_model` is always
# bound no matter how large the validation loss is.
val_losses = {}
for i in range(100):
    train(trdl,model,loss,opt)
    acc,val_loss = test(valdl,model,loss,i)

    if not val_losses or val_loss < min(val_losses.values()):
        val_losses[i] = val_loss
        # Snapshot the weights of the best epoch seen so far.
        best_model = copy.deepcopy(model)
    if i == min(val_losses,key=val_losses.get)+patience:
        break

In [7]:
test(valdl,best_model,loss)

[None] acc/loss: 0.855/0.005422


(0.855, 0.005422)

In [8]:
test(tedl,best_model,loss)

[None] acc/loss: 0.8479/0.005516


(0.8479, 0.005516)

### Evaluate 

In [13]:
from sklearn.metrics import accuracy_score 
from sklearn.metrics import recall_score 
from sklearn.metrics import precision_score 
from sklearn.metrics import f1_score 
# from sklearn.metrics import make_scorer
# from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix 

def evaluate(dl,model,lossf,epoch=None):
    """Compute classification metrics for ``model`` over ``dl``.

    Args:
        dl: DataLoader of (inputs, targets) batches; ``len(dl.dataset)`` is
            used as the sample count.
        model: Module to evaluate; switched to eval mode.
        lossf: Callable computing the loss from (predictions, targets).
        epoch: Accepted for signature parity with ``test``; not used here.

    Returns:
        dict with keys 'acc', 'recall', 'precision', 'f1', 'confusion' and
        'loss'. 'loss' is the sum of the per-batch ``lossf`` values divided by
        the number of samples.
    """
    model.eval()
    n_samples = len(dl.dataset)
    loss_sum = 0
    pre_l, gt_l = [], []
    with torch.no_grad():
        for inputs, targets in dl:
            inputs, targets = inputs.to(device), targets.to(device)
            logits = model(inputs)
            loss_sum += lossf(logits, targets).item()

            # Collect predicted and true class indices as plain Python lists.
            pre_l.extend(logits.argmax(1).cpu().numpy().tolist())
            gt_l.extend(targets.cpu().numpy().tolist())

    mean_loss = loss_sum / n_samples
    return {
        'acc': accuracy_score(gt_l, pre_l),
        'recall': recall_score(gt_l, pre_l),
        'precision': precision_score(gt_l, pre_l),
        'f1': f1_score(gt_l, pre_l),
        'confusion': confusion_matrix(gt_l, pre_l),
        'loss': mean_loss,
    }

print(evaluate(tedl,best_model,loss))

{'acc': 0.8479186701469982, 'recall': 0.7309397163120568, 'precision': 0.7673336435551419, 'f1': 0.748694665153235, 'confusion': array([[4523,  500],
       [ 607, 1649]]), 'loss': 0.005515566614404913}


## ETC 

### Save model 

In [26]:
import os
# File name: "<model class name>_<largest key of val_losses>.pt"; val_losses
# is keyed by epoch index, so the suffix is an epoch number, not a loss value.
model_name = f"{best_model.__str__().split('(')[0]}_{max(val_losses)}.pt"
model_path = os.path.join('./models',model_name) 
# torch.save does not create missing directories.
os.makedirs('./models', exist_ok=True)
torch.save(best_model.state_dict(),model_path)

### load saved models and evaluate them 

In [50]:
# Collect the path of every checkpoint file (*.pt) found directly under ./models.
files = os.listdir('./models')

model_paths = [os.path.join('./models',file) for file in files if file.endswith('.pt')]

import os
from models import * 
results = {}

# Rebuild each saved architecture from its checkpoint file name, load the
# weights and evaluate on the test loader.
# NOTE: results are keyed by the lower-cased name prefix, so several
# checkpoints of the same architecture overwrite one another.
for m_path in model_paths:
    # Checkpoints are named "<ClassName>_<suffix>.pt".
    model_name = os.path.basename(m_path).split('_')[0]
    model_name = model_name.lower()

    if model_name.startswith('maxfil'):
        model = MaxFilterCNN().to(device)
    elif model_name.startswith('resnet'):
        model = ResNet().to(device)
    elif model_name.startswith('resnext'):
        model = ResNext().to(device)
    elif model_name.startswith('attn'):
        model = attns().to(device)
    elif model_name.startswith('lstm'):
        model = lstm().to(device)
    elif model_name.startswith('vit'):
        model = Vit().to(device)
    else:
        # Unknown checkpoint: report and move on instead of failing later on
        # a str that has no load_state_dict().
        print(f'{model_name}: no matching model class, skipping {m_path}')
        continue

    # map_location lets checkpoints saved on a GPU load on a CPU-only machine.
    model.load_state_dict(torch.load(m_path, map_location=device))

    loss = nn.CrossEntropyLoss()

    result = evaluate(tedl,model,loss)

    print(f'{model_name}: {result}')
    results[model_name] = result

import pandas as pd
# `results` maps model name -> metrics dict; transposing gives one row per
# model. Note this rebinds `df`, which earlier held the raw training table.
df  = pd.DataFrame(results).T

# Checkpoint file names without directory or extension.
models = [os.path.splitext( os.path.basename(path) )[0] for path in model_paths]

# The CSV name is all checkpoint names joined with '&'; the assets/ directory
# is not created here.
df.to_csv(f"assets/{'&'.join(models)}.csv")

### Inference 

In [125]:
def get_seqStr(path = './data/sample.txt'):
    """Read a UTF-8 text file and return its lines with newline characters removed.

    Blank lines are kept as empty strings.
    """
    with open(path, 'r', encoding='utf8') as f:
        return [line.replace('\n', '') for line in f]

def seq2oneHot(seq):
    """One-hot encode ``seq`` using the module-level ``vocab_map``.

    Returns a tensor with one row per element of ``seq`` and one column per
    vocabulary entry.
    """
    indices = torch.tensor([vocab_map[token] for token in seq])
    return F.one_hot(indices, num_classes=len(vocab_map))

def seq_list2oneHot(seq_list):
    """One-hot encode every sequence in ``seq_list`` and stack the results.

    Args:
        seq_list: Iterable of sequences accepted by ``seq2oneHot``. All
            sequences must have the same length, because the encodings are
            stacked along a new leading dimension.

    Returns:
        Tensor whose first dimension indexes the sequences.

    Raises:
        RuntimeError: From ``torch.stack`` if ``seq_list`` is empty or the
            encodings differ in shape.
    """
    # The original stacked an undefined name (`seq_tensor_list`) instead of
    # the list it had just built.
    tensor_list = [seq2oneHot(seq) for seq in seq_list]
    return torch.stack(tensor_list)


In [132]:
x = seq_list2oneHot(get_seqStr()).to(device)

In [134]:
# Inference only: put the model in eval mode explicitly (instead of relying on
# state left by an earlier cell) and skip gradient tracking.
model.eval()
with torch.no_grad():
    y = model(x)
# Predicted class index per input row, as a plain Python list.
y = y.argmax(1)
y = y.cpu().numpy().tolist()

In [137]:
# save output
y = list(map(str,y))
# `path` was never defined at module scope (it is only a parameter of
# get_seqStr). The inputs were read with get_seqStr()'s default file, so the
# predictions are written next to that file as '<name>_out.csv'.
path = './data/sample.txt'
dirname = os.path.dirname(path)
fname   = os.path.basename(path).split('.')[0]
savep   = os.path.join(dirname,f'{fname}_out.csv')
# One prediction per line, in input order.
with open(savep,'w') as f:
    f.write('\n'.join(y) )