In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import LeaveOneOut
from sklearn.metrics import confusion_matrix
from imblearn.over_sampling import SMOTENC
from sklearn.neighbors import KNeighborsClassifier	
from sklearn.metrics import confusion_matrix
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import confusion_matrix, precision_recall_curve, auc, roc_auc_score

import warnings
warnings.filterwarnings('ignore')

In [None]:
class DataProcesser(object):
    """Load the study spreadsheet and turn it into a model-ready DataFrame.

    ``data_processer`` runs the whole pipeline: read the Excel file, detect the
    categorical predictors, impute missing values, one-hot encode the
    categorical predictors and min-max scale every remaining predictor.
    """

    def __init__(self, target_var = 'baycognitivapc', cat_threshold = 13, smote = False):    #Replace with impairment to predict (e.g. cognitive, motor, language)
        """Store the configuration and build the path of the input file.

        Args:
            target_var: Name of the column to predict; it is never scaled and
                never considered when detecting categorical variables.
            cat_threshold: Non-object columns with fewer distinct values than
                this are treated as categorical.
            smote: Stored on the instance; not used by any method of this class.
        """
        self.folder_name = '________'           #Replace with folder name
        self.name_df = '___________'            #Replace with file name
        self.df_name = os.path.join(os.getcwd(), self.folder_name, self.name_df)
        self.target_var = target_var
        self.cat_thres = cat_threshold
        self.smote = smote

    def cat_var_identifier(self, main_df):
        """Return the names of the columns to be treated as categorical.

        Object-dtype columns come first, followed by every other column whose
        number of distinct values (a missing value counts as one distinct
        value) is below ``self.cat_thres``.
        """
        #Get variables object
        cat_vars = list(main_df.select_dtypes(include=['object']).columns)

        #add numeric variables with less than threshold numeric
        for column in main_df.columns.to_list():
            if column in cat_vars:
                continue
            if len(main_df[column].unique()) < self.cat_thres:
                cat_vars.append(column)
        return cat_vars

    def data_imputter(self, main_df, cat_vars):
        """Return a copy of ``main_df`` with missing values filled in.

        Columns listed in ``cat_vars`` are filled with their mode; every other
        column is filled with its mean. The input frame is left untouched.

        NOTE(review): ``data_processer`` excludes the target column from
        ``cat_vars``, so a missing target value is mean-imputed like a numeric
        predictor - confirm the target has no missing values.
        """
        imputed_df = main_df.copy()

        for column in imputed_df.columns.to_list():

            #Impute with mode if categoric
            if column in cat_vars:
                modes = imputed_df[column].mode()
                if modes.empty:
                    # Column has no observed value at all: nothing to impute with.
                    continue
                imputting_value = modes.iloc[0]

            #Impute numeric variables with mean
            else:
                imputting_value = imputed_df[column].mean()

            imputed_df[column] = imputed_df[column].fillna(imputting_value)

        return imputed_df

    def one_hot_encoder(self, main_df, cat_vars):
        """One-hot encode ``cat_vars``, dropping the first level of each variable."""
        return pd.get_dummies(main_df, columns = cat_vars, drop_first=True)

    def normalizer(self, main_df, cat_vars):
        """Return a copy of ``main_df`` with predictors min-max scaled to [0, 1].

        Columns named in ``cat_vars`` and the target column are skipped. A
        column whose minimum equals its maximum is set to 0. When called after
        ``one_hot_encoder`` the dummy columns carry new names, so they are
        scaled as well.
        """
        scaled_df = main_df.copy()

        for column in scaled_df.columns.to_list():

            #Only if not target var and not categorical
            if column in cat_vars or column == self.target_var:
                continue

            # Use the real (float) bounds: truncating them to int collapses
            # columns with a fractional range and mis-scales the others.
            col_min = float(scaled_df[column].min())
            col_max = float(scaled_df[column].max())
            col_range = col_max - col_min

            if col_range == 0:
                scaled_df[column] = 0
            else:
                scaled_df[column] = (scaled_df[column] - col_min) / col_range

        return scaled_df

    def data_processer(self):
        """Read the Excel file at ``self.df_name`` and run the full pipeline."""
        main_df = pd.read_excel(self.df_name)

        #Identify categorical variables
        cat_vars = self.cat_var_identifier(main_df.drop(self.target_var, axis = 1))

        #Impute values
        main_df = self.data_imputter(main_df, cat_vars)

        #One hot encoding
        main_df = self.one_hot_encoder(main_df, cat_vars)

        #Normalize numeric vars
        main_df = self.normalizer(main_df, cat_vars)

        return main_df

