# Setup

In [1]:
import os
from pathlib import Path
os.chdir(Path.cwd().parent)
# print("cwd is now:", Path.cwd())

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from datetime import datetime
from data_loader import scores_df
# from fit import fit_statistical_model
from sklearn.linear_model import LinearRegression

null performances after coercion: 0
after filter num benchmarks 1965
after merge with model versions 1965
after date filter (>= 2022-11-01) 1766
after merge with benchmark dates 1766
Original number of rows: 1766
Number of rows after aggregation: 1324


In [3]:
anchor_benchmark = "Winogrande"
anchor_difficulty = 0
anchor_slope = 1
# df1, df_cm1, df_db1 = fit_statistical_model(scores_df, anchor_benchmark, anchor_difficulty, anchor_slope)

# # Convert date strings to datetime objects
# df_cm1['date_obj'] = pd.to_datetime(df_cm1['date'])

# Cross validation (Appendix E.4: Varying the statistical model)

In [4]:
import numpy as np
import pandas as pd
from scipy.optimize import least_squares
from sklearn.model_selection import KFold

# MODIFIED: Added anchor_slope and anchor_difficulty to the function signature.
def split_params(params: np.ndarray, num_models: int, num_benchmarks: int, anchor_idx: int, model_type: str, anchor_slope: float, anchor_difficulty: float):
    """
    Breaks the flat parameter vector into C, D, full-length α,
    and gammas (for sigmoid_with_offsets).
    """
    C = params[:num_models]

    # MODIFICATION: Reconstruct D by inserting the fixed anchor difficulty.
    # The optimizer only solves for the other N-1 difficulties.
    D_free = params[num_models : num_models + num_benchmarks - 1]
    D = np.insert(D_free, anchor_idx, anchor_difficulty)

    # MODIFICATION: Adjust index for alpha and insert the fixed anchor slope.
    alpha_free = params[num_models + num_benchmarks - 1 : num_models + 2 * num_benchmarks - 2]
    alpha = np.insert(alpha_free, anchor_idx, anchor_slope)

    # Handle parameters specific to the 'sigmoid_with_offsets' model
    if model_type == 'sigmoid_with_offsets':
        # MODIFICATION: Adjust indices for gamma parameters.
        gamma_lower = params[num_models + 2 * num_benchmarks - 2 : num_models + 3 * num_benchmarks - 2]
        gamma_upper = params[num_models + 3 * num_benchmarks - 2:]
        return C, D, alpha, gamma_lower, gamma_upper

    return C, D, alpha

# MODIFIED: Added anchor_slope and anchor_difficulty to the function signature.
def predict(params, model_idx, bench_idx, num_models, num_benchmarks, anchor_idx, model_type, anchor_slope, anchor_difficulty):
    """Calculates predictions based on the specified model type."""
    # Unpack the parameters based on the model type
    # MODIFICATION: Pass anchor values to split_params
    split_args = (params, num_models, num_benchmarks, anchor_idx, model_type, anchor_slope, anchor_difficulty)
    if model_type == 'sigmoid_with_offsets':
        C, D, alpha, lower_asymptote, upper_asymptote = split_params(*split_args)
    else:
        C, D, alpha = split_params(*split_args)

    # Core prediction logic
    x = alpha[bench_idx] * (C[model_idx] - D[bench_idx])

    # Apply the correct functional form
    if model_type == 'clipped_linear':
        return np.clip(x, 0, 1)
    elif model_type == 'sigmoid_with_offsets':
        return lower_asymptote[bench_idx] + (upper_asymptote[bench_idx] - lower_asymptote[bench_idx]) / (1 + np.exp(-x))
    else:  # Default to 'sigmoid'
        return 1.0 / (1.0 + np.exp(-x))

