<a href="https://colab.research.google.com/github/Attabeezy/sequential-crm-for-dce/blob/main/credit_risk_prediction_v2_5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
"""
Credit Risk Prediction

This script implements an optimized approach for credit risk prediction.
It uses an Artificial Neural Network (ANN) with feature engineering and the
downsample_upsample data balancing strategy.

"""

import pandas as pd
import numpy as np
import kagglehub
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                            f1_score, roc_auc_score, classification_report,
                            confusion_matrix, roc_curve, auc, precision_recall_curve)
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
import tensorflow as tf
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import os
import time

# Silence every Python warning for the rest of the session.
# NOTE(review): this blanket filter also hides deprecation and data-quality
# warnings raised by the libraries used below — consider narrowing it.
warnings.filterwarnings('ignore')
# Make sure the output directory for saved plots and CSV files exists.
os.makedirs('./results', exist_ok=True)

In [None]:

# ============================================================================
# STEP 1: DATA LOADING AND PREPROCESSING
# ============================================================================

def create_binary_target(df, positive_class):
    """Create a binary target column from ``loan_status``.

    Args:
        df: DataFrame containing a ``loan_status`` column. It is not modified.
        positive_class: The concrete ``loan_status`` label to encode as 1.
            Every other value (including missing values) is encoded as 0.

    Returns:
        A ``(df_clean, encoding_info)`` tuple. ``df_clean`` is a copy of ``df``
        with an added integer ``loan_status_binary`` column; ``encoding_info``
        maps each distinct ``loan_status`` value to the code it received.
    """
    df_clean = df.copy()
    # Vectorized comparison instead of a per-row Python lambda.
    is_positive = df_clean['loan_status'] == positive_class
    df_clean['loan_status_binary'] = is_positive.astype('int64')
    encoding_info = {status: (1 if status == positive_class else 0)
                     for status in df_clean['loan_status'].unique()}
    return df_clean, encoding_info

print("Loading dataset...")
dataset_path = kagglehub.dataset_download('jeandedieunyandwi/lending-club-dataset')
# os.path.join keeps the path valid regardless of the platform's separator.
file_path = os.path.join(dataset_path, "lending_club_loan_two.csv")
df = pd.read_csv(file_path)
print(f"✓ Dataset shape: {df.shape}")

# Create binary target: 'Charged Off' is the positive (risky) class.
df_clean, encoding_info = create_binary_target(df, positive_class='Charged Off')

# Sample and preprocess
SAMPLE_SIZE = 60000
X_clean = df_clean.drop(['loan_status', 'loan_status_binary'], axis=1)
y_clean = df_clean['loan_status_binary']

# NOTE(review): X_test_full / y_test_full are not used anywhere in the visible
# code; later evaluation uses a split of the balanced sample instead — confirm
# that this hold-out is intentionally left untouched.
X_train_full, X_test_full, y_train_full, y_test_full = train_test_split(
    X_clean, y_clean, test_size=0.2, random_state=42, stratify=y_clean
)

# Work on at most SAMPLE_SIZE rows drawn from the training portion only.
if len(X_train_full) > SAMPLE_SIZE:
    train_indices = X_train_full.sample(n=SAMPLE_SIZE, random_state=42).index
    df_sampled = df_clean.loc[train_indices].copy()
else:
    df_sampled = df_clean.loc[X_train_full.index].copy()

X_sampled = df_sampled.drop(['loan_status', 'loan_status_binary'], axis=1)
y_sampled = df_sampled['loan_status_binary']

# Feature engineering: candidate columns; any that are absent are skipped.
FEATURE_COLUMNS = [
    'loan_amnt', 'funded_amnt', 'funded_amnt_inv', 'term', 'int_rate',
    'installment', 'grade', 'sub_grade', 'emp_title', 'emp_length',
    'annual_inc', 'application_type'
]

available_cols = [col for col in FEATURE_COLUMNS if col in X_sampled.columns]

# Clean string columns. The guards test "not numeric yet" rather than
# `dtype == 'object'`, so text columns stored with pandas' dedicated string
# dtype are cleaned too instead of being silently dropped by the numeric
# filter further down.
if 'term' in X_sampled.columns and not pd.api.types.is_numeric_dtype(X_sampled['term']):
    X_sampled['term'] = X_sampled['term'].str.replace(' months', '', regex=False).astype(float)

