In [9]:
# pd_ead_ecl_pipeline_legends.py
# -----------------------------------------------------------
# End-to-end PD (Logit) + Exposure (EE/PFE/EAD) + ECL pipeline
# Generates synthetic data, trains a logistic model, evaluates,
# estimates EAD from EE/PFE, computes ECL, plots results (with legends),
# and writes a multi-sheet Excel report.
# -----------------------------------------------------------

import os
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm
from scipy.stats import norm


# -------------------------------------------------------------------
# 0) CONFIG
# -------------------------------------------------------------------
np.random.seed(42)
OUTPUT_DIR = "."  # change to "/mnt/data" if you prefer
PLOT_DPI = 150
PFE_QUANTILE = 0.95     # quantile for PFE
LAMBDA_EAD = 0.5        # EAD blend: EAD = EE + lambda * PFE
BASE_VOL = 0.10         # base relative volatility for exposure
ASSUMED_LGD = 0.45      # constant LGD (can be replaced with model)
HORIZONS_MONTHS = np.array([1, 3, 6, 12])  # EAD horizons

# -------------------------------------------------------------------
# 1) SYNTHETIC DATA GENERATION
# -------------------------------------------------------------------

def gen_synthetic_credit_data(n=2000):
    """
    Create a synthetic dataset with intuitive relationships between features
    and default probability using a latent logistic function.
    """
    credit_score = np.clip(np.round(np.random.normal(680, 70, n)), 300, 850)
    loan_amount = np.round(np.random.lognormal(mean=11.0, sigma=0.7, size=n))
    loan_amount = np.clip(loan_amount, 5e4, 1e7)
    tenure_months = np.random.randint(6, 121, size=n)
    interest_rate = np.round(np.random.uniform(0.06, 0.24, size=n), 4)
    utilization = np.round(np.random.uniform(0.3, 1.0, size=n), 3)
    limit_amount = np.round(loan_amount / np.maximum(0.01, utilization))

    # Latent PD via logistic with intuitive signs
    x_score = (700 - credit_score) / 100.0
    x_amt = np.log(loan_amount) - np.log(2e5)
    x_ten = (tenure_months - 36) / 12.0
    x_rate = (interest_rate - 0.12) * 10
    x_util = (utilization - 0.7) * 2

    linpred = -2.2 + 0.8*x_score + 0.25*x_amt + 0.15*x_ten + 0.4*x_rate + 0.5*x_util
    true_pd = 1.0 / (1.0 + np.exp(-linpred))
    default_flag = (np.random.uniform(size=n) < true_pd).astype(int)

    return pd.DataFrame({
        'counterparty_id': [f'C{i:05d}' for i in range(1, n+1)],
        'credit_score': credit_score.astype(int),
        'loan_amount': loan_amount.astype(float),
        'limit_amount': limit_amount.astype(float),
        'tenure_months': tenure_months.astype(int),
        'interest_rate': interest_rate.astype(float),
        'utilization': utilization.astype(float),
        'default_flag': default_flag.astype(int),
        'true_pd': true_pd
    })

# -------------------------------------------------------------------
# 2) PD MODELING (LOGISTIC REGRESSION)
# -------------------------------------------------------------------

def prepare_features(df):
    df = df.copy()
    df['log_loan_amt'] = np.log(df['loan_amount'])
    df['rate_pct'] = df['interest_rate'] * 100.0
    df['score_centered'] = df['credit_score'] - 700
    model_cols = ['score_centered', 'log_loan_amt', 'tenure_months', 'rate_pct', 'utilization']
    X = sm.add_constant(df[model_cols])
    y = df['default_flag'].astype(int)
    return X, y, model_cols

def fit_logit(X, y):
    return sm.Logit(y, X).fit(disp=False)

