In [None]:
import optuna
from optuna import Trial

from math import sqrt
from typing import Tuple, List

import numpy as np
import pandas as pd
from mordred import Calculator, descriptors
#import openbabel
from openbabel import pybel
from PyBioMed.PyMolecule.fingerprint import CalculatePubChemFingerprint,CalculateECFP2Fingerprint
from rdkit import Chem
from rdkit.Chem.rdchem import Atom
from sklearn.metrics import f1_score, confusion_matrix

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

from sklearn.metrics import accuracy_score, confusion_matrix, matthews_corrcoef, roc_curve, auc 
from sklearn.metrics import precision_recall_curve


from torch_geometric.data import Data
from torch_geometric.loader import DataLoader as G_Loader 
from torch_geometric.nn import GCNConv, GATConv
from torch_geometric.nn import global_mean_pool
from torch_geometric.nn import BatchNorm


# RDkit
from rdkit import Chem
from rdkit.Chem.rdmolops import GetAdjacencyMatrix
# Pytorch and Pytorch Geometric
import torch

import torch.nn as nn
from torch.nn import Linear
import torch.optim as optim
import torch.nn.functional as F # activation function
from torch.utils.data import Dataset
from torch.utils.data import DataLoader as V_Loader # dataset management
%run ./graph_feature.ipynb 
%run ./dataset_processing.ipynb 

In [None]:
k=10
final_clean_fingerp_train=[]
final_clean_fingerp_val=[]
for i in range(k):
    final_clean_fingerp_train.append(np.load('final_clean_fingerp_train'+ str(i)+'.npy'))
    final_clean_fingerp_val.append(np.load('final_clean_fingerp_val' +str(i)+'.npy'))

final_clean_fingerp_test = np.load('final_clean_fingerp_test.npy')

In [None]:
train_idx = np.load('train_indices.npy')
val_idx = np.load('val_indices.npy')
test_idx = np.load('test_indices.npy')

In [None]:
# load the output label 
total_train_targets =[]
total_validation_targets =[]
total_test_targets=[]
for i in range(k):
    total_train_targets.append(np.load('total_train_targets'+ str(i)+'.npy'))
    total_validation_targets.append(np.load('total_validation_targets' +str(i)+'.npy'))

total_test_targets= np.load('total_test_targets.npy')

In [None]:
# create dataloader for training (vector data)
#======================================================================================
list_data_fingerp_train =[]
list_data_fingerp_val =[]
list_data_target_train =[]
list_data_target_val =[]

for data_train, data_val, tr_targets, val_targets in zip(final_clean_fingerp_train, final_clean_fingerp_val,total_train_targets, total_validation_targets):
    train_loader = V_Loader(dataset = data_train, batch_size = 500)
    val_loader = V_Loader(dataset = data_val, batch_size = 500)
    
    tr_target_loader = V_Loader(dataset = tr_targets, batch_size = 500)
    val_target_loader =  V_Loader(dataset = val_targets, batch_size = 500)
    
    list_data_fingerp_train.append(train_loader)
    list_data_fingerp_val.append(val_loader)
    
    list_data_target_train.append(tr_target_loader)
    list_data_target_val.append(val_target_loader)

In [None]:
#criterion = torch.nn.CrossEntropyLoss()
#define the loss function 
criterion = torch.nn.BCELoss()

In [None]:
def train_1(train_v_loaderB,tr_target_v_loaderB, combined_model, optimizer):
    combined_model.train()
    
    for data_X2, data_target in zip(train_v_loaderB, tr_target_v_loaderB):  # Iterate in batches over the training dataset.
        out = combined_model(torch.tensor(data_X2, dtype=torch.float32))  # Perform a single forward pass.
        
        y_t = torch.tensor(data_target, dtype=torch.float32)
        loss = criterion(out[:,0], y_t)  # Compute the loss.
        #print(k)
        #print(loss)
        loss.backward()  # Derive gradients.
        optimizer.step()  # Update parameters based on gradients.
        optimizer.zero_grad()  # Clear gradients.
    return loss, combined_model, optimizer

def validation_1(val_v_loaderB,val_target_v_loaderB, combined_model):
    for data_X2, data_target in zip(val_v_loaderB, val_target_v_loaderB): # Iterate in batches over the training dataset.
        out = combined_model(torch.tensor(data_X2, dtype=torch.float32))  # Perform a single forward pass.
        
        y_t = torch.tensor(data_target, dtype=torch.float32)
        val_loss = criterion(out[:,0], y_t)  # Compute the loss.
        
    return val_loss

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix, matthews_corrcoef, roc_curve, auc 
from sklearn.metrics import precision_recall_curve
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import roc_auc_score

# performances visualization 
import matplotlib.pyplot as plt
#import seaborn as sns
import statistics
from prettytable import PrettyTable
%run ./my_performances.ipynb 


def test_1(v_loaderB,v_target, combined_model):
    combined_model.eval()
    list_pred =[]
    list_targets =[]
    correct = 0
    for data_X2, data_target in zip (v_loaderB,v_target):  # Iterate in batches over the training/test dataset.
            out = combined_model(torch.tensor(data_X2, dtype=torch.float32))
            out_1 = out[:,0]
            
            
            list_pred.append(out_1.item())
            list_targets.append(data_target.item())
    return list_pred, list_targets 

