In [None]:
import random
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from Bio import SeqIO
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.model_selection import StratifiedKFold, GridSearchCV, train_test_split
from sklearn.metrics import roc_auc_score, roc_curve, precision_recall_curve, auc, make_scorer, matthews_corrcoef
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.decomposition import PCA
from statistics import stdev, mean, variance
import torch
from torch import nn, optim
from sklearn.base import BaseEstimator, ClassifierMixin

In [None]:
class EarlyStopping:
    """Track a validation loss and signal when it has stopped improving.

    Each call compares the new validation loss with the best one seen so far.
    A strictly lower loss saves ``model.state_dict()`` to ``path`` and resets
    the patience counter; anything else (including NaN) increments the counter.
    ``early_stop`` becomes True once the counter reaches ``patience``.

    Parameters
    ----------
    patience : int
        Number of consecutive non-improving calls tolerated before stopping.
    verbose : bool
        If True, print a message on every counter increment and checkpoint.
    path : str
        File the best ``state_dict`` is written to with ``torch.save``.
    """

    def __init__(self, patience=5, verbose=False, path='checkpoint_model.pth'):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        # `np.Inf` was removed in NumPy 2.0; `np.inf` works on every version.
        self.val_loss_min = np.inf
        self.path = path

    def __call__(self, val_loss, model):
        """Register one validation loss and checkpoint ``model`` if it improved."""
        # Accept floats and 0-dim tensors alike and store plain Python floats.
        val_loss = float(val_loss)
        score = -val_loss

        # `score > best` is False for NaN, so a NaN loss is never treated as an
        # improvement (it would otherwise be checkpointed and, once stored as the
        # best score, make every later comparison look like an improvement).
        if self.best_score is None or score > self.best_score:
            self.best_score = score
            self.checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True

    def checkpoint(self, val_loss, model):
        """Save ``model.state_dict()`` to ``self.path`` and record ``val_loss``."""
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
        torch.save(model.state_dict(), self.path)
        self.val_loss_min = val_loss
        
def training_loop(n_epochs, optimizer, model, loss, mask_train, x_train,  y_train):
    """Train ``model`` full-batch with early stopping on a random 20% hold-out.

    The samples are shuffled with ``torch.randperm`` and the last 20% (at least
    one sample) are used as validation data. Every epoch computes the training
    loss, evaluates the validation loss without gradients, feeds it to
    ``EarlyStopping`` (patience 10) and then takes one optimizer step. When
    training ends, the weights of the best validation checkpoint are loaded
    back into ``model``.

    Parameters
    ----------
    n_epochs : int
        Maximum number of epochs.
    optimizer : torch.optim.Optimizer
        Optimizer already bound to ``model``'s parameters.
    model : torch.nn.Module
        Network to train; modified in place.
    loss : callable
        Loss called as ``loss(predictions, targets)``.
    mask_train
        Unused; kept so existing callers keep working.
    x_train, y_train : torch.Tensor
        Features and targets, indexed along the first dimension.

    Raises
    ------
    ValueError
        If fewer than two samples are given (no train/validation split exists).
    """
    n_samples = x_train.shape[0]
    if n_samples < 2:
        raise ValueError(
            "training_loop needs at least 2 samples to build a train/validation split, "
            f"got {n_samples}"
        )

    # Keep at least one validation sample: with n_val == 0 the slices `[:-0]` and
    # `[-0:]` would leave the training set empty and validate on everything.
    n_val = max(1, int(n_samples * 0.2))

    shuffled_ind = torch.randperm(n_samples)
    train_ind = shuffled_ind[:-n_val]
    val_ind = shuffled_ind[-n_val:]

    x_val, y_val = x_train[val_ind], y_train[val_ind]
    x_train, y_train = x_train[train_ind], y_train[train_ind]

    patience = 10
    earlystopping = EarlyStopping(patience=patience, verbose=False)
    for epoch in range(1, n_epochs + 1):
        model.train()
        y_train_pred = model(x_train)
        loss_train = loss(y_train_pred, y_train)

        model.eval()
        with torch.no_grad():
            loss_val = loss(model(x_val), y_val)

        # The validation loss describes the weights *before* this epoch's step,
        # which are also the weights EarlyStopping checkpoints.
        earlystopping(loss_val, model)
        if earlystopping.early_stop:
            break

        optimizer.zero_grad()
        loss_train.backward()
        optimizer.step()

    # Restore the best validated weights. Without this the checkpoint is written
    # but never used, and the model keeps weights from up to `patience` epochs
    # after the best one. `best_score` is only set once a checkpoint was written
    # in this call, so a stale file from an earlier run is never loaded.
    if earlystopping.best_score is not None:
        model.load_state_dict(torch.load(earlystopping.path))
        
