In [2]:
import sys
import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
sys.path.append("..")

from analysis.utils import clean_player_name
from data_sources import PyBaseball, MLBStatsAPI, Salary
from analysis.batter_data_structure import KEEP_RENAME_MAP, ROLLING_COLS

from dotenv import load_dotenv
load_dotenv()

py_baseball = PyBaseball()
mlb_api = MLBStatsAPI()

payroll_source_paths = {
    "historical": os.getenv("MLB_PAYROLLS"),
    "recent": os.getenv("MLB_PAYROLLS_2025")
}

salary_source_paths = {
    "historical": os.getenv("MLB_PLAYER_SALARY_DATA")
}

salary = Salary(payroll_source_paths=payroll_source_paths, salary_source_paths=salary_source_paths)
payrolls = salary.payroll()

# Contains matching keys between data sources
# May be helpful down the road
# chadwick = py_baseball.player_search.chadwick()

import logging
logging.basicConfig(level=logging.WARNING, force=True)  # force=True resets handlers in Jupyter (Py3.8+)

for name in ("urllib3", "urllib3.connectionpool", "requests"):
    logging.getLogger(name).setLevel(logging.ERROR)
    logging.getLogger(name).propagate = False

### Utils and Consts

In [3]:
ROLLING_PERIOD = 3

In [4]:
def _filter_and_rename(df: pd.DataFrame, rename_map: dict):
    df = df[list(rename_map.keys())].rename(columns=rename_map)
    return df

def _reformat_statcast_name(name: str):
    name_split = name.split(", ")
    return f"{name_split[-1]} {name_split[0]}"

def _aggregate_over_season_bref(bref_data: pd.DataFrame) -> pd.DataFrame:
    return (
        bref_data
        .groupby(["season", "mlb_id"])
        .agg({
            "player_name": "first",
            "team": lambda x: "/".join(x.unique()),   # teams played for that season
            "salary": "sum",
            "bWAR": "sum",
        })
        .reset_index()
    )

### Batters

Standard compile standard batter stats from fangraphs

In [5]:
def standard_batter_stats_data_preprocessing(season: int, batter_stats_collection: dict) -> dict:
    batter_stats = py_baseball.batter.stats(start_season=season)
    batter_stats = _filter_and_rename(batter_stats, KEEP_RENAME_MAP["stats"])
    # Used to obtain the player salaries
    batter_bwar = py_baseball.batter.bref_war(season=season)
    batter_bwar = _filter_and_rename(
        batter_bwar, 
        KEEP_RENAME_MAP["bref_war"]
    )
    
    batter_bwar["player_name"] = batter_bwar["player_name"].apply(clean_player_name)
    batter_bwar = _aggregate_over_season_bref(batter_bwar)

    batter_stats = (
        batter_stats
        .merge(
            batter_bwar, 
            how="left", 
            on=["player_name", "team", "season"] # I'm hoping this is enough information to make a good merge
            )
        )
    
    # For now assume that na values are league minimum
    league_minimum = salary.league_minimum_salaries(season)
    batter_stats["salary"] = batter_stats["salary"].fillna(np.float64(league_minimum))
    batter_stats["salary"] = batter_stats["salary"].replace(0.0, np.float64(league_minimum))
    
    return batter_stats_collection | {season: batter_stats}

start_season, end_season = 2009, 2025

batter_stats = {}
for season in range(start_season, end_season + 1):
    batter_stats = standard_batter_stats_data_preprocessing(season, batter_stats)

Compile statcast data.

Data is made up of:
* Statcast expected stats: e.g. expected batting average
* Statcast percentile rankings: e.g. sprint speed percentile

In [6]:
def standard_batter_statcast_preprocessing(season: int, batter_statcast_collection: dict) -> dict:
    batter_statcast_expected = py_baseball.batter.statcast_expected_stats(season)
    batter_statcast_expected = _filter_and_rename(batter_statcast_expected, KEEP_RENAME_MAP["statcast_exp"])

    batter_statcast_percentile = py_baseball.batter.statcast_percentile_ranks(season)
    batter_statcast_percentile = _filter_and_rename(batter_statcast_percentile, KEEP_RENAME_MAP["statcast_pct"])

    statcast = batter_statcast_expected.merge(
        batter_statcast_percentile, 
        how="left", 
        on=["statcast_id"]
    )

    statcast = statcast.dropna(subset="player_name")

    statcast["player_name"] = statcast["player_name"].apply(
        lambda player_name: _reformat_statcast_name(player_name)
    )

    statcast["season"] = season
    
    statcast = statcast.reset_index(drop=True)
    return batter_statcast_collection | {season: statcast}


start_season, end_season = 2015, 2025

batter_statcast = {}
for season in range(start_season, end_season + 1):
    batter_statcast = standard_batter_statcast_preprocessing(season, batter_statcast)

### Concatenate all batter data from fangraphs and statcast, and calculate rolling averages.

In [7]:
# Combine all batter stats
all_batter_stats = pd.concat(
    list(batter_stats.values()),
    ignore_index=True
)


# Combine all Statcast stats
all_statcast_stats = pd.concat(
    list(batter_statcast.values()),
    ignore_index=True
)

# Columns in Statcast but not in FA stats (plus season)
statcast_cols = (
    [col for col in all_statcast_stats.columns
     if col not in all_batter_stats.columns]
    + ["season"]
)

