## Imports

In [None]:
import pandas as pd, numpy as np, matplotlib.pyplot as plt, seaborn as sns
from scipy.stats import entropy

## Data

In [None]:
# Read both worksheets through a single workbook handle.
# "borrowers" is parsed with a two-row header; "stress_test_scenarios" is read raw (no header row).
fn = "/content/data.xlsx"
with pd.ExcelFile(fn) as workbook:
    borrowers = workbook.parse(sheet_name="borrowers", header=[0,1])
    stress = workbook.parse(sheet_name="stress_test_scenarios", header=None)

In [None]:
# Flatten the two-level header into single "<top>_<sub>" names (spaces -> underscores).
# Guarded on MultiIndex so that re-running this cell after the columns are already flat
# is a no-op instead of failing while unpacking plain strings as (a, b) pairs.
if isinstance(borrowers.columns, pd.MultiIndex):
    borrowers.columns = [f"{a}_{b}".strip().replace(" ", "_") for a,b in borrowers.columns]
id_col = borrowers.columns[0]  # first column is used as the borrower identifier

year_labels = [f"Year_{i}" for i in range(1,11)]

# splitting columns into groups (ratings, amounts, LGD, EAD) by flattened-name prefix
ratings_cols = [c for c in borrowers.columns if c.startswith("Ratings_")]
amount_cols  = [c for c in borrowers.columns if c.startswith("Amount_in_Rs_Crore_")]
lgd_cols     = [c for c in borrowers.columns if c.startswith("Loss_Given_Default_")]
ead_cols     = [c for c in borrowers.columns if c.startswith("Exposure_at_Default")]

# Every group is later relabelled with year_labels, so it must hold exactly one column per
# year; fail here with a readable message rather than with a length mismatch further down.
for group_name, group_cols in {"ratings": ratings_cols, "amount": amount_cols,
                               "LGD": lgd_cols, "EAD": ead_cols}.items():
    if len(group_cols) != len(year_labels):
        raise ValueError(
            f"Expected {len(year_labels)} {group_name} columns, found {len(group_cols)}: {group_cols}"
        )

In [None]:
# helper fn: turn one group of borrower columns into a tidy per-year frame
def fixcols(cols):
    """Return the given `borrowers` columns relabelled as Year_1..Year_10 and indexed by borrower.

    Parameters
    ----------
    cols : list of str
        Column names of `borrowers` to extract; must match `year_labels` in length.

    Returns
    -------
    pandas.DataFrame
        Copy of the selected columns, renamed to `year_labels`, whose index is the
        `id_col` column of `borrowers` under the name "Borrower".
    """
    block = borrowers[cols].copy()
    block.columns = year_labels
    # Passing a Series to set_index uses its values as the new index (named "Borrower").
    return block.set_index(borrowers[id_col].rename("Borrower"))

# Ratings stay as labels; the three numeric groups are coerced to numbers
# (cells that cannot be parsed become NaN).
ratings_df = fixcols(ratings_cols)
amount_df, lgd_df, ead_df = (
    fixcols(group).apply(pd.to_numeric, errors='coerce')
    for group in (amount_cols, lgd_cols, ead_cols)
)

In [None]:
# fixing LGD display values: presumably the sheet may store LGD as percentages (e.g. 45)
# rather than fractions (0.45) -- anything with a maximum above 1.5 is divided by 100.
# DataFrame.max() skips NaN; the previous lgd_df.values.max() returned NaN as soon as a
# single cell had failed numeric coercion, which made the comparison False and silently
# skipped the rescale.
lgd_max = lgd_df.max().max()
if lgd_max > 1.5:
    lgd_df = lgd_df / 100.0

# computing exposure (element-wise amount x EAD per borrower and year)
exposure_df = amount_df * ead_df

# normalise rating labels: trimmed, upper-case strings; missing values are left untouched
ratings_df = ratings_df.map(lambda x: str(x).strip().upper() if pd.notna(x) else x)

# ratings present in the data, best to worst; labels not in `order` sort last
order = ['AAA','AA','A','BBB','BB','B','C','D']
all_ratings = sorted({r for r in ratings_df.values.ravel() if pd.notna(r)},
                     key=lambda x: order.index(x) if x in order else 99)
print("Detected ratings:", all_ratings)

## Transition Matrices