def roc_curve_manual(y_true, y_score, num=400):
    thresholds = np.linspace(0, 1, num)
    tpr_list, fpr_list = [], []
    P = (y_true == 1).sum()
    N = (y_true == 0).sum()
    for t in thresholds:
        y_pred = (y_score >= t).astype(int)
        TP = ((y_pred == 1) & (y_true == 1)).sum()
        FP = ((y_pred == 1) & (y_true == 0)).sum()
        TPR = TP / P if P > 0 else 0.0
        FPR = FP / N if N > 0 else 0.0
        tpr_list.append(TPR)
        fpr_list.append(FPR)
    return np.array(fpr_list), np.array(tpr_list), thresholds

def auc_trapz(fpr, tpr):
    sort_idx = np.argsort(fpr)
    return np.trapz(tpr[sort_idx], fpr[sort_idx]), sort_idx

def confusion_at_threshold(y_true, y_score, threshold):
    y_pred = (y_score >= threshold).astype(int)
    TP = int(((y_pred == 1) & (y_true == 1)).sum())
    TN = int(((y_pred == 0) & (y_true == 0)).sum())
    FP = int(((y_pred == 1) & (y_true == 0)).sum())
    FN = int(((y_pred == 0) & (y_true == 1)).sum())
    return TP, TN, FP, FN

def calibration_curve_df(y_true, y_prob, n_bins=10):
    df = pd.DataFrame({'y': y_true, 'p': y_prob}).copy()
    df['bin'] = pd.qcut(df['p'], q=n_bins, duplicates='drop')
    g = df.groupby('bin', observed=True).agg(
        avg_p=('p', 'mean'),
        obs_rate=('y', 'mean'),
        count=('y', 'size')
    ).reset_index(drop=True)
    return g

# -------------------------------------------------------------------
# 3) EXPOSURE CALCULATION (EE/PFE) & EAD
# -------------------------------------------------------------------

def balance_at_t(loan_amt, tenure_m, t_month):
    """Linear amortization of principal to horizon t."""
    frac = np.clip(1.0 - t_month / np.maximum(tenure_m, 1), 0.0, 1.0)
    return loan_amt * frac

def ee_trunc_normal(mu, sigma):
    """
    EE for X = max(0, N(mu, sigma^2))
    EE = sigma*phi(a) + mu*Phi(a), a = mu/sigma
    """
    if sigma <= 0:
        return max(mu, 0.0)
    a = mu / sigma
    return sigma * norm.pdf(a) + mu * norm.cdf(a)

def pfe_trunc_normal(mu, sigma, quantile=PFE_QUANTILE):
    """PFE_q = max(0, mu + sigma*z_q) under normal approximation."""
    z = norm.ppf(quantile)
    return max(mu + sigma * z, 0.0)

def compute_counterparty_ead_rows(test_df, horizons=HORIZONS_MONTHS,
                                  pfe_q=PFE_QUANTILE, lambda_ead=LAMBDA_EAD,
                                  base_vol=BASE_VOL, assumed_lgd=ASSUMED_LGD):
    rows = []
    median_rate = test_df['interest_rate'].median()

    for _, r in test_df.iterrows():
        limit_amt = float(r['limit_amount'])
        vol = base_vol * (1 + (r['interest_rate'] - median_rate) / 0.12) * (0.8 + 0.4*r['utilization'])
        vol = float(np.clip(vol, 0.02, 0.50))

        eads_t = []
        for t in horizons:
            mu = balance_at_t(r['loan_amount'], r['tenure_months'], t)
            sigma = vol * mu * np.sqrt(t / 12.0)
            EE_t = ee_trunc_normal(mu, sigma)
            PFE_t = pfe_trunc_normal(mu, sigma, pfe_q)
            EAD_t = min(limit_amt, EE_t + lambda_ead * PFE_t)
            eads_t.append(EAD_t)

        EAD_i = float(np.max(eads_t))  # conservative
        ECL_i = float(r['pd_hat'] * EAD_i * assumed_lgd)
        rows.append({
            'counterparty_id': r['counterparty_id'],
            'PD_hat': float(r['pd_hat']),
            'LGD_assumed': assumed_lgd,
            'EAD': EAD_i,
            'ECL': ECL_i,
            'credit_score': int(r['credit_score']),
            'loan_amount': float(r['loan_amount']),
            'tenure_months': int(r['tenure_months']),
            'interest_rate': float(r['interest_rate']),
            'utilization': float(r['utilization'])
        })

    return pd.DataFrame(rows)

