# Setup

Import libraries

In [1]:
import os
# os.environ['CUDA_VISIBLE_DEVICES'] = '0'
from datetime import datetime
import numpy as np
import pandas as pd

import torch
from torch.utils.data import TensorDataset, DataLoader, RandomSampler
from torch import optim
from torch import nn

from keras.preprocessing.sequence import pad_sequences

from tqdm.notebook import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, roc_auc_score, roc_curve
import time

from tape import ProteinBertModel, TAPETokenizer

# Seed used for PyTorch's global random number generator.
random_seed = 22

# Assigning to `_` keeps the notebook from echoing the returned Generator object.
_ = torch.manual_seed(random_seed)

Define paths and experiment name

In [None]:
# Root of the project; every other location below is derived from it.
project_dir = "/path/to/project/root"

data_dir = f"{project_dir}/data"
models_dir = f"{data_dir}/models"
temp_dir = f"{project_dir}/temp"
datasets_dir = f"{data_dir}/datasets"

# "Easy" synthetic benchmark: true and decoy CSVs, train and test splits.
dataset_easy_true_train_file = f"{datasets_dir}/synthetic_easy/true_dataset_train.csv"
dataset_easy_true_test_file = f"{datasets_dir}/synthetic_easy/true_dataset_test.csv"
dataset_easy_decoys_train_file = f"{datasets_dir}/synthetic_easy/decoys_dataset_train.csv"
dataset_easy_decoys_test_file = f"{datasets_dir}/synthetic_easy/decoys_dataset_test.csv"

# "Hard" synthetic benchmark: same layout as the easy one.
dataset_hard_true_train_file = f"{datasets_dir}/synthetic_hard/true_dataset_train.csv"
dataset_hard_true_test_file = f"{datasets_dir}/synthetic_hard/true_dataset_test.csv"
dataset_hard_decoys_train_file = f"{datasets_dir}/synthetic_hard/decoys_dataset_train.csv"
dataset_hard_decoys_test_file = f"{datasets_dir}/synthetic_hard/decoys_dataset_test.csv"

# Target column names (the true/decoy column names below are renamed to these).
dataset_src_col_pep = "peptide"
dataset_src_col_label = "label"
dataset_src_col_hla_seq = "HLA_sequence"

# Column names found in the "true" CSV files.
dataset_true_col_pep = "peptide"
dataset_true_col_hla_seq = "mhc"
dataset_true_col_label = "label"

# Column names found in the "decoys" CSV files.
dataset_decoys_col_pep = "peptide_rand"
dataset_decoys_col_hla_seq = "mhc_rand"
dataset_decoys_col_label = "label"

# Tag embedded in the names of the weights and results files.
model_type = "tape"

Select experiment type

In [3]:
# Experiment to run: "synthetic_easy" or "synthetic_hard".
# Swap which of the two assignments is commented out to switch.
exp_name = "synthetic_easy"

# exp_name = "synthetic_hard"

In [4]:
# Microsecond-resolution timestamp appended to the temporary prediction files
# (presumably so that separate runs do not overwrite each other's files).
time_stamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S-%f")

# "<model>_<experiment>_no_hla" tag shared by every artifact name below.
_run_tag = f"{model_type}_{exp_name}_no_hla"

weights_file = f"{models_dir}/model_{_run_tag}.pt"

results_dir = f"{data_dir}/results/{_run_tag}"

train_results_file = f"{results_dir}/results_{_run_tag}_train.txt"
test_results_file = f"{results_dir}/results_{_run_tag}_test.txt"

# Scratch files that receive the test predictions batch by batch.
temp_prediction_flat_file = f"{temp_dir}/model_training_test_prediction_{_run_tag}_flat_{time_stamp}"
temp_prediction_prob_file = f"{temp_dir}/model_training_test_prediction_{_run_tag}_prob_{time_stamp}"

roc_file = f"{results_dir}/test_roc_{_run_tag}.csv"
roc_col_fpr = "False_positive_rate"  # Column name
roc_col_tpr = "True_positive_rate"  # Column name

Create folders

In [None]:
# Create the output folders. `exist_ok=True` turns the call into a no-op when
# the folder already exists and removes the race between checking with
# `os.path.exists` and creating with `os.makedirs`.
for _folder in (results_dir, temp_dir, models_dir):
    os.makedirs(_folder, exist_ok=True)

