## Model Explainability Analysis

This notebook generates comprehensive explanations for the ensemble model predictions.

**Purpose**: Understand how the model makes predictions and which features are most important.

**Explainability Methods**:
1. **SHAP (SHapley Additive exPlanations)**: 
   - Game theory-based feature importance
   - Shows how each feature contributes to each prediction
   - Provides both global (overall) and local (per-instance) explanations

2. **LIME (Local Interpretable Model-agnostic Explanations)**:
   - Approximates the model locally around a prediction
   - Fast and interpretable for single predictions
   - Shows which features push prediction up or down

3. **Combined Analysis**:
   - Merges SHAP and LIME insights
   - Identifies risk-increasing and risk-decreasing features
   - Compares local explanations to global patterns

**Output**: SHAP/LIME visualizations and importance rankings saved to `artifacts/04_shap_images/`

In [1]:
import os
import json
import pickle
import warnings
warnings.filterwarnings('ignore')

import numpy as np
import pandas as pd
import joblib
import matplotlib.pyplot as plt
import seaborn as sns

import shap
from lime.lime_tabular import LimeTabularExplainer

from tensorflow import keras

ROOT = os.path.abspath(os.getcwd())
PROJECT_ROOT = os.path.abspath(os.path.join(ROOT, '..'))

MODELS_DIR = os.path.join(PROJECT_ROOT, 'models')
DATASET_DIR = os.path.join(PROJECT_ROOT, 'dataset')
ARTIFACTS_DIR = os.path.join(PROJECT_ROOT, 'artifacts')
SHAP_IMAGES_DIR = os.path.join(ARTIFACTS_DIR, '04_shap_images')

os.makedirs(SHAP_IMAGES_DIR, exist_ok=True)

plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

## Data Loading


In [2]:
X_train = pd.read_pickle(os.path.join(DATASET_DIR, 'X_train.pkl'))
X_test = pd.read_pickle(os.path.join(DATASET_DIR, 'X_test.pkl'))
y_test = pd.read_pickle(os.path.join(DATASET_DIR, 'y_test.pkl'))

if isinstance(y_test, pd.DataFrame):
    y_test = y_test.iloc[:, 0]

feature_names = X_train.columns.tolist()

print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
print(f"Number of features: {len(feature_names)}")


Training data shape: (26064, 17)
Test data shape: (6517, 17)
Number of features: 17


## Model Loading

In [3]:
base_models_dict = joblib.load(os.path.join(MODELS_DIR, 'ensemble_base_models_neural.joblib'))
meta_model = keras.models.load_model(os.path.join(MODELS_DIR, 'meta_learner_neural.h5'), compile=False)
neural_network_model = keras.models.load_model(os.path.join(MODELS_DIR, 'residual_neural.h5'), compile=False)

xgb_deep = base_models_dict['xgb_deep']
xgb_shallow = base_models_dict['xgb_shallow']
lgbm_fast = base_models_dict['lgbm_fast']
catboost_robust = base_models_dict['catboost_robust']

print("Multiscale ensemble model loaded successfully")

Multiscale ensemble model loaded successfully


In [4]:
# Combines predictions from 5 base models using a meta-learner
# Takes input features and returns final probability predictions
# Parameters: X - input features (DataFrame or numpy array)
# Returns: numpy array of probability predictions (0-1 range)
def ensemble_predict(X):
    if isinstance(X, pd.DataFrame):
        X = X.values
    
    pred_xgb_deep = xgb_deep.predict_proba(X)[:, 1]
    pred_xgb_shallow = xgb_shallow.predict_proba(X)[:, 1]
    pred_lgbm = lgbm_fast.predict_proba(X)[:, 1]
    pred_catboost = catboost_robust.predict_proba(X)[:, 1]
    pred_neural = neural_network_model.predict(X, verbose=0).ravel()
    
    base_preds = np.column_stack([pred_xgb_deep, pred_xgb_shallow, pred_lgbm, pred_catboost, pred_neural])
    final_pred = meta_model.predict(base_preds, verbose=0).ravel()
    
    return final_pred