class FNN2(nn.Module):
    """Small feed-forward head: Linear(embeddings_dim, 32) -> Dropout -> ReLU -> Linear(32, 2).

    Parameters
    ----------
    embeddings_dim : int
        Size of the last dimension of the input.
    dropout : float
        Dropout probability applied after the first linear layer.
    """

    def __init__(self, embeddings_dim=1024, dropout=0.25):
        super().__init__()

        hidden_units, n_outputs = 32, 2
        stack = [
            nn.Linear(embeddings_dim, hidden_units),
            nn.Dropout(dropout),
            nn.ReLU(),
            nn.Linear(hidden_units, n_outputs),
        ]
        # The attribute name `linear` determines the state_dict keys, keep it.
        self.linear = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor, **kwargs) -> torch.Tensor:
        """Return the two raw output scores (logits) for each row of ``x``.

        Extra keyword arguments are accepted and ignored.
        """
        return self.linear(x)
    

class NN(BaseEstimator, ClassifierMixin):
    """scikit-learn style binary classifier wrapping the ``FNN2`` network.

    Parameters
    ----------
    n_epochs : int
        Maximum number of epochs passed to ``training_loop``.
    lr : float
        Learning rate of the Adam optimizer.

    Attributes
    ----------
    classes_ : ndarray
        Sorted unique labels seen in ``fit``; column ``j`` of ``predict_proba``
        corresponds to ``classes_[j]``.
    model : FNN2 or None
        The trained network (None until ``fit`` is called).
    """

    def __init__(self, n_epochs=500, lr=0.03):
        self.n_epochs = n_epochs
        self.lr = lr
        self.model = None
        self.optim = None
        self.loss = nn.CrossEntropyLoss()

    def fit(self, X, y):
        """Train a fresh ``FNN2`` on ``X`` (n_samples, n_features) and labels ``y``.

        Raises
        ------
        ValueError
            If ``y`` does not contain exactly two distinct labels (the network
            has two outputs).
        """
        # Encode labels as 0..n_classes-1 so arbitrary label values (bool, str,
        # {1, 2}, ...) are valid CrossEntropyLoss targets.
        self.classes_, y_index = np.unique(np.asarray(y), return_inverse=True)
        if len(self.classes_) != 2:
            raise ValueError(
                f"NN supports exactly 2 classes, got {len(self.classes_)}: {self.classes_!r}"
            )

        X_tensor = torch.tensor(X, dtype=torch.float)
        y_tensor = torch.tensor(y_index.reshape(-1), dtype=torch.long)
        n_dim = X_tensor.shape[1]
        self.model = FNN2(embeddings_dim=n_dim)
        self.optim = optim.Adam(self.model.parameters(), lr=self.lr)
        training_loop(
            n_epochs=self.n_epochs,
            optimizer=self.optim,
            model=self.model,
            loss=self.loss,
            mask_train=None,
            x_train=X_tensor,
            y_train=y_tensor,
        )
        return self

    def _logits(self, X):
        """Run the trained network in eval mode, without gradients, on ``X``."""
        if self.model is None:
            raise AttributeError("This NN instance is not fitted yet; call fit() first.")
        X_tensor = torch.tensor(X, dtype=torch.float)
        self.model.eval()
        with torch.no_grad():
            return self.model(X_tensor)

    def predict(self, X):
        """Return the predicted label (an element of ``classes_``) for each row of ``X``."""
        _, predicted = torch.max(self._logits(X), 1)
        # Map the argmax index back to the original label, as scikit-learn expects.
        return self.classes_[predicted.numpy()]

    def predict_proba(self, X):
        """Return softmax probabilities of shape (n_samples, 2), columns ordered as ``classes_``."""
        probas = nn.Softmax(dim=1)(self._logits(X))
        return probas.numpy()