Loading the training dataset

In [None]:
# Read the true/decoy CSV pair that matches the selected experiment.
if exp_name == "synthetic_easy":
    dataset_true_train = pd.read_csv(dataset_easy_true_train_file, index_col=0)
    dataset_decoys_train = pd.read_csv(dataset_easy_decoys_train_file, index_col=0)
elif exp_name == "synthetic_hard":
    dataset_true_train = pd.read_csv(dataset_hard_true_train_file, index_col=0)
    dataset_decoys_train = pd.read_csv(dataset_hard_decoys_train_file, index_col=0)
else:
    # Fail here with a clear message instead of a NameError a few lines below.
    raise ValueError(
        f"Unknown exp_name {exp_name!r}: expected 'synthetic_easy' or 'synthetic_hard'"
    )

# Bring both frames to the shared column names before stacking them.
dataset_true_train = dataset_true_train.rename(
    columns={dataset_true_col_hla_seq: dataset_src_col_hla_seq}
)
dataset_decoys_train = dataset_decoys_train[
    [dataset_decoys_col_pep, dataset_decoys_col_label, dataset_decoys_col_hla_seq]
].rename(
    columns={
        dataset_decoys_col_pep: dataset_src_col_pep,
        dataset_decoys_col_hla_seq: dataset_src_col_hla_seq,
    }
)

df = pd.concat([dataset_true_train, dataset_decoys_train], ignore_index=True, axis=0)

# The individual frames are no longer needed once concatenated.
del dataset_true_train, dataset_decoys_train

# Map boolean labels to 1/0.
df[dataset_src_col_label] = df[dataset_src_col_label].replace({True: 1, False: 0})

# Step 1: Input tokenization

In [None]:
# Model inputs: the raw peptide strings.
peptide_sequences = df["peptide"].values

# Targets: the label column as an integer array.
labels = df["label"].values.astype(int)

Import the TAPE tokenizer

In [None]:
# TAPE tokenizer built with the 'iupac' vocabulary; reused for both the
# training and the test peptides.
tokenizer = TAPETokenizer(vocab='iupac')

In [None]:
print("Tokenizing inputs")

# Encode each peptide into token ids, then pad at the end ("post") so that all
# rows can be stacked into one integer matrix.
tokenized_peptide = list(map(tokenizer.encode, peptide_sequences))
tokenized_train = pad_sequences(
    tokenized_peptide,
    dtype="long",
    truncating="post",
    padding="post",
)

Split the dataset into train and validation

In [None]:
# Hold out 20% of the examples for validation; the fixed random_state keeps
# the split identical between runs.
(
    train_inputs,
    validation_inputs,
    train_labels,
    validation_labels,
) = train_test_split(
    tokenized_train,
    labels,
    test_size=0.2,
    random_state=1024,
)

Convert the training and validation splits into tensors

In [None]:
# Rebind each split from its array form to a torch tensor.
train_inputs, train_labels = torch.tensor(train_inputs), torch.tensor(train_labels)
validation_inputs, validation_labels = (
    torch.tensor(validation_inputs),
    torch.tensor(validation_labels),
)

Convert into dataloaders

In [None]:
# Pair inputs with labels for each split.
train_data = TensorDataset(train_inputs, train_labels)
validation_data = TensorDataset(validation_inputs, validation_labels)

# Both splits are visited in random order.
train_sampler = RandomSampler(train_data)
validation_sampler = RandomSampler(validation_data)

# Batches of 128 for training and 64 for validation.
train_dataloader = DataLoader(dataset=train_data, batch_size=128, sampler=train_sampler)
validation_dataloader = DataLoader(dataset=validation_data, batch_size=64, sampler=validation_sampler)

# Step 2: Define the model

In [None]:
class TAPEClassification(nn.Module):
    """Classifier made of a pretrained TAPE BERT encoder and a linear head.

    Args:
        input_dim_bert: Size of the encoder feature vector fed to the head.
        output_dim: Number of logits produced per example.
    """

    def __init__(self, input_dim_bert, output_dim):
        super().__init__()
        self.bert = ProteinBertModel.from_pretrained('bert-base')
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(input_dim_bert, output_dim)

    def forward(self, x_sem):
        """Return the classification logits for the batch of token ids ``x_sem``."""
        encoder_outputs = self.bert(x_sem)
        # First encoder output, sliced at index 0 along dim 1 (presumably the
        # per-token hidden states and the leading special token; TODO confirm).
        first_position = encoder_outputs[0][:, 0, :]
        return self.classifier(self.dropout(first_position))