def fit_statistical_model(
    df,
    anchor_benchmark,
    anchor_difficulty,
    anchor_slope,
    slope_init=0.05,
    df_model=None,
    model_type='sigmoid',
    full_dataset_maps=None
):
    """
    Fits a statistical model to benchmark performance data.

    Args:
        df (pd.DataFrame): DataFrame with benchmark performance data.
        anchor_benchmark (str): The name of the anchor benchmark.
        anchor_difficulty (float): The difficulty of the anchor benchmark to be fixed.
        anchor_slope (float): The slope of the anchor benchmark to be fixed.
        slope_init (float, optional): Initial slope value. Defaults to 0.05.
        df_model (pd.DataFrame, optional): DataFrame with model metadata. Defaults to None.
        model_type (str, optional): The type of model to fit.
        full_dataset_maps (dict, optional): Pre-computed mappings for models and benchmarks.
    """
    # ------------------------------------------------------------
    # 1) Mappings & data arrays
    # ------------------------------------------------------------
    if full_dataset_maps:
        model_id_to_idx = full_dataset_maps['model_id_to_idx']
        bench_id_to_idx = full_dataset_maps['bench_id_to_idx']
        valid_model_ids = list(model_id_to_idx.keys())
        benchmark_ids = list(bench_id_to_idx.keys())
    else:
        valid_model_ids = df["model_id"].unique()
        benchmark_ids = df["benchmark_id"].unique()
        model_id_to_idx = {m_id: i for i, m_id in enumerate(valid_model_ids)}
        bench_id_to_idx = {b_id: i for i, b_id in enumerate(benchmark_ids)}

    num_models = len(valid_model_ids)
    num_benchmarks = len(benchmark_ids)

    model_idx_data = np.array([model_id_to_idx[m] for m in df["model_id"]])
    bench_idx_data = np.array([bench_id_to_idx[b] for b in df["benchmark_id"]])
    observed_scores = df["performance"].values

    # ------------------------------------------------------------
    # 2) Anchor benchmark
    # ------------------------------------------------------------
    try:
        anchor_bench_id_from_df = df.loc[df["benchmark"] == anchor_benchmark, "benchmark_id"].iloc[0]
    except IndexError:
        raise ValueError(f"Anchor benchmark “{anchor_benchmark}” not found in the provided dataframe split.")

    anchor_idx = bench_id_to_idx[anchor_bench_id_from_df]

    # ------------------------------------------------------------
    # 3) Residuals function for least squares
    # ------------------------------------------------------------
    def residuals(params, model_idx, bench_idx, y):
        """Calculate the residuals for the least squares fit."""
        # MODIFICATION: Pass anchor values to the predict function.
        preds = predict(params, model_idx, bench_idx, num_models, num_benchmarks, anchor_idx, model_type, anchor_slope, anchor_difficulty)
        return preds - y

    # ------------------------------------------------------------
    # 4) Initial guesses
    # ------------------------------------------------------------
    initial_C = np.zeros(num_models)
    # MODIFICATION: We only optimize N-1 difficulties, since one is fixed.
    initial_D = np.zeros(num_benchmarks - 1)
    initial_alpha = np.full(num_benchmarks - 1, slope_init)
    initial_theta = np.concatenate([initial_C, initial_D, initial_alpha])

    if model_type == 'sigmoid_with_offsets':
        # MODIFICATION: The number of gamma parameters is still num_benchmarks
        initial_gamma_lower = np.full(num_benchmarks, 0.0)
        initial_gamma_upper = np.full(num_benchmarks, 1.0)
        initial_theta = np.concatenate([initial_theta, initial_gamma_lower, initial_gamma_upper])

    # ------------------------------------------------------------
    # 5) Fit
    # ------------------------------------------------------------
    result = least_squares(
        residuals,
        initial_theta,
        args=(model_idx_data, bench_idx_data, observed_scores),
        method="trf",
        verbose=0
    )

    # ------------------------------------------------------------
    # 6) Recover full parameter vectors and normalize
    # ------------------------------------------------------------
    theta_hat = result.x

    # MODIFICATION: Pass anchor values to get the full parameter vectors.
    split_args = (theta_hat, num_models, num_benchmarks, anchor_idx, model_type, anchor_slope, anchor_difficulty)
    if model_type == 'sigmoid_with_offsets':
        C_hat, D_hat, alpha_hat, gammas_hat_lower, gammas_hat_upper = split_params(*split_args)
    else:
        C_hat, D_hat, alpha_hat = split_params(*split_args)

    # MODIFICATION: The normalization shift now correctly centers the capabilities
    # around the fixed anchor difficulty. We no longer need to shift D_hat.
    shift = D_hat[anchor_idx] # This is equal to anchor_difficulty
    C_hat -= shift
    # D_hat -= shift # This line is no longer needed as D is fixed.

    # ------------------------------------------------------------
    # 7) Pack tidy DataFrames for inspection / downstream use
    # ------------------------------------------------------------
    # (This section remains unchanged)
    if df_model is not None:
        id_to_name = df.drop_duplicates("model_id").set_index("model_id")["model"].to_dict()
        model_cap_df = (
            pd.DataFrame({"model_id": valid_model_ids, "estimated_capability": C_hat})
            .assign(model=lambda d: d["model_id"].map(id_to_name))
        )
        model_cap_df = model_cap_df.merge(df_model, on="model", how="left")
        model_capabilities_df = model_cap_df.sort_values("estimated_capability", ascending=False)
    else:
        model_capabilities_df = pd.DataFrame(
            {"model_id": valid_model_ids, "estimated_capability": C_hat}
        ).sort_values("estimated_capability", ascending=False)

    release_date_map = (
        df.drop_duplicates("benchmark_id")
          .set_index("benchmark_id")["benchmark_release_date"]
          .to_dict()
    )

    benchmark_params_df = pd.DataFrame({
        "benchmark_id": benchmark_ids,
        "estimated_difficulty": D_hat,
        "estimated_slope": alpha_hat,
    })

    if model_type == 'sigmoid_with_offsets':
        benchmark_params_df['estimated_lower_asymptote'] = gammas_hat_lower
        benchmark_params_df['estimated_upper_asymptote'] = gammas_hat_upper

    benchmark_params_df = benchmark_params_df.assign(
        benchmark_name=lambda d: d["benchmark_id"].map(
            dict(zip(df["benchmark_id"], df["benchmark"]))
        ),
        benchmark_release_date=lambda d: d["benchmark_id"].map(release_date_map),
    ).sort_values("estimated_difficulty")

    return theta_hat, model_capabilities_df, benchmark_params_df

