In [None]:
# package imports
from __future__ import annotations

import os
import sys
import re
import traceback
import warnings
warnings.filterwarnings("ignore")

import time
import json
import pickle
import warnings
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler, RobustScaler
from sklearn.model_selection import (StratifiedKFold, cross_val_score, cross_val_predict)

from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    f1_score,
    log_loss,
    brier_score_loss,
    precision_recall_curve,
    average_precision_score,
    make_scorer,
)
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import (
    RandomForestClassifier,
    GradientBoostingClassifier,
    VotingClassifier,
    StackingClassifier,
    HistGradientBoostingClassifier,
)

from sklearn.calibration import CalibratedClassifierCV
from sklearn.feature_selection import SelectFromModel, RFECV

from xgboost import XGBClassifier

import optuna
from optuna.samplers import TPESampler
from optuna.pruners import MedianPruner

In [None]:
# Configuration

# using @dataclass config for simplicity
@dataclass
class Config:
    """Settings for one optimization run: input paths, time budget, CV/Optuna sizes, splits and targets.

    Only the annotated attributes are dataclass fields (and therefore
    ``Config(...)`` keyword arguments). The un-annotated path attributes are
    plain class attributes shared by every instance.
    """

    # Main contracts table (read with pd.read_csv in load_and_prepare_data)
    DATA_PATH = r"contracts_with_isi_v2_SWEEP_WIDE_WITH_KEYS_PLUS_CPI.csv"

    # Season-level stat panels joined onto the contracts table
    BAT_RATES_PATH = r"batting_rates_by_season.csv"
    PIT_RATES_PATH = r"pitching_rates_by_season.csv"
    DEF_STATS_PATH = r"defensive_stats.csv"
    STATCAST_PIT_PATH = r"statcast_pitching_2015_2025.csv"

    # WAR folder directory (contains war files from 2010-2025)
    # NOTE(review): the backslash separator is Windows-specific — confirm before running elsewhere
    WAR_DIR = r"war_rankings_raw\war"

    # output directory (created by run_optimization if it does not exist)
    OUTPUT_DIR: str = r"v2_optimized"

    # Time limits
    # overall wall-clock budget in hours; the early-stop value is documented
    # as "stop if no improvement after 30 min"
    MAX_RUNTIME_HOURS: float = 2.0
    EARLY_STOP_NO_IMPROVE_MINUTES: int = 30

    # CV setup
    # optuna trials set to 10k to ensure enough iterations for the time limit
    N_CV_FOLDS: int = 5
    N_OPTUNA_TRIALS: int = 10000

    # This setup is meant to stay fixed during a run and will look different
    # from the baseline. Mutable defaults go through default_factory so each
    # Config instance gets its own set/list.
    # Data splits (contract start years used for training vs. testing)
    TRAIN_YEARS: set = field(default_factory=lambda: {2020, 2021, 2022, 2023})
    TEST_YEARS: set = field(default_factory=lambda: {2024, 2025})

    # Targets
    WAR_LOSS_THRESHOLDS: List[float] = field(default_factory=lambda: [0.5, 1.0, 1.5])
    POST_YEARS_OPTIONS: List[int] = field(default_factory=lambda: [1, 3])
    PRIMARY_WAR_THRESHOLD: float = 1.0  # Focus optimization here
    PRIMARY_POST_YEARS: int = 1

    # look-back length (seasons before signing) and maximum contract length kept
    PRE_YEARS: int = 3
    MAX_CONTRACT_YEARS: int = 5

    # ISI configs for lookback and lambda
    ISI_LOOKBACKS: List[int] = field(default_factory=lambda: [3, 5])
    ISI_LAMBDAS: List[str] = field(default_factory=lambda: ["35", "5", "7"])

    # best results
    TOP_N_MODELS: int = 10

    # seed passed to the Optuna sampler, the CV splitter and the models
    RANDOM_STATE: int = 42

In [None]:
# Helper functions

# drop dups, preserves order
def unique_list(seq):
    """Return the items of ``seq`` as a list, keeping only the first occurrence of each."""
    first_seen = {}
    for item in seq:
        # setdefault leaves an existing key (and its insertion position) untouched
        first_seen.setdefault(item, None)
    return list(first_seen)

