In [None]:
# Standard libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import time
import warnings
from datetime import datetime
import joblib
from pathlib import Path

# Modeling libraries
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier, StackingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
import xgboost as xgb
import lightgbm as lgb
from catboost import CatBoostClassifier

# Model evaluation
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, 
                            roc_auc_score, confusion_matrix, classification_report, 
                            precision_recall_curve, roc_curve, average_precision_score)
from sklearn.model_selection import (train_test_split, cross_val_score, 
                                    StratifiedKFold, RandomizedSearchCV, GridSearchCV)

# For addressing class imbalance
from imblearn.over_sampling import SMOTE, ADASYN
from imblearn.under_sampling import RandomUnderSampler
from imblearn.combine import SMOTEENN, SMOTETomek
from imblearn.pipeline import Pipeline as ImbPipeline

# For model explanation
import shap
from sklearn.inspection import permutation_importance
from pdpbox import pdp, info_plots

# Visualization
import matplotlib.cm as cm
from matplotlib.colors import ListedColormap, Normalize
from mpl_toolkits.axes_grid1 import make_axes_locatable

# Set visualization parameters
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette('viridis')
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 12
%matplotlib inline

# Display all columns
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', 100)

# Set random seed for reproducibility
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Ignore warnings
warnings.filterwarnings('ignore')

In [None]:
# Define helper functions for model evaluation and visualization

def evaluate_model(y_true, y_pred, y_pred_proba=None, model_name="Model"):
    """
    Evaluate a classification model using various metrics.
    
    Parameters:
    -----------
    y_true : array-like
        True labels
    y_pred : array-like
        Predicted labels
    y_pred_proba : array-like, optional
        Predicted probabilities for the positive class
    model_name : str, default="Model"
        Name of the model for display purposes
    
    Returns:
    --------
    dict
        Dictionary containing evaluation metrics
    """
    # Calculate metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    
    # Calculate AUC if probabilities are provided
    auc_roc = None
    avg_precision = None
    if y_pred_proba is not None:
        auc_roc = roc_auc_score(y_true, y_pred_proba)
        avg_precision = average_precision_score(y_true, y_pred_proba)
    
    # Create confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    
    # Print results
    print(f"Evaluation Results for {model_name}:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    
    if auc_roc is not None:
        print(f"AUC-ROC: {auc_roc:.4f}")
        print(f"Average Precision: {avg_precision:.4f}")
    
    print("\nConfusion Matrix:")
    print(cm)
    print("\nClassification Report:")
    print(classification_report(y_true, y_pred))
    
    # Return metrics as dictionary
    metrics = {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'confusion_matrix': cm
    }
    
    if auc_roc is not None:
        metrics['auc_roc'] = auc_roc
        metrics['avg_precision'] = avg_precision
    
    return metrics

def plot_confusion_matrix(cm, classes=None, normalize=False, title='Confusion Matrix', cmap=plt.cm.Blues):
    """
    Plot a confusion matrix.
    
    Parameters:
    -----------
    cm : array-like
        Confusion matrix
    classes : list, optional
        List of class names
    normalize : bool, default=False
        Whether to normalize the confusion matrix
    title : str, default='Confusion Matrix'
        Title for the plot
    cmap : matplotlib colormap, default=plt.cm.Blues
        Colormap for the plot
    """
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized Confusion Matrix")
    else:
        print('Confusion Matrix, without normalization')

    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title, fontsize=16)
    plt.colorbar()

    if classes is None:
        classes = ['Not Default', 'Default']

    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], fmt),
                     horizontalalignment="center",
                     color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True Label', fontsize=14)
    plt.xlabel('Predicted Label', fontsize=14)
    plt.tight_layout()
    plt.show()

def plot_roc_curve(y_true, y_pred_probas, model_names):
    """
    Plot ROC curves for multiple models.
    
    Parameters:
    -----------
    y_true : array-like
        True labels
    y_pred_probas : list of array-like
        List of predicted probabilities for each model
    model_names : list
        List of model names
    """
    plt.figure(figsize=(10, 8))
    
    for i, (y_pred_proba, model_name) in enumerate(zip(y_pred_probas, model_names)):
        fpr, tpr, _ = roc_curve(y_true, y_pred_proba)
        auc = roc_auc_score(y_true, y_pred_proba)
        plt.plot(fpr, tpr, lw=2, label=f'{model_name} (AUC = {auc:.4f})')
    
    # Plot random guessing line
    plt.plot([0, 1], [0, 1], color='gray', lw=2, linestyle='--', label='Random Guessing')
    
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate', fontsize=14)
    plt.ylabel('True Positive Rate', fontsize=14)
    plt.title('ROC Curve Comparison', fontsize=16)
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.show()

def plot_precision_recall_curve(y_true, y_pred_probas, model_names):
    """
    Plot Precision-Recall curves for multiple models.
    
    Parameters:
    -----------
    y_true : array-like
        True labels
    y_pred_probas : list of array-like
        List of predicted probabilities for each model
    model_names : list
        List of model names
    """
    plt.figure(figsize=(10, 8))
    
    for i, (y_pred_proba, model_name) in enumerate(zip(y_pred_probas, model_names)):
        precision, recall, _ = precision_recall_curve(y_true, y_pred_proba)
        avg_precision = average_precision_score(y_true, y_pred_proba)
        plt.plot(recall, precision, lw=2, label=f'{model_name} (AP = {avg_precision:.4f})')
    
    # Calculate baseline based on proportion of positive class
    baseline = np.sum(y_true) / len(y_true)
    plt.axhline(y=baseline, color='gray', linestyle='--', label=f'Baseline (AP = {baseline:.4f})')
    
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('Recall', fontsize=14)
    plt.ylabel('Precision', fontsize=14)
    plt.title('Precision-Recall Curve Comparison', fontsize=16)
    plt.legend(loc="best")
    plt.grid(True)
    plt.show()