## SHAP Global Explainability


In [5]:
background_data = X_train.sample(n=min(200, len(X_train)), random_state=42).values

### SHAP Explainer Initialization


In [6]:
# PermutationExplainer: Shuffles features to measure marginal contribution
# Uses background_data as reference distribution for feature absence
shap_explainer = shap.Explainer(ensemble_predict, background_data)

### SHAP Values Computation


In [7]:
# Sample test data for SHAP analysis (faster computation)
test_sample_size = min(100, len(X_test))
X_test_sample = X_test.iloc[:test_sample_size].values

# Compute SHAP values: measures how each feature contributes to each prediction
# Higher absolute SHAP value means the feature has more impact
shap_values = shap_explainer(X_test_sample)

Exception in thread Thread-4 (_readerthread):
Traceback (most recent call last):
  File "d:\Conda\envs\final_last\lib\threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "d:\Conda\envs\final_last\lib\threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "d:\Conda\envs\final_last\lib\subprocess.py", line 1515, in _readerthread
    buffer.append(fh.read())
  File "d:\Conda\envs\final_last\lib\codecs.py", line 322, in decode
    (result, consumed) = self._buffer_decode(data, self.errors, final)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xce in position 4: invalid continuation byte
PermutationExplainer explainer: 101it [07:20,  4.40s/it]                         


### SHAP Summary Visualizations


In [8]:
plt.figure(figsize=(12, 8))
shap.summary_plot(shap_values, feature_names=feature_names, max_display=20, show=False)
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'shap_summary_plot.png'), dpi=300, bbox_inches='tight')
plt.close()


In [9]:
plt.figure(figsize=(12, 7))
shap.plots.bar(shap_values, max_display=20, show=False)
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'shap_bar_plot.png'), dpi=300, bbox_inches='tight')
plt.close()

In [10]:
plt.figure(figsize=(14, 8))
shap.plots.waterfall(shap_values[0], max_display=15, show=False)
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'shap_waterfall_plot.png'), dpi=300, bbox_inches='tight')
plt.close()


In [11]:
plt.figure(figsize=(12, 8))
shap.plots.beeswarm(shap_values, max_display=20, show=False)
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'shap_beeswarm_plot.png'), dpi=300, bbox_inches='tight')
plt.close()

### SHAP Feature Importance Ranking


In [12]:
# Calculate feature importance by averaging absolute SHAP values across all samples
# Features with higher average absolute SHAP values are more important globally
shap_importance = np.mean(np.abs(shap_values.values), axis=0)

shap_feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': shap_importance
}).sort_values('importance', ascending=False)

shap_top_features = shap_feature_importance.head(10).to_dict('records')

with open(os.path.join(SHAP_IMAGES_DIR, 'shap_feature_importance.json'), 'w') as f:
    json.dump(shap_top_features, f, indent=2)

print(shap_feature_importance.head(10))

                        feature  importance
5           loan_percent_income    0.082715
7                    loan_grade    0.076841
1                 person_income    0.046387
10   person_home_ownership_RENT    0.041634
15          loan_intent_VENTURE    0.040208
3                     loan_amnt    0.028927
12  loan_intent_HOMEIMPROVEMENT    0.018909
14         loan_intent_PERSONAL    0.016583
11        loan_intent_EDUCATION    0.016057
2             person_emp_length    0.014057


## SHAP Interaction Values

In [13]:
top_interaction_features = shap_feature_importance.head(6)['feature'].tolist()

# Compute feature interactions: measures how features work together
# Interaction strength = average of (SHAP_i * SHAP_j) across all samples
# Higher values mean features have stronger joint effects on predictions
interaction_matrix = np.zeros((len(feature_names), len(feature_names)))
for i in range(len(feature_names)):
    for j in range(i+1, len(feature_names)):
        interaction_strength = np.mean(np.abs(shap_values.values[:, i] * shap_values.values[:, j]))
        interaction_matrix[i, j] = interaction_strength
        interaction_matrix[j, i] = interaction_strength

### Interaction Heatmap Visualization

