Download data and Import Libraries

In [135]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os
import shap
from sklearn.utils import resample

In [136]:
# Names of the feature transformations selected for this run.
# Only Box-Cox is enabled; the remaining options are listed for reference
# and can be added to the list to turn them back on:
#   "Original"            - no transformation
#   "PowerTransformer"    - PowerTransformer (Yeo-Johnson)
#   "Log1p"               - log(x+1) transformation
#   "Sqrt"                - square root transformation
#   "QuantileTransformer" - maps data to a normal distribution
#   "Normalization"
transformation_names = ["BoxCox"]  # Box-Cox transformation (with shifting if necessary)

In [137]:
# Directory containing the CSV files
directory = './split_data'

# Maps each CSV's base name (file name without its final extension) to the
# DataFrame read from it
dataframes = {}

# os.listdir() returns entries in arbitrary order; sorting makes the load
# order (and therefore the key order printed below) reproducible.
for filename in sorted(os.listdir(directory)):
    if filename.endswith('.csv'):
        # splitext removes only the last extension, so a dotted name such as
        # "X.v2.csv" keeps the key "X.v2" instead of collapsing to "X" and
        # silently overwriting another entry.
        df_name = os.path.splitext(filename)[0]
        dataframes[df_name] = pd.read_csv(os.path.join(directory, filename))

# Display the keys of the dictionary to confirm
print(dataframes.keys())

dict_keys(['X_test_Sqrt', 'y_train', 'y_test', 'X_train_Normalization', 'data', 'X_test_Normalization', 'X_test_BoxCox', 'X_train_Original', 'X_test_PowerTransformer', 'X_train_Log1p', 'X_test_Original', 'X_train_Sqrt', 'X_train_BoxCox', 'X_train_PowerTransformer', 'X_test_Log1p', 'X_test_QuantileTransformer', 'X_train_QuantileTransformer'])


In [138]:
# Import necessary libraries for modeling
from sklearn.linear_model import LogisticRegressionCV
from sklearn.svm import LinearSVC
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import CategoricalNB, GaussianNB
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.preprocessing import MinMaxScaler # Use MinMaxScaler to prevent negative values
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, confusion_matrix
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import StratifiedKFold

# Column groups consumed by MixedNaiveBayes: the features listed here are
# treated as continuous, and every other column of the Box-Cox training
# frame is treated as categorical (original column order is kept).
continuous_cols = ['BMI', 'MentHlth', 'PhysHlth', 'Age']
categorical_cols = list(
    filter(
        lambda name: name not in continuous_cols,
        dataframes['X_train_BoxCox'].columns,
    )
)

