In [1]:
"""
@author: MD Gufran Alam; Vaibhav Tripathi; Mohit Prakash Mohanty
"""


# Import important libraris

import pandas as pd
import numpy as np
import scikeras
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, recall_score, precision_score, cohen_kappa_score, confusion_matrix, roc_auc_score, roc_curve
from skopt import BayesSearchCV
from keras.models import Sequential
from scikeras.wrappers import KerasClassifier
from keras.layers import LSTM, Dense
from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier

#All the required preprocessing was done in GIS softwares

#Load the data in csv format 
data = pd.read_csv('file_path')
X, Y = data.drop('target_column', axis=1), data['target_column']

# Step 2: Split the extracted subset into training and testing datasets
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, stratify=Y, random_state=42)

# Information Gain Ratio (IGR)
from sklearn.feature_selection import mutual_info_classif
information_gain = mutual_info_classif(X_train, Y_train)
# Display Information Gain Ratio values
igr_values = pd.DataFrame({'Features': X_train.columns, 'Information Gain Ratio': information_gain})
sorted_igr_values = igr_values.sort_values(by='Information Gain Ratio', ascending=False)

# Create a bar plot
plt.figure(figsize=(8, 4))
sns.barplot(x='Features', y='Information Gain Ratio', data=sorted_igr_values, color='blue')
plt.title('Information Gain Ratio (IGR) for Flood Conditioning Factors')
plt.xlabel('Flood Conditioning Factors')
plt.ylabel('Information Gain Ratio Values')
plt.xticks(rotation=45, ha='right')
plt.ylim(0.0, 0.5)  # Set y-axis limits to display values from 0.0 to 0.5
plt.tight_layout()
plt.savefig('file_name.jpg', dpi =1000, bbox_inches ='tight')
plt.show()

from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.feature_selection import VarianceThreshold
# Calculate Pearson's correlation coefficient
correlation_matrix = data.corr()
# Display Pearson's correlation coefficient matrix
print("Correlation Coefficient Matrix:")

# Create a heatmap of the correlation matrix
mask = np.triu(np.ones_like(correlation_matrix, dtype=bool))
plt.figure(figsize=(18, 14))
annot_kws = {"size": 11, "weight": "bold"}  # Change the font size of numeric values
cbar_kws = {"shrink": 1}  # Adjust the size of the colorbar
# Plot the heatmap with modified parameters
heatmap = sns.heatmap(correlation_matrix, mask=mask, annot=True, cmap="RdBu", vmin=-1, vmax=1, annot_kws=annot_kws, cbar_kws=cbar_kws)

# Loop through the text elements to format their values
for text in heatmap.texts:
    text.set_text(f'{float(text.get_text()):.2f}')  # Format text to display only three decimal places
plt.title("Pearson's Correlation Coefficient Matrix", fontsize=22, fontweight='bold')
plt.xlabel("Flood Conditioning Factors", fontsize=20, fontweight='bold')
plt.ylabel("Flood Conditioning Factors", fontsize=20, fontweight='bold')
plt.savefig('plot.jpg', dpi=1800, bbox_inches='tight')
plt.show()

# Perform Variance Inflation Factor (VIF) calculation
vif_data_new = pd.DataFrame()
vif_data_new["Features"] = X_train.columns
vif_data_new["VIF"] = [variance_inflation_factor(X_train.values, i) for i in range(X_train.shape[1])]
# Display VIF values
vif_data_new.sort_values(by='VIF', ascending=True, inplace=True)
vif_data_new

# Initialize the StandardScaler
scaler = StandardScaler()
# Fit the scaler on the training data and transform both the training and testing data
X_train_scaled = scaler.fit_transform(X_train_sf)
X_test_scaled = scaler.transform(X_test_sf)
# Reshape the data to (n_samples, seq_length, n_features)
Reshaped_X_train = X_train_scaled.reshape(X_train_scaled.shape[0], X_train_scaled.shape[1], 1)
Reshaped_X_test = X_test_scaled.reshape(X_test_scaled.shape[0], X_test_scaled.shape[1], 1)



