In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
import matplotlib.pyplot as plt
import math
from sklearn.metrics import roc_curve, auc
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import KFold 

In [None]:
#define methods: 
def train_test_split(data,label,train_size = 0.8):
    if data.shape[0] != label.shape[0]:
        return
    else:
        num_samples = data.shape[0]
        train_sample = int(num_samples * train_size)

        train_data = data[:train_sample]
        train_labels = label[:train_sample]

        test_data = data[train_sample:]
        test_labels = label[train_sample:]

        return(train_data,train_labels,test_data,test_labels)
    
class Net_conv(nn.Module):
    def __init__(self):
        super(Net_conv, self).__init__()
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1, padding=1)
        self.dropout1 = nn.Dropout(0.2)          self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)
        self.dropout2 = nn.Dropout(0.2)          self.fc1 = nn.Linear(7840, 64)
        self.dropout3 = nn.Dropout(0.5)          self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.dropout1(x)          x = self.relu(self.conv2(x))
        x = self.dropout2(x)          x = x.view(x.size(0), -1)
        x = self.relu(self.fc1(x)) 
        x = self.dropout3(x)          x = self.relu(self.fc2(x))
        x = self.sigmoid(self.fc3(x))
        return x
    
def metrics_output(preds,labels):
    metrics_fpr, metrics_tpr, thresholds = roc_curve(labels.squeeze(-1), preds.squeeze(-1))
    roc_auc = auc(metrics_fpr, metrics_tpr)

    best_threshold = thresholds[np.argmax(metrics_tpr - metrics_fpr)]

    test_pred_binary = np.where(preds > best_threshold, 1 , 0)

    metrics_tn, metrics_fp, metrics_fn, metrics_tp = confusion_matrix(np.squeeze(labels,axis=-1), np.squeeze(test_pred_binary,axis=-1)).ravel()
    metrics_sn = metrics_tp / (metrics_tp + metrics_fn)
    metrics_sp = metrics_tn / (metrics_tn + metrics_fp)
    metrics_ACC = (metrics_tp + metrics_tn) / (metrics_tn + metrics_fp + metrics_fn + metrics_tp)
    metrics_pre = metrics_tp / (metrics_tp + metrics_fp)
    metrics_F1 = 2 * (metrics_pre * metrics_sn) / (metrics_pre + metrics_sn)
    metrics_MCC = (metrics_tp * metrics_tn - metrics_fp * metrics_fn) / math.sqrt((metrics_tp + metrics_fp)*
                                                                                  (metrics_tp + metrics_fn)*
                                                                                  (metrics_tn + metrics_fp)*
                                                                                  (metrics_tn + metrics_fn))
    
    return (metrics_fpr, metrics_tpr, roc_auc, metrics_sn, metrics_sp, metrics_ACC, metrics_F1, metrics_MCC)