# Custom implementation of Naive Bayes
# Use Gaussian distribution to model continuous variables
# Use categorical distribution to model discrete variables
# Combine both per-class posteriors for the final prediction
class MixedNaiveBayes:
    """Naive Bayes classifier over a mix of continuous and categorical columns.

    A ``GaussianNB`` is fitted on ``continuous_cols`` and a ``CategoricalNB``
    on ``categorical_cols``; their per-class log posteriors are merged into a
    single normalized posterior.

    Parameters
    ----------
    continuous_cols : list of str
        Column names handed to the Gaussian model.
    categorical_cols : list of str
        Column names handed to the categorical model. May be empty, in which
        case only the Gaussian model is used.
    priors : array-like or None
        Optional fixed class priors passed to both sub-models. When None,
        each sub-model estimates the priors from the training labels.

    Notes
    -----
    NOTE(review): CategoricalNB is documented to expect non-negative integer
    category codes. If the frame passed in has been min-max scaled or
    contains interpolated (SMOTE) rows, the categorical columns may no longer
    be integer codes -- confirm the inputs are encoded as intended.
    """

    def __init__(self, continuous_cols, categorical_cols, priors=None):
        self.continuous_cols = continuous_cols
        self.categorical_cols = categorical_cols
        self.priors = priors
        if self.priors is not None:  # If class prior is provided
            self.gaussian_nb = GaussianNB(priors=self.priors)
            self.categorical_nb = CategoricalNB(fit_prior=False, class_prior=self.priors)
        else:
            self.gaussian_nb = GaussianNB()
            self.categorical_nb = CategoricalNB()
        self.has_categorical = False

    def fit(self, X, y):
        """Fit both sub-models on their column subsets of ``X``.

        Returns
        -------
        self
            The fitted instance, following the scikit-learn convention.
        """
        # Re-evaluated on every fit because callers may change the column
        # lists between fits.
        self.has_categorical = len(self.categorical_cols) > 0

        if self.has_categorical:
            self.categorical_nb.fit(X[self.categorical_cols], y)

        # Fit GaussianNB for continuous data
        self.gaussian_nb.fit(X[self.continuous_cols], y)
        return self

    def _joint_log_proba(self, X):
        """Return the normalized joint log posterior, shape (n_samples, n_classes)."""
        log_post = self.gaussian_nb.predict_log_proba(X[self.continuous_cols])
        if self.has_categorical:
            # Each sub-model's posterior already contains the class prior, so
            # adding them counts the prior twice; subtract it once.
            log_post = (
                log_post
                + self.categorical_nb.predict_log_proba(X[self.categorical_cols])
                - np.log(self.gaussian_nb.class_prior_)
            )
        # Renormalize in log space (max-shift for numerical stability) so the
        # exponentiated rows sum to 1.
        shifted = log_post - log_post.max(axis=1, keepdims=True)
        return shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))

    def predict(self, X):
        """Return the most probable class label for each row of ``X``."""
        best = self._joint_log_proba(X).argmax(axis=1)
        # Map column positions back to labels so non 0..k-1 labels also work.
        return self.gaussian_nb.classes_[best]

    def predict_proba(self, X):
        """Return class probabilities; every row sums to 1."""
        return np.exp(self._joint_log_proba(X))

    def score(self, X, y):
        """Return the mean accuracy of ``predict(X)`` against ``y``."""
        # ravel() lets y be a Series, an array, or a one-column DataFrame.
        return np.mean(self.predict(X) == np.asarray(y).ravel())

# Custom implementation of LinearSVC
# Default LinearSVC does NOT support predict_proba()
# Scale the decision_function output for binary classification
class CalibratedLinearSVC(LinearSVC):
    """LinearSVC with a min-max scaled ``predict_proba`` for binary problems.

    The "probabilities" are the decision-function values rescaled to [0, 1]
    using the range observed on the training data. They are a monotone score
    suitable for ranking metrics, not statistically calibrated probabilities.
    Only binary classification is supported, because the decision function is
    treated as a 1-D array.
    """

    def fit(self, X, y, sample_weight=None):
        """Fit the SVM and record the training decision-function range.

        Parameters
        ----------
        X, y : training data and labels, forwarded to ``LinearSVC.fit``.
        sample_weight : array-like or None
            Optional per-sample weights, forwarded to ``LinearSVC.fit``.

        Returns
        -------
        self
            The fitted estimator, following the scikit-learn convention.
        """
        super().fit(X, y, sample_weight=sample_weight)
        scores = self.decision_function(X)
        self.df_min_ = scores.min()
        self.df_max_ = scores.max()
        return self

    def predict_proba(self, X):
        """Return an (n_samples, 2) array of [negative, positive] scores in [0, 1]."""
        scores = self.decision_function(X)
        span = self.df_max_ - self.df_min_
        if span > 0:
            # Values outside the training range are clipped to the ends.
            proba_pos_class = np.clip((scores - self.df_min_) / span, 0, 1)
        else:
            # All training scores were identical, so there is no range to
            # scale by; fall back to the hard decision instead of dividing
            # by zero.
            proba_pos_class = (scores > 0).astype(float)
        proba_neg_class = 1 - proba_pos_class
        return np.c_[proba_neg_class, proba_pos_class]

# List of proposed models, keyed by the short name used in the metric tables.
# Estimators with a stochastic fit (saga solver, LinearSVC, MLP) are seeded
# with the same random_state=69 used for resampling and fold splitting, so a
# fresh Restart & Run All reproduces the same numbers.
models = {
    "LR": LogisticRegressionCV(
        scoring='f1',
        max_iter=5000,
        penalty='l1',
        solver='saga',
        random_state=69
    ),
    "SVM": CalibratedLinearSVC(random_state=69),
    "3-Layer-NeuralNetwork": MLPClassifier(
        # Single hidden layer with int(sqrt(2 * n_features)) units.
        # NOTE(review): the width is taken from the 'Original' frame --
        # presumably every transformation keeps the same columns; confirm.
        hidden_layer_sizes=[int((len(dataframes['X_train_Original'].columns)*2)**0.5)],
        random_state=69
    ),
    "knn": KNeighborsClassifier(n_neighbors=5),
    "GauNB": GaussianNB(),
    "MixNB": MixedNaiveBayes(
        continuous_cols=continuous_cols, 
        categorical_cols=categorical_cols
    ),
    "LDA": LinearDiscriminantAnalysis(),
    "QDA": QuadraticDiscriminantAnalysis()
}

