In [1]:
import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score


In [2]:
# Load price data (same CSV as Task 1, but reloaded here)
from google.colab import drive

drive.mount('/content/drive')

CSV_PATH = "/content/drive/MyDrive/data/Task 3 and 4_Loan_Data.csv"
df = pd.read_csv(CSV_PATH)


# Quick sanity checks
print(df.head())
print(df.info())
print(df.describe(include="all"))


Mounted at /content/drive
   customer_id  credit_lines_outstanding  loan_amt_outstanding  \
0      8153374                         0           5221.545193   
1      7442532                         5           1958.928726   
2      2256073                         0           3363.009259   
3      4885975                         0           4766.648001   
4      4700614                         1           1345.827718   

   total_debt_outstanding       income  years_employed  fico_score  default  
0             3915.471226  78039.38546               5         605        0  
1             8228.752520  26648.43525               2         572        1  
2             2027.830850  65866.71246               4         602        0  
3             2501.730397  74356.88347               5         612        0  
4             1768.826187  23448.32631               6         631        0  
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 8 columns):
 #

In [3]:
df = df[(df["fico_score"] >= 300) & (df["fico_score"] <= 850)]
df = df.dropna(subset=["fico_score", "default"])


In [5]:
import numpy as np
import pandas as pd

def _bucket_ll(n, k, smoothing=0.5):
    """
    Log-likelihood for a Bernoulli bucket with optional smoothing to avoid log(0).
    smoothing=0.5 corresponds to a Jeffreys prior-ish stabilization.
    """
    # Smoothed PD
    p = (k + smoothing) / (n + 2 * smoothing)
    return k * np.log(p) + (n - k) * np.log(1 - p)

def optimal_fico_buckets_ll(
    df,
    n_buckets: int,
    score_col: str = "fico_score",
    default_col: str = "default",
    min_bucket_size: int = 50,
    smoothing: float = 0.5
):
    """
    Compute optimal contiguous buckets on FICO scores maximizing total log-likelihood.

    Returns:
      - boundaries: list of (low, high) inclusive score ranges for each bucket (sorted by score ascending)
      - bucket_stats: DataFrame with n, k, pd for each bucket
    """
    # 1) Aggregate counts by score
    g = (
        df[[score_col, default_col]]
        .dropna()
        .groupby(score_col)[default_col]
        .agg(n="count", k="sum")
        .reset_index()
        .sort_values(score_col)
        .reset_index(drop=True)
    )

    scores = g[score_col].to_numpy()
    n_arr = g["n"].to_numpy(dtype=int)
    k_arr = g["k"].to_numpy(dtype=int)

    m = len(g)
    if n_buckets < 1 or n_buckets > m:
        raise ValueError(f"n_buckets must be between 1 and number of unique scores ({m}).")

    # 2) Prefix sums for fast range queries
    pref_n = np.zeros(m + 1, dtype=int)
    pref_k = np.zeros(m + 1, dtype=int)
    pref_n[1:] = np.cumsum(n_arr)
    pref_k[1:] = np.cumsum(k_arr)

    def range_n(i, j):
        return pref_n[j + 1] - pref_n[i]

    def range_k(i, j):
        return pref_k[j + 1] - pref_k[i]

    # Precompute cost (LL) for all i..j that satisfy min_bucket_size
    # cost[i, j] = LL for bucket covering unique-score indices i..j
    cost = np.full((m, m), -np.inf, dtype=float)
    for i in range(m):
        n_sum = 0
        k_sum = 0
        for j in range(i, m):
            n_sum += n_arr[j]
            k_sum += k_arr[j]
            if n_sum >= min_bucket_size:
                cost[i, j] = _bucket_ll(n_sum, k_sum, smoothing=smoothing)

    # 3) Dynamic programming:
    # dp[b, j] = best LL using b buckets to cover scores[0..j]
    dp = np.full((n_buckets + 1, m), -np.inf, dtype=float)
    prev = np.full((n_buckets + 1, m), -1, dtype=int)

    # Base case: 1 bucket covering 0..j
    for j in range(m):
        dp[1, j] = cost[0, j]
        prev[1, j] = 0

    # Transitions
    for b in range(2, n_buckets + 1):
        for j in range(m):
            # last bucket starts at i and ends at j, previous covers 0..i-1
            best_val = -np.inf
            best_i = -1
            for i in range(1, j + 1):
                if dp[b - 1, i - 1] == -np.inf:
                    continue
                if cost[i, j] == -np.inf:
                    continue
                val = dp[b - 1, i - 1] + cost[i, j]
                if val > best_val:
                    best_val = val
                    best_i = i
            dp[b, j] = best_val
            prev[b, j] = best_i

    if dp[n_buckets, m - 1] == -np.inf:
        raise RuntimeError(
            "No feasible solution. Try lowering n_buckets or min_bucket_size."
        )

    # 4) Backtrack to recover bucket ranges (in terms of indices on unique scores)
    cuts = []
    j = m - 1
    b = n_buckets
    while b >= 1:
        i = prev[b, j]
        if i < 0:
            raise RuntimeError("Backtracking failed.")
        cuts.append((i, j))
        j = i - 1
        b -= 1
    cuts.reverse()

    # 5) Convert to score boundaries and compute stats
    boundaries = []
    stats_rows = []
    for idx, (i, j) in enumerate(cuts, start=1):
        low = int(scores[i])
        high = int(scores[j])
        n_sum = range_n(i, j)
        k_sum = range_k(i, j)
        pd_hat = (k_sum + smoothing) / (n_sum + 2 * smoothing)
        boundaries.append((low, high))
        stats_rows.append(
            {"bucket": idx, "fico_low": low, "fico_high": high, "n": n_sum, "k": k_sum, "pd": pd_hat}
        )

    bucket_stats = pd.DataFrame(stats_rows)

    return boundaries, bucket_stats

