In [5]:
import os, json, pandas as pd, numpy as np
from pathlib import Path

# Location of the cached raw dataset, relative to the notebook's working directory.
DATA_PATH = Path("data/raw/german_credit.csv")

# Create the output folders up front so later cells can write into them.
for _out_dir in ("reports", "splits"):
    Path(_out_dir).mkdir(parents=True, exist_ok=True)


In [6]:
from sklearn.datasets import fetch_openml

# Cached copy of the dataset; created on first run by the download branch below.
DATA_PATH = Path("data/raw/german_credit.csv")

if DATA_PATH.exists():
    df = pd.read_csv(DATA_PATH)
else:
    try:
        # Pin version=1: "credit-g" has more than one active version on OpenML,
        # and an unpinned lookup by name emits a warning about the ambiguity.
        data = fetch_openml(name="credit-g", version=1, as_frame=True, parser="pandas")
    except Exception as name_err:
        # Best-effort fallback: address the dataset by its OpenML id instead.
        # The first failure is reported rather than silently discarded.
        print(f"fetch_openml by name failed ({name_err!r}); retrying with data_id=31")
        data = fetch_openml(data_id=31, as_frame=True, parser="pandas")

    # Work on a copy so the fetched Bunch's frame is left untouched.
    df = data.frame.copy()
    # Binary target: 1 where the original label is "bad", 0 otherwise.
    if "class" in df.columns:
        df["default"] = (df["class"] == "bad").astype(int)
        df = df.drop(columns=["class"])
    # Tidy names: lower-case, dots replaced by underscores.
    df.columns = [c.replace(".", "_").lower() for c in df.columns]
    # Persist a clean copy so later runs skip the download.
    DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(DATA_PATH, index=False)

# Displayed as the cell result: (rows, cols) and the share of defaults.
df.shape, df["default"].mean()



- version 1, status: active
  url: https://www.openml.org/search?type=data&id=31
- version 2, status: active
  url: https://www.openml.org/search?type=data&id=44096



((1000, 21), np.float64(0.3))

In [7]:
from pandas.api.types import is_numeric_dtype, is_categorical_dtype, is_object_dtype, is_bool_dtype

print("shape:", df.shape)
print("target rate:", df["default"].mean())

# Split the feature columns (everything except the target) into two DISJOINT groups.
# pandas reports bool as a numeric dtype, so bool columns are claimed by the
# categorical group first and then excluded from the numeric one; otherwise the
# same column would appear in both lists and be encoded twice downstream.
feature_cols = [c for c in df.columns if c != "default"]
categorical = [
    c for c in feature_cols
    if (
        isinstance(df[c].dtype, pd.CategoricalDtype) or
        df[c].dtype == "object" or
        is_bool_dtype(df[c])
    )
]
numeric = [
    c for c in feature_cols
    if c not in categorical and is_numeric_dtype(df[c])
]

# Per-column missing counts and distinct-value counts, largest first.
missing = df.isna().sum().sort_values(ascending=False)
cardinality = df.nunique().sort_values(ascending=False)

summary = {
    "rows": int(df.shape[0]),
    "cols": int(df.shape[1]),
    "positive_rate": float(df["default"].mean()),
    "n_numeric": len(numeric),
    "n_categorical": len(categorical),
}

display(df.head(3))
summary

shape: (1000, 21)
target rate: 0.3


Unnamed: 0,checking_status,duration,credit_history,purpose,credit_amount,savings_status,employment,installment_commitment,personal_status,other_parties,...,property_magnitude,age,other_payment_plans,housing,existing_credits,job,num_dependents,own_telephone,foreign_worker,default
0,<0,6,critical/other existing credit,radio/tv,1169,no known savings,>=7,4,male single,none,...,real estate,67,none,own,2,skilled,1,yes,yes,0
1,0<=X<200,48,existing paid,radio/tv,5951,<100,1<=X<4,2,female div/dep/mar,none,...,real estate,22,none,own,1,skilled,1,none,yes,1
2,no checking,12,critical/other existing credit,education,2096,<100,4<=X<7,2,male single,none,...,real estate,49,none,own,1,unskilled resident,2,none,yes,0


{'rows': 1000,
 'cols': 21,
 'positive_rate': 0.3,
 'n_numeric': 7,
 'n_categorical': 13}

In [8]:
# The report is written relative to the current working directory.
os.makedirs("reports", exist_ok=True)