# used to count the train accuracy ,and validation accuracy when in the training mode 
def test(v_loaderB,target_v_loaderB, combined_model):
    combined_model.eval()

    correct = 0
    for data_X2, data_target in zip(v_loaderB,target_v_loaderB):  # Iterate in batches over the training/test dataset.
            out = combined_model(torch.tensor(data_X2, dtype=torch.float32))
            out_1 = out[:,0]
            for i,value in enumerate(out_1) :
                if value > 0.5 :
                    out_1[i] = 1
                else : out_1[i] = 0
            pred = out_1  # Use the class with highest probability.
            correct += int((pred == data_target).sum())  # Check against ground-truth labels.
    return correct / len(v_loaderB.dataset)  # Derive ratio of correct predictions.

In [None]:
def get_optimizer(gnn_model, learning_rate, optimizer_type):
    if optimizer_type==1:
        optimizer = torch.optim.SGD(gnn_model.parameters(), lr=learning_rate, momentum=0.9)
    if optimizer_type==2:
        optimizer = torch.optim.Adam(gnn_model.parameters(), lr=learning_rate, weight_decay =1e-4)
    if optimizer_type ==3 :
        optimizer = torch.optim.Adamax(gnn_model.parameters(), lr=learning_rate, betas=(0.9, 0.999), eps=1e-08, weight_decay=1e-4)
        
    return optimizer

In [None]:
 class modelB1(torch.nn.Module):
    def __init__(self, input_features, output_features,dropout_rateB1,dropout_rateB2, dense_layer1):
        super(modelB1, self).__init__()
        self.lin1 = nn.Linear(input_features,dense_layer1)
      
        self.lin2 = nn.Linear(int(dense_layer1), int(dense_layer1/2))
        self.lin3 = nn.Linear(int(dense_layer1/2), int(dense_layer1/2))
        self.lin4 = nn.Linear(int(dense_layer1/2), output_features)
        
        self.bn1 = nn.BatchNorm1d(int(dense_layer1))
        self.bn2 = nn.BatchNorm1d(int(dense_layer1/2))
        self.bn3 = nn.BatchNorm1d(int(dense_layer1/2))
        self.dropoutB1 = dropout_rateB1
        self.dropoutB2 = dropout_rateB2
        
    def forward(self, x):
        x = self.lin1(x)
        x = self.bn1(x)
       
        x = F.dropout(x, p= self.dropoutB1, training=self.training)
        x = x.relu()
  #      
        x = self.lin2(x)
        x = self.bn2(x)   
        x = F.dropout(x, p= self.dropoutB2, training=self.training)
        x = x.relu()
  #      
        x = self.lin3(x)
        x = self.bn3(x)   
        x = x.relu()
        x = self.lin4(x)
        return torch.sigmoid(x)

In [None]:
list_data_vec_trainB = list_data_fingerp_train
list_data_vec_valB = list_data_fingerp_val

def objective(trial):
    
    k=10 # number of fold 
    
    # Categorical parameter
    # Create k model for training k fold dataset 
    # for data graph
    input_featuresB    = 36 # length of feature data vector 
    output_featuresB   = 1
      
    optimizer_type = trial.suggest_int('optimizer_type',1,3, step=1) 
    #optimizer_type = 2
    #dense_layer1 = 58
    #dropout_rateB1 = 0.11271207164779487
    #dropout_rateB2 = 0.24808286943977564
    dense_layer1 =trial.suggest_int('dense_layer1',32,128, step=2) 
    #dense_layer1= 64
    dropout_rateB1 = trial.suggest_uniform('dropout_rateB1',  0.1, 0.5)
    dropout_rateB2 = trial.suggest_uniform('dropout_rateB2',  0.1, 0.5)
    
    learning_rate = trial.suggest_loguniform('learning_rate', 0.0001, 0.001)
    #learning_rate = 0.0008197186615142072
    
    
    list_trained_model =[]
    list_val_acc=[]
    
    for i in range(k):
        combined_model   = modelB1(input_featuresB, output_featuresB,dropout_rateB1,dropout_rateB1, dense_layer1)
        optimizer = get_optimizer(combined_model, learning_rate, optimizer_type)
   
        history_train_loss=[]
        history_val_loss=[]
        history_train_acc=[]
        history_val_acc=[]
        
        for epoch in range(1,500):
            
                
            train_loss,combined_model, optimizer = train_1(list_data_vec_trainB[i],list_data_target_train[i],
                                                           combined_model,optimizer)
            val_loss                             = validation_1(list_data_vec_valB[i],list_data_target_val[i],
                                                                combined_model) 
        
            train_acc                            = test(list_data_vec_trainB[i],list_data_target_train[i],combined_model)
            val_acc                              = test(list_data_vec_valB[i],list_data_target_val[i], combined_model)
        
           
            
            history_train_loss.append(train_loss.item())
            history_val_loss.append(val_loss.item())
            history_train_acc.append(train_acc)
            history_val_acc.append(val_acc)
        
            print(f'Fold: {i}')
            print(f'Epoch: {epoch:03d}, Train Loss:, {train_loss.item():.4f}')
            print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}')
            print(f'Epoch: {epoch:03d}, val Loss:, {val_loss.item():.4f}')
            print(f'Epoch: {epoch:03d}, val Acc: {val_acc:.4f}')
            print(" ")
            # RUNNING THE 5 PRETRAINED MODEL USING INDEPENDENT DATA
        #list_val_acc.append(val_acc)
        list_trained_model.append(combined_model)
    return statistics.mean(list_val_acc)

