# Author: "Matougui Zakaria"

# Import Libraries

In [None]:
import numpy as np
import seaborn as sns
from sklearn.metrics import mean_squared_error, roc_auc_score, recall_score, accuracy_score, cohen_kappa_score
from sklearn.model_selection import train_test_split, GridSearchCV, RepeatedKFold
from imblearn.under_sampling import RandomUnderSampler
import scipy.stats as stats
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis, QuadraticDiscriminantAnalysis
from sklearn.neural_network import MLPClassifier
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, roc_auc_score
import joblib
from sklearn.ensemble import VotingClassifier
from sklearn.ensemble import StackingClassifier
from deslib.des import METADES
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import BaggingClassifier
from statsmodels.stats.outliers_influence import variance_inflation_factor
from scipy.stats import wilcoxon
from researchpy import ttest
from easy_mpl import taylor_plot

# Input DATA

In [None]:
# Full_datafreme is the DataFrame containing all the data that has been pre-processed 

# Features is the DataFrame containing all the causative factors 
Features=Full_datafreme.drop("Target", axis=1)

# Target is a Series or DataFrame containing the target variable (Landslides)
Target=Full_datafreme['Target']

# Randomly select examples from the majority class to balance the class distribution
rus=RandomUnderSampler(random_state=0)

# X and y will contain your balanced dataset, where the number of samples in each class is the same
X,y=rus.fit_resample(Features,Target)

# Functions

In [None]:
# Define the list of baselearners
def B_models():
    models_all = [
        ('Logistic Regression', LogisticRegression()),
        ('Decision Tree', DecisionTreeClassifier(random_state=rng)),
        ('SVM', SVC(probability=True,random_state=rng)),
        ('Gaussian Naive Bayes', GaussianNB()),
        ('K-Nearest Neighbors', KNeighborsClassifier()),
        ('Linear Discriminant Analysis', LinearDiscriminantAnalysis()),
        ('Quadratic Discriminant Analysis', QuadraticDiscriminantAnalysis()),
        ('MLP Classifier', MLPClassifier(random_state=rng))
    ]
    return models
# Add the prefered models to the liste

In [None]:
# Function to calculate and plot VIF
def calculate_and_plot_vif(Features):
    # Create a DataFrame to store the VIF values
    vif_data = pd.DataFrame()
    vif_data["feature"] = Features.columns

    # Calculate VIF for each feature
    vif_data["VIF"] = [variance_inflation_factor(Features.values, i)
                      for i in range(len(Features.columns))]

    # Create a figure and axis for plotting
    fig, ax = plt.subplots(figsize=(12, 6))

    # Create a bar plot of VIF values
    sns.barplot(x='feature', y='VIF', data=vif_data.sort_values(by=['VIF']), ax=ax, palette='Spectral')
    ax.set_xticklabels(ax.get_xticklabels(), rotation=45)
    sns.color_palette("flare", as_cmap=True)
    ax.set_xlabel('Causative factors')
    
    # Add labels to the bars
    for c in ax.containers:
        ax.bar_label(c, fmt='%.2f', label_type='edge', padding=2)
    
    # Adjust spacing
    ax.margins(y=0.1)
    
    # Show the plot
    plt.show()

In [None]:
def plot_correlation_matrix(Featurs):
   
    # Calculate the correlation matrix
    correlation_matrix = Featurs.corr()

    # Create a figure and axis for the heatmap
    plt.figure(figsize=(10, 8))
    ax = sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', linewidths=0.5)

    # Set the plot title
    plt.title("Correlation Matrix Heatmap")

    # Show the plot
    plt.show()

In [None]:
# Function to plot roc curves
def plot_roc_curve(result_table):
    fig = plt.figure(figsize=(8, 6))

    for i in result_table.index:
        plt.plot(result_table.loc[i]['fpr'], 
                 result_table.loc[i]['tpr'], 
                 label="{}, AUC={:.3f}".format(i, result_table.loc[i]['auc']))

    plt.xlabel("False Positive Rate", fontsize=15)
    plt.ylabel("True Positive Rate", fontsize=15)
    plt.legend(prop={'size': 13}, loc='lower right')
    plt.grid(visible=True)
    plt.show()