In [None]:
# 768 input features for the head, 2 output logits.
model = TAPEClassification(input_dim_bert=768, output_dim=2)
# Move the parameters to the GPU (requires CUDA to be available).
model.cuda()

Define early stopping

In [None]:
class EarlyStopping():
    """Stop training once a monitored score has stopped improving.

    The score must grow as the model gets better (e.g. an accuracy). Each time
    it exceeds the best value seen so far by more than ``min_delta`` the model
    weights are checkpointed; after ``tolerance`` consecutive calls without
    such an improvement ``early_stop`` is set to ``True``.

    Args:
        tolerance: Consecutive non-improving calls allowed before stopping.
        min_delta: Minimum gain over the best score that counts as improvement.
        model: Module whose ``state_dict()`` is saved on improvement. When
            ``None`` no checkpoint is written.
        store_position: Destination path given to ``torch.save``. When
            ``None`` no checkpoint is written.
    """

    def __init__(self, tolerance=2, min_delta=0.00002, model=None, store_position=None):
        self.tolerance = tolerance
        self.min_delta = min_delta
        self.counter = 0  # consecutive calls without improvement
        self.early_stop = False
        self.model = model
        self.store_position = store_position
        self.max_value = 0  # best score seen so far

    def __call__(self, new_value):
        """Record ``new_value`` and update ``counter`` / ``early_stop``."""
        if new_value - self.min_delta > self.max_value:
            self.counter = 0

            # Checkpoint only when both a model and a destination were given;
            # with the constructor defaults (None) the original crashed here.
            if self.model is not None and self.store_position is not None:
                torch.save(self.model.state_dict(), self.store_position)
            self.max_value = new_value
        else:
            self.counter += 1

        if self.counter >= self.tolerance:
            self.early_stop = True

Set the device, loss function, optimizer and early stopping

In [None]:
# Use the GPU when PyTorch can see one, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Cross-entropy over the classifier logits, optimized with AdamW.
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(params=model.parameters(), lr=2e-5)

# Checkpoints `model` to `weights_file` whenever the monitored score improves.
early_stopping = EarlyStopping(
    model=model,
    store_position=weights_file,
    tolerance=2,
    min_delta=0.002,
)

Define a function to calculate accuracy

In [None]:
def flat_accuracy(preds, labels):
    """Return the fraction of rows whose highest-scoring class equals the label.

    Args:
        preds: 2-D array of per-class scores, one row per example.
        labels: NumPy array of class ids; flattened before the comparison.

    Returns:
        Number of matching predictions divided by the number of labels.
    """
    predicted_classes = np.argmax(preds, axis=1).flatten()
    expected_classes = labels.flatten()
    hits = predicted_classes == expected_classes
    return np.sum(hits) / len(expected_classes)

# Step 3: Training phase

In [None]:
# History buffers: every batch loss, plus one entry per epoch for the mean
# training loss, the training accuracy and the validation accuracy.
train_loss_set = []
train_loss_list = []
train_acc_list = []
val_acc_list = []

epochs = 10

# Start a fresh results log; one line per epoch is appended below.
with open(train_results_file, 'w') as f:
    f.write('===== TRAIN RESULTS =====\n')