In [None]:
def param(model_type, X, y):
    """Grid-search hyper-parameters for one model family with 5-fold CV on ROC AUC.

    Parameters
    ----------
    model_type : str
        One of ``'rf'`` (RandomForestClassifier), ``'hgbc'``
        (HistGradientBoostingClassifier), ``'nn'`` (NN) or ``'svm'`` (SVC).
    X, y
        Training features and labels passed to ``GridSearchCV.fit``.

    Returns
    -------
    tuple
        ``(best_params_, best_estimator_)`` of the fitted grid search.

    Raises
    ------
    ValueError
        If ``model_type`` is not one of the supported names.
    """
    if model_type == 'rf':
        model = RandomForestClassifier(class_weight="balanced", n_estimators=200, n_jobs=80)
        param_grid = {'max_depth': [5, 10, 15, 20], "max_features": ["log2","sqrt",None]}
        grid_search = GridSearchCV(model, param_grid, cv=5, scoring='roc_auc')
    elif model_type == 'hgbc':
        model = HistGradientBoostingClassifier(class_weight="balanced")
        param_grid = {'learning_rate': [0.05, 0.1, 0.2], "max_leaf_nodes": [15, 31, 63], "min_samples_leaf": [20, 40, 80]}
        grid_search = GridSearchCV(model, param_grid, cv=5, scoring='roc_auc')
    elif model_type == 'nn':
        model = NN()
        param_grid = {'lr': [0.01, 0.02, 0.03, 0.04, 0.05]}
        grid_search = GridSearchCV(model, param_grid, cv=5, scoring='roc_auc')
    elif model_type == 'svm':
        model = SVC(class_weight="balanced", probability=True)
        param_grid = {'C': [1, 10], 'kernel': ['rbf', 'sigmoid'], 'gamma':['scale', 'auto']}
        # Only the SVM search runs its candidate fits in parallel.
        grid_search = GridSearchCV(model, param_grid, cv=5, n_jobs=-1, scoring='roc_auc')
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(
            f"Unknown model_type {model_type!r}; expected one of 'rf', 'hgbc', 'nn', 'svm'"
        )
    grid_search.fit(X, y)

    return grid_search.best_params_, grid_search.best_estimator_

def eval_test_data(trained_model, X_test, y_test):
    """Score a fitted classifier on held-out data.

    Uses column 1 of ``predict_proba`` as the positive-class score for the
    ranking metrics and ``predict`` for the Matthews correlation coefficient.

    Returns
    -------
    dict
        ``{'rocauc': ..., 'prauc': ..., 'mcc': ...}`` where ``prauc`` is the
        area under the precision-recall curve computed with ``auc``.
    """
    positive_scores = trained_model.predict_proba(X_test)[:, 1]

    results = {'rocauc': roc_auc_score(y_test, positive_scores)}

    pr_precision, pr_recall, _thresholds = precision_recall_curve(y_test, positive_scores)
    results['prauc'] = auc(pr_recall, pr_precision)

    hard_predictions = trained_model.predict(X_test)
    results['mcc'] = matthews_corrcoef(y_test, hard_predictions)

    return results