# Headline facts about the dataset plus the ten columns with the most
# missing values and the ten with the most distinct values.
eda = dict(
    rows=int(df.shape[0]),
    cols=int(df.shape[1]),
    positive_rate=float(df["default"].mean()),
    numeric=numeric,
    categorical=categorical,
    top_missing=missing.head(10).to_dict(),
    top_cardinality=cardinality.head(10).to_dict(),
)
with open("reports/eda_summary.json", "w") as f:
    f.write(json.dumps(eda, indent=2))
print("saved: reports/eda_summary.json")

saved: reports/eda_summary.json


In [9]:
from sklearn.model_selection import train_test_split
os.makedirs("splits", exist_ok=True)

# Features / target.
X = df.drop(columns=["default"])
y = df["default"]

# Two stratified cuts with a fixed seed: 40% is held out first, then that
# hold-out is halved into validation and test.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.40, stratify=y, random_state=42
)
X_valid, X_test, y_valid, y_test = train_test_split(
    X_temp, y_temp, test_size=0.50, stratify=y_temp, random_state=42
)

# Persist only the row labels of each split, one CSV per split.
for _split_name, _part in (("train", X_train), ("valid", X_valid), ("test", X_test)):
    pd.DataFrame({"index": _part.index}).to_csv(f"splits/{_split_name}_idx.csv", index=False)
print("splits saved")


splits saved


In [10]:
# Column roles (numeric / categorical / target) saved for the modelling cells.
schema = dict(numeric=numeric, categorical=categorical, target="default")
with open("reports/feature_schema.json", "w") as f:
    f.write(json.dumps(schema, indent=2))
print("saved: reports/feature_schema.json")


saved: reports/feature_schema.json


In [11]:
from pathlib import Path
# Sanity check: show the working directory and whether the expected repo folder exists.
print("CWD:", Path.cwd())
print("Repo exists here? ", Path.home().joinpath("Documents", "credit-risk-scorecard").exists())


CWD: C:\Users\balla\Documents\credit-risk-scorecard\notebooks
Repo exists here?  True


In [12]:
import shutil

# Resolve the repo from the working directory (the notebook runs from
# <repo>/notebooks) instead of assuming the project lives under ~/Documents.
# The modelling cell derives REPO as Path.cwd().parent, so the files must be
# moved to that same root to be found later.
NB   = Path.cwd()
REPO = NB.parent

# sources (written next to the notebook by the earlier cells)
src_csv     = NB / "data" / "raw" / "german_credit.csv"
src_splits  = NB / "splits"
src_reports = NB / "reports"

# destinations (repo root)
dst_csv     = REPO / "data" / "raw" / "german_credit.csv"
dst_splits  = REPO / "splits"
dst_reports = REPO / "reports"

for _folder in (dst_csv.parent, dst_splits, dst_reports):
    _folder.mkdir(parents=True, exist_ok=True)

# move CSV (skipped when it has already been moved, so the cell can be re-run)
if src_csv.exists():
    shutil.move(str(src_csv), str(dst_csv))

# move split CSVs and report JSONs; the listing is materialised first so files
# are not moved out of a directory while it is still being iterated
for _src_dir, _pattern, _dst_dir in (
    (src_splits, "*.csv", dst_splits),
    (src_reports, "*.json", dst_reports),
):
    if _src_dir.exists():
        for p in list(_src_dir.glob(_pattern)):
            shutil.move(str(p), str(_dst_dir / p.name))

print("moved files to repo root")


moved files to repo root


In [13]:
# List what ended up in each repo-level folder.
for _label, _folder in (
    ("data/raw:", REPO / "data" / "raw"),
    ("splits:", REPO / "splits"),
    ("reports:", REPO / "reports"),
):
    print(_label, list(_folder.glob("*")))

data/raw: [WindowsPath('C:/Users/balla/Documents/credit-risk-scorecard/data/raw/german_credit.csv')]
splits: [WindowsPath('C:/Users/balla/Documents/credit-risk-scorecard/splits/test_idx.csv'), WindowsPath('C:/Users/balla/Documents/credit-risk-scorecard/splits/train_idx.csv'), WindowsPath('C:/Users/balla/Documents/credit-risk-scorecard/splits/valid_idx.csv')]
reports: [WindowsPath('C:/Users/balla/Documents/credit-risk-scorecard/reports/eda_summary.json'), WindowsPath('C:/Users/balla/Documents/credit-risk-scorecard/reports/feature_schema.json')]