def plot_feature_importance(feature_importance, feature_names, title="Feature Importance", top_n=20):
    """
    Plot feature importances.
    
    Parameters:
    -----------
    feature_importance : array-like
        Feature importance scores
    feature_names : list
        List of feature names
    title : str, default="Feature Importance"
        Title for the plot
    top_n : int, default=20
        Number of top features to display
    """
    # Create DataFrame for easier handling
    importance_df = pd.DataFrame({
        'Feature': feature_names,
        'Importance': feature_importance
    }).sort_values('Importance', ascending=False)
    
    # Select top N features
    if len(importance_df) > top_n:
        importance_df = importance_df.head(top_n)
    
    plt.figure(figsize=(12, 8))
    sns.barplot(x='Importance', y='Feature', data=importance_df)
    plt.title(title, fontsize=16)
    plt.xlabel('Importance', fontsize=14)
    plt.ylabel('Feature', fontsize=14)
    plt.tight_layout()
    plt.show()
    
    return importance_df

def compare_models(models_metrics, metric_names=None):
    """
    Compare multiple models across various metrics.
    
    Parameters:
    -----------
    models_metrics : dict
        Dictionary with model names as keys and metric dictionaries as values
    metric_names : list, optional
        List of metrics to compare (defaults to accuracy, precision, recall, f1, auc_roc)
    
    Returns:
    --------
    pandas DataFrame
        DataFrame comparing models across metrics
    """
    if metric_names is None:
        metric_names = ['accuracy', 'precision', 'recall', 'f1', 'auc_roc', 'avg_precision']
    
    # Create DataFrame for comparison
    comparison_data = []
    
    for model_name, metrics in models_metrics.items():
        model_data = {'Model': model_name}
        
        for metric in metric_names:
            if metric in metrics:
                model_data[metric] = metrics[metric]
            else:
                model_data[metric] = None
        
        comparison_data.append(model_data)
    
    comparison_df = pd.DataFrame(comparison_data)
    
    # Plot comparison
    plt.figure(figsize=(14, 10))
    
    for i, metric in enumerate(metric_names):
        if metric in ['confusion_matrix']:  # Skip non-numeric metrics
            continue
            
        if all(comparison_df[metric].notna()):  # Only plot if all models have this metric
            plt.subplot(2, 3, i+1)
            sns.barplot(x='Model', y=metric, data=comparison_df)
            plt.title(f'Comparison by {metric.replace("_", " ").title()}')
            plt.xticks(rotation=45, ha='right')
            plt.ylim(0, 1)  # Most metrics are between 0 and 1
    
    plt.tight_layout()
    plt.show()
    
    return comparison_df.sort_values('f1', ascending=False)

def threshold_optimization(y_true, y_pred_proba, thresholds=None, metric='f1'):
    """
    Find optimal threshold for binary classification based on a specific metric.
    
    Parameters:
    -----------
    y_true : array-like
        True labels
    y_pred_proba : array-like
        Predicted probabilities for the positive class
    thresholds : array-like, optional
        List of thresholds to evaluate (default: 100 values from 0.01 to 0.99)
    metric : str, default='f1'
        Metric to optimize ('accuracy', 'precision', 'recall', 'f1')
    
    Returns:
    --------
    tuple
        Optimal threshold and corresponding metric value
    """
    if thresholds is None:
        thresholds = np.linspace(0.01, 0.99, 100)
    
    best_threshold = 0.5  # Default threshold
    best_metric_value = 0
    
    results = {'threshold': [], 'value': []}
    
    for threshold in thresholds:
        y_pred = (y_pred_proba >= threshold).astype(int)
        
        if metric == 'accuracy':
            value = accuracy_score(y_true, y_pred)
        elif metric == 'precision':
            value = precision_score(y_true, y_pred)
        elif metric == 'recall':
            value = recall_score(y_true, y_pred)
        elif metric == 'f1':
            value = f1_score(y_true, y_pred)
        else:
            raise ValueError(f"Unsupported metric: {metric}")
        
        results['threshold'].append(threshold)
        results['value'].append(value)
        
        if value > best_metric_value:
            best_metric_value = value
            best_threshold = threshold
    
    # Plot threshold vs. metric
    plt.figure(figsize=(10, 6))
    plt.plot(results['threshold'], results['value'])
    plt.axvline(x=best_threshold, color='r', linestyle='--', 
                label=f'Best Threshold = {best_threshold:.2f}')
    plt.axhline(y=best_metric_value, color='g', linestyle='--', 
                label=f'Best {metric} = {best_metric_value:.4f}')
    plt.xlabel('Threshold', fontsize=14)
    plt.ylabel(f'{metric.capitalize()} Score', fontsize=14)
    plt.title(f'Threshold Optimization for {metric.capitalize()}', fontsize=16)
    plt.legend()
    plt.grid(True)
    plt.show()
    
    return best_threshold, best_metric_value

In [None]:
# Load the processed dataset from the feature engineering notebook
selected_features_path = "../data/processed/kenyan_loan_default_engineered_selected.csv"
df = pd.read_csv(selected_features_path)

