In [None]:
from optimization import *
from configuration_bert import *
from tokenization_bert import *
from modeling_bert import *
from tokenization_dna import *
import pathlib
import pandas as pd
import glob
import logging
import os
import pickle
import random
import re
import shutil
from typing import Dict, List, Tuple
from copy import deepcopy
from multiprocessing import Pool

import numpy as np
import torch
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset, RandomSampler, SequentialSampler
from torch.utils.data.distributed import DistributedSampler
from tqdm import tqdm, trange

from preparation import *
from helper_functions import *
# Ask PyTorch to release any cached CUDA memory before the model is loaded.
torch.cuda.empty_cache()

# NOTE(review): MASK_LIST is not referenced by any cell visible here; its
# purpose could not be confirmed from this notebook.
MASK_LIST = [-1, 1, 2]

# Input locations, relative to the notebook's working directory.
modeldir = "Diem_pretrained_model"
datadir = "dataset"

# Root under which every output directory of this notebook is created.
outdir = "."

# Directories are created with os.makedirs instead of shelling out to
# `mkdir -p`: it works on every platform, is safe for paths containing spaces
# or shell metacharacters, and raises on failure instead of returning an exit
# status that nobody checks.
path_to_main_output = os.path.join(outdir, "outputdir")
os.makedirs(path_to_main_output, exist_ok=True)

path_to_01_output = os.path.join(path_to_main_output, "01_output")
os.makedirs(path_to_01_output, exist_ok=True)

# Load the pretrained BERT encoder from the saved checkpoint directory.
model = BertModel.from_pretrained(os.path.join(modeldir, "23082023_checkpoints", "checkpoint-9900"))

# NOTE(review): `device` is not defined in this cell; presumably it is provided
# by one of the star imports above (e.g. helper_functions) -- confirm.
model = model.to(device)


In [None]:
def _pair_to_string(cdr3b, epitope):
    """Format one CDR3b/epitope pair as space-separated characters around " [SEP] "."""
    return "{} [SEP] {}".format(" ".join(cdr3b), " ".join(epitope))


def _write_single_column(series, path):
    """Write a single column to `path` with no header and no index, one value per line."""
    series.to_csv(path, sep = "\t", header = False, index = False)


# The label file is the last thing this cell writes, so its presence means the
# whole preparation step has already completed and can be skipped.
path_to_test_label_file = os.path.join(path_to_01_output, "universal_testset_new.stringOnly.label.txt")

if not os.path.exists(path_to_test_label_file):
    df_full = pd.read_csv(os.path.join(datadir, "train_fullpeps.csv"))
    df_test = pd.read_csv(os.path.join(datadir, "universal_testset_new.csv"))

    # Columns are addressed by name; the previous `x[0]` / `x[1]` relied on
    # positional integer lookup on a label-indexed Series, which pandas has
    # deprecated.
    df_full["string"] = [_pair_to_string(cdr3b, epitope)
                         for cdr3b, epitope in zip(df_full["CDR3b"], df_full["epitope"])]
    df_binding = df_full[df_full["binder"] == 1]
    df_nonbinding = df_full[df_full["binder"] == 0]

    _write_single_column(df_binding["string"],
                         os.path.join(path_to_01_output, "train_fullpeps.binding.stringOnly.txt"))
    _write_single_column(df_nonbinding["string"],
                         os.path.join(path_to_01_output, "train_fullpeps.nonbinding.stringOnly.txt"))

    path_to_sampled_nonbinding = os.path.join(path_to_01_output, "sampled_non_binding")
    os.makedirs(path_to_sampled_nonbinding, exist_ok=True)
    for i in tqdm(range(1, 101)):
        # Seeding each draw with its sample index makes sample_<i>.txt
        # reproducible across runs while keeping the 100 subsamples distinct.
        sampled_strings = df_nonbinding.sample(10000, random_state = i)["string"]
        _write_single_column(sampled_strings,
                             os.path.join(path_to_sampled_nonbinding,
                                          "train_fullpeps.nonbinding.stringOnly.sample_{}.txt".format(i)))

    df_test["string"] = [_pair_to_string(cdr3b, epitope)
                         for cdr3b, epitope in zip(df_test["CDR3b"], df_test["epitope"])]

    _write_single_column(df_test["string"],
                         os.path.join(path_to_01_output, "universal_testset_new.stringOnly.txt"))
    _write_single_column(df_test["binder"], path_to_test_label_file)
    

In [None]:
#### TRANSFORM THE INPUT SEQUENCES TO [CLS] EMBEDDING VECTORS 1x1024
# Settings shared by both embedding runs below.
batch_size = 32
sample_id = 1  # which of the pre-sampled non-binding files to embed

# Binding pairs.
path_to_binding_dataset = os.path.join(
    path_to_01_output,
    "train_fullpeps.binding.stringOnly.txt",
)
cls_embeddings_binding = generate_embedding_matrix(
    path_to_binding_dataset, batch_size, model, tokenizer
)