for _ in tqdm(range(0, epochs), leave=False, desc="Epoch"):
    tr_loss = 0
    nb_tr_steps = 0
    acc_flag = 0  # running sum of per-batch training accuracies
    eval_accuracy = 0  # running sum of per-batch validation accuracies
    nb_eval_steps = 0

    model.train()

    for batch in tqdm(train_dataloader, leave=False, desc="Training"):
        # Use the shared `device` (as the testing loop does) instead of a
        # hard-coded .cuda().
        b_input_ids, b_labels = (t.to(device) for t in batch)

        optimizer.zero_grad()

        logits = model(b_input_ids)
        loss = criterion(logits, b_labels)

        # Read the scalar once; each .item() call forces a device sync.
        batch_loss = loss.item()
        train_loss_set.append(batch_loss)

        loss.backward()
        optimizer.step()

        tr_loss += batch_loss
        nb_tr_steps += 1

        acc_flag += flat_accuracy(
            logits.detach().cpu().numpy(),
            b_labels.detach().cpu().numpy(),
        )

    print("Train loss: {}".format(tr_loss/nb_tr_steps))
    print("Train acc: {}".format(acc_flag/nb_tr_steps))

    train_loss_list.append(tr_loss/nb_tr_steps)
    train_acc_list.append(acc_flag/nb_tr_steps)

    model.eval()

    # No gradients are needed while scoring the validation split.
    with torch.no_grad():
        for batch in tqdm(validation_dataloader, leave=False, desc="Validating"):
            b_input_ids, b_labels = (t.to(device) for t in batch)

            logits = model(b_input_ids)

            # Move logits and labels to CPU
            eval_accuracy += flat_accuracy(logits.cpu().numpy(), b_labels.cpu().numpy())
            nb_eval_steps += 1

    # NOTE: this is the mean of per-batch accuracies, so a smaller final batch
    # weighs as much as a full one.
    print("Validation Accuracy: {}".format(eval_accuracy/nb_eval_steps))
    val_acc_list.append(eval_accuracy/nb_eval_steps)
    time.sleep(1)

    with open(train_results_file, 'a') as f:
        f.write('EPOCH '+str(len(train_loss_list))+' --> LOSS: '+ str(train_loss_list[-1])
                +' TRAIN_ACC: '+ str(train_acc_list[-1])
                + ' VAL_ACC: ' + str(val_acc_list[-1]))
        f.write('\n')

    # Checkpoints the weights on improvement and flags when to stop.
    early_stopping(val_acc_list[-1])
    if early_stopping.early_stop:
        print("We stop for early stopping")
        break

# Step 4: Testing

Load the fine-tuned weights (skip the following cell if you run the experiment in one-shot mode, i.e. training and testing in the same session)

In [None]:
# Early stopping saves the best-scoring weights to `weights_file`. While the
# line below stays commented out, testing uses whatever weights the model
# holds in memory; uncomment it to evaluate the saved checkpoint instead.
# model.load_state_dict(torch.load(weights_file))

Load the dataset

In [None]:
# Read the true/decoy test CSV pair that matches the selected experiment.
if exp_name == "synthetic_easy":
    dataset_true_test = pd.read_csv(dataset_easy_true_test_file, index_col=0)
    dataset_decoys_test = pd.read_csv(dataset_easy_decoys_test_file, index_col=0)
elif exp_name == "synthetic_hard":
    dataset_true_test = pd.read_csv(dataset_hard_true_test_file, index_col=0)
    dataset_decoys_test = pd.read_csv(dataset_hard_decoys_test_file, index_col=0)
else:
    # Fail here with a clear message instead of a NameError a few lines below.
    raise ValueError(
        f"Unknown exp_name {exp_name!r}: expected 'synthetic_easy' or 'synthetic_hard'"
    )

# Bring both frames to the shared column names before stacking them.
dataset_true_test = dataset_true_test.rename(
    columns={dataset_true_col_hla_seq: dataset_src_col_hla_seq}
)
dataset_decoys_test = dataset_decoys_test[
    [dataset_decoys_col_pep, dataset_decoys_col_label, dataset_decoys_col_hla_seq]
].rename(
    columns={
        dataset_decoys_col_pep: dataset_src_col_pep,
        dataset_decoys_col_hla_seq: dataset_src_col_hla_seq,
    }
)

df_test = pd.concat([dataset_true_test, dataset_decoys_test], ignore_index=True, axis=0)

# The individual frames are no longer needed once concatenated.
del dataset_true_test, dataset_decoys_test

# Map boolean labels to 1/0.
df_test[dataset_src_col_label] = df_test[dataset_src_col_label].replace({True: 1, False: 0})

test_peptide = df_test.peptide.values
# Cast to int exactly as the training labels are, so the array converts to a
# tensor whatever dtype `replace` leaves behind.
test_labels = df_test.label.values.astype(int)

Input tokenization

In [None]:
print("Tokenizing inputs")

# Encode each test peptide into token ids, then pad at the end ("post") so
# that all rows can be stacked into one integer matrix.
tokenized_test_peptide = list(map(tokenizer.encode, test_peptide))

