In [11]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import datetime as dt
import time
import os

from datetime import datetime

import shap
import lime
from lime import lime_tabular

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, RandomizedSearchCV, GridSearchCV
from sklearn.preprocessing import MinMaxScaler
import statsmodels.api as sm
from sklearn.linear_model import LogisticRegression
from sklearn.feature_selection import RFE
from statsmodels.stats.outliers_influence import variance_inflation_factor

from sklearn import metrics
from sklearn.metrics import confusion_matrix

from sklearn.metrics import precision_score, recall_score
from sklearn.metrics import precision_recall_curve

from sklearn.cluster import KMeans

import missingno as msno

from fancyimpute import IterativeImputer as MICE
from sklearn.impute import IterativeImputer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam 


from sklearn.cluster import DBSCAN
from imblearn.over_sampling import SMOTE
from sklearn.neighbors import NearestNeighbors
from collections import Counter

from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import numpy as np

from imblearn.over_sampling import KMeansSMOTE
from sklearn.mixture import GaussianMixture


from xgboost import XGBClassifier
from rgf.sklearn import RGFClassifier  # Regularized Greedy Forest
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense


from sklearn.metrics import accuracy_score, confusion_matrix, f1_score, roc_auc_score, roc_curve
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline

from joblib import dump, load
import logging


In [14]:

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def split_dataset(dataset, target_column, test_size=0.2):
    """
    Split dataset into training and testing sets.
    """
    X = dataset.drop(columns=[target_column])
    y = dataset[target_column]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42, stratify=y)

    logging.info("Dataset has been split and returned")
    return X_train, X_test, y_train, y_test