# Non-binding pairs: only the subsample selected by `sample_id` is embedded.
nonbinding_file_name = "train_fullpeps.nonbinding.stringOnly.sample_{}.txt".format(sample_id)
path_to_nonbinding_dataset = os.path.join(
    path_to_01_output,
    "sampled_non_binding",
    nonbinding_file_name,
)
cls_embeddings_nonbinding = generate_embedding_matrix(
    path_to_nonbinding_dataset, batch_size, model, tokenizer
)



In [None]:
from sklearn.model_selection import train_test_split

# Feature matrix: binding embeddings first, non-binding embeddings after them.
n_binding_rows = cls_embeddings_binding.shape[0]
n_nonbinding_rows = cls_embeddings_nonbinding.shape[0]
X = np.vstack([cls_embeddings_binding, cls_embeddings_nonbinding])

# Labels in the same row order as X: 1 for each binding row, 0 for each non-binding row.
y = np.repeat([1, 0], [n_binding_rows, n_nonbinding_rows])

# Hold out 30% of the rows; the fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42,
)

In [None]:
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

# Hyperparameters of the gradient-boosted classifier, gathered in one place.
xgb_params = {
    "n_estimators": 100,
    "max_depth": 100,
    "learning_rate": 0.01,
    "objective": "binary:logistic",
}

# Train on the training split.
bst = XGBClassifier(**xgb_params)
bst.fit(X_train, y_train)

# Accuracy of the predicted labels on the held-out split.
preds = bst.predict(X_test)
acc = accuracy_score(y_test, preds)

In [None]:
def _rate_or_note(hits, total, note):
    """Return hits / total, or `note` when there are no samples of that class."""
    return hits / total if total > 0 else note


count = 0
# Per-split results; the lists are appended in lockstep, one entry per
# evaluated split.
universal_test_acc = []
finished_split_name = []
all_sens = []
all_spec = []

all_num_binding = []
all_num_nonbinding = []

path_to_universal_test_dir = os.path.join(datadir, "split_universal_testset")
path_to_universal_test_label_dir = os.path.join(datadir, "split_universal_testset_labels")

# Sorted so the order of the result lists does not depend on directory listing
# order; sub-directories are skipped because each split is read as a file.
all_split_names = sorted(item.name
                         for item in pathlib.Path(path_to_universal_test_dir).glob("*")
                         if item.is_file())

path_to_save_predictiondf = "{}/prediction_results_with_nonBinding_sampleID_{}".format(path_to_01_output, sample_id)
os.makedirs(path_to_save_predictiondf, exist_ok=True)

for split_name in all_split_names:
    # Resume support: a split is done when its output file exists. Checking the
    # exact output path (instead of stripping "predictiondf_" / ".csv" from
    # saved names) also works when the split file name itself contains ".csv".
    path_to_predictiondf = os.path.join(path_to_save_predictiondf, "predictiondf_{}.csv".format(split_name))
    if os.path.exists(path_to_predictiondf):
        continue

    path_to_universal_test = os.path.join(path_to_universal_test_dir, split_name)
    cls_embeddings_univervsal_test = generate_embedding_matrix(path_to_universal_test, batch_size, model, tokenizer)

    # Labels live in a parallel directory under the same file name.
    path_to_universal_test_labels = os.path.join(path_to_universal_test_label_dir, split_name)
    universal_test_labels = pd.read_csv(path_to_universal_test_labels, header = None)[0].to_numpy()

    test_preds = bst.predict(cls_embeddings_univervsal_test)
    test_probs = bst.predict_proba(cls_embeddings_univervsal_test)
    predictiondf = pd.DataFrame(data = universal_test_labels, columns = ["true_label"])
    predictiondf["prediction"] = test_preds
    acc = accuracy_score(universal_test_labels, test_preds)

    is_binding = predictiondf["true_label"] == 1
    is_nonbinding = predictiondf["true_label"] == 0
    predicted_binding = predictiondf["prediction"] == 1
    num_binding = int(is_binding.sum())
    num_nonbinding = int(is_nonbinding.sum())

    # True-positive and false-positive rates; a class that is absent from the
    # split yields a descriptive note instead of a division by zero.
    tp = _rate_or_note(int((is_binding & predicted_binding).sum()), num_binding, "no binding sample")
    fp = _rate_or_note(int((is_nonbinding & predicted_binding).sum()), num_nonbinding, "no nonbinding sample")

    sens = tp
    # Specificity is only defined when non-binding samples exist. Previously
    # `1 - fp` was evaluated on the note string and raised TypeError.
    spec = 1 - fp if num_nonbinding > 0 else fp

    universal_test_acc.append(acc)
    finished_split_name.append(split_name)
    all_sens.append(sens)
    all_spec.append(spec)
    all_num_binding.append(num_binding)
    all_num_nonbinding.append(num_nonbinding)

    # Attach the input strings and class probabilities, then persist the split.
    predictiondf["string"] = pd.read_csv(path_to_universal_test, header = None)[0].to_list()
    predictiondf["prob_0"] = test_probs[:, 0]
    predictiondf["prob_1"] = test_probs[:, 1]
    predictiondf.to_csv(path_to_predictiondf)
        