In [None]:
class ModelEvaluation():
    """Nested cross-validation of several classifiers over several PCA dimensions.

    Parameters
    ----------
    x : ndarray
        Feature matrix, indexed by row with the fold indices.
    y : ndarray
        Labels, same length as ``x``.

    Attributes
    ----------
    df : pandas.DataFrame
        Filled by ``run``; see the note inside ``run`` about its layout.
    """

    def __init__(self, x, y):
        self.x=x
        self.y=y
        self.df = pd.DataFrame(columns=['Dimension', 'Model', 'Fold', 'Best_Params', 'Scores'])

    def run(self, out_path="result/sca_pram_tuning_tmp.tsv"):
        """Run the full evaluation and log one TSV row per (dimension, model, fold).

        For every PCA dimension and model type, the data is split with a
        5-fold ``StratifiedKFold`` (shuffled, ``random_state=0``). Scaler and
        PCA are fitted on the training fold only, hyper-parameters are tuned
        with ``param`` on the training fold, and the tuned model is scored
        with ``eval_test_data`` on the test fold.

        Parameters
        ----------
        out_path : str
            TSV file the rows are written to as they are produced. Defaults to
            the previously hard-coded path; pass a different path per
            experiment to avoid overwriting an earlier run's log.
        """
        import os

        score_types=["rocauc", "prauc", "mcc"]
        # NOTE(review): this list starts with the five column-name strings and the
        # row dicts are appended after them, so the records begin at position 5.
        # The result cells of this notebook read them back with
        # `modeleval.df.iloc[5:, -1]`, so the layout is intentionally left as-is.
        temp_df_list = ['Dimension', 'Model', 'Fold', 'Best_Params', 'Scores']

        # Create the output directory up front so open() cannot fail on a fresh checkout.
        out_dir = os.path.dirname(out_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

        with open(out_path, "w") as f:
            f.write('Dimension\tModel\tFold\tBest_Params\tScores')
            f.write("\n")
            for n_dim in [128, 64, 32, 16, 8, 4]:
                print("n_dim:{}".format(n_dim))

                for model in ["nn", "hgbc", "rf", "svm"]:
                    print("model:{}".format(model))
                    cv=StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
                    for i, (train, test) in enumerate(cv.split(self.x, self.y)):
                        # Fit scaler and PCA on the training fold only, then apply
                        # the fitted transforms to the test fold.
                        sc=StandardScaler()
                        pca=PCA(n_components=n_dim)
                        scaled_x_train=sc.fit_transform(self.x[train])
                        reduced_x_train=pca.fit_transform(scaled_x_train)

                        scaled_x_test=sc.transform(self.x[test])
                        reduced_x_test=pca.transform(scaled_x_test)

                        best_params, trained_model=param(model, reduced_x_train, self.y[train])
                        scores=eval_test_data(trained_model, reduced_x_test, self.y[test])
                        fold_scores = {k: scores[k] for k in score_types}

                        row = {
                            'Dimension': str(n_dim),
                            'Model': model,
                            'Fold': i,
                            'Best_Params': best_params,
                            'Scores': fold_scores
                        }
                        temp_df_list.append(row)
                        f.write(str(n_dim)+"\t"+model+"\t"+str(i)+"\t"+str(best_params)+"\t"+str(fold_scores))
                        f.write("\n")
                        # Flush each row so finished folds are on disk if the run is interrupted.
                        f.flush()
            self.df = pd.concat([self.df, pd.DataFrame(temp_df_list)], ignore_index=True)

In [None]:
# Load the embedding table; `.item()` unwraps the pickled object stored in the .npy file.
# NOTE: allow_pickle=True unpickles arbitrary objects -- only load trusted files.
mat = np.load("embedding/PTT5XLU50_human.npy", allow_pickle=True).item()

# Record IDs of the scaffold and non-LLPS FASTA files.
scaffold_set = set(
    SeqIO.index("../fig1/result/drllps_scaffold_clstr_Homo_sapiens.fasta", "fasta").keys()
)
nonllps_set = set(
    SeqIO.index("../fig1/result/drllps_nonllps_clstr_Homo_sapiens.fasta", "fasta").keys()
)

# Collect the embeddings of each group; an ID present in both sets counts as non-LLPS only.
list_nonllps = [mat[k] for k in mat.keys() if k in nonllps_set]
list_scaffold = [mat[k] for k in mat.keys() if k not in nonllps_set and k in scaffold_set]

In [None]:
# Scaffold embeddings first (label True), then non-LLPS embeddings (label False).
n_scaffold, n_nonllps = len(list_scaffold), len(list_nonllps)
x = np.array([*list_scaffold, *list_nonllps])
y = np.array([True] * n_scaffold + [False] * n_nonllps)

In [None]:
# Evaluator for the scaffold vs. non-LLPS data assembled in `x` / `y`.
modeleval = ModelEvaluation(x=x, y=y)

In [None]:
# Run the evaluation over all PCA dimensions, model types and folds; results end up in `modeleval.df`.
modeleval.run()

In [None]:
# The last column of `modeleval.df` holds five header strings followed by the
# per-fold result dicts; skip the first five entries and expand the dicts.
fold_records = modeleval.df.iloc[5:, -1].tolist()
result_df = pd.DataFrame(fold_records)

In [None]:
# Save the per-fold results of the scaffold vs. non-LLPS evaluation.
result_df.to_csv("scores_scaffold_nonllps.csv")

In [None]:
# Record IDs of the client FASTA file.
client_set = set(
    SeqIO.index("../fig1/result/drllps_client_clstr_Homo_sapiens.fasta", "fasta").keys()
)

# Embeddings of every entry in `mat` whose key is a client ID.
list_client = [mat[k] for k in mat.keys() if k in client_set]

In [None]:
# Scaffold embeddings first (label True), then client embeddings (label False).
n_scaffold, n_client = len(list_scaffold), len(list_client)
x = np.array([*list_scaffold, *list_client])
y = np.array([True] * n_scaffold + [False] * n_client)

In [None]:
# Evaluator for the scaffold vs. client data assembled in `x` / `y`.
modeleval = ModelEvaluation(x=x, y=y)

In [None]:
# Run the evaluation for scaffold vs. client.
# NOTE(review): run() writes its TSV log to the same path on every call, so this
# overwrites the log of the earlier scaffold vs. non-LLPS run.
modeleval.run()

In [None]:
# The last column of `modeleval.df` holds five header strings followed by the
# per-fold result dicts; skip the first five entries, expand the dicts and save.
fold_records = modeleval.df.iloc[5:, -1].tolist()
result_df = pd.DataFrame(fold_records)
result_df.to_csv("scores_scaffold_client.csv")