print(f"Dataset loaded with shape: {df.shape}")
df.head()

# Check for any missing values
missing_values = df.isnull().sum()
missing_values = missing_values[missing_values > 0]
if len(missing_values) > 0:
    print("\nColumns with missing values:")
    print(missing_values)
else:
    print("\nNo missing values found in the dataset.")

# Display basic statistics
print("\nBasic statistics of the dataset:")
df.describe().T

# Check class distribution (target variable)
target_counts = df['defaulted'].value_counts(normalize=True) * 100
print("\nClass distribution:")
print(target_counts)

plt.figure(figsize=(8, 6))
sns.countplot(x='defaulted', data=df, palette=['#2ecc71', '#e74c3c'])
plt.title('Class Distribution of Default Status', fontsize=16)
plt.xlabel('Default Status (0 = No Default, 1 = Default)')
plt.ylabel('Count')

# Add percentages to the bars
for i, percentage in enumerate(target_counts):
    count = df['defaulted'].value_counts()[i]
    plt.text(i, count + 30, f'{percentage:.1f}%', ha='center', fontsize=12)

plt.tight_layout()
plt.show()

# Separate features and target
X = df.drop('defaulted', axis=1)
y = df['defaulted']

# Get feature names for later use
feature_names = X.columns.tolist()

print(f"Number of features: {len(feature_names)}")
print(f"Number of samples: {len(y)}")
print(f"Class distribution: {np.bincount(y)}")

In [None]:
# Split data into training and testing sets with stratification to maintain class distribution
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y
)

print(f"Training set shape: {X_train.shape}, Testing set shape: {X_test.shape}")
print(f"Training set class distribution: {np.bincount(y_train)}")
print(f"Testing set class distribution: {np.bincount(y_test)}")

In [None]:
# Check imbalance ratio
imbalance_ratio = np.bincount(y_train)[0] / np.bincount(y_train)[1]
print(f"Class imbalance ratio (majority:minority): {imbalance_ratio:.2f}:1")

# Function to apply resampling
def apply_resampling(X, y, method='smote', sampling_strategy=0.5):
    """
    Apply resampling to handle class imbalance.
    
    Parameters:
    -----------
    X : array-like
        Feature matrix
    y : array-like
        Target vector
    method : str, default='smote'
        Resampling method: 'smote', 'adasyn', 'undersampling', 'smoteenn', 'smotetomek'
    sampling_strategy : float or str, default=0.5
        Ratio of minority to majority class after resampling
        (if float), or 'auto'/'all' for specific strategies
    
    Returns:
    --------
    tuple
        Resampled feature matrix and target vector
    """
    if method == 'smote':
        resampler = SMOTE(sampling_strategy=sampling_strategy, random_state=RANDOM_STATE)
    elif method == 'adasyn':
        resampler = ADASYN(sampling_strategy=sampling_strategy, random_state=RANDOM_STATE)
    elif method == 'undersampling':
        resampler = RandomUnderSampler(sampling_strategy=sampling_strategy, random_state=RANDOM_STATE)
    elif method == 'smoteenn':
        resampler = SMOTEENN(sampling_strategy=sampling_strategy, random_state=RANDOM_STATE)
    elif method == 'smotetomek':
        resampler = SMOTETomek(sampling_strategy=sampling_strategy, random_state=RANDOM_STATE)
    else:
        raise ValueError(f"Unsupported resampling method: {method}")
    
    X_resampled, y_resampled = resampler.fit_resample(X, y)
    
    print(f"Original shape: {X.shape}, Resampled shape: {X_resampled.shape}")
    print(f"Original class distribution: {np.bincount(y)}")
    print(f"Resampled class distribution: {np.bincount(y_resampled)}")
    
    return X_resampled, y_resampled

# Apply SMOTE to handle class imbalance in the training set
X_train_resampled, y_train_resampled = apply_resampling(X_train, y_train, method='smote', sampling_strategy=0.8)

# Visualize class distribution before and after resampling
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Before resampling
sns.countplot(x=y_train, ax=ax1, palette=['#2ecc71', '#e74c3c'])
ax1.set_title('Class Distribution Before Resampling')
ax1.set_xlabel('Default Status')
ax1.set_ylabel('Count')

# After resampling
sns.countplot(x=y_train_resampled, ax=ax2, palette=['#2ecc71', '#e74c3c'])
ax2.set_title('Class Distribution After Resampling')
ax2.set_xlabel('Default Status')
ax2.set_ylabel('Count')

plt.tight_layout()
plt.show()

In [None]:
# Train a logistic regression model as a baseline
log_reg = LogisticRegression(random_state=RANDOM_STATE, max_iter=1000, 
                             class_weight='balanced', solver='liblinear')
log_reg.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_log_reg = log_reg.predict(X_test)
y_pred_proba_log_reg = log_reg.predict_proba(X_test)[:, 1]  # Probability for the positive class

# Evaluate the model
log_reg_metrics = evaluate_model(y_test, y_pred_log_reg, y_pred_proba_log_reg, "Logistic Regression")