def train_ann(X_train, y_train):
    """
    Train an Artificial Neural Network (ANN) on the training data.
    """
    start_time = time.time()
    model = Sequential([
        Input(shape=(X_train.shape[1],)),
        Dense(12, activation='relu'),
        Dense(8, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    model.fit(X_train, y_train, epochs=150, batch_size=10, verbose=0)
    end_time = time.time()

    logging.info(f"ANN has been trained in {end_time - start_time:.2f} seconds")
    return model

def train_models(X_train, y_train):
    """
    Train multiple models on the training data.
    """
    models = {}
    param_grids = {
        'RandomForest': {
            'n_estimators': [100, 200, 300],
            'max_depth': [None, 10, 20],
            'min_samples_split': [2, 5]
        },
        'XGBoost': {
            'n_estimators': [100, 200, 300],
            'max_depth': [3, 6],
            'learning_rate': [0.01, 0.1]
        },
        'SVM': {
            'C': [0.1, 1, 10],
            'kernel': ['linear', 'rbf']
        },
        'LogisticRegression': {
            'C': [0.1, 1, 10],
            'penalty': ['l2']
        },
        'GradientBoosting': {
            'n_estimators': [100, 200, 300],
            'learning_rate': [0.01, 0.1],
            'max_depth': [3, 5, 7]
        },
        'KNN': {
            'n_neighbors': [3, 5, 7],
            'weights': ['uniform', 'distance']
        }
    }

    models['ANN'] = train_ann(X_train, y_train)

    for model_name, param_grid in param_grids.items():
        start_time = time.time()
        try:
            if model_name == 'RandomForest':
                model = GridSearchCV(RandomForestClassifier(), param_grid, cv=5)
            elif model_name == 'XGBoost':
                model = GridSearchCV(XGBClassifier(), param_grid, cv=5)
            elif model_name == 'SVM':
                model = GridSearchCV(SVC(probability=True), param_grid, cv=5)
            elif model_name == 'LogisticRegression':
                model = GridSearchCV(LogisticRegression(), param_grid, cv=5)
            elif model_name == 'GradientBoosting':
                model = GridSearchCV(GradientBoostingClassifier(), param_grid, cv=5)
            elif model_name == 'KNN':
                model = GridSearchCV(KNeighborsClassifier(), param_grid, cv=5)

            model.fit(X_train, y_train)
            models[model_name] = model.best_estimator_
            end_time = time.time()
            logging.info(f"{model_name} has been trained in {end_time - start_time:.2f} seconds")
        except Exception as e:
            logging.error(f"Error training {model_name}: {e}")

    try:
        start_time = time.time()
        nb = GaussianNB()
        nb.fit(X_train, y_train)
        models['NaiveBayes'] = nb
        end_time = time.time()
        logging.info(f"Naive Bayes has been trained in {end_time - start_time:.2f} seconds")
    except Exception as e:
        logging.error(f"Error training Naive Bayes: {e}")

    return models

def test_models(models, X_test):
    """
    Test trained models on the test data.
    """
    start_time = time.time()
    predictions = {}
    for name, model in models.items():
        try:
            if name == 'ANN':
                predictions[name] = (model.predict(X_test) > 0.5).astype("int32")
            else:
                predictions[name] = model.predict(X_test)
        except Exception as e:
            logging.error(f"Error testing {name}: {e}")
    end_time = time.time()

    logging.info(f"Models have been tested in {end_time - start_time:.2f} seconds")
    return predictions

def evaluate_models(models, predictions, y_test, X_test):
    """
    Evaluate the performance of models.
    """
    start_time = time.time()
    metrics = {}
    for name, y_pred in predictions.items():
        try:
            accuracy = accuracy_score(y_test, y_pred)
            cm = confusion_matrix(y_test, y_pred)
            f1 = f1_score(y_test, y_pred)
            auc = roc_auc_score(y_test, models[name].predict_proba(X_test)[:, 1]) if name != 'ANN' else roc_auc_score(y_test, models[name].predict(X_test))
            metrics[name] = {
                'accuracy': accuracy,
                'confusion_matrix': cm,
                'f1_score': f1,
                'auc_roc': auc
            }
        except Exception as e:
            logging.error(f"Error evaluating {name}: {e}")
    end_time = time.time()

    logging.info(f"Models have been evaluated in {end_time - start_time:.2f} seconds")
    return metrics


def explainability_shap(models, df_name, X_test, feature_names):

    """
    
    """
    # Ensure X_test is a DataFrame with named columns
    X_test = pd.DataFrame(X_test, columns=feature_names).reset_index(drop=True)
    
    for name, model in models.items():
        if name == 'ANN':
            continue
        try:
            if name in ['RandomForest', 'XGBoost', 'GradientBoosting']:
                explainer = shap.TreeExplainer(model)
            
            # No existing methods to analyse other models using SHAP, so only these three models.
            
            shap_values = explainer.shap_values(X_test)
            
            plt.figure(figsize=(10, 6))
            shap.summary_plot(shap_values[1] if isinstance(shap_values, list) else shap_values, 
                              X_test, plot_type="bar", show=False, max_display=10)
            plt.title(f"Top 10 Most Important Features - {name}")
            plt.tight_layout()
            plt.savefig(f"{df_name}_shap_importance_{name}.png")
            plt.close()
            logging.info(f"SHAP explanations for {name} created and saved")
        except Exception as e:
            logging.error(f"Error generating SHAP explanations for {name}: {e}")



def explainability_lime(models, df_name, X_train, X_test, feature_names):
    
    """
    
    """
    # Ensure X_train and X_test are DataFrames with named columns
    X_train = pd.DataFrame(X_train, columns=feature_names).reset_index(drop=True)
    X_test = pd.DataFrame(X_test, columns=feature_names).reset_index(drop=True)
    
    explainer = lime.lime_tabular.LimeTabularExplainer(
        X_train.values,  # Use .values to get numpy array
        feature_names=feature_names, 
        class_names=['Negative', 'Positive'], 
        mode='classification'
    )
    for name, model in models.items():
        if name == 'ANN':
            continue
        try:
            i = np.random.randint(0, X_test.shape[0])
            exp = explainer.explain_instance(
                X_test.iloc[i].values,  # Use .iloc[i].values to get numpy array
                model.predict_proba, 
                num_features=6
            )
            feature_importance = pd.DataFrame(exp.as_list(), columns=['Feature', 'Importance'])
            feature_importance['Absolute Importance'] = abs(feature_importance['Importance'])
            feature_importance = feature_importance.sort_values('Absolute Importance', ascending=True)
            plt.figure(figsize=(10, 6))
            colors = ['red' if imp < 0 else 'green' for imp in feature_importance['Importance']]
            plt.barh(feature_importance['Feature'], feature_importance['Importance'], color=colors)
            plt.title(f"LIME Explanation for {name}\nTop 6 Features' Impact on Prediction")
            plt.xlabel('Impact on Prediction (Red = Negative, Green = Positive)')
            plt.tight_layout()
            plt.savefig(f"{df_name}_lime_explanation_{name}.png")
            plt.close()
            logging.info(f"LIME explanation for {name} created and saved")
        except Exception as e:
            logging.error(f"Error generating LIME explanations for {name}: {e}")



def interpret_results(models, X_test, feature_names):
    summary = "Model Interpretation Summary:\n\n"
    for name, model in models.items():
        if name == 'ANN':
            continue
        summary += f"{name} Model:\n"
        summary += f"Feature Importance from {name} Model:\n"
        try:
            if name in ['RandomForest', 'XGBoost', 'GradientBoosting']:
                importances = model.feature_importances_
                importance_df = pd.DataFrame({'Feature': feature_names, 'Importance': importances})
                importance_df = importance_df.sort_values('Importance', ascending=False).head(10)
            else:
                importances = model.coef_[0] if hasattr(model, 'coef_') else None
                importance_df = pd.DataFrame({'Feature': feature_names, 'Importance': importances})
                importance_df = importance_df.sort_values('Importance', ascending=False).head(10)
            summary += importance_df.to_string(index=False)
            summary += "\n\n"
        except Exception as e:
            logging.error(f"Error interpreting results for {name}: {e}")
    logging.info("Model interpretation summary created")
    return summary


def save_models(models, directory='models'):
    """
    Save trained models to disk.
    """
    if not os.path.exists(directory):
        os.makedirs(directory)
    for name, model in models.items():
        try:
            if name == 'ANN':
                model.save(os.path.join(directory, f'{name}_model.h5'))
            else:
                dump(model, os.path.join(directory, f'{name}_model.joblib'))
            logging.info(f"{name} model saved")
        except Exception as e:
            logging.error(f"Error saving {name} model: {e}")


# Use only if needed to run back with best models
def load_models(directory='models'):
    """
    Load trained models from disk.
    """
    models = {}
    for filename in os.listdir(directory):
        model_name, ext = os.path.splitext(filename)
        try:
            if ext == '.h5':
                models[model_name] = load_model(os.path.join(directory, filename))
            elif ext == '.joblib':
                models[model_name] = load(os.path.join(directory, filename))
            logging.info(f"{model_name} model loaded")
        except Exception as e:
            logging.error(f"Error loading {model_name} model: {e}")
    return models


def main(dataset, target_column, name):
    """
    Main function to train, test, evaluate, and explain models.
    """
    X_train, X_test, y_train, y_test = split_dataset(dataset, target_column)

    # Standardization
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    logging.info("Data has been standardized")

    models = train_models(X_train, y_train)
    predictions = test_models(models, X_test)
    metrics = evaluate_models(models, predictions, y_test, X_test)

    explainability_shap(models, name, X_test, feature_names=dataset.drop(columns=[target_column]).columns)
    explainability_lime(models, name, X_train, X_test, feature_names=dataset.drop(columns=[target_column]).columns)

    save_models(models)
    logging.info("Models have been saved")

    # Interpret results
    summary = interpret_results(models, X_test, feature_names=dataset.drop(columns=[target_column]).columns)
    print(summary)

    return metrics


def modelling_gs(df, name):
    """
    Function to run the main pipeline with the given dataset.
    """
    target_column = 'LABEL'  # Replace with your target column
    results = main(df, target_column, name)
    logging.info("Results have been documented.")
    return results

# To run the modelling function with a dataset 'df':
# results = modelling_gs(df)

In [13]:
file_paths = [
    "C:\\Users\\dev\\Desktop\\MSC thesis\\Code\\final_codes\\Processed Datasets\\ADASYN_AE_3_PCA.xlsx",
    "C:\\Users\\dev\\Desktop\\MSC thesis\\Code\\final_codes\\Processed Datasets\\ADASYN_MICE_3_PCA.xlsx",
    "C:\\Users\\dev\\Desktop\\MSC thesis\\Code\\final_codes\\Processed Datasets\\KMSMOTE_AE_3_PCA.xlsx",
    "C:\\Users\\dev\\Desktop\\MSC thesis\\Code\\final_codes\\Processed Datasets\\KMSMOTE_MICE_3_PCA.xlsx",
    "C:\\Users\\dev\\Desktop\\MSC thesis\\Code\\final_codes\\Processed Datasets\\SVMSMOTE_AE_3_PCA.xlsx",
    "C:\\Users\\dev\\Desktop\\MSC thesis\\Code\\final_codes\\Processed Datasets\\SVMSMOTE_MICE_3_PCA.xlsx"
]

# Read the Excel files into dataframes
dfs = [pd.read_excel(file_path) for file_path in file_paths]

print("Datasets are read into dataframes")

tot_start_time = time.time()
start_time = time.time()
# Store results in variables
results_ADASYN_AE_3_PCA = modelling_gs(dfs[0], "ADASYN_AE_3_PCA" )
end_time = time.time()  # End timing
elapsed_time = (end_time - start_time) / 60
print("_______________________________________________________________________________")
print(f" Total time taken by ADASYN_AE_3_PCA: {elapsed_time:.2f} mins")

start_time = time.time()
results_ADASYN_MICE_3_PCA = modelling_gs(dfs[1], "ADASYN_MICE_3_PCA")

end_time = time.time()  # End timing
elapsed_time = (end_time - start_time) / 60
print("_______________________________________________________________________________")
print(f" Total time taken by ADASYN_MICE_3_PCA: {elapsed_time:.2f} mins")

start_time = time.time()
results_KMSMOTE_AE_3_PCA = modelling_gs(dfs[2], "KMSMOTE_AE_3_PCA")

end_time = time.time()  # End timing
elapsed_time = (end_time - start_time) / 60
print("_______________________________________________________________________________")
print(f" Total time taken by KMSMOTE_AE_3_PCA: {elapsed_time:.2f} mins")

start_time = time.time()
results_KMSMOTE_MICE_3_PCA = modelling_gs(dfs[3], "KMSMOTE_MICE_3_PCA")

end_time = time.time()  # End timing
elapsed_time = (end_time - start_time) / 60
print("_______________________________________________________________________________")
print(f" Total time taken by KMSMOTE_MICE_3_PCA: {elapsed_time:.2f} mins")

start_time = time.time()
results_SVMSMOTE_AE_3_PCA = modelling_gs(dfs[4], "SVMSMOTE_AE_3_PCA")

end_time = time.time()  # End timing
elapsed_time = (end_time - start_time) / 60
print("_______________________________________________________________________________")
print(f" Total time taken by SVMSMOTE_AE_3_PCA: {elapsed_time:.2f} mins")

start_time = time.time()
results_SVMSMOTE_MICE_3_PCA = modelling_gs(dfs[5], "SVMSMOTE_MICE_3_PCA")

end_time = time.time()  # End timing
elapsed_time = (end_time - start_time) / 60
print("_______________________________________________________________________________")
print(f" Total time taken by SVMSMOTE_MICE_3_PCA: {elapsed_time:.2f} mins")


print(" ")
print("_______________________________________________________________________________")
tot_end_time = time.time()  # End timing
tot_elapsed_time = (tot_end_time - tot_start_time) / 60
print(f" Total time taken by all the models : {tot_elapsed_time:.2f} mins")

# Print the results with variable names
print("Results for ADASYN_AE_3_PCA:", results_ADASYN_AE_3_PCA)
print("Results for ADASYN_MICE_3_PCA:", results_ADASYN_MICE_3_PCA)
print("Results for KMSMOTE_AE_3_PCA:", results_KMSMOTE_AE_3_PCA)
print("Results for KMSMOTE_MICE_3_PCA:", results_KMSMOTE_MICE_3_PCA)
print("Results for SVMSMOTE_AE_3_PCA:", results_SVMSMOTE_AE_3_PCA)
print("Results for SVMSMOTE_MICE_3_PCA:", results_SVMSMOTE_MICE_3_PCA)

Datasets are read into dataframes


TypeError: main() takes 2 positional arguments but 3 were given