In [None]:
my_hyper = {'optimizer_type': 3, 'dense_layer1': 116, 'dropout_rateB1': 0.19140697875883203, 'dropout_rateB2': 0.4378593600513469, 'learning_rate': 0.0005085512117234079}

In [None]:
k=10 # number of fold 

# for data vector 
input_features  = 36 # length of feature data vector 
output_features   = 1
    #output_featuresB   = trial.suggest_int('output_featuresB',16,9, step=1) 
optimizer_type =my_hyper['optimizer_type']   
dropout_rateB1 = my_hyper['dropout_rateB1']
dropout_rateB2 = my_hyper['dropout_rateB2']
learning_rate = my_hyper['learning_rate']
dense_layer1 =my_hyper['dense_layer1']    

In [None]:
# APPLY THE HYPERPARAMETER 
# RUNNING EVERY FOLD 
#=============================================================

total_history_train_loss=[]
total_history_val_loss=[]
total_history_train_acc=[]
total_history_val_acc=[]
list_trained_model =[]
# APPLY THE HYPERPARAMETER 
# RUNNING EVERY FOLD 
#=============================================================

total_history_train_loss=[]
total_history_val_loss=[]
total_history_train_acc=[]
total_history_val_acc=[]
list_trained_model =[]

for i in range(k):
        
        combined_model   = modelB1(input_features, output_features,dropout_rateB1,dropout_rateB2, dense_layer1)
        optimizer = get_optimizer(combined_model, learning_rate, optimizer_type)
   
        history_train_loss=[]
        history_val_loss=[]
        history_train_acc=[]
        history_val_acc=[]
        
        for epoch in range(1,500):
            
                
            train_loss,combined_model, optimizer = train_1(list_data_vec_trainB[i],list_data_target_train[i],
                                                           combined_model,optimizer)
            val_loss                             = validation_1(list_data_vec_valB[i],list_data_target_val[i],
                                                                combined_model) 
        
            train_acc                            = test(list_data_vec_trainB[i],list_data_target_train[i],combined_model)
            val_acc                              = test(list_data_vec_valB[i],list_data_target_val[i], combined_model)
        
           
            
            history_train_loss.append(train_loss.item())
            history_val_loss.append(val_loss.item())
            history_train_acc.append(train_acc)
            history_val_acc.append(val_acc)
        
            print(f'Fold: {i}')
            print(f'Epoch: {epoch:03d}, Train Loss:, {train_loss.item():.4f}')
            print(f'Epoch: {epoch:03d}, Train Acc: {train_acc:.4f}')
            print(f'Epoch: {epoch:03d}, val Loss:, {val_loss.item():.4f}')
            print(f'Epoch: {epoch:03d}, val Acc: {val_acc:.4f}')
            print(" ")
        total_history_train_loss.append(history_train_loss)
        total_history_val_loss.append(history_val_loss)
        total_history_train_acc.append(history_train_acc)
        total_history_val_acc.append(history_val_acc)

        list_trained_model.append(combined_model)


In [None]:
# RUNNING THE 10 PRETRAINED MODEL USING INDEPENDENT DATA
#======================================================================
nCV= 10 # ten crossfold validation 
list_fold_pred =[]
list_fold_targets =[]


v_test_loaderB = V_Loader(dataset = final_clean_fingerp_test, batch_size = 1)
v_test_target = V_Loader(dataset = total_test_targets, batch_size = 1)

for combined_model in list_trained_model:  
    list_pred, list_targets = test_1(v_test_loaderB,v_test_target,combined_model)
    list_fold_pred.append(list_pred)
    list_fold_targets.append(list_targets)
    
# GET THE PERFORMANCES FROM THE TEST
#========================================================================
total_performances = performances(list_fold_pred, list_fold_targets, nCV)
list_bal_acc = []
for sen, spec in zip (total_performances[1] , total_performances[2]):
    bal_acc = (sen + spec)/2
    list_bal_acc.append(bal_acc)
                
statistics.mean(list_bal_acc)

In [None]:
import statistics
from prettytable import PrettyTable
perf = total_performances
model_title = 'Test Perf single model'
data_type ='fingerprint vector'
Create_Tables(perf, model_title, data_type)

In [None]:
for i, model in enumerate(list_trained_model):
            torch.save(model.state_dict(), str(bal_acc)+"model_Dense_fingerp" +str(i)+ ".pth")