In [14]:
interaction_df = pd.DataFrame(
    interaction_matrix,
    index=feature_names,
    columns=feature_names
)

top_interaction_subset = interaction_df.loc[top_interaction_features, top_interaction_features]

plt.figure(figsize=(10, 8))
sns.heatmap(top_interaction_subset, annot=True, fmt='.4f', cmap='coolwarm', center=0,
            square=True, linewidths=0.5, cbar_kws={'label': 'Interaction Strength'})
plt.title('SHAP Feature Interaction Matrix (Top 6 Features)', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'shap_interaction_heatmap.png'), dpi=300, bbox_inches='tight')
plt.close()

## Partial Dependence Plots (PDP)


In [15]:
# Partial Dependence Plots: show how changing one feature affects predictions
# while keeping all other features at their original values
pdp_features = shap_feature_importance.head(6)['feature'].tolist()
pdp_indices = [feature_names.index(f) for f in pdp_features]

pdp_sample = background_data[:100]

fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.flatten()

for idx, (feat_name, feat_idx) in enumerate(zip(pdp_features, pdp_indices)):
    # Create range of feature values from 5th to 95th percentile
    feature_values = np.linspace(
        np.percentile(pdp_sample[:, feat_idx], 5),
        np.percentile(pdp_sample[:, feat_idx], 95),
        50
    )
    
    # For each value, set feature to that value and get average prediction
    pdp_values = []
    for val in feature_values:
        X_pdp = pdp_sample.copy()
        X_pdp[:, feat_idx] = val
        predictions = ensemble_predict(X_pdp)
        pdp_values.append(np.mean(predictions))
    
    axes[idx].plot(feature_values, pdp_values, linewidth=2.5, color='#2E86AB')
    axes[idx].fill_between(feature_values, pdp_values, alpha=0.3, color='#2E86AB')
    axes[idx].set_xlabel(feat_name, fontsize=11, fontweight='bold')
    axes[idx].set_ylabel('Predicted Probability', fontsize=11)
    axes[idx].grid(True, alpha=0.3)
    axes[idx].set_title(f'PDP: {feat_name}', fontsize=12, fontweight='bold')

plt.suptitle('Partial Dependence Plots - Top 6 Features', fontsize=16, fontweight='bold', y=0.995)
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'partial_dependence_plots.png'), dpi=300, bbox_inches='tight')
plt.close()

## Counterfactual Explanations

### Counterfactual Generation


In [16]:
# Counterfactual explanations: find minimal changes to flip predictions
# Shows "what if" scenarios - what features need to change to get different outcome
counterfactual_samples = []
cf_sample_indices = [0, 10, 20]

for sample_idx in cf_sample_indices:
    if sample_idx >= len(X_test_sample):
        continue
    
    original = X_test_sample[sample_idx:sample_idx+1]
    original_pred = ensemble_predict(original)[0]
    
    # Target: flip prediction (low risk -> high risk threshold, or vice versa)
    target_pred = 0.5 if original_pred < 0.5 else 0.3
    
    counterfactual = original.copy()
    # Modify top 3 most important features to push prediction toward target
    top_features = shap_feature_importance.head(3)['feature'].tolist()
    
    changes = {}
    for feat_name in top_features:
        feat_idx = feature_names.index(feat_name)
        original_val = original[0, feat_idx]
        
        # Change feature to extreme value (high or low) based on desired direction
        feature_range = np.percentile(background_data[:, feat_idx], [10, 90])
        if original_pred < 0.5:
            new_val = feature_range[1]  # Increase risk
        else:
            new_val = feature_range[0]  # Decrease risk
        
        counterfactual[0, feat_idx] = new_val
        changes[feat_name] = {'original': float(original_val), 'counterfactual': float(new_val)}
    
    cf_pred = ensemble_predict(counterfactual)[0]
    
    counterfactual_samples.append({
        'sample_idx': sample_idx,
        'original_pred': float(original_pred),
        'counterfactual_pred': float(cf_pred),
        'target_pred': float(target_pred),
        'changes': changes
    })

with open(os.path.join(SHAP_IMAGES_DIR, 'counterfactual_explanations.json'), 'w') as f:
    json.dump(counterfactual_samples, f, indent=2)