if 'int_rate' in X_sampled.columns and not pd.api.types.is_numeric_dtype(X_sampled['int_rate']):
    X_sampled['int_rate'] = X_sampled['int_rate'].str.replace('%', '', regex=False).astype(float)

if 'emp_length' in X_sampled.columns and not pd.api.types.is_numeric_dtype(X_sampled['emp_length']):
    # Map the textual tenure buckets onto 0-10 years; 'n/a' becomes missing.
    X_sampled['emp_length'] = X_sampled['emp_length'].replace({
        '< 1 year': '0', '1 year': '1', '2 years': '2', '3 years': '3', '4 years': '4',
        '5 years': '5', '6 years': '6', '7 years': '7', '8 years': '8', '9 years': '9',
        '10+ years': '10', 'n/a': np.nan
    }).astype(float)

# Select features and one-hot encode.
# Only float64/int64 candidates are kept as numeric inputs; the remaining text
# columns are excluded unless listed in categorical_features.
final_features = [col for col in available_cols if X_sampled[col].dtype in ['float64', 'int64']]
categorical_features = ['grade', 'application_type']
final_features.extend([c for c in categorical_features if c in X_sampled.columns])

X_final = X_sampled[final_features].copy()
y_final = y_sampled.copy()

# Drop rows with any missing feature or target so X and y stay aligned.
combined_df = pd.concat([X_final, y_final], axis=1)
combined_df.dropna(inplace=True)

X_final = combined_df.drop('loan_status_binary', axis=1)
y_final = combined_df['loan_status_binary']

# drop_first=True removes one dummy column per categorical feature.
X_encoded = pd.get_dummies(X_final, columns=[c for c in categorical_features if c in X_final.columns], drop_first=True)


In [None]:
# ============================================================================
# STEP 2: FEATURE ENGINEERING (BEST APPROACH)
# ============================================================================

print("\nApplying feature engineering...")
X_engineered = X_encoded.copy()

# Income-to-loan ratio; the small epsilon avoids division by zero.
X_engineered['annual_inc_to_loan_amnt_ratio'] = X_engineered['annual_inc'].div(
    X_engineered['loan_amnt'] + 1e-6
)

# One-hot columns produced for the categorical features.
ohe_cols = [
    col for col in X_engineered.columns
    if col.startswith(('grade_', 'application_type_'))
]

# Numeric base features first, then the one-hot columns; anything that is
# not present in the engineered frame is skipped.
selected_features = [
    name
    for name in [
        'loan_amnt', 'term', 'int_rate', 'emp_length', 'annual_inc',
        'annual_inc_to_loan_amnt_ratio',
    ] + ohe_cols
    if name in X_engineered.columns
]

X = X_engineered.loc[:, selected_features].copy()
y = y_final.copy()

print(f"✓ Final data: {X.shape[0]} samples, {X.shape[1]} features")
print(f"  Features: {selected_features}")


In [None]:
# STEP 3: DATA BALANCING (downsample_upsample)

def prepare_data_with_downsample_upsample(X, y, random_state=None):
    """Balance the classes by undersampling, then oversampling, then shuffling.

    Args:
        X: Feature DataFrame.
        y: Named target Series aligned with ``X``.
        random_state: Seed forwarded to both samplers and to the shuffle.

    Returns:
        ``(X_resampled, y_resampled, balance_info)`` where ``balance_info``
        holds the class counts before (``'original_counts'``) and after
        (``'resampled_counts'``) resampling.
    """
    X_work, y_work = X.copy(), y.copy()

    class_counts = y_work.value_counts()
    majority_class = class_counts.idxmax()
    minority_class = class_counts.idxmin()

    # Step 1: shrink the majority class to the size of the minority class.
    undersampler = RandomUnderSampler(
        sampling_strategy={majority_class: class_counts[minority_class]},
        random_state=random_state,
    )
    X_downsampled, y_downsampled = undersampler.fit_resample(X_work, y_work)

    # Step 2: grow the minority class to the (already reduced) majority size.
    # NOTE(review): after step 1 both classes appear to have the same count,
    # so this step looks like a no-op — confirm the intended target size.
    majority_after = int((y_downsampled == majority_class).sum())
    oversampler = RandomOverSampler(
        sampling_strategy={minority_class: majority_after},
        random_state=random_state,
    )
    X_resampled, y_resampled = oversampler.fit_resample(X_downsampled, y_downsampled)

    # Step 3: shuffle features and target together so rows stay aligned.
    target_name = y_resampled.name
    shuffled = (
        pd.concat([X_resampled, y_resampled], axis=1)
        .sample(frac=1, random_state=random_state)
        .reset_index(drop=True)
    )
    X_resampled = shuffled.drop(target_name, axis=1)
    y_resampled = shuffled[target_name]

    balance_info = {
        'original_counts': y.value_counts().to_dict(),
        'resampled_counts': y_resampled.value_counts().to_dict(),
    }
    return X_resampled, y_resampled, balance_info