In [None]:
class PerformanceEvaluatorLOOCV(object):
    """Leave-one-out evaluation of several classifiers with per-fold grid search.

    For every model key in ``models`` the class tunes hyperparameters on each
    training fold, predicts the held-out sample, pools the predictions of all
    folds and writes one row of metrics to an Excel file.
    """

    # Value passed as ``n_jobs`` to every GridSearchCV.
    N_JOBS = 60

    # Column positions handed to SMOTENC as categorical features when smote=True.
    # NOTE(review): hard-coded positions - confirm they match the processed feature matrix.
    SMOTE_CATEGORICAL_FEATURES = [10, 90]

    def __init__(self, models=['log_reg', 'tree', 'forest', 'gaussian', 'knn', 'ada'], metrics=['accuracy', 'precision', 'recall', 'f_score', 'auc_score', 'auc_pr'], target_var='baycognitivapc', smote=False, hyperparameters_dict=None):
        """Store the evaluation configuration.

        Args:
            models: Model keys to evaluate ('log_reg', 'tree', 'forest',
                'gaussian', 'knn', 'ada').
            metrics: Metric names to compute in ``performance_evaluator``.
            target_var: Name of the label column.
            smote: If true, oversample every training fold with SMOTENC.
            hyperparameters_dict: Accepted for compatibility; currently unused.
        """
        # Copy so instances never share (or mutate) the default lists.
        self.models = list(models)
        self.metrics = list(metrics)
        self.target_var = target_var
        self.perf_excel_file = '{model}_all_cognitive.xlsx'
        self.params_file = '{model}_all_cognitive.txt'
        self.smote = smote

    def _search_space(self, ml_model):
        """Return ``(estimator_class, param_grid)`` for the model key ``ml_model``.

        Raises:
            ValueError: If ``ml_model`` is not a supported key.
        """
        if ml_model == 'log_reg':
            return LogisticRegression, {
                'solver': ['newton-cg', 'lbfgs', 'sag', 'saga'],
                'penalty': ['none', 'l1', 'l2', 'elasticnet'],
                'C': [100, 10, 1.0, 0.1, 0.01]
            }

        if ml_model == 'tree':
            return DecisionTreeClassifier, {
                'criterion': ['gini', 'entropy', 'log_loss'],
                'splitter': ['best', 'random'],
                'min_samples_split': [2, 4, 6, 8, 10, 12],
                'min_samples_leaf': [5, 10, 20, 50, 100],
                'max_depth': [2, 4, 6, 8, 10, 12],
                'max_features': [0.2, 0.4, 0.6, 0.8]
            }

        if ml_model == 'forest':
            return RandomForestClassifier, {
                'n_estimators': [20, 50, 100],
                'max_depth': [None, 5, 8, 11, 14],
                'max_features': [0.2, 0.4, 0.6, 0.8]
            }

        if ml_model == 'gaussian':
            return GaussianNB, {
                'var_smoothing': np.logspace(0, -9, num=100)
            }

        if ml_model == 'knn':
            return KNeighborsClassifier, {
                'n_neighbors': [1, 2, 3, 4, 5, 6, 7, 8],
                'weights': ['uniform', 'distance']
            }

        if ml_model == 'ada':
            return AdaBoostClassifier, {
                'n_estimators': [10, 50, 70, 100, 150],
                'learning_rate': [0.0001, 0.001, 0.01, 0.1, 1.0],
                'algorithm': ['SAMME', 'SAMME.R']
            }

        raise ValueError(f'Unknown model key: {ml_model!r}')

    def hyperparameters_grid_generator(self, ml_model, X_train, y_train):
        """Grid-search ``ml_model`` on the training data and return a fresh estimator.

        Runs a 5-fold GridSearchCV scored on recall, appends the winning
        parameters to the model's parameter log file and prints the estimator.

        Args:
            ml_model: Model key understood by ``_search_space``.
            X_train: Training features.
            y_train: Training labels.

        Returns:
            An unfitted estimator configured with the best parameters found.

        Raises:
            ValueError: If ``ml_model`` is not a supported key.
        """
        estimator_cls, hp_dict = self._search_space(ml_model)

        grid = GridSearchCV(estimator=estimator_cls(), param_grid=hp_dict, cv=5, scoring='recall', n_jobs=self.N_JOBS)
        best_params = grid.fit(X_train, y_train).best_params_
        best_parameters = estimator_cls(**best_params)

        # Name the log after the model key (as the Excel report does), not
        # after the repr of the estimator instance.
        with open(self.params_file.format(model=ml_model), 'a') as file:
            file.write(f'{ml_model}:\n')
            for param, value in best_params.items():
                file.write(f'  {param}: {value}\n')
            file.write('\n')
        print(best_parameters)

        return best_parameters

    def results_compiler(self, metric_type, metric_value, performance_dict):
        """Append ``metric_value`` to the list stored under ``metric_type``."""
        performance_dict.setdefault(metric_type, []).append(metric_value)

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return ``numerator / denominator``, or NaN when the denominator is zero."""
        if denominator == 0:
            return float('nan')
        return numerator / denominator

    def performance_evaluator(self, actuals, predicted, probability_pred, performance_dict):
        """Compute the confusion counts and the requested metrics.

        Results are appended to ``performance_dict`` in place. The confusion
        counts are always recorded; the other entries depend on
        ``self.metrics``.

        Args:
            actuals: True labels.
            predicted: Predicted labels.
            probability_pred: Scores passed to the ROC and precision-recall curves.
            performance_dict: Dict of lists that collects the results.
        """
        tn, fp, fn, tp = confusion_matrix(actuals, predicted).ravel()

        self.results_compiler('t_positive', tp, performance_dict)
        self.results_compiler('t_negative', tn, performance_dict)
        self.results_compiler('f_positive', fp, performance_dict)
        self.results_compiler('f_negative', fn, performance_dict)

        if 'accuracy' in self.metrics:
            accuracy = self._safe_ratio(tn + tp, tn + tp + fp + fn)
            self.results_compiler('accuracy', accuracy, performance_dict)

        if 'precision' in self.metrics:
            precision = self._safe_ratio(tp, tp + fp)
            self.results_compiler('precision', precision, performance_dict)

        if 'recall' in self.metrics:
            recall = self._safe_ratio(tp, tp + fn)
            self.results_compiler('recall', recall, performance_dict)

        if 'f_score' in self.metrics:
            f_score = self._safe_ratio(tp, tp + 0.5 * (fp + fn))
            self.results_compiler('f_score', f_score, performance_dict)

        if 'auc_score' in self.metrics:
            auc_score = roc_auc_score(actuals, probability_pred)
            self.results_compiler('auc_score', auc_score, performance_dict)

        if 'auc_pr' in self.metrics:
            pr_precision, pr_recall, _ = precision_recall_curve(actuals, probability_pred)
            auc_pr = auc(pr_recall, pr_precision)
            # Stored under the same name as the requested metric (was 'au_pr').
            self.results_compiler('auc_pr', auc_pr, performance_dict)

    def cross_validator(self):
        """Run leave-one-out CV for every configured model and save the results.

        One Excel file per model is written using ``self.perf_excel_file``.

        NOTE(review): imputation and scaling are done on the full dataset
        before the folds are split, so the held-out sample influences its own
        preprocessing.
        """
        # The processed data does not depend on the model: load it once, and
        # process it for the target this evaluator was configured with.
        data_processer = DataProcesser(target_var=self.target_var, smote=self.smote)
        main_df = data_processer.data_processer()

        X = main_df.drop([self.target_var], axis=1).to_numpy()
        y = main_df[self.target_var].to_numpy()

        for model in self.models:
            performance_dict = {}

            predictions = []
            true_labels = []
            probability_pred = []

            for train_indexes, test_indexes in LeaveOneOut().split(X):
                X_train = X[train_indexes]
                y_train = y[train_indexes]
                X_test = X[test_indexes]
                y_test = y[test_indexes]

                if self.smote:
                    X_train = pd.DataFrame(X_train)
                    y_train = pd.DataFrame(y_train)
                    sm = SMOTENC(random_state=42, categorical_features=self.SMOTE_CATEGORICAL_FEATURES)
                    X_train, y_train = sm.fit_resample(X_train, y_train)
                    # Estimators expect 1-D labels, not a single-column frame.
                    y_train = np.ravel(y_train)

                best_parameters_t = self.hyperparameters_grid_generator(model, X_train, y_train)
                ml_model = best_parameters_t.fit(X_train, y_train)

                preds = ml_model.predict(X_test)
                prob_preds = ml_model.predict_proba(X_test)[:, 1]
                predictions.extend(preds)
                true_labels.extend(y_test)
                probability_pred.extend(prob_preds)

            # Metrics are computed once on the pooled out-of-fold predictions.
            self.performance_evaluator(true_labels, predictions, probability_pred, performance_dict)

            results_df = pd.DataFrame(performance_dict)

            results_df.to_excel(self.perf_excel_file.format(model=model), index=False)



In [10]:
# Evaluate every supported classifier with leave-one-out CV, without SMOTE oversampling.
perf_eval = PerformanceEvaluatorLOOCV(
    models=['log_reg', 'tree', 'forest', 'gaussian', 'knn', 'ada'],
    metrics=['accuracy', 'precision', 'recall', 'f_score', 'auc_score', 'auc_pr'],
    smote=False,
)
perf_eval.cross_validator()

LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='newton-cg')
LogisticRegression(C=100, solver='