In [2]:
import pandas as pd
import numpy as np
from math import log

# --- CONFIGURATION ---
# Module-level settings read by the functions in this file.
FILE_PATH = "/content/Task 3 and 4_Loan_Data.csv"  # CSV that main() passes to load_and_prepare_data()
TARGET_BUCKET_COUNT = 5 # Number of rating classes requested (main() passes it to the DP solver as K)
MIN_FICO = 300 # Standard minimum FICO score; lower clip bound and lower edge of the first bucket
MAX_FICO = 850 # Standard maximum FICO score; upper clip bound (the last bucket edge is MAX_FICO + 1)

def load_and_prepare_data(file_path):
    """
    Load the loan CSV and aggregate it per unique FICO score.

    Args:
        file_path (str): Path to a CSV containing at least the columns
            'fico_score' and 'default'.

    Returns:
        tuple: (grouped_data, fico_to_index).
            grouped_data is a DataFrame with one row per distinct FICO score
            (after clipping to [MIN_FICO, MAX_FICO]) and the columns
            'fico_score', 'n_i' (record count) and 'k_i' (sum of 'default').
            fico_to_index maps each score to its row position in grouped_data.
            If the file does not exist, an error is printed and (None, None)
            is returned, so callers can always unpack two values.
    """
    try:
        data = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        # Return a pair rather than a bare None: callers unpack two values,
        # and unpacking None would raise TypeError before they can check it.
        return None, None

    # Only the FICO score and the default flag are needed; copy so the
    # clipping below does not touch the frame returned by read_csv.
    df_fico = data[['fico_score', 'default']].copy()

    # Ensure scores are within the standard range for clean processing
    df_fico['fico_score'] = df_fico['fico_score'].clip(lower=MIN_FICO, upper=MAX_FICO)

    # Pre-aggregate per unique score so the DP works on distinct scores
    # instead of individual loans. groupby sorts the keys ascending by default.
    grouped_data = df_fico.groupby('fico_score').agg(
        n_i=('fico_score', 'size'),  # total records (n_i)
        k_i=('default', 'sum'),      # total defaults (k_i)
    ).reset_index()

    # Map each FICO score to its row position for DP table access
    fico_to_index = {score: i for i, score in enumerate(grouped_data['fico_score'])}

    return grouped_data, fico_to_index

def log_likelihood_contribution(k_i, n_i):
    """
    Log-likelihood of a single bucket evaluated at its own observed default rate.

    With p = k_i / n_i the value is
        k_i * log(p) + (n_i - k_i) * log(1 - p)

    Args:
        k_i: Number of defaults in the bucket.
        n_i: Number of records in the bucket.

    Returns:
        The bucket's log-likelihood; 0.0 for an empty bucket (n_i == 0).
    """
    if n_i == 0:
        return 0.0

    # Keep the rate strictly inside (0, 1) so neither logarithm receives zero.
    tiny = 1e-9
    rate = np.clip(k_i / n_i, tiny, 1.0 - tiny)

    defaulted_term = k_i * log(rate)
    survived_term = (n_i - k_i) * log(1.0 - rate)
    return defaulted_term + survived_term