# Plot confusion matrix
plot_confusion_matrix(log_reg_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Logistic Regression Confusion Matrix')

# Analyze feature importance (coefficients)
if hasattr(log_reg, 'coef_'):
    coef = log_reg.coef_[0]
    log_reg_importance = np.abs(coef)  # Take absolute value for importance
    log_reg_importance_df = plot_feature_importance(log_reg_importance, feature_names, 
                                                  "Logistic Regression Feature Importance (Absolute Coefficients)")

In [None]:
# Train a decision tree model
dt = DecisionTreeClassifier(random_state=RANDOM_STATE, class_weight='balanced')
dt.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_dt = dt.predict(X_test)
y_pred_proba_dt = dt.predict_proba(X_test)[:, 1]

# Evaluate the model
dt_metrics = evaluate_model(y_test, y_pred_dt, y_pred_proba_dt, "Decision Tree")

# Plot confusion matrix
plot_confusion_matrix(dt_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Decision Tree Confusion Matrix')

# Analyze feature importance
if hasattr(dt, 'feature_importances_'):
    dt_importance_df = plot_feature_importance(dt.feature_importances_, feature_names, 
                                            "Decision Tree Feature Importance")

In [None]:
# Train a random forest model
rf = RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE, class_weight='balanced')
rf.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_rf = rf.predict(X_test)
y_pred_proba_rf = rf.predict_proba(X_test)[:, 1]

# Evaluate the model
rf_metrics = evaluate_model(y_test, y_pred_rf, y_pred_proba_rf, "Random Forest")

# Plot confusion matrix
plot_confusion_matrix(rf_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Random Forest Confusion Matrix')

# Analyze feature importance
if hasattr(rf, 'feature_importances_'):
    rf_importance_df = plot_feature_importance(rf.feature_importances_, feature_names, 
                                            "Random Forest Feature Importance")

In [None]:
# Train a gradient boosting model
gb = GradientBoostingClassifier(n_estimators=100, random_state=RANDOM_STATE)
gb.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_gb = gb.predict(X_test)
y_pred_proba_gb = gb.predict_proba(X_test)[:, 1]

# Evaluate the model
gb_metrics = evaluate_model(y_test, y_pred_gb, y_pred_proba_gb, "Gradient Boosting")

# Plot confusion matrix
plot_confusion_matrix(gb_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Gradient Boosting Confusion Matrix')

# Analyze feature importance
if hasattr(gb, 'feature_importances_'):
    gb_importance_df = plot_feature_importance(gb.feature_importances_, feature_names, 
                                            "Gradient Boosting Feature Importance")

In [None]:
# Compare the baseline models
baseline_models_metrics = {
    'Logistic Regression': log_reg_metrics,
    'Decision Tree': dt_metrics,
    'Random Forest': rf_metrics,
    'Gradient Boosting': gb_metrics
}

baseline_comparison = compare_models(baseline_models_metrics)
print("Baseline Models Comparison:")
print(baseline_comparison)

# Plot ROC curves
baseline_y_pred_probas = [y_pred_proba_log_reg, y_pred_proba_dt, 
                         y_pred_proba_rf, y_pred_proba_gb]
baseline_model_names = ['Logistic Regression', 'Decision Tree', 
                       'Random Forest', 'Gradient Boosting']

plot_roc_curve(y_test, baseline_y_pred_probas, baseline_model_names)

# Plot Precision-Recall curves
plot_precision_recall_curve(y_test, baseline_y_pred_probas, baseline_model_names)

In [None]:
# Train an XGBoost model
xgb_model = xgb.XGBClassifier(random_state=RANDOM_STATE, use_label_encoder=False, eval_metric='logloss')
xgb_model.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_xgb = xgb_model.predict(X_test)
y_pred_proba_xgb = xgb_model.predict_proba(X_test)[:, 1]

# Evaluate the model
xgb_metrics = evaluate_model(y_test, y_pred_xgb, y_pred_proba_xgb, "XGBoost")

# Plot confusion matrix
plot_confusion_matrix(xgb_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='XGBoost Confusion Matrix')

# Analyze feature importance
if hasattr(xgb_model, 'feature_importances_'):
    xgb_importance_df = plot_feature_importance(xgb_model.feature_importances_, feature_names, 
                                             "XGBoost Feature Importance")

In [None]:
# Train a LightGBM model
lgb_model = lgb.LGBMClassifier(random_state=RANDOM_STATE, class_weight='balanced')
lgb_model.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_lgb = lgb_model.predict(X_test)
y_pred_proba_lgb = lgb_model.predict_proba(X_test)[:, 1]

# Evaluate the model
lgb_metrics = evaluate_model(y_test, y_pred_lgb, y_pred_proba_lgb, "LightGBM")

# Plot confusion matrix
plot_confusion_matrix(lgb_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='LightGBM Confusion Matrix')

# Analyze feature importance
if hasattr(lgb_model, 'feature_importances_'):
    lgb_importance_df = plot_feature_importance(lgb_model.feature_importances_, feature_names, 
                                             "LightGBM Feature Importance")

In [None]:
# Train a CatBoost model
catboost_model = CatBoostClassifier(random_state=RANDOM_STATE, verbose=0, 
                                    class_weights={0: 1, 1: imbalance_ratio})
catboost_model.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_catboost = catboost_model.predict(X_test)
y_pred_proba_catboost = catboost_model.predict_proba(X_test)[:, 1]

# Evaluate the model
catboost_metrics = evaluate_model(y_test, y_pred_catboost, y_pred_proba_catboost, "CatBoost")

# Plot confusion matrix
plot_confusion_matrix(catboost_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='CatBoost Confusion Matrix')

# Analyze feature importance
if hasattr(catboost_model, 'feature_importances_'):
    catboost_importance_df = plot_feature_importance(catboost_model.feature_importances_, feature_names, 
                                                  "CatBoost Feature Importance")

In [None]:
# Compare the advanced models
advanced_models_metrics = {
    'XGBoost': xgb_metrics,
    'LightGBM': lgb_metrics,
    'CatBoost': catboost_metrics
}

advanced_comparison = compare_models(advanced_models_metrics)
print("Advanced Models Comparison:")
print(advanced_comparison)

# Plot ROC curves
advanced_y_pred_probas = [y_pred_proba_xgb, y_pred_proba_lgb, y_pred_proba_catboost]
advanced_model_names = ['XGBoost', 'LightGBM', 'CatBoost']

plot_roc_curve(y_test, advanced_y_pred_probas, advanced_model_names)

# Plot Precision-Recall curves
plot_precision_recall_curve(y_test, advanced_y_pred_probas, advanced_model_names)

# Determine the best model for hyperparameter tuning
all_models_metrics = {**baseline_models_metrics, **advanced_models_metrics}
all_models_comparison = compare_models(all_models_metrics)
print("\nAll Models Comparison:")
print(all_models_comparison)

# Select the best model for hyperparameter tuning
best_model_name = all_models_comparison.iloc[0]['Model']
print(f"\nBest model: {best_model_name}")

In [None]:
# Hyperparameter tuning for XGBoost
# Define the parameter grid for RandomizedSearchCV
xgb_param_grid = {
    'n_estimators': [100, 200, 300, 500],
    'max_depth': [3, 4, 5, 6, 8, 10],
    'learning_rate': [0.01, 0.05, 0.1, 0.2],
    'subsample': [0.6, 0.7, 0.8, 0.9, 1.0],
    'colsample_bytree': [0.6, 0.7, 0.8, 0.9, 1.0],
    'min_child_weight': [1, 3, 5, 7],
    'gamma': [0, 0.1, 0.2, 0.3, 0.4],
    'scale_pos_weight': [1, imbalance_ratio]  # Adjust for class imbalance
}

# Set up RandomizedSearchCV
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
xgb_random_search = RandomizedSearchCV(
    estimator=xgb.XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=RANDOM_STATE),
    param_distributions=xgb_param_grid,
    n_iter=50,  # Number of parameter settings sampled
    scoring='f1',  # Optimize for F1 score
    cv=cv,
    verbose=1,
    random_state=RANDOM_STATE,
    n_jobs=-1  # Use all available cores
)

# Fit RandomizedSearchCV
print("Starting hyperparameter tuning for XGBoost...")
start_time = time.time()
xgb_random_search.fit(X_train, y_train)  # Use original training data, not resampled
end_time = time.time()
print(f"Hyperparameter tuning completed in {(end_time - start_time) / 60:.2f} minutes")

# Get best parameters and best score
print(f"Best parameters: {xgb_random_search.best_params_}")
print(f"Best cross-validation score: {xgb_random_search.best_score_:.4f}")

# Train the best model
best_xgb_model = xgb.XGBClassifier(
    **xgb_random_search.best_params_,
    random_state=RANDOM_STATE,
    use_label_encoder=False,
    eval_metric='logloss'
)
best_xgb_model.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_best_xgb = best_xgb_model.predict(X_test)
y_pred_proba_best_xgb = best_xgb_model.predict_proba(X_test)[:, 1]

# Evaluate the tuned model
best_xgb_metrics = evaluate_model(y_test, y_pred_best_xgb, y_pred_proba_best_xgb, "Tuned XGBoost")

# Plot confusion matrix
plot_confusion_matrix(best_xgb_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Tuned XGBoost Confusion Matrix')

# Analyze feature importance
if hasattr(best_xgb_model, 'feature_importances_'):
    best_xgb_importance_df = plot_feature_importance(best_xgb_model.feature_importances_, feature_names, 
                                                  "Tuned XGBoost Feature Importance")

In [None]:
# Optimize the classification threshold for the best model
best_threshold, best_f1 = threshold_optimization(
    y_test, y_pred_proba_best_xgb, metric='f1'
)

print(f"Optimal threshold: {best_threshold:.4f}, F1 score: {best_f1:.4f}")

# Apply the optimal threshold
y_pred_best_threshold = (y_pred_proba_best_xgb >= best_threshold).astype(int)

# Evaluate with the optimal threshold
best_threshold_metrics = evaluate_model(
    y_test, y_pred_best_threshold, y_pred_proba_best_xgb, 
    f"Tuned XGBoost (Threshold = {best_threshold:.2f})"
)

# Plot confusion matrix with the optimal threshold
plot_confusion_matrix(
    best_threshold_metrics['confusion_matrix'], 
    classes=['Not Default', 'Default'], 
    title=f'Tuned XGBoost Confusion Matrix (Threshold = {best_threshold:.2f})'
)

In [None]:
# Create a voting classifier with the best models
voting_clf = VotingClassifier(
    estimators=[
        ('rf', RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE, class_weight='balanced')),
        ('xgb', xgb.XGBClassifier(**xgb_random_search.best_params_, random_state=RANDOM_STATE, 
                                 use_label_encoder=False, eval_metric='logloss')),
        ('lgb', lgb.LGBMClassifier(random_state=RANDOM_STATE, class_weight='balanced')),
    ],
    voting='soft'  # Use predicted probabilities for voting
)

# Train the voting classifier
voting_clf.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_voting = voting_clf.predict(X_test)
y_pred_proba_voting = voting_clf.predict_proba(X_test)[:, 1]

# Evaluate the model
voting_metrics = evaluate_model(y_test, y_pred_voting, y_pred_proba_voting, "Voting Classifier")

# Plot confusion matrix
plot_confusion_matrix(voting_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Voting Classifier Confusion Matrix')

In [None]:
# Create a stacking classifier
base_models = [
    ('rf', RandomForestClassifier(n_estimators=100, random_state=RANDOM_STATE, class_weight='balanced')),
    ('gb', GradientBoostingClassifier(n_estimators=100, random_state=RANDOM_STATE)),
    ('xgb', xgb.XGBClassifier(random_state=RANDOM_STATE, use_label_encoder=False, eval_metric='logloss'))
]

stacking_clf = StackingClassifier(
    estimators=base_models,
    final_estimator=LogisticRegression(random_state=RANDOM_STATE, class_weight='balanced'),
    cv=5,
    stack_method='predict_proba'
)

# Train the stacking classifier
stacking_clf.fit(X_train_resampled, y_train_resampled)

# Make predictions
y_pred_stacking = stacking_clf.predict(X_test)
y_pred_proba_stacking = stacking_clf.predict_proba(X_test)[:, 1]

# Evaluate the model
stacking_metrics = evaluate_model(y_test, y_pred_stacking, y_pred_proba_stacking, "Stacking Classifier")

# Plot confusion matrix
plot_confusion_matrix(stacking_metrics['confusion_matrix'], 
                      classes=['Not Default', 'Default'], 
                      title='Stacking Classifier Confusion Matrix')

In [None]:
# Add ensemble models to the comparison
ensemble_models_metrics = {
    'Voting Classifier': voting_metrics,
    'Stacking Classifier': stacking_metrics,
    'Tuned XGBoost': best_xgb_metrics,
    'Tuned XGBoost (Optimal Threshold)': best_threshold_metrics
}

# Compare all models
final_models_metrics = {**all_models_metrics, **ensemble_models_metrics}
final_comparison = compare_models(final_models_metrics)
print("Final Models Comparison:")
print(final_comparison)

# Plot ROC curves for the final selection of models
final_y_pred_probas = [y_pred_proba_best_xgb, y_pred_proba_voting, y_pred_proba_stacking]
final_model_names = ['Tuned XGBoost', 'Voting Classifier', 'Stacking Classifier']

plot_roc_curve(y_test, final_y_pred_probas, final_model_names)

# Plot Precision-Recall curves
plot_precision_recall_curve(y_test, final_y_pred_probas, final_model_names)

# Determine the best overall model
best_overall_model_name = final_comparison.iloc[0]['Model']
print(f"\nBest overall model: {best_overall_model_name}")

In [None]:
# Select a small subset of the test data for SHAP analysis to keep visualization clear
X_test_sample = X_test.sample(min(100, len(X_test)), random_state=RANDOM_STATE)

# Initialize the SHAP explainer
explainer = shap.Explainer(best_xgb_model)
shap_values = explainer(X_test_sample)

# Summary plot
plt.figure(figsize=(12, 10))
shap.summary_plot(shap_values, X_test_sample, plot_type="bar", show=False)
plt.title("SHAP Feature Importance", fontsize=16)
plt.tight_layout()
plt.show()

# Detailed summary plot showing the impact of features
plt.figure(figsize=(12, 10))
shap.summary_plot(shap_values, X_test_sample, show=False)
plt.title("SHAP Summary Plot", fontsize=16)
plt.tight_layout()
plt.show()

# Dependence plots for top features
top_features = best_xgb_importance_df['Feature'].head(3).tolist()
for feature in top_features:
    plt.figure(figsize=(10, 7))
    shap.dependence_plot(feature, shap_values.values, X_test_sample, show=False)
    plt.title(f"SHAP Dependence Plot for {feature}", fontsize=16)
    plt.tight_layout()
    plt.show()

In [None]:
# Create partial dependence plots for top features
top_features = best_xgb_importance_df['Feature'].head(5).tolist()

# Set up the figure
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.flatten()

for i, feature in enumerate(top_features):
    if i < len(axes):
        # Create PDPs
        pdp_iso = pdp.pdp_isolate(
            model=best_xgb_model,
            dataset=X_test,
            model_features=feature_names,
            feature=feature
        )
        
        # Plot PDPs
        pdp.pdp_plot(pdp_iso, feature, ax=axes[i], center=True)
        axes[i].set_title(f'PDP for {feature}', fontsize=14)

plt.tight_layout()
plt.suptitle('Partial Dependence Plots for Top Features', fontsize=18, y=1.02)
plt.show()

In [None]:
# Analyze interactions between top features
top_2_features = best_xgb_importance_df['Feature'].head(2).tolist()

# Create interaction plot
if len(top_2_features) >= 2:
    pdp_interact = pdp.pdp_interact(
        model=best_xgb_model,
        dataset=X_test,
        model_features=feature_names,
        features=top_2_features
    )
    
    # Plot the interaction
    fig, axes = plt.subplots(1, 2, figsize=(16, 8))
    pdp.pdp_interact_plot(
        pdp_interact=pdp_interact,
        feature_names=top_2_features,
        plot_type='contour',
        ax=axes[0]
    )
    pdp.pdp_interact_plot(
        pdp_interact=pdp_interact,
        feature_names=top_2_features,
        plot_type='grid',
        ax=axes[1]
    )
    
    plt.suptitle(f'Feature Interaction: {top_2_features[0]} and {top_2_features[1]}', fontsize=16)
    plt.tight_layout()
    plt.show()

In [None]:
# Perform cross-validation on the best model
# We'll use the original non-resampled data for proper validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)

# Define metrics to evaluate
scoring = {
    'accuracy': 'accuracy',
    'precision': 'precision',
    'recall': 'recall',
    'f1': 'f1',
    'roc_auc': 'roc_auc'
}

# Perform cross-validation
cv_results = cross_val_score(best_xgb_model, X, y, cv=cv, scoring='f1')

print(f"Cross-validation F1 scores: {cv_results}")
print(f"Mean F1 score: {cv_results.mean():.4f}")
print(f"Standard deviation: {cv_results.std():.4f}")

# Visualize cross-validation results
plt.figure(figsize=(10, 6))
plt.bar(range(1, len(cv_results) + 1), cv_results)
plt.axhline(y=cv_results.mean(), color='r', linestyle='--', 
           label=f'Mean F1: {cv_results.mean():.4f}')
plt.xlabel('Fold', fontsize=14)
plt.ylabel('F1 Score', fontsize=14)
plt.title('Cross-Validation F1 Scores', fontsize=16)
plt.xticks(range(1, len(cv_results) + 1))
plt.ylim(0, 1)
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Define costs and benefits for business impact analysis
# These values would be determined by domain experts and business stakeholders
# For example:
# - Cost of false positive (predicting default when there is none): Lost interest income
# - Cost of false negative (missing a default): Lost principal and collection costs

# Example values (in KES)
cost_false_positive = 10000  # Lost interest income from rejecting a good loan
cost_false_negative = 100000  # Lost principal from approving a bad loan

# Calculate confusion matrix elements from the best model
best_model_cm = best_threshold_metrics['confusion_matrix']
tn, fp, fn, tp = best_model_cm.ravel()

# Calculate total cost
total_cost = (fp * cost_false_positive) + (fn * cost_false_negative)
print(f"Business Costs:")
print(f"False Positives (Rejected Good Loans): {fp} × {cost_false_positive} KES = {fp * cost_false_positive} KES")
print(f"False Negatives (Approved Bad Loans): {fn} × {cost_false_negative} KES = {fn * cost_false_negative} KES")
print(f"Total Cost: {total_cost} KES")

# Calculate savings compared to a baseline (e.g., approving all loans)
baseline_fn = np.sum(y_test)  # All defaults would be missed
baseline_cost = baseline_fn * cost_false_negative
savings = baseline_cost - total_cost
print(f"\nBaseline Cost (Approving All Loans): {baseline_cost} KES")
print(f"Cost with Model: {total_cost} KES")
print(f"Savings: {savings} KES ({savings/baseline_cost*100:.2f}%)")

# Create a visual representation of costs
labels = ['False Positives', 'False Negatives', 'Total Cost', 'Baseline Cost', 'Savings']
values = [
    fp * cost_false_positive, 
    fn * cost_false_negative, 
    total_cost, 
    baseline_cost, 
    savings
]

plt.figure(figsize=(12, 8))
bars = plt.bar(labels, values, color=['orange', 'red', 'purple', 'gray', 'green'])

# Add value labels
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height + 0.1,
             f'{height:,.0f} KES', ha='center', va='bottom', fontsize=12)

plt.title('Business Impact Analysis', fontsize=16)
plt.ylabel('Cost in KES', fontsize=14)
plt.xticks(fontsize=12, rotation=45)
plt.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

In [None]:
# Create risk segments based on predicted probabilities
# Define risk segments
risk_bins = [0, 0.2, 0.4, 0.6, 0.8, 1.0]
risk_labels = ['Very Low', 'Low', 'Medium', 'High', 'Very High']

# Calculate actual default rates within each predicted risk segment
risk_df = pd.DataFrame({
    'predicted_proba': y_pred_proba_best_xgb,
    'actual': y_test.values
})

risk_df['risk_segment'] = pd.cut(risk_df['predicted_proba'], bins=risk_bins, labels=risk_labels)

# Calculate default rate by risk segment
risk_analysis = risk_df.groupby('risk_segment').agg(
    count=('actual', 'count'),
    default_rate=('actual', 'mean'),
    default_count=('actual', 'sum')
)

# Calculate the percentage of the portfolio in each segment
risk_analysis['portfolio_percentage'] = risk_analysis['count'] / len(risk_df) * 100

print("Risk Segmentation Analysis:")
print(risk_analysis)

# Visualize risk segments
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))

