In [None]:
import numpy as np
import pandas as pd


In [None]:
# Read the whole loan file, then normalise the headers (trimmed, lower-case)
df = pd.read_csv('Loan_Data.csv')
df.columns = [name.strip().lower() for name in df.columns]

# Work on a copy that holds only the score and the default flag
use_cols = ['fico_score', 'default']
dfq = df[use_cols].copy()

# Force both columns to numeric; anything unparseable becomes NaN
for col in use_cols:
    dfq[col] = pd.to_numeric(dfq[col], errors='coerce')

# Discard rows with a missing value, then rows whose target is not 0 or 1
dfq = dfq.dropna(subset=use_cols)
is_binary = dfq['default'].isin([0, 1])
dfq = dfq[is_binary]

# Headline numbers as a quick sanity check on the cleaned sample
fico = dfq['fico_score']
print("Count:", len(dfq))
print("FICO min/max:", fico.min(), fico.max())
print("FICO percentiles:", fico.quantile([0.01,0.1,0.5,0.9,0.99]).to_dict())
print("Default rate:", dfq['default'].mean())


In [None]:
# Target number of quantile buckets
K = 10  # change if needed

# K+1 evenly spaced probabilities give the raw quantile cut points
qs = np.linspace(0, 1, K+1)
edges = dfq['fico_score'].quantile(qs).values

# Heavily tied scores can produce repeated cut points. Walk the edges in
# order and keep one only when it exceeds the last edge kept, so the
# surviving sequence is strictly increasing.
edges_unique = []
for edge in edges:
    if not edges_unique or edge > edges_unique[-1]:
        edges_unique.append(edge)

# Every dropped edge costs one bucket
K_eff = len(edges_unique) - 1
print(f"Requested K={K}, effective K after tie handling={K_eff}")
boundaries = np.array(edges_unique)

print("Quantile boundaries:", boundaries)




# Requires `boundaries` and `K_eff` from the quantile step above
bins = boundaries
labels = list(range(1, K_eff+1))  # provisional labels 1..K_eff (low FICO -> low label)

# Position of each score within the bins, counted 0..K_eff-1 from the lowest
# bin; include_lowest=True keeps the minimum score inside the first bin
bin_idx = pd.cut(
    dfq['fico_score'],
    bins=bins,
    include_lowest=True,
    right=True,
    labels=False,
)

# Flip the scale so the top bin (highest FICO) becomes rating 1 and the
# bottom bin becomes rating K_eff
rating = bin_idx.rsub(K_eff).astype(int)

dfq['rating'] = rating

# Sanity table: head-count, mean score and observed default rate per rating
by = (
    dfq.groupby('rating')
    .agg(
        n=pd.NamedAgg(column='default', aggfunc='size'),
        avg_fico=pd.NamedAgg(column='fico_score', aggfunc='mean'),
        pd_rate=pd.NamedAgg(column='default', aggfunc='mean'),
    )
    .sort_index()
)

print(by)



In [None]:
# Per-rating counts: n = observations, k = observed defaults
stats = dfq.groupby('rating').agg(
    n=('default','size'),
    k=('default','sum')
).sort_index().reset_index()

# PAV (pool adjacent violators): enforce a PD that is non-decreasing as the
# rating number grows (rating 1 = best credit, so it should have the lowest PD)
stats['pd'] = stats['k'] / stats['n']

# Convert to parallel lists so adjacent entries can be merged in place
ratings = stats['rating'].tolist()
n = stats['n'].astype(float).tolist()
k = stats['k'].astype(float).tolist()
pd_list = stats['pd'].tolist()
# members[j] holds EVERY original rating absorbed into pool j. Tracking only
# the surviving first rating would leave merged ratings unmapped (NaN) later.
members = [[r] for r in ratings]