# Define LSTM Model
def create_lstm_model(input_shape):
    model = Sequential([
        LSTM(64, activation='relu', input_shape=input_shape, return_sequences=True),
        LSTM(32, activation='relu'),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

# Bayesian Hyperparameter Tuning for LSTM
lstm_classifier = KerasClassifier(
    model=create_lstm_model,
    input_shape=(X_train_scaled.shape[1], 1),
    epochs=50,
    batch_size=32,
    verbose=0
)

lstm_search = BayesSearchCV(
    lstm_classifier,
    {
        'model__lstm_1__units': (32, 128),
        'model__lstm_2__units': (16, 64),
        'batch_size': (16, 64),
        'model__learning_rate': (0.001, 0.01, 'log-uniform'),
        'epochs': (5, 30),
    },
    n_iter=30,
    random_state=42,
    cv=3
)

# Reshape input for LSTM
X_train_lstm = X_train_scaled.reshape(X_train_scaled.shape[0], X_train_scaled.shape[1], 1)
X_test_lstm = X_test_scaled.reshape(X_test_scaled.shape[0], X_test_scaled.shape[1], 1)

lstm_search.fit(X_train_lstm, Y_train)
best_lstm_model = lstm_search.best_estimator_

# Bayesian Hyperparameter Tuning for RandomForestClassifier
rf_search = BayesSearchCV(
    RandomForestClassifier(random_state=42),
    {
        'n_estimators': (50, 500),
        'max_depth': (3, 20),
        'min_samples_split': (2, 20),
        'min_samples_leaf': (1, 10)
    },
    n_iter=30,
    random_state=42,
    cv=3
)
rf_search.fit(X_train_scaled, Y_train)
best_rf_model = rf_search.best_estimator_

# Bayesian Hyperparameter Tuning for XGBClassifier
xgb_search = BayesSearchCV(
    XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss'),
    {
        'n_estimators': (50, 500),
        'max_depth': (3, 20),
        'learning_rate': (0.01, 0.3, 'log-uniform'),
        'subsample': (0.5, 1.0),
        'colsample_bytree': (0.5, 1.0)
    },
    n_iter=30,
    random_state=42,
    cv=3
)
xgb_search.fit(X_train_scaled, Y_train)
best_xgb_model = xgb_search.best_estimator_

# Bayesian Hyperparameter Tuning for CatBoostClassifier
cat_search = BayesSearchCV(
    CatBoostClassifier(verbose=0, random_state=42),
    {
        'iterations': (100, 500),
        'depth': (3, 10),
        'learning_rate': (0.01, 0.3, 'log-uniform'),
        'l2_leaf_reg': (1, 10)
    },
    n_iter=30,
    random_state=42,
    cv=3
)
cat_search.fit(X_train_scaled, Y_train)
best_cat_model = cat_search.best_estimator_

# Bayesian Hyperparameter Tuning for Meta-Learner (XGBClassifier)
meta_search = BayesSearchCV(
    XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='logloss'),
    {
        'n_estimators': (50, 500),
        'max_depth': (3, 20),
        'learning_rate': (0.01, 0.3, 'log-uniform'),
        'subsample': (0.5, 1.0),
        'colsample_bytree': (0.5, 1.0)
    },
    n_iter=30,
    random_state=42,
    cv=3
)
meta_search.fit(X_train_scaled, Y_train)
best_meta_model = meta_search.best_estimator_

# Define Base Models for Stacking
base_models = [
    ('rf', best_rf_model),
    ('xgb', best_xgb_model),
    ('cb', best_cat_model),
    ('lstm', best_lstm_model)
]

# Stacking Classifier with Optimized Meta-Learner
stacked_model = StackingClassifier(estimators=base_models, final_estimator=best_meta_model)
stacked_model.fit(X_train_scaled, Y_train)

# -------------------
# Evaluate Models
models = {
    'Stacked Model': stacked_model,
    'Random Forest': best_rf_model,
    'XGBoost': best_xgb_model,
    'CatBoost': best_cat_model,
    'LSTM': best_lstm_model
}
results = {}

for name, model in models.items():
    predictions = model.predict(X_test_scaled if name != 'LSTM' else X_test_lstm)
    results[name] = {
        'Accuracy': accuracy_score(Y_test, predictions),
        'Precision': precision_score(Y_test, predictions),
        'Recall': recall_score(Y_test, predictions),
        'F1 Score': f1_score(Y_test, predictions),
        'Cohen Kappa': cohen_kappa_score(Y_test, predictions)
    }

results_df = pd.DataFrame(results).T
print("Evaluation Metrics:\n", results_df)


from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Initialize an empty dictionary to store confusion matrices
confusion_matrices = {}

# Iterate over models to calculate confusion matrices
for name, model in models.items():
    predictions = model.predict(X_test_scaled if name != 'LSTM' else X_test_lstm)
    cm = confusion_matrix(Y_test, predictions)
    confusion_matrices[name] = cm  # Store the confusion matrix
    
    # Plot each confusion matrix
    plt.figure(figsize=(6, 4))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
                xticklabels=['Not Flooded', 'Flooded'], 
                yticklabels=['Not Flooded', 'Flooded'])
    plt.title(f'Confusion Matrix - {name}')
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.tight_layout()
    plt.savefig(f'{name}_confusion_matrix.jpg', dpi=300, bbox_inches='tight')
    plt.show()