# Default rate by risk segment
sns.barplot(x=risk_analysis.index, y=risk_analysis['default_rate'], ax=ax1, palette='YlOrRd')
ax1.set_title('Default Rate by Risk Segment', fontsize=16)
ax1.set_xlabel('Risk Segment', fontsize=14)
ax1.set_ylabel('Default Rate', fontsize=14)
ax1.grid(axis='y', alpha=0.3)

# Add percentage labels
for i, v in enumerate(risk_analysis['default_rate']):
    ax1.text(i, v + 0.02, f'{v:.1%}', ha='center', fontsize=12)

# Portfolio distribution by risk segment
sns.barplot(x=risk_analysis.index, y=risk_analysis['portfolio_percentage'], ax=ax2, palette='Blues')
ax2.set_title('Portfolio Distribution by Risk Segment', fontsize=16)
ax2.set_xlabel('Risk Segment', fontsize=14)
ax2.set_ylabel('Percentage of Portfolio', fontsize=14)
ax2.grid(axis='y', alpha=0.3)

# Add percentage labels
for i, v in enumerate(risk_analysis['portfolio_percentage']):
    ax2.text(i, v + 1, f'{v:.1f}%', ha='center', fontsize=12)

plt.tight_layout()
plt.show()

# Create a more detailed visualization of default distribution
plt.figure(figsize=(12, 8))
sns.histplot(data=risk_df, x='predicted_proba', hue='actual', bins=20, 
             kde=True, stat='probability', common_norm=False)