def aggregate_exposure_over_time(test_df, horizons=HORIZONS_MONTHS,
                                 pfe_q=PFE_QUANTILE, base_vol=BASE_VOL):
    agg_rows = []
    median_rate = test_df['interest_rate'].median()
    for t in horizons:
        ee_sum = 0.0
        pfe_sum = 0.0
        for _, r in test_df.iterrows():
            vol = base_vol * (1 + (r['interest_rate'] - median_rate) / 0.12) * (0.8 + 0.4*r['utilization'])
            vol = float(np.clip(vol, 0.02, 0.50))
            mu = balance_at_t(r['loan_amount'], r['tenure_months'], t)
            sigma = vol * mu * np.sqrt(t / 12.0)
            EE_t = ee_trunc_normal(mu, sigma)
            PFE_t = pfe_trunc_normal(mu, sigma, pfe_q)
            ee_sum += EE_t
            pfe_sum += PFE_t     # conservative aggregation (sum quantiles)
        agg_rows.append({'t_month': int(t), 'EE_sum': ee_sum, 'PFE_sum_conservative': pfe_sum})
    return pd.DataFrame(agg_rows)

# -------------------------------------------------------------------
# 4) VISUALIZATIONS (with legends)
# -------------------------------------------------------------------

def plot_pd_distribution_with_legend(risk_df, path):
    fig, ax = plt.subplots(figsize=(8, 5))
    sns.histplot(
        risk_df['PD_hat'],
        bins=30, stat='count', color='#1f77b4', alpha=0.6,
        ax=ax, label='Histogram'
    )
    sns.kdeplot(
        risk_df['PD_hat'],
        color='#ff7f0e', lw=2, ax=ax, label='KDE'
    )
    ax.set_title('Predicted PD Distribution (Test Set)')
    ax.set_xlabel('PD')
    ax.set_ylabel('Count')
    ax.legend(title='Components', frameon=True)
    ax.grid(True, alpha=0.2)
    fig.tight_layout()
    fig.savefig(path, dpi=PLOT_DPI)
    plt.close(fig)

def plot_roc_with_legend(fpr, tpr, sort_idx, auc, path):
    fig, ax = plt.subplots(figsize=(6, 6))
    ax.plot(
        fpr[sort_idx], tpr[sort_idx],
        label=f'Model ROC (AUC={auc:.3f})', color='#2ca02c', lw=2
    )
    ax.plot([0, 1], [0, 1], '--', color='gray', lw=1.2, label='Random chance')
    ax.set_title('ROC Curve (Test Set)')
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.legend(loc='lower right', frameon=True)
    ax.grid(True, alpha=0.2)
    fig.tight_layout()
    fig.savefig(path, dpi=PLOT_DPI)
    plt.close(fig)

def plot_calibration_with_legend(calib_df, path):
    fig, ax = plt.subplots(figsize=(6, 5))
    ax.plot([0, 1], [0, 1], '--', color='gray', lw=1.2, label='Perfect calibration')
    ax.plot(
        calib_df['avg_p'], calib_df['obs_rate'],
        marker='o', color='#ff7f0e', label='Binned observed rate'
    )
    ax.set_title('Calibration Plot (Test Set)')
    ax.set_xlabel('Average Predicted PD (bin)')
    ax.set_ylabel('Observed Default Rate (bin)')
    ax.legend(frameon=True)
    ax.grid(True, alpha=0.2)
    # Optional: annotate bin sizes
    # for x, y, n in zip(calib_df['avg_p'], calib_df['obs_rate'], calib_df['count']):
    #     ax.annotate(f'n={n}', (x, y), textcoords='offset points', xytext=(6, 6), fontsize=8)
    fig.tight_layout()
    fig.savefig(path, dpi=PLOT_DPI)
    plt.close(fig)

