In [None]:
import numpy as np
import pandas as pd

def prefix_stats(sorted_scores, sorted_defaults):

    x = np.asarray(sorted_scores, dtype=float)
    y = np.asarray(sorted_defaults, dtype=float)
    n = len(x)
    ps_x = np.concatenate(([0.0], np.cumsum(x)))
    ps_x2 = np.concatenate(([0.0], np.cumsum(x * x)))
    ps_y = np.concatenate(([0.0], np.cumsum(y)))
    return ps_x, ps_x2, ps_y

def interval_stats(ps_x, ps_x2, ps_y, i, j):

    count = j - i + 1
    sum_x = ps_x[j+1] - ps_x[i]
    sum_x2 = ps_x2[j+1] - ps_x2[i]
    sum_y = ps_y[j+1] - ps_y[i]
    return count, sum_x, sum_x2, sum_y

def optimal_buckets_mse(scores, defaults, K, min_bucket_size=1):

    order = np.argsort(scores)
    s = np.array(scores)[order]
    d = np.array(defaults)[order]
    n = len(s)
    ps_x, ps_x2, ps_y = prefix_stats(s, d)


    cost = np.full((n, n), np.inf)
    for i in range(n):
        for j in range(i + min_bucket_size -1, n):
            cnt, sum_x, sum_x2, _ = interval_stats(ps_x, ps_x2, ps_y, i, j)
            mean = sum_x / cnt
            sse = sum_x2 - 2 * mean * sum_x + cnt * mean * mean
            cost[i, j] = sse


    K = int(K)
    dp = np.full((K+1, n), np.inf)
    prev = np.full((K+1, n), -1, dtype=int)

    for j in range(min_bucket_size-1, n):
        dp[1, j] = cost[0, j]
        prev[1, j] = -1

    for k in range(2, K+1):
        for j in range(k*min_bucket_size -1, n):

            i_min = (k-1)*min_bucket_size -1
            best_val = np.inf
            best_i = -1

            for i in range(i_min, j - min_bucket_size + 1):
                val = dp[k-1, i] + cost[i+1, j]
                if val < best_val:
                    best_val = val
                    best_i = i
            dp[k, j] = best_val
            prev[k, j] = best_i


    boundaries_idx = []
    k = K
    j = n-1
    while k > 0:
        i = prev[k, j]
        start = 0 if i == -1 else i+1
        boundaries_idx.append((start, j))
        j = i
        k -= 1
    boundaries_idx = list(reversed(boundaries_idx))

    buckets = []
    for (i,j) in boundaries_idx:
        cnt, sum_x, sum_x2, sum_y = interval_stats(ps_x, ps_x2, ps_y, i, j)
        mean_score = sum_x / cnt
        p_hat = sum_y / cnt
        low, high = s[i], s[j]
        sse = cost[i,j]
        buckets.append({
            'i': i, 'j': j, 'low': low, 'high': high,
            'n': int(cnt), 'k': int(sum_y),
            'p_hat': float(p_hat), 'mean_score': float(mean_score), 'sse': float(sse)
        })
    df = pd.DataFrame(buckets)
    return df, order


def interval_loglik(ps_x, ps_x2, ps_y, i, j, laplace_alpha=1.0):


    cnt, sum_x, sum_x2, sum_y = interval_stats(ps_x, ps_x2, ps_y, i, j)
    k = sum_y

    p = (k + laplace_alpha) / (cnt + 2*laplace_alpha)
    return k * np.log(p) + (cnt - k) * np.log(1 - p)

def optimal_buckets_loglik(scores, defaults, K, min_bucket_size=1, laplace_alpha=1.0):

    order = np.argsort(scores)
    s = np.array(scores)[order]
    d = np.array(defaults)[order]
    n = len(s)
    ps_x, ps_x2, ps_y = prefix_stats(s, d)


    loglik = np.full((n, n), -np.inf)
    for i in range(n):
        for j in range(i + min_bucket_size -1, n):
            loglik[i, j] = interval_loglik(ps_x, ps_x2, ps_y, i, j, laplace_alpha=laplace_alpha)


    dp = np.full((K+1, n), -np.inf)
    prev = np.full((K+1, n), -1, dtype=int)

    for j in range(min_bucket_size-1, n):
        dp[1, j] = loglik[0, j]
        prev[1, j] = -1

    for k in range(2, K+1):
        for j in range(k*min_bucket_size -1, n):
            best_val = -np.inf
            best_i = -1
            i_min = (k-1)*min_bucket_size -1
            for i in range(i_min, j - min_bucket_size + 1):
                val = dp[k-1, i] + loglik[i+1, j]
                if val > best_val:
                    best_val = val
                    best_i = i
            dp[k, j] = best_val
            prev[k, j] = best_i


    boundaries_idx = []
    k = K
    j = n-1
    while k > 0:
        i = prev[k, j]
        start = 0 if i == -1 else i+1
        boundaries_idx.append((start, j))
        j = i
        k -= 1
    boundaries_idx = list(reversed(boundaries_idx))

    buckets = []
    for (i,j) in boundaries_idx:
        cnt, sum_x, sum_x2, sum_y = interval_stats(ps_x, ps_x2, ps_y, i, j)
        p_hat = (sum_y + laplace_alpha) / (cnt + 2*laplace_alpha)
        low, high = s[i], s[j]
        buckets.append({
            'i': i, 'j': j, 'low': low, 'high': high,
            'n': int(cnt), 'k': int(sum_y),
            'p_hat': float(p_hat), 'mean_score': float(sum_x/cnt),
            'loglik': float(interval_loglik(ps_x, ps_x2, ps_y, i, j, laplace_alpha=laplace_alpha))
        })
    df = pd.DataFrame(buckets)
    return df, order


def build_rating_map(df_buckets):


    df = df_buckets.copy()

    df = df.sort_values('mean_score', ascending=False).reset_index(drop=True)
    df['rating'] = np.arange(1, len(df)+1)

    mapping = df[['rating', 'low', 'high', 'p_hat', 'n', 'k', 'mean_score']]
    return mapping

def score_to_rating(score, mapping_df):

    for _, row in mapping_df.iterrows():
        low = row['low']
        high = row['high']

        if low <= score <= high:
            return int(row['rating'])

    if score < mapping_df['low'].min():
        return int(mapping_df['rating'].max())
    else:
        return int(mapping_df['rating'].min())


if __name__ == "__main__":

    rng = np.random.default_rng(0)
    n = 2000
    fic = rng.integers(300, 851, size=n)
    prob = 1 / (1 + np.exp((fic - 600)/25)) * 0.5

    labels = rng.binomial(1, prob)
    K = 5

    df_mse, order = optimal_buckets_mse(fic, labels, K, min_bucket_size=20)
    print("MSE buckets (ascending FICO):")
    print(df_mse)

    df_ll, order_ll = optimal_buckets_loglik(fic, labels, K, min_bucket_size=20, laplace_alpha=1.0)
    print("\nLog-likelihood buckets (ascending FICO):")
    print(df_ll)

    rating_map_mse = build_rating_map(df_mse)
    print("\nRating map (MSE -> rating 1 best):")
    print(rating_map_mse)


    test_score = 720
    rating_for_test = score_to_rating(test_score, rating_map_mse)
    print(f"\nSample score {test_score} -> rating {rating_for_test}")
