# In [80]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def pd_trend_tier(slope: float) -> str:
    """Convert a PD trend slope (PD per quarter) into a human-readable tier.

    The slope is converted to basis points per quarter (slope * 10000, so
    0.001 => 10 bps per quarter) and bucketed as follows:

      < 5 bps/q         -> "Stable" (flat and negative slopes land here too)
      5 to < 15 bps/q   -> "Mild deterioration"
      15 to < 30 bps/q  -> "Moderate deterioration"
      >= 30 bps/q       -> "Severe deterioration"

    A NaN slope cannot be ranked and returns "Undetermined". Without this
    guard every ``<`` comparison would be False and NaN would fall through
    to the most severe tier.
    """
    slope_bps = slope * 10000  # 0.001 => 10 bps per quarter

    if np.isnan(slope_bps):
        return "Undetermined"

    # (exclusive upper bound in bps/q, tier label), checked in ascending order.
    tier_bounds = (
        (5, "Stable"),
        (15, "Mild deterioration"),
        (30, "Moderate deterioration"),
    )
    for upper_bps, label in tier_bounds:
        if slope_bps < upper_bps:
            return label
    return "Severe deterioration"

def compute_pd_trend_signals(
    df: pd.DataFrame,
    *,
    pd_col: str = "PD_T",
    quarter_col: str = "as_of_quarter",
    sector_col: str = "Sector",
    exposure_col: str = "Exposure",
    slope_th: float = 0.0015,
    seed_prev_pd_1: float | None = None,  # e.g., 0.019 if your series starts at 2023-Q1
    seed_prev_pd_2: float | None = None,  # e.g., 0.018
) -> tuple[pd.DataFrame, dict]:
    """
    Compute PD trend slope, delta, acceleration and a deterioration flag.
    Flag rule: (slope > slope_th) & (dPD_1Q > 0) & (accel >= 0)

    Returns:
      df_out: enriched dataframe
      summary: dict with slope/intercept/tier/flags/latest status
    """
    # ---- Validate ----
    required = {quarter_col, pd_col}
    missing = required - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    df_out = df.copy()

    # ---- Sort by quarter ----
    # Works for 'YYYY-Qn' format. If already sorted, still safe.
    def _quarter_key(q: str) -> tuple[int, int]:
        y, qn = q.split("-Q")
        return int(y), int(qn)

    df_out = df_out.sort_values(by=quarter_col, key=lambda s: s.map(_quarter_key)).reset_index(drop=True)

    df_out["PD_T_1"] = df_out[pd_col].shift(1)
    df_out["PD_T_2"] = df_out[pd_col].shift(2)

    if seed_prev_pd_1 is not None:
        df_out.loc[0, "PD_T_1"] = float(seed_prev_pd_1)

    if seed_prev_pd_2 is not None:
        df_out.loc[0, "PD_T_2"] = float(seed_prev_pd_2)

    if seed_prev_pd_1 is not None and len(df_out) > 1:
        df_out.loc[1, "PD_T_2"] = float(seed_prev_pd_1)

        

    # ---- Time index for trend fit ----
    df_out["t"] = np.arange(len(df_out), dtype=float)

    # ---- Trend slope/intercept via linear fit ----
    # polyfit returns slope, intercept for y ~ slope*t + intercept
    slope, intercept = np.polyfit(df_out["t"], df_out[pd_col].astype(float), 1)

    # ---- Deltas + acceleration ----
    df_out["dPD_1Q"] = df_out[pd_col] - df_out["PD_T_1"]
    df_out["dPD_prev"] = df_out["PD_T_1"] - df_out["PD_T_2"]
    df_out["accel"] = df_out["dPD_1Q"] - df_out["dPD_prev"]

    # ---- Flag logic ----
    df_out["reason_slope"] = slope > slope_th
    df_out["reason_direction"] = df_out["dPD_1Q"] > 0
    df_out["reason_accel"] = df_out["accel"] >= 0

    df_out["deterioration_flag"] = (
        df_out["reason_slope"] &
        df_out["reason_direction"] &
        df_out["reason_accel"]
    ).astype(int)

    # ---- Flag reason (human-readable) ----
    slope_bps = slope * 10000
    base_reason = f"Trend↑ ({slope_bps:.1f} bps/q) + ΔPD>0 + accel≥0"
    df_out["flag_reason"] = np.where(df_out["deterioration_flag"].eq(1), base_reason, "No flag")

    # ---- Summary ----
    tier = pd_trend_tier(slope)
    flags = int(df_out["deterioration_flag"].sum())
    n_q = int(len(df_out))
    latest_flag = int(df_out["deterioration_flag"].iloc[-1]) if n_q else 0
    latest_q = df_out[quarter_col].iloc[-1] if n_q else None

    summary = {
        "slope": float(slope),
        "intercept": float(intercept),
        "slope_bps_per_q": float(slope_bps),
        "tier": tier,
        "flags": flags,
        "n_quarters": n_q,
        "latest_quarter": latest_q,
        "latest_flag": latest_flag,
        "slope_threshold": float(slope_th),
    }

    # Keep output columns in a nice order (leave others too)
    preferred = [
        quarter_col, sector_col if sector_col in df_out.columns else None,
        pd_col, "PD_T_1", "PD_T_2",
        exposure_col if exposure_col in df_out.columns else None,
        "t", "dPD_1Q", "dPD_prev", "accel",
        "deterioration_flag", "flag_reason"
    ]
    preferred = [c for c in preferred if c is not None and c in df_out.columns]
    remaining = [c for c in df_out.columns if c not in preferred]
    df_out = df_out[preferred + remaining]

    return df_out, summary