plt.axvline(x=best_threshold, color='red', linestyle='--', 
           label=f'Optimal Threshold: {best_threshold:.2f}')
plt.title('Distribution of Default Probability Scores', fontsize=16)
plt.xlabel('Predicted Default Probability', fontsize=14)
plt.ylabel('Probability Density', fontsize=14)
plt.legend(['Non-Default', 'Default', 'Optimal Threshold'])
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

In [None]:
# Based on our analysis, select the final model
# Assuming the tuned XGBoost with optimal threshold is the best model
final_model = best_xgb_model

# Create a directory for model artifacts if it doesn't exist
model_dir = "../models"
Path(model_dir).mkdir(parents=True, exist_ok=True)

# Save the model
model_filename = f"{model_dir}/loan_default_prediction_model.pkl"
joblib.dump(final_model, model_filename)
print(f"Final model saved to {model_filename}")

# Save the optimal threshold
threshold_filename = f"{model_dir}/optimal_threshold.pkl"
with open(threshold_filename, 'wb') as f:
    pickle.dump(best_threshold, f)
print(f"Optimal threshold saved to {threshold_filename}")

# Save feature names
feature_names_filename = f"{model_dir}/feature_names.pkl"
with open(feature_names_filename, 'wb') as f:
    pickle.dump(feature_names, f)