print(f"Generated {len(counterfactual_samples)} counterfactual explanations")

Generated 3 counterfactual explanations


### Counterfactual Visualization


In [17]:
if counterfactual_samples:
    cf = counterfactual_samples[0]
    features_changed = list(cf['changes'].keys())
    original_vals = [cf['changes'][f]['original'] for f in features_changed]
    cf_vals = [cf['changes'][f]['counterfactual'] for f in features_changed]
    
    x = np.arange(len(features_changed))
    width = 0.35
    
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.bar(x - width/2, original_vals, width, label='Original', alpha=0.8, color='#E63946')
    ax.bar(x + width/2, cf_vals, width, label='Counterfactual', alpha=0.8, color='#06A77D')
    
    ax.set_xlabel('Features', fontsize=12, fontweight='bold')
    ax.set_ylabel('Feature Values', fontsize=12)
    ax.set_title(f'Counterfactual Explanation\nOriginal Pred: {cf["original_pred"]:.3f} â†’ Counterfactual Pred: {cf["counterfactual_pred"]:.3f}', 
                 fontsize=13, fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels(features_changed, rotation=45, ha='right')
    ax.legend(fontsize=11)
    ax.grid(axis='y', alpha=0.3)
    
    plt.tight_layout()
    plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'counterfactual_visualization.png'), dpi=300, bbox_inches='tight')
    plt.close()

## Feature Dependency Analysis


In [18]:
top_dep_features = shap_feature_importance.head(5)['feature'].tolist()
dependency_matrix = np.corrcoef(X_train[top_dep_features].T)

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(dependency_matrix, annot=True, fmt='.3f', cmap='RdYlBu_r', center=0,
            xticklabels=top_dep_features, yticklabels=top_dep_features,
            square=True, linewidths=1, cbar_kws={'label': 'Correlation'}, ax=ax)
ax.set_title('Feature Dependency Matrix (Top 5 Features)', fontsize=14, fontweight='bold')
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'feature_dependency_matrix.png'), dpi=300, bbox_inches='tight')
plt.close()

## LIME Local Explainability

In [19]:
# Wrapper function for LIME: converts probability output to class probabilities format
# LIME needs probabilities for both classes (class 0 and class 1)
def ensemble_predict_for_lime(X):
    prob_class1 = ensemble_predict(X)
    prob_class0 = 1 - prob_class1
    return np.column_stack([prob_class0, prob_class1])

# LIME explainer: creates local explanations by training simple model around each prediction
# Uses training data to understand feature distributions
lime_explainer = LimeTabularExplainer(
    X_train.values,
    feature_names=feature_names,
    mode='classification',
    random_state=42
)


### LIME Explanations Computation


In [20]:
# Compute LIME explanations for multiple test samples
# LIME gives local explanations showing which features push prediction up or down
lime_explanations = []
lime_importance_scores = np.zeros(len(feature_names))
num_samples = min(50, len(X_test))

for i in range(num_samples):
    # Get LIME explanation for this instance
    explanation = lime_explainer.explain_instance(
        X_test.iloc[i].values,
        ensemble_predict_for_lime,
        num_features=len(feature_names)
    )
    lime_explanations.append(explanation)
    
    # Extract feature importance from LIME explanation
    # LIME returns conditions like "feature_name <= value" with importance scores
    exp_list = explanation.as_list()
    for item in exp_list:
        condition_string, importance = item
        
        # Parse feature name from condition string (handles various operators)
        if isinstance(condition_string, str):
            extracted_feature_name = None
            
            if condition_string in feature_names:
                extracted_feature_name = condition_string
            else:
                # Extract feature name from conditions like "feature <= value"
                operators = [' <= ', ' >= ', ' > ', ' < ', ' == ', ' != ']
                for op in operators:
                    if op in condition_string:
                        parts = condition_string.split(op, 1)
                        if len(parts) > 0:
                            potential_name = parts[0].strip()
                            if potential_name in feature_names:
                                extracted_feature_name = potential_name
                                break
                
                # Handle nested conditions
                if not extracted_feature_name and ' < ' in condition_string and ' <= ' in condition_string:
                    parts = condition_string.split(' < ')
                    if len(parts) >= 2 and ' <= ' in parts[1]:
                        potential_name = parts[1].split(' <= ')[0].strip()
                        if potential_name in feature_names:
                            extracted_feature_name = potential_name
            
            # Accumulate importance scores across all samples
            if extracted_feature_name:
                feature_idx_num = feature_names.index(extracted_feature_name)
                lime_importance_scores[feature_idx_num] += abs(importance)

