In [1]:
import utils.feature_extractors as utils
from utils.evaluation import action_evaluator
import numpy as np
import os
import json
from sklearn.pipeline import make_pipeline
from sklearn.utils import shuffle
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import RidgeClassifier
from sklearn.decomposition import PCA
from sklearn.model_selection import cross_val_predict
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from itertools import product
from sklearn.svm import SVC
import optuna
import joblib

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
config = {
    "train_pos_loc":"data/TR_pos_SPIDER.txt",
    "train_neg_loc":"data/TR_neg_SPIDER.txt",
    "test_pos_loc":"data/TS_pos_SPIDER.txt",
    "test_neg_loc":"data/TS_neg_SPIDER.txt",
    "model_save_loc":"optimized",
    "random_seed":9
}

In [3]:
class SpiderDataGenerator(object):
    ALL_FEAT = ["AAC","DPC","CTD",
                "PAAC","APAAC","RSacid",
                "RSpolar","RSsecond","RScharge",
                "RSDHP"]
    def __init__(self, pos_data_file, neg_data_file,feat_type=None) -> None:
        super(SpiderDataGenerator).__init__()
        self.pos_data_file = pos_data_file
        self.neg_data_file = neg_data_file
        
        assert feat_type in SpiderDataGenerator.ALL_FEAT or feat_type == None
        
        self.feat_type = feat_type
        
        self.pos_data = utils.read_fasta(self.pos_data_file)
        self.neg_data = utils.read_fasta(self.neg_data_file)
        
        self.data = self.pos_data+self.neg_data
        self.targets = np.array([True]*len(self.pos_data)+[False]*len(self.neg_data))
        
        self.raw = [x[1] for x in self.data]
        
        self.feat_AAC = utils.AAC(self.data)[0]
        print("Generating AAC Feature .....")
        self.feat_DPC = utils.DPC(self.data,0)[0]
        print("Generating DPC Feature .....")
        self.feat_CTD = np.hstack((utils.CTDC(self.data)[0], 
                              utils.CTDD(self.data)[0], 
                              utils.CTDT(self.data)[0]))
        print("Generating CTD Feature .....")
        self.feat_PAAC = utils.PAAC(self.data,1)[0]
        print("Generating PAAC Feature .....")
        self.feat_APAAC = utils.APAAC(self.data,1)[0]
        print("Generating APAAC Feature .....")
        self.feat_RSacid = utils.reducedACID(self.data) 
        print("Generating reducedACID Feature .....")
        self.feat_RSpolar = utils.reducedPOLAR(self.data)
        print("Generating reducedPOLAR Feature .....")
        self.feat_RSsecond = utils.reducedSECOND(self.data)
        print("Generating reducedSECOND Feature .....")
        self.feat_RScharge = utils.reducedCHARGE(self.data)
        print("Generating reducedCHARGE Feature .....")
        self.feat_RSDHP = utils.reducedDHP(self.data)
        print("Generating reducedDHP Feature .....")
        
        
        
    
    def get_combination_feature(self,selected:list = None):
        
        all_feat =[self.feat_AAC,self.feat_DPC,self.feat_CTD,
                   self.feat_PAAC,self.feat_APAAC,self.feat_RSacid,
                   self.feat_RSpolar,self.feat_RSsecond,self.feat_RScharge,
                   self.feat_RSDHP]
        
        if selected:
            select_index = sorted([SpiderDataGenerator.ALL_FEAT.index(x) for x in selected])
            all_feat = [all_feat[x] for x in select_index]
            
        return np.concatenate(all_feat,axis=-1)
        
        
        
            
    def __len__(self) -> int:
        return len(self.data)

In [4]:
train_data = SpiderDataGenerator(pos_data_file=config["train_pos_loc"],neg_data_file=config["train_neg_loc"])

Generating AAC Feature .....
Generating DPC Feature .....
Generating CTD Feature .....
Generating PAAC Feature .....
Generating APAAC Feature .....
Generating reducedACID Feature .....
Generating reducedPOLAR Feature .....
Generating reducedSECOND Feature .....
Generating reducedCHARGE Feature .....
Generating reducedDHP Feature .....


In [5]:
X_data = {
    "AAC":train_data.feat_AAC,
    "DPC":train_data.feat_DPC,
    "CTD":train_data.feat_CTD,
    "PAAC":train_data.feat_PAAC,
    "APAAC":train_data.feat_APAAC,
    "RSacid":train_data.feat_RSacid,
    "RSpolar":train_data.feat_RSpolar,
    "RSsecond":train_data.feat_RSsecond,
    "RScharge":train_data.feat_RScharge,
    "RSDHP":train_data.feat_RSDHP,
    "Combine":train_data.get_combination_feature(),
}

data_pipelines = {
    "AAC":make_pipeline(StandardScaler()),
    "DPC":make_pipeline(StandardScaler()),
    "CTD":make_pipeline(StandardScaler()),
    "PAAC":make_pipeline(StandardScaler()),
    "APAAC":make_pipeline(StandardScaler()),
    "RSacid":make_pipeline(StandardScaler()),
    "RSpolar":make_pipeline(StandardScaler()),
    "RSsecond":make_pipeline(StandardScaler()),
    "RScharge":make_pipeline(StandardScaler()),
    "RSDHP":make_pipeline(StandardScaler()),
    "Combine":make_pipeline(StandardScaler()),
    }