In [22]:
import json, os, numpy as np, pandas as pd, matplotlib.pyplot as plt
from pathlib import Path

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.metrics import (
    roc_auc_score, average_precision_score, brier_score_loss,
    roc_curve, precision_recall_curve, confusion_matrix
)
from sklearn.calibration import calibration_curve
import joblib


# Repo root: the notebook is expected to run from <repo>/notebooks.
REPO        = Path.cwd().parent
DATA_PATH   = REPO / "data" / "raw" / "german_credit.csv"
SPLITS_DIR  = REPO / "splits"
REPORTS_DIR = REPO / "reports"
MODELS_DIR  = REPO / "models"

print("Using:", DATA_PATH)

# Output folders must exist before later cells write models / reports.
MODELS_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)

# --- load data + splits + schema ---
df = pd.read_csv(DATA_PATH)
train_idx = pd.read_csv(SPLITS_DIR / "train_idx.csv")["index"].to_numpy()
valid_idx = pd.read_csv(SPLITS_DIR / "valid_idx.csv")["index"].to_numpy()
test_idx  = pd.read_csv(SPLITS_DIR / "test_idx.csv")["index"].to_numpy()

with open(REPORTS_DIR / "feature_schema.json") as f:
    schema = json.load(f)
num_cols, cat_cols = schema["numeric"], schema["categorical"]
target = schema.get("target", "default")

# Rebuild each split by row label; features are ordered numeric first, then categorical.
feature_cols = num_cols + cat_cols
X_train, y_train = df.loc[train_idx, feature_cols], df.loc[train_idx, target]
X_valid, y_valid = df.loc[valid_idx, feature_cols], df.loc[valid_idx, target]
X_test,  y_test  = df.loc[test_idx,  feature_cols], df.loc[test_idx,  target]

# Kept as the LAST expression so the split sizes and the train target rate are
# actually displayed (a bare expression in the middle of a cell shows nothing).
len(X_train), len(X_valid), len(X_test), y_train.mean()


Using: C:\Users\balla\Documents\credit-risk-scorecard\data\raw\german_credit.csv


In [26]:
# preprocess: numeric columns get median imputation + standardisation,
# categorical columns get most-frequent imputation + one-hot encoding
# (categories unseen during fit are ignored at transform time).
num_pipe = Pipeline(steps=[
    ("imp", SimpleImputer(strategy="median")),
    ("scaler", StandardScaler()),
])
cat_pipe = Pipeline(steps=[
    ("imp", SimpleImputer(strategy="most_frequent")),
    ("ohe", OneHotEncoder(handle_unknown="ignore")),
])
pre = ColumnTransformer(transformers=[
    ("num", num_pipe, num_cols),
    ("cat", cat_pipe, cat_cols),
])

# model: class-weighted logistic regression on top of the preprocessing
logreg = LogisticRegression(solver="lbfgs", class_weight="balanced", max_iter=2000)
pipe = Pipeline(steps=[("pre", pre), ("clf", logreg)])
pipe.fit(X_train, y_train)

# Persist the fitted pipeline (preprocessing + classifier) as one artifact.
joblib.dump(pipe, MODELS_DIR / "baseline_logreg.pkl")
"trained + saved baseline"

'trained + saved baseline'

In [24]:
def ks_stat(y_true, y_score):
    """Return the largest gap between TPR and FPR along the ROC curve, as a float."""
    false_pos_rate, true_pos_rate, _thresholds = roc_curve(y_true, y_score)
    gaps = true_pos_rate - false_pos_rate
    return float(np.max(gaps))

# Validation-set scores: predicted probability of the positive class.
p_valid = pipe.predict_proba(X_valid)[:, 1]

metrics_valid = {
    "roc_auc": float(roc_auc_score(y_valid, p_valid)),
    "pr_auc": float(average_precision_score(y_valid, p_valid)),
    "brier": float(brier_score_loss(y_valid, p_valid)),
    "ks": ks_stat(y_valid, p_valid),
    "base_rate": float(y_valid.mean()),
}

# Decision threshold = the one that maximises F1 on VALID.
# prec/rec are sliced with [:-1] to line up with thr; 1e-12 guards against 0/0.
prec, rec, thr = precision_recall_curve(y_valid, p_valid)
f1 = (2 * prec[:-1] * rec[:-1]) / (prec[:-1] + rec[:-1] + 1e-12)
best_i = int(np.nanargmax(f1))
best_thr = float(thr[best_i])

y_valid_hat = (p_valid >= best_thr).astype(int)
cm_valid = confusion_matrix(y_valid, y_valid_hat).tolist()