# Average importance across all samples
if num_samples > 0:
    lime_importance_scores = lime_importance_scores / num_samples

### LIME Visualization


In [21]:
fig = lime_explanations[0].as_pyplot_figure()
plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'lime_explanation_plot.png'), dpi=300, bbox_inches='tight')
plt.close()

### LIME Feature Importance

In [22]:
lime_feature_importance = pd.DataFrame({
    'feature': feature_names,
    'importance': lime_importance_scores
}).sort_values('importance', ascending=False)

lime_top_features = lime_feature_importance.head(10).to_dict('records')

with open(os.path.join(SHAP_IMAGES_DIR, 'lime_feature_importance.json'), 'w') as f:
    json.dump(lime_top_features, f, indent=2)

print(lime_feature_importance.head(10))

                        feature  importance
9     person_home_ownership_OWN    0.157923
15          loan_intent_VENTURE    0.123223
5           loan_percent_income    0.119924
7                    loan_grade    0.108267
10   person_home_ownership_RENT    0.102286
12  loan_intent_HOMEIMPROVEMENT    0.080422
11        loan_intent_EDUCATION    0.069892
1                 person_income    0.067300
14         loan_intent_PERSONAL    0.053458
8   person_home_ownership_OTHER    0.052252


## Combined SHAP + LIME Analysis


### Combined Importance Calculation


In [23]:
# Normalize SHAP and LIME importances to 0-1 range for fair comparison
# Then average them to get combined importance score
shap_normalized = shap_importance / (shap_importance.max() + 1e-9)
lime_max = lime_importance_scores.max()
lime_normalized = lime_importance_scores / (lime_max + 1e-9) if lime_max > 1e-9 else np.zeros_like(lime_importance_scores)

# Combined importance: average of normalized SHAP and LIME scores
# This gives more robust feature ranking by combining global (SHAP) and local (LIME) perspectives
combined_importance = (shap_normalized + lime_normalized) / 2

combined_feature_importance = pd.DataFrame({
    'feature': feature_names,
    'shap_importance': shap_normalized,
    'lime_importance': lime_normalized,
    'combined_importance': combined_importance
}).sort_values('combined_importance', ascending=False)

combined_top_features = combined_feature_importance.head(10).to_dict('records')

with open(os.path.join(SHAP_IMAGES_DIR, 'combined_feature_importance.json'), 'w') as f:
    json.dump(combined_top_features, f, indent=2)


### Combined Importance Visualization

In [24]:
top_n = 15
top_features = combined_feature_importance.head(top_n)

fig, ax = plt.subplots(figsize=(14, 8))
x = np.arange(len(top_features))
width = 0.25

ax.bar(x - width, top_features['shap_importance'], width, label='SHAP', alpha=0.85, color='#E63946')
ax.bar(x, top_features['lime_importance'], width, label='LIME', alpha=0.85, color='#F77F00')
ax.bar(x + width, top_features['combined_importance'], width, label='Combined', alpha=0.85, color='#2E86AB')

ax.set_xlabel('Features', fontsize=13, fontweight='bold')
ax.set_ylabel('Normalized Importance', fontsize=13, fontweight='bold')
ax.set_title('SHAP vs LIME vs Combined Feature Importance', fontsize=15, fontweight='bold')
ax.set_xticks(x)
ax.set_xticklabels(top_features['feature'], rotation=45, ha='right', fontsize=10)
ax.legend(fontsize=12, loc='upper right')
ax.grid(axis='y', alpha=0.3, linestyle='--')

plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'combined_importance_comparison.png'), dpi=300, bbox_inches='tight')
plt.close()

### Expected Calibration Score (ECS)

In [25]:
y_test_values = y_test.values if hasattr(y_test, 'values') else y_test
if isinstance(y_test_values, pd.Series):
    y_test_values = y_test_values.values

X_test_values = X_test.values if isinstance(X_test, pd.DataFrame) else X_test

validation_sample_size = min(500, len(X_test_values))
X_val_sample = X_test_values[:validation_sample_size]
y_val_sample = y_test_values[:validation_sample_size]

y_pred_proba = ensemble_predict(X_val_sample)

# Expected Calibration Score: measures if predicted probabilities match actual frequencies
# Well-calibrated model: when it predicts 70% risk, actual risk should be close to 70%
# Lower ECS = better calibration
n_bins = 10
bins = np.linspace(0, 1, n_bins + 1)
bin_indices = np.digitize(y_pred_proba, bins) - 1
bin_indices = np.clip(bin_indices, 0, n_bins - 1)

ecs_per_bin = []
bin_counts = []
bin_accuracies = []
bin_confidences = []

# For each bin, compare average predicted probability vs actual accuracy
for i in range(n_bins):
    bin_mask = bin_indices == i
    bin_count = np.sum(bin_mask)
    
    if bin_count > 0:
        bin_acc = np.mean(y_val_sample[bin_mask])  # Actual accuracy in this bin
        bin_conf = np.mean(y_pred_proba[bin_mask])  # Average predicted probability
        bin_error = np.abs(bin_acc - bin_conf)  # Calibration error for this bin
        
        ecs_per_bin.append(bin_error)
        bin_counts.append(bin_count)
        bin_accuracies.append(bin_acc)
        bin_confidences.append(bin_conf)
    else:
        ecs_per_bin.append(0.0)
        bin_counts.append(0)
        bin_accuracies.append(0.0)
        bin_confidences.append(0.0)

# Weighted ECS: average error weighted by bin size (more samples = more weight)
# Unweighted ECS: simple average of non-empty bins
ecs_weighted = np.sum([ecs * count for ecs, count in zip(ecs_per_bin, bin_counts)]) / validation_sample_size
ecs_unweighted = np.mean([ecs for ecs, count in zip(ecs_per_bin, bin_counts) if count > 0]) if any(bin_counts) else 0.0

### Calibration Curve Visualization

### Data Perturbation Distance (DPD)


In [26]:
# Data Perturbation Distance: tests model robustness to small input changes
# Adds small random noise to inputs and measures how much predictions change
# Lower DPD = more stable model (predictions don't change much with small input changes)
perturbation_strength = 0.01  # 1% noise level
n_perturbations = 10

dpd_sample_size = min(100, len(X_val_sample))
X_dpd_original = X_val_sample[:dpd_sample_size]

original_predictions = ensemble_predict(X_dpd_original)
perturbation_differences = []

# Run multiple perturbations and average results
for _ in range(n_perturbations):
    noise = np.random.normal(0, perturbation_strength, X_dpd_original.shape)
    X_perturbed = X_dpd_original + noise
    perturbed_predictions = ensemble_predict(X_perturbed)
    differences = np.abs(original_predictions - perturbed_predictions)
    perturbation_differences.append(differences)

# Compute statistics: mean, std, and max prediction change
mean_differences = np.mean(perturbation_differences, axis=0)
dpd_mean = np.mean(mean_differences)
dpd_std = np.std(mean_differences)
dpd_max = np.max(mean_differences)


### Equalized Odds Difference (EOD)


In [27]:
# Equalized Odds Difference: measures fairness across different groups
# Fair model should have similar true positive rates (TPR) and false positive rates (FPR) for all groups
# Lower EOD = more fair (groups treated similarly)
threshold = 0.5
y_pred_binary = (y_pred_proba >= threshold).astype(int)

# Define sensitive attributes (groups to compare fairness across)
sensitive_attributes = {}