def plot_loss_curve(losses1, losses2=None):
    epochs = range(1, len(losses1) + 1)
    plt.plot(epochs, losses1, 'b', label='Train losses')

    if losses2 is not None:
        plt.plot(epochs, losses2, 'r', label='Validation losses')

    plt.title('Training Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

In [None]:
data = {
    'AUC_val': [np.nan]*20,
    'ACC_val': [np.nan]*20,
    'MCC_val': [np.nan]*20,
    'Sn_val': [np.nan]*20,
    'Sp_val': [np.nan]*20,
    'AUC_test': [np.nan]*20,
    'ACC_test': [np.nan]*20,
    'MCC_test': [np.nan]*20,
    'Sn_test': [np.nan]*20,
    'Sp_test': [np.nan]*20
}

df = pd.DataFrame(data)

In [None]:
pos_domain = pd.read_csv('./pos_domain_encoding.csv') 
neg_domain = pd.read_csv('./neg_domain_encoding.csv') 

pos_sequence = pd.read_csv('./pos_encoding_OH_ND.csv') 
neg_sequence = pd.read_csv('./neg_encoding_OH_ND.csv') 

pos = pd.merge(pos_domain, pos_sequence)
neg = pd.merge(neg_domain, neg_sequence)

pos = pos.iloc[:,1:]
neg = neg.iloc[:,1:]

In [None]:
device = "mps"

In [None]:
best_test_accuracy = 0.0


for i in range(0, 13):
    print(f"Processing on neg Part: {i+1}...")
    
    seed_value = i
    neg = neg.sample(n=1892, random_state=seed_value)

    raw_datas = np.concatenate((pos,neg),axis = 0)
    raw_datas = np.expand_dims(raw_datas, axis = -2)
    
    raw_labels = np.concatenate(([1] * pos.shape[0], [0] * neg.shape[0]),axis = 0)
    raw_labels = np.expand_dims(raw_labels,-1)
    
    np.random.seed(i)
    indices = np.random.permutation(raw_labels.shape[0])
    
    datas = raw_datas[indices,:,:]
    labels = raw_labels[indices,:]
    
    (two_datas,two_labels,test_datas,test_labels) = train_test_split(datas, labels)
    test_datas = torch.from_numpy(test_datas.astype(np.float32)).to(device).type(torch.float)
    test_labels = torch.from_numpy(test_labels.astype(np.float32)).to(device).type(torch.float)
    
    num_folds = 5
    kf = KFold(n_splits=num_folds, shuffle=True, random_state=i)
    
    val_accuracy_per_fold = []
    models = []
    
    best_val_accuracy = 0.0
    
    for fold, (train_indices, val_indices) in enumerate(kf.split(two_datas)):
        print(f"Training on fold {fold + 1}...")
        
        train_datas, train_labels = two_datas[train_indices], two_labels[train_indices]
        val_datas, val_labels = two_datas[val_indices], two_labels[val_indices]
        
        train_datas = torch.from_numpy(train_datas.astype(np.float32)).to(device).type(torch.float)
        train_labels = torch.from_numpy(train_labels.astype(np.float32)).to(device).type(torch.float)
        val_datas = torch.from_numpy(val_datas.astype(np.float32)).to(device).type(torch.float)
        val_labels = torch.from_numpy(val_labels.astype(np.float32)).to(device).type(torch.float)
        
        net = Net_conv().to(device)
        criterion = nn.BCELoss()
        optimizer = optim.Adam(net.parameters(), lr=0.001)
        scheduler = lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.93)
        
        num_epochs = 50
    
        train_losses = []
        validation_losses = []
        
        for epoch in range(num_epochs):
            optimizer.zero_grad()
            outputs = net(train_datas)
            loss = criterion(outputs, train_labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            
            train_losses.append(loss.item())
            if (epoch + 1) % 10 == 0:
                print(f"Epoch {epoch+1}/{num_epochs}, Loss: {loss.item()}")
            
            with torch.no_grad():
                net.eval()
                
                preds = net(val_datas)
                #preds = preds.detach().cpu().numpy()####
                
                #val_labels = val_labels.to("cpu").numpy()
                #print(val_labels.dtype)
                
                validation_loss = criterion(preds, val_labels)
                
                validation_losses.append(validation_loss.item())
                
        print(f"plot of training and validation loss in fold: {fold+1}")        
        plot_loss_curve(train_losses, validation_losses)
                
        with torch.no_grad():
            net.eval()
            
            preds = net(val_datas)
            preds = preds.detach().cpu().numpy() 
            
            val_labels = val_labels.to("cpu").numpy()
            
            metrics_fpr, metrics_tpr, roc_auc, metrics_sn, metrics_sp, metrics_ACC, metrics_F1, metrics_MCC = metrics_output(preds,val_labels)
            
            val_accuracy = metrics_ACC
            val_accuracy_per_fold.append(val_accuracy)
            
            models.append(net.state_dict().copy())
            
                
        if(val_accuracy > best_val_accuracy):
            best_val_accuracy = val_accuracy
            best_model_state_dict = net.state_dict().copy()
            
            roc_auc = roc_auc
            metrics_ACC = metrics_ACC
            metrics_MCC = metrics_MCC
            metrics_sn = metrics_sn
            metrics_sp = metrics_sp
                
        df.iloc[i,0] = roc_auc
        df.iloc[i,1] = metrics_ACC
        df.iloc[i,2] = metrics_MCC
        df.iloc[i,3] = metrics_sn
        df.iloc[i,4] = metrics_sp
                
    print(f"val_accuracy_per_fold: {val_accuracy_per_fold}")
    print(f"best_val_accuracy: {best_val_accuracy}")
                
    net = Net_conv().to(device)
    net.load_state_dict(best_model_state_dict)
        
    with torch.no_grad():
        net.eval()
        
        preds = net(test_datas)
        preds = preds.detach().cpu().numpy() 
        
        test_labels = test_labels.to("cpu").numpy()
        
        metrics_fpr, metrics_tpr, roc_auc, metrics_sn, metrics_sp, metrics_ACC, metrics_F1, metrics_MCC = metrics_output(preds,test_labels)
        
        df.iloc[i,5] = roc_auc
        df.iloc[i,6] = metrics_ACC
        df.iloc[i,7] = metrics_MCC
        df.iloc[i,8] = metrics_sn
        df.iloc[i,9] = metrics_sp
            
        test_accuracy = metrics_ACC

        print("Test accuracy: ", test_accuracy)
        print(df)

        if(test_accuracy > best_test_accuracy):
            best_test_accuracy = test_accuracy
            best_model_state_dict = net.state_dict().copy()
            
            torch.save(best_model_state_dict, '/Users/jiaming/Desktop/f5c/webserver/model_parameters.pth')
            
            print(f"state of current best model renewed with current best test accuracy: {best_test_accuracy}")
            
        print(f"state of current best model maintained with current best test accuracy: {best_test_accuracy}")
        
print(f"best_test_accuracy_for_all_10_loops: {best_test_accuracy}")

In [None]:
column_means = df.mean()
column_means