def assign_rating_from_boundaries(fico_series: pd.Series, boundaries):
    """
    Given boundaries [(low1, high1), ...] sorted ascending by score,
    returns bucket index (1..R) where 1 is lowest-score bucket (worst credit).
    We will later flip this into a rating where 1 = best credit.
    """
    fico = fico_series.to_numpy()
    bucket = np.zeros_like(fico, dtype=int)

    for b, (low, high) in enumerate(boundaries, start=1):
        mask = (fico >= low) & (fico <= high)
        bucket[mask] = b

    # Basic safety check
    if (bucket == 0).any():
        raise ValueError("Some FICO scores were not assigned to any bucket. Check boundaries.")

    return pd.Series(bucket, index=fico_series.index, name="bucket")

def make_rating_map(df, boundaries, score_col="fico_score"):
    """
    Creates a rating where LOWER rating = BETTER credit score.
    Since boundaries are ascending by FICO, the highest-FICO bucket should get rating=1.
    """
    bucket = assign_rating_from_boundaries(df[score_col], boundaries)

    R = len(boundaries)
    # bucket 1 = low FICO (worst) -> rating R
    # bucket R = high FICO (best) -> rating 1
    rating = (R + 1) - bucket
    return rating.rename("rating")

# --------------------
# Example usage:
# --------------------
# boundaries, bucket_stats = optimal_fico_buckets_ll(df, n_buckets=10, min_bucket_size=100, smoothing=0.5)
# df["rating"] = make_rating_map(df, boundaries)
# df["bucket_raw"] = assign_rating_from_boundaries(df["fico_score"], boundaries)
# print(boundaries)
# print(bucket_stats.sort_values("bucket"))
# print(df[["fico_score", "default", "rating"]].head())


In [6]:
boundaries, bucket_stats = optimal_fico_buckets_ll(
    df,
    n_buckets=10,
    score_col="fico_score",
    default_col="default",
    min_bucket_size=100,
    smoothing=0.5
)

print("Bucket boundaries (low, high):")
for i, b in enumerate(boundaries, start=1):
    print(f"Bucket {i}: {b}")

print("\nBucket statistics:")
print(bucket_stats)


Bucket boundaries (low, high):
Bucket 1: (408, 502)
Bucket 2: (503, 520)
Bucket 3: (521, 552)
Bucket 4: (553, 580)
Bucket 5: (581, 611)
Bucket 6: (612, 649)
Bucket 7: (650, 696)
Bucket 8: (697, 732)
Bucket 9: (733, 739)
Bucket 10: (740, 850)