# Run the trend computation on `df` (defined outside this excerpt).
# Both seeds are supplied so the first row already has PD_T_1 and PD_T_2,
# and therefore a delta and an acceleration, instead of NaN.
df_out, summary = compute_pd_trend_signals(
    df,
    slope_th=0.0015,
    seed_prev_pd_1=0.019,
    seed_prev_pd_2=0.018
)

def plot_pd_trend(df_out, summary, quarter_col="as_of_quarter", pd_col="PD_T",
                  sector_col="Sector"):
    """Plot the PD series, its fitted trendline and the flagged quarters.

    Args:
      df_out: dataframe containing `quarter_col`, `pd_col`, "t" and
        "deterioration_flag".
      summary: dict providing "intercept", "slope", "tier" and
        "slope_bps_per_q" for the fitted trend.
      quarter_col: column used for the x-axis tick labels.
      pd_col: column holding the plotted PD values.
      sector_col: column whose first value heads the title; when the column
        is absent or the frame is empty the literal "Sector" is used.

    Displays the figure with plt.show() and returns None.
    """
    # Build trendline from slope/intercept
    t = df_out["t"].to_numpy(dtype=float)
    trend = summary["intercept"] + summary["slope"] * t

    # Prepare x-axis positions, labels and the plotted series
    x = np.arange(len(df_out))
    labels = df_out[quarter_col].astype(str).tolist()
    pd_values = df_out[pd_col].to_numpy(dtype=float)

    # Flag mask
    flag_mask = df_out["deterioration_flag"].to_numpy(dtype=int) == 1

    plt.figure(figsize=(10, 4))
    plt.plot(x, pd_values, marker="o", label=f"{pd_col} (actual)")
    plt.plot(x, trend, linestyle="--", label="PD trendline (fit)")

    # Highlight flagged points. zorder=3 lifts them above the line markers,
    # which matplotlib otherwise draws on top of scatter collections.
    plt.scatter(x[flag_mask], pd_values[flag_mask],
                marker="o", s=80, zorder=3, label="Flagged quarters")

    # Guard the lookup so an empty frame does not raise on iloc[0].
    if sector_col in df_out.columns and len(df_out) > 0:
        sector_name = df_out[sector_col].iloc[0]
    else:
        sector_name = "Sector"
    title = f"{sector_name} | " \
            f"{summary['tier']} | {summary['slope_bps_per_q']:.1f} bps/q"
    plt.title(title)

    plt.xticks(x, labels, rotation=45, ha="right")
    plt.xlabel("Quarter")
    plt.ylabel("PD")
    plt.grid(True, alpha=0.3)
    plt.legend()
    plt.tight_layout()
    plt.show()