In [None]:
# Function to calculate feature sensitivity for a given model
def Sensitivity(model, X, y):
    results = []  # Initialize an empty list to store sensitivity results

    # Split the dataset into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.33, random_state=0)
    
    # Fit the model on the training data
    model.fit(X_train, y_train)
    
    # Calculate the baseline AUC score on the validation data
    baseline_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])

    # Iterate through each predictor (feature) in the dataset
    for predictor in X.columns:
        X_val_copy = X_val.copy()  # Create a copy of the validation data
        X_val_copy[predictor] = np.random.permutation(X_val_copy[predictor])  # Permute the values of the predictor
        y_pred = model.predict_proba(X_val_copy)[:, 1]  # Predict probabilities with altered predictor
        auc_diff = baseline_auc - roc_auc_score(y_val, y_pred)  # Calculate AUC difference
        results.append({'Factor': predictor, 'AUC_Difference': auc_diff})  # Store factor and AUC difference

    SA_data = pd.DataFrame(results)  # Create a DataFrame from sensitivity results
    return SA_data  # Return the sensitivity data


In [None]:
# Function to plot sensitivity analysis results
def plot_sensitivity_results(results_df):
    # Create a horizontal bar plot to visualize AUC differences for each factor
    plt.figure(figsize=(12, 6))
    plt.barh(results_df['Factor'], results_df['AUC_Difference'], color='skyblue')
    plt.xlabel('AUC Difference')  # Set the label for the x-axis
    plt.title('Sensitivity Analysis - AUC Difference for Each Factor')  # Set the plot title
    plt.grid(axis='x', linestyle='--', alpha=0.6)  # Add grid lines for reference
    plt.show() 


# Features analysis

In [None]:
plot_correlation_matrix(Featurs)
calculate_and_plot_vif(Features)

# Base learners

In [None]:

# Set random seed
rng = 1

# Define cross-validation strategy
cv = RepeatedKFold(n_splits=5, n_repeats=2, random_state=rng)

# Split the dataset into train and test sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=rng)

# Define hyperparameters for each model
param_grid = {
    'Logistic Regression': {'C': [0.01, 0.1, 1, 10], 
                            'penalty': ['l1', 'l2']},
    
    'Decision Tree': {'max_depth': [None, 4,5,6,7,8,9,10], 
                      'min_samples_split': [2, 5, 10]},
    
    'SVM': {'C': [0.01, 0.1, 1, 10], 
            'kernel': ['linear', 'rbf'], 
            'gamma': [0.1, 1, 10,'scale', 'auto']},
    
    'Gaussian Naive Bayes': {"var_smoothing": [1e-8,1e-9,1e-10]},
    
    'K-Nearest Neighbors': {'n_neighbors': np.arange(2,60,2), 
                            'p':[1,2,3,4,5],
                            'weights': ['uniform', 'distance']},
    
    'Linear Discriminant Analysis': {},
    
    'Quadratic Discriminant Analysis': {'tol':[0.0001,0.001]},
    
    'MLP Classifier': {'hidden_layer_sizes': [np.arange(4,50,2) ], 
                       'activation': ['relu', 'tanh'],
                       'solver':["lbfgs", 'sgd', 'adam'],
                      'alpha':[0.001,0.01,0.1]}
}

B_models_name=['LR','DT','SVM','NB','KNN','LDA','QDA','MLP']

# Perform GridSearchCV for each model
best_params = {}
models = B_models()

fitted_models = {}