print(f"Feature names saved to {feature_names_filename}")

# Create a model metadata file with performance metrics and training info
model_metadata = {
    'model_type': type(final_model).__name__,
    'training_date': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    'performance_metrics': {
        'accuracy': best_threshold_metrics['accuracy'],
        'precision': best_threshold_metrics['precision'],
        'recall': best_threshold_metrics['recall'],
        'f1': best_threshold_metrics['f1'],
        'auc_roc': best_threshold_metrics['auc_roc'],
        'avg_precision': best_threshold_metrics['avg_precision']
    },
    'optimal_threshold': best_threshold,
    'feature_count': len(feature_names),
    'class_distribution': {
        'train_positive': np.sum(y_train),
        'train_negative': len(y_train) - np.sum(y_train),
        'test_positive': np.sum(y_test),
        'test_negative': len(y_test) - np.sum(y_test)
    },
    'top_features': best_xgb_importance_df['Feature'].head(10).tolist()
}

# Save model metadata
metadata_filename = f"{model_dir}/model_metadata.json"
with open(metadata_filename, 'w') as f:
    import json
    json.dump(model_metadata, f, indent=4)
print(f"Model metadata saved to {metadata_filename}")

In [None]:
# Create a prediction function that can be used in production
def predict_loan_default(data, model=None, threshold=0.5, feature_names=None):
    """
    Predict loan default probability and binary outcome.
    
    Parameters:
    -----------
    data : pandas DataFrame
        Data containing the features needed for prediction
    model : trained model, optional
        The trained model. If None, will load the saved model.
    threshold : float, default=0.5
        Classification threshold
    feature_names : list, optional
        List of feature names. If None, will load from saved file.
    
    Returns:
    --------
    dict
        Dictionary containing probabilities and binary predictions
    """
    # Load the model if not provided
    if model is None:
        model = joblib.load(f"{model_dir}/loan_default_prediction_model.pkl")
    
    # Load the optimal threshold if not provided
    if threshold == 0.5:
        with open(f"{model_dir}/optimal_threshold.pkl", 'rb') as f:
            threshold = pickle.load(f)
    
    # Load feature names if not provided
    if feature_names is None:
        with open(f"{model_dir}/feature_names.pkl", 'rb') as f:
            feature_names = pickle.load(f)
    
    # Ensure data has all required features
    missing_features = set(feature_names) - set(data.columns)
    if missing_features:
        raise ValueError(f"Missing features in input data: {missing_features}")
    
    # Select and order features correctly
    X = data[feature_names]
    
    # Make predictions
    probabilities = model.predict_proba(X)[:, 1]
    predictions = (probabilities >= threshold).astype(int)
    
    # Create risk segments
    risk_bins = [0, 0.2, 0.4, 0.6, 0.8, 1.0]
    risk_labels = ['Very Low', 'Low', 'Medium', 'High', 'Very High']
    risk_segments = pd.cut(probabilities, bins=risk_bins, labels=risk_labels).astype(str)
    
    # Return results
    return {
        'probabilities': probabilities,
        'predictions': predictions,
        'risk_segments': risk_segments
    }

# Test the prediction function on a sample of test data
test_sample = X_test.head(5)
prediction_results = predict_loan_default(
    test_sample, model=final_model, threshold=best_threshold, feature_names=feature_names
)

print("Sample Prediction Results:")
for i in range(len(test_sample)):
    print(f"Sample {i+1}:")
    print(f"  Default Probability: {prediction_results['probabilities'][i]:.4f}")
    print(f"  Prediction: {'Default' if prediction_results['predictions'][i] == 1 else 'No Default'}")
    print(f"  Risk Segment: {prediction_results['risk_segments'][i]}")
    print()