In [None]:
# making transition matrices (count + prob)
def transition(df, y1, y2, ratings=None):
    """Build the rating transition matrices between two years.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with one rating column per year, named "Year_<n>".
    y1, y2 : int
        Origin and destination year numbers (columns "Year_<y1>" and "Year_<y2>").
    ratings : sequence of str, optional
        Rating labels used for both axes, in display order. Defaults to the
        notebook-level `all_ratings`.

    Returns
    -------
    (counts, probabilities) : tuple of pandas.DataFrame
        `counts[i, j]` is the number of rows rated i in y1 and j in y2.
        `probabilities` is `counts` normalised by row; ratings with no rows in y1
        get an all-zero row rather than NaN.
    """
    labels = all_ratings if ratings is None else list(ratings)
    count_matrix = pd.crosstab(df[f"Year_{y1}"], df[f"Year_{y2}"]).reindex(
        index=labels, columns=labels, fill_value=0)
    # Replace empty-row totals by NaN so the division does not produce inf, then zero them.
    row_totals = count_matrix.sum(axis=1).replace(0, np.nan)
    prob_matrix = count_matrix.div(row_totals, axis=0).fillna(0)
    return count_matrix, prob_matrix

# One (counts, probabilities) pair per consecutive year pair: Year_1→Year_2 … Year_9→Year_10.
steps = range(1,10)
pairs = {t: transition(ratings_df, t, t+1) for t in steps}
counts = {t: pair[0] for t, pair in pairs.items()}
probs = {t: pair[1] for t, pair in pairs.items()}

# exporting transition matrices into excel (two sheets per year pair)
with pd.ExcelWriter("transition_matrices.xlsx") as writer:
    for t in steps:
        counts[t].to_excel(writer, sheet_name=f"Counts_Y{t}_Y{t+1}")
        probs[t].to_excel(writer, sheet_name=f"Trans_Y{t}_Y{t+1}")

# plotting the heatmaps for transition matrices, one figure per year pair
for t in steps:
    fig, ax = plt.subplots(figsize=(5,4))
    sns.heatmap(probs[t], cmap="viridis", annot=False, ax=ax)
    ax.set_title(f"Transition Matrix Year {t} → {t+1}")
    fig.tight_layout()
    plt.show()

## Probability of Default (PD)

In [None]:
# computing PDs (per rating + portfolio weighted PD)
# Results are collected in dicts and materialised once with an explicit float dtype, so the
# later .round() / arithmetic never depends on the default dtype of an empty Series/DataFrame.
pd_by_year, port_pd_values = {}, {}

for t in range(1,10):
    # one-year default probability per rating = the 'D' column of the Year_t → Year_t+1 matrix
    default_prob = probs[t]['D']
    pd_by_year[f"Year_{t+1}"] = default_prob

    # weights: Year_t exposure summed by Year_t rating (ratings without exposure weigh 0)
    rating_exposure = (
        exposure_df[f"Year_{t}"]
        .groupby(ratings_df[f"Year_{t}"])
        .sum()
        .reindex(all_ratings)
        .fillna(0)
    )

    # exposure-weighted average of the rating-level PDs
    port_pd_values[f"Year_{t+1}"] = (rating_exposure / rating_exposure.sum() * default_prob).sum()

pd_ratings = pd.DataFrame(pd_by_year, index=all_ratings, dtype=float)
port_pd = pd.Series(port_pd_values, dtype=float)

print("\nRating-level PDs:")
display(pd_ratings)

## Expected Loss (EL)

In [None]:
# expected loss calculation (rating-wise + portfolio)
# Per borrower: EL = exposure x LGD x PD, where PD is the one-year default probability of the
# borrower's Year_t rating; the result is reported under Year_t+1.
# Results are collected in dicts and materialised once with an explicit float dtype, so the
# later .round() / subtraction never depends on the default dtype of an empty Series/DataFrame.
el_by_year, port_el_values = {}, {}

for t in range(1,10):
    loans = pd.DataFrame({
        'E': exposure_df[f"Year_{t}"],
        'L': lgd_df[f"Year_{t}"],
        'R': ratings_df[f"Year_{t}"],
    })
    loans['PD'] = loans['R'].map(probs[t]['D'])
    loans['EL'] = loans['E'] * loans['L'] * loans['PD']

    el_by_year[f"Year_{t+1}"] = loans.groupby('R')['EL'].sum().reindex(all_ratings).fillna(0)
    port_el_values[f"Year_{t+1}"] = loans['EL'].sum()

el_by_rating = pd.DataFrame(el_by_year, index=all_ratings, dtype=float)
port_el = pd.Series(port_el_values, dtype=float)

print("\nRating-level ELs:")
display(el_by_rating)

el_by_rating.to_excel("el_by_rating.xlsx")

## Cumulative PD

In [None]:
# cumulative PD calculation (5yr and 10yr) using matrix multiplication
def make_abs(p):
    """Return a copy of transition matrix `p` in which default ('D') is an absorbing state.

    When 'D' is one of the row labels, its row is zeroed and the D→D entry set to 1.
    The result is reindexed to `all_ratings` on both axes (missing entries filled with 0).
    The input frame is not modified.
    """
    absorbing = p.copy()
    if 'D' not in absorbing.index:
        # nothing to absorb into; just align the axes
        return absorbing.reindex(index=all_ratings, columns=all_ratings, fill_value=0)

    absorbing.loc['D'] = 0
    absorbing.at['D','D'] = 1
    return absorbing.reindex(index=all_ratings, columns=all_ratings, fill_value=0)