# Merge free agent batter stats with statcast stats
all_batter_stats = all_batter_stats.merge(
    all_statcast_stats[statcast_cols],
    how="left",
    left_on=["mlb_id", "season"],
    right_on=["statcast_id", "season"]
)

Compute rolling statistics for numerical data.

In [8]:
def calculate_rolling_statistics(df, groupby_col="fg_id", rolling_period=ROLLING_PERIOD, rolling_cols=ROLLING_COLS):
    """Compute rolling statistics"""
    for col in rolling_cols:
        df[f"{col}_{rolling_period}yr_rolling"] = (
            df
            .groupby(groupby_col)[col]
            .rolling(window=rolling_period, min_periods=None)
            .mean()
            .reset_index(level=0, drop=True)
        )
    return df

def shift_targets_up(df, targets, groupby_col="fg_id"):
    """Shift the next years war to the previous year"""
    for col in targets:
        df[f"target_{col}"] = (
            df
            .groupby(groupby_col)
            [col].shift(-1)
        )
    return df


Separate pre-statcast data and post-statcast free agent statistics.

In [9]:
all_batter_stats = calculate_rolling_statistics(all_batter_stats)

all_batter_stats = shift_targets_up(all_batter_stats, ["fWAR", "bWAR"])

pre_statcast_drop_cols = [
    col for col in all_batter_stats.columns 
    if col.replace(f"_{ROLLING_PERIOD}yr_rolling", "") in statcast_cols
]

pre_statcast = all_batter_stats[all_batter_stats["season"] < 2015].drop(pre_statcast_drop_cols, axis=1)
post_statcast = all_batter_stats[all_batter_stats["season"] >= 2015]

Build and apply the XGBRegressor Model

In [10]:
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from xgboost import XGBRegressor

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import numpy as np

from collections import defaultdict

exclude_cols = ["player_name", "fg_id", "target_fWAR", "target_bWAR"]
categorical_cols = ["team"] 
numeric_cols = [
    col for col in post_statcast.columns 
    if col not in exclude_cols 
    and col not in categorical_cols
]

def build_pre_transformer(numeric_cols, categorical_cols):
    return ColumnTransformer([
        # 1. Base numeric: impute + scale
        (
            "numeric",
            Pipeline([
                ("imputer", SimpleImputer(strategy="median")),
                ("scaler", StandardScaler()),
            ]),
            numeric_cols,
        ),

        # 2. Base categorical: one-hot
        # (
        #     "base_cat",
        #     OneHotEncoder(handle_unknown="ignore"),
        #     categorical_cols,
        # )
    ])


results = defaultdict(dict)
models = defaultdict(dict)

targets = ["target_fWAR", "target_bWAR"]

target_pool = post_statcast.dropna(subset=["target_bWAR", "target_fWAR"]).reset_index(drop=True)

X = target_pool.drop(exclude_cols, axis=1)

for target in targets:
    pre = build_pre_transformer(numeric_cols, categorical_cols)
    
    model = Pipeline([
        ("pre", pre),
        ("est", XGBRegressor(n_estimators=800, max_depth=6))
    ])
    y = target_pool[target]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    mae = mean_absolute_error(y_test, y_pred)
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    r2 = r2_score(y_test, y_pred)

    # Save model
    models[target] = model

    # Save results
    results[target]["mae"] = mae
    results[target]["rmse"] = rmse
    results[target]["r2"] = r2

    print(f"{target}:")
    print(f"MAE:  {mae:,.3f}")
    print(f"RMSE: {rmse:,.3f}")
    print(f"R²:   {r2:.3f}")
    print("-" * 20)

 'swing_length_3yr_rolling']. At least one non-missing value is needed for imputation with strategy='median'.
 'swing_length_3yr_rolling']. At least one non-missing value is needed for imputation with strategy='median'.
 'swing_length_3yr_rolling']. At least one non-missing value is needed for imputation with strategy='median'.


target_fWAR:
MAE:  0.880
RMSE: 1.268
R²:   0.346
--------------------
target_bWAR:
MAE:  0.927
RMSE: 1.363
R²:   0.327
--------------------


 'swing_length_3yr_rolling']. At least one non-missing value is needed for imputation with strategy='median'.


In [11]:
model = models["target_fWAR"]

est = model.named_steps["est"]
importances = est.feature_importances_
feature_names = model.named_steps["pre"].get_feature_names_out()

import pandas as pd
importance_df = pd.DataFrame({
    "feature": feature_names,
    "importance": est.feature_importances_
}).sort_values("importance", ascending=False).reset_index(drop=True)

In [12]:
batter_predictions_2026 = post_statcast[post_statcast["season"] == 2025]

X_test = batter_predictions_2026.drop(exclude_cols, axis=1).reset_index(drop=True)

for target in targets:
    model = models[target]
    y_pred = model.predict(X_test)

    batter_predictions_2026[f"predicted_{target}"] = y_pred

 'swing_length_3yr_rolling']. At least one non-missing value is needed for imputation with strategy='median'.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  batter_predictions_2026[f"predicted_{target}"] = y_pred
 'swing_length_3yr_rolling']. At least one non-missing value is needed for imputation with strategy='median'.
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  batter_predictions_2026[f"predicted_{target}"] = y_pred


In [13]:
simplified = batter_predictions_2026[["fg_id", "player_name", "predicted_target_fWAR", "predicted_target_bWAR"]]
simplified.to_csv("2026 Batter WAR Projections.csv", index=False)