# 1.Original model prediction

In [6]:
import torch.optim as optim
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, f1_score, matthews_corrcoef
import torch
import torch.nn as nn
import time
from torch.utils.data import Dataset, DataLoader
from tpbte import Model
import pandas as pd
import pickle as pk


class MyDataset(Dataset):
    """Embedded CDR3B/epitope pairs together with their affinity labels.

    The CSV at ``path`` must contain ``CDR3B``, ``Epitope`` and ``Affinity``
    columns.  ``emb_type`` selects the encoder: ``'onehot'``, ``'BLOSUM62'``,
    or any other value for the Atchley encoder.  Each encoder is expected to
    return ``(cdr3b_embedding, epitope_embedding, labels)``.
    """

    def __init__(self, path, emb_type):
        frame = pd.read_csv(path, low_memory=False)
        self.data = frame
        self.CDR3B = frame['CDR3B']
        self.Epitope = frame['Epitope']
        self.Affinity = frame['Affinity']
        self.emb_type = emb_type

        raw_columns = (self.CDR3B, self.Epitope, self.Affinity)
        # NOTE(review): onehot / BLOSUM_62 are not defined in the visible part
        # of this file; they are only looked up when that branch is taken.
        if emb_type == 'onehot':
            encoded = onehot(*raw_columns)
        elif emb_type == 'BLOSUM62':
            encoded = BLOSUM_62(*raw_columns, 20)
        else:
            encoded = Atchley(*raw_columns, 20)

        cdr3b_emb, epitope_emb, self.Affinity = encoded
        # One tensor per sample: CDR3B features followed by epitope features
        # along the last axis.
        self.pair = torch.cat((cdr3b_emb, epitope_emb), -1)

    def __getitem__(self, index):
        """Return ``(pair, affinity)`` for the sample at ``index``."""
        sample = self.pair[index]
        target = self.Affinity[index]
        return sample, target

    def __len__(self):
        """Return the number of samples (first dimension of the labels)."""
        labels = torch.LongTensor(self.Affinity)
        return labels.shape[0]

def Atchley(TCR, Epitope, Label, Length):
    """Embed TCR and epitope sequences with the vectors stored in ``atchley.pk``.

    Args:
        TCR: iterable of CDR3B sequence strings.
        Epitope: iterable of epitope sequence strings.
        Label: labels convertible by ``torch.LongTensor``; its length sets the
            number of rows in both embeddings.
        Length: number of positions kept per sequence.  Shorter sequences are
            right-padded with ``' '`` (so the table needs an entry for the
            space character); longer ones are truncated.

    Returns:
        ``(tcr_embedding, epi_embedding, labels)`` where the embeddings have
        shape ``(n, Length, 5)`` (the first five of the six stored components)
        and ``labels`` has shape ``(n, 1)``.
    """
    # NOTE: unpickling can execute arbitrary code; atchley.pk must be trusted.
    # The context manager closes the handle, which the bare open() never did.
    with open('atchley.pk', 'rb') as handle:
        aa_vec = pk.load(handle)
    Label = torch.LongTensor(Label).view(-1, 1)
    n = Label.size()[0]

    def embed(sequences):
        # The table stores 6 values per residue; only the first 5 are returned.
        embedding = torch.zeros(n, Length, 6)
        for row, seq in enumerate(sequences):
            padded = seq + ' ' * (Length - len(seq))
            for col in range(Length):
                embedding[row, col, :] = torch.from_numpy(aa_vec[padded[col]])
        return embedding[:, :, 0:5]

    return embed(TCR), embed(Epitope), Label

def Convert_letters(seq):
    """Map embedded sequences back to letter strings via ``atchley.pk``.

    Every position vector in ``seq`` is replaced by the key of the table entry
    (with its last component dropped) that is nearest in Euclidean distance;
    on ties the first entry in table order wins.  Keys that are not alphabetic
    (e.g. the padding symbol) are omitted from the result.

    Args:
        seq: array-like indexed as ``[sequence][position]`` whose position
            vectors have the same width as the truncated table entries.

    Returns:
        A list with one string per sequence; ``[]`` for empty input.
    """
    # Local import keeps the function usable regardless of where the module
    # level numpy import sits in the file.
    import numpy as np

    # NOTE: unpickling can execute arbitrary code; atchley.pk must be trusted.
    with open('atchley.pk', 'rb') as handle:
        atchley = pk.load(handle)
    # Drop the last stored component once, up front.
    table = [(key, value[..., :-1]) for key, value in atchley.items()]

    letters = []
    for row in seq:
        chars = []
        for element in row:
            closest_key = None
            min_distance = float('inf')
            for key, value in table:
                distance = np.linalg.norm(element - value)
                if distance < min_distance:
                    min_distance = distance
                    closest_key = key
            # closest_key stays None only if no finite distance was found.
            if closest_key is not None and closest_key.isalpha():
                chars.append(closest_key)
        letters.append(''.join(chars))
    return letters