# ==============================================================================
# NEW MODEL COMPARISON AND CROSS-VALIDATION CODE
# ==============================================================================

def calculate_metrics(y_true, y_pred, num_params):
    """
    Calculates R-squared, MSE, AIC, and BIC.
    """
    n_obs = len(y_true)
    rss = np.sum((y_true - y_pred) ** 2)
    mse = rss / n_obs
    tss = np.sum((y_true - np.mean(y_true)) ** 2)
    r2 = 1 - (rss / tss) if tss > 0 else 0
    aic = n_obs * np.log(mse) + 2 * num_params if mse > 0 else -np.inf
    bic = n_obs * np.log(mse) + num_params * np.log(n_obs) if mse > 0 else -np.inf
    return {'r2': r2, 'mse': mse, 'aic': aic, 'bic': bic}


def compare_models_cross_validation(df, anchor_benchmark, anchor_difficulty, anchor_slope, n_splits=10):
    """
    Compares sigmoid and clipped_linear models using k-fold cross-validation.

    Args:
        df (pd.DataFrame): The full dataset.
        anchor_benchmark (str): The name of the anchor benchmark.
        anchor_difficulty (float): The difficulty value for the anchor benchmark.
        anchor_slope (float): The slope value for the anchor benchmark.
        n_splits (int): The number of folds for cross-validation.
    """
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    model_types = ['sigmoid', 'clipped_linear']
    results = []

    # Pre-calculate mappings from the FULL dataset for consistency across folds
    valid_model_ids = df['model_id'].unique()
    benchmark_ids = df['benchmark_id'].unique()
    model_id_to_idx = {m_id: i for i, m_id in enumerate(valid_model_ids)}
    bench_id_to_idx = {b_id: i for i, b_id in enumerate(benchmark_ids)}
    num_models = len(valid_model_ids)
    num_benchmarks = len(benchmark_ids)

    try:
        anchor_bench_id = df.loc[df["benchmark"] == anchor_benchmark, "benchmark_id"].iloc[0]
    except IndexError:
        raise ValueError(f"Anchor benchmark '{anchor_benchmark}' not found in the dataset.")

    anchor_idx = bench_id_to_idx[anchor_bench_id]

    full_dataset_maps = {
        'model_id_to_idx': model_id_to_idx,
        'bench_id_to_idx': bench_id_to_idx,
    }

    print(f"Running {n_splits}-fold cross-validation...")
    for model_type in model_types:
        print(f"  Evaluating model: {model_type}")
        y_pred_out_of_sample = np.zeros(len(df))

        for fold, (train_index, test_index) in enumerate(kf.split(df)):
            # print(f"    - Fold {fold+1}/{n_splits}") # Optional: uncomment for more verbose output
            df_train, df_test = df.iloc[train_index], df.iloc[test_index]

            # --- FIX 1: Pass anchor_difficulty and anchor_slope to the fitting function ---
            theta_hat, _, _ = fit_statistical_model(
                df=df_train,
                anchor_benchmark=anchor_benchmark,
                anchor_difficulty=anchor_difficulty, # Corrected
                anchor_slope=anchor_slope,         # Corrected
                model_type=model_type,
                full_dataset_maps=full_dataset_maps
            )

            # Get indices for the test data using the full dataset mapping
            model_idx_test = np.array([model_id_to_idx[m] for m in df_test["model_id"]])
            bench_idx_test = np.array([bench_id_to_idx[b] for b in df_test["benchmark_id"]])

            # --- FIX 2: Pass anchor_difficulty and anchor_slope to the predict function ---
            y_pred_test = predict(
                theta_hat,
                model_idx_test,
                bench_idx_test,
                num_models,
                num_benchmarks,
                anchor_idx,
                model_type,
                anchor_slope=anchor_slope,         # Corrected
                anchor_difficulty=anchor_difficulty  # Corrected
            )
            y_pred_out_of_sample[test_index] = y_pred_test

        # --- Calculate metrics based on out-of-sample CV predictions ---
        y_true = df['performance'].values
        cv_mse = np.mean((y_true - y_pred_out_of_sample) ** 2)
        cv_tss = np.sum((y_true - np.mean(y_true)) ** 2)
        cv_rss = np.sum((y_true - y_pred_out_of_sample) ** 2)
        cv_r2 = 1 - (cv_rss / cv_tss)

        # --- Fit on full data to get AIC/BIC ---
        print(f"  Fitting {model_type} on full data for AIC/BIC...")
        # --- FIX 3: Use the passed anchor values instead of hardcoded ones ---
        theta_full, _, _ = fit_statistical_model(
            df=df,
            anchor_benchmark=anchor_benchmark,
            anchor_difficulty=anchor_difficulty, # Corrected
            anchor_slope=anchor_slope,         # Corrected
            model_type=model_type
        )

        model_idx_full = np.array([model_id_to_idx[m] for m in df["model_id"]])
        bench_idx_full = np.array([bench_id_to_idx[b] for b in df["benchmark_id"]])

        # --- FIX 4: Pass anchor values to the final predict call ---
        y_pred_full = predict(
            theta_full,
            model_idx_full,
            bench_idx_full,
            num_models,
            num_benchmarks,
            anchor_idx,
            model_type,
            anchor_slope=anchor_slope,         # Corrected
            anchor_difficulty=anchor_difficulty  # Corrected
        )

        num_params = len(theta_full)
        full_data_metrics = calculate_metrics(y_true, y_pred_full, num_params)

        results.append({
            'model_type': model_type,
            'cv_r2': cv_r2,
            'cv_mse': cv_mse,
            'aic': full_data_metrics['aic'],
            'bic': full_data_metrics['bic'],
            'num_params': num_params
        })

    return pd.DataFrame(results)