# Save the calibration plot under REPORTS_DIR, next to the metrics JSON written
# below, rather than into a "reports" folder relative to the working directory.
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
prob_true, prob_pred = calibration_curve(y_valid, p_valid, n_bins=10, strategy="quantile")
plt.figure()
plt.plot([0,1],[0,1],"--")  # perfect-calibration reference line
plt.plot(prob_pred, prob_true, marker="o")
plt.xlabel("Predicted probability"); plt.ylabel("Observed frequency")
plt.title("Calibration — Baseline (validation)")
plt.tight_layout()
plt.savefig(REPORTS_DIR / "calibration_baseline_valid.png", dpi=150)
plt.close()

with open(REPORTS_DIR / "metrics_baseline_valid.json","w") as f:
    json.dump({
        "metrics_valid": metrics_valid,
        "best_threshold": best_thr,
        "confusion_valid": cm_valid
    }, f, indent=2)

metrics_valid, best_thr, cm_valid


({'roc_auc': 0.805595238095238,
  'pr_auc': 0.6453582728321428,
  'brier': 0.18685424120648478,
  'ks': 0.4642857142857143,
  'base_rate': 0.3},
 0.6199655024289628,
 [[114, 26], [21, 39]])

In [25]:
# Hold-out evaluation of the baseline; the threshold is the one chosen on VALID.
p_test = pipe.predict_proba(X_test)[:, 1]

metrics_test = dict(
    roc_auc=float(roc_auc_score(y_test, p_test)),
    pr_auc=float(average_precision_score(y_test, p_test)),
    brier=float(brier_score_loss(y_test, p_test)),
    ks=ks_stat(y_test, p_test),
    base_rate=float(y_test.mean()),
    threshold_used=best_thr,
)
y_test_hat = (p_test >= best_thr).astype(int)
cm_test = confusion_matrix(y_test, y_test_hat).tolist()

_test_report = {"metrics_test": metrics_test, "confusion_test": cm_test}
with open(REPORTS_DIR / "metrics_baseline_test.json","w") as f:
    f.write(json.dumps(_test_report, indent=2))

metrics_test, cm_test

({'roc_auc': 0.7901190476190476,
  'pr_auc': 0.6217050843038479,
  'brier': 0.18775861644483072,
  'ks': 0.48809523809523814,
  'base_rate': 0.3,
  'threshold_used': 0.6199655024289628},
 [[113, 27], [23, 37]])

In [27]:
# Additive smoothing constant: added to every good/bad count (and to the
# totals) so that a bin with zero goods or zero bads still gets a finite WOE.
_EPS = 0.5

def _woe_table(bin_series: pd.Series, y: pd.Series, eps=_EPS):
    """Compute per-bin Weight of Evidence and Information Value.

    Parameters
    ----------
    bin_series : bin / category label per row (rows with a missing label are dropped).
    y          : binary target per row; 1 is counted as "bad", 0 as "good".
    eps        : additive smoothing constant.

    Returns
    -------
    (table, iv_total) where ``table`` is indexed by bin with columns
    ``good``, ``bad``, ``woe``, ``iv`` and ``iv_total`` is the summed IV as a float.
    WOE is ln(share of goods / share of bads), so it is positive for bins
    that hold relatively more goods.
    """
    dfb = pd.DataFrame({"bin": bin_series, "y": y}).dropna()
    # observed=False is spelled out: for categorical bins it keeps empty
    # categories in the table (the behaviour this code has relied on) and
    # avoids the FutureWarning about the changing default.
    g = dfb.groupby("bin", observed=False)["y"]
    bad = g.sum().astype(float)
    good = g.count().astype(float) - bad
    good_t, bad_t = good.sum(), bad.sum()
    good_share = (good + eps) / (good_t + eps)
    bad_share = (bad + eps) / (bad_t + eps)
    woe = np.log(good_share / bad_share)
    iv = (good_share - bad_share) * woe
    tab = pd.DataFrame({"good": good, "bad": bad, "woe": woe, "iv": iv})
    return tab, float(iv.sum())