def data_renew(pairs, emb_type):
    """Split concatenated pair features back into TCR and epitope tensors.

    The split point on the last axis depends on ``emb_type``: 21 for
    ``'onehot'`` (where the final column is dropped and both halves are cast
    to long), 20 for ``'BLOSUM62'``, and 5 for anything else.  Both halves are
    moved to ``cuda:0`` when CUDA is available, otherwise to the CPU.
    """
    target = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    if emb_type == 'onehot':
        tcr_part = pairs[:, :, 0:21].type(torch.LongTensor)
        epi_part = pairs[:, :, 21:-1].type(torch.LongTensor)
    else:
        width = 20 if emb_type == 'BLOSUM62' else 5
        tcr_part = pairs[:, :, 0:width]
        epi_part = pairs[:, :, width:]

    return tcr_part.to(target), epi_part.to(target)

import pandas as pd
import torch
import numpy as np
import warnings
warnings.filterwarnings("ignore")


def Original_model_prediction(testfile_path,modelfile_path,result_path):
    """Score a test CSV with a saved model and write the per-pair results.

    Relies on ``MyDataset``, ``data_renew``, ``Convert_letters`` and ``Model``
    being available at module level.

    Args:
        testfile_path: CSV with ``CDR3B``, ``Epitope`` and ``Affinity`` columns.
        modelfile_path: file holding the ``state_dict`` to load into ``Model``.
        result_path: prefix of the output file; ``'probability.csv'`` is
            appended to it verbatim (no path separator is inserted).

    The CSV has columns ``Epitope``, ``CDR3B``, ``y_true``, ``y_prob`` (column
    1 of the model output) and ``y_pred`` (argmax of the model output).
    """
    vocab = 21
    d_model = 512
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)
    e_type = 'Atchley'
    b_size = 32
    test = MyDataset(testfile_path, emb_type=e_type)
    # No shuffling for evaluation: rows come out in input order and the
    # result file is reproducible between runs.
    test_dataset = DataLoader(dataset=test, batch_size=b_size, shuffle=False, drop_last=False)
    model = Model(src_vocab=vocab, tgt_vocab=vocab, emb_type=e_type, N=6, h=8, d_model=d_model, dropout=0.1, device=device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    model.load_state_dict(torch.load(modelfile_path, map_location=device))
    model = model.to(device)
    model.eval()

    rows = []
    with torch.no_grad():
        for pairs, label in test_dataset:
            tcr, epi = data_renew(pairs=pairs, emb_type=e_type)
            tcr = tcr.to(device)
            epi = epi.to(device)
            output = model(tcr, epi)
            pred = output.argmax(dim=1)

            # Sequences are recovered from the embeddings, not re-read from the CSV.
            tcr_letters = Convert_letters(tcr.cpu().numpy())
            epi_letters = Convert_letters(epi.cpu().numpy())
            prediction_scores = output[:, 1].cpu().numpy()
            # Labels arrive as (batch, 1); take the single column directly.
            true_labels_batch = label[:, 0].cpu().numpy()
            predicted_classes_batch = pred.cpu().numpy()
            for epitope, cdr3b, y_true, y_prob, y_pred in zip(
                    epi_letters, tcr_letters, true_labels_batch,
                    prediction_scores, predicted_classes_batch):
                rows.append({'Epitope': epitope, 'CDR3B': cdr3b, 'y_true': y_true,
                             'y_prob': y_prob,
                             'y_pred': y_pred})

    # Explicit columns keep the header intact even when the test set is empty.
    df_data = pd.DataFrame(rows, columns=['Epitope', 'CDR3B', 'y_true', 'y_prob', 'y_pred'])
    df_data.to_csv(result_path+'probability.csv')


In [None]:
# Score the test split with the pretrained checkpoint.
testfile_path="../data/test.csv"
modelfile_path="../Original_model/TPBTE_mc.pth"
# Used as a filename prefix: the function appends 'probability.csv' directly to it.
result_path="../result_path/Original_model_prediction"
Original_model_prediction(testfile_path,modelfile_path,result_path)


# 2.Model retraining

In [10]:
import torch.optim as optim
from sklearn.metrics import accuracy_score, roc_auc_score, precision_score, recall_score, f1_score, matthews_corrcoef,roc_curve
import torch
import torch.nn as nn
import time
from torch.utils.data import Dataset, DataLoader
from tpbte import Model
import pandas as pd
import pickle as pk


def data_renew(pairs, emb_type):
    """Split concatenated pair features back into TCR and epitope tensors.

    The split point on the last axis depends on ``emb_type``: 21 for
    ``'onehot'`` (where the final column is dropped and both halves are cast
    to long), 20 for ``'BLOSUM62'``, and 5 for anything else.  Both halves are
    moved to ``cuda:0`` when CUDA is available, otherwise to the CPU.
    """
    target = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    if emb_type == 'onehot':
        tcr_part = pairs[:, :, 0:21].type(torch.LongTensor)
        epi_part = pairs[:, :, 21:-1].type(torch.LongTensor)
    else:
        width = 20 if emb_type == 'BLOSUM62' else 5
        tcr_part = pairs[:, :, 0:width]
        epi_part = pairs[:, :, width:]

    return tcr_part.to(target), epi_part.to(target)

class MyDataset(Dataset):
    """Embedded CDR3B/epitope pairs together with their affinity labels.

    The CSV at ``path`` must contain ``CDR3B``, ``Epitope`` and ``Affinity``
    columns.  ``emb_type`` selects the encoder: ``'onehot'``, ``'BLOSUM62'``,
    or any other value for the Atchley encoder.  Each encoder is expected to
    return ``(cdr3b_embedding, epitope_embedding, labels)``.
    """

    def __init__(self, path, emb_type):
        frame = pd.read_csv(path, low_memory=False)
        self.data = frame
        self.CDR3B = frame['CDR3B']
        self.Epitope = frame['Epitope']
        self.Affinity = frame['Affinity']
        self.emb_type = emb_type

        raw_columns = (self.CDR3B, self.Epitope, self.Affinity)
        # NOTE(review): onehot / BLOSUM_62 are not defined in the visible part
        # of this file; they are only looked up when that branch is taken.
        if emb_type == 'onehot':
            encoded = onehot(*raw_columns)
        elif emb_type == 'BLOSUM62':
            encoded = BLOSUM_62(*raw_columns, 20)
        else:
            encoded = Atchley(*raw_columns, 20)

        cdr3b_emb, epitope_emb, self.Affinity = encoded
        # One tensor per sample: CDR3B features followed by epitope features
        # along the last axis.
        self.pair = torch.cat((cdr3b_emb, epitope_emb), -1)

    def __getitem__(self, index):
        """Return ``(pair, affinity)`` for the sample at ``index``."""
        sample = self.pair[index]
        target = self.Affinity[index]
        return sample, target

    def __len__(self):
        """Return the number of samples (first dimension of the labels)."""
        labels = torch.LongTensor(self.Affinity)
        return labels.shape[0]
def Atchley(TCR, Epitope, Label, Length):
    """Embed TCR and epitope sequences with the vectors stored in ``atchley.pk``.

    Args:
        TCR: iterable of CDR3B sequence strings.
        Epitope: iterable of epitope sequence strings.
        Label: labels convertible by ``torch.LongTensor``; its length sets the
            number of rows in both embeddings.
        Length: number of positions kept per sequence.  Shorter sequences are
            right-padded with ``' '`` (so the table needs an entry for the
            space character); longer ones are truncated.

    Returns:
        ``(tcr_embedding, epi_embedding, labels)`` where the embeddings have
        shape ``(n, Length, 5)`` (the first five of the six stored components)
        and ``labels`` has shape ``(n, 1)``.
    """
    # NOTE: unpickling can execute arbitrary code; atchley.pk must be trusted.
    # The context manager closes the handle, which the bare open() never did.
    with open('atchley.pk', 'rb') as handle:
        aa_vec = pk.load(handle)
    Label = torch.LongTensor(Label).view(-1, 1)
    n = Label.size()[0]

    def embed(sequences):
        # The table stores 6 values per residue; only the first 5 are returned.
        embedding = torch.zeros(n, Length, 6)
        for row, seq in enumerate(sequences):
            padded = seq + ' ' * (Length - len(seq))
            for col in range(Length):
                embedding[row, col, :] = torch.from_numpy(aa_vec[padded[col]])
        return embedding[:, :, 0:5]

    return embed(TCR), embed(Epitope), Label

def Convert_letters(seq):
    """Map embedded sequences back to letter strings via ``atchley.pk``.

    Every position vector in ``seq`` is replaced by the key of the table entry
    (with its last component dropped) that is nearest in Euclidean distance;
    on ties the first entry in table order wins.  Keys that are not alphabetic
    (e.g. the padding symbol) are omitted from the result.

    Args:
        seq: array-like indexed as ``[sequence][position]`` whose position
            vectors have the same width as the truncated table entries.

    Returns:
        A list with one string per sequence; ``[]`` for empty input.
    """
    import numpy as np

    # NOTE: unpickling can execute arbitrary code; atchley.pk must be trusted.
    with open('atchley.pk', 'rb') as handle:
        atchley = pk.load(handle)
    # Drop the last stored component once, up front.
    table = [(key, value[..., :-1]) for key, value in atchley.items()]

    letters = []
    for row in seq:
        chars = []
        for element in row:
            closest_key = None
            min_distance = float('inf')
            for key, value in table:
                distance = np.linalg.norm(element - value)
                if distance < min_distance:
                    min_distance = distance
                    closest_key = key
            # closest_key stays None only if no finite distance was found.
            if closest_key is not None and closest_key.isalpha():
                chars.append(closest_key)
        letters.append(''.join(chars))
    return letters


In [13]:
def Model_retraining(trainfile_path,testfile_path,save_model_path,result_path): 
    """Train a fresh ``Model`` on the training CSV, save it, and score the test CSV.

    Relies on ``MyDataset``, ``data_renew``, ``Convert_letters`` and ``Model``
    being available at module level.

    Args:
        trainfile_path: training CSV (``CDR3B``, ``Epitope``, ``Affinity`` columns).
        testfile_path: test CSV with the same columns.
        save_model_path: where the trained ``state_dict`` is written.
        result_path: prefix of the output file; ``'probability.csv'`` is
            appended to it verbatim (no path separator is inserted).

    The weights that are evaluated are the same final weights written to
    ``save_model_path``.  The earlier "best batch loss" bookkeeping kept
    ``model.state_dict()`` without copying it; those tensors share storage
    with the live parameters, so the snapshot always equalled the final
    weights and reloading it was a no-op.  It also raised ``NameError`` when
    no batch loss reached the initial threshold of 1 or when the training
    loader yielded no batch.  It has been removed; results are unchanged.
    """
    import pandas as pd
    vocab = 21
    d_model = 512
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)
    e_type = 'Atchley'
    criterion = nn.CrossEntropyLoss()
    model = Model(src_vocab=vocab, tgt_vocab=vocab, emb_type=e_type, N=6, h=8, d_model=d_model,dropout=0.1, device=device)
    model = model.to(device)
    opt = torch.optim.Adam(model.parameters(), lr=0.00005, betas=(0.9, 0.98), eps=1e-9)
    n_epoch = 2
    b_size = 32
    train = MyDataset(trainfile_path, emb_type=e_type)
    test = MyDataset(testfile_path, emb_type=e_type)
    train_dataset = DataLoader(dataset=train, batch_size=b_size, shuffle=True, drop_last=True)
    # No shuffling for evaluation: rows come out in input order and the
    # result file is reproducible between runs.
    test_dataset = DataLoader(dataset=test, batch_size=b_size, shuffle=False, drop_last=False)

    model.train()
    for epoch in range(n_epoch):
        print('epoch:', epoch + 1)
        for pairs, label in train_dataset:
            tcr, epi = data_renew(pairs=pairs, emb_type=e_type)
            # Labels arrive as (batch, 1); the loss wants a flat long vector.
            target = label.to(device).long().view(-1)
            logits = model(tcr, epi).view(-1, 2)
            loss = criterion(logits, target)
            opt.zero_grad()
            loss.backward()
            opt.step()
    torch.save(model.state_dict(),save_model_path)
    print('End training. Begin testing')

    model.eval()
    rows = []
    with torch.no_grad():
        for pairs, label in test_dataset:
            tcr, epi = data_renew(pairs=pairs, emb_type=e_type)
            output = model(tcr, epi)
            pred = output.argmax(dim=1)

            # Sequences are recovered from the embeddings, not re-read from the CSV.
            tcr_letters = Convert_letters(tcr.cpu().numpy())
            epi_letters = Convert_letters(epi.cpu().numpy())
            prediction_scores = output[:, 1].cpu().numpy()
            true_labels_batch = label[:, 0].cpu().numpy()
            predicted_classes_batch = pred.cpu().numpy()
            for epitope, cdr3b, y_true, y_prob, y_pred in zip(
                    epi_letters, tcr_letters, true_labels_batch,
                    prediction_scores, predicted_classes_batch):
                rows.append({'Epitope': epitope, 'CDR3B': cdr3b, 'y_true': y_true,
                             'y_prob': y_prob,
                             'y_pred': y_pred})
    # Release cached GPU memory once, instead of after every batch.
    torch.cuda.empty_cache()

    # Explicit columns keep the header intact even when the test set is empty.
    df_data = pd.DataFrame(rows, columns=['Epitope', 'CDR3B', 'y_true', 'y_prob', 'y_pred'])
    df_data.to_csv(result_path+'probability.csv')