In [5]:
comparison_results = compare_models_cross_validation(
    df=scores_df,
    anchor_benchmark=anchor_benchmark,
    anchor_difficulty=anchor_difficulty,
    anchor_slope=anchor_slope
)

print("\n" + "="*50)
print("MODEL COMPARISON RESULTS")
print("="*50)
print("CV R2 / CV MSE are from 5-fold cross-validation (higher is better for R2, lowe r for MSE).")
print("AIC / BIC are from fitting on the full dataset (lower is better for both).")
print("-" * 50)
print(comparison_results.round(4))
print("="*50)

Running 10-fold cross-validation...
  Evaluating model: sigmoid
  Fitting sigmoid on full data for AIC/BIC...
  Evaluating model: clipped_linear
  Fitting clipped_linear on full data for AIC/BIC...

MODEL COMPARISON RESULTS
CV R2 / CV MSE are from 5-fold cross-validation (higher is better for R2, lowe r for MSE).
AIC / BIC are from fitting on the full dataset (lower is better for both).
--------------------------------------------------
       model_type   cv_r2  cv_mse        aic        bic  num_params
0         sigmoid  0.8641  0.0098 -6490.3797 -5177.7113         253
1  clipped_linear  0.8683  0.0095 -6472.1491 -5159.4807         253