In [139]:
class ModelTrainer:
    """Fit and evaluate a dictionary of classifiers over stratified folds.

    Parameters
    ----------
    models : dict
        Maps a display name to an estimator exposing ``fit``, ``predict`` and
        ``predict_proba``. Entries named 'LR' and 'SVM' additionally have
        their ``coef_[0]`` recorded per fold.
    dataframes : dict
        Must contain 'X_train_<transformation_name>',
        'X_test_<transformation_name>', 'y_train' and 'y_test'.
    transformation_name : str
        Selects which feature frames are read from ``dataframes``.
    epochs : int
        Number of stratified folds. Values below 2 cannot be used for
        cross-validation, so a single run on the original train/test split
        is performed instead.
    debug : bool
        When True, prints summary statistics before each model is fitted.

    Attributes
    ----------
    folds_performance_metrics : list of DataFrame
        One metrics table per fold of the most recent ``fit_models`` call.
    folds_confusion_matrices : list of dict
        One ``{model name: confusion matrix}`` dict per fold.
    lr_coefs, svm_coefs : list
        Per-fold coefficient vectors of the 'LR' and 'SVM' models.
    performance_metrics_df : DataFrame or None
        Metrics table of the last completed fold.
    """

    def __init__(self, models, dataframes, transformation_name='BoxCox', epochs=1, debug=False):
        self.models = models
        self.dataframes = dataframes
        self.transformation_name = transformation_name
        self.epochs = epochs
        self.debug = debug
        self.performance_metrics_df = None

        self.X_train = dataframes['X_train_' + transformation_name]
        self.X_test = dataframes['X_test_' + transformation_name]
        self.y_train = dataframes['y_train']
        self.y_test = dataframes['y_test']
        # Remembered so the original split can be rebuilt from the combined frame.
        self._n_original_train = len(self.X_train)
        # ignore_index avoids duplicate row labels: both frames come from
        # separate sources and may each be numbered from 0.
        self.X_combined = pd.concat([self.X_train, self.X_test], axis=0, ignore_index=True)
        self.y_combined = pd.concat([self.y_train, self.y_test], axis=0, ignore_index=True)
        self.feature_names = None
        # Column lists of the 'MixNB' model before any custom-feature narrowing.
        self._mixnb_base_cols = None

        self.folds_performance_metrics = []
        self.folds_confusion_matrices = []
        self.lr_coefs = []
        self.svm_coefs = []

    @staticmethod
    def _as_series(y):
        """Return a one-column DataFrame as a Series; pass anything else through."""
        if isinstance(y, pd.DataFrame) and y.shape[1] == 1:
            return y.iloc[:, 0]
        return y

    def _iter_splits(self):
        """Yield (train_idx, test_idx) positional index pairs for each fold."""
        if self.epochs < 2:
            # StratifiedKFold requires at least 2 splits; fall back to the
            # train/test split supplied in ``dataframes``.
            n_total = len(self.X_combined)
            yield np.arange(self._n_original_train), np.arange(self._n_original_train, n_total)
            return
        skf = StratifiedKFold(n_splits=self.epochs, shuffle=True, random_state=69)
        yield from skf.split(self.X_combined, self.y_combined)

    def _select_mixnb_columns(self, custom_features):
        """Restrict the 'MixNB' model's column lists to ``custom_features``.

        The narrowing always starts from the model's column lists as they
        were the first time this trainer narrowed them, so successive calls
        with different feature subsets do not compound. Passing None restores
        those lists. Does nothing when no 'MixNB' model is registered.
        """
        mixnb = self.models.get('MixNB')
        if mixnb is None:
            return
        if custom_features is None:
            if self._mixnb_base_cols is not None:
                mixnb.continuous_cols = list(self._mixnb_base_cols[0])
                mixnb.categorical_cols = list(self._mixnb_base_cols[1])
            return
        if self._mixnb_base_cols is None:
            self._mixnb_base_cols = (list(mixnb.continuous_cols), list(mixnb.categorical_cols))
        base_continuous, base_categorical = self._mixnb_base_cols
        mixnb.continuous_cols = [col for col in base_continuous if col in custom_features]
        mixnb.categorical_cols = [col for col in base_categorical if col in custom_features]

    def undersample_train_data(self, X_train, y_train):
        """Balance classes 0 and 1 by down-sampling the larger one.

        ``X_train`` is not modified. ``y_train`` may be a Series, an array or
        a one-column DataFrame; it is matched to ``X_train`` by position.

        Returns
        -------
        (DataFrame, Series)
            The balanced features and the matching labels (Series named
            'target'), class 0 rows first.
        """
        labels = np.asarray(y_train).ravel()
        # Work on a copy so the caller's frame never gains a 'target' column.
        frame = X_train.copy()
        frame['target'] = labels
        class_0 = frame[labels == 0]
        class_1 = frame[labels == 1]

        # Down-sample whichever class is larger, without replacement.
        if len(class_0) >= len(class_1):
            class_0 = resample(class_0, replace=False, n_samples=len(class_1), random_state=69)
        else:
            class_1 = resample(class_1, replace=False, n_samples=len(class_0), random_state=69)

        balanced = pd.concat([class_0, class_1])
        return balanced.drop(columns=['target']), balanced['target']

    def apply_SMOTE(self, X_train, y_train):
        """Oversample the training data with SMOTE (random_state=69)."""
        sm = SMOTE(random_state=69)
        X_train, y_train = sm.fit_resample(X_train, y_train)
        return X_train, y_train

    def scale_features(self, X_train, X_test):
        """Min-max scale both frames using statistics fitted on ``X_train`` only."""
        scaler = MinMaxScaler()
        scaler.set_output(transform='pandas')
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
        return X_train, X_test

    def fit_models(self, APPLY_SMOTE_TO_TRAIN_DATA=False, 
                   UNDERSAMPLE_TRAIN_DATA=True, STAND_FEATURES=True,
                   custom_features=None):
        """Train every model on each fold and record its test metrics.

        Parameters
        ----------
        APPLY_SMOTE_TO_TRAIN_DATA : bool
            Oversample the training part of each fold with SMOTE.
        UNDERSAMPLE_TRAIN_DATA : bool
            Down-sample the larger class of the training part of each fold.
        STAND_FEATURES : bool
            Min-max scale the features (fitted on the training part).
        custom_features : list of str or None
            When given, only these columns are used for fitting.

        Results of any earlier call are discarded first, so the per-fold
        lists always describe exactly one run.
        """
        # Start from a clean slate; otherwise re-running this method would
        # silently mix folds from different configurations.
        self.folds_performance_metrics = []
        self.folds_confusion_matrices = []
        self.lr_coefs = []
        self.svm_coefs = []

        self._select_mixnb_columns(custom_features)

        for i, (train_idx, test_idx) in enumerate(self._iter_splits()):
            print(f"Starting fold {i + 1}/{self.epochs}...\n")

            self.X_train = self.X_combined.iloc[train_idx]
            self.X_test = self.X_combined.iloc[test_idx]
            # Labels are reduced to Series up front so boolean masks and
            # metric functions see 1-D data.
            self.y_train = self._as_series(self.y_combined.iloc[train_idx])
            self.y_test = self._as_series(self.y_combined.iloc[test_idx])

            performance_metrics = []
            confusion_matrixs = {}

            if APPLY_SMOTE_TO_TRAIN_DATA:
                print("SMOTE train data applied.\n")
            if STAND_FEATURES:
                print("Standardized features applied.\n")
            if UNDERSAMPLE_TRAIN_DATA:
                print("Undersample train data applied.\n")

            # Resampling touches the training part only; the test part keeps
            # its natural class balance.
            if UNDERSAMPLE_TRAIN_DATA:
                self.X_train, self.y_train = self.undersample_train_data(self.X_train, self.y_train)

            if APPLY_SMOTE_TO_TRAIN_DATA:
                self.X_train, self.y_train = self.apply_SMOTE(self.X_train, self.y_train)

            if 'target' in self.X_train.columns:
                self.X_train = self.X_train.drop(columns=['target'])
            if 'target' in self.X_test.columns:
                self.X_test = self.X_test.drop(columns=['target'])

            if STAND_FEATURES:
                self.X_train, self.X_test = self.scale_features(self.X_train, self.X_test)

            if custom_features is not None:
                self.X_train = self.X_train[custom_features]
                self.X_test = self.X_test[custom_features]

            self.feature_names = self.X_train.columns.tolist()

            for name, model in self.models.items():
                print(f'Fitting {name}.')

                if self.debug:
                    print(f"X_train median:\n{self.X_train.median()}\n")
                    print(f"X_train std:\n{self.X_train.std()}\n")
                    print(f"y_train distribution:\n{self.y_train.value_counts(normalize=True)}")
                    print(f"y_test distribution:\n{self.y_test.value_counts(normalize=True)}")

                model.fit(self.X_train, self.y_train)
                y_pred = model.predict(self.X_test)
                # Last column = score of the highest-sorted class (the
                # positive class for 0/1 labels).
                y_prob = model.predict_proba(self.X_test)[:, -1]

                if name == 'LR':
                    self.lr_coefs.append(model.coef_[0])
                elif name == 'SVM':
                    self.svm_coefs.append(model.coef_[0])

                metrics = {
                    "Model": name,
                    "Accuracy": accuracy_score(self.y_test, y_pred),
                    "Precision": precision_score(self.y_test, y_pred),
                    "Recall": recall_score(self.y_test, y_pred),
                    "F1 Score": f1_score(self.y_test, y_pred),
                    "AUROC": roc_auc_score(self.y_test, y_prob)
                }

                performance_metrics.append(metrics)

                cm = confusion_matrix(self.y_test, y_pred)
                confusion_matrixs[name] = cm

            self.performance_metrics_df = pd.DataFrame(performance_metrics)
            print(f'Completed fold {i + 1}/{self.epochs}\n')
            self.folds_performance_metrics.append(self.performance_metrics_df)
            self.folds_confusion_matrices.append(confusion_matrixs)