def solve_quantization_dp(grouped_data, K):
    """
    Find the K contiguous FICO buckets that maximize the total log-likelihood,
    using dynamic programming over the unique FICO scores.

    Args:
        grouped_data (pd.DataFrame): One row per unique FICO score, in ascending
            score order, with columns 'fico_score', 'n_i' and 'k_i'.
        K (int): The target number of buckets.

    Returns:
        tuple: (boundaries, max_log_likelihood). boundaries has K + 1 entries:
        MIN_FICO, then the first score of buckets 2..K, then MAX_FICO + 1.
        Bucket b therefore covers boundaries[b] <= score < boundaries[b + 1].

    Raises:
        ValueError: If K < 1 or there are fewer unique FICO scores than K
            (each bucket needs at least one score, so no valid split exists).
    """
    N = len(grouped_data)  # Total number of unique FICO groups

    # Without these checks an infeasible K leaves the DP at -inf and the
    # backtracking step silently returns meaningless boundaries.
    if K < 1:
        raise ValueError(f"K must be at least 1, got {K}")
    if N < K:
        raise ValueError(
            f"Cannot form {K} non-empty buckets from {N} unique FICO scores"
        )

    # Positional arrays: faster than per-cell .loc lookups in the O(N^2) loop
    # below, and independent of the DataFrame's index labels.
    k_counts = grouped_data['k_i'].to_numpy()
    n_counts = grouped_data['n_i'].to_numpy()
    scores = grouped_data['fico_score'].to_numpy()

    # --- Step 1: log-likelihood of every possible single bucket ---
    # cost[i][j] is the log-likelihood of one bucket covering groups i..j
    cost = np.zeros((N, N))
    for i in range(N):
        current_k = 0
        current_n = 0
        for j in range(i, N):
            current_k += k_counts[j]
            current_n += n_counts[j]
            cost[i][j] = log_likelihood_contribution(current_k, current_n)

    # --- Step 2: dynamic programming tables ---
    # DP[k][j]: maximum log-likelihood using k buckets that end at group j
    DP = np.full((K + 1, N), -np.inf)
    # P[k][j]: last group index of bucket k-1 in the optimum behind DP[k][j]
    P = np.zeros((K + 1, N), dtype=int)

    # Base case: a single bucket covering groups 0..j (P[1] stays 0)
    DP[1, :] = cost[0, :]

    # Recurrence: DP[k][j] = max over i < j of DP[k-1][i] + cost[i+1][j]
    for k in range(2, K + 1):
        for j in range(k - 1, N):  # need at least k groups for k buckets
            # candidates[m] is the value for split point i = (k - 2) + m
            candidates = DP[k - 1, k - 2:j] + cost[k - 1:j + 1, j]
            # argmax returns the first maximum, i.e. the smallest i on ties
            best = int(np.argmax(candidates))
            DP[k, j] = candidates[best]
            P[k, j] = (k - 2) + best

    # --- Step 3: backtrack from K buckets ending at the last group ---
    start_indices = []
    end_index = N - 1
    for k in range(K, 1, -1):
        # P gives the end of the previous bucket; the current one starts right after
        split_point_index = P[k][end_index]
        start_indices.append(split_point_index + 1)
        end_index = split_point_index

    # Starts were collected from the last bucket backwards; put them in ascending order
    start_indices.reverse()

    # Convert group positions back to FICO scores (each is the *start* of a bucket)
    optimal_boundaries = [scores[idx] for idx in start_indices]

    # Add the overall min and (exclusive) max FICO scores as the outer edges
    final_boundaries = [MIN_FICO] + optimal_boundaries + [MAX_FICO + 1]

    return final_boundaries, DP[K][N-1]

def create_rating_map(boundaries, grouped_data):
    """
    Summarise every FICO bucket and rank the buckets by observed default rate.

    Args:
        boundaries: Ascending bucket edges; bucket b covers
            boundaries[b] <= score < boundaries[b + 1].
        grouped_data (pd.DataFrame): Per-score aggregates with the columns
            'fico_score', 'n_i' and 'k_i'.

    Returns:
        list[dict]: One dict per bucket, ordered by 'min_score', with the keys
        'min_score', 'max_score', 'default_rate', 'total_count' and 'rating'.
        Rating 1 goes to the bucket with the lowest default rate and the
        highest rating number to the bucket with the highest default rate.
    """
    scores = grouped_data['fico_score']

    # One summary per pair of consecutive edges (lower inclusive, upper exclusive).
    summaries = []
    for low, high in zip(boundaries[:-1], boundaries[1:]):
        in_bucket = grouped_data[(scores >= low) & (scores < high)]

        total = in_bucket['n_i'].sum()
        defaults = in_bucket['k_i'].sum()

        # An empty bucket is reported with a default rate of 0.
        if total > 0:
            rate = defaults / total
        else:
            rate = 0

        summaries.append({
            'min_score': low,
            'max_score': high - 1,
            'default_rate': rate,
            'total_count': total,
        })

    # Rank by default rate: the lowest rate receives rating 1 (best).
    by_risk = sorted(summaries, key=lambda entry: entry['default_rate'])
    for position, entry in enumerate(by_risk, start=1):
        entry['rating'] = position

    # Present the buckets in FICO order again.
    return sorted(by_risk, key=lambda entry: entry['min_score'])