Bucket statistics:
   bucket  fico_low  fico_high     n    k        pd
0       1       408        502   168  121  0.718935
1       2       503        520   133   78  0.585821
2       3       521        552   496  229  0.461771
3       4       553        580   911  307  0.337171
4       5       581        611  1561  381  0.244238
5       6       612        649  2465  402  0.163220
6       7       650        696  2609  256  0.098276
7       8       697        732  1104   64  0.058371
8       9       733        739   122    0  0.004065
9      10       740        850   431   13  0.031250


In [7]:
df["rating"] = make_rating_map(
    df,
    boundaries,
    score_col="fico_score"
)

df["bucket_raw"] = assign_rating_from_boundaries(
    df["fico_score"],
    boundaries
)

df[["fico_score", "default", "rating"]].head()


Unnamed: 0,fico_score,default,rating
0,605,0,6
1,572,1,7
2,602,0,6
3,612,0,5
4,631,0,5


In [8]:
rating_pd = (
    df.groupby("rating")["default"]
    .agg(["count", "mean"])
    .rename(columns={"mean": "pd"})
    .reset_index()
    .sort_values("rating")
)

print(rating_pd)


   rating  count        pd
0       1    431  0.030162
1       2    122  0.000000
2       3   1104  0.057971
3       4   2609  0.098122
4       5   2465  0.163083
5       6   1561  0.244074
6       7    911  0.336992
7       8    496  0.461694
8       9    133  0.586466
9      10    168  0.720238


In [9]:
from sklearn.isotonic import IsotonicRegression

# PD brute par bucket (bucket_raw = 1 = low FICO)
bucket_pd = (
    df.groupby("bucket_raw")["default"]
    .agg(["count", "mean"])
    .reset_index()
    .sort_values("bucket_raw")
)

# On inverse l'ordre car FICO ↑ → PD ↓
x = bucket_pd["bucket_raw"].values
y = bucket_pd["mean"].values

iso = IsotonicRegression(increasing=False)
bucket_pd["pd_monotone"] = iso.fit_transform(x, y)

bucket_pd


Unnamed: 0,bucket_raw,count,mean,pd_monotone
0,1,168,0.720238,0.720238
1,2,133,0.586466,0.586466
2,3,496,0.461694,0.461694
3,4,911,0.336992,0.336992
4,5,1561,0.244074,0.244074
5,6,2465,0.163083,0.163083
6,7,2609,0.098122,0.098122
7,8,1104,0.057971,0.057971
8,9,122,0.0,0.015081
9,10,431,0.030162,0.015081


In [10]:
df["bucket_quantile"] = pd.qcut(
    df["fico_score"],
    q=10,
    labels=False
) + 1

quantile_pd = (
    df.groupby("bucket_quantile")["default"]
    .mean()
    .reset_index()
    .rename(columns={"default": "pd"})
)

quantile_pd


Unnamed: 0,bucket_quantile,pd
0,1,0.490659
1,2,0.307841
2,3,0.24924
3,4,0.181911
4,5,0.174395
5,6,0.128385
6,7,0.105161
7,8,0.094845
8,9,0.071642
9,10,0.036437


In [11]:
from sklearn.cluster import KMeans

kmeans = KMeans(n_clusters=10, random_state=0)
df["bucket_kmeans"] = kmeans.fit_predict(
    df[["fico_score"]]
) + 1

kmeans_pd = (
    df.groupby("bucket_kmeans")["default"]
    .mean()
    .reset_index()
    .rename(columns={"default": "pd"})
)

kmeans_pd


Unnamed: 0,bucket_kmeans,pd
0,1,0.205811
1,2,0.161121
2,3,0.057647
3,4,0.659794
4,5,0.032328
5,6,0.297727
6,7,0.104891
7,8,0.083062
8,9,0.418206
9,10,0.023529


**Among the tested approaches, likelihood-based optimal bucketing combined with monotonic PD smoothing provides the best trade-off between statistical optimality, interpretability, and regulatory consistency. This makes it suitable for production credit risk models.**