In [2]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import RandomUnderSampler
# enable_iterative_imputer must be imported before IterativeImputer.
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.linear_model import LogisticRegression, BayesianRidge
from sklearn.metrics import log_loss, roc_auc_score, precision_score, recall_score, accuracy_score, \
    f1_score, classification_report, confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler
from skopt import BayesSearchCV
from skopt.callbacks import DeltaYStopper
from skopt.space import Categorical, Integer, Real

In [3]:
class DataImputer():
    """Two-pass missing-value imputation built on ``IterativeImputer``.

    Pass one imputes every column with ``num_estimator``. The cells recorded by
    ``map_features`` (nulls found in columns whose dtype is not float) are then
    blanked again and re-imputed in pass two with ``cat_estimator``.

    The first ``impute_data`` call fits both imputers and keeps them; every
    later call only transforms with the stored imputers.

    Call order used by the methods below:
    ``set_state`` -> ``map_features`` -> ``impute_data`` -> ``get_state``.
    """

    def __init__(self):
        self.cat_estimator = None    # estimator handed to the second-pass imputer
        self.num_estimator = None    # estimator handed to the first-pass imputer
        self.input_df = None         # frame to impute
        self.imputed_df = None       # result of the latest impute_data call
        self.cat_imp = None          # fitted second-pass imputer
        self.num_imp = None          # fitted first-pass imputer
        self.hot_flag = False        # True once both imputers have been fitted
        self.feature_map = dict()    # column name -> index labels that were null

    def set_state(self, cat_estimator = None, num_estimator = None, input_df = None):
        """Replace both estimators and the input frame.

        All three attributes are overwritten on every call, including with
        ``None`` for arguments that are left out.
        """
        self.cat_estimator = cat_estimator
        self.num_estimator = num_estimator
        self.input_df = input_df

    def get_state(self):
        """Return ``(imputed_df, num_imp, cat_imp)``."""
        return (self.imputed_df, self.num_imp, self.cat_imp)

    def map_features(self):
        """Record where the non-float columns of ``input_df`` are null.

        ``feature_map`` is rebuilt from scratch so that entries belonging to a
        previously mapped frame cannot leak into the current one. Every
        non-float column gets an entry, possibly with an empty index.
        """
        self.feature_map = dict()
        for iColumn in self.input_df.columns:
            column = self.input_df[iColumn]
            if column.dtypes != float:
                self.feature_map[iColumn] = self.input_df.index[column.isnull().values]

    def impute_data(self):
        """Run both imputation passes and store the result in ``imputed_df``.

        On the first call the imputers are created, fitted and stored, and
        ``hot_flag`` is set. Afterwards the stored imputers are reused with
        ``transform`` only.

        ``feature_map`` must describe the current ``input_df`` (call
        ``map_features`` after ``set_state``): its index labels are looked up
        in ``input_df.index``. The stored result carries a fresh RangeIndex,
        not the index of ``input_df``.
        """
        refit = not self.hot_flag
        if refit:
            num_imp = IterativeImputer(estimator=self.num_estimator, max_iter=1000, tol=1e-3, skip_complete=True)
            cat_imp = IterativeImputer(estimator=self.cat_estimator, max_iter=1000, tol=1e-3, skip_complete=True)
        else:
            num_imp = self.num_imp
            cat_imp = self.cat_imp

        columns = self.input_df.columns

        # Pass 1: impute everything with the numerical estimator.
        if refit:
            imputed_data = num_imp.fit_transform(self.input_df)
        else:
            imputed_data = num_imp.transform(self.input_df)

        # Keep the caller's index on the intermediate frame: feature_map stores
        # labels of input_df.index, so a default RangeIndex here would blank
        # the wrong rows (or raise KeyError) for any non-default index.
        imputed_df = pd.DataFrame(data=imputed_data, columns=columns, index=self.input_df.index)

        # Blank the originally-missing cells of the non-float columns again so
        # the second pass decides their values.
        for iColumn, indx in self.feature_map.items():
            if len(indx):
                imputed_df.loc[indx, iColumn] = np.nan

        # Pass 2: impute the blanked cells with the categorical estimator.
        if refit:
            imputed_data = cat_imp.fit_transform(imputed_df)
        else:
            imputed_data = cat_imp.transform(imputed_df)
        imputed_df = pd.DataFrame(data=imputed_data, columns=columns)

        if refit:
            # Store the fitted imputers for later transform-only calls.
            self.num_imp = num_imp
            self.cat_imp = cat_imp
            self.hot_flag = True

        self.imputed_df = imputed_df

