In [1]:
# Step 1 — Setup
import pandas as pd
import numpy as np
from pathlib import Path

# Path to your file
CSV_PATH =r"C:/Users/vaibh/Downloads/Task 3 and 4_Loan_Data.csv"  # change if needed
K = 5  # number of buckets (ratings 1..5, where 1 = best credit)


In [2]:
# Step 2 — Load and quick scan
df = pd.read_csv(CSV_PATH)
print("Shape:", df.shape)
print("Columns:", list(df.columns))
df.head(3)


Shape: (10000, 8)
Columns: ['customer_id', 'credit_lines_outstanding', 'loan_amt_outstanding', 'total_debt_outstanding', 'income', 'years_employed', 'fico_score', 'default']


Unnamed: 0,customer_id,credit_lines_outstanding,loan_amt_outstanding,total_debt_outstanding,income,years_employed,fico_score,default
0,8153374,0,5221.545193,3915.471226,78039.38546,5,605,0
1,7442532,5,1958.928726,8228.75252,26648.43525,2,572,1
2,2256073,0,3363.009259,2027.83085,65866.71246,4,602,0


In [3]:
# Step 3 — Identify columns
cols_lower = {c: c.lower().strip() for c in df.columns}

# FICO-like column
fico_col = None
for key in ['fico', 'fico_score', 'fico score', 'credit_score', 'credit score']:
    for orig, low in cols_lower.items():
        if low == key:
            fico_col = orig; break
    if fico_col: break

if fico_col is None:
    # Fallback: numeric in plausible FICO range
    for c in df.select_dtypes(include=[np.number]).columns:
        s = df[c].dropna()
        if s.empty: continue
        lo, hi = s.quantile(0.01), s.quantile(0.99)
        if lo >= 250 and hi <= 900:
            fico_col = c; break

if fico_col is None:
    raise ValueError("FICO column not found. Inspect your data and set fico_col manually.")

# Default-like column
label_col = None
for key in ['default', 'is_default', 'default_flag', 'loan_status', 'target', 'bad', 'chargeoff']:
    for orig, low in cols_lower.items():
        if low == key:
            label_col = orig; break
    if label_col: break

print("Detected FICO column:", fico_col)
print("Detected Default column:", label_col)


Detected FICO column: fico_score
Detected Default column: default


In [4]:
# Step 4 — Clean and standardize
def to_binary(y: pd.Series) -> pd.Series:
    if y.dtype == object:
        ys = y.astype(str).str.lower().str.strip()
        default_like = ys.isin([
            'default','charged off','charge off','charged-off','defaulted',
            'bad','late','delinquent','defaulter','chargedoff'
        ])
        nondefault_like = ys.isin(['paid','fully paid','current','good'])
        z = pd.Series(np.nan, index=y.index, dtype=float)
        z[default_like] = 1.0
        z[nondefault_like] = 0.0
        return z
    return y.astype(float)

work = df[[fico_col] + ([label_col] if label_col else [])].rename(columns={fico_col:'FICO'}).copy()
if label_col:
    work['Default'] = to_binary(work[label_col])
    work.drop(columns=[label_col], inplace=True)
else:
    work['Default'] = np.nan

# Keep plausible FICO range
work = work.dropna(subset=['FICO'])
work = work[(work['FICO'] >= 250) & (work['FICO'] <= 900)].reset_index(drop=True)

has_labels = work['Default'].notna().any()
print("Rows after cleaning:", len(work))
print("Has labels:", has_labels)
work.head(3)


Rows after cleaning: 10000
Has labels: True


Unnamed: 0,FICO,Default
0,605,0.0
1,572,1.0
2,602,0.0


In [5]:
# Step 5 — Helpers
def assign_rating_from_edges(x: float, edges: np.ndarray, K: int) -> int:
    # edges length = K+1. Buckets are contiguous. Higher FICO => better => smaller rating.
    idx = np.searchsorted(edges[1:-1], x, side='right')  # 0..K-1
    return K - idx  # maps highest bucket to rating 1