sel_model = {
    "AAC":"SVC",
    "DPC":"RandomForest",
    "CTD":"LGBMClassifier",
    "PAAC":"SVC",
    "APAAC":"LGBMClassifier",
    "RSacid":"SVC",
    "RSpolar":"SVC",
    "RSsecond":"SVC",
    "RScharge":"SVC",
    "RSDHP":"SVC",
    "Combine":"LGBMClassifier",
}

In [6]:
#np.unique(train_data.targets,return_counts=True),0.518,0.482

## RandomForest Optimization

In [7]:
class ObjectiveFunctions(object):
    SEL_MODEL = ['RandomForest', 'SVC',"LGBMClassifier"]
    def __init__(self,model_name) -> None:
        assert model_name in ObjectiveFunctions.SEL_MODEL
        self.m = model_name
    
    @staticmethod
    def obj_func_RF(trial:optuna.trial):
        n_estimators = trial.suggest_int('n_estimators', 2, 200)
        max_depth = int(trial.suggest_float('max_depth', 1, 32, log=True))
        criterion = trial.suggest_categorical("criterion",['gini', 'entropy', 'log_loss'])
        
        clf = RandomForestClassifier(
            n_estimators=n_estimators, 
            max_depth=max_depth,
            criterion=criterion,
            class_weight={1:0.482,0:0.518},
            )
        
        return clf
    
    @staticmethod
    def obj_func_SVC(trial:optuna.trial):
        c = trial.suggest_float('svc_c', 1e-10, 1e10, log=True)
        kernel = trial.suggest_categorical("kernel",['linear', 'poly', 'rbf', 'sigmoid'])
        gamma = trial.suggest_categorical("gamma",['scale', 'auto'])
        
        
        clf = SVC(
            C=c, 
            kernel=kernel,
            class_weight={1:0.482,0:0.518},
            gamma=gamma)
        
        return clf
    
    @staticmethod
    def obj_func_LGBM(trial:optuna.trial):
        n_estimators = trial.suggest_int('n_estimators', 2, 200)
        max_depth = int(trial.suggest_float('max_depth', 1, 32, log=True))
        num_leaves = int(trial.suggest_float('max_depth', 10, 64, log=True))
        
        clf = LGBMClassifier(
            n_estimators=n_estimators,
            num_leaves=num_leaves,
            class_weight={1:0.482,0:0.518},
            max_depth=max_depth)
        
        return clf
    
    def objective_func(self,trial:optuna.trial):
        if self.m == 'RandomForest':
            return ObjectiveFunctions.obj_func_RF(trial)
        elif self.m == 'SVC':
            return ObjectiveFunctions.obj_func_SVC(trial)
        else:
            return ObjectiveFunctions.obj_func_LGBM(trial)

In [8]:
for feat_type in X_data.keys():
    m = sel_model[feat_type]
    print(f"Model :- {m}, DataType :- {feat_type}")
    
    #Data handling
    X,y = X_data[feat_type],train_data.targets
    X,y = shuffle(X,y,random_state=config["random_seed"])
    X = data_pipelines[feat_type].fit_transform(X,y)
    
    def objective(trial):
        clf = ObjectiveFunctions(m).objective_func(trial)
        
        y_pred = cross_val_predict(clf, X, y, cv=5)
    
        result_values = action_evaluator(y_pred,y,class_names=["Not Druggable","Druggable"],print_report=False,show_plot=False,save_outputs=None)

        return  result_values["f1"]
    
    study = optuna.create_study(direction='maximize')
    study.optimize(objective, n_trials=100)
    
    print('Accuracy: {}'.format(study.best_trial.value))
    print("Best hyperparameters: {}".format(study.best_trial.params))
    
    params = study.best_trial.params
    
    os.makedirs(os.path.join(config["model_save_loc"],feat_type,m),exist_ok=True)
    model_dir = os.path.join(config["model_save_loc"],feat_type,m)
    
    optuna.visualization.plot_slice(study)
    plt.savefig(os.path.join(model_dir,"hyperparameter_slice_analysis.png"),dpi=300)
    
    optuna.visualization.plot_contour(study, params=['n_estimators', 'max_depth'])
    plt.savefig(os.path.join(model_dir,"hyperparameter_contour.png"),dpi=300)
    
    with open(os.path.join(model_dir,"best_params.json"),"w") as f0:
        json.dump(params,f0)
    
    if m == 'RandomForest':
        clf = RandomForestClassifier(**params)
    elif m == 'SVC':
        clf = SVC(**params)
    else:
        clf = LGBMClassifier(**params)
        
    clf.fit(X,y)
    joblib.dump(study, os.path.join(model_dir,"study.sav"))
    joblib.dump(data_pipelines[feat_type], os.path.join(model_dir,"pipeline.sav"))
    joblib.dump(clf, os.path.join(model_dir,"model_save.sav"))
    print("\n\n")
    break

[32m[I 2023-05-09 16:11:02,092][0m A new study created in memory with name: no-name-d5f9d664-97e9-4a11-bcc8-98bda352e9d4[0m


Model :- SVC, DataType :- AAC


[32m[I 2023-05-09 16:11:02,517][0m Trial 0 finished with value: 0.7910676723258258 and parameters: {'svc_c': 29299.15067166134, 'kernel': 'sigmoid', 'gamma': 'scale'}. Best is trial 0 with value: 0.7910676723258258.[0m


Overall accuracy: 0.792