def plot_exposure_over_time_with_legend(agg_expo_df, path, pfe_quantile=PFE_QUANTILE):
    fig, ax = plt.subplots(figsize=(8, 5))
    ax.plot(
        agg_expo_df['t_month'], agg_expo_df['EE_sum'] / 1e6,
        marker='o', label='EE (sum)', color='#1f77b4'
    )
    ax.plot(
        agg_expo_df['t_month'], agg_expo_df['PFE_sum_conservative'] / 1e6,
        marker='s', label=f'PFE {int(pfe_quantile*100)}% (sum, conservative)', color='#d62728'
    )
    ax.set_title('Portfolio Exposure over Time')
    ax.set_xlabel('Months Ahead')
    ax.set_ylabel('Exposure (INR millions)')  # INR to avoid font warnings
    ax.legend(frameon=True)
    ax.grid(True, alpha=0.2)
    fig.tight_layout()
    fig.savefig(path, dpi=PLOT_DPI)
    plt.close(fig)

def plot_ecl_by_segment_with_legend(segment_ecl_df, path, lgd=ASSUMED_LGD):
    fig, ax = plt.subplots(figsize=(8, 5))
    sns.barplot(
        x='score_band', y='ECL', data=segment_ecl_df,
        ax=ax, color='#9467bd', label=f'ECL (LGD={int(lgd*100)}%)'
    )
    ax.set_title('ECL by Credit Score Segment')
    ax.set_xlabel('Score Band')
    ax.set_ylabel('ECL (INR)')
    ax.legend(frameon=True)
    ax.ticklabel_format(axis='y', style='plain', useOffset=False)
    ax.grid(axis='y', alpha=0.2)
    fig.tight_layout()
    fig.savefig(path, dpi=PLOT_DPI)
    plt.close(fig)

# -------------------------------------------------------------------
# 5) REPORTING (EXCEL)
# -------------------------------------------------------------------

def write_excel_report(out_path, params_df, summary_df, coef_table, risk_sorted, agg_expo, segment_ecl):
    with pd.ExcelWriter(out_path, engine='openpyxl') as writer:
        params_df.to_excel(writer, sheet_name='Inputs', index=False)
        summary_df.to_excel(writer, sheet_name='PD_Model_Summary', index=False)
        coef_table.to_excel(writer, sheet_name='Logit_Coefficients', index=False)
        risk_sorted.to_excel(writer, sheet_name='Counterparty_Risk', index=False)
        agg_expo.to_excel(writer, sheet_name='EE_PFE_by_T', index=False)
        segment_ecl.to_excel(writer, sheet_name='Segment_ECL', index=False)

# -------------------------------------------------------------------
# MAIN
# -------------------------------------------------------------------