def main():
    """
    Run the FICO quantization pipeline and print the resulting rating map.

    Steps: load and aggregate the loan data from FILE_PATH, find the
    TARGET_BUCKET_COUNT bucket boundaries with the dynamic program, compute
    each bucket's default rate and rating, then print a summary and a markdown
    table. Returns None; exits early when the data could not be loaded (the
    loader prints the error itself).
    """
    print(f"Starting FICO Quantization for K={TARGET_BUCKET_COUNT} buckets...")

    # Step 1: Load and Prepare Data
    # Inspect the result before unpacking: a failed load may come back as a
    # bare None, and unpacking that directly would raise TypeError.
    loaded = load_and_prepare_data(FILE_PATH)
    if loaded is None or loaded[0] is None:
        return
    grouped_data = loaded[0]  # the score -> index mapping is not needed here

    # Step 2: Solve the Dynamic Programming problem
    boundaries, max_log_L = solve_quantization_dp(grouped_data, TARGET_BUCKET_COUNT)

    # Step 3: Create the Rating Map (one entry per bucket, in ascending FICO order)
    rating_map = create_rating_map(boundaries, grouped_data)

    # --- Display Results ---

    # Interior boundaries only: the first score of every bucket after the first
    boundary_scores = [s for s in boundaries if s > MIN_FICO and s <= MAX_FICO]

    print("\n" + "="*50)
    print(f"Optimal FICO Boundaries for K={TARGET_BUCKET_COUNT} (Max Log-Likelihood)")
    print("="*50)
    print(f"Maximum Log-Likelihood Achieved: {max_log_L:.2f}")
    print(f"Optimal Score Splits (Start of New Bucket): {boundary_scores}")

    # Build the output table directly from the rating map. It is already in
    # ascending FICO order, so no re-sorting (or parsing of the range label
    # back into a number) is required.
    final_output = [
        {
            "Rating": bucket_info['rating'],
            "FICO Range": f"{bucket_info['min_score']}-{bucket_info['max_score']}",
            "Default Rate (PD)": f"{bucket_info['default_rate']:.4f}",
            "Count": bucket_info['total_count'],
        }
        for bucket_info in rating_map
    ]

    print("\n" + "="*20 + " FICO RATING MAP " + "="*21)
    print(" (Lower Rating signifies Better Creditworthiness)")
    print("="*50)

    # Print the final result table
    df_result = pd.DataFrame(final_output)
    print(df_result.to_markdown(index=False, numalign="left", stralign="left"))
    print("="*50)

# Run the pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Starting FICO Quantization for K=5 buckets...

Optimal FICO Boundaries for K=5 (Max Log-Likelihood)
Maximum Log-Likelihood Achieved: -4255.38
Optimal Score Splits (Start of New Bucket): [np.int64(521), np.int64(581), np.int64(641), np.int64(697)]

 (Lower Rating signifies Better Creditworthiness)
| Rating   | FICO Range   | Default Rate (PD)   | Count   |
|:---------|:-------------|:--------------------|:--------|
| 5        | 300-520      | 0.6611              | 301     |
| 4        | 521-580      | 0.381               | 1407    |
| 3        | 581-640      | 0.2045              | 3438    |
| 2        | 641-696      | 0.1051              | 3197    |
| 1        | 697-850      | 0.0465              | 1657    |