def _chain_transitions(steps):
    """Multiply the absorbing one-year matrices for the given steps; axes labelled by rating."""
    product = np.linalg.multi_dot([make_abs(probs[t]).values for t in steps])
    return pd.DataFrame(product, index=all_ratings, columns=all_ratings)

# "5y" chains transitions 1..4 (Year_1 → Year_5); "10y" chains transitions 1..9 (Year_1 → Year_10)
M5 = _chain_transitions(range(1,5))
M10 = _chain_transitions(range(1,10))

# cumulative default probability per starting rating = the 'D' column of the chained matrix
cum5, cum10 = M5['D'], M10['D']

# portfolio figures: weight each starting rating by its share of Year_1 exposure
w = exposure_df['Year_1'].groupby(ratings_df['Year_1']).sum().reindex(all_ratings).fillna(0)
shares = w/w.sum()
p5 = (shares * cum5).sum()
p10 = (shares * cum10).sum()

## Stress Testing

In [None]:
# stress test I (LGD multipliers)
# Take the first sheet row holding at least ten numeric cells; its first ten numbers are
# used as the Year_1..Year_10 multipliers.
mults = None
for _, sheet_row in stress.iterrows():
    numeric = pd.to_numeric(sheet_row, errors='coerce').dropna().values
    if len(numeric) < 10:
        continue
    mults = [float(x) for x in numeric[:10]]
    break

if mults is None:
    raise ValueError("Couldn't find numeric LGD multipliers in stress sheet.")

mult = pd.Series(mults, index=[f"Year_{i}" for i in range(1,11)])

# stressed LGD = baseline LGD x that year's multiplier, capped at 100%
# (the Series is aligned to the frame's Year_* columns)
lgd_s = np.minimum(1.0, lgd_df * mult)

In [None]:
# compute ELs again using stressed LGD
# Same formula as the baseline (EL = exposure x LGD x PD of the Year_t rating, reported under
# Year_t+1), with the stressed LGD frame `lgd_s` in place of `lgd_df`.
# Results are collected in dicts and materialised once with an explicit float dtype, so the
# later .round() / subtraction never depends on the default dtype of an empty Series/DataFrame.
el_s_by_year, port_el_s_values = {}, {}

for t in range(1,10):
    loans_s = pd.DataFrame({
        'E': exposure_df[f"Year_{t}"],
        'L': lgd_s[f"Year_{t}"],
        'R': ratings_df[f"Year_{t}"],
    })
    loans_s['PD'] = loans_s['R'].map(probs[t]['D'])
    loans_s['EL'] = loans_s['E'] * loans_s['L'] * loans_s['PD']

    el_s_by_year[f"Year_{t+1}"] = loans_s.groupby('R')['EL'].sum().reindex(all_ratings).fillna(0)
    port_el_s_values[f"Year_{t+1}"] = loans_s['EL'].sum()

el_by_rating_s = pd.DataFrame(el_s_by_year, index=all_ratings, dtype=float)
port_el_s = pd.Series(port_el_s_values, dtype=float)

print("\nRating-level Stress I ELs:")
display(el_by_rating_s)

In [None]:
# stress test II (forced downgrade mapping)
srcs = ['AAA','AA','A','BBB','BB','B','C','D']
default_targets = ['BBB','A','B','B','C','D','D','D']

def _label(cell):
    """Trimmed, upper-cased text of a sheet cell, or None when the cell is not a string."""
    return cell.strip().upper() if isinstance(cell, str) else None

# Locate the sheet row that lists the source ratings (it must contain both 'AAA' and 'D').
mapping_row, mapping_pos = None, None
for pos in range(len(stress)):
    labels = [_label(x) for x in stress.iloc[pos]]
    if 'AAA' in labels and 'D' in labels:
        mapping_row, mapping_pos = stress.iloc[pos], pos
        break

# The row directly below is read as the target ratings. Sources and targets are paired
# column by column, so a leading text cell (e.g. a row caption) or a different rating
# order in the sheet cannot shift the mapping, which a positional zip of "all string
# cells" against a fixed rating list would do.
downgrade = {}
if mapping_row is not None and mapping_pos + 1 < len(stress):
    target_row = stress.iloc[mapping_pos + 1]
    for src_cell, tgt_cell in zip(mapping_row, target_row):
        src, tgt = _label(src_cell), _label(tgt_cell)
        if src in srcs and tgt is not None:
            downgrade[src] = tgt

if not downgrade:
    # fallback to the built-in downgrade mapping if nothing usable was found in the sheet
    downgrade = dict(zip(srcs, default_targets))

print("\nDowngrade mapping used:", downgrade)