print("\nApplying downsample_upsample balancing...")
X_balanced, y_balanced, balance_info = prepare_data_with_downsample_upsample(
    X, y, random_state=42
)
print(f"✓ Original counts: {balance_info['original_counts']}")
print(f"✓ Balanced counts: {balance_info['resampled_counts']}")

# Stratified 80/20 split of the balanced data.
X_train, X_test, y_train, y_test = train_test_split(
    X_balanced,
    y_balanced,
    test_size=0.2,
    random_state=42,
    stratify=y_balanced,
)

# Fit the scaler on the training rows only, then apply it to both splits.
scaler = MinMaxScaler().fit(X_train)
X_train_scaled = scaler.transform(X_train)
X_test_scaled = scaler.transform(X_test)

print(f"✓ Train set: {len(y_train):,} samples")
print(f"✓ Test set: {len(y_test):,} samples\n")


In [None]:
# STEP 4: BUILD AND TRAIN BEST ANN MODEL

def build_best_ann(input_shape):
    """Build and compile the 2-hidden-layer tanh ANN used for classification.

    Args:
        input_shape: Number of input features (an int, not a tuple).

    Returns:
        A compiled ``tf.keras.Sequential`` model with hidden layers of 14 and
        7 tanh units and a single sigmoid output, using the Adam optimizer,
        binary cross-entropy loss and the accuracy metric.
    """
    model = tf.keras.Sequential([
        # Declare the input explicitly instead of passing `input_shape=` to
        # the first Dense layer, which current Keras versions discourage.
        tf.keras.Input(shape=(input_shape,)),
        tf.keras.layers.Dense(14, activation='tanh'),
        tf.keras.layers.Dense(7, activation='tanh'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    return model

def train_and_evaluate_keras_with_cv(build_model_func, X, y, cv_folds=5,
                                     epochs=50, batch_size=32, random_state=None,
                                     model_name="", verbose=False):
    """Run stratified K-fold cross-validation for a Keras binary classifier.

    Args:
        build_model_func: Zero-argument callable returning a fresh compiled model.
        X: Features, as a pandas object or an indexable array.
        y: Binary labels, as a pandas object or an indexable array.
        cv_folds: Number of stratified folds.
        epochs: Training epochs per fold.
        batch_size: Mini-batch size per fold.
        random_state: Seed for the fold shuffling.
        model_name: Label used in the progress output.
        verbose: When true, print per-fold and averaged metrics.

    Returns:
        ``(cv_summary, fold_results)``: ``cv_summary`` maps ``<metric>_mean``
        and ``<metric>_std`` to floats; ``fold_results`` is a list with one
        dict per fold holding the metrics, timing, and the fold's true
        labels, predicted labels and predicted probabilities.
    """
    def _rows(data, positions):
        # pandas objects need positional .iloc; arrays index directly.
        if isinstance(data, (pd.DataFrame, pd.Series)):
            return data.iloc[positions]
        return data[positions]

    splitter = StratifiedKFold(n_splits=cv_folds, shuffle=True, random_state=random_state)
    fold_results = []

    if verbose:
        print(f"\n--- {model_name} Cross-Validation ---")

    for fold_number, (train_pos, val_pos) in enumerate(splitter.split(X, y), start=1):
        tic = time.time()

        X_fit, X_holdout = _rows(X, train_pos), _rows(X, val_pos)
        y_fit, y_holdout = _rows(y, train_pos), _rows(y, val_pos)

        # A fresh model per fold so no weights carry over between folds.
        model = build_model_func()
        model.fit(X_fit, y_fit, epochs=epochs, batch_size=batch_size, verbose=0)

        proba = model.predict(X_holdout, verbose=0).flatten()
        labels = (proba > 0.5).astype(int)

        record = {
            'fold': fold_number,
            'accuracy': accuracy_score(y_holdout, labels),
            'precision': precision_score(y_holdout, labels, average='binary', zero_division=0),
            'recall': recall_score(y_holdout, labels, average='binary', zero_division=0),
            'f1': f1_score(y_holdout, labels, average='binary', zero_division=0),
            'macro_f1': f1_score(y_holdout, labels, average='macro', zero_division=0),
            'recall_class_1': recall_score(y_holdout, labels, pos_label=1, zero_division=0),
            'roc_auc': roc_auc_score(y_holdout, proba),
        }
        # Timing covers slicing, training, prediction and scoring.
        record['time'] = time.time() - tic
        record['y_true'] = y_holdout
        record['y_pred'] = labels
        record['y_pred_proba'] = proba
        fold_results.append(record)

        if verbose:
            print(f"  Fold {fold_number} completed in {record['time']:.2f} seconds.")
            print(f"    Accuracy: {record['accuracy']:.4f}, Macro-F1: {record['macro_f1']:.4f}, "
                  f"Recall (Class 1): {record['recall_class_1']:.4f}, ROC-AUC: {record['roc_auc']:.4f}")

    metric_names = ('accuracy', 'precision', 'recall', 'f1', 'macro_f1',
                    'recall_class_1', 'roc_auc', 'time')
    per_metric = {name: [rec[name] for rec in fold_results] for name in metric_names}

    # All means first, then all standard deviations.
    cv_summary = {f'{name}_mean': np.mean(values) for name, values in per_metric.items()}
    cv_summary.update(
        {f'{name}_std': np.std(values) for name, values in per_metric.items()}
    )

    if verbose:
        print(f"\n--- Average Results for {model_name} ---")
        print(f"  Accuracy: {cv_summary['accuracy_mean']:.4f} ± {cv_summary['accuracy_std']:.4f}")
        print(f"  Macro-F1: {cv_summary['macro_f1_mean']:.4f} ± {cv_summary['macro_f1_std']:.4f}")
        print(f"  Recall (Class 1): {cv_summary['recall_class_1_mean']:.4f} ± {cv_summary['recall_class_1_std']:.4f}")
        print(f"  ROC-AUC: {cv_summary['roc_auc_mean']:.4f} ± {cv_summary['roc_auc_std']:.4f}")
        print("-" * (len(model_name) + 28))

    return cv_summary, fold_results

print("Training Best ANN Model with 5-fold Cross-Validation...")
ann_cv_results, ann_fold_details = train_and_evaluate_keras_with_cv(
    build_model_func=lambda: build_best_ann(X_train_scaled.shape[1]),
    X=X_train_scaled,
    y=y_train,
    cv_folds=5,
    epochs=50,
    batch_size=32,
    random_state=42,
    model_name='Best ANN (2HL-Tanh + Selected Features)',
    verbose=True,
)

# Refit a fresh model on the whole training split and score the test split.
print("\nTraining final model on full training set...")
ann_final = build_best_ann(X_train_scaled.shape[1])
ann_final.fit(X_train_scaled, y_train, epochs=50, batch_size=32, verbose=0)
ann_test_proba = ann_final.predict(X_test_scaled, verbose=0).flatten()
# Probabilities strictly above 0.5 are labelled as the positive class.
ann_test_pred = np.where(ann_test_proba > 0.5, 1, 0)

print("✓ Model training complete\n")


In [None]:
# STEP 5: EVALUATION AND RESULTS

print("="*80)
print("CROSS-VALIDATION RESULTS")
print("="*80)
# (label, summary key, decimals); labels are padded to a common 19-char column.
for _label, _key, _digits in (
    ("Accuracy:", "accuracy", 4),
    ("Macro-F1:", "macro_f1", 4),
    ("Recall (Class 1):", "recall_class_1", 4),
    ("ROC-AUC:", "roc_auc", 4),
    ("Avg Time (s):", "time", 2),
):
    _mean = ann_cv_results[f"{_key}_mean"]
    _std = ann_cv_results[f"{_key}_std"]
    print(f"{_label:<19}{_mean:.{_digits}f} ± {_std:.{_digits}f}")
print("="*80 + "\n")

print("="*80)
print("TEST SET EVALUATION (Threshold = 0.5)")
print("="*80)
print("\nClassification Report:")
_report = classification_report(
    y_test,
    ann_test_pred,
    target_names=['Fully Paid (0)', 'Charged Off (1)'],
    zero_division=0,
)
print(_report)

print("Confusion Matrix:")
# Rows are actual classes, columns are predicted classes.
cm = confusion_matrix(y_test, ann_test_pred)
cm_df = pd.DataFrame(
    cm,
    index=['Actual Fully Paid', 'Actual Charged Off'],
    columns=['Pred Fully Paid', 'Pred Charged Off'],
)
print(cm_df)
print()


In [None]:
# STEP 6: VISUALIZATIONS

print("Generating visualizations...")

# ROC Curve
plt.figure(figsize=(10, 8))
fpr, tpr, _ = roc_curve(y_test, ann_test_proba)
roc_auc = auc(fpr, tpr)
plt.plot(fpr, tpr, label=f'ANN (AUC = {roc_auc:.4f})', linewidth=2)
plt.plot([0, 1], [0, 1], 'k--', label='Random Guess')
plt.xlabel('False Positive Rate', fontsize=12)
plt.ylabel('True Positive Rate', fontsize=12)
plt.title('ROC Curve - Best Model', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(alpha=0.3)
plt.savefig('./results/roc_curve.png', dpi=300, bbox_inches='tight')
plt.close()
print("✓ ROC curve saved")

# Precision-Recall Curve
plt.figure(figsize=(10, 8))
precision, recall, _ = precision_recall_curve(y_test, ann_test_proba)
# A no-skill classifier's precision equals the positive-class prevalence.
no_skill = len(y_test[y_test==1]) / len(y_test)
plt.plot([0, 1], [no_skill, no_skill], linestyle='--', label='No Skill Baseline')
plt.plot(recall, precision, label='ANN', linewidth=2)
plt.xlabel('Recall', fontsize=12)
plt.ylabel('Precision', fontsize=12)
plt.title('Precision-Recall Curve - Best Model', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(alpha=0.3)
plt.savefig('./results/precision_recall_curve.png', dpi=300, bbox_inches='tight')
plt.close()
print("✓ Precision-Recall curve saved")

# Confusion Matrix Heatmap
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', cbar=True,
            xticklabels=['Fully Paid', 'Charged Off'],
            yticklabels=['Fully Paid', 'Charged Off'])
plt.xlabel('Predicted Label', fontsize=12)
plt.ylabel('True Label', fontsize=12)
plt.title('Confusion Matrix - Best Model', fontsize=14, fontweight='bold')
plt.savefig('./results/confusion_matrix.png', dpi=300, bbox_inches='tight')
plt.close()
print("✓ Confusion matrix saved")

# Threshold Analysis
# 101 evenly spaced points give a step of exactly 0.01, so the default
# threshold 0.5 lies on the grid at index 50. With 100 points the step is
# 1/99 and index 50 would be ~0.505 instead.
thresholds = np.linspace(0, 1, 101)
precision_scores = []
recall_scores = []
f1_scores = []
macro_f1_scores = []

for thresh in thresholds:
    y_pred_thresh = (ann_test_proba > thresh).astype(int)
    precision_scores.append(precision_score(y_test, y_pred_thresh, zero_division=0))
    recall_scores.append(recall_score(y_test, y_pred_thresh, zero_division=0))
    f1_scores.append(f1_score(y_test, y_pred_thresh, zero_division=0))
    macro_f1_scores.append(f1_score(y_test, y_pred_thresh, average='macro', zero_division=0))

optimal_macro_f1_idx = np.argmax(macro_f1_scores)
# NOTE(review): not used in the visible code; recall is maximal at the lowest
# threshold, so this index carries little information — confirm it is needed.
optimal_recall_idx = np.argmax(recall_scores)

plt.figure(figsize=(12, 6))
plt.plot(thresholds, precision_scores, label='Precision', linewidth=2)
plt.plot(thresholds, recall_scores, label='Recall', linewidth=2)
plt.plot(thresholds, f1_scores, label='F1 Score', linewidth=2)
plt.plot(thresholds, macro_f1_scores, label='Macro F1 Score', linewidth=2)
plt.axvline(0.5, color='grey', linestyle='--', label='Default Threshold (0.5)')
plt.axvline(thresholds[optimal_macro_f1_idx], color='green', linestyle=':',
            label=f'Optimal Macro-F1 ({thresholds[optimal_macro_f1_idx]:.3f})')
plt.xlabel('Threshold', fontsize=12)
plt.ylabel('Score', fontsize=12)
plt.title('Threshold Analysis - Best Model', fontsize=14, fontweight='bold')
plt.legend(fontsize=10)
plt.grid(alpha=0.3)
plt.savefig('./results/threshold_analysis.png', dpi=300, bbox_inches='tight')
plt.close()
print("✓ Threshold analysis saved")


In [None]:
# STEP 7: SAVE RESULTS

# Save CV results: one row per headline metric, formatted to 4 decimals.
cv_results_df = pd.DataFrame([
    {
        'Metric': _label,
        'Mean': f"{ann_cv_results[_key + '_mean']:.4f}",
        'Std': f"{ann_cv_results[_key + '_std']:.4f}",
    }
    for _label, _key in (
        ('Accuracy', 'accuracy'),
        ('Macro-F1', 'macro_f1'),
        ('Recall (Class 1)', 'recall_class_1'),
        ('ROC-AUC', 'roc_auc'),
    )
])
cv_results_df.to_csv('./results/cv_results.csv', index=False)

# Save optimal thresholds.
# The "Default" row is computed from ann_test_pred, which was thresholded at
# exactly 0.5, instead of from a fixed position in the threshold grid, so it
# stays correct whatever grid was swept.
threshold_results = pd.DataFrame([{
    'Threshold Type': 'Default',
    'Threshold': 0.500,
    'Macro-F1': f1_score(y_test, ann_test_pred, average='macro', zero_division=0),
    'Recall': recall_score(y_test, ann_test_pred, zero_division=0)
}, {
    'Threshold Type': 'Optimal Macro-F1',
    'Threshold': thresholds[optimal_macro_f1_idx],
    'Macro-F1': macro_f1_scores[optimal_macro_f1_idx],
    'Recall': recall_scores[optimal_macro_f1_idx]
}])
threshold_results.to_csv('./results/optimal_thresholds.csv', index=False)

print("\n✓ Results saved to ./results/")
print("  - cv_results.csv")
print("  - optimal_thresholds.csv")
print("  - roc_curve.png")
print("  - precision_recall_curve.png")
print("  - confusion_matrix.png")
print("  - threshold_analysis.png")


In [None]:
# FINAL SUMMARY

# Mean cross-validated recall on the positive class, as a percentage.
_recall_pct = ann_cv_results['recall_class_1_mean'] * 100

print("\n" + "="*80)
print("FINAL SUMMARY - BEST MODEL PERFORMANCE")
print("="*80)
print("Model Architecture:    ANN with 2 Hidden Layers (14-7 neurons, tanh activation)")
print("Balancing Strategy:    downsample_upsample")
print("Feature Engineering:   Selected features + income-to-loan ratio")
print(f"CV Macro-F1:           {ann_cv_results['macro_f1_mean']:.4f} ± {ann_cv_results['macro_f1_std']:.4f}")
print(f"CV Recall (Class 1):   {ann_cv_results['recall_class_1_mean']:.4f} ± {ann_cv_results['recall_class_1_std']:.4f}")
print(f"CV ROC-AUC:            {ann_cv_results['roc_auc_mean']:.4f} ± {ann_cv_results['roc_auc_std']:.4f}")
print("\nKey Achievement:")
# NOTE(review): the "~3% (baseline)" figure is hard-coded text, not computed here.
print(f"  Recall improved from ~3% (baseline) to {_recall_pct:.1f}%")
print(f"  This means the model can now identify ~{_recall_pct:.0f}% of risky loans!")
print("="*80)
print("\n✓ ALL PROCESSING COMPLETE!")