# Peptides encoding (One-hot)

In [1]:
import peptides
import numpy as np
import pandas as pd
from datetime import date
import seaborn as sns
from math import sqrt
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# libs for ml
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.metrics import classification_report
from sklearn.manifold import TSNE
from sklearn.utils import shuffle
#from umap import UMAP

# my module with some func
import pepcode

## Set parameters

In [2]:
AA_LIST = pepcode.AA_LIST

latent_dims = 200
num_epochs = 500 
batch_size = 300
learning_rate = 1e-3 
use_gpu = True

# Device set
if use_gpu and torch.cuda.is_available():
    device = torch.device("cuda:0")
elif use_gpu and torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")

## Preparing data

In [3]:
data = pd.read_csv('./dataset/vdjdb-2024-11-27-fixed/vdjdb.slim.txt', sep = '\t')
data = data[(data.gene == 'TRB') & (data.species == 'HomoSapiens')]
data.reset_index(drop=True, inplace=True)
index_list = []
for i in range(len(data)):
    if len(data.iloc[i].cdr3)==15 and data.iloc[i].cdr3[0]=='C' and (data.iloc[i].cdr3[-1]=='F' or data.iloc[i].cdr3[-1]=='W'):
        continue
    else:
        index_list.append(i)
data.drop(index=index_list, inplace=True)
data.reset_index(drop=True, inplace=True)
# data.sample(frac=1, random_state=100).reset_index(drop=True, inplace=True)
data = shuffle(data)

#Split dataset and prepare train, test, evaluation datasets
train_size, test_size, eval_size = int(round(len(data)*0.7, 0)), int(round(len(data)*(0.7+0.15), 0)), int(round(len(data)*(1), 0))
data_split = np.split(data, [train_size, test_size, eval_size], axis=0)
data_train = data_split[0]
data_test = data_split[1]
data_eval = data_split[2]

# Prepare peptide lists
pep_train_list = data_train.cdr3.values
pep_test_list = data_test.cdr3.values
pep_eval_list = data_eval.cdr3.values

len_seq = len(pep_train_list[0])