# Usage:
#plot_pd_trend(df_out, summary)

def _require_non_negative_numeric(frame: pd.DataFrame, col: str) -> None:
    """Raise unless `col` is numeric, has no negative values and no NaN."""
    if not pd.api.types.is_numeric_dtype(frame[col]):
        raise TypeError(f"{col} must be numeric")
    if (frame[col] < 0).any():
        raise ValueError(f"{col} contains negative values")
    if frame[col].isna().any():
        raise ValueError(f"{col} contains NaN values")


def validate_inputs(df_out: pd.DataFrame,
                    exposure_col: str = "Exposure",
                    flag_col: str = "deterioration_flag",
                    el_col: str ="EL",
                    require_el: bool = False) -> None:
    """Validate the columns needed for the Exposure-at-Risk computation.

    Checks, in order:
      1) the required columns exist (`el_col` only when `require_el` is True);
      2) `exposure_col` is numeric, non-negative and free of NaN;
      3) `flag_col` contains only 0/1 and no NaN;
      4) when `require_el` is True, `el_col` passes the same numeric checks
         as `exposure_col`.

    Returns None when everything is valid.

    Raises:
      ValueError: missing columns, negative or NaN values, or flag values
        other than 0/1.
      TypeError: `exposure_col` (or `el_col` when required) is not numeric.
    """
    # 1) Required columns
    required = {exposure_col, flag_col}
    if require_el:
        required.add(el_col)
    missing = required - set(df_out.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    # 2) Exposure must be numeric, non-negative and complete
    _require_non_negative_numeric(df_out, exposure_col)

    # 3) Flag must contain only 0/1 (no NaN)
    valid_flags = {0, 1}
    unique_flags = set(df_out[flag_col].dropna().unique().tolist())

    if not unique_flags.issubset(valid_flags):
        raise ValueError(f"{flag_col} must contain only 0/1. Found: {unique_flags}")

    if df_out[flag_col].isna().any():
        raise ValueError(f"{flag_col} contains NaN values (fill them before computing EaR)")

    # 4) Optional EL checks (same rules as exposure, including the NaN check)
    if require_el:
        _require_non_negative_numeric(df_out, el_col)
   
# ---- Exposure at Risk (EaR): share of exposure sitting in flagged rows ----
exposure_col = "Exposure"
flag_col = "deterioration_flag"
validate_inputs(df_out, exposure_col=exposure_col, flag_col=flag_col)  # raises error if invalid

# `summary` is rebound to the EaR figures below. Keep the trend summary
# (slope, intercept, tier, ...) reachable, since plot_pd_trend needs it.
trend_summary = summary

total_exposure = df_out[exposure_col].sum()
flagged_exposure = df_out.loc[df_out[flag_col] == 1, exposure_col].sum()
ear_pct = (flagged_exposure / total_exposure * 100) if total_exposure else 0

# Per-row exposure that counts towards EaR (0.0 for unflagged rows).
df_out["flagged_exposure"] = np.where(df_out[flag_col] == 1, df_out[exposure_col], 0.0)

summary = {
    "total_exposure": float(total_exposure),
    "exposure_at_risk": float(df_out["flagged_exposure"].sum()),
}
# Guard against a zero total so the percentage never divides by zero.
summary["exposure_at_risk_pct"] = (
    summary["exposure_at_risk"] / summary["total_exposure"] * 100
) if summary["total_exposure"] else 0.0

print(summary)

# Output:
# {'total_exposure': 7477270.58, 'exposure_at_risk': 5359592.23, 'exposure_at_risk_pct': 71.67845770267685}
