# Credit Fairness Demo (final)

End-to-end walkthrough: generate synthetic credit underwriting data, train GLM / NN / ADV_NN, and reproduce the auto fairness figures (lambda sweep, fairness vs accuracy, fairness vs high-risk rate, and score distributions) on the credit data.


## 1. Setup


In [None]:
import sys
from pathlib import Path

# Use the working directory as the project root when it contains `src`;
# otherwise fall back to its parent directory.
PROJECT_ROOT = Path.cwd().resolve()
PROJECT_ROOT = PROJECT_ROOT if (PROJECT_ROOT / "src").exists() else PROJECT_ROOT.parent

# Put the root at the front of the import path, but only if it is not listed yet.
if sys.path.count(str(PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(PROJECT_ROOT))

print(f"Using project root: {PROJECT_ROOT}")


In [None]:
from __future__ import annotations

import random
import sys
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from dataclasses import replace
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset

from src.config import get_default_configs
from src.credit import generate_credit_underwriting_data, train_test_split_df
from src.evaluation.metrics import compute_accuracy_metrics
from src.evaluation.fairness import fairness_metrics, fairness_at_target_rate
from src.models.glm_model import GLMClassifier
from src.models.nn_model import PlainNN, train_plain_nn, predict_proba_plain_nn
from src.models.adv_nn_model import AdvPredictor, train_adv_nn, predict_proba_adv_nn


In [None]:
# Seed every random number generator used in this notebook: Python's `random`,
# NumPy, and PyTorch (CPU, plus all CUDA devices when available).
SEED = 1234
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Best-effort request for deterministic kernels. `warn_only=True` asks PyTorch to
# warn instead of raising when an operation has no deterministic implementation.
try:
    torch.use_deterministic_algorithms(True, warn_only=True)
except (AttributeError, TypeError) as exc:
    # Older PyTorch releases lack the function (AttributeError) or the
    # `warn_only` keyword (TypeError). Keep going, but say so instead of
    # failing silently, because results may then differ between runs.
    print(f"Deterministic algorithms not enabled: {exc!r}")


## 2. Generate Credit Data


In [None]:
# Fetch the project's default configs; the three objects are used below for
# simulation, training and evaluation settings respectively.
sim_cfg, train_cfg, eval_cfg = get_default_configs()
# Override only the sample count and seed; `replace` keeps every other field.
sim_cfg = replace(sim_cfg, n_samples=100_000, seed=SEED)

# Generate the synthetic credit underwriting dataset from the adjusted config.
df_full = generate_credit_underwriting_data(sim_cfg)
print(f"Dataset size: {len(df_full):,}")
# Last expression of the cell, so the notebook renders the first rows.
df_full.head()


## 3. Train/Test Split


In [None]:
# Split the full dataset into train and test frames, reusing the simulation seed.
# NOTE(review): test_size=0.2 is presumably a 20% hold-out fraction; confirm in src.credit.
df_train, df_test = train_test_split_df(df_full, test_size=0.2, seed=sim_cfg.seed)
print(f"Train size: {len(df_train):,}, Test size: {len(df_test):,}")
# Last expression of the cell, so the notebook renders the first training rows.
df_train.head()


### Feature preprocessing


In [None]:
numeric_cols = ["S", "D", "L"]
proxy_col = "Z"

# Standardise the numeric columns with statistics fitted on the training split only.
scaler = StandardScaler()
X_train_num = scaler.fit_transform(df_train[numeric_cols])
X_test_num = scaler.transform(df_test[numeric_cols])

# Append the unscaled proxy column as the last feature, then cast to float32.
proxy_train = df_train[[proxy_col]].to_numpy()
proxy_test = df_test[[proxy_col]].to_numpy()
X_train = np.concatenate((X_train_num, proxy_train), axis=1).astype(np.float32)
X_test = np.concatenate((X_test_num, proxy_test), axis=1).astype(np.float32)

# Targets (Y) as float32 and the group attribute (A) as int64, for both splits.
y_train, y_test = (frame["Y"].to_numpy(dtype=np.float32) for frame in (df_train, df_test))
A_train, A_test = (frame["A"].to_numpy(dtype=np.int64) for frame in (df_train, df_test))

# Carve a validation subset (stratified on the target) out of the training split.
split_kwargs = dict(test_size=0.2, random_state=sim_cfg.seed, stratify=y_train)
X_tr, X_val, y_tr, y_val = train_test_split(X_train, y_train, **split_kwargs)

def build_loader(X, y, batch_size, shuffle=True):
    """Wrap a feature array and a target array in a ``DataLoader``.

    Args:
        X: NumPy array of features (converted with ``torch.from_numpy``).
        y: NumPy array of targets, aligned with ``X`` along the first axis.
        batch_size: Number of rows per batch.
        shuffle: Whether the loader reshuffles the rows on each pass.

    Returns:
        A ``DataLoader`` over a ``TensorDataset`` of float32 ``(features, targets)``.
    """
    features = torch.from_numpy(X).float()
    targets = torch.from_numpy(y).float()
    dataset = TensorDataset(features, targets)
    return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

# Training batches are reshuffled; validation batches keep a fixed order.
train_loader = build_loader(X_tr, y_tr, batch_size=train_cfg.batch_size, shuffle=True)
val_loader = build_loader(X_val, y_val, batch_size=train_cfg.batch_size, shuffle=False)

# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
# Last expression of the cell: show the matrix shapes and the selected device.
X_train.shape, X_test.shape, device


## 4. GLM Baseline


In [None]:
# Fit the GLM baseline on the full training matrix (scaled S, D, L plus the proxy Z).
# The chained call relies on `fit` returning the fitted classifier.
glm = GLMClassifier().fit(X_train, y_train)
# GLM scores for the test rows; reused by the summaries and plots further down.
y_proba_glm = glm.predict_proba(X_test)


## 5. Neural Network


In [None]:
# Build the plain network with one input per feature column and move it to `device`.
plain_nn = PlainNN(input_dim=X_train.shape[1]).to(device)
# Train with the train/validation loaders created in the preprocessing cell.
train_plain_nn(plain_nn, train_loader, val_loader, train_cfg, device)
# Plain-NN scores for the test rows; reused by the summaries and plots further down.
y_proba_nn = predict_proba_plain_nn(plain_nn, X_test, device=device)


## 6. Adversarial Neural Network


In [None]:
# Copy of the training config with lambda_adv set to 0.8.
# NOTE(review): lambda_adv presumably weights the adversary's loss term; confirm in train_adv_nn.
train_cfg_adv = replace(train_cfg, lambda_adv=0.8)

# The adversarial dataset carries the group attribute A as a third tensor and uses
# the full training split (X_train), not the X_tr/X_val subsets.
adv_ds = TensorDataset(
    torch.from_numpy(X_train).float(),
    torch.from_numpy(y_train).float(),
    torch.from_numpy(A_train).long(),
)
# Batch size comes from the base config; `replace` above did not change it.
adv_loader = DataLoader(adv_ds, batch_size=train_cfg.batch_size, shuffle=True)

adv_model = AdvPredictor(input_dim=X_train.shape[1]).to(device)
# No validation loader is passed to the adversarial trainer.
train_adv_nn(adv_model, adv_loader, train_cfg_adv, device=device)
# NOTE: the lambda-sweep cell below rebinds y_proba_adv to the scores of the model it selects.
y_proba_adv = predict_proba_adv_nn(adv_model, X_test, device=device)


In [None]:
# Sweep lambda for ADV_NN and pick the fairest model at 2% target rate
#
# For every lambda a fresh AdvPredictor is trained on `adv_loader`, scored on the
# test features, and recorded as one row of `df_frontier_02`. The row with the
# smallest ranking tuple is kept, and its scores replace `y_proba_adv`.
#
# NOTE(review): the winner is chosen using y_test / A_test, i.e. on the same split
# that is later used to report metrics, so the reported ADV_NN numbers are
# optimistically biased. Consider selecting on a held-out validation split.
lambda_grid = [0.1, 0.3, 0.8, 1.5, 2.0]
frontier_rows = []
best_row = None
best_proba = None
best_score = None
for lam in lambda_grid:
    cfg_lam = replace(train_cfg, lambda_adv=lam)
    model_lam = AdvPredictor(input_dim=X_train.shape[1]).to(device)
    train_adv_nn(model_lam, adv_loader, cfg_lam, device=device)
    y_proba_lam = predict_proba_adv_nn(model_lam, X_test, device=device)
    acc = compute_accuracy_metrics(y_test, y_proba_lam)
    fair = fairness_at_target_rate(y_test, y_proba_lam, A_test, target_rate=0.02)
    row = {'model': 'ADV_NN', 'lambda_adv': lam, **acc, **fair}
    frontier_rows.append(row)
    # Lexicographic ranking, smaller is better: EO TPR gap, then EO FPR gap, then
    # the distance of the DP ratio from 1. Minimising the raw ratio would prefer
    # the less balanced model on ties.
    # NOTE(review): assumes dp_ratio == 1 means parity; confirm in src.evaluation.fairness.
    score = (fair['eo_gap_tpr'], fair['eo_gap_fpr'], abs(1.0 - fair['dp_ratio']))
    if best_score is None or score < best_score:
        best_score = score
        best_row = row
        best_proba = y_proba_lam
df_frontier_02 = pd.DataFrame(frontier_rows)
y_proba_adv = best_proba
best_lambda_adv = float(best_row['lambda_adv'])
print(f"Selected ADV lambda={best_lambda_adv:.2f} with EO gap {best_row['eo_gap_tpr']:.4f} and ROC AUC {best_row['roc_auc']:.3f}")


## 7. Evaluation & Fairness


In [None]:
def summarize(model, y_true, y_proba, A_true, threshold=eval_cfg.threshold):
    """Build one summary row for a model at a fixed decision threshold.

    Args:
        model: Label stored under the ``"model"`` key.
        y_true: True targets.
        y_proba: Predicted scores.
        A_true: Group attribute passed to the fairness metrics.
        threshold: Decision threshold forwarded to ``fairness_metrics``; the
            default is read from ``eval_cfg`` when the function is defined.

    Returns:
        A dict with the model label, then the accuracy metrics, then the
        fairness metrics (later entries win on key collisions).
    """
    row = {"model": model}
    row.update(compute_accuracy_metrics(y_true, y_proba))
    row.update(fairness_metrics(y_true, y_proba, A_true, threshold=threshold))
    return row

# One row per model, evaluated on the test split at the default threshold.
summary_default = pd.DataFrame(
    [
        summarize(label, y_test, proba, A_test)
        for label, proba in (
            ("GLM", y_proba_glm),
            ("NN", y_proba_nn),
            ("ADV_NN", y_proba_adv),
        )
    ]
)
summary_default


### Fairness at a fixed 2% target rate


In [None]:
TARGET_RATE = 0.02

def summarize_fixed(model, y_true, y_proba, A_true):
    """Build one summary row for a model at the fixed ``TARGET_RATE``.

    Args:
        model: Label stored under the ``"model"`` key.
        y_true: True targets.
        y_proba: Predicted scores.
        A_true: Group attribute passed to ``fairness_at_target_rate``.

    Returns:
        A dict with the model label, then the accuracy metrics, then the
        fairness-at-target-rate metrics (later entries win on key collisions).
    """
    fair = fairness_at_target_rate(y_true, y_proba, A_true, TARGET_RATE)
    row = {"model": model}
    row.update(compute_accuracy_metrics(y_true, y_proba))
    row.update(fair)
    return row

# One row per model, evaluated on the test split at the fixed target rate.
summary_fixed = pd.DataFrame(
    [
        summarize_fixed(label, y_test, proba, A_test)
        for label, proba in (
            ("GLM", y_proba_glm),
            ("NN", y_proba_nn),
            ("ADV_NN", y_proba_adv),
        )
    ]
)
# Show only the headline accuracy and fairness columns.
summary_fixed.loc[
    :,
    ["model", "roc_auc", "eo_gap_tpr", "eo_gap_fpr", "dp_diff", "dp_ratio", "threshold", "actual_rate"],
]


## 9. Credit experiment plots

Use the GLM/NN/ADV_NN predictions from this notebook to visualize fairness vs accuracy and fairness vs high-risk rate. ADV_NN uses the best lambda from the sweep above.


In [None]:
import matplotlib.pyplot as plt
from src.evaluation.fairness import fairness_at_target_rate
from src.evaluation.metrics import compute_accuracy_metrics

# Target rates to evaluate, and the plot colour assigned to each model.
target_rates = [0.01, 0.02, 0.05]
colors = {
    'GLM': 'tab:orange',
    'NN': 'tab:green',
    'ADV_NN': 'tab:purple',
}

def summarize_at_rates(model, y_proba):
    """Return one summary row per entry of ``target_rates`` for a model.

    The accuracy metrics are computed once on the test split and repeated in
    every row; the fairness metrics are recomputed for each target rate.

    Args:
        model: Label stored under the ``'model'`` key of every row.
        y_proba: Predicted scores for the test rows.

    Returns:
        A list of dicts, in the same order as ``target_rates``.
    """
    acc = compute_accuracy_metrics(y_test, y_proba)
    return [
        {'model': model, **acc, **fairness_at_target_rate(y_test, y_proba, A_test, r)}
        for r in target_rates
    ]

# One row per (model, target rate) for the three sets of test predictions.
rows = [
    row
    for label, proba in (('GLM', y_proba_glm), ('NN', y_proba_nn), ('ADV_NN', y_proba_adv))
    for row in summarize_at_rates(label, proba)
]
df_rates = pd.DataFrame(rows)

# Select the 2% slice with a tolerance rather than exact float equality, so the
# filter still matches if `target_rate` comes back with floating-point rounding.
# NOTE: this rebinds `summary_fixed`, which the "Target 0.02" cell also defines.
summary_fixed = df_rates[np.isclose(df_rates['target_rate'], 0.02)].reset_index(drop=True)

# Frontier frame for the sweep plot: the ADV_NN lambda sweep rows plus the 2% rows
# of every model, whose lambda is set to NaN.
frontier_cols = ['model', 'lambda_adv', 'eo_gap_tpr', 'roc_auc']
df_frontier_plot = pd.concat([
    df_frontier_02[frontier_cols],
    summary_fixed.assign(lambda_adv=np.nan)[frontier_cols],
])
display(summary_fixed[['model','roc_auc','eo_gap_tpr','eo_gap_fpr','dp_diff','dp_ratio','threshold','actual_rate']])


### 9.1 ADV lambda sweep (2% high-risk)

EO gap vs ROC AUC for ADV_NN across lambdas, with GLM/NN anchors.


In [None]:
fig, ax = plt.subplots(figsize=(7, 5))
ax.grid(True, linestyle='--', alpha=0.5)

# ADV_NN points; only rows that carry a lambda value get a text label.
lambda_pts = df_frontier_plot.loc[df_frontier_plot['model'] == 'ADV_NN']
if len(lambda_pts) > 0:
    ax.scatter(lambda_pts['eo_gap_tpr'], lambda_pts['roc_auc'], c='tab:blue', label='ADV lambda sweep')
    for _, row in lambda_pts.iterrows():
        if pd.isna(row.get('lambda_adv')):
            continue
        ax.annotate(
            f"lambda={row['lambda_adv']}",
            (row['eo_gap_tpr'], row['roc_auc']),
            textcoords='offset points',
            xytext=(5, 5),
            fontsize=8,
        )

# GLM and NN anchors, drawn as diamonds in their model colour.
for model in ('GLM', 'NN'):
    subset = summary_fixed.loc[summary_fixed['model'] == model]
    if subset.empty:
        continue
    row = subset.iloc[0]
    point = (row['eo_gap_tpr'], row['roc_auc'])
    ax.scatter(point[0], point[1], c=colors.get(model, 'tab:gray'), marker='D', label=model)
    ax.annotate(
        model,
        point,
        textcoords='offset points',
        xytext=(5, -10),
        fontsize=9,
        fontweight='bold',
        color=colors.get(model, 'black'),
    )

ax.set_xlabel('EO TPR difference @ 2% high-risk')
ax.set_ylabel('ROC AUC')
ax.set_title('Fairness vs Accuracy (2% high-risk) - lambda sweep')
ax.legend()
plt.show()


### 9.2 Fairness vs accuracy (2% high-risk, chosen lambda)


In [None]:
fig, ax = plt.subplots(figsize=(7, 5))
ax.grid(True, linestyle='--', alpha=0.5)

# One labelled diamond per row of the 2% summary table.
for _, row in summary_fixed.iterrows():
    name = row['model']
    point = (row['eo_gap_tpr'], row['roc_auc'])
    ax.scatter(point[0], point[1], color=colors.get(name, 'tab:blue'), marker='D', label=name)
    ax.annotate(
        name,
        point,
        textcoords='offset points',
        xytext=(5, -10),
        fontsize=9,
        fontweight='bold',
        color=colors.get(name, 'black'),
    )

ax.set_title('Fairness vs Accuracy (2% high-risk)')
ax.set_xlabel('EO TPR difference @2%')
ax.set_ylabel('ROC AUC')
ax.legend()
plt.show()


### 9.3 Fairness vs high-risk rate


In [None]:
# Two figures with the same layout: one line per model across the target rates,
# first for the DP ratio and then for the EO TPR gap.
for metric_col, y_label, title in (
    ('dp_ratio', 'DP ratio', 'DP ratio vs high-risk rate'),
    ('eo_gap_tpr', 'EO TPR difference', 'EO gap vs high-risk rate'),
):
    fig, ax = plt.subplots(figsize=(7, 5))
    for model, color in colors.items():
        subset = df_rates[df_rates['model'] == model].sort_values('target_rate')
        ax.plot(subset['target_rate'], subset[metric_col], marker='o', color=color, label=model)
    ax.set_xlabel('High-risk rate')
    ax.set_ylabel(y_label)
    ax.set_title(title)
    ax.grid(True, linestyle='--', alpha=0.5)
    ax.legend()
    plt.show()


### 9.4 Score distributions by group (GLM vs ADV_NN)


In [None]:
fig, axes = plt.subplots(1, 2, figsize=(14, 4))

# One panel per group value (0 on the left, 1 on the right).
for group, ax in enumerate(axes):
    mask = A_test == group
    glm_scores = y_proba_glm[mask]
    adv_scores = y_proba_adv[mask]
    # Shared bin edges (50 bins) spanning both models' scores in this group.
    all_scores = np.concatenate((glm_scores, adv_scores))
    bins = np.linspace(all_scores.min(), all_scores.max(), num=51)
    ax.hist(glm_scores, bins=bins, alpha=0.5, label=f'GLM A={group}')
    ax.hist(adv_scores, bins=bins, alpha=0.5, label=f'ADV A={group}')
    ax.set_title(f'Predicted probability distribution (A={group})')
    ax.set_xlabel('Predicted probability')
    ax.set_ylabel('Count')
    ax.legend()

plt.tight_layout()
plt.show()


## 10. Simulated variable distributions

Quick check of the generated credit features and targets using the df_full dataset from earlier.


In [None]:
numeric_pairs_1 = ['S_star', 'S']
numeric_pairs_2 = ['D', 'L']
binary_cols = ['A', 'Z', 'Y']

# Two side-by-side histogram figures, one per pair of numeric columns.
for pair, palette, alpha in (
    (numeric_pairs_1, ('tab:blue', 'tab:orange'), 0.6),
    (numeric_pairs_2, ('tab:green', 'tab:purple'), 0.7),
):
    fig, axes = plt.subplots(1, 2, figsize=(10, 3))
    axes = axes.ravel()
    for ax, col, color in zip(axes, pair, palette):
        ax.hist(df_full[col], bins=40, color=color, alpha=alpha)
        ax.set_title(col)
    plt.tight_layout()
    plt.show()

# Bar chart of the 0/1 counts for every binary column; a missing level counts as 0.
n_binary = len(binary_cols)
fig, axes = plt.subplots(1, n_binary, figsize=(3 * n_binary, 3))
for ax, col in zip(axes, binary_cols):
    counts = df_full[col].value_counts().reindex([0, 1], fill_value=0)
    labels = counts.index.astype(str)
    ax.bar(labels, counts.values, color='tab:cyan', alpha=0.7)
    ax.set_title(col)
plt.tight_layout()
plt.show()