def bucket_summary(df_, rating_col: str, has_labels: bool) -> pd.DataFrame:
    g = (df_.assign(bucket=df_[rating_col])
           .groupby('bucket', as_index=False)
           .agg(n=('FICO','size'),
                fico_min=('FICO','min'),
                fico_max=('FICO','max'),
                fico_mean=('FICO','mean'),
                defaults=('Default', lambda s: int(np.nansum(s))) if has_labels else ('FICO','size'))
        ).sort_values('bucket')
    if has_labels:
        g['pd'] = g['defaults'] / g['n']
    return g


In [6]:
# Step 6 — Quantile bucketing
edges_q = np.unique(np.quantile(work['FICO'], np.linspace(0, 1, K+1))).astype(float)
# enforce strictly increasing edges
for i in range(1, len(edges_q)):
    if edges_q[i] <= edges_q[i-1]:
        edges_q[i] = edges_q[i-1] + 1e-6
edges_q[0], edges_q[-1] = work['FICO'].min(), work['FICO'].max()

work['Rating_Q'] = work['FICO'].apply(lambda v: assign_rating_from_edges(v, edges_q, K))
sum_q = bucket_summary(work, 'Rating_Q', has_labels)
print("Quantile edges:", edges_q)
sum_q


Quantile edges: [408. 587. 623. 653. 688. 850.]


Unnamed: 0,bucket,n,fico_min,fico_max,fico_mean,defaults,pd
0,1,2050,688,850,720.592683,114,0.05561
1,2,2019,653,687,669.169391,202,0.10005
2,3,1970,623,652,637.526904,305,0.154822
3,4,1979,587,622,605.580091,426,0.21526
4,5,1982,408,586,551.431887,804,0.405651


In [7]:
# Step 7 — K-means 1D bucketing
def kmeans_1d(x: pd.Series, K: int, max_iter: int = 100):
    X = np.sort(x.values.reshape(-1, 1), axis=0)
    centers = np.linspace(X.min(), X.max(), K).reshape(-1, 1)
    for _ in range(max_iter):
        d = np.abs(X - centers.T)
        labels = d.argmin(axis=1)
        new_centers = np.array([X[labels == k].mean() if np.any(labels == k) else centers[k]
                                for k in range(K)]).reshape(-1,1)
        if np.allclose(new_centers, centers, atol=1e-6):
            centers = new_centers; break
        centers = new_centers
    centers = np.sort(centers.flatten())
    edges = np.r_[X.min(), (centers[:-1] + centers[1:]) / 2.0, X.max()].astype(float)
    return edges, centers

edges_km, centers_km = kmeans_1d(work['FICO'], K)
work['Rating_KM'] = work['FICO'].apply(lambda v: assign_rating_from_edges(v, edges_km, K))
sum_km = bucket_summary(work, 'Rating_KM', has_labels)
print("K-means centers:", centers_km)
print("K-means edges:", edges_km)
sum_km


K-means centers: [522.78676471 584.72378075 633.66304695 680.83073323 737.65273038]
K-means edges: [408.         553.75527273 609.19341385 657.24689009 709.2417318
 850.        ]


Unnamed: 0,bucket,n,fico_min,fico_max,fico_mean,defaults,pd
0,1,1172,710,850,737.65273,49,0.041809
1,2,2564,658,709,680.830733,230,0.089704
2,3,3131,610,657,633.663047,489,0.15618
3,4,2317,554,609,584.723781,650,0.280535
4,5,816,408,553,522.786765,433,0.530637


In [8]:
# Step 8 — DP log-likelihood bucketing
if has_labels:
    grp = work.groupby('FICO', as_index=False).agg(n=('Default','count'), k=('Default','sum'))
    grp['k'] = grp['k'].fillna(0.0)