def fit_woe_numeric(x: pd.Series, y: pd.Series, q: int = 5):
    """Fit quantile bins on ``x`` and the WOE of each bin.

    Tries ``q`` quantile bins first and falls back to fewer (down to 2) if
    binning fails; duplicate quantile edges are dropped.

    Returns a dict with keys ``type`` ("numeric"), ``edges`` (list of bin
    edges), ``woe_by_bin`` (WOE keyed by the interval's string form) and
    ``iv``. If no usable binning exists (binning failed, or fewer than two
    distinct edges remain) the single-bin result with empty ``edges`` and
    zero IV is returned.
    """
    binned, edges = None, None
    for k in range(q, 1, -1):
        try:
            binned, edges = pd.qcut(x, q=k, duplicates="drop", retbins=True)
            break
        except Exception:
            # Deliberate best-effort: retry with a coarser binning.
            continue
    if binned is None or len(edges) < 2:
        # fallback: single bin → zero WOE
        return {"type": "numeric", "edges": [], "woe_by_bin": {}, "iv": 0.0}
    tab, iv = _woe_table(binned, y)
    # store WOE keyed by interval string (JSON-serialisable)
    woe_by_bin = {str(idx): float(v) for idx, v in tab["woe"].items()}
    return {"type": "numeric", "edges": edges.tolist(), "woe_by_bin": woe_by_bin, "iv": iv}

def fit_woe_categorical(x: pd.Series, y: pd.Series):
    """Fit the WOE of each distinct category of ``x``.

    Returns a dict with keys ``type`` ("categorical"), ``woe_by_cat`` (WOE
    keyed by ``str(category)``) and ``iv``.
    """
    x = x.astype("object")
    tab, iv = _woe_table(x, y)
    woe_by_cat = {str(idx): float(v) for idx, v in tab["woe"].items()}
    return {"type": "categorical", "woe_by_cat": woe_by_cat, "iv": iv}

def apply_woe_numeric(x: pd.Series, edges, woe_by_bin):
    """Map numeric values to the WOE of the fitted bin they fall into.

    Parameters
    ----------
    x          : values to encode.
    edges      : bin edges from ``fit_woe_numeric`` (list or array); empty
                 or ``None`` means "single bin" and yields all zeros.
    woe_by_bin : WOE keyed by the interval's string form.

    Returns a float Series aligned with ``x``. Values below the first / above
    the last fitted edge are assigned to the first / last bin instead of
    falling out of the binning; missing values and bins without a stored WOE
    get the neutral value 0.0.
    """
    if edges is None or len(edges) < 2:
        return pd.Series(0.0, index=x.index)
    bounds = np.asarray(edges, dtype=float)
    # Clip to the fitted range so unseen extremes land in the outermost bins;
    # the edges themselves are unchanged, so interval labels still match the
    # keys stored at fit time.
    clipped = x.clip(lower=bounds[0], upper=bounds[-1])
    binned = pd.cut(clipped, bins=bounds, include_lowest=True)
    s = binned.astype(str).map(woe_by_bin)
    return s.fillna(0.0).astype(float)

def apply_woe_categorical(x: pd.Series, woe_by_cat):
    """Map category values to their fitted WOE.

    ``fit_woe_categorical`` stores its keys as ``str(category)``, so each value
    is looked up as-is first and then by its string form; this lets
    non-string categories (e.g. integer codes) match the stored keys.
    Missing values and categories without a stored WOE get the neutral
    value 0.0. Returns a float Series aligned with ``x``.
    """
    def _lookup(value):
        if pd.isna(value):
            return np.nan
        if value in woe_by_cat:
            return woe_by_cat[value]
        return woe_by_cat.get(str(value), np.nan)

    s = x.astype("object").map(_lookup)
    return s.fillna(0.0).astype(float)


In [28]:
from pandas.api.types import is_numeric_dtype

BIN_PATH = REPORTS_DIR / "binning.json"
IV_PATH  = REPORTS_DIR / "iv_table.csv"

# Fit on TRAIN only: numeric columns use 5 quantile bins, categorical columns
# use their own levels.
_num_fits = [(col, fit_woe_numeric(X_train[col], y_train, q=5)) for col in num_cols]
_cat_fits = [(col, fit_woe_categorical(X_train[col], y_train)) for col in cat_cols]

# Per-feature WOE configuration, numeric features first.
woe_cfg = dict(_num_fits + _cat_fits)

# One IV row per feature, in the same order.
iv_rows = (
    [{"feature": col, "type": "num", "iv": cfg["iv"]} for col, cfg in _num_fits]
    + [{"feature": col, "type": "cat", "iv": cfg["iv"]} for col, cfg in _cat_fits]
)

# Save binning + IV table (strongest feature first)
with open(BIN_PATH, "w") as f:
    f.write(json.dumps(woe_cfg, indent=2))
pd.DataFrame(iv_rows).sort_values("iv", ascending=False).to_csv(IV_PATH, index=False)