pep_train_oh = np.zeros((len(pep_train_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)
for i in range(len(pep_train_oh)):
    pep_train_oh[i] = pepcode.one_hot_code(pep_train_list[i])

pep_test_oh = np.zeros((len(pep_test_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)
for i in range(len(pep_test_list)):
    pep_test_oh[i] = pepcode.one_hot_code(pep_test_list[i])   


oh_matr_size = pep_test_oh[0].size

# Пока не нужно
pep_eval_oh = np.zeros((len(pep_eval_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)
for i in range(len(pep_eval_list)):
    pep_eval_oh[i] = pepcode.one_hot_code(pep_eval_list[i])  

# Prepare train dataloader
oh_dataset_train = torch.utils.data.TensorDataset(torch.tensor(pep_train_oh), torch.tensor(np.ones(pep_train_oh.shape[0])))
oh_train_dl = torch.utils.data.DataLoader(oh_dataset_train, batch_size=batch_size, shuffle=False)

# Prepare test dataloader
oh_dataset_test = torch.utils.data.TensorDataset(torch.tensor(pep_test_oh), torch.tensor(np.ones(pep_test_oh.shape[0])))
oh_test_dl = torch.utils.data.DataLoader(oh_dataset_test, batch_size=batch_size, shuffle=False)

# Evaluation (пока не нужно)
oh_dataset_eval = torch.utils.data.TensorDataset(torch.tensor(pep_eval_oh), torch.tensor(np.ones(pep_eval_oh.shape[0])))
oh_eval_dl = torch.utils.data.DataLoader(oh_dataset_eval, batch_size=batch_size, shuffle=False)


  return bound(*args, **kwds)


## Autoencoder definition (300->200)

In [4]:
autoencoder_arch = '300->200 without L2'

class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.fc1 = nn.Linear(in_features=len(AA_LIST)*len_seq, out_features=latent_dims)
            
    def forward(self, x):
        x = self.fc1(x)
        return x

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.fc1 = nn.Linear(in_features=latent_dims, out_features=len(AA_LIST)*len_seq)
            
    def forward(self, x):
        x = self.fc1(x)
        x = F.sigmoid(x)
        return x
    
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
    
    def forward(self, x):
        latent = self.encoder(x)
        x_recon = self.decoder(latent)
        return x_recon
    
autoencoder_f = Autoencoder()

autoencoder_f = autoencoder_f.to(device)

num_params = sum(p.numel() for p in autoencoder_f.parameters() if p.requires_grad)
print('Number of parameters for One-hot encoding: %d' % num_params)

loss_function = nn.MSELoss()

optimizer = torch.optim.Adam(autoencoder_f.parameters(), lr = learning_rate)

Number of parameters for One-hot encoding: 120500


### Train autoencoder

In [None]:
# set to training mode
autoencoder_f.train()

train_loss_avg_1 = []

print('Training ...')
for epoch in range(num_epochs):
    train_loss_avg_1.append(0)
    num_batches = 0
    
    for pep_batch, _ in oh_train_dl:
        
        pep_batch = pep_batch.to(device)

        pep_batch = pep_batch.reshape(-1, oh_matr_size)
        
        # autoencoder reconstruction
        pep_batch_recon = autoencoder_f(pep_batch)
        
        # reconstruction error
        loss = loss_function(pep_batch_recon, pep_batch)
        
        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        
        # one step of the optmizer (using the gradients from backpropagation)
        optimizer.step()
        
        train_loss_avg_1[-1] += loss.item()
        num_batches += 1
        
    train_loss_avg_1[-1] /= num_batches
    if epoch % 50 == 0:
        print('Epoch [%d / %d] average reconstruction error: %f' % (epoch+1, num_epochs, train_loss_avg_1[-1]))
print('Train finished')

### Evaluation on test set

In [None]:
autoencoder_f.eval()

output = []
test_loss_avg_1, num_batches = 0, 0

# Прогоняем тестовую выборку через автоэнкодер
for (pep, _) in oh_test_dl:
    with torch.no_grad():
        pep = pep.reshape(-1, oh_matr_size)
        pep = pep.to(device)
        pep_recon = autoencoder_f(pep)
        loss = loss_function(pep_recon, pep)
        test_loss_avg_1 += loss.item()
        num_batches += 1
    output.append((pep, pep_recon))
test_loss_avg_1 /= num_batches


test_loss_avg_1 /= num_batches
print('Average reconstruction error: %f' % (test_loss_avg_1))

# Output выглядит странно это лист, состоящий из кортежей, каждый из которых относится к одному батчу. Т.е. ouput[0] - это кортеж из первого батча
# Далее каждый кортеж, состоит из двух тензоров: до и после энкодера. Тензоры в 2д формате.


# Создаём пустые(нулевые) массивы для one-hot петпидов до и после
pep_test_oh_bef = np.zeros((len(pep_test_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)
pep_test_oh_aft = np.zeros((len(pep_test_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)


# Переводим пептиды в np массив из output
pointer = 0
for i in range(num_batches):
    cur_batch_size = len(output[i][0])
    pep_test_oh_bef[pointer:pointer + cur_batch_size, :, : ] = output[i][0].reshape((cur_batch_size, len(AA_LIST), len_seq)).numpy(force=True)
    pep_test_oh_aft[pointer:pointer + cur_batch_size, :, : ] = output[i][1].reshape((cur_batch_size, len(AA_LIST), len_seq)).numpy(force=True)
    pointer += cur_batch_size
    

# Для каждого пептида после автоэнкодера приводим его к формату one-hot путем простанови 1 на место максимального веса в столбце
for i in range(pep_test_oh_aft.shape[0]):
    for j in range(len_seq):
        col_max = np.max(pep_test_oh_aft[i][:,j])
        for k in range(len(AA_LIST)):
            if pep_test_oh_aft[i][k][j] == col_max:
                pep_test_oh_aft[i][k][j] = 1.0
            else: 
                pep_test_oh_aft[i][k][j] = 0.0  

pep_test_list_bef_ae = []
pep_test_list_aft_ae = []

for i in range(pep_test_oh_aft.shape[0]):
    pep_test_list_bef_ae.append(pepcode.one_hot_decode(pep_test_oh_bef[i]))   
    pep_test_list_aft_ae.append(pepcode.one_hot_decode(pep_test_oh_aft[i]))

#### Biological evaluatiuon

In [None]:
err_count = np.zeros(len_seq, dtype = np.float32)
blos_score = np.zeros(len_seq, dtype = np.float32)

for i in range(len(pep_test_list_bef_ae)):
    for j in range(len_seq):
        if pep_test_list_bef_ae[i][j] != pep_test_list_aft_ae[i][j]:
            err_count[j] +=1
            blos_score[j] += pepcode.blosum_score(pep_test_list_bef_ae[i][j],pep_test_list_aft_ae[i][j])


avg_blos_score = {}
for i in range(len_seq):
    if err_count[i] !=0:
        avg_blos_score[i]=blos_score[i]/err_count[i]

pep_test_list_bef_ae_aa = []
pep_test_list_aft_ae_aa = []
for i in range(len(pep_test_list_bef_ae)):
    for j in pep_test_list_bef_ae[i]:
        pep_test_list_bef_ae_aa.append(j)
    for k in pep_test_list_aft_ae[i]:
        pep_test_list_aft_ae_aa.append(k)

In [None]:
fig, axs = plt.subplots(3,2, figsize = [8.3, 11.7])

sns.pointplot(train_loss_avg_1 , color = 'green', ax=axs[0, 0])
sns.pointplot(x=avg_blos_score.keys(), y=avg_blos_score.values(), color='green', ax=axs[1,0])
sns.pointplot(err_count, color = 'green', ax=axs[2, 0])
sns.pointplot(err_count/len(pep_test_list_bef_ae), color = 'green', ax=axs[2, 1])
axs[0, 0].hlines(round(test_loss_avg_1, 5), 0, num_epochs, color = 'red', label='Test MSE')
axs[0, 0].legend()
axs[0, 0].set(xlabel='Epoch', ylabel='Reconstruction error', title = f'Reconstruction error during training\n for Autoencoder_final', xticks=[i for i in range(1, num_epochs, int(num_epochs/10))])
axs[0, 1].set(frame_on=False)
axs[0, 1].set_xticks([])
axs[0, 1].set_yticks([])
axs[0, 1].text(x=-0.1, y=-1.5, s=f'Average reconstruction error on test set:\nMSE: {round(test_loss_avg_1, 5)}\nRMSE: {round(sqrt(test_loss_avg_1), 5)}\n\nArchitecture: {autoencoder_arch}\nLearning rate: {learning_rate}\n\n{classification_report(pep_test_list_bef_ae_aa, pep_test_list_aft_ae_aa)}')
axs[1, 0].set(xlabel='Position in peptide', ylabel='Average blossum62 score', title=f'Average blossum62 score by position for\nAutoencoder_final')
axs[1, 1].set(frame_on=False)
axs[1, 1].set_xticks([])
axs[1, 1].set_yticks([])
axs[2, 0].set(xlabel='Position in peptide', ylabel='Count of errors', title=f'Count of errors by position\n for Autoencoder_final')
axs[2, 1].set(xlabel='Position in peptide', ylabel='Percent of errors', title=f'Count of errors by position\n  for Autoencoder_final(in %)', yticks=[i/10 for i in range(1, 10)])

plt.subplots_adjust(wspace=0.2, hspace=0.5)
fig.suptitle(f'One-hot Encoder with arch {autoencoder_arch} on test set') 
plt.show()
fig.savefig(f'./Results_one_hot/{str(date.today())}_final_{autoencoder_arch}_on_test.pdf', format='pdf')

### Evaluation on eval set

In [None]:
autoencoder_f.eval()

output = []
eval_loss_avg_1, num_batches = 0, 0

# Прогоняем тестовую выборку через автоэнкодер
for (pep, _) in oh_eval_dl:
    with torch.no_grad():
        pep = pep.reshape(-1, oh_matr_size)
        pep = pep.to(device)
        pep_recon = autoencoder_f(pep)
        loss = loss_function(pep_recon, pep)
        eval_loss_avg_1 += loss.item()
        num_batches += 1
    output.append((pep, pep_recon))
eval_loss_avg_1 /= num_batches


eval_loss_avg_1 /= num_batches
print('Average reconstruction error: %f' % (eval_loss_avg_1))

# Output выглядит странно это лист, состоящий из кортежей, каждый из которых относится к одному батчу. Т.е. ouput[0] - это кортеж из первого батча
# Далее каждый кортеж, состоит из двух тензоров: до и после энкодера. Тензоры в 2д формате.


# Создаём пустые(нулевые) массивы для one-hot петпидов до и после
pep_eval_oh_bef = np.zeros((len(pep_eval_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)
pep_eval_oh_aft = np.zeros((len(pep_eval_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)


# Переводим пептиды в np массив из output
pointer = 0
for i in range(num_batches):
    cur_batch_size = len(output[i][0])
    pep_eval_oh_bef[pointer:pointer + cur_batch_size, :, : ] = output[i][0].reshape((cur_batch_size, len(AA_LIST), len_seq)).numpy(force=True)
    pep_eval_oh_aft[pointer:pointer + cur_batch_size, :, : ] = output[i][1].reshape((cur_batch_size, len(AA_LIST), len_seq)).numpy(force=True)
    pointer += cur_batch_size
    

# Для каждого пептида после автоэнкодера приводим его к формату one-hot путем простанови 1 на место максимального веса в столбце
for i in range(pep_eval_oh_aft.shape[0]):
    for j in range(len_seq):
        col_max = np.max(pep_eval_oh_aft[i][:,j])
        for k in range(len(AA_LIST)):
            if pep_eval_oh_aft[i][k][j] == col_max:
                pep_eval_oh_aft[i][k][j] = 1.0
            else: 
                pep_eval_oh_aft[i][k][j] = 0.0  

pep_eval_list_bef_ae = []
pep_eval_list_aft_ae = []

for i in range(pep_eval_oh_aft.shape[0]):
    pep_eval_list_bef_ae.append(pepcode.one_hot_decode(pep_eval_oh_bef[i]))   
    pep_eval_list_aft_ae.append(pepcode.one_hot_decode(pep_eval_oh_aft[i]))

### Biological evaluatiuon

In [None]:
err_count = np.zeros(len_seq, dtype = np.float32)
blos_score = np.zeros(len_seq, dtype = np.float32)

for i in range(len(pep_eval_list_bef_ae)):
    for j in range(len_seq):
        if pep_eval_list_bef_ae[i][j] != pep_eval_list_aft_ae[i][j]:
            err_count[j] +=1
            blos_score[j] += pepcode.blosum_score(pep_eval_list_bef_ae[i][j],pep_eval_list_aft_ae[i][j])


avg_blos_score = {}
for i in range(len_seq):
    if err_count[i] !=0:
        avg_blos_score[i]=blos_score[i]/err_count[i]

pep_eval_list_bef_ae_aa = []
pep_eval_list_aft_ae_aa = []
for i in range(len(pep_eval_list_bef_ae)):
    for j in pep_eval_list_bef_ae[i]:
        pep_eval_list_bef_ae_aa.append(j)
    for k in pep_eval_list_aft_ae[i]:
        pep_eval_list_aft_ae_aa.append(k)

In [None]:
fig, axs = plt.subplots(3,2, figsize = [8.3, 11.7])

# sns.pointplot(train_loss_avg_1 , color = 'green', ax=axs[0, 0])
sns.pointplot(x=avg_blos_score.keys(), y=avg_blos_score.values(), color='green', ax=axs[1,0])
sns.pointplot(err_count, color = 'green', ax=axs[2, 0])
sns.pointplot(err_count/len(pep_eval_list_bef_ae), color = 'green', ax=axs[2, 1])
axs[0, 0].hlines(round(eval_loss_avg_1, 5), 0, num_epochs, color = 'red', label='Eval MSE')
axs[0, 0].legend()
axs[0, 0].set(xlabel='Epoch', ylabel='Reconstruction error', title = f'Reconstruction error during training\n for Autoencoder_final', xticks=[i for i in range(1, num_epochs, int(num_epochs/10))])
axs[0, 1].set(frame_on=False)
axs[0, 1].set_xticks([])
axs[0, 1].set_yticks([])
axs[0, 1].text(x=-0.1, y=-1.5, s=f'Average reconstruction error on evaluation set:\nMSE: {round(eval_loss_avg_1, 5)}\nRMSE: {round(sqrt(eval_loss_avg_1), 5)}\n\nArchitecture: {autoencoder_arch}\nLearning rate: {learning_rate}\n\n{classification_report(pep_eval_list_bef_ae_aa, pep_eval_list_aft_ae_aa)}')
axs[1, 0].set(xlabel='Position in peptide', ylabel='Average blossum62 score', title=f'Average blossum62 score by position for\nAutoencoder_final')
axs[1, 1].set(frame_on=False)
axs[1, 1].set_xticks([])
axs[1, 1].set_yticks([])
axs[2, 0].set(xlabel='Position in peptide', ylabel='Count of errors', title=f'Count of errors by position\n for Autoencoder_final')
axs[2, 1].set(xlabel='Position in peptide', ylabel='Percent of errors', title=f'Count of errors by position\n  for Autoencoder_final(in %)', yticks=[i/10 for i in range(1, 10)])

plt.subplots_adjust(wspace=0.2, hspace=0.5)
fig.suptitle(f'One-hot Encoder with arch {autoencoder_arch} on evaluation set') 
plt.show()
# fig.savefig(f'./Results_one_hot/{str(date.today())}_final_{autoencoder_arch}_on_eval.pdf', format='pdf')

### Model save

In [None]:
torch.save(autoencoder_f, f'One-hot_models/model_f.pth')

## Model load

In [5]:
model = torch.load('./One-hot_models/model_f.pth', weights_only=False) # Class should be defined 

In [6]:
model.eval()
output = []
num_batches = 0

# Прогоняем тестовую выборку через энкодер
for (pep, _) in oh_eval_dl:
    with torch.no_grad():
        pep = pep.reshape(-1, oh_matr_size)
        pep = pep.to(device)
        pep_recon = model.encoder(pep)
        num_batches += 1
    output.append((pep, pep_recon))

output


# Создаём пустые(нулевые) массивы для one-hot петпидов до и после
pep_eval_oh_bef = np.zeros((len(pep_eval_list), len(pepcode.AA_LIST), len_seq), dtype = np.float32)
pep_eval_oh_encoded = np.zeros((len(pep_eval_list), 200), dtype = np.float32)


# Переводим пептиды в np массив из output
pointer = 0
for i in range(num_batches):
    cur_batch_size = len(output[i][0])
    pep_eval_oh_bef[pointer:pointer + cur_batch_size, :, : ] = output[i][0].reshape((cur_batch_size, len(AA_LIST), len_seq)).numpy(force=True)
    pep_eval_oh_encoded[pointer:pointer + cur_batch_size, : ] = output[i][1].reshape((cur_batch_size, 200)).numpy(force=True)
    pointer += cur_batch_size

## Biological sense (Пока не доделано)

In [None]:
# Пока не важно
for i in range(data_eval['v.segm'].size):
    data_eval['v.segm'].iloc[i] = data_eval['v.segm'].iloc[i].split('*')[0]
    data_eval['j.segm'].iloc[i] = data_eval['j.segm'].iloc[i].split('*')[0]

# data_eval['v.segm']

# fig, axs = plt.subplots(4,2, figsize = [8.3, 11.7])
# perplexity_list = [0.1, 1, 5, 10, 20, 50, 100, 1000]

# for i in range(len(perplexity_list)):
#     tsne = TSNE(n_components=2, n_jobs=-1,
#                 n_iter=500, perplexity=perplexity_list[i])
#     data_tsne = tsne.fit_transform(pep_eval_oh_encoded)
#     sns.scatterplot(x=data_tsne [:, 0],
#                     y=data_tsne [:, 1],
#                     hue = data_eval['antigen.species'].values.reshape(-1),
#                     palette="colorblind", ax=axs[int(i%(len(perplexity_list)/2)), int(i//(len(perplexity_list)/2))])
#     axs[int(i%(len(perplexity_list)/2)), int(i//(len(perplexity_list)/2))].set(xlabel='TSNE_1', ylabel='TSNE_2', title=f'TSNE for encoded peptides\nwith perplexity={perplexity_list[i]}')
#     axs[int(i%(len(perplexity_list)/2)), int(i//(len(perplexity_list)/2))].legend().set_visible(False)

# plt.subplots_adjust(wspace=0.2, hspace=0.5)
# plt.show()