# ratings without a mapping entry keep their Year 9 rating
r9 = ratings_df['Year_9']
r10s = r9.map(lambda x: downgrade.get(x, x))

print("\nRating distribution Year 9:")
print(r9.value_counts().reindex(all_ratings).fillna(0))

print("\nRating distribution Year 10 AFTER forced downgrade:")
print(r10s.value_counts().reindex(all_ratings).fillna(0))

exp9, lgd9 = exposure_df['Year_9'], lgd_df['Year_9']
pd10 = probs[9]['D'].copy()
# If no borrower was rated 'D' in Year 9, the 'D' row of probs[9] is all zeros, so borrowers
# forced into 'D' would be given PD = 0 and contribute no loss. A defaulted exposure has
# PD = 1, so use that whenever the 'D' row is unobserved.
# NOTE(review): other target ratings that are unobserved in Year 9 still map to PD = 0.
if 'D' in pd10.index and probs[9].loc['D'].sum() == 0:
    pd10.loc['D'] = 1.0

el10_base = port_el['Year_10']
el10_stress = (exp9 * lgd9 * r10s.map(pd10)).sum()

## Extras and output

In [None]:
# some extra analytics (stability + KL div)
# Stability = standard deviation across year pairs of each rating's "stay put" probability
# (the diagonal of the transition matrix).
# Rating-years with no borrowers have an all-zero row in probs[t]; their diagonal 0 is a
# placeholder, not an observed 0% retention, so those cells are masked (NaN) and skipped by
# std(). Without the mask a rating that is never observed would get std 0 and be reported
# as the most stable one.
observed = pd.DataFrame({t: counts[t].sum(axis=1) > 0 for t in probs})
diag = pd.DataFrame({t: np.diag(probs[t]) for t in probs}, index=all_ratings).where(observed)
stability = diag.std(axis=1)

print("\nRating stability (lower = more stable):")
print(stability)

print("\nMost stable rating:", stability.idxmin())

# exposure by rating for every year, then normalised to per-year shares
dist = pd.DataFrame({
    y: exposure_df[y].groupby(ratings_df[y]).sum().reindex(all_ratings).fillna(0)
    for y in [f"Year_{i}" for i in range(1,11)]
})

pdist = dist.div(dist.sum())

# KL divergence between consecutive years' exposure distributions.
# scipy's entropy(p, q) is infinite when q has a zero share where p does not.
kl = {}
years = list(pdist.columns)
for i in range(len(years)-1):
    kl[f"{years[i]}→{years[i+1]}"] = entropy(pdist[years[i]], pdist[years[i+1]])

print("\nKL divergence (rating distribution shifts):")
print(pd.Series(kl))

In [None]:
# basic visuals for PD + EL trends + rating mix

# portfolio PD per year
fig, ax = plt.subplots(figsize=(8,4))
ax.plot(port_pd.index, port_pd.values)
ax.set_title("Portfolio PD Over Time")
plt.setp(ax.get_xticklabels(), rotation=45)
fig.tight_layout()
plt.show()

# baseline vs stressed-LGD expected loss per year
fig, ax = plt.subplots(figsize=(8,4))
ax.plot(port_el.index, port_el.values, label="Baseline EL")
ax.plot(port_el_s.index, port_el_s.values, label="Stress I EL")
ax.set_title("Baseline vs Stress EL")
ax.legend()
plt.setp(ax.get_xticklabels(), rotation=45)
fig.tight_layout()
plt.show()

# rating-level PDs as a heatmap (ratings x years)
fig, ax = plt.subplots()
sns.heatmap(pd_ratings.astype(float), cmap="magma", ax=ax)
ax.set_title("PD by Rating Over Time")
plt.show()

# exposure by rating for each year (rows = years), drawn as shares of that year's total
comp = {}
for y in range(1,11):
    year_col = f"Year_{y}"
    comp[year_col] = (
        exposure_df[year_col].groupby(ratings_df[year_col]).sum().reindex(all_ratings).fillna(0)
    )
comp_df = pd.DataFrame(comp).T
comp_share = comp_df.div(comp_df.sum(axis=1), axis=0)
ax = comp_share.plot.area(figsize=(10,4))
ax.set_title("Rating Composition by Exposure")
ax.figure.tight_layout()
plt.show()

# printing final summary numbers
print("\nPortfolio PDs:\n", port_pd.round(4))
print("\nPortfolio ELs:\n", port_el.round(4))
print("\nStress I ELs:\n", port_el_s.round(4))
print("\nΔEL Stress I:\n", (port_el_s - port_el).round(4))
print(f"\nCumulative PD 5y = {p5:.4f}, 10y = {p10:.4f}")
print(f"\nStress II Year10 EL baseline = {el10_base:.4f}, stressed = {el10_stress:.4f}, Δ = {(el10_stress - el10_base):.4f}")