else:
    grp = work.groupby('FICO', as_index=False).agg(n=('FICO','count'))
    grp['k'] = 0.0  # placeholder

grp = grp.sort_values('FICO').reset_index(drop=True)
n = len(grp)

cs_n = grp['n'].cumsum().to_numpy()
cs_k = grp['k'].cumsum().to_numpy()

def seg_counts(i, j):
    n_ = cs_n[j] - (cs_n[i-1] if i > 0 else 0.0)
    k_ = cs_k[j] - (cs_k[i-1] if i > 0 else 0.0)
    return n_, k_

def seg_loglik(i, j):
    n_, k_ = seg_counts(i, j)
    if n_ <= 0: return -np.inf
    if k_ <= 0 or k_ >= n_: return 0.0  # limit cases
    p = k_ / n_
    return k_ * np.log(p) + (n_ - k_) * np.log(1 - p)

DP = np.full((K, n), -np.inf)
PREV = np.full((K, n), -1, dtype=int)

# One bucket over prefix
for j in range(n):
    DP[0, j] = seg_loglik(0, j)

# Transitions
for k in range(1, K):
    for j in range(k, n):
        best, where = -np.inf, -1
        for i in range(k-1, j):
            val = DP[k-1, i] + seg_loglik(i+1, j)
            if val > best:
                best, where = val, i
        DP[k, j] = best; PREV[k, j] = where

# Recover edges
if np.isfinite(DP[K-1, n-1]):
    cuts = []
    k = K-1; j = n-1
    while k > 0:
        i = PREV[k, j]; cuts.append(i); j = i; k -= 1
    cuts = sorted(cuts)
    edges_dp = [grp['FICO'].iloc[0]]
    for c in cuts:
        edges_dp.append((grp['FICO'].iloc[c] + grp['FICO'].iloc[c+1]) / 2.0)
    edges_dp.append(grp['FICO'].iloc[-1])
    edges_dp = np.array(edges_dp, dtype=float)
else:
    edges_dp = np.unique(np.quantile(work['FICO'], np.linspace(0, 1, K+1))).astype(float)

work['Rating_DP'] = work['FICO'].apply(lambda v: assign_rating_from_edges(v, edges_dp, K))
sum_dp = bucket_summary(work, 'Rating_DP', has_labels)

print("DP edges:", edges_dp)
if has_labels:
    print("PD by rating (DP):")
    display(sum_dp[['bucket','n','defaults','pd']])
else:
    print("No labels -> DP cannot separate risk; treat as unsupervised.")
sum_dp


DP edges: [408.  520.5 580.5 640.5 696.5 850. ]
PD by rating (DP):


Unnamed: 0,bucket,n,defaults,pd
0,1,1657,77,0.04647
1,2,3197,336,0.105099
2,3,3438,703,0.204479
3,4,1407,536,0.380952
4,5,301,199,0.66113


Unnamed: 0,bucket,n,fico_min,fico_max,fico_mean,defaults,pd
0,1,1657,697,850,727.461074,77,0.04647
1,2,3197,641,696,666.358148,336,0.105099
2,3,3438,581,640,612.789122,703,0.204479
3,4,1407,521,580,557.218195,536,0.380952
4,5,301,408,520,495.189369,199,0.66113


In [9]:
# Step 9 — Comparison table
def extract_edges_df(edges, name):
    return pd.DataFrame({
        'method': name,
        'bin': list(range(1, K+1)),
        'edge_lo': edges[:-1],
        'edge_hi': edges[1:]
    })

edges_table = pd.concat([
    extract_edges_df(edges_q,  'quantile'),
    extract_edges_df(edges_km, 'kmeans_mse'),
    extract_edges_df(edges_dp, 'dp_loglik')
], ignore_index=True)

print("Edges by method:")
edges_table


Edges by method:


Unnamed: 0,method,bin,edge_lo,edge_hi
0,quantile,1,408.0,587.0
1,quantile,2,587.0,623.0
2,quantile,3,623.0,653.0
3,quantile,4,653.0,688.0
4,quantile,5,688.0,850.0
5,kmeans_mse,1,408.0,553.755273
6,kmeans_mse,2,553.755273,609.193414
7,kmeans_mse,3,609.193414,657.24689
8,kmeans_mse,4,657.24689,709.241732
9,kmeans_mse,5,709.241732,850.0


In [10]:
# Optional PD monotonicity check
if has_labels:
    print("Quantile PD:")
    display(sum_q[['bucket','pd']])
    print("K-means PD:")
    display(sum_km[['bucket','pd']])
    print("DP PD:")
    display(sum_dp[['bucket','pd']])


Quantile PD:


Unnamed: 0,bucket,pd
0,1,0.05561
1,2,0.10005
2,3,0.154822
3,4,0.21526
4,5,0.405651


K-means PD:


Unnamed: 0,bucket,pd
0,1,0.041809
1,2,0.089704
2,3,0.15618
3,4,0.280535
4,5,0.530637


DP PD:


Unnamed: 0,bucket,pd
0,1,0.04647
1,2,0.105099
2,3,0.204479
3,4,0.380952
4,5,0.66113


In [11]:
# Step 10 — Export artifacts
out_dir = Path("./")  # change if you want
rating_map = (
    pd.concat([
        pd.DataFrame({'method':'quantile','rating':list(range(K,0,-1)),
                      'fico_min_inclusive':edges_q[:-1],'fico_max_inclusive':edges_q[1:]}),
        pd.DataFrame({'method':'kmeans_mse','rating':list(range(K,0,-1)),
                      'fico_min_inclusive':edges_km[:-1],'fico_max_inclusive':edges_km[1:]}),
        pd.DataFrame({'method':'dp_loglik','rating':list(range(K,0,-1)),
                      'fico_min_inclusive':edges_dp[:-1],'fico_max_inclusive':edges_dp[1:]})
    ], ignore_index=True)
)

rating_map.to_csv(out_dir/"fico_rating_map_5buckets.csv", index=False)
sum_q.to_csv(out_dir/"fico_summary_quantile.csv", index=False)
sum_km.to_csv(out_dir/"fico_summary_kmeans.csv", index=False)
sum_dp.to_csv(out_dir/"fico_summary_dp.csv", index=False)

print("Saved:",
      (out_dir/"fico_rating_map_5buckets.csv").resolve(),
      (out_dir/"fico_summary_quantile.csv").resolve(),
      (out_dir/"fico_summary_kmeans.csv").resolve(),
      (out_dir/"fico_summary_dp.csv").resolve(),
)


Saved: C:\Users\vaibh\fico_rating_map_5buckets.csv C:\Users\vaibh\fico_summary_quantile.csv C:\Users\vaibh\fico_summary_kmeans.csv C:\Users\vaibh\fico_summary_dp.csv


In [12]:
# Step 11 — Choose and freeze one method for production
# Example: choose DP if labels exist and PD is monotone; else choose quantile.
if has_labels:
    chosen_edges = edges_dp
    chosen_name = "dp_loglik"
else:
    chosen_edges = edges_q
    chosen_name = "quantile"

def fico_to_rating(fico_value: float, edges: np.ndarray, K: int) -> int:
    idx = np.searchsorted(edges[1:-1], fico_value, side='right')
    return K - idx

print("Chosen method:", chosen_name)
print("Chosen edges:", chosen_edges)

# Example usage:
test_scores = [580, 620, 680, 720, 780]
ratings = [fico_to_rating(v, chosen_edges, K) for v in test_scores]
list(zip(test_scores, ratings))


Chosen method: dp_loglik
Chosen edges: [408.  520.5 580.5 640.5 696.5 850. ]


[(580, 4), (620, 3), (680, 2), (720, 1), (780, 1)]