def StrBinarize(input_df=pd.DataFrame, target_col='', target_label=''):
    """Add a 0/1 column ``<target_col>_encode`` to ``input_df``.

    Rows where ``input_df[target_col] == target_label`` get 0, all other rows
    (including missing values) get 1.

    Parameters
    ----------
    input_df : pandas.DataFrame
        Frame to encode. It is modified in place and also returned.
    target_col : str
        Name of the column to binarize.
    target_label : object
        Value that maps to 0.

    Returns
    -------
    The same object that was passed in. If it is not a non-empty DataFrame a
    message is printed and it is returned untouched.
    """
    # The default argument is the DataFrame class itself, not an instance, so
    # anything that is not a real frame is treated like an empty one.
    if not isinstance(input_df, pd.DataFrame) or input_df.empty:
        print('Non empty Pandas dataframe must be passed!')
        return input_df

    # Row-wise comparison rather than a detour through index labels: with
    # duplicate labels, .loc[labels] would also zero non-matching rows.
    is_other = input_df[target_col] != target_label
    input_df[target_col+'_encode'] = is_other.astype('int64')

    return input_df

In [5]:
def CalcROCCurve(y_test, y_score):
    """Compute one ROC curve per column of ``y_score``.

    Every column is passed to ``roc_curve`` against the same ``y_test``, so
    each curve treats that column as the score of the positive label.

    Parameters
    ----------
    y_test : array-like
        True labels.
    y_score : 2-D array
        Scores with one column per class; it must support ``y_score[:, i]``.

    Returns
    -------
    (fpr, tpr) : tuple of dict
        False and true positive rates, keyed by column index.
    """
    n_columns = np.shape(y_score)[1]
    false_pos_by_col = {}
    true_pos_by_col = {}

    for col in range(n_columns):
        col_fpr, col_tpr, _thresholds = roc_curve(y_test, y_score[:, col])
        false_pos_by_col[col] = col_fpr
        true_pos_by_col[col] = col_tpr

    return (false_pos_by_col, true_pos_by_col)

def LogROCCurve(fpr, tpr, class_index=1, output_path='output/roc_curve.png'):
    """Plot one ROC curve against the chance diagonal and save it as an image.

    Parameters
    ----------
    fpr, tpr : mapping
        False and true positive rates keyed by score-column index, in the form
        returned by ``CalcROCCurve``.
    class_index : hashable, default 1
        Key of the curve to draw.
    output_path : str, default 'output/roc_curve.png'
        Destination file. Its parent directory is created when missing.

    Raises
    ------
    KeyError
        If ``class_index`` is not a key of ``fpr`` or ``tpr``.

    Notes
    -----
    Requires ``matplotlib.pyplot`` to be available as ``plt``. The figure is
    left open so that a notebook can still render it inline.
    """
    fig, ax = plt.subplots()
    lw = 2
    ax.plot(
        fpr[class_index],
        tpr[class_index],
        color="darkorange",
        lw=lw,
        # A labelled artist is required, otherwise legend() has nothing to show.
        label="ROC curve"
    )
    ax.plot([0, 1], [0, 1], color="navy", lw=lw, linestyle="--")
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel("False Positive Rate")
    ax.set_ylabel("True Positive Rate")
    ax.set_title("Receiver operating characteristic example")
    ax.legend(loc="lower right")

    # savefig does not create directories, so make sure the target exists.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    fig.savefig(output_path)