loan_grade_features = [f for f in feature_names if 'loan_grade' in f.lower()]
if loan_grade_features:
    for grade_feat in loan_grade_features[:5]:
        if grade_feat in X_test.columns:
            grade_mask = X_test.iloc[:validation_sample_size][grade_feat].values > 0.5
            if np.sum(grade_mask) > 10 and np.sum(~grade_mask) > 10:
                sensitive_attributes[f"has_{grade_feat}"] = grade_mask

person_income_median = np.median(X_test['person_income'].values[:validation_sample_size]) if 'person_income' in X_test.columns else None
if person_income_median is not None:
    income_high = X_test['person_income'].values[:validation_sample_size] >= person_income_median
    if np.sum(income_high) > 10 and np.sum(~income_high) > 10:
        sensitive_attributes["high_income"] = income_high

loan_percent_income_median = np.median(X_test['loan_percent_income'].values[:validation_sample_size]) if 'loan_percent_income' in X_test.columns else None
if loan_percent_income_median is not None:
    debt_ratio_low = X_test['loan_percent_income'].values[:validation_sample_size] < loan_percent_income_median
    if np.sum(debt_ratio_low) > 10 and np.sum(~debt_ratio_low) > 10:
        sensitive_attributes["low_debt_ratio"] = debt_ratio_low

eod_results = {}

# For each sensitive attribute, compare TPR and FPR between groups
for attr_name, attr_mask in sensitive_attributes.items():
    group_0_mask = ~attr_mask
    group_1_mask = attr_mask
    
    y_true_group_0 = y_val_sample[group_0_mask]
    y_pred_group_0 = y_pred_binary[group_0_mask]
    y_true_group_1 = y_val_sample[group_1_mask]
    y_pred_group_1 = y_pred_binary[group_1_mask]
    
    # Compute confusion matrix for both groups
    tn_0 = np.sum((y_true_group_0 == 0) & (y_pred_group_0 == 0))
    fp_0 = np.sum((y_true_group_0 == 0) & (y_pred_group_0 == 1))
    fn_0 = np.sum((y_true_group_0 == 1) & (y_pred_group_0 == 0))
    tp_0 = np.sum((y_true_group_0 == 1) & (y_pred_group_0 == 1))
    
    tn_1 = np.sum((y_true_group_1 == 0) & (y_pred_group_1 == 0))
    fp_1 = np.sum((y_true_group_1 == 0) & (y_pred_group_1 == 1))
    fn_1 = np.sum((y_true_group_1 == 1) & (y_pred_group_1 == 0))
    tp_1 = np.sum((y_true_group_1 == 1) & (y_pred_group_1 == 1))
    
    # Calculate TPR (recall) and FPR for each group
    tpr_0 = tp_0 / (tp_0 + fn_0) if (tp_0 + fn_0) > 0 else 0.0
    fpr_0 = fp_0 / (fp_0 + tn_0) if (fp_0 + tn_0) > 0 else 0.0
    
    tpr_1 = tp_1 / (tp_1 + fn_1) if (tp_1 + fn_1) > 0 else 0.0
    fpr_1 = fp_1 / (fp_1 + tn_1) if (fp_1 + tn_1) > 0 else 0.0
    
    # EOD = maximum difference in TPR or FPR between groups
    tpr_diff = abs(tpr_1 - tpr_0)
    fpr_diff = abs(fpr_1 - fpr_0)
    eod = max(tpr_diff, fpr_diff)
    
    eod_results[attr_name] = {
        'eod': float(eod),
        'tpr_diff': float(tpr_diff),
        'fpr_diff': float(fpr_diff),
        'tpr_group_0': float(tpr_0),
        'tpr_group_1': float(tpr_1),
        'fpr_group_0': float(fpr_0),
        'fpr_group_1': float(fpr_1),
        'group_0_size': int(np.sum(group_0_mask)),
        'group_1_size': int(np.sum(group_1_mask))
    }

overall_eod = np.mean([result['eod'] for result in eod_results.values()]) if eod_results else 0.0


## Results Summary and Export


In [28]:
with open(os.path.join(SHAP_IMAGES_DIR, 'shap_values.pkl'), 'wb') as f:
    pickle.dump(shap_values.values, f)