def main():
    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # 1) Data
    raw = gen_synthetic_credit_data(n=2000)
    msk = np.random.rand(len(raw)) < 0.7
    train = raw[msk].copy()
    test = raw[~msk].copy()

    # 2) PD Modeling
    X_train, y_train, model_cols = prepare_features(train)
    result = fit_logit(X_train, y_train)

    X_test, y_test, _ = prepare_features(test)
    proba_test = result.predict(X_test)
    test = test.assign(pd_hat=proba_test)

    # Metrics
    fpr, tpr, thr = roc_curve_manual(y_test.values, test['pd_hat'].values, num=400)
    auc, sort_idx = auc_trapz(fpr, tpr)
    j_scores = tpr - fpr
    best_idx = int(np.argmax(j_scores))
    best_thr = float(thr[best_idx])
    TP, TN, FP, FN = confusion_at_threshold(y_test.values, test['pd_hat'].values, best_thr)
    calib = calibration_curve_df(y_test, test['pd_hat'], n_bins=10)

    # 3) Exposure and EAD (per counterparty + aggregate)
    risk_df = compute_counterparty_ead_rows(
        test_df=test,
        horizons=HORIZONS_MONTHS,
        pfe_q=PFE_QUANTILE,
        lambda_ead=LAMBDA_EAD,
        base_vol=BASE_VOL,
        assumed_lgd=ASSUMED_LGD
    )
    agg_expo = aggregate_exposure_over_time(
        test_df=test,
        horizons=HORIZONS_MONTHS,
        pfe_q=PFE_QUANTILE,
        base_vol=BASE_VOL
    )

    # Score segments for reporting
    bins = [0, 580, 670, 740, 800, 1000]
    labels = ['Poor (<580)', 'Fair (580-669)', 'Good (670-739)', 'Very Good (740-799)', 'Excellent (800+)']
    risk_df['score_band'] = pd.cut(risk_df['credit_score'], bins=bins, labels=labels, right=False)
    segment_ecl = risk_df.groupby('score_band', observed=True)['ECL'].sum().reset_index()

    # 4) Visualizations with legends
    plt.style.use('seaborn-v0_8')
    pd_dist_path = os.path.join(OUTPUT_DIR, 'pd_distribution.png')
    roc_path = os.path.join(OUTPUT_DIR, 'roc_curve.png')
    calib_path = os.path.join(OUTPUT_DIR, 'calibration_plot.png')
    expo_path = os.path.join(OUTPUT_DIR, 'exposure_over_time.png')
    ecl_seg_path = os.path.join(OUTPUT_DIR, 'ecl_by_segment.png')

    plot_pd_distribution_with_legend(risk_df, pd_dist_path)
    plt.show()  # Display the PD distribution plot
    plot_roc_with_legend(fpr, tpr, sort_idx, auc, roc_path)
    plt.show()  # Display the ROC curve plot
    plot_calibration_with_legend(calib, calib_path)
    plt.show()  # Display the calibration plot
    plot_exposure_over_time_with_legend(agg_expo, expo_path, pfe_quantile=PFE_QUANTILE)
    plt.show()  # Display the exposure over time plot
    plot_ecl_by_segment_with_legend(segment_ecl, ecl_seg_path, lgd=ASSUMED_LGD)
    plt.show()  # Display the ECL by segment plot



    # 5) Reporting
    summary = pd.DataFrame({
        'metric': ['AUC', 'Best_Threshold', 'TP', 'TN', 'FP', 'FN'],
        'value': [auc, best_thr, TP, TN, FP, FN]
    })
    coef_table = result.summary2().tables[1].reset_index()
    coef_table.rename(columns={'index': 'variable'}, inplace=True)
    params_df = pd.DataFrame({
        'parameter': ['LGD', 'PFE_quantile', 'lambda_EAD', 'base_volatility'],
        'value': [ASSUMED_LGD, PFE_QUANTILE, LAMBDA_EAD, BASE_VOL]
    })

    risk_sorted = risk_df.sort_values('ECL', ascending=False)
    out_xlsx = os.path.join(OUTPUT_DIR, 'PD_EAD_ECL_Report.xlsx')
    write_excel_report(out_xlsx, params_df, summary, coef_table, risk_sorted, agg_expo, segment_ecl)

    text_summary = f"""
Run timestamp: {datetime.now().isoformat()}

Test set size: {len(test)}
AUC: {auc:.4f}
Optimal threshold (Youden's J): {best_thr:.3f}
Confusion Matrix @ threshold:
  TP={TP}, TN={TN}, FP={FP}, FN={FN}

Files created:
- {out_xlsx}
- {pd_dist_path}
- {roc_path}
- {calib_path}
- {expo_path}
- {ecl_seg_path}
"""
    with open(os.path.join(OUTPUT_DIR, 'run_summary.txt'), 'w') as f:
        f.write(text_summary.strip() + "\n")
    print(text_summary)

if __name__ == "__main__":
    main()


Run timestamp: 2025-09-01T17:14:56.534418

Test set size: 589
AUC: 0.6837
Optimal threshold (Youden's J): 0.138
Confusion Matrix @ threshold:
  TP=69, TN=248, FP=253, FN=19

Files created:
- .\PD_EAD_ECL_Report.xlsx
- .\pd_distribution.png
- .\roc_curve.png
- .\calibration_plot.png
- .\exposure_over_time.png
- .\ecl_by_segment.png

