# ROSE Women's Foundation - Improved Loan Default Prediction
## Using Composite Risk Scores for Enhanced Performance

This notebook implements an improved predictive model leveraging the 4 composite scores from EDA:
- **Financial Resilience Score** (0-100)
- **Business Quality Score** (0-100)
- **Stability Score** (0-100)
- **Expense Management Score** (0-100)

### Baseline vs Target Performance
| Metric | Baseline (v1) | Target (v2) |
|--------|---------------|-------------|
| KS Statistic | 0.21 | ≥0.28 |
| ROC-AUC | 0.60 | ≥0.68 |
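
Both metrics in the table measure how well predicted default probabilities separate defaulters from payers. As a minimal sketch (the toy labels and scores are invented for illustration, and the pure-Python helpers `ks_statistic` and `roc_auc` are hypothetical names, not part of this notebook), KS is the largest gap between the two groups' empirical score CDFs, and ROC-AUC is the share of defaulter/payer pairs in which the defaulter receives the higher score:

```python
def ks_statistic(y_true, y_prob):
    """Largest gap between the empirical score CDFs of the two classes."""
    pos = [p for y, p in zip(y_true, y_prob) if y == 1]
    neg = [p for y, p in zip(y_true, y_prob) if y == 0]
    best = 0.0
    for t in sorted(set(y_prob)):
        cdf_pos = sum(p <= t for p in pos) / len(pos)
        cdf_neg = sum(p <= t for p in neg) / len(neg)
        best = max(best, abs(cdf_pos - cdf_neg))
    return best


def roc_auc(y_true, y_prob):
    """Share of (defaulter, payer) pairs ranked correctly; ties count as half."""
    pos = [p for y, p in zip(y_true, y_prob) if y == 1]
    neg = [p for y, p in zip(y_true, y_prob) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


# Toy data (illustrative only): 1 = defaulted, 0 = paid
y_true = [0, 0, 0, 0, 1, 1, 0, 1]
y_prob = [0.1, 0.2, 0.3, 0.4, 0.35, 0.6, 0.7, 0.8]

print(f"KS:  {ks_statistic(y_true, y_prob):.2f}")  # 0.60
print(f"AUC: {roc_auc(y_true, y_prob):.2f}")       # 0.80
```

The notebook itself relies on `scipy.stats.ks_2samp` and `sklearn.metrics.roc_auc_score` for these quantities; the loops here only spell out what those calls measure.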

### Approach
- Test 3 feature set variations (A, B, C)
- Train 5 algorithms per feature set (15 total models)
- Compare performance to identify best combination

In [None]:
# Core libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import os
import joblib

# Preprocessing
from sklearn.model_selection import train_test_split, RandomizedSearchCV, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from imblearn.over_sampling import SMOTE

# Models
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier

# Evaluation
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, roc_curve, precision_recall_curve,
    confusion_matrix, classification_report
)
from scipy import stats

# Interpretability
import shap

warnings.filterwarnings("ignore")
plt.style.use("seaborn-v0_8-whitegrid")
sns.set_palette("colorblind")
pd.set_option("display.max_columns", None)

# Set random seed for reproducibility
RANDOM_STATE = 42
np.random.seed(RANDOM_STATE)

# Create output directory
os.makedirs("../models/v2", exist_ok=True)

print("Libraries imported successfully.")

---
# Section 1: Data Loading & Composite Score Generation

Load raw data and generate the 4 composite risk scores from EDA.

In [None]:
# Load the dataset
DATA_PATH = "../Github Original Data.csv"
df = pd.read_csv(DATA_PATH, encoding="latin-1")
print(f"Dataset shape: {df.shape}")
print(f"\nTarget variable 'Defaulted' distribution:")
print(df["Defaulted"].value_counts())
print(f"\nDefault rate: {df['Defaulted'].mean()*100:.2f}%")

### Generate Intermediate Features (from EDA)

These intermediate features are needed for composite score calculations.
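
When a pre-computed bracket column is absent, the next cell falls back to bucketing the raw amount with nested `np.where` calls. The same rule for a single value, written as a plain function, may make the thresholds easier to read (the name `bracket_extra_income` is hypothetical; the 16,000 cut-off is the one used in the fallback):

```python
def bracket_extra_income(amount, low_cap=16000):
    """Map a raw extra-income amount to the bracket labels used in this notebook."""
    if amount == 0:
        return "No Extra Income"
    if amount <= low_cap:
        return "Low Extra Income"
    return "Moderate to High Extra Income"


for amount in (0, 9000, 16000, 25000):
    print(amount, "->", bracket_extra_income(amount))
```

The other fallbacks (regular income, utilities, rent, school fees, savings) follow the same zero / low / high pattern with their own thresholds.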

In [None]:
# Generate intermediate features needed for composite scores

# Affordability Business
if "Affordability" in df.columns:
    df["Affordability_Business"] = df["Affordability"].fillna("Unknown")
else:
    df["Affordability_Business"] = "Unknown"

# Affordability HH (Household)
if "Affordability (HH)" in df.columns:
    df["Affordability_HH"] = df["Affordability (HH)"].fillna("Unknown")
else:
    df["Affordability_HH"] = df.get("Affordability_Business", "Unknown")

# Extra Income Brackets
if "Extra Income Brackets" in df.columns:
    df["Extra_Income_Brackets"] = df["Extra Income Brackets"].fillna("No Extra Income")
else:
    extra_income = pd.to_numeric(df.get("Extra Income", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
    df["Extra_Income_Brackets"] = np.where(
        extra_income == 0, "No Extra Income",
        np.where(extra_income <= 16000, "Low Extra Income", "Moderate to High Extra Income")
    )

# Regular Income Brackets
if "Regular Income Brackets" in df.columns:
    df["Regular_Income_Brackets"] = df["Regular Income Brackets"].fillna("No Regular Income")
else:
    regular_income = pd.to_numeric(df.get("Regular monthly income", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
    df["Regular_Income_Brackets"] = np.where(
        regular_income == 0, "No Regular Income",
        np.where(regular_income <= 7800, "Low Regular Income", "Moderate/High Regular Income")
    )

# Income Diversity
if "Logic on Income" in df.columns:
    df["Income_Diversity"] = df["Logic on Income"].fillna("Unknown")
else:
    df["Income_Diversity"] = "Income Only"

# Expense Ratio
if "Expense Relative to Income" in df.columns:
    df["Expense_Ratio"] = df["Expense Relative to Income"].fillna("Unknown")
else:
    df["Expense_Ratio"] = "Unknown"

# Utility Category
if "Categorizing Utility Expenses" in df.columns:
    df["Utility_Category"] = df["Categorizing Utility Expenses"].fillna("Unknown")
else:
    utility = pd.to_numeric(df.get("Utility Expenses", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
    df["Utility_Category"] = np.where(
        utility == 0, "No Utility Expenses",
        np.where(utility > 12000, "High Utility Expenses", "Low Utility Expenses")
    )

# Rent Category
if "Categorize Rent Payment" in df.columns:
    df["Rent_Category"] = df["Categorize Rent Payment"].fillna("Unknown")
else:
    rent = pd.to_numeric(df.get("Rent per month", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
    df["Rent_Category"] = np.where(
        rent == 0, "No Rent Paid",
        np.where(rent > 5000, "High Rent", "Low Rent")
    )

# School Fees Category
if "School Fees Categorical" in df.columns:
    df["SchoolFees_Category"] = df["School Fees Categorical"].fillna("Unknown")
else:
    school_fees = pd.to_numeric(df.get("School Fees", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
    df["SchoolFees_Category"] = np.where(
        school_fees == 0, "No School Fees",
        np.where(school_fees > 50000, "High School Fees", "Low School Fees")
    )

# Savings Category
if "Savings Categorical" in df.columns:
    df["Savings_Category"] = df["Savings Categorical"].fillna("Unknown")
else:
    savings = pd.to_numeric(df.get("Average monthly savings", pd.Series(0, index=df.index)), errors="coerce").fillna(0)
    df["Savings_Category"] = np.where(
        savings == 0, "No Savings",
        np.where(savings > 2350, "High Savings", "Low Savings")
    )

print("Intermediate features generated successfully.")

### Composite Score Calculation Functions

These functions are from the EDA notebook (Section 9.6).
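
Each score is a weighted sum: every component has a fixed weight, and the borrower's category in that component selects a multiplier. A worked example for the Financial Resilience Score, assuming one hypothetical borrower (the weights and multipliers are taken from the function in the next cell; the borrower profile is invented for illustration):

```python
# Component weights from the Financial Resilience docstring (they sum to 100).
WEIGHTS = {"extra_income": 35, "expense_ratio": 30, "income_diversity": 20, "savings": 15}

# Multipliers this hypothetical borrower would receive:
# moderate/high extra income -> 1.0, expenses about half of income -> 0.7,
# an income-diversity label containing "regular" -> 0.7, low savings -> 0.8.
multipliers = {"extra_income": 1.0, "expense_ratio": 0.7, "income_diversity": 0.7, "savings": 0.8}

score = sum(WEIGHTS[k] * multipliers[k] for k in WEIGHTS)
print(f"Financial Resilience Score: {score:.1f}")  # 35 + 21 + 14 + 12 = 82.0
```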

In [None]:
def calculate_financial_resilience(row):
    """
    Financial Resilience Score (0-100)
    - Extra Income Level: 35% weight
    - Expense-to-Income Ratio: 30% weight
    - Income Diversity: 20% weight
    - Savings Level: 15% weight
    """
    score = 0
    
    extra_income = str(row.get("Extra_Income_Brackets", "")).lower()
    if "moderate" in extra_income or "high" in extra_income:
        score += 35 * 1.0
    elif "low" in extra_income and "no" not in extra_income:
        score += 35 * 0.3
    else:
        score += 35 * 0.6
    
    expense_ratio = str(row.get("Expense_Ratio", "")).lower()
    if "1/3" in expense_ratio:
        score += 30 * 1.0
    elif "half" in expense_ratio:
        score += 30 * 0.7
    elif "2/3" in expense_ratio and "more" not in expense_ratio:
        score += 30 * 0.4
    else:
        score += 30 * 0.5
    
    income_div = str(row.get("Income_Diversity", "")).lower()
    if "full" in income_div:
        score += 20 * 1.0
    elif "regular" in income_div:
        score += 20 * 0.7
    elif "extra" in income_div:
        score += 20 * 0.5
    else:
        score += 20 * 0.6
    
    savings = str(row.get("Savings_Category", "")).lower()
    if "high" in savings:
        score += 15 * 1.0
    elif "low" in savings and "no" not in savings:
        score += 15 * 0.8
    else:
        score += 15 * 0.85
    
    return score


def calculate_business_quality(row):
    """
    Business Quality Score (0-100)
    - Rent Payment Level: 45% weight
    - Utility Expenses: 30% weight
    - Business Affordability: 25% weight
    """
    score = 0
    
    rent = str(row.get("Rent_Category", "")).lower()
    if "high" in rent:
        score += 45 * 1.0
    elif "low" in rent and "no" not in rent:
        score += 45 * 0.5
    else:
        score += 45 * 0.6
    
    utility = str(row.get("Utility_Category", "")).lower()
    if "high" in utility:
        score += 30 * 1.0
    elif "low" in utility and "no" not in utility:
        score += 30 * 0.5
    else:
        score += 30 * 0.7
    
    afford = str(row.get("Affordability_Business", "")).lower()
    if "profitable" in afford:
        score += 25 * 1.0
    else:
        score += 25 * 0.5
    
    return score


def calculate_stability(row):
    """
    Stability Score (0-100)
    - School Fees Commitment: 40% weight
    - Regular Income Presence: 30% weight
    - Multiple Income Streams: 30% weight
    """
    score = 0
    
    school = str(row.get("SchoolFees_Category", "")).lower()
    if "high" in school:
        score += 40 * 1.0
    elif "low" in school and "no" not in school:
        score += 40 * 0.5
    else:
        score += 40 * 0.9
    
    regular = str(row.get("Regular_Income_Brackets", "")).lower()
    if "moderate" in regular or "high" in regular:
        score += 30 * 1.0
    elif "low" in regular and "no" not in regular:
        score += 30 * 1.1
    else:
        score += 30 * 0.85
    
    income_div = str(row.get("Income_Diversity", "")).lower()
    if "full" in income_div:
        score += 30 * 1.0
    elif "regular" in income_div:
        score += 30 * 0.8
    elif "extra" in income_div:
        score += 30 * 0.6
    else:
        score += 30 * 0.7
    
    return min(score, 100)


def calculate_expense_management(row):
    """
    Expense Management Score (0-100)
    - Expense Relative to Income: 50% weight
    - Affordability HH: 35% weight
    - Utility Expenses: 15% weight
    """
    score = 0
    
    expense_ratio = str(row.get("Expense_Ratio", "")).lower()
    if "1/3" in expense_ratio:
        score += 50 * 1.0
    elif "half" in expense_ratio:
        score += 50 * 0.7
    elif "2/3" in expense_ratio and "more" not in expense_ratio:
        score += 50 * 0.4
    else:
        score += 50 * 0.5
    
    afford = str(row.get("Affordability_HH", "")).lower()
    if "profitable" in afford:
        score += 35 * 1.0
    else:
        score += 35 * 0.5
    
    utility = str(row.get("Utility_Category", "")).lower()
    if "high" in utility:
        score += 15 * 1.0
    elif "low" in utility and "no" not in utility:
        score += 15 * 0.5
    else:
        score += 15 * 0.7

    return score

print("Composite score functions defined successfully.")

In [None]:
# Generate the 4 composite scores
df["Financial_Resilience_Score"] = df.apply(calculate_financial_resilience, axis=1)
df["Business_Quality_Score"] = df.apply(calculate_business_quality, axis=1)
df["Stability_Score"] = df.apply(calculate_stability, axis=1)
df["Expense_Management_Score"] = df.apply(calculate_expense_management, axis=1)

print("Composite Scores Generated:")
print("=" * 50)
composite_cols = ["Financial_Resilience_Score", "Business_Quality_Score", 
                  "Stability_Score", "Expense_Management_Score"]

for col in composite_cols:
    print(f"\n{col}:")
    print(f"  Mean: {df[col].mean():.2f}")
    print(f"  Std:  {df[col].std():.2f}")
    print(f"  Min:  {df[col].min():.2f}")
    print(f"  Max:  {df[col].max():.2f}")
    print(f"  Corr with Default: {df[col].corr(df['Defaulted']):.4f}")

In [None]:
# Visualize composite score distributions by default status
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes = axes.flatten()

for idx, col in enumerate(composite_cols):
    defaulted = df[df["Defaulted"] == 1][col]
    paid = df[df["Defaulted"] == 0][col]
    
    axes[idx].hist(paid, bins=20, alpha=0.6, label="Paid", color="green")
    axes[idx].hist(defaulted, bins=20, alpha=0.6, label="Defaulted", color="red")
    axes[idx].set_xlabel(col.replace("_", " "))
    axes[idx].set_ylabel("Frequency")
    axes[idx].set_title(f'{col.replace("_", " ")}\nDefault vs Paid Distribution')
    axes[idx].legend()
    axes[idx].axvline(paid.mean(), color="green", linestyle="--", linewidth=2)
    axes[idx].axvline(defaulted.mean(), color="red", linestyle="--", linewidth=2)

plt.tight_layout()
plt.savefig("../models/v2/composite_score_distributions.png", dpi=150, bbox_inches="tight")
plt.show()

---
# Section 2: Feature Set Definitions

Define 3 feature set variations to test:
- **Model A**: Composite Scores Only (4 features)
- **Model B**: Composite + Key Categoricals (8 features)
- **Model C**: Composite + Extended Categoricals, NO Prior Loan (10 features)

In [None]:
# Define the 3 feature set variations

# Model A: Composite Scores Only (4 features)
FEATURES_A = [
    "Financial_Resilience_Score",
    "Business_Quality_Score",
    "Stability_Score",
    "Expense_Management_Score"
]

# Model B: Composite + Key Categoricals (8 features)
FEATURES_B = [
    # Composite Scores
    "Financial_Resilience_Score",
    "Business_Quality_Score",
    "Stability_Score",
    "Expense_Management_Score",
    # Key Categorical Features
    "Age Group",
    "Education",
    "CRB Class",
    "Living"
]

# Model C: Composite + Extended Categoricals, NO Prior Loan (10 features)
FEATURES_C = [
    # Composite Scores
    "Financial_Resilience_Score",
    "Business_Quality_Score",
    "Stability_Score",
    "Expense_Management_Score",
    # Categorical Features
    "Age Group",
    "Education",
    "CRB Class",
    "Living",
    "Logic on Income",
    "Marital status"
    # NOTE: Explicitly EXCLUDING 'Loan Access' (Prior Loan) to test if removal helps
]

TARGET = "Defaulted"

# Verify all features exist
feature_sets = {"A": FEATURES_A, "B": FEATURES_B, "C": FEATURES_C}

for name, features in feature_sets.items():
    available = [f for f in features if f in df.columns]
    missing = [f for f in features if f not in df.columns]
    print(f"Feature Set {name}: {len(available)}/{len(features)} available")
    if missing:
        print(f"  Missing: {missing}")

---
# Section 3: Data Preprocessing

- Handle missing values
- Encode categorical features
- Split data (70/15/15)
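- Scale features (scaler fitted on the training split)
- SMOTE for class imbalance is applied to the training split only, in Section 4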
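
The 70/15/15 split is produced in two steps, so the second split's fraction has to be rescaled to the rows that remain after the test set is removed. A small sketch of that arithmetic (the helper `split_sizes` is hypothetical, and scikit-learn's own rounding may differ by a row):

```python
def split_sizes(n, test_size=0.15, val_size=0.15):
    """Approximate train/val/test counts for a two-step split of n rows."""
    n_test = round(n * test_size)
    n_rest = n - n_test
    # The validation split is taken from the remaining rows, so its fraction is rescaled.
    val_ratio = val_size / (1 - test_size)
    n_val = round(n_rest * val_ratio)
    return n_rest - n_val, n_val, n_test


print(split_sizes(1000))  # (700, 150, 150)
```

With `test_size=0.15`, the rescaled validation fraction is 0.15 / 0.85 ≈ 0.176 of the remaining rows, which is again 15% of the full dataset.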

In [None]:
def prepare_data(df, features, target, test_size=0.15, val_size=0.15):
    """
    Prepare data for a specific feature set.
    Returns a dict with the scaled X_train/X_val/X_test arrays, the y_train/y_val/y_test
    arrays, the fitted encoders and scaler, and the list of feature names used.
    """
    # Create working dataframe
    available_features = [f for f in features if f in df.columns]
    df_work = df[available_features + [target]].copy()
    
    # Handle missing values
    for col in available_features:
        if df_work[col].dtype == "object":
            mode_val = df_work[col].mode()[0] if len(df_work[col].mode()) > 0 else "Unknown"
            df_work[col] = df_work[col].fillna(mode_val)
        else:
            df_work[col] = df_work[col].fillna(df_work[col].median())
    
    # Encode categorical features
    encoders = {}
    for col in available_features:
        if df_work[col].dtype == "object":
            le = LabelEncoder()
            df_work[col] = le.fit_transform(df_work[col].astype(str))
            encoders[col] = le
    
    # Prepare X and y
    X = df_work[available_features]
    y = df_work[target]
    
    # Train/Val/Test split (70/15/15)
    X_temp, X_test, y_temp, y_test = train_test_split(
        X, y, test_size=test_size, random_state=RANDOM_STATE, stratify=y
    )
    
    # Validation split from remaining data
    val_ratio = val_size / (1 - test_size)
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_ratio, random_state=RANDOM_STATE, stratify=y_temp
    )
    
    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)
    X_test_scaled = scaler.transform(X_test)
    
    return {
        "X_train": X_train_scaled,
        "X_val": X_val_scaled,
        "X_test": X_test_scaled,
        "y_train": y_train.values,
        "y_val": y_val.values,
        "y_test": y_test.values,
        "encoders": encoders,
        "scaler": scaler,
        "feature_names": available_features
    }

print("Preprocessing helper function defined.")

In [None]:
# Prepare data for all 3 feature sets
data_A = prepare_data(df, FEATURES_A, TARGET)
data_B = prepare_data(df, FEATURES_B, TARGET)
data_C = prepare_data(df, FEATURES_C, TARGET)

datasets = {"A": data_A, "B": data_B, "C": data_C}

for name, data in datasets.items():
    print(f"\nFeature Set {name}:")
    print(f"  Features: {len(data['feature_names'])}")
    print(f"  Training samples: {len(data['y_train'])}")
    print(f"  Validation samples: {len(data['y_val'])}")
    print(f"  Test samples: {len(data['y_test'])}")
    print(f"  Default rate (train): {data['y_train'].mean()*100:.1f}%")

---
# Section 4: Model Training (15 Variants)

Train 5 algorithms × 3 feature sets = 15 model variants:
1. Logistic Regression
2. Random Forest
3. XGBoost
4. LightGBM
5. CatBoost
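
As a quick check on the bookkeeping, the 15 variants are simply every pairing of an algorithm with a feature set. The sketch below enumerates them using the `"<algorithm>_<feature set>"` key format that the training loop uses for its model dictionary (the variable names here are illustrative):

```python
from itertools import product

algorithms = ["Logistic Regression", "Random Forest", "XGBoost", "LightGBM", "CatBoost"]
feature_set_names = ["A", "B", "C"]

# One key per (feature set, algorithm) pair.
model_keys = [f"{algo}_{fs}" for fs, algo in product(feature_set_names, algorithms)]
print(len(model_keys))  # 15

# Splitting on the last underscore recovers both parts, even for names with spaces.
algo, fs = model_keys[0].rsplit("_", 1)
print(algo, "|", fs)  # Logistic Regression | A
```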

In [None]:
def calculate_ks_statistic(y_true, y_prob):
    """Calculate Kolmogorov-Smirnov statistic."""
    prob_default = y_prob[y_true == 1]
    prob_paid = y_prob[y_true == 0]
    ks_stat, _ = stats.ks_2samp(prob_default, prob_paid)
    return ks_stat


def evaluate_model(model, X_test, y_test, model_name, feature_set):
    """Evaluate model and return all metrics."""
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    
    metrics = {
        "Model": model_name,
        "Feature_Set": feature_set,
        "Accuracy": accuracy_score(y_test, y_pred),
        "Precision": precision_score(y_test, y_pred),
        "Recall": recall_score(y_test, y_pred),
        "F1-Score": f1_score(y_test, y_pred),
        "ROC-AUC": roc_auc_score(y_test, y_prob),
        "KS Statistic": calculate_ks_statistic(y_test, y_prob)
    }
    
    return metrics, y_pred, y_prob

print("Evaluation functions defined.")

In [None]:
# Define models with hyperparameter search spaces
def get_models_config(scale_pos_weight):
    return {
        "Logistic Regression": {
            "model": LogisticRegression(class_weight="balanced", max_iter=1000, random_state=RANDOM_STATE),
            "params": {"C": [0.01, 0.1, 1, 10], "penalty": ["l2"]}
        },
        "Random Forest": {
            "model": RandomForestClassifier(class_weight="balanced", random_state=RANDOM_STATE),
            "params": {
                "n_estimators": [100, 200],
                "max_depth": [5, 10, 15, None],
                "min_samples_split": [2, 5, 10],
                "min_samples_leaf": [1, 2, 4]
            }
        },
        "XGBoost": {
            "model": XGBClassifier(
                scale_pos_weight=scale_pos_weight,
                random_state=RANDOM_STATE,
                eval_metric="logloss"
            ),
            "params": {
                "n_estimators": [100, 200],
                "max_depth": [3, 5, 7],
                "learning_rate": [0.01, 0.1, 0.2],
                "subsample": [0.8, 1.0],
                "colsample_bytree": [0.8, 1.0]
            }
        },
        "LightGBM": {
            "model": LGBMClassifier(class_weight="balanced", random_state=RANDOM_STATE, verbose=-1),
            "params": {
                "n_estimators": [100, 200],
                "max_depth": [3, 5, 7, -1],
                "learning_rate": [0.01, 0.1, 0.2],
                "num_leaves": [31, 50, 100]
            }
        },
        "CatBoost": {
            "model": CatBoostClassifier(auto_class_weights="Balanced", random_state=RANDOM_STATE, verbose=0),
            "params": {
                "iterations": [100, 200],
                "depth": [4, 6, 8],
                "learning_rate": [0.01, 0.1, 0.2]
            }
        }
    }

print("Model configurations defined.")

In [None]:
# Train all 15 model variants
all_results = []
all_models = {}
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)

for feature_set_name, data in datasets.items():
    print(f"\n{'='*70}")
    print(f"TRAINING FEATURE SET {feature_set_name} ({len(data['feature_names'])} features)")
    print(f"{'='*70}")
    
    # Apply SMOTE to training data
    smote = SMOTE(random_state=RANDOM_STATE)
    X_train_smote, y_train_smote = smote.fit_resample(data["X_train"], data["y_train"])
    print(f"SMOTE: {len(data['y_train'])} -> {len(y_train_smote)} samples")
    
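    # Class weight for XGBoost, computed on the resampled labels: SMOTE has already
    # balanced the classes, so weighting by the original ratio would correct twice
    scale_pos_weight = (len(y_train_smote) - y_train_smote.sum()) / y_train_smote.sum()
    models_config = get_models_config(scale_pos_weight)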
    
    for model_name, config in models_config.items():
        print(f"\n  Training: {model_name}...")
        
        # Randomized search
        search = RandomizedSearchCV(
            config["model"],
            config["params"],
            n_iter=10,
            cv=cv,
            scoring="roc_auc",
            random_state=RANDOM_STATE,
            n_jobs=-1
        )
        
        search.fit(X_train_smote, y_train_smote)
        best_model = search.best_estimator_
        
        # Store model
        model_key = f"{model_name}_{feature_set_name}"
        all_models[model_key] = best_model
        
        # Evaluate on test set
        metrics, y_pred, y_prob = evaluate_model(
            best_model, data["X_test"], data["y_test"], model_name, feature_set_name
        )
        all_results.append(metrics)
        
        print(f"    CV ROC-AUC: {search.best_score_:.4f}")
        print(f"    Test ROC-AUC: {metrics['ROC-AUC']:.4f}, KS: {metrics['KS Statistic']:.4f}")

print(f"\n\nTotal models trained: {len(all_results)}")

---
# Section 5: Evaluation & Comparison

Compare all 15 model variants and analyze performance.
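
The baseline comparison in this section reports relative change, i.e. the difference from the v1 value divided by the v1 value. Applied to the targets from the table at the top of the notebook, a minimal sketch (the helper name `relative_change` is hypothetical) gives the improvement that meeting each target would represent:

```python
def relative_change(baseline, value):
    """Percentage change of value relative to baseline."""
    return (value - baseline) / baseline * 100


# Baseline (v1) and target (v2) values from the performance table.
print(f"KS:  {relative_change(0.21, 0.28):+.1f}%")  # +33.3%
print(f"AUC: {relative_change(0.60, 0.68):+.1f}%")  # +13.3%
```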

In [None]:
# Create comparison table
results_df = pd.DataFrame(all_results)
results_df = results_df.sort_values("KS Statistic", ascending=False)

print("=" * 80)
print("MODEL COMPARISON - ALL 15 VARIANTS (Sorted by KS Statistic)")
print("=" * 80)
print(results_df.to_string(index=False))

# Save results
results_df.to_csv("../models/v2/model_comparison_v2.csv", index=False)
print("\nResults saved to models/v2/model_comparison_v2.csv")

In [None]:
# Compare by feature set
print("\n" + "=" * 60)
print("PERFORMANCE BY FEATURE SET")
print("=" * 60)

feature_set_comparison = results_df.groupby("Feature_Set").agg({
    "ROC-AUC": ["mean", "max"],
    "KS Statistic": ["mean", "max"],
    "F1-Score": ["mean", "max"]
}).round(4)

print(feature_set_comparison)

# Which feature set performs best?
best_by_ks = results_df.loc[results_df.groupby("Feature_Set")["KS Statistic"].idxmax()]
print("\nBest model per feature set (by KS):")
print(best_by_ks[["Feature_Set", "Model", "ROC-AUC", "KS Statistic"]].to_string(index=False))

In [None]:
# Compare by algorithm
print("\n" + "=" * 60)
print("PERFORMANCE BY ALGORITHM")
print("=" * 60)

algorithm_comparison = results_df.groupby("Model").agg({
    "ROC-AUC": ["mean", "max"],
    "KS Statistic": ["mean", "max"],
    "F1-Score": ["mean", "max"]
}).round(4)

print(algorithm_comparison)

In [None]:
# Select best model overall
best_idx = results_df["KS Statistic"].idxmax()
best_row = results_df.loc[best_idx]
best_model_name = best_row["Model"]
best_feature_set = best_row["Feature_Set"]
best_model_key = f"{best_model_name}_{best_feature_set}"
best_model = all_models[best_model_key]

print("=" * 60)
print("BEST MODEL SELECTED")
print("=" * 60)
print(f"\nModel: {best_model_name}")
print(f"Feature Set: {best_feature_set}")
print(f"\nMetrics:")
for col in ["Accuracy", "Precision", "Recall", "F1-Score", "ROC-AUC", "KS Statistic"]:
    print(f"  {col}: {best_row[col]:.4f}")

In [None]:
# Compare with baseline
print("\n" + "=" * 60)
print("IMPROVEMENT OVER BASELINE")
print("=" * 60)

baseline_ks = 0.21
baseline_auc = 0.60
best_ks = best_row["KS Statistic"]
best_auc = best_row["ROC-AUC"]

ks_improvement = ((best_ks - baseline_ks) / baseline_ks) * 100
auc_improvement = ((best_auc - baseline_auc) / baseline_auc) * 100

print(f"\n| Metric | Baseline (v1) | Improved (v2) | Change |")
print(f"|--------|---------------|---------------|--------|")
print(f"| KS Statistic | {baseline_ks:.2f} | {best_ks:.4f} | {'+' if ks_improvement > 0 else ''}{ks_improvement:.1f}% |")
print(f"| ROC-AUC | {baseline_auc:.2f} | {best_auc:.4f} | {'+' if auc_improvement > 0 else ''}{auc_improvement:.1f}% |")

# Check success criteria
print("\n" + "=" * 60)
print("SUCCESS CRITERIA CHECK")
print("=" * 60)
criteria = [
    ("KS Statistic >= 0.28", best_ks >= 0.28, best_ks),
    ("ROC-AUC >= 0.68", best_auc >= 0.68, best_auc),
]
for criterion, passed, value in criteria:
    status = "✓ PASSED" if passed else "✗ NOT MET"
    print(f"{criterion}: {value:.4f} - {status}")

---
# Section 6: Visualizations

In [None]:
# Bar chart: All 15 models sorted by KS statistic
plt.figure(figsize=(14, 8))

colors = {"A": "blue", "B": "green", "C": "orange"}
bar_colors = [colors[row["Feature_Set"]] for _, row in results_df.iterrows()]

bars = plt.barh(
    range(len(results_df)),
    results_df["KS Statistic"],
    color=bar_colors
)

plt.axvline(x=0.21, color="red", linestyle="--", linewidth=2, label="Baseline KS (0.21)")
plt.axvline(x=0.28, color="darkgreen", linestyle="--", linewidth=2, label="Target KS (0.28)")

# Labels
labels = [f"{row['Model']} ({row['Feature_Set']})" for _, row in results_df.iterrows()]
plt.yticks(range(len(results_df)), labels)
plt.xlabel("KS Statistic")
plt.title("Model Comparison - KS Statistic (All 15 Variants)")
plt.legend()

# Add value labels
for i, (idx, row) in enumerate(results_df.iterrows()):
    plt.text(row["KS Statistic"] + 0.005, i, f'{row["KS Statistic"]:.3f}', va="center")

plt.tight_layout()
plt.savefig("../models/v2/ks_comparison_all_models.png", dpi=150, bbox_inches="tight")
plt.show()

In [None]:
# Grouped bar chart: Feature sets x Algorithms
fig, ax = plt.subplots(figsize=(14, 8))

algorithms = results_df["Model"].unique()
feature_set_names = ["A", "B", "C"]  # separate name so the `feature_sets` dict from Section 2 is not overwritten
x = np.arange(len(algorithms))
width = 0.25

for i, fs in enumerate(feature_set_names):
    fs_data = results_df[results_df["Feature_Set"] == fs]
    ks_values = [fs_data[fs_data["Model"] == algo]["KS Statistic"].values[0] 
                 if len(fs_data[fs_data["Model"] == algo]) > 0 else 0 
                 for algo in algorithms]
    ax.bar(x + i*width, ks_values, width, label=f"Feature Set {fs}", color=colors[fs])

ax.axhline(y=0.21, color="red", linestyle="--", label="Baseline (0.21)")
ax.axhline(y=0.28, color="darkgreen", linestyle="--", label="Target (0.28)")
ax.set_xlabel("Algorithm")
ax.set_ylabel("KS Statistic")
ax.set_title("KS Statistic by Feature Set and Algorithm")
ax.set_xticks(x + width)
ax.set_xticklabels(algorithms, rotation=45, ha="right")
ax.legend()
ax.grid(True, alpha=0.3, axis="y")

plt.tight_layout()
plt.savefig("../models/v2/feature_set_algorithm_comparison.png", dpi=150, bbox_inches="tight")
plt.show()

In [None]:
# ROC curves - All 15 models
plt.figure(figsize=(12, 10))

# Get predictions for all models
for model_key, model in all_models.items():
    parts = model_key.rsplit("_", 1)
    model_name = parts[0]
    feature_set = parts[1]
    
    data = datasets[feature_set]
    y_prob = model.predict_proba(data["X_test"])[:, 1]
    fpr, tpr, _ = roc_curve(data["y_test"], y_prob)
    auc = roc_auc_score(data["y_test"], y_prob)
    
    plt.plot(fpr, tpr, label=f"{model_name} ({feature_set}) AUC={auc:.3f}", 
             linewidth=1.5, alpha=0.8)

plt.plot([0, 1], [0, 1], "k--", label="Random Classifier")
# AUC is an area, not a TPR level, so the v1 baseline appears as a legend entry only
plt.plot([], [], linestyle="None", label="Baseline AUC (v1): 0.60")
plt.xlabel("False Positive Rate", fontsize=12)
plt.ylabel("True Positive Rate", fontsize=12)
plt.title("ROC Curves - All 15 Model Variants", fontsize=14)
plt.legend(loc="lower right", fontsize=8)
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig("../models/v2/roc_curves_all_models.png", dpi=150, bbox_inches="tight")
plt.show()

In [None]:
# Confusion matrices for top 3 models
top3 = results_df.head(3)

fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for idx, (_, row) in enumerate(top3.iterrows()):
    model_key = f"{row['Model']}_{row['Feature_Set']}"
    model = all_models[model_key]
    data = datasets[row["Feature_Set"]]
    
    y_pred = model.predict(data["X_test"])
    cm = confusion_matrix(data["y_test"], y_pred)
    
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", ax=axes[idx],
                xticklabels=["Paid", "Defaulted"],
                yticklabels=["Paid", "Defaulted"])
    axes[idx].set_title(f"{row['Model']} ({row['Feature_Set']})\nKS={row['KS Statistic']:.3f}")
    axes[idx].set_xlabel("Predicted")
    axes[idx].set_ylabel("Actual")

plt.suptitle("Confusion Matrices - Top 3 Models", fontsize=14)
plt.tight_layout()
plt.savefig("../models/v2/confusion_matrices_top3.png", dpi=150, bbox_inches="tight")
plt.show()

---
# Section 7: Best Model Selection & Saving

In [None]:
# Save best model and artifacts
best_data = datasets[best_feature_set]

# Save best model
joblib.dump(best_model, "../models/v2/best_loan_default_model_v2.pkl")
print(f"Best model saved: models/v2/best_loan_default_model_v2.pkl")

# Save scaler
joblib.dump(best_data["scaler"], "../models/v2/scaler_v2.pkl")
print(f"Scaler saved: models/v2/scaler_v2.pkl")

# Save encoders
joblib.dump(best_data["encoders"], "../models/v2/encoders_v2.pkl")
print(f"Encoders saved: models/v2/encoders_v2.pkl")

# Save feature list
joblib.dump(best_data["feature_names"], "../models/v2/feature_list_v2.pkl")
print(f"Feature list saved: models/v2/feature_list_v2.pkl")

# Save all trained models
joblib.dump(all_models, "../models/v2/all_trained_models_v2.pkl")
print(f"All models saved: models/v2/all_trained_models_v2.pkl")

# Save model metadata (best model, feature set, baseline and achieved metrics)
metadata = {
    "best_model": best_model_name,
    "best_feature_set": best_feature_set,
    "feature_names": best_data["feature_names"],
    "baseline_ks": 0.21,
    "baseline_auc": 0.60,
    "best_ks": float(best_row["KS Statistic"]),
    "best_auc": float(best_row["ROC-AUC"])
}
joblib.dump(metadata, "../models/v2/model_metadata_v2.pkl")
print(f"Metadata saved: models/v2/model_metadata_v2.pkl")

---
# Section 8: Model Interpretation (SHAP, Feature Importance)

In [None]:
# Feature importance for best model
if hasattr(best_model, "feature_importances_"):
    importance_df = pd.DataFrame({
        "Feature": best_data["feature_names"],
        "Importance": best_model.feature_importances_
    }).sort_values("Importance", ascending=False)
    
    print("=" * 50)
    print(f"FEATURE IMPORTANCE - {best_model_name}")
    print("=" * 50)
    print(importance_df.to_string(index=False))
    
    # Visualization
    plt.figure(figsize=(10, 6))
    plt.barh(importance_df["Feature"], importance_df["Importance"], color="steelblue")
    plt.xlabel("Importance")
    plt.title(f"Feature Importance - {best_model_name} (Feature Set {best_feature_set})")
    plt.gca().invert_yaxis()
    plt.tight_layout()
    plt.savefig("../models/v2/feature_importance_best_model.png", dpi=150, bbox_inches="tight")
    plt.show()
else:
    print("Feature importances not available for this model type.")

In [None]:
# SHAP Analysis for best model
print(f"\nGenerating SHAP values for {best_model_name}...")

try:
    if best_model_name in ["Random Forest", "XGBoost", "LightGBM", "CatBoost"]:
        explainer = shap.TreeExplainer(best_model)
        shap_values = explainer.shap_values(best_data["X_test"])
        
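        # Older SHAP versions return a list with one array per class; newer ones
        # may return a single (samples, features, classes) array. Keep class 1.
        if isinstance(shap_values, list):
            shap_values = shap_values[1]
        elif getattr(shap_values, "ndim", 2) == 3:
            shap_values = shap_values[:, :, 1]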
    else:
        explainer = shap.LinearExplainer(best_model, best_data["X_train"])
        shap_values = explainer.shap_values(best_data["X_test"])
    
    # Summary plot
    plt.figure(figsize=(12, 8))
    shap.summary_plot(shap_values, best_data["X_test"], 
                      feature_names=best_data["feature_names"], show=False)
    plt.title(f"SHAP Feature Importance - {best_model_name}")
    plt.tight_layout()
    plt.savefig("../models/v2/shap_summary.png", dpi=150, bbox_inches="tight")
    plt.show()
    
    # Bar plot
    plt.figure(figsize=(10, 6))
    shap.summary_plot(shap_values, best_data["X_test"], 
                      feature_names=best_data["feature_names"], 
                      plot_type="bar", show=False)
    plt.title(f"Mean |SHAP| - {best_model_name}")
    plt.tight_layout()
    plt.savefig("../models/v2/shap_bar.png", dpi=150, bbox_inches="tight")
    plt.show()
    
    print("SHAP analysis complete.")
except Exception as e:
    print(f"SHAP analysis failed: {e}")
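
The bar plot above ranks features by their mean absolute SHAP value. As a minimal sketch of that aggregation, the snippet below computes the ranking by hand. The SHAP matrix is hypothetical and invented for illustration; it is not output from the trained model, and `mean_abs_shap` is an illustrative helper rather than part of the `shap` library.

```python
# Hypothetical SHAP matrix: one row per borrower, one column per feature.
feature_names = ["Financial_Resilience_Score", "Business_Quality_Score", "Stability_Score"]
shap_matrix = [
    [0.10, -0.30, 0.05],
    [-0.20, 0.10, 0.00],
    [0.30, -0.50, -0.10],
]


def mean_abs_shap(matrix, names):
    """Return (feature, mean |SHAP|) pairs, most influential first."""
    n_rows = len(matrix)
    means = [sum(abs(row[j]) for row in matrix) / n_rows for j in range(len(names))]
    return sorted(zip(names, means), key=lambda pair: pair[1], reverse=True)


ranking = mean_abs_shap(shap_matrix, feature_names)
for name, value in ranking:
    print(f"{name}: {value:.3f}")
```

Taking absolute values first means that positive and negative contributions do not cancel out, so a feature that pushes some borrowers up and others down still ranks as influential.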

---
# Section 9: Updated Prediction Function

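A deployment-ready prediction function: it loads the saved v2 model, scaler, and encoders, derives the four composite scores when only raw features are supplied, and returns a default probability with a risk category and recommendation.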

In [None]:
def predict_loan_default_v2(borrower_features, 
                            model_path="../models/v2/best_loan_default_model_v2.pkl",
                            scaler_path="../models/v2/scaler_v2.pkl",
                            encoders_path="../models/v2/encoders_v2.pkl",
                            features_path="../models/v2/feature_list_v2.pkl"):
    """
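    Predict default risk for a single borrower using composite scores.
    
    Parameters
    ----------
    borrower_features : dict
        Either raw features (composite scores are calculated here) or a
        dict that already contains the pre-calculated composite scores.
        Pre-calculated scores are detected via "Financial_Resilience_Score".
    model_path, scaler_path, encoders_path, features_path : str
        Locations of the saved v2 artifacts.
        
    Returns
    -------
    dict with:
        - default_probability: float
        - payment_probability: float (1 - default_probability)
        - risk_category: str
        - composite_scores: dict
        - recommendation: str
        - confidence: str ("High" in the lowest and highest bands, else "Medium")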
    """
    # Load model and preprocessors
    model = joblib.load(model_path)
    scaler = joblib.load(scaler_path)
    encoders = joblib.load(encoders_path)
    feature_list = joblib.load(features_path)
    
    # Work on a copy so the caller's dict is not mutated
    borrower_features = dict(borrower_features)
    
    # Calculate composite scores if not provided
    if "Financial_Resilience_Score" not in borrower_features:
        # Create intermediate features first
        borrower_features["Extra_Income_Brackets"] = borrower_features.get("Extra Income Brackets", "No Extra Income")
        borrower_features["Expense_Ratio"] = borrower_features.get("Expense Relative to Income", "Unknown")
        borrower_features["Income_Diversity"] = borrower_features.get("Logic on Income", "Income Only")
        borrower_features["Savings_Category"] = borrower_features.get("Savings Categorical", "No Savings")
        borrower_features["Rent_Category"] = borrower_features.get("Categorize Rent Payment", "Unknown")
        borrower_features["Utility_Category"] = borrower_features.get("Categorizing Utility Expenses", "Unknown")
        borrower_features["Affordability_Business"] = borrower_features.get("Affordability", "Unknown")
        borrower_features["Affordability_HH"] = borrower_features.get("Affordability (HH)", "Unknown")
        borrower_features["SchoolFees_Category"] = borrower_features.get("School Fees Categorical", "Unknown")
        borrower_features["Regular_Income_Brackets"] = borrower_features.get("Regular Income Brackets", "No Regular Income")
        
        # Calculate composite scores
        borrower_features["Financial_Resilience_Score"] = calculate_financial_resilience(borrower_features)
        borrower_features["Business_Quality_Score"] = calculate_business_quality(borrower_features)
        borrower_features["Stability_Score"] = calculate_stability(borrower_features)
        borrower_features["Expense_Management_Score"] = calculate_expense_management(borrower_features)
    
    # Create feature vector
    X = pd.DataFrame([borrower_features])
    
    # Ensure all features are present
    for feature in feature_list:
        if feature not in X.columns:
            X[feature] = "Unknown"
    
    # Reorder columns
    X = X[feature_list]
    
    # Encode categorical features
    for col in feature_list:
        if col in encoders:
            le = encoders[col]
            val = str(X[col].iloc[0])
            if val in le.classes_:
                X[col] = le.transform([val])[0]
            else:
                # Unseen category: fall back to code 0, which coincides with the encoder's first class
                X[col] = 0
    
    # Scale features
    X_scaled = scaler.transform(X)
    
    # Predict
    # Cast to a plain Python float so rounding and serialization behave predictably
    probability = float(model.predict_proba(X_scaled)[0, 1])
    
    # Determine risk category and confidence
    if probability < 0.25:
        risk_category = "Low Risk"
        recommendation = "APPROVE - Low default risk. Standard loan terms recommended."
        confidence = "High"
    elif probability < 0.40:
        risk_category = "Medium-Low Risk"
        recommendation = "APPROVE WITH MONITORING - Moderate-low risk. Standard terms with periodic check-ins."
        confidence = "Medium"
    elif probability < 0.55:
        risk_category = "Medium Risk"
        recommendation = "REVIEW - Moderate default risk. Consider reduced loan amount or additional guarantor."
        confidence = "Medium"
    elif probability < 0.70:
        risk_category = "Medium-High Risk"
        recommendation = "CAUTION - Elevated risk. Require strong collateral or reduced amount."
        confidence = "Medium"
    else:
        risk_category = "High Risk"
        recommendation = "DECLINE or SPECIAL TERMS - High default risk. Recommend declining or special conditions."
        confidence = "High"
    
    return {
        "default_probability": round(probability, 4),
        "payment_probability": round(1 - probability, 4),
        "risk_category": risk_category,
        "composite_scores": {
            "Financial Resilience": borrower_features.get("Financial_Resilience_Score", "N/A"),
            "Business Quality": borrower_features.get("Business_Quality_Score", "N/A"),
            "Stability": borrower_features.get("Stability_Score", "N/A"),
            "Expense Management": borrower_features.get("Expense_Management_Score", "N/A")
        },
        "recommendation": recommendation,
        "confidence": confidence
    }

print("Prediction function defined successfully.")
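
The function above encodes a category the encoder has never seen as `0`, which is also the code of the encoder's first class. One possible alternative, sketched below under stated assumptions, reserves a dedicated code for unseen values. `SafeLabelEncoder` is a hypothetical helper, not scikit-learn's `LabelEncoder`, and the model would have to be trained with the same encoding for this to be valid. The category names `"Rural"` and `"Suburban"` are made up for the example.

```python
class SafeLabelEncoder:
    """Hypothetical encoder that reserves code 0 for unseen categories."""

    UNKNOWN = "__unknown__"

    def fit(self, values):
        # Sort for a stable mapping; real categories start at code 1.
        categories = sorted({str(v) for v in values})
        self.mapping_ = {self.UNKNOWN: 0}
        for code, category in enumerate(categories, start=1):
            self.mapping_[category] = code
        return self

    def transform_one(self, value):
        # Unseen values get the reserved code instead of colliding with a real class.
        return self.mapping_.get(str(value), self.mapping_[self.UNKNOWN])


encoder = SafeLabelEncoder().fit(["Urban", "Peri-Urban", "Rural"])
print(encoder.transform_one("Urban"))     # known category
print(encoder.transform_one("Suburban"))  # unseen category, reserved code
```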

In [None]:
# Test with 3 sample borrowers
print("=" * 70)
print("SAMPLE PREDICTIONS")
print("=" * 70)

# Sample 1: Low-risk borrower
low_risk = {
    "Extra Income Brackets": "Moderate to High Extra Income",
    "Categorize Rent Payment": "High Rent",
    "School Fees Categorical": "High School Fees",
    "Age Group": "Mid Life 40-49",
    "Education": "Tertiary level (Colleges, Universities, Polytechnics)",
    "CRB Class": "Active Low-Medium Risk",
    "Living": "Peri-Urban",
    "Logic on Income": "Income + Extra + Regular (Full Diversity)",
    "Categorizing Utility Expenses": "High Utility Expenses",
    "Expense Relative to Income": "1/3 or Less of Income",
    "Affordability (HH)": "Profitable (Affordable)",
    "Marital status": "Married"
}

print("\nSample 1: Low-Risk Borrower Profile")
result1 = predict_loan_default_v2(low_risk)
print(f"  Default Probability: {result1['default_probability']:.2%}")
print(f"  Risk Category: {result1['risk_category']}")
print(f"  Composite Scores: {result1['composite_scores']}")
print(f"  Recommendation: {result1['recommendation']}")

# Sample 2: High-risk borrower
high_risk = {
    "Extra Income Brackets": "No Extra Income",
    "Categorize Rent Payment": "Low Rent",
    "School Fees Categorical": "No School Fees",
    "Age Group": "Young Adults 21-29",
    "Education": "Secondary Incomplete",
    "CRB Class": "Legacy",
    "Living": "Urban",
    "Logic on Income": "Income Only",
    "Categorizing Utility Expenses": "No Utility Expenses",
    "Expense Relative to Income": "More than 2/3 of Income",
    "Affordability (HH)": "Low/Negative Profit (Unviable)",
    "Marital status": "Single"
}

print("\nSample 2: High-Risk Borrower Profile")
result2 = predict_loan_default_v2(high_risk)
print(f"  Default Probability: {result2['default_probability']:.2%}")
print(f"  Risk Category: {result2['risk_category']}")
print(f"  Composite Scores: {result2['composite_scores']}")
print(f"  Recommendation: {result2['recommendation']}")

# Sample 3: Medium-risk borrower
medium_risk = {
    "Extra Income Brackets": "Low Extra Income",
    "Categorize Rent Payment": "Low Rent",
    "School Fees Categorical": "Low School Fees",
    "Age Group": "Early Mature 30-39",
    "Education": "Secondary Complete",
    "CRB Class": "Active High-Medium High Risk",
    "Living": "Peri-Urban",
    "Logic on Income": "Income + Extra",
    "Categorizing Utility Expenses": "Low Utility Expenses",
    "Expense Relative to Income": "Half of Income",
    "Affordability (HH)": "Profitable (Affordable)",
    "Marital status": "Married"
}

print("\nSample 3: Medium-Risk Borrower Profile")
result3 = predict_loan_default_v2(medium_risk)
print(f"  Default Probability: {result3['default_probability']:.2%}")
print(f"  Risk Category: {result3['risk_category']}")
print(f"  Composite Scores: {result3['composite_scores']}")
print(f"  Recommendation: {result3['recommendation']}")
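
The risk bands used by `predict_loan_default_v2` can also be checked in isolation, without loading any model artifacts. The sketch below reproduces the same cut-offs (0.25, 0.40, 0.55, 0.70) as a lookup table; `risk_category` is an illustrative helper and not part of the notebook's saved code.

```python
from bisect import bisect_right

# Exclusive upper bounds of each band, copied from predict_loan_default_v2.
THRESHOLDS = [0.25, 0.40, 0.55, 0.70]
CATEGORIES = [
    "Low Risk",
    "Medium-Low Risk",
    "Medium Risk",
    "Medium-High Risk",
    "High Risk",
]


def risk_category(probability):
    """Map a default probability to its risk band."""
    if not 0.0 <= probability <= 1.0:
        raise ValueError("probability must be between 0 and 1")
    # bisect_right counts how many thresholds are <= probability,
    # so a value exactly on a boundary falls into the higher band.
    return CATEGORIES[bisect_right(THRESHOLDS, probability)]


for p in (0.10, 0.25, 0.50, 0.69, 0.70):
    print(f"{p:.2f} -> {risk_category(p)}")
```

Keeping the thresholds in one list makes it easier to adjust the bands later without editing a chain of `elif` branches.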

---
# Section 10: Final Summary Report
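
The summary table reports the KS statistic and its change against the v1 baseline. As a rough, self-contained sketch of both quantities: the KS statistic is the largest gap between the cumulative score distributions of defaulters and non-defaulters, and the change is assumed here to be a relative percentage over the baseline (the notebook computes `ks_improvement` in an earlier cell, so that definition is an assumption). The labels and scores below are hypothetical.

```python
def ks_statistic(y_true, y_score):
    """Largest gap between the score CDFs of the positive and negative class."""
    pos = [s for y, s in zip(y_true, y_score) if y == 1]
    neg = [s for y, s in zip(y_true, y_score) if y == 0]
    if not pos or not neg:
        raise ValueError("both classes must be present")
    ks = 0.0
    for threshold in sorted(set(y_score)):
        cdf_pos = sum(s <= threshold for s in pos) / len(pos)
        cdf_neg = sum(s <= threshold for s in neg) / len(neg)
        ks = max(ks, abs(cdf_pos - cdf_neg))
    return ks


def relative_change_pct(new, baseline):
    """Relative change of `new` over `baseline`, in percent (assumed definition)."""
    return (new - baseline) / baseline * 100


# Hypothetical labels (1 = default) and predicted default probabilities.
y_true = [0, 0, 0, 0, 1, 1, 1, 1]
y_score = [0.1, 0.2, 0.3, 0.6, 0.4, 0.7, 0.8, 0.9]

ks = ks_statistic(y_true, y_score)
print(f"KS statistic: {ks:.2f}")
print(f"Change if KS moved from 0.21 to 0.28: {relative_change_pct(0.28, 0.21):+.1f}%")
```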

In [None]:
print("=" * 70)
print("MODEL PERFORMANCE SUMMARY")
print("=" * 70)

print("""
# Model Performance Summary

## Baseline vs. Improved Model
""")

print("| Metric | Baseline (v1) | Improved (v2) | Change |")
print("|--------|---------------|---------------|--------|")
print(f"| KS Statistic | 0.21 | {best_row['KS Statistic']:.4f} | {'+' if ks_improvement > 0 else ''}{ks_improvement:.1f}% |")
print(f"| ROC-AUC | 0.60 | {best_row['ROC-AUC']:.4f} | {'+' if auc_improvement > 0 else ''}{auc_improvement:.1f}% |")
print(f"| Features Used | 12 raw | {len(best_data['feature_names'])} composite+select | Simplified |")

print(f"""
## Best Performing Configuration

- **Feature Set**: {best_feature_set}
- **Algorithm**: {best_model_name}
- **Features**: {best_data['feature_names']}

## Performance Metrics
""")
for col in ["Accuracy", "Precision", "Recall", "F1-Score", "ROC-AUC", "KS Statistic"]:
    print(f"- {col}: {best_row[col]:.4f}")

print("""
## Key Findings

1. **Composite Score Impact**: The Change column above shows how the engineered composite scores moved KS and ROC-AUC relative to v1
2. **Feature Set Comparison**: Tested 3 variations (A, B, C) with different feature counts
3. **Algorithm Performance**: Compared 5 algorithms to find optimal combination
4. **Prior Loan Access**: Feature Set C excludes this to test impact

## Recommendations

- Deploy Model v2 with the selected feature set
- Use composite scores as primary risk indicators
- Monitor performance and retrain periodically
""")

print("\n" + "=" * 70)
print("ARTIFACTS SAVED")
print("=" * 70)
print("""
- models/v2/best_loan_default_model_v2.pkl
- models/v2/scaler_v2.pkl
- models/v2/encoders_v2.pkl
- models/v2/feature_list_v2.pkl
- models/v2/all_trained_models_v2.pkl
- models/v2/model_metadata_v2.pkl
- models/v2/model_comparison_v2.csv
- models/v2/composite_score_distributions.png
- models/v2/ks_comparison_all_models.png
- models/v2/feature_set_algorithm_comparison.png
- models/v2/roc_curves_all_models.png
- models/v2/confusion_matrices_top3.png
- models/v2/feature_importance_best_model.png
- models/v2/shap_summary.png
- models/v2/shap_bar.png
""")

print("\n" + "=" * 70)
print("READY FOR DEPLOYMENT")
print("=" * 70)