### Comprehensive Research Report


In [29]:
research_report = {
    'timestamp': pd.Timestamp.now().isoformat(),
    'sample_sizes': {
        'shap_analysis': int(test_sample_size),
        'lime_analysis': int(num_samples),
        'validation_metrics': int(validation_sample_size)
    },
    'feature_importance': {
        'shap_top_10': shap_top_features,
        'lime_top_10': lime_top_features,
        'combined_top_10': combined_top_features
    },
    'validation_metrics': {
        'expected_calibration_score': {
            'ecs_weighted': float(ecs_weighted),
            'ecs_unweighted': float(ecs_unweighted),
            'n_bins': int(n_bins)
        },
        'data_perturbation_distance': {
            'dpd_mean': float(dpd_mean),
            'dpd_std': float(dpd_std),
            'dpd_max': float(dpd_max),
            'perturbation_strength': float(perturbation_strength),
            'n_perturbations': int(n_perturbations)
        },
        'equalized_odds_difference': {
            'overall_eod': float(overall_eod),
            'attributes': eod_results
        }
    },
    'counterfactual_explanations': {
        'num_samples': len(counterfactual_samples),
        'samples': counterfactual_samples
    },
    'interpretation_guidelines': {
        'ecs': 'Lower is better. ECS < 0.05 indicates good calibration.',
        'dpd': 'Lower is better. DPD < 0.05 indicates good stability to perturbations.',
        'eod': 'Lower is better. EOD < 0.1 indicates acceptable fairness.'
    }
}

with open(os.path.join(SHAP_IMAGES_DIR, 'comprehensive_research_report.json'), 'w') as f:
    json.dump(research_report, f, indent=2)

print("Research Report Summary:")
print(f"  ECS (Weighted): {ecs_weighted:.6f}")
print(f"  DPD (Mean): {dpd_mean:.6f}")
print(f"  EOD (Overall): {overall_eod:.6f}")
print(f"  Top SHAP Feature: {shap_feature_importance.iloc[0]['feature']}")
print(f"  Top LIME Feature: {lime_feature_importance.iloc[0]['feature']}")
print(f"  Counterfactuals Generated: {len(counterfactual_samples)}")
print("\nAll results saved to artifacts directory")


Research Report Summary:
  ECS (Weighted): 0.117146
  DPD (Mean): 0.077951
  EOD (Overall): 0.064004
  Top SHAP Feature: loan_percent_income
  Top LIME Feature: person_home_ownership_OWN
  Counterfactuals Generated: 3

All results saved to artifacts directory


In [30]:
fig, ax = plt.subplots(figsize=(11, 7))

bin_centers = [(bins[i] + bins[i+1]) / 2 for i in range(n_bins)]
non_zero_bins = [i for i, count in enumerate(bin_counts) if count > 0]

ax.plot([0, 1], [0, 1], 'k--', label='Perfect Calibration', linewidth=2.5, alpha=0.7)
ax.plot([bin_centers[i] for i in non_zero_bins], 
        [bin_accuracies[i] for i in non_zero_bins], 
        'o-', label='Model Calibration', linewidth=3, markersize=10, color='#2E86AB')

for i in non_zero_bins:
    if bin_counts[i] > 0:
        ax.bar(bin_centers[i], bin_accuracies[i], width=0.08, alpha=0.25, color='#2E86AB')

ax.set_xlabel('Mean Predicted Probability (Confidence)', fontsize=13, fontweight='bold')
ax.set_ylabel('Mean Actual Accuracy', fontsize=13, fontweight='bold')
ax.set_title(f'Calibration Curve (ECS = {ecs_weighted:.4f})', fontsize=15, fontweight='bold')
ax.legend(fontsize=12, loc='upper left')
ax.grid(alpha=0.3, linestyle='--')
ax.set_xlim([0, 1])
ax.set_ylim([0, 1])

plt.tight_layout()
plt.savefig(os.path.join(SHAP_IMAGES_DIR, 'calibration_curve.png'), dpi=300, bbox_inches='tight')
plt.close()