In [None]:
# Retrain on the training split, save the weights, and score the test split.
trainfile_path ="../data/train.csv"
testfile_path="../data/test.csv"
# NOTE: the variable is spelled "modle"; it is passed positionally as save_model_path.
save_modle_path="../Retraining_model/Retraining_model.pth"
# Used as a filename prefix: the function appends 'probability.csv' directly to it.
result_path="../result_path/Retraining_model_prediction"
Model_retraining(trainfile_path,testfile_path,save_modle_path,result_path) 

# 3.Retraining model prediction

In [15]:
def Retraining_model_prediction(testfile_path,modelfile_path,result_path):
    """Score a CSV with a retrained model and write the per-pair results.

    Relies on ``MyDataset``, ``data_renew``, ``Convert_letters`` and ``Model``
    being available at module level.

    Args:
        testfile_path: CSV with ``CDR3B``, ``Epitope`` and ``Affinity`` columns.
        modelfile_path: file holding the ``state_dict`` to load into ``Model``.
        result_path: prefix of the output file; ``'probability.csv'`` is
            appended to it verbatim (no path separator is inserted).

    The CSV has columns ``Epitope``, ``CDR3B``, ``y_true``, ``y_prob`` (column
    1 of the model output) and ``y_pred`` (argmax of the model output).
    """
    vocab = 21
    d_model = 512
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print(device)
    e_type = 'Atchley'
    b_size = 32
    test = MyDataset(testfile_path, emb_type=e_type)
    # No shuffling for evaluation: rows come out in input order and the
    # result file is reproducible between runs.
    test_dataset = DataLoader(dataset=test, batch_size=b_size, shuffle=False, drop_last=False)
    model = Model(src_vocab=vocab, tgt_vocab=vocab, emb_type=e_type, N=6, h=8, d_model=d_model, dropout=0.1, device=device)
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    model.load_state_dict(torch.load(modelfile_path, map_location=device))
    model = model.to(device)
    model.eval()

    rows = []
    with torch.no_grad():
        for pairs, label in test_dataset:
            tcr, epi = data_renew(pairs=pairs, emb_type=e_type)
            tcr = tcr.to(device)
            epi = epi.to(device)
            output = model(tcr, epi)
            pred = output.argmax(dim=1)

            # Sequences are recovered from the embeddings, not re-read from the CSV.
            tcr_letters = Convert_letters(tcr.cpu().numpy())
            epi_letters = Convert_letters(epi.cpu().numpy())
            prediction_scores = output[:, 1].cpu().numpy()
            # Labels arrive as (batch, 1); take the single column directly.
            true_labels_batch = label[:, 0].cpu().numpy()
            predicted_classes_batch = pred.cpu().numpy()
            for epitope, cdr3b, y_true, y_prob, y_pred in zip(
                    epi_letters, tcr_letters, true_labels_batch,
                    prediction_scores, predicted_classes_batch):
                rows.append({'Epitope': epitope, 'CDR3B': cdr3b, 'y_true': y_true,
                             'y_prob': y_prob,
                             'y_pred': y_pred})

    # Explicit columns keep the header intact even when the test set is empty.
    df_data = pd.DataFrame(rows, columns=['Epitope', 'CDR3B', 'y_true', 'y_prob', 'y_pred'])
    df_data.to_csv(result_path+'probability.csv')

In [None]:
# Score the validation split with the retrained checkpoint.
testfile_path="../data/Validation.csv"
modelfile_path="../Retraining_model/Retraining_model.pth"
# NOTE: same prefix as the retraining cell's result_path, so this run writes to
# the same '...probability.csv' file and replaces its contents.
result_path="../result_path/Retraining_model_prediction"
Retraining_model_prediction(testfile_path,modelfile_path,result_path)