# Print confusion matrices for quick reference
for name, cm in confusion_matrices.items():
    print(f"\nConfusion Matrix for {name}:\n{cm}")


# Predict probabilities using the modelS
rf_test_pred_prob = best_rf_model.predict_proba(Std_scaled_test)[:,1]
xgb_test_pred_prob = best_xgb_model.predict_proba(Std_scaled_test)[:,1]
cb_test_pred_prob = best_cb_model.predict_proba(Std_scaled_test)[:,1]
ensemble_test_pred_prob = ensemble_model_new.predict_proba(Std_scaled_test)[:, 1]

#Reshape the output to actual spatial grid
susceptibility_map = ensemble_test_pred_prob.reshape(n_rows, n_cols) #We got ensemble_test_pred_prob as the best model based on performance metrics
# Visualize the susceptibility map
plt.figure(figsize=(10, 8))
plt.imshow(susceptibility_map, cmap='RdYlGn', interpolation='none')
plt.colorbar(label='Susceptibility Score')
plt.title('Flood Susceptibility Map')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')
plt.savefig('flood_susceptibility_map.jpg', dpi=300)
plt.show()


import shap
from shap import summary_plot

# Initialize JS visualization for SHAP
shap.initjs()
# Prepare a dictionary to store SHAP values and explainers for each model
shap_results = {}
# Define a list of models for which SHAP analysis will be performed
model_names = ['Random Forest', 'XGBoost', 'CatBoost', 'LSTM', 'Stacked Model']
models_dict = {
    'Random Forest': best_rf_model,
    'XGBoost': best_xgb_model,
    'CatBoost': best_cat_model,
    'LSTM': best_lstm_model,
    'Stacked Model': stacked_model
}

# SHAP Analysis for Each Model
for name, model in models_dict.items():
    print(f"Generating SHAP values for {name}...")
    
    if name == 'LSTM':  # For LSTM model
        explainer = shap.DeepExplainer(model.model_, X_train_lstm)  # Use DeepExplainer for neural networks
        shap_values = explainer.shap_values(X_test_lstm)
    else:  # For tree-based and stacking models
        explainer = shap.TreeExplainer(model, X_train_scaled)
        shap_values = explainer.shap_values(X_test_scaled)
    
    # Store SHAP results
    shap_results[name] = {'explainer': explainer, 'shap_values': shap_values}
    
    # Generate Summary Plot
    print(f"Creating SHAP summary plot for {name}...")
    summary_plot(shap_values, X_test_scaled if name != 'LSTM' else X_test_lstm, feature_names=X.columns)
    plt.savefig(f'{name}_shap_summary_plot.jpg', dpi=300, bbox_inches='tight')
    plt.show()