In [140]:
# Build the trainer on the Box-Cox transformed features; epochs=5 requests
# five stratified folds.
trainer = ModelTrainer(
    models=models,
    dataframes=dataframes,
    transformation_name='BoxCox',
    epochs=5,
    debug=False,
)

# Run the evaluation: SMOTE oversampling on, undersampling off, feature scaling on.
trainer.fit_models(APPLY_SMOTE_TO_TRAIN_DATA=True, UNDERSAMPLE_TRAIN_DATA=False, STAND_FEATURES=True)

Starting fold 1/5...

SMOTE train data applied.

Standardized features applied.

Fitting LR.
Fitting SVM.
Fitting 3-Layer-NeuralNetwork.
Fitting knn.
Fitting GauNB.
Fitting MixNB.
Fitting LDA.
Fitting QDA.
Completed fold 1/5

Starting fold 2/5...

SMOTE train data applied.

Standardized features applied.

Fitting LR.
Fitting SVM.
Fitting 3-Layer-NeuralNetwork.
Fitting knn.
Fitting GauNB.
Fitting MixNB.
Fitting LDA.
Fitting QDA.
Completed fold 2/5

Starting fold 3/5...

SMOTE train data applied.

Standardized features applied.

Fitting LR.
Fitting SVM.
Fitting 3-Layer-NeuralNetwork.
Fitting knn.
Fitting GauNB.
Fitting MixNB.
Fitting LDA.
Fitting QDA.
Completed fold 3/5

Starting fold 4/5...

SMOTE train data applied.

Standardized features applied.

Fitting LR.
Fitting SVM.
Fitting 3-Layer-NeuralNetwork.
Fitting knn.
Fitting GauNB.
Fitting MixNB.
Fitting LDA.
Fitting QDA.
Completed fold 4/5

Starting fold 5/5...

SMOTE train data applied.

Standardized features applied.

Fitting LR.
Fit

Interpretation of fitted coefficients (permutation importance is scored with the F1 score)

Logistic Regression

SHAP analysis

Possible next step: extract some misclassified samples and visualize how each feature contributed to the final classification.