i = 0
while i < len(pd_list) - 1:
    # A better rating with a higher PD than its worse neighbour is a violation
    if pd_list[i] > pd_list[i+1]:
        # Merge i+1 into i; the pooled PD is the count-weighted default rate
        n[i] += n[i+1]
        k[i] += k[i+1]
        pd_list[i] = k[i] / n[i]
        members[i].extend(members[i+1])
        del n[i+1], k[i+1], pd_list[i+1], ratings[i+1], members[i+1]
        # The merged pool may now violate against its predecessor: step back
        if i > 0:
            i -= 1
    else:
        i += 1

# Renumber the pools consecutively 1..len(pools) (1 = best) and map every
# original rating, including the ones merged away, to its pool
pooled_groups = list(range(1, len(pd_list)+1))
rating_map = {
    old: pooled
    for pooled, pool in zip(pooled_groups, members)
    for old in pool
}

# Apply mapping
dfq['rating_pooled'] = dfq['rating'].map(rating_map)
assert dfq['rating_pooled'].notna().all(), "some ratings were not assigned to a pool"

# Summarize pooled ratings
by_pooled = dfq.groupby('rating_pooled').agg(
    n=('default','size'),
    avg_fico=('fico_score','mean'),
    pd_rate=('default','mean')
).sort_index()

print(by_pooled)


In [None]:
import numpy as np
import pandas as pd

# Choose which ratings to finalize: pooled (if the pooling step ran) or original
use_col = 'rating_pooled' if 'rating_pooled' in dfq.columns else 'rating'

# Head-count, mean score and observed default rate per finalized rating
diag = dfq.groupby(use_col).agg(
    n=('default','size'),
    avg_fico=('fico_score','mean'),
    pd_rate=('default','mean')
).sort_index()
print(diag)

# Simple checks: smallest bucket, and PD already in ascending order by rating
min_bucket_size = diag['n'].min()
is_monotone = (diag['pd_rate'].values == np.sort(diag['pd_rate'].values)).all()
print("Min bucket size:", min_bucket_size)
print("PD monotone non-decreasing with worse rating:", is_monotone)

# Finalize the cut points so they describe the SAME rating scale as `diag`.
# Original rating r occupies the bin whose upper edge is
# boundaries[len(boundaries) - r]. Each pool is a run of consecutive ratings,
# so its upper edge is the upper edge of its best (smallest) member.
if use_col == 'rating_pooled':
    pool_best = dfq.groupby('rating_pooled')['rating'].min().astype(int).to_numpy()
    upper_edges = boundaries[len(boundaries) - pool_best]
    final_bins = np.concatenate(([boundaries[0]], np.sort(upper_edges)))
else:
    final_bins = boundaries
def rate_fico(fico_value, bins=final_bins):
    """Map a single FICO score to its rating.

    Rating 1 is the best (highest bin) and ``len(bins) - 1`` is the worst
    (lowest bin).

    Parameters
    ----------
    fico_value : number
        Score to rate.
    bins : sequence of increasing bin edges
        Cut points; defaults to the notebook's ``final_bins``.

    Returns
    -------
    int
        Rating in ``1 .. len(bins) - 1``.

    Raises
    ------
    ValueError
        If ``fico_value`` is missing or cannot be read as a number.
    """
    score = pd.to_numeric(pd.Series([fico_value]), errors='coerce')
    if score.isna().any():
        raise ValueError(f"fico_value must be a number, got {fico_value!r}")
    # pd.cut returns NaN for values outside [bins[0], bins[-1]]. Clamp so a
    # score beyond the fitted range gets the extreme rating instead of
    # crashing on int(NaN).
    score = score.clip(lower=bins[0], upper=bins[-1])
    idx = pd.cut(score, bins=bins, include_lowest=True, right=True, labels=False).iloc[0]
    return int(len(bins)-1 - idx)  # invert so best score -> 1

# Example usage:
# print(rate_fico(780))
# print(rate_fico(620))

import json

# Persist the cut points and the scale orientation (rating 1 = best) as JSON
artifact = dict(bins=final_bins.tolist(), rating_best_is_1=True)
with open('fico_rating_map.json','w') as f:
    f.write(json.dumps(artifact))