for name, model in models:
    if name in param_grid:
        grid = GridSearchCV(estimator=model, param_grid=param_grid[name], scoring='roc_auc', cv=cv)
        grid.fit(X_train, y_train)
        best_model = grid.best_estimator_
        fitted_models[name] = best_model

        # Save the best model using joblib
        joblib.dump(best_model, f"{name}_best_model.pkl")

# Print the best parameters
for name, params in best_params.items():
    print(f"Best parameters for {name}: {params}")

# Evaluate models and store results in a DataFrame
results_df = pd.DataFrame(columns=['Model', 'RMSE', 'AUC', 'RECALL', 'ACCURACY', 'KAPPA'])

for name, model in fitted_models.items():
    model.fit(X_train, y_train)  
    y_pred = model.predict(X_test)  

    # Calculate metrics
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    auc = roc_auc_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    kappa = cohen_kappa_score(y_test, y_pred)

    # Add results to the DataFrame
    results_df = results_df.append({'Model': name, 'RMSE': rmse, 'AUC': auc,
                                    'RECALL': recall, 'ACCURACY': accuracy,
                                    'KAPPA': kappa}, ignore_index=True)

# Print the results
print(results_df)

In [None]:

result_table = pd.DataFrame(columns=['Models', 'fpr', 'tpr', 'auc'])

for name, model in fitted_models.items():
    model.fit(X_train, y_train)
    yproba = model.predict_proba(X_test)[:, 1]
    
    fpr, tpr, _ = roc_curve(y_test, yproba)
    auc = roc_auc_score(y_test, yproba)
    
    result_table = result_table.append({'Models': name,
                                        'fpr': fpr, 
                                        'tpr': tpr, 
                                        'auc': auc}, ignore_index=True)
sns.set_theme()

result_table.set_index('Models', inplace=True)

plot_roc_curve(result_table)


# Heterogeneous ensembles

In [None]:
# Create the list of tuples for the ensembles
estimators = [(name, model) for name, model in fitted_models.items()]

### Stacking

In [None]:
Stacking= StackingClassifier(estimators=estimators, final_estimator=LogisticRegression(),cv=10)
Stacking.fit(X_train,y_train)

### METADES

In [None]:
# Split the data for base learners and meta-learner
Xb, X_m, yb, y_m = train_test_split(X_train, y_train, test_size=0.5, random_state=rng)

# Extract second elements (classifiers) from the tuples
classifiers = [classifier for name, classifier in estimators]

# Train base classifiers
for classifier in classifiers:
    classifier.fit(Xb, yb)

# Train the METADES ensemble
metades = METADES(pool_classifiers=classifiers,
                  meta_classifier=LogisticRegression(),
                  k=6, mode='hybrid', random_state=rng, n_jobs=2)
metades.fit(X_m, y_m)

###  Voting 

In [None]:
Voting=VotingClassifier(estimators=estimators, voting='soft',weights=None)
Voting.fit(X_train,y_train)

### Weighted voting 

In [None]:
# Calculate the ROC AUC scores for each model
roc_auc_scores = {}
for name, model in fitted_models.items():
    y_probas = model.predict_proba(X_test)[:, 1]
    roc_auc = roc_auc_score(y_test, y_probas)
    roc_auc_scores[name] = roc_auc

# Calculate the weights based on ROC AUC scores
total_auc = sum(roc_auc_scores.values())
weights = {name: auc / total_auc for name, auc in roc_auc_scores.items()}

# Create the weighted voting classifier
WE_voting= VotingClassifier(estimators=estimators, voting='soft', weights=list(weights.values()))

WE_voting.fit(X_train,y_train)

In [None]:
#Evaluation of heterogeneous ensembles

Het_en=[Stacking,metades,Voting,WE_voting]
Het_en_names=['Stacking','METADES','Voting','Weighted']
results_df_het = pd.DataFrame(columns=['Model', 'RMSE', 'AUC', 'RECALL', 'ACCURACY', 'KAPPA'])