# Transform → WOE space
def to_woe_frame(df_part: pd.DataFrame):
    """Encode every schema column of ``df_part`` as WOE using the fitted ``woe_cfg``.

    Output columns are named ``woe_<feature>``, numeric features first and
    categorical features after, in schema order.
    """
    numeric_part = [
        apply_woe_numeric(
            df_part[col], woe_cfg[col]["edges"], woe_cfg[col]["woe_by_bin"]
        ).rename(f"woe_{col}")
        for col in num_cols
    ]
    categorical_part = [
        apply_woe_categorical(df_part[col], woe_cfg[col]["woe_by_cat"]).rename(f"woe_{col}")
        for col in cat_cols
    ]
    return pd.concat(numeric_part + categorical_part, axis=1)

# Encode each split with the WOE mappings fitted on TRAIN.
Xtr_w = to_woe_frame(X_train)
Xva_w = to_woe_frame(X_valid)
Xte_w = to_woe_frame(X_test)

# Preview the first rows of the encoded train and validation frames.
(Xtr_w.head(3), Xva_w.head(3))


  g = dfb.groupby("bin").y
  g = dfb.groupby("bin").y
  g = dfb.groupby("bin").y
  g = dfb.groupby("bin").y
  g = dfb.groupby("bin").y
  g = dfb.groupby("bin").y
  g = dfb.groupby("bin").y