# drop dup col names
def dedupe_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` that keeps only the first column for every repeated column name."""
    is_repeat = df.columns.duplicated()
    keep_mask = ~is_repeat
    return df.loc[:, keep_mask].copy()


PITCHER_PREFIXES = ("P", "SP", "RP", "RHP", "LHP")

def is_pitcher(pos) -> int:
    """Return 1 when a position label looks like a pitcher, otherwise 0.

    Missing values give 0. The label is stripped and upper-cased, then counted
    as a pitcher when it starts with one of ``PITCHER_PREFIXES`` (so any label
    beginning with "P" qualifies) or contains "RHP"/"LHP" anywhere.
    """
    if pd.isna(pos):
        return 0
    label = str(pos).strip().upper()
    if label.startswith(PITCHER_PREFIXES):
        return 1
    # handedness tags may appear after another position, e.g. "1B/RHP"
    return 1 if any(tag in label for tag in ("RHP", "LHP")) else 0


def safe_cols(df: pd.DataFrame, cols: List[str]) -> List[str]:
    """Filter ``cols`` down to the names that are actual columns of ``df`` (input order kept)."""
    available = df.columns
    present = []
    for name in cols:
        if name in available:
            present.append(name)
    return present


def time_split(df: pd.DataFrame, train_years: set, test_years: set, year_col: str = "year") -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Return ``(train, test)`` copies of ``df`` selected by membership of ``year_col`` in each year set.

    The year column is coerced to nullable integers first, so unparseable
    values end up in neither split.
    """
    years = pd.to_numeric(df[year_col], errors="coerce").astype("Int64")
    in_train = years.isin(train_years)
    in_test = years.isin(test_years)
    return df[in_train].copy(), df[in_test].copy()

def _safe_numeric(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    out = df.copy()
    for c in cols:
        if c in out.columns:
            out[c] = pd.to_numeric(out[c], errors="coerce")
    return out

def _weighted_mean(series: pd.Series, weights: pd.Series) -> float:
    s = pd.to_numeric(series, errors="coerce")
    w = pd.to_numeric(weights, errors="coerce").fillna(0.0)
    mask = np.isfinite(s) & np.isfinite(w) & (w > 0)
    if mask.sum() == 0:
        s2 = s[np.isfinite(s)]
        return float(s2.mean()) if len(s2) else np.nan
    return float(np.average(s[mask], weights=w[mask]))

def add_pre_rate_features(
    contracts: pd.DataFrame, season_rates: pd.DataFrame, *,
    rate_cols: List[str], weight_col: Optional[str], prefix: str,
    pre_years: int = 3,
) -> pd.DataFrame:
    """Attach pre-signing averages of season rate stats to each contract row.

    For every contract, the seasons ``[year - pre_years, year - 1]`` of the
    matching player (``contracts.key_fangraphs`` == ``season_rates.playerId``)
    are aggregated.

    Parameters
    ----------
    contracts : table with ``key_fangraphs`` and ``year`` columns.
    season_rates : one row per player-season with ``playerId`` and ``Season``.
    rate_cols : stat columns to average; names absent from ``season_rates``
        produce an all-NaN feature.
    weight_col : optional weight column (e.g. plate appearances). When given
        and present, each stat is a weighted mean over rows with a finite
        value and a finite, positive weight; a contract with no such row
        falls back to the plain mean of its finite values.
    prefix : feature name prefix (``{prefix}_pre_{stat}``).
    pre_years : number of seasons in the look-back window.

    Returns
    -------
    A copy of ``contracts`` (``key_fangraphs``/``year`` coerced to numeric)
    with ``{prefix}_pre_seasons``, optional ``{prefix}_pre_reliability_sum``,
    one ``{prefix}_pre_{stat}`` per rate column and ``has_{prefix}_pre``.
    """
    dfc = contracts.copy()
    dfc["key_fangraphs"] = pd.to_numeric(dfc.get("key_fangraphs"), errors="coerce")
    dfc["year"] = pd.to_numeric(dfc.get("year"), errors="coerce")
    dfc["_row_id"] = np.arange(len(dfc), dtype=int)

    dfs = season_rates.copy()
    dfs["playerId"] = pd.to_numeric(dfs.get("playerId"), errors="coerce")
    dfs["Season"] = pd.to_numeric(dfs.get("Season"), errors="coerce")
    for col in rate_cols + ([weight_col] if weight_col else []):
        if col in dfs.columns:
            dfs[col] = pd.to_numeric(dfs[col], errors="coerce")

    # pandas matches NaN keys with each other, so rate rows without a usable
    # player id would otherwise be attached to every contract lacking a key.
    dfs = dfs.dropna(subset=["playerId", "Season"])

    m = dfc[["_row_id", "key_fangraphs", "year"]].merge(
        dfs, left_on="key_fangraphs", right_on="playerId", how="left"
    )

    # Apply lookback filter
    m["lb_start"] = m["year"] - pre_years
    m["lb_end"] = m["year"] - 1
    m = m[m["Season"].between(m["lb_start"], m["lb_end"], inclusive="both")].copy()

    out = dfc.copy()
    seasons_col = f"{prefix}_pre_seasons"

    cov = m.groupby("_row_id")["Season"].nunique().rename(seasons_col)
    out = out.merge(cov, left_on="_row_id", right_index=True, how="left")
    out[seasons_col] = out[seasons_col].fillna(0).astype(int)

    has_weights = bool(weight_col) and weight_col in m.columns
    row_ids = m["_row_id"]

    if has_weights:
        rel_col = f"{prefix}_pre_reliability_sum"
        rel_sum = m.groupby("_row_id")[weight_col].sum(min_count=1).rename(rel_col)
        out = out.merge(rel_sum, left_on="_row_id", right_index=True, how="left")
        out[rel_col] = pd.to_numeric(out[rel_col], errors="coerce").fillna(0.0)

        weights = m[weight_col].fillna(0.0)
        weight_ok = np.isfinite(weights) & (weights > 0)

    for rc in rate_cols:
        feat_name = f"{prefix}_pre_{rc}"
        if rc not in m.columns:
            out[feat_name] = np.nan
            continue

        if has_weights:
            # Vectorised weighted mean; also well-defined when the window is
            # empty (a per-group apply returns a DataFrame in that case).
            values = m[rc].where(np.isfinite(m[rc]))
            usable = values.notna() & weight_ok
            num = (values * weights).where(usable).groupby(row_ids).sum(min_count=1)
            den = weights.where(usable).groupby(row_ids).sum(min_count=1)
            # contracts without any usable weight fall back to the plain mean
            plain = values.groupby(row_ids).mean()
            agg = (num / den).fillna(plain).rename(feat_name)
        else:
            agg = m.groupby("_row_id")[rc].mean().rename(feat_name)

        out = out.merge(agg, left_on="_row_id", right_index=True, how="left")

    out[f"has_{prefix}_pre"] = (out[seasons_col] > 0).astype(int)
    out = out.drop(columns=["_row_id"])
    return out

In [None]:
# apply pre panel features
# reduction of 'if' statements since baseline confirmed previous validity

def add_pre_panel_features(
    contracts: pd.DataFrame,
    panel: pd.DataFrame,
    *,
    contract_key_col: str,
    panel_key_col: str,
    contract_year_col: str,
    panel_year_col: str,
    feature_cols: List[str],
    weight_col: Optional[str],
    prefix: str,
    pre_years: int = 3,
) -> pd.DataFrame:
    """Generic season-panel aggregator (used for the defense and statcast files).

    For each contract row, panel rows of the same player whose season lies in
    ``[contract_year - pre_years, contract_year - 1]`` are averaged.

    Parameters
    ----------
    contracts, panel : contract table and player-season panel.
    contract_key_col, panel_key_col : player id columns on each side.
    contract_year_col, panel_year_col : year columns on each side.
    feature_cols : panel columns to aggregate; names missing from ``panel``
        are skipped.
    weight_col : optional panel column used as weight. When present, each
        feature is ``sum(x * w) / sum(w)`` taken over rows where ``x`` is
        available; otherwise a plain mean is used.
    prefix : feature name prefix (``{prefix}_pre_{feature}``).
    pre_years : number of seasons in the look-back window.

    Returns
    -------
    ``contracts`` plus ``{prefix}_pre_seasons``, optional
    ``{prefix}_pre_weight_sum``, one column per kept feature, and
    ``has_{prefix}_pre``. The merges on ``_row_id`` keep row order but may
    reset the index.

    Raises
    ------
    KeyError
        If a key/year column is missing from ``contracts`` or ``panel``.
    """
    # Required-column check
    missing_contract = [c for c in [contract_key_col, contract_year_col] if c not in contracts.columns]
    missing_panel = [c for c in [panel_key_col, panel_year_col] if c not in panel.columns]
    if missing_contract:
        raise KeyError(f"contracts missing: {missing_contract}")
    if missing_panel:
        raise KeyError(f"panel missing: {missing_panel}")

    out = contracts.copy()
    out["_row_id"] = np.arange(len(out), dtype=int)

    # temp cols
    dfc = out[["_row_id", contract_key_col, contract_year_col]].copy()
    dfc["_contract_key"] = pd.to_numeric(dfc[contract_key_col], errors="coerce")
    dfc["_contract_year"] = pd.to_numeric(dfc[contract_year_col], errors="coerce")

    # keep only the panel columns that are needed
    keep_feats = [c for c in feature_cols if c in panel.columns]
    keep_cols = [panel_key_col, panel_year_col] + keep_feats
    if weight_col and weight_col in panel.columns and weight_col not in keep_cols:
        keep_cols.append(weight_col)

    p = panel[keep_cols].copy()
    p["_panel_key"] = pd.to_numeric(p[panel_key_col], errors="coerce")
    p["_panel_year"] = pd.to_numeric(p[panel_year_col], errors="coerce")

    for c in keep_feats:
        p[c] = pd.to_numeric(p[c], errors="coerce")
    has_weights = bool(weight_col) and weight_col in p.columns
    if has_weights:
        p[weight_col] = pd.to_numeric(p[weight_col], errors="coerce")

    # pandas matches NaN keys with each other; panel rows without a usable id
    # must not be attached to contracts that have no id either.
    p = p.dropna(subset=["_panel_key", "_panel_year"])

    # de-duplicated so a weight column that is also a feature is selected once
    merge_cols = list(dict.fromkeys(
        ["_panel_key", "_panel_year"] + keep_feats + ([weight_col] if has_weights else [])
    ))
    m = dfc[["_row_id", "_contract_key", "_contract_year"]].merge(
        p[merge_cols], left_on="_contract_key", right_on="_panel_key", how="left"
    )

    # Apply lookback filter
    m["lb_start"] = m["_contract_year"] - pre_years
    m["lb_end"] = m["_contract_year"] - 1
    m = m[m["_panel_year"].between(m["lb_start"], m["lb_end"], inclusive="both")].copy()

    seasons_col = f"{prefix}_pre_seasons"
    cov = m.groupby("_row_id")["_panel_year"].nunique().rename(seasons_col)
    out = out.merge(cov, on="_row_id", how="left")
    out[seasons_col] = out[seasons_col].fillna(0).astype(int)

    if has_weights:
        # Total weight in the window
        wsum_col = f"{prefix}_pre_weight_sum"
        wsum = m.groupby("_row_id")[weight_col].sum(min_count=1).rename(wsum_col)
        out = out.merge(wsum, on="_row_id", how="left")
        out[wsum_col] = pd.to_numeric(out[wsum_col], errors="coerce").fillna(0.0)

        # Weighted mean per feature
        w = m[weight_col].fillna(0.0)
        row_ids = m["_row_id"]
        for fc in keep_feats:
            feat_name = f"{prefix}_pre_{fc}"
            x = m[fc]
            num = (x * w).groupby(row_ids).sum(min_count=1)
            # Only count weight from rows where the feature is present;
            # otherwise a missing season would drag the mean toward zero.
            den = w.where(x.notna()).groupby(row_ids).sum(min_count=1)
            wmean = (num / den).replace([np.inf, -np.inf], np.nan).rename(feat_name)
            out = out.merge(wmean, on="_row_id", how="left")
    else:
        for fc in keep_feats:
            feat_name = f"{prefix}_pre_{fc}"
            mean = m.groupby("_row_id")[fc].mean().rename(feat_name)
            out = out.merge(mean, on="_row_id", how="left")

    out[f"has_{prefix}_pre"] = (out[seasons_col] > 0).astype(int)
    out = out.drop(columns=["_row_id"])
    return out

In [None]:
# Load and Utilize WAR Statistics

# Data loader & Data Integrity Check
def load_war_panel(
        war_dir: str, start_year: int = 2010, end_year: int = 2025
) -> pd.DataFrame:
    """Read ``war_<year>.csv`` files from ``war_dir`` and stack them into one player-season table.

    Parameters
    ----------
    war_dir : directory holding the yearly WAR files.
    start_year, end_year : inclusive range of years to look for.

    Returns
    -------
    DataFrame with numeric ``PlayerId``, ``Season`` and ``Total WAR`` columns
    (plus whatever else the files contain); rows without a usable
    ``PlayerId`` or ``Season`` are dropped.

    Raises
    ------
    FileNotFoundError
        If no yearly file exists in ``war_dir``.
    KeyError
        If the combined table has no ``PlayerId`` or no WAR column.
    """
    # the WAR column header can differ between yearly files
    war_aliases = {"total war", "war", "total_war"}

    frames = []
    for y in range(start_year, end_year + 1):
        fp = os.path.join(war_dir, f"war_{y}.csv")
        try:
            d = pd.read_csv(fp)
        except FileNotFoundError:
            # a missing year is tolerated; only "no files at all" is an error
            continue

        # Normalise the WAR header per file. Doing it after the concat would
        # leave NaN WAR for files that use an alias whenever another file
        # already provides a "Total WAR" column.
        if "Total WAR" not in d.columns:
            cands = [c for c in d.columns if str(c).strip().lower() in war_aliases]
            if cands:
                d = d.rename(columns={cands[0]: "Total WAR"})

        # use the file's own Season column when present, else the file year
        d["Season"] = pd.to_numeric(d.get("Season", y), errors="coerce")
        frames.append(d)

    # Raises error if directory path is invalid
    if not frames:
        raise FileNotFoundError(f"No WAR files found in {war_dir}")

    # combines war files into a single concatenated df
    war = pd.concat(frames, ignore_index=True)

    if "PlayerId" not in war.columns:
        raise KeyError("WAR df missing 'PlayerId' col")

    if "Total WAR" not in war.columns:
        raise KeyError("WAR df missing 'Total WAR' col")

    # Data Integrity
    war["PlayerId"] = pd.to_numeric(war["PlayerId"], errors="coerce")
    war["Season"] = pd.to_numeric(war["Season"], errors="coerce")
    war["Total WAR"] = pd.to_numeric(war["Total WAR"], errors="coerce")
    war = war.dropna(subset=["PlayerId", "Season"]).copy()

    return war

In [None]:

def add_war_windows(
    contracts: pd.DataFrame, war_panel: pd.DataFrame, *,
    key_col: str = "key_fangraphs", year_col: str = "year",
    pre_years: int = 3, post_years: int = 1,
) -> pd.DataFrame:
    """Add pre-/post-signing WAR aggregates and the resulting WAR loss to each contract.

    Windows (contract year ``Y``): pre = ``[Y - pre_years, Y - 1]``,
    post = ``[Y, Y + post_years - 1]``.

    Parameters
    ----------
    contracts : table with ``key_col`` (player id) and ``year_col``.
    war_panel : table with ``PlayerId``, ``Season`` and ``Total WAR``.
    key_col, year_col : contract column names for player id and start year.
    pre_years, post_years : window lengths in seasons.

    Returns
    -------
    ``contracts`` plus ``war_pre_sum/n/mean``, ``war_post_sum/n/mean``,
    ``has_war_pre`` / ``has_war_post`` (number of distinct seasons found in
    each window, 0 when none) and ``war_loss_mean`` (pre mean minus post mean,
    floored at 0; NaN when either window is empty). The merges on ``_row_id``
    keep row order but may reset the index.
    """
    # The numeric conversions below should be unnecessary, but they guard
    # against string-typed ids/years.
    out = contracts.copy()
    out["_row_id"] = np.arange(len(out), dtype=int)

    c = out[["_row_id", key_col, year_col]].copy()
    c["_pid"] = pd.to_numeric(c[key_col], errors="coerce")
    c["_yr"] = pd.to_numeric(c[year_col], errors="coerce")

    w = war_panel[["PlayerId", "Season", "Total WAR"]].copy()
    w = w.rename(columns={"PlayerId": "_pid", "Season": "_season", "Total WAR": "_war"})
    w["_pid"] = pd.to_numeric(w["_pid"], errors="coerce")
    w["_season"] = pd.to_numeric(w["_season"], errors="coerce")
    w["_war"] = pd.to_numeric(w["_war"], errors="coerce")
    # pandas matches NaN keys with each other; WAR rows without a usable id
    # would otherwise be credited to every contract whose key is missing.
    w = w.dropna(subset=["_pid", "_season"])

    m = c.merge(w, on="_pid", how="left")

    # Pre: [Y-pre, Y-1]
    m["pre_start"] = m["_yr"] - pre_years
    m["pre_end"] = m["_yr"] - 1
    pre = m[m["_season"].between(m["pre_start"], m["pre_end"], inclusive="both")].copy()

    pre_sum = pre.groupby("_row_id")["_war"].sum(min_count=1).rename("war_pre_sum")
    pre_n = pre.groupby("_row_id")["_season"].nunique().rename("war_pre_n")
    pre_mean = (pre_sum / pre_n).rename("war_pre_mean")

    out = out.merge(pre_sum, on="_row_id", how="left")
    out = out.merge(pre_n, on="_row_id", how="left")
    out = out.merge(pre_mean, on="_row_id", how="left")

    # Post: [Y, Y + post-1]
    m["post_start"] = m["_yr"]
    m["post_end"] = m["_yr"] + (post_years - 1)
    post = m[m["_season"].between(m["post_start"], m["post_end"], inclusive="both")].copy()

    post_sum = post.groupby("_row_id")["_war"].sum(min_count=1).rename("war_post_sum")
    post_n = post.groupby("_row_id")["_season"].nunique().rename("war_post_n")
    post_mean = (post_sum / post_n).rename("war_post_mean")

    out = out.merge(post_sum, on="_row_id", how="left")
    out = out.merge(post_n, on="_row_id", how="left")
    out = out.merge(post_mean, on="_row_id", how="left")

    for ccol in ["war_pre_sum", "war_pre_mean", "war_post_sum", "war_post_mean"]:
        out[ccol] = pd.to_numeric(out[ccol], errors="coerce")

    # Despite the "has_" name these hold season counts; callers test them with > 0.
    out["has_war_pre"] = out["war_pre_n"].fillna(0).astype(int).clip(lower=0)
    out["has_war_post"] = out["war_post_n"].fillna(0).astype(int).clip(lower=0)
    # WAR loss: positive means worse than the pre-signing level. The mean
    # (rather than the sum) keeps it comparable across post-window lengths.
    out["war_loss_mean"] = (out["war_pre_mean"] - out["war_post_mean"]).clip(lower=0)

    out = out.drop(columns=["_row_id"])
    return out

In [None]:
# utilizes the different ISI lambda and lookback combos

# function uses regex in case of any convention errors
def discover_isi_families(df: pd.DataFrame) -> Dict[Tuple[int, str], List[str]]:
    """Group ISI-related columns by their (lookback, lambda) combination.

    A family is anchored by a column named ``ISI_lb<lookback>_lamdba_<lambda>``
    (case-insensitive; the spelling "lamdba" is the one used in the column
    names). Its members are the anchor itself plus every column whose
    lower-cased name contains ``_lb<lookback>_lamdba_<lambda>`` not followed
    by another digit, so lambda "3" does not absorb the columns of lambda "35".

    Returns
    -------
    ``{(lookback, lambda): [column names in DataFrame order, de-duplicated]}``
    where ``lookback`` is an int and ``lambda`` the digit string as written.
    """
    isi_pattern = re.compile(r"^ISI_lb(\d+)_lamdba_(\d+)$", re.IGNORECASE)
    columns = list(df.columns)

    families = {}
    for col in columns:
        # non-string labels cannot be ISI columns
        if not isinstance(col, str):
            continue
        match = isi_pattern.match(col)
        if not match:
            continue

        lb = int(match.group(1))
        lam = match.group(2)

        # "(?!\d)" stops the lambda digits from matching a longer number
        member_re = re.compile(rf"_lb{lb}_lamdba_{lam}(?!\d)")
        related = [
            c for c in columns
            if c == col or (isinstance(c, str) and member_re.search(c.lower()))
        ]
        # preserve order, drop duplicates
        families[(lb, lam)] = list(dict.fromkeys(related))

    return families

# implementation of new polynomial ISI transformations
def add_isi_transforms(df: pd.DataFrame, isi_col: str, prefix: str) -> pd.DataFrame:
    """Add polynomial and interaction features derived from one ISI column.

    ``df`` is modified in place and also returned. If ``isi_col`` is not a
    column, ``df`` is returned untouched. Interaction columns are only added
    for the companion columns (``years_int``, ``age_at_signing``,
    ``war_pre_mean``) that exist.
    """
    if isi_col not in df.columns:
        return df

    def _numeric(name: str) -> pd.Series:
        return pd.to_numeric(df[name], errors="coerce")

    isi = _numeric(isi_col)
    # negative values are floored at 0 before sqrt / log1p
    isi_floored = np.clip(isi, 0, None)

    # transformations
    df[f"{prefix}_isi_raw"] = isi
    df[f"{prefix}_isi_sq"] = isi ** 2
    df[f"{prefix}_isi_sqrt"] = np.sqrt(isi_floored)
    df[f"{prefix}_isi_log"] = np.log1p(isi_floored)

    # interactions
    if "years_int" in df.columns:
        df[f"{prefix}_isi_x_years"] = isi * _numeric("years_int")

    if "age_at_signing" in df.columns:
        age = _numeric("age_at_signing")
        df[f"{prefix}_isi_x_age"] = isi * age
        df[f"{prefix}_isi_x_age_sq"] = isi * (age ** 2)

    if "war_pre_mean" in df.columns:
        df[f"{prefix}_isi_x_war"] = isi * _numeric("war_pre_mean")

    return df

In [None]:
# Data Preprocessing
def make_preprocessor(
    numeric_features: List[str],
    categorical_features: List[str],
    use_robust_scaler: bool = False,
) -> ColumnTransformer:
    """Build the column transformer applied before every classifier.

    Numeric columns: median imputation, then RobustScaler when
    ``use_robust_scaler`` is true and StandardScaler otherwise.
    Categorical columns (if any): most-frequent imputation, then dense
    one-hot encoding that ignores unseen categories. Any other column is
    dropped.
    """
    if use_robust_scaler:
        scaler = RobustScaler()
    else:
        scaler = StandardScaler()

    numeric_steps = [
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", scaler),
    ]
    transformers = [("num", Pipeline(numeric_steps), numeric_features)]

    if categorical_features:
        categorical_steps = [
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", OneHotEncoder(handle_unknown="ignore", sparse_output=False)),
        ]
        transformers.append(("cat", Pipeline(categorical_steps), categorical_features))

    return ColumnTransformer(transformers=transformers, remainder="drop")


In [None]:
# This function builds a classifier whose hyperparameters are sampled by an Optuna trial.
# Optuna chooses the parameter values from the ranges below instead of them being hard-coded.

def _xgboost_from_trial(trial: "optuna.Trial", random_state: int) -> Any:
    """Sample XGBoost hyperparameters from ``trial`` and return the classifier."""
    return XGBClassifier(
        n_estimators=trial.suggest_int("xgb_n_estimators", 100, 2000),
        max_depth=trial.suggest_int("xgb_max_depth", 2, 8),
        learning_rate=trial.suggest_float("xgb_learning_rate", 0.001, 0.3, log=True),
        subsample=trial.suggest_float("xgb_subsample", 0.5, 1.0),
        colsample_bytree=trial.suggest_float("xgb_colsample_bytree", 0.5, 1.0),
        reg_lambda=trial.suggest_float("xgb_reg_lambda", 0.1, 10.0, log=True),
        reg_alpha=trial.suggest_float("xgb_reg_alpha", 0.001, 10.0, log=True),
        min_child_weight=trial.suggest_int("xgb_min_child_weight", 1, 20),
        gamma=trial.suggest_float("xgb_gamma", 0.0, 5.0),
        scale_pos_weight=trial.suggest_float("xgb_scale_pos_weight", 0.5, 3.0),
        random_state=random_state,
        n_jobs=-1,
        eval_metric="auc",
        use_label_encoder=False,
    )


def _random_forest_from_trial(trial: "optuna.Trial", random_state: int) -> Any:
    """Sample random-forest hyperparameters from ``trial`` and return the classifier."""
    return RandomForestClassifier(
        n_estimators=trial.suggest_int("rf_n_estimators", 100, 1000),
        max_depth=trial.suggest_int("rf_max_depth", 3, 20),
        min_samples_split=trial.suggest_int("rf_min_samples_split", 2, 20),
        min_samples_leaf=trial.suggest_int("rf_min_samples_leaf", 1, 10),
        max_features=trial.suggest_categorical("rf_max_features", ["sqrt", "log2", None]),
        class_weight=trial.suggest_categorical("rf_class_weight", ["balanced", "balanced_subsample", None]),
        random_state=random_state,
        n_jobs=-1,
    )


def _gradient_boosting_from_trial(trial: "optuna.Trial", random_state: int) -> Any:
    """Sample gradient-boosting hyperparameters from ``trial`` and return the classifier."""
    return GradientBoostingClassifier(
        n_estimators=trial.suggest_int("gb_n_estimators", 100, 1000),
        max_depth=trial.suggest_int("gb_max_depth", 2, 8),
        learning_rate=trial.suggest_float("gb_learning_rate", 0.001, 0.3, log=True),
        subsample=trial.suggest_float("gb_subsample", 0.5, 1.0),
        min_samples_split=trial.suggest_int("gb_min_samples_split", 2, 20),
        min_samples_leaf=trial.suggest_int("gb_min_samples_leaf", 1, 10),
        random_state=random_state,
    )


def _hist_gradient_boosting_from_trial(trial: "optuna.Trial", random_state: int) -> Any:
    """Sample histogram gradient-boosting hyperparameters from ``trial`` and return the classifier."""
    return HistGradientBoostingClassifier(
        max_iter=trial.suggest_int("hgb_max_iter", 100, 1000),
        max_depth=trial.suggest_int("hgb_max_depth", 2, 15),
        learning_rate=trial.suggest_float("hgb_learning_rate", 0.001, 0.3, log=True),
        l2_regularization=trial.suggest_float("hgb_l2_reg", 0.0, 10.0),
        min_samples_leaf=trial.suggest_int("hgb_min_samples_leaf", 5, 50),
        random_state=random_state,
    )


def _logistic_from_trial(trial: "optuna.Trial", random_state: int) -> Any:
    """Sample elastic-net logistic-regression hyperparameters from ``trial`` and return the classifier."""
    return LogisticRegression(
        C=trial.suggest_float("lr_C", 0.001, 100.0, log=True),
        l1_ratio=trial.suggest_float("lr_l1_ratio", 0.0, 1.0),
        max_iter=5000,
        solver="saga",
        penalty="elasticnet",
        random_state=random_state,
        n_jobs=-1,
    )


def create_model_from_trial(trial: "optuna.Trial", model_type: str, random_state: int = 42) -> Any:
    """Return an unfitted classifier of ``model_type`` with hyperparameters sampled from ``trial``.

    Supported types: "xgboost", "random_forest", "gradient_boosting",
    "hist_gradient_boosting" and "logistic".

    Raises
    ------
    ValueError
        For any other ``model_type`` (new, unsupported type or a typo).
    """
    if model_type == "xgboost":
        return _xgboost_from_trial(trial, random_state)
    if model_type == "random_forest":
        return _random_forest_from_trial(trial, random_state)
    if model_type == "gradient_boosting":
        return _gradient_boosting_from_trial(trial, random_state)
    if model_type == "hist_gradient_boosting":
        return _hist_gradient_boosting_from_trial(trial, random_state)
    if model_type == "logistic":
        return _logistic_from_trial(trial, random_state)
    raise ValueError(f"Unknown model type: {model_type}")


In [None]:
# Model run and eval
def compute_metrics(y_true: np.ndarray, y_prob: np.ndarray, threshold: float = 0.5) -> Dict[str, float]:
    """Score predicted probabilities against binary labels.

    Rows with a non-finite probability are ignored. Returns AUC, accuracy and
    F1 (both at ``threshold``), log loss, Brier score, average precision, the
    number of rows scored and the positive rate. When nothing is left to score
    or only one class is present, every metric is NaN and ``n_eval`` is 0.
    """
    labels = np.asarray(y_true, dtype=int)
    probs = np.asarray(y_prob, dtype=float)

    finite = np.isfinite(probs)
    labels, probs = labels[finite], probs[finite]

    # AUC requires a min of two classes
    if len(labels) == 0 or len(np.unique(labels)) < 2:
        degenerate = {name: np.nan for name in ("AUC", "ACC", "F1", "LOGLOSS", "BRIER", "AP")}
        degenerate["n_eval"] = 0
        degenerate["pos_rate"] = np.nan
        return degenerate

    # hard predictions at the chosen cut-off
    predicted = (probs >= threshold).astype(int)

    metrics = {}
    metrics["AUC"] = float(roc_auc_score(labels, probs))
    metrics["ACC"] = float(accuracy_score(labels, predicted))
    metrics["F1"] = float(f1_score(labels, predicted, zero_division=0))
    metrics["LOGLOSS"] = float(log_loss(labels, probs, labels=[0, 1]))
    metrics["BRIER"] = float(brier_score_loss(labels, probs))
    metrics["AP"] = float(average_precision_score(labels, probs))
    metrics["n_eval"] = int(len(labels))
    metrics["pos_rate"] = float(np.mean(labels))
    return metrics

# this is to determine the threshold for grading which maximizes F1 (0-1 scale)
def find_optimal_threshold(y_true: np.ndarray, y_prob: np.ndarray) -> Tuple[float, float]:
    """Return ``(threshold, f1)`` for the probability cut-off that maximizes F1 on the precision-recall curve."""
    prec, rec, cutoffs = precision_recall_curve(y_true, y_prob)

    # F1 at every curve point; the small constant avoids 0/0
    f1_by_point = 2 * (prec * rec) / (prec + rec + 1e-10)

    # the final curve point has no matching threshold, so it is left out of the search
    winner = np.argmax(f1_by_point[:-1])

    return float(cutoffs[winner]), float(f1_by_point[winner])

In [None]:
# Optuna creation, refer to Optuna documentation on developer website

class OptunaObjective:
    """Callable Optuna objective: samples a model and scaler, returns the mean cross-validated ROC AUC."""

    def __init__(
        self,
        X_train: pd.DataFrame,
        y_train: np.ndarray,
        numeric_features: List[str],
        categorical_features: List[str],
        n_cv_folds: int = 5,
        random_state: int = 42,
    ):
        """Store the training data and feature lists and create the shuffled stratified CV splitter."""
        self.X_train = X_train
        self.y_train = y_train
        self.numeric_features = numeric_features
        self.categorical_features = categorical_features
        self.n_cv_folds = n_cv_folds
        self.random_state = random_state
        self.cv = StratifiedKFold(n_splits=n_cv_folds, shuffle=True, random_state=random_state)

    def __call__(self, trial: "optuna.Trial") -> float:
        """Evaluate one trial; returns 0.0 (after printing a warning) if cross-validation raises."""
        # Model family is itself a tuned hyperparameter
        candidate_models = ["xgboost", "random_forest", "hist_gradient_boosting", "logistic"]
        model_type = trial.suggest_categorical("model_type", candidate_models)

        # The scaler is also tuned: each trial samples RobustScaler vs StandardScaler
        use_robust = trial.suggest_categorical("use_robust_scaler", [True, False])

        classifier = create_model_from_trial(trial, model_type, self.random_state)
        preprocessor = make_preprocessor(
            self.numeric_features,
            self.categorical_features,
            use_robust_scaler=use_robust,
        )
        pipeline = Pipeline([("preprocessor", preprocessor), ("classifier", classifier)])

        # Score CV with AUC
        try:
            feature_frame = self.X_train[self.numeric_features + self.categorical_features]
            fold_aucs = cross_val_score(
                pipeline,
                feature_frame,
                self.y_train,
                cv=self.cv,
                scoring="roc_auc",
                n_jobs=-1,
            )
        except Exception as e:
            print(f"[WARN] Trial failed: {e}")
            return 0.0

        # mean AUC across folds is the value Optuna maximizes
        return float(np.mean(fold_aucs))


In [None]:
# Data loader
def load_and_prepare_data(cfg: Config) -> Tuple[pd.DataFrame, pd.DataFrame, List[str], List[str]]:
    """Load every input file, build the feature table and list the model features.

    Steps: read the contracts table and stat panels, keep contracts starting
    in 2020-2025 with a known length of at most ``cfg.MAX_CONTRACT_YEARS``,
    attach pre-signing batting / pitching / defense / statcast aggregates and
    WAR windows, keep only rows with WAR in both windows, and add ISI
    transforms.

    Returns
    -------
    ``(df, war_panel, numeric_features, categorical_features)``. Columns that
    describe the post-signing outcome (``has_war_post``, ``war_post_*``,
    ``war_loss_mean``) are never listed as features.
    """
    # Loads data
    df = pd.read_csv(cfg.DATA_PATH)
    df = dedupe_columns(df)

    bat_rates = pd.read_csv(cfg.BAT_RATES_PATH)
    pit_rates = pd.read_csv(cfg.PIT_RATES_PATH)
    def_stats = pd.read_csv(cfg.DEF_STATS_PATH)
    sc_pit = pd.read_csv(cfg.STATCAST_PIT_PATH)
    war_panel = load_war_panel(cfg.WAR_DIR)

    df["term_start_year"] = pd.to_numeric(df.get("term_start_year"), errors="coerce")
    df = df.dropna(subset=["term_start_year"]).copy()
    df["year"] = df["term_start_year"].astype(int)

    # Filter term years 2020-2025
    df = df[(df["year"] >= 2020) & (df["year"] <= 2025)].copy()

    # Keep contracts with a known length of at most MAX_CONTRACT_YEARS
    df["years_int"] = pd.to_numeric(df.get("years_int"), errors="coerce")
    before = len(df)
    df = df[df["years_int"].notna() & (df["years_int"] <= cfg.MAX_CONTRACT_YEARS)].copy()
    print(f"Filtered contracts > {cfg.MAX_CONTRACT_YEARS} years: {before} -> {len(df)}")

    for c in ["key_fangraphs", "key_mlbam"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")

    # apply pitcher flag
    df["is_pitcher_flag"] = df["position"].apply(is_pitcher)

    # Apply hitting features
    # NOTE(review): "strikout_percent" may be a typo for "strikeout_percent";
    # safe_cols drops names missing from the CSV silently — confirm against
    # the batting file header.
    bat_rate_cols = [
        "walk_percent", "strikout_percent", "AVG", "OBP", "SLG", "OPS",
        "ISO", "wOBA", "wRC+", "batting_avg_on_balls_in_play",
        "batted_balls_hard_contact_percent",
    ]
    df = add_pre_rate_features(
        df, bat_rates, rate_cols=safe_cols(bat_rates, bat_rate_cols),
        weight_col="PA" if "PA" in bat_rates.columns else None,
        prefix="bat", pre_years=cfg.PRE_YEARS,
    )

    # Apply pitching features
    pit_rate_cols = [
        "strikeout_percent", "walk_percent", "FIP", "xFIP",
        "batting_average_on_balls_in_play", "walks_plus_hits_per_9_innings",
        "home_runs_allowed_per_9_innings",
    ]
    df = add_pre_rate_features(
        df, pit_rates, rate_cols=safe_cols(pit_rates, pit_rate_cols),
        weight_col="TBF" if "TBF" in pit_rates.columns else None,
        prefix="pit", pre_years=cfg.PRE_YEARS,
    )

    # Apply defensive features
    def_feature_cols = [
        "fielding_percentage", "defensive_runs_saved", "Errors",
    ]
    df = add_pre_panel_features(
        df, def_stats, contract_key_col="key_mlbam", panel_key_col="MLBAMID",
        contract_year_col="year", panel_year_col="year",
        feature_cols=safe_cols(def_stats, def_feature_cols),
        weight_col="Innings_played" if "Innings_played" in def_stats.columns else None,
        prefix="def", pre_years=cfg.PRE_YEARS,
    )

    # Apply statcast pitching features
    sc_feature_cols = [
        "fastball_avg_speed", "whiff_percent", "hard_hit_percent",
        "barrel_batted_rate", "exit_velocity_avg", "swing_percent",
    ]
    df = add_pre_panel_features(
        df, sc_pit, contract_key_col="key_mlbam", panel_key_col="player_id",
        contract_year_col="year", panel_year_col="year",
        feature_cols=safe_cols(sc_pit, sc_feature_cols),
        weight_col="pa" if "pa" in sc_pit.columns else None,
        prefix="scpit", pre_years=cfg.PRE_YEARS,
    )

    # Apply WAR stats
    df = add_war_windows(
        df, war_panel, key_col="key_fangraphs", year_col="year",
        pre_years=cfg.PRE_YEARS, post_years=cfg.PRIMARY_POST_YEARS,
    )

    # Keep only rows with WAR data in both the pre and post windows
    before = len(df)
    df = df[(df["has_war_pre"] > 0) & (df["has_war_post"] > 0)].copy()
    print(f"Filtered missing WAR: {before} -> {len(df)}")

    # Find ISI combos (lambda and lookback periods) and
    # apply the transformations to each combo
    isi_families = discover_isi_families(df)
    for (lb, lam), cols in isi_families.items():
        isi_main_col = f"ISI_lb{lb}_lamdba_{lam}"
        if isi_main_col in df.columns:
            prefix = f"isi_{lb}_{lam}"
            df = add_isi_transforms(df, isi_main_col, prefix)

    df = dedupe_columns(df)

    BASE_NUMERIC = [
        "age_at_signing", "years_int", "opt_out_flag", "year", "is_pitcher_flag",
        "war_pre_mean", "war_pre_sum",
    ]
    BASE_CATEGORICAL = ["position"]

    # Outcome columns are computed from post-signing seasons; using any of
    # them as a predictor would leak the target. "has_war_post" in particular
    # matches the "has_" coverage rule below.
    OUTCOME_COLS = {
        "has_war_post", "war_post_sum", "war_post_n", "war_post_mean", "war_loss_mean",
    }

    # detect newly created features
    PREFIXES = ("bat_pre_", "pit_pre_", "def_pre_", "scpit_pre_", "isi_")
    generated_feats = [c for c in df.columns if c.startswith(PREFIXES)]
    coverage_feats = [c for c in df.columns if c.startswith("has_") or c.endswith("_seasons")]

    # includes raw ISI columns
    isi_raw_cols = [c for c in df.columns if c.startswith("ISI_") or "surgery" in c.lower() or "structural" in c.lower()]

    numeric_features = unique_list([
        c for c in (BASE_NUMERIC + generated_feats + coverage_feats + isi_raw_cols)
        if c in df.columns and c not in OUTCOME_COLS
    ])
    categorical_features = unique_list([
        c for c in BASE_CATEGORICAL if c in df.columns
    ])

    print(f"Total numeric features: {len(numeric_features)}")
    print(f"Total categorical features: {len(categorical_features)}")

    return df, war_panel, numeric_features, categorical_features


In [None]:
# Optimization Loop
def run_optimization(cfg: Config):
    """Run the model search, evaluate on the held-out years, and persist the outputs.

    Workflow:
      1. Load features through ``load_and_prepare_data(cfg)``.
      2. Build the binary ``target`` column as
         ``war_loss_mean >= cfg.PRIMARY_WAR_THRESHOLD``.
      3. Split with ``time_split`` using ``cfg.TRAIN_YEARS`` / ``cfg.TEST_YEARS`` and,
         when ``guarantee_real_per_year_2025`` exists, drop rows above the
         train-set 95th percentile of that column from both splits.
      4. If ``OPTUNA_AVAILABLE``: run an Optuna study (stopped by the callback on the
         runtime limit or on a no-improvement timeout), refit the best configuration
         on the full training split, score it on the test split, and write
         ``optuna_top_trials.csv`` and ``best_model.pkl`` to ``cfg.OUTPUT_DIR``.
         Otherwise: cross-validate and test three default models.
      5. Write ``optimization_results.csv`` and print a short summary.

    Args:
        cfg: Configuration object. The attributes read here are OUTPUT_DIR,
            MAX_RUNTIME_HOURS, PRIMARY_WAR_THRESHOLD, TRAIN_YEARS, TEST_YEARS,
            RANDOM_STATE, N_CV_FOLDS, N_OPTUNA_TRIALS,
            EARLY_STOP_NO_IMPROVE_MINUTES and TOP_N_MODELS.

    Returns:
        pandas.DataFrame with one row per evaluated model (the same content that is
        written to ``optimization_results.csv``).
    """
    # Wall-clock budget for the search; checked by the callback after each trial.
    start_time = time.time()
    max_runtime_seconds = cfg.MAX_RUNTIME_HOURS * 3600

    # creates output dir (EDIT IN CONFIG)
    os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)

    # The WAR panel is returned by the loader but is not needed in this function.
    df, _war_panel, numeric_features, categorical_features = load_and_prepare_data(cfg)

    # define target
    threshold = cfg.PRIMARY_WAR_THRESHOLD
    df["target"] = (df["war_loss_mean"] >= threshold).astype(int)

    print(f"\nTarget threshold: WAR loss >= {threshold}")
    print(f"Positive rate: {df['target'].mean():.3f}")

    train_df, test_df = time_split(df, cfg.TRAIN_YEARS, cfg.TEST_YEARS)

    # applies percentile cutoff; the cutoff is computed on the training split only
    # and then applied to both splits.
    aav_col = "guarantee_real_per_year_2025"
    if aav_col in train_df.columns:
        cutoff = train_df[aav_col].quantile(0.95)
        train_df = train_df[train_df[aav_col] <= cutoff].copy()
        test_df = test_df[test_df[aav_col] <= cutoff].copy()
        print(f"Dataset size with Top 5% cutoff applied: Train={len(train_df)} | Test={len(test_df)}")

    feature_cols = numeric_features + categorical_features
    X_train = train_df[feature_cols].copy()
    y_train = train_df["target"].values
    X_test = test_df[feature_cols].copy()
    y_test = test_df["target"].values

    # saves results
    all_results = []
    best_auc = 0.0
    last_improvement_time = time.time()

    # runs optuna optimization parameters
    if OPTUNA_AVAILABLE:
        study = optuna.create_study(
            direction="maximize",
            sampler=TPESampler(seed=cfg.RANDOM_STATE),
            pruner=MedianPruner(n_startup_trials=10, n_warmup_steps=5),
        )

        # defines goal
        objective = OptunaObjective(
            X_train, y_train, numeric_features, categorical_features,
            n_cv_folds=cfg.N_CV_FOLDS, random_state=cfg.RANDOM_STATE,
        )

        # callback for early stopping params and time limit
        def callback(study, trial):
            nonlocal best_auc, last_improvement_time

            elapsed = time.time() - start_time

            if elapsed > max_runtime_seconds:
                print(f"\n[TIME LIMIT] Reached {cfg.MAX_RUNTIME_HOURS} hours")
                study.stop()
                return

            # compares current and best results (trials without a value are skipped)
            if trial.value and trial.value > best_auc:
                best_auc = trial.value
                last_improvement_time = time.time()
                print(f"[NEW BEST] Trial {trial.number}: AUC={trial.value:.4f}")

            # Stops model if no improvements are being made
            no_improve_seconds = time.time() - last_improvement_time
            if no_improve_seconds > cfg.EARLY_STOP_NO_IMPROVE_MINUTES * 60:
                print(f"\n[EARLY STOP] No improvement for {cfg.EARLY_STOP_NO_IMPROVE_MINUTES} minutes")
                study.stop()
                return

            # Progress update every 10 trials
            if trial.number % 10 == 0:
                remaining = (max_runtime_seconds - elapsed) / 60
                # study.best_value raises ValueError while no trial has completed
                # (e.g. the very first trial was pruned); report "N/A" instead of
                # aborting the whole study from inside the callback.
                try:
                    best_text = f"{study.best_value:.4f}"
                except ValueError:
                    best_text = "N/A"
                print(f"[PROGRESS] Trial {trial.number} | Best AUC: {best_text} | Time remaining: {remaining:.1f} min")

        # run
        study.optimize(
            objective,
            n_trials=cfg.N_OPTUNA_TRIALS,
            callbacks=[callback],
            show_progress_bar=False,
        )

        print(f"\nOptimization completed: {len(study.trials)} trials")
        print(f"Best CV AUC: {study.best_value:.4f}")
        print(f"Best params: {study.best_params}")

        # Train final model with best params
        print("\n" + "=" * 20)
        print("TRAINING FINAL MODEL WITH BEST PARAMETERS")
        print("=" * 20)

        best_params = study.best_params
        model_type = best_params["model_type"]

        # recreates the best model
        class FinalTrial:
            """Minimal trial stand-in that replays fixed parameters.

            Each ``suggest_*`` returns the stored value for ``name``; when the name
            is missing it falls back to the first choice / the midpoint of the range.
            ``step`` and ``log`` are accepted (and ignored) so calls written against
            the Optuna trial API do not fail with TypeError.
            """

            def __init__(self, params):
                self.params = params

            def suggest_categorical(self, name, choices):
                return self.params.get(name, choices[0])

            def suggest_int(self, name, low, high, step=1, log=False):
                return self.params.get(name, (low + high) // 2)

            def suggest_float(self, name, low, high, log=False, step=None):
                return self.params.get(name, (low + high) / 2)

        final_trial = FinalTrial(best_params)
        best_model = create_model_from_trial(final_trial, model_type, cfg.RANDOM_STATE)
        use_robust = best_params.get("use_robust_scaler", False)

        preprocessor = make_preprocessor(numeric_features, categorical_features, use_robust)

        final_pipeline = Pipeline([
            ("preprocessor", preprocessor),
            ("classifier", best_model),
        ])

        final_pipeline.fit(X_train, y_train)
        y_prob_test = final_pipeline.predict_proba(X_test)[:, 1]
        test_metrics = compute_metrics(y_test, y_prob_test)

        # define optimal threshold
        # NOTE(review): the threshold is chosen using the test labels, so
        # "test_f1_optimal" is presumably optimistic; consider tuning it on
        # out-of-fold training predictions instead.
        opt_threshold, _opt_f1 = find_optimal_threshold(y_test, y_prob_test)
        test_metrics_opt = compute_metrics(y_test, y_prob_test, threshold=opt_threshold)

        print(f"\nTest Set Results (threshold=0.5):")
        print(f"  AUC: {test_metrics['AUC']:.4f}")
        print(f"  Accuracy: {test_metrics['ACC']:.4f}")
        print(f"  F1: {test_metrics['F1']:.4f}")
        print(f"  Brier: {test_metrics['BRIER']:.4f}")

        print(f"\nTest Set Results (optimal threshold={opt_threshold:.3f}):")
        print(f"  F1: {test_metrics_opt['F1']:.4f}")

        # save results
        result_row = {
            "model_type": model_type,
            "cv_auc": study.best_value,
            "test_auc": test_metrics["AUC"],
            "test_acc": test_metrics["ACC"],
            "test_f1_default": test_metrics["F1"],
            "test_f1_optimal": test_metrics_opt["F1"],
            "optimal_threshold": opt_threshold,
            "test_brier": test_metrics["BRIER"],
            "test_logloss": test_metrics["LOGLOSS"],
            "n_train": len(train_df),
            "n_test": len(test_df),
            "pos_rate_train": y_train.mean(),
            "pos_rate_test": y_test.mean(),
            "war_threshold": threshold,
            "n_trials": len(study.trials),
            **{f"param_{k}": v for k, v in best_params.items()},
        }
        all_results.append(result_row)

        # records best trials (trials without a value sort last and are not exported)
        top_trials = sorted(study.trials, key=lambda t: t.value if t.value else 0, reverse=True)[:cfg.TOP_N_MODELS]

        trial_results = [
            {"trial_number": t.number, "cv_auc": t.value, **t.params}
            for t in top_trials
            if t.value
        ]

        trials_df = pd.DataFrame(trial_results)
        trials_path = os.path.join(cfg.OUTPUT_DIR, "optuna_top_trials.csv")
        trials_df.to_csv(trials_path, index=False)
        print(f"\nSaved top {len(trial_results)} trials to: {trials_path}")

        # saves the trained model
        model_path = os.path.join(cfg.OUTPUT_DIR, "best_model.pkl")
        with open(model_path, "wb") as f:
            pickle.dump(final_pipeline, f)
        print(f"Best model saved to: {model_path}")

    # BACKUP IN CASE OPTUNA FAILS
    else:
        models_to_try = [
            ("XGB_default", XGBClassifier(
                n_estimators=1000, max_depth=4, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8, reg_lambda=1.5,
                random_state=cfg.RANDOM_STATE, n_jobs=-1,
            )),
            ("RF_default", RandomForestClassifier(
                n_estimators=500, max_depth=10, class_weight="balanced",
                random_state=cfg.RANDOM_STATE, n_jobs=-1,
            )),
            ("HGB_default", HistGradientBoostingClassifier(
                max_iter=500, max_depth=6, learning_rate=0.05,
                random_state=cfg.RANDOM_STATE,
            )),
        ]
        preprocessor = make_preprocessor(numeric_features, categorical_features, False)

        for model_name, model in models_to_try:
            # Header so the indented metric lines below can be attributed to a model.
            print(f"\n[{model_name}]")

            pipeline = Pipeline([
                ("preprocessor", preprocessor),
                ("classifier", model),
            ])

            # CV eval
            cv = StratifiedKFold(n_splits=cfg.N_CV_FOLDS, shuffle=True, random_state=cfg.RANDOM_STATE)
            cv_scores = cross_val_score(pipeline, X_train, y_train, cv=cv, scoring="roc_auc", n_jobs=-1)

            pipeline.fit(X_train, y_train)
            y_prob_test = pipeline.predict_proba(X_test)[:, 1]
            test_metrics = compute_metrics(y_test, y_prob_test)

            print(f"  CV AUC: {np.mean(cv_scores):.4f} (+/- {np.std(cv_scores):.4f})")
            print(f"  Test AUC: {test_metrics['AUC']:.4f}")
            all_results.append({
                "model_type": model_name,
                "cv_auc": np.mean(cv_scores),
                "cv_auc_std": np.std(cv_scores),
                # Same column names as the Optuna branch so the summary below can
                # rank these rows by "test_auc".
                "test_auc": test_metrics["AUC"],
                "test_acc": test_metrics["ACC"],
                "test_f1_default": test_metrics["F1"],
                "test_brier": test_metrics["BRIER"],
                "test_logloss": test_metrics["LOGLOSS"],
                # Raw metric keys are kept so existing readers of the CSV still work.
                **test_metrics,
            })

    # save final results
    results_df = pd.DataFrame(all_results)
    results_path = os.path.join(cfg.OUTPUT_DIR, "optimization_results.csv")
    results_df.to_csv(results_path, index=False)
    print(f"\nSaved results to: {results_path}")

    # print results summary
    if all_results:
        best = max(all_results, key=lambda row: row.get("test_auc", 0))
        best_test_auc = best.get("test_auc")
        # Only apply the float format when a value exists; formatting the string
        # "N/A" with ".4f" would raise ValueError.
        auc_text = f"{best_test_auc:.4f}" if best_test_auc is not None else "N/A"
        print(f"\nBest Model: {best.get('model_type', 'N/A')}")
        print(f"Test AUC: {auc_text}")

    return results_df

In [None]:

# main run

if __name__ == "__main__":
    # Script entry point: validate the configured input paths, then run the
    # optimization and report where the outputs were written.

    print(f"Start time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Max runtime: {Config.MAX_RUNTIME_HOURS} hours")
    print("=" * 20)

    # init config
    cfg = Config()

    # confirm data paths
    required_path_names = [
        "DATA_PATH", "BAT_RATES_PATH", "PIT_RATES_PATH",
        "DEF_STATS_PATH", "STATCAST_PIT_PATH", "WAR_DIR",
    ]
    missing_paths = []
    for path_name in required_path_names:
        path = getattr(cfg, path_name)
        if not os.path.exists(path):
            missing_paths.append(f"{path_name}: {path}")

    if missing_paths:
        print("\n[ERROR] Missing paths:")
        # List every missing entry so the user knows which config value to fix.
        for missing_entry in missing_paths:
            print(f"  - {missing_entry}")
        sys.exit(1)

    # optim run
    try:
        results = run_optimization(cfg)

        print("\n" + "=" * 20)
        print("TRAINING COMPLETE")
        print("=" * 20)
        print(f"Output directory: {cfg.OUTPUT_DIR}")
        print(f"End time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    except Exception as e:
        # Report the failure with its traceback, then exit with a non-zero status.
        print(f"\n[ERROR] Optimization failed: {e}")
        traceback.print_exc()
        sys.exit(1)