for name, model in zip(Het_en_names,Het_en):
    y_pred = model.predict(X_test)  

    # Calculate metrics
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    auc = roc_auc_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    kappa = cohen_kappa_score(y_test, y_pred)

    # Add results to the DataFrame
    results_df_het = results_df_het.append({'Model': name, 'RMSE': rmse, 'AUC': auc,
                                    'RECALL': recall, 'ACCURACY': accuracy,
                                    'KAPPA': kappa}, ignore_index=True)

# Print the results
print(results_df_het)

In [None]:
result_table_het = pd.DataFrame(columns=['Models', 'fpr', 'tpr', 'auc'])

for name, model in zip(Het_en_names,Het_en):
    yproba = model.predict_proba(X_test)[:, 1]
    
    fpr, tpr, _ = roc_curve(y_test, yproba)
    auc = roc_auc_score(y_test, yproba)
    
    result_table_het = result_table_het.append({'Models': name,
                                        'fpr': fpr, 
                                        'tpr': tpr, 
                                        'auc': auc}, ignore_index=True)

result_table_het.set_index('Models', inplace=True)
#Plot roc curves
plot_roc_curve(result_table_het)

# Homogeneous ensembles

In [None]:
RF=RandomForestClassifier(random_state=rng)

# Define the hyperparameters
param_grid_RF = {
    'n_estimators': np.arange(50,200,25),
    'max_depth': [None, 5,6,7,8],
    'min_samples_split': [2, 5,10],
    'min_samples_leaf': [1, 2,4,6]
}

# Create the GridSearchCV instance
grid_search_RF = GridSearchCV(estimator=RF, param_grid=param_grid_RF, scoring='roc_auc', cv=cv, n_jobs=-1)

# Fit the GridSearchCV to the data
grid_search_RF.fit(X_train, y_train)

RF=grid_search_RF.best_estimator_

In [None]:
ADA = AdaBoostClassifier(random_state=rng)

# Define the hyperparameters
param_grid_ADA = {
    'n_estimators': np.arange(50,200,25),
    'learning_rate': [0.01, 0.1, 1.0],
    'algorithm': ['SAMME', 'SAMME.R']
}

# Create GridSearchCV object
grid_search_ADA = GridSearchCV(estimator=ada_model, param_grid=param_grid_ADA, scoring='roc_auc', cv=cv, n_jobs=-1)

# Fit the GridSearchCV to the data
grid_search_ADA.fit(X, y)

# Get the best estimator
ADA=grid_search_ADA.best_estimator_

In [None]:
BG=BaggingClassifier(base_estimator=fitted_models['Decision Tree'],random_state=rng)

# Define hyperparameters
param_grid_BG = {
    'n_estimators': np.arange(50,200,25),
    'max_samples': [0.5, 0.7, 1.0],
    'max_features': [0.5, 0.7, 1.0]
}

# Perform GridSearchCV for the BaggingClassifier
grid_search_BG = GridSearchCV(estimator=BG, param_grid=param_grid_BG, scoring='roc_auc', cv=cv)
grid_search_BG.fit(X, y)

# Get the best estimator

BG = grid_search_BG.best_estimator_

In [None]:
RSS=BaggingClassifier(base_estimator=fitted_models['Decision Tree'],bootstrap=False,random_state=rng)

# Define hyperparameters
param_grid_RSS = {
    'n_estimators': np.arange(50,200,25),
    'max_samples': [0.5, 0.7, 1.0],
    'max_features': [0.5, 0.7, 1.0]
}

# Perform GridSearchCV for the BaggingClassifier
grid_search_RSS = GridSearchCV(estimator=BG, param_grid=param_grid_RSS, scoring='roc_auc', cv=cv)
grid_search_RSS.fit(X, y)

# Get the best estimator

RSS = grid_search_BG.best_estimator_

In [None]:
Hom_en=[RSS,ADA,BG,RF]
Hom_en_names=['RSS','Adb','BG','RF']

results_df_hom = pd.DataFrame(columns=['Model', 'RMSE', 'AUC', 'RECALL', 'ACCURACY', 'KAPPA'])