(     woe_duration  woe_credit_amount  woe_installment_commitment  \
 381     -0.163576          -0.339293                    0.001584   
 580     -0.163576          -0.006274                    0.001584   
 88      -0.163576           0.021505                   -0.165605   
 
      woe_residence_since   woe_age  woe_existing_credits  woe_num_dependents  \
 381            -0.012181  0.164233             -0.001659                 0.0   
 580            -0.012181  0.067907             -0.001659                 0.0   
 88             -0.012181  0.067907             -0.001659                 0.0   
 
      woe_checking_status  woe_credit_history  woe_purpose  woe_savings_status  \
 381            -0.399757           -0.087562     0.589371           -0.293184   
 580            -0.399757            0.743176    -0.187835           -0.293184   
 88             -0.714574           -0.087562    -0.187835           -0.163262   
 
      woe_employment  woe_personal_status  woe_other_parties  \
 3

In [29]:
from sklearn.linear_model import LogisticRegression

# Scorecard model: class-weighted logistic regression on the WOE-encoded TRAIN features.
scard = LogisticRegression(solver="lbfgs", class_weight="balanced", max_iter=2000)
scard.fit(Xtr_w, y_train)

# VALID: predicted probability of the positive class
p_valid_sc = scard.predict_proba(Xva_w)[:, 1]
def ks_stat(y_true, y_score):
    """Return the maximum of (TPR - FPR) over the ROC curve, as a float."""
    fpr, tpr, _ = roc_curve(y_true, y_score)
    separation = tpr - fpr
    return float(separation.max())

# Validation metrics for the scorecard.
metrics_valid_sc = dict(
    roc_auc=float(roc_auc_score(y_valid, p_valid_sc)),
    pr_auc=float(average_precision_score(y_valid, p_valid_sc)),
    brier=float(brier_score_loss(y_valid, p_valid_sc)),
    ks=ks_stat(y_valid, p_valid_sc),
    base_rate=float(y_valid.mean()),
)

# threshold by max F1 on VALID ([:-1] lines prec/rec up with thr; 1e-12 avoids 0/0)
prec, rec, thr = precision_recall_curve(y_valid, p_valid_sc)
f1 = (2 * prec[:-1] * rec[:-1]) / (prec[:-1] + rec[:-1] + 1e-12)
best_i_sc = int(np.nanargmax(f1))
best_thr_sc = float(thr[best_i_sc])

# calibration plot (VALID): dashed diagonal = perfect calibration
prob_true, prob_pred = calibration_curve(y_valid, p_valid_sc, n_bins=10, strategy="quantile")
fig, ax = plt.subplots()
ax.plot([0,1],[0,1],"--")
ax.plot(prob_pred, prob_true, marker="o")
ax.set_xlabel("Predicted probability")
ax.set_ylabel("Observed frequency")
ax.set_title("Calibration — Scorecard (validation)")
fig.tight_layout()
fig.savefig(REPORTS_DIR / "calibration_scorecard_valid.png", dpi=150)
plt.close(fig)

# TEST with fixed threshold (the one selected on VALID)
p_test_sc = scard.predict_proba(Xte_w)[:, 1]
metrics_test_sc = dict(
    roc_auc=float(roc_auc_score(y_test, p_test_sc)),
    pr_auc=float(average_precision_score(y_test, p_test_sc)),
    brier=float(brier_score_loss(y_test, p_test_sc)),
    ks=ks_stat(y_test, p_test_sc),
    base_rate=float(y_test.mean()),
    threshold_used=best_thr_sc,
)

from sklearn.metrics import confusion_matrix
cm_valid_sc = confusion_matrix(y_valid, (p_valid_sc >= best_thr_sc)).tolist()
cm_test_sc  = confusion_matrix(y_test,  (p_test_sc  >= best_thr_sc)).tolist()

# Save artifacts: fitted model plus one metrics JSON per split
MODELS_DIR.mkdir(parents=True, exist_ok=True)
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
import joblib, json
joblib.dump(scard, MODELS_DIR / "scorecard_logreg.pkl")

_valid_report = {"metrics_valid": metrics_valid_sc,
                 "best_threshold": best_thr_sc,
                 "confusion_valid": cm_valid_sc}
_test_report = {"metrics_test": metrics_test_sc,
                "confusion_test": cm_test_sc}
with open(REPORTS_DIR / "metrics_scorecard_valid.json","w") as f:
    f.write(json.dumps(_valid_report, indent=2))
with open(REPORTS_DIR / "metrics_scorecard_test.json","w") as f:
    f.write(json.dumps(_test_report, indent=2))

metrics_valid_sc, best_thr_sc, metrics_test_sc


({'roc_auc': 0.8026190476190477,
  'pr_auc': 0.6447269935240115,
  'brier': 0.18317461409080202,
  'ks': 0.4904761904761905,
  'base_rate': 0.3},
 0.49179808894079324,
 {'roc_auc': 0.7899999999999999,
  'pr_auc': 0.5903092788969597,
  'brier': 0.18531904333479268,
  'ks': 0.519047619047619,
  'base_rate': 0.3,
  'threshold_used': 0.49179808894079324})

In [30]:
import json
def _read_metrics(stem, section):
    """Load ``metrics_<stem>.json`` from REPORTS_DIR and return its ``section`` entry."""
    with open(REPORTS_DIR / f"metrics_{stem}.json") as fh:
        return json.load(fh)[section]

# Reload the saved metrics so the comparison reflects what is on disk.
base_v = _read_metrics("baseline_valid", "metrics_valid")
base_t = _read_metrics("baseline_test", "metrics_test")
sc_v = _read_metrics("scorecard_valid", "metrics_valid")
sc_t = _read_metrics("scorecard_test", "metrics_test")

# Side-by-side view: baseline vs. scorecard on each split.
{
    "valid": {"baseline": base_v, "scorecard": sc_v},
    "test": {"baseline": base_t, "scorecard": sc_t},
}


{'valid': {'baseline': {'roc_auc': 0.805595238095238,
   'pr_auc': 0.6453582728321428,
   'brier': 0.18685424120648478,
   'ks': 0.4642857142857143,
   'base_rate': 0.3},
  'scorecard': {'roc_auc': 0.8026190476190477,
   'pr_auc': 0.6447269935240115,
   'brier': 0.18317461409080202,
   'ks': 0.4904761904761905,
   'base_rate': 0.3}},
 'test': {'baseline': {'roc_auc': 0.7901190476190476,
   'pr_auc': 0.6217050843038479,
   'brier': 0.18775861644483072,
   'ks': 0.48809523809523814,
   'base_rate': 0.3,
   'threshold_used': 0.6199655024289628},
  'scorecard': {'roc_auc': 0.7899999999999999,
   'pr_auc': 0.5903092788969597,
   'brier': 0.18531904333479268,
   'ks': 0.519047619047619,
   'base_rate': 0.3,
   'threshold_used': 0.49179808894079324}}}

In [31]:
from sklearn.isotonic import IsotonicRegression
import numpy as np, json

# Isotonic calibration of the scorecard probabilities.
# Inputs: p_valid_sc and p_test_sc, computed in the scorecard cell.
# The calibrator is fit on VALID, so the calibrated VALID scores are in-sample;
# the calibrated TEST scores are the out-of-sample check.
iso = IsotonicRegression(out_of_bounds="clip")
iso.fit(p_valid_sc, y_valid.astype(float))

p_valid_sc_cal, p_test_sc_cal = iso.transform(p_valid_sc), iso.transform(p_test_sc)

def ks_stat(y_true, y_score):
    """Return the peak difference between TPR and FPR on the ROC curve, as a float."""
    roc_fpr, roc_tpr, _unused = roc_curve(y_true, y_score)
    return float((roc_tpr - roc_fpr).max())

def _calibrated_metrics(y_true, p_cal):
    """Score calibrated probabilities against the true labels of one split."""
    return {
        "roc_auc": float(roc_auc_score(y_true, p_cal)),
        "pr_auc": float(average_precision_score(y_true, p_cal)),
        "brier": float(brier_score_loss(y_true, p_cal)),
        "ks": ks_stat(y_true, p_cal),
        "base_rate": float(y_true.mean()),
    }

# VALID is the split the calibrator was fit on; TEST is the hold-out.
cal_valid = _calibrated_metrics(y_valid, p_valid_sc_cal)
cal_test = _calibrated_metrics(y_test, p_test_sc_cal)

# One JSON per split.
for _file_name, _payload in (
    ("metrics_scorecard_valid_calibrated.json", cal_valid),
    ("metrics_scorecard_test_calibrated.json", cal_test),
):
    with open(REPORTS_DIR / _file_name, "w") as f:
        json.dump(_payload, f, indent=2)

cal_valid, cal_test


({'roc_auc': 0.8227380952380953,
  'pr_auc': 0.646482977471909,
  'brier': 0.14843071736896365,
  'ks': 0.4904761904761905,
  'base_rate': 0.3},
 {'roc_auc': 0.7845238095238095,
  'pr_auc': 0.5727944470389871,
  'brier': 0.16373407110788954,
  'ks': 0.5023809523809524,
  'base_rate': 0.3})

In [32]:
# Choose threshold by simple expected-cost (FN = 5 x FP)
thr_grid = np.linspace(0.05, 0.95, 181)

def _expected_cost(threshold):
    """Cost of classifying VALID at ``threshold``: 5 per false negative, 1 per false positive."""
    predicted = (p_valid_sc_cal >= threshold).astype(int)
    # confusion entries: tn, fp, fn, tp
    tn, fp, fn, tp = confusion_matrix(y_valid, predicted).ravel()
    return 5*fn + 1*fp

_costs = [_expected_cost(t) for t in thr_grid]
# argmin returns the first minimum, i.e. the lowest threshold among ties.
_best_pos = int(np.argmin(_costs))
best_cost = _costs[_best_pos]
best_thr_cost = float(thr_grid[_best_pos])

best_thr_cost, best_cost


(0.075, np.int64(92))

In [33]:
import pandas as pd

# bucket edges on the calibrated probability scale (tune if you like)
edges = np.array([0.0, 0.05, 0.10, 0.20, 1.0])
labels = ["Very Low", "Low", "Medium", "High"]

# assign buckets on TEST (holdout); include_lowest keeps a probability of exactly 0.0
buckets = pd.cut(p_test_sc_cal, bins=edges, labels=labels, include_lowest=True)

bucket_df = pd.DataFrame({
    "bucket": buckets,
    "pd": p_test_sc_cal,
    "y": y_test.values
})
# observed=False is spelled out: "bucket" is categorical, and this keeps every
# bucket in the table even if it is empty, while avoiding the FutureWarning
# about the changing default.
summary = bucket_df.groupby("bucket", observed=False).agg(
    n=("y","size"),             # rows in the bucket
    default_rate=("y","mean"),  # observed share of y == 1
    pd_mean=("pd","mean")       # mean calibrated probability
).reset_index()

summary.to_csv(REPORTS_DIR / "risk_buckets_test.csv", index=False)
summary


  summary = bucket_df.groupby("bucket").agg(


Unnamed: 0,bucket,n,default_rate,pd_mean
0,Very Low,39,0.076923,0.02723
1,Low,35,0.085714,0.073171
2,Medium,22,0.136364,0.1875
3,High,104,0.490385,0.496315


In [34]:
# Ten features with the highest Information Value, read back from the saved IV table.
(
    pd.read_csv(REPORTS_DIR / "iv_table.csv")
    .sort_values(by="iv", ascending=False)
    .head(n=10)
)


Unnamed: 0,feature,type,iv
0,checking_status,cat,0.534943
1,credit_history,cat,0.258963
2,savings_status,cat,0.246377
3,employment,cat,0.160335
4,duration,num,0.140566
5,other_payment_plans,cat,0.13148
6,purpose,cat,0.121404
7,housing,cat,0.117337
8,property_magnitude,cat,0.093637
9,foreign_worker,cat,0.086427