tokenized_test = pad_sequences(
    tokenized_test_peptide,
    dtype="long",
    truncating="post",
    padding="post",
)

Convert them into tensors

In [None]:
# Token-id matrix and labels as tensors; `test_labels` is rebound in place.
test_inputs, test_labels = torch.tensor(tokenized_test), torch.tensor(test_labels)

Convert to a dataloader

In [None]:
# No sampler is given, so batches follow dataset order and the predictions
# stay aligned with `test_labels`.
test_data = TensorDataset(test_inputs, test_labels)
test_dataloader = DataLoader(dataset=test_data, batch_size=32)

Set the device

In [None]:
# Use the GPU when PyTorch can see one, otherwise the CPU. Same definition as
# in the training section (presumably repeated so the testing cells can run on
# their own).
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Testing

Save the predictions to dedicated files to avoid running out of memory

In [None]:
model.eval()

# Stream predictions to disk batch by batch instead of keeping them in memory.
# Both files are opened once; mode "w" truncates any previous content. Values
# are space-separated with a trailing space, which the loading cells expect.
with open(temp_prediction_prob_file, "w") as f_prob, \
        open(temp_prediction_flat_file, "w") as f_flat:
    for batch in tqdm(test_dataloader, leave=False, desc="Testing"):
        # Only the inputs are needed here; labels are compared later.
        b_input_ids = batch[0].to(device)

        with torch.no_grad():
            logits = model(b_input_ids)
            # Positive-class probability. The raw class-1 logit alone ignores
            # the class-0 logit and therefore ranks examples differently.
            # float64 keeps very confident scores from rounding to ties.
            positive_probs = torch.softmax(logits.double(), dim=1)[:, 1]

        positive_probs = positive_probs.cpu().numpy()
        # Hard prediction: index of the larger logit in each row.
        flat_predictions = np.argmax(logits.cpu().numpy(), axis=1).flatten()

        f_prob.write("".join(str(elem) + " " for elem in positive_probs))
        f_flat.write("".join(str(elem) + " " for elem in flat_predictions))

Load the predictions

In [None]:
# Read back the space-separated scores written during testing. The text ends
# with a separator, so the last piece of the split is empty and is dropped.
with open(temp_prediction_prob_file, "r") as f:
    data = f.read()
prob_predictions = np.array(data.split(" ")[:-1]).astype(float)

# The scratch file has served its purpose.
os.remove(temp_prediction_prob_file)

# ROC curve from the continuous scores, saved as a two-column CSV.
fpr, tpr, _ = roc_curve(test_labels, prob_predictions)

roc = pd.DataFrame({roc_col_fpr: fpr, roc_col_tpr: tpr})
roc.to_csv(roc_file, index=False)

In [None]:
# Read back the space-separated class predictions written during testing. The
# text ends with a separator, so the last piece of the split is empty and is
# dropped.
with open(temp_prediction_flat_file, 'r') as f:
    data = f.read()

flat_predictions = np.array(data.split(' ')[:-1]).astype(int)

# The scratch file has served its purpose.
os.remove(temp_prediction_flat_file)

Print the final results

In [None]:
# Compute every metric before opening the report, so a failing metric does not
# leave a truncated file behind. Each entry is (text written, metric value).
_report = [
    ('ACCURACY --> ', accuracy_score(test_labels, flat_predictions)),
    ('F1-MACRO --> ', f1_score(test_labels, flat_predictions, average='macro')),
    ('F1 WEIGHTED --> ', f1_score(test_labels, flat_predictions, average='weighted')),
    ('Precision TRUE_label --> ', precision_score(test_labels, flat_predictions, average='binary')),
    ('Recall TRUE_label --> ', recall_score(test_labels, flat_predictions, average='binary')),
    ('F1 TRUE_label --> ', f1_score(test_labels, flat_predictions, average='binary')),
    # ROC AUC needs the continuous scores; with hard 0/1 predictions the curve
    # collapses to a single operating point.
    ('AUC ROC --> ', roc_auc_score(test_labels, prob_predictions)),
]

with open(test_results_file, 'w') as f:
    f.write('===== TEST RESULTS =====\n')
    for _title, _value in _report:
        f.write(_title + str(_value) + '\n')