for name, model in zip(Hom_en_names,Hom_en):
    y_pred = model.predict(X_test)  

    # Calculate metrics
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    auc = roc_auc_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)
    kappa = cohen_kappa_score(y_test, y_pred)

    # Add results to the DataFrame
    results_df_hom = results_df_hom.append({'Model': name, 'RMSE': rmse, 'AUC': auc,
                                    'RECALL': recall, 'ACCURACY': accuracy,
                                    'KAPPA': kappa}, ignore_index=True)

# Print the results
print(results_df_het)

In [None]:
result_table_hom = pd.DataFrame(columns=['Models', 'fpr', 'tpr', 'auc'])

for name, model in zip(Hom_en_names,Hom_en):
    yproba = model.predict_proba(X_test)[:, 1]
    
    fpr, tpr, _ = roc_curve(y_test, yproba)
    auc = roc_auc_score(y_test, yproba)
    
    result_table_hom = result_table_hom.append({'Models': name,
                                        'fpr': fpr, 
                                        'tpr': tpr, 
                                        'auc': auc}, ignore_index=True)

result_table_hom.set_index('Models', inplace=True)

#Plot roc curves
plot_roc_curve(result_table_hom)

In [None]:
All_models=models+Het_en+Hom_en
All_models_names=B_models_name+Het_en_names+Hom_en_names

# Predictions

In [None]:
predict_data=pd.DataFrame()

# Iterate through the list of models and add predictions to the DataFrame
for name, model in zip(All_models_names, All_models):
    # Predict probabilities 
    probs = model.predict_proba(predict_features)
    preds = probs[:, 1]  # Extract probabilities
    
    # Add the predictions as a new column in the DataFrame
    predict_data[name] = preds

# Now, predict_data contains columns with predictions from each model

# Wilcoxon analysis

In [None]:
# Create an empty dictionary to store the results
wilcoxon_results = {}

# Iterate through pairs of models
for i in range(len(All_models)):
    for j in range(i + 1, len(All_models)):
        model1_name = All_models_names[i]
        model2_name = All_models_names[j]
        
        # Get the predictions of the two models to be compared
        predictions_model1 = predict_data[model1_name]
        predictions_model2 = predict_data[model2_name]
        
        # Perform the Wilcoxon signed-rank test
        _, p_value = stats.wilcoxon(predictions_model1, predictions_model2)
        
        # Store the p-value in the results dictionary
        pair_name = f"{model1_name} vs {model2_name}"
        wilcoxon_results[pair_name] = p_value

# Create a DataFrame from the results dictionary
wilcoxon_results_df = pd.DataFrame(list(wilcoxon_results.items()), columns=['Model Pair', 'p-value'])


# Sensitivity analysis

In [None]:
# Assuming you have a list of best models named 'best_models'

for model in best_models:
    # Perform sensitivity analysis and obtain the results DataFrame
    SA_results = FSA_pfi(model, X, y)  # Assuming you have defined the FSA_pfi function

    # Plot the sensitivity results
    plot_sensitivity_results(SA_results)

# Taylor Diagram 

In [None]:
for name, model in zip(All_models_names, All_models):
    # Predict probabilities 
    probs = model.predict_proba(X_test)
    preds = probs[:, 1] 
    predict_data[name] = preds

Taylor_data = {}

for i, model in enumerate(All_models_names):
    Taylor_data[model] = X_test[:, i]
    
Coov=np.cov(predict_data)
cov = Coov 
observations =  y_test
simulations =  Taylor_data

taylor_plot(observations=observations,
            simulations=simulations,plot_bias=True,grid_kws={'axis': 'x', 'color': 'black', 'lw': 0.5},
                           leg_kws={'facecolor': 'white',
                'edgecolor': 'black','bbox_to_anchor':(1.15, 0.8),
                'fontsize': 14, 'labelspacing': 1.0},)