def LogMetrics(model,X_test, y_test):
    """Evaluate a fitted binary classifier, save its ROC plot and print metrics.

    Parameters
    ----------
    model : estimator
        Must provide ``predict`` and ``predict_proba``.
    X_test : array-like
        Features of the evaluation set.
    y_test : array-like
        True labels; the metrics below assume the labels 0 and 1, with 1 as
        the positive class.

    Returns
    -------
    None. Every metric is printed as ``name: value``.
    """
    ypred = model.predict(X_test)
    y_scores = model.predict_proba(X_test)

    fpr, tpr = CalcROCCurve(y_test,y_scores)
    LogROCCurve(fpr, tpr)

    # labels=[0, 1] keeps the matrix 2x2 even when a class is absent from both
    # y_test and ypred, so the four-way unpack below cannot fail.
    tn, fp, fn, tp = confusion_matrix(y_test, ypred, labels=[0, 1]).ravel()

    MetricDict = dict()
    MetricDict['Accuracy'] = accuracy_score(y_test, ypred)
    MetricDict['F1 Score'] = f1_score(y_test, ypred)
    MetricDict['AUC'] = roc_auc_score(y_test, y_scores[:,1])
    MetricDict['Precision'] = precision_score(y_test, ypred)
    MetricDict['Recall'] = recall_score(y_test, ypred)
    # Log loss is defined on predicted probabilities; feeding it the hard 0/1
    # predictions only measures clipped misclassification.
    MetricDict['logloss'] = log_loss(y_test, y_scores, labels=[0, 1])
    # Predictive values are undefined when nothing was predicted for a class.
    MetricDict['PPV'] = tp / (tp+fp) if (tp+fp) else float('nan')
    MetricDict['NPV'] = tn / (tn+fn) if (tn+fn) else float('nan')

    MetricDict['True Negative'] = tn
    MetricDict['False Positive'] = fp
    MetricDict['False Negative'] = fn
    MetricDict['True Positive'] = tp

    for iKey in MetricDict:
        print(iKey+':', MetricDict[iKey])

    return


In [1]:
def BuildSamplingDict(HyperParameterDict):
    """Translate a plain hyperparameter spec into search-space objects.

    Parameters
    ----------
    HyperParameterDict : dict
        Maps a hyperparameter name to a sequence whose first item is the
        sampling type:
        ``['Real', low, high]``, ``['Integer', low, high]`` or
        ``['Categorical', choices]``.

    Returns
    -------
    dict
        Name -> ``Real`` / ``Integer`` / ``Categorical`` instance. Entries
        with any other sampling type are reported on stdout and left out.

    Notes
    -----
    ``Real``, ``Integer`` and ``Categorical`` are taken from module scope
    (presumably ``skopt.space`` -- confirm against the notebook imports).
    """
    search_space = {}
    for iKey, spec in HyperParameterDict.items():
        sampleType = spec[0]

        if sampleType not in ('Real', 'Categorical', 'Integer'):
            print(f'Unspecified sampling {sampleType} type given for {iKey}')
            print('Skipping hyperparameter. Check your input!')
            continue

        if sampleType == 'Categorical':
            # Categorical takes the list of choices only.
            search_space[iKey] = Categorical(spec[1])
        else:
            # Real and Integer both take a lower and an upper bound.
            bounded = Real if sampleType == 'Real' else Integer
            search_space[iKey] = bounded(spec[1], spec[2])

    return search_space

def TuneHyperParameters(
        X_train=None, y_train=None,
        estimator=None, param_dict=None, metric='roc_auc', n_iter=20,
        n_splits=5, random_state=None):
    """Run a Bayesian hyperparameter search with stratified K-fold CV.

    Parameters
    ----------
    X_train, y_train : array-like
        Training features and labels.
    estimator : estimator
        Model whose hyperparameters are tuned.
    param_dict : dict, optional
        Search space passed to ``BayesSearchCV`` (for example the output of
        ``BuildSamplingDict``). ``None`` is treated as an empty dict.
    metric : str, default 'roc_auc'
        Scoring passed to ``BayesSearchCV``.
    n_iter : int, default 20
        Number of search iterations.
    n_splits : int, default 5
        Number of stratified folds.
    random_state : int or None, default None
        Seed forwarded to both the fold shuffling and the optimiser. Leave as
        ``None`` for the previous unseeded behaviour; set it for
        reproducible runs.

    Returns
    -------
    The fitted ``BayesSearchCV`` object.
    """
    # A fresh dict per call instead of a shared mutable default argument.
    if param_dict is None:
        param_dict = {}

    kf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    # Materialise the folds once so every candidate is scored on the same splits.
    cv_i = list(kf.split(X_train, y_train))

    opt = BayesSearchCV(
        estimator,
        param_dict,
        scoring=metric,
        n_iter=n_iter,
        cv=cv_i,
        n_jobs=-1,
        random_state=random_state
    )

    # Stop early once the 5 best scores lie within 1e-2 of each other.
    opt.fit(X_train, y_train,callback=DeltaYStopper(delta=1e-2,n_best=5))

    return opt

def EvaluateModel(X_test=None, y_test=None,
                  estimator=None):
    """Score ``estimator`` on the given test data.

    Returns whatever ``estimator.score(X_test, y_test)`` returns.
    """
    test_score = estimator.score(X_test, y_test)
    return test_score