In [1]:
import pandas as pd
import numpy as np

In [2]:
def precompute_costs(fico, defaults):
    """
    Given sorted arrays "fico" and "defaults" (0/1), compute:
      - cost_mse[i,j]: sum of squared errors if {fico[i..j]} are approximated
                       by their mean.
      - cost_nll[i,j]: negative log-likelihood of defaults[i..j] under
                       p = k/(n), i.e. 
                         -(k ln p + (n-k) ln(1-p))
    """
    N = len(fico)
    # prefix sums for fast interval sums
    S1 = np.concatenate([[0], fico.cumsum()])
    S2 = np.concatenate([[0], (fico**2).cumsum()])
    D  = np.concatenate([[0], defaults.cumsum()])
    
    cost_mse = np.zeros((N, N))
    cost_nll = np.zeros((N, N))
    
    for i in range(N):
        for j in range(i, N):
            n = j - i + 1
            sum_x  = S1[j+1] - S1[i]
            sum_x2 = S2[j+1] - S2[i]
            # MSE cost = sum(x^2) - sum(x)^2/n
            cost_mse[i,j] = sum_x2 - (sum_x**2) / n
            
            # NLL cost
            k = D[j+1] - D[i]
            # avoid log(0)
            if k == 0 or k == n:
                cost_nll[i,j] = 0.0
            else:
                p = k / n
                cost_nll[i,j] = - (k * np.log(p) + (n-k) * np.log(1-p))
    return cost_mse, cost_nll

In [3]:
def optimal_buckets(cost, K):
    """
    Generic DP to split indices [0..N-1] into K buckets minimizing total cost.
    cost[i,j] = cost of one bucket covering i..j.
    Returns a list of boundary indices and total cost.
    """
    N = cost.shape[0]
    # dp[w][j]: min cost to partition 0..j into w+1 buckets
    dp = np.full((K, N), np.inf)
    back = np.zeros((K, N), int)
    
    # base: one bucket (w=0) covers 0..j
    dp[0, :] = cost[0, :]
    
    for w in range(1, K):
        for j in range(w, N):
            # try last break at t-1
            # bucket w covers t..j
            best = np.inf
            bt   = w
            for t in range(w, j+1):
                c = dp[w-1, t-1] + cost[t, j]
                if c < best:
                    best, bt = c, t
            dp[w, j]      = best
            back[w, j]    = bt
    
    # backtrack
    boundaries = []
    w, j = K-1, N-1
    while w >= 0:
        t = back[w, j] if w>0 else 0
        boundaries.append((t, j))
        j -= (j - t + 1)
        w -= 1
    boundaries.reverse()
    return boundaries, dp[K-1, N-1]

In [4]:
def bucket_fico(
    df: pd.DataFrame,
    K: int,
    method: str = 'mse'
):
    """
    df must have columns 'fico_score' and 'default' (0/1).
    K = number of buckets.
    method = 'mse' | 'nll'
    Returns:
      - bucket_ranges: list of (low, high) score ranges
      - bucket_defaults: list of observed default rates per bucket
    """
    # sort unique scores and aggregate defaults/counts
    agg = df.groupby('fico_score')['default'].agg(['count','sum']).reset_index()
    scores   = agg['fico_score'].values
    counts   = agg['count'].values
    defaults = agg['sum'].values
    
    # expand so each row = one record (since cost[i,j] is on raw sorted list,
    # but we can treat each unique score as repeated by its count)
    # Instead, we weight by count in prefix sums:
    #   replicate fico and default arrays by counts
    fico_exp   = np.repeat(scores, counts)
    def_exp    = np.repeat(defaults/counts, counts)  # fractional default?
    # alternatively for pure 0/1 expansion use:
    # def_exp = np.hstack([[1]*d + [0]*(c-d) for c,d in zip(counts, defaults)])
    
    cost_mse, cost_nll = precompute_costs(fico_exp, def_exp)
    cost = cost_mse if method=='mse' else cost_nll
    
    boundaries, total_cost = optimal_buckets(cost, K)
    
    bucket_ranges    = []
    bucket_defaults  = []
    for i,j in boundaries:
        vals = fico_exp[i:j+1]
        defs = def_exp[i:j+1]
        bucket_ranges.append((vals.min(), vals.max()))
        bucket_defaults.append(defs.mean())
    
    return bucket_ranges, bucket_defaults


In [6]:
# ───────────────
# Example
# ───────────────
if __name__=='__main__':
    # load your mortgage data
    df = pd.read_csv('Loan_Data.csv')  
      # must have fico_score, default columns
    
    for method in ('mse','nll'):
        ranges, rates = bucket_fico(df, K=5, method=method)
        print(f"\nMethod = {method.upper()}")
        for idx,(rng, p) in enumerate(zip(ranges, rates), 1):
            lo, hi = rng
            print(f"  Bucket {idx}: {lo:.0f}–{hi:.0f}, PD≈{p:.3f}")


Method = MSE
  Bucket 1: 408–552, PD≈0.537
  Bucket 2: 553–607, PD≈0.283
  Bucket 3: 608–654, PD≈0.161
  Bucket 4: 655–706, PD≈0.092
  Bucket 5: 707–850, PD≈0.045

Method = NLL
  Bucket 1: 408–520, PD≈0.661
  Bucket 2: 521–580, PD≈0.381
  Bucket 3: 581–640, PD≈0.204
  Bucket 4: 641–696, PD≈0.105
  Bucket 5: 697–850, PD≈0.046
