In [70]:
import pybaseball
from pybaseball import statcast
from pybaseball import  statcast_pitcher



from sklearn.model_selection import train_test_split
from sklearn.cluster import KMeans
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.metrics import silhouette_score, davies_bouldin_score
from sklearn.preprocessing import StandardScaler
from sklearn.utils import resample
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.model_selection import train_test_split, RandomizedSearchCV, GridSearchCV, KFold
from sklearn.tree import plot_tree
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import FunctionTransformer, OneHotEncoder
from sklearn.impute import SimpleImputer

from scipy.stats import beta, norm, binom
import scipy.optimize
from scipy.special import betaln

import matplotlib.pyplot as plt

import seaborn as sns

import pandas as pd

import numpy as np

import itertools

import xgboost as xgb

import joblib

import wandb

In [71]:
wandb.init(
    project='money-ball',
    config={
        'MODEL_TYPE': 'XGBoost',
        'objective':'multi:softprob', 
        'n_estimators':100,           # Set higher if using early stopping
        'max_depth':5, 
        'learning_rate':0.1,           # Lower learning rate often yields better probabilities
        'random_state':42,
        'tree_method':'hist',          # Faster training for large MLB datasets
        'early_stopping_rounds':None     
    }
)

config = wandb.config

In [72]:
pybaseball.cache.enable()
df = statcast(start_dt='2023-06-24', end_dt='2025-06-24')

This is a large query, it may take a moment to complete
Skipping offseason dates
Skipping offseason dates


100%|██████████| 493/493 [03:05<00:00,  2.65it/s]
  final_data = pd.concat(dataframe_list, axis=0).convert_dtypes(convert_string=False)


In [73]:
df_copy = df.copy()

In [74]:
def map_outcome_coarse(row):
    strikes = ['called_strike', 'swinging_strike', 'swinging_strike_blocked', 'foul_tip', 'missed_bunt', 'bunt_foul_tip', 'automatic_strike']
    balls = ['pitchout', 'ball', 'blocked_ball', 'intent_ball', 'automatic_ball']

    if row["description"] in balls:
        return "ball"
    
    if  row["description"] in strikes:
        return "strike"
    
    if row["description"] == "foul" and row['strikes'] == 2:
        return "non-strike foul"

    if row["description"] == "foul_bunt" and row['strikes'] == 2:
        return "non-strike foul"
    
    if row["description"] == "foul_pitchout" and row['strikes'] == 2:
        return "non-strike foul"
    
    if row["description"] == "foul" and row['strikes'] < 2:
        return "strike"
    
    if row["description"] == "foul_pitchout" and row['strikes'] < 2:
        return "strike"
    
    if row["description"] == "foul_bunt" and row['strikes'] < 2:
        return "strike"
    
    hits = ['hit_into_play', 'hit_by_pitch']
    if row['description'] in hits:
        return "hit"

df_copy['outcome_coarse'] = df_copy.apply(map_outcome_coarse, axis=1)

df_copy = df_copy.reset_index(drop=True)

In [75]:
df_copy['is_strike'] = df_copy['outcome_coarse'].isin(['strike'])
df_copy['is_hit'] = df_copy['outcome_coarse'].isin(['hit'])
df_copy['is_ball'] = df_copy['outcome_coarse'].isin(['ball'])
df_copy['is_foul'] = df_copy['outcome_coarse'].isin(['non-strike foul'])

df_copy['is_ff'] = df_copy['pitch_type'].isin(['FF'])
df_copy['is_ch'] = df_copy['pitch_type'].isin(['CH'])
df_copy['is_sl'] = df_copy['pitch_type'].isin(['SL'])
df_copy['is_si'] = df_copy['pitch_type'].isin(['SI'])
df_copy['is_fc'] = df_copy['pitch_type'].isin(['FC'])
df_copy['is_st'] = df_copy['pitch_type'].isin(['ST'])
df_copy['is_fs'] = df_copy['pitch_type'].isin(['FS'])
df_copy['is_cu'] = df_copy['pitch_type'].isin(['CU'])
df_copy['is_kc'] = df_copy['pitch_type'].isin(['KC'])

In [76]:
train_df, test_df= train_test_split(
    df_copy,
    test_size=0.2,
    random_state=42)

train_df, val_df = train_test_split(
    train_df, 
    test_size=0.2,
    random_state=42)

In [77]:
mapping = {'strike': 0, 'ball': 1, 'hit': 2, 'non-strike foul': 3}

y_train = train_df["outcome_coarse"].map(mapping)
y_test = test_df["outcome_coarse"].map(mapping)
y_val = val_df["outcome_coarse"].map(mapping)

X_train = train_df.drop(columns=["outcome_coarse"])
X_test = test_df.drop(columns=["outcome_coarse"])
X_val = val_df.drop(columns=["outcome_coarse"])

In [78]:
class BetaBinomialPrior:
    # fit will take in y (success col) and n (total col)
    def fit(self, y, n):
        mu = y.sum()/n.sum()

        # create negative likelihood function
        def nll(kappa):

            alpha = mu * kappa
            beta = (1-mu)*kappa

            # calculate joint beta-binomial marginal likelihood
            loglik = np.sum(betaln(y+alpha,n-y+beta)-betaln(alpha,beta))
            
            return -loglik # we minimize later, make negative to do so

        res = scipy.optimize.minimize(nll, x0 = 10, bounds=[(0.01,None)])
        kappa_hat = res.x[0]
        self.alpha_ = mu * kappa_hat
        self.beta_ = (1-mu)*kappa_hat
        self.mu_ = mu

        return self

    def posterior_mean(self, y, n):
        return (y + self.alpha_) / (n + self.alpha_ + self.beta_)
                    

In [79]:
# create sklearn fit transform pipeline which leverages BetaBinomialPrior class
class PlayerEBRateTransformer(BaseEstimator, TransformerMixin):

    # init vars
    def __init__(self, player_col, success_col, output_col):
        self.player_col = player_col
        self.success_col = success_col
        self.output_col = output_col
    
    # fit prior to data, calculate posteriorsT
    def fit(self, X, y=None):
        # group by player id column, get success and totals for each player
        grouped = (
            X.groupby(self.player_col)
            .agg(
                y = (self.success_col, "sum"),
                n = (self.success_col, "size")
            )
        )
        
        # calculate prior using neg log likelihood method in BetaBinomialPrior().fit()
        # provide successes and failure values for each player to make this estimation
        self.prior_ = BetaBinomialPrior().fit(
            grouped['y'].values,
            grouped['n'].values
        )

        # calculate posterior mean (new metric estimate) for each player in pivot
        grouped[self.output_col] = self.prior_.posterior_mean(
            grouped['y'].values,
            grouped['n'].values
        )

        # create lookup table and default values for pitchers
        self.lookup_ = grouped[self.output_col].to_dict()
        self.default_ = self.prior_.mu_

        return self
    
    # create transform funciton to set column values to lookup values or default if not present
    def transform(self, X):
        X = X.copy()
        X[self.output_col] = (
            X[self.player_col]
            .map(self.lookup_)
            .fillna(self.default_) # might need to fix this step to fill NA with a calculated shrunk rate using alpha and beta?
        )
        return X


In [80]:
from sklearn.pipeline import Pipeline

# create sklearn pipeline to transform the data using empirical bayes shrinkage
eb_pipeline = Pipeline([
    ("pitcher_strike_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_strike",
        # total_col="pitch_id",
        output_col="pitcher_strike_rate_eb"
    )),
    ("pitcher_ball_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_ball",
        # total_col="pitch_id",
        output_col="pitcher_ball_rate_eb"
    )),
    ("pitcher_hit_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_hit",
        # total_col="pitch_id",
        output_col="pitcher_hit_rate_eb"
    )),
    ("batter_hit_eb", PlayerEBRateTransformer(
        player_col="batter",
        success_col="is_hit",
        # total_col="pitch_id",
        output_col="batter_hit_rate_eb"
    )),
    ("batter_strike_eb", PlayerEBRateTransformer(
        player_col="batter",
        success_col="is_strike",
        # total_col="pitch_id",
        output_col="batter_strike_rate_eb"
    )),
    ("batter_ball_eb", PlayerEBRateTransformer(
        player_col="batter",
        success_col="is_ball",
        # total_col="pitch_id",
        output_col="batter_ball_rate_eb"
    )),
    ("pitcher_ff_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_ff",
        # total_col="pitch_id",
        output_col="pitcher_ff_rate_eb"
    )),
    ("pitcher_ch_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_ch",
        # total_col="pitch_id",
        output_col="pitcher_ch_rate_eb"
    )),
    ("pitcher_sl_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_sl",
        # total_col="pitch_id",
        output_col="pitcher_sl_rate_eb"
    )),
    ("pitcher_si_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_si",
        # total_col="pitch_id",
        output_col="pitcher_si_rate_eb"
    )),
    ("pitcher_fc_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_fc",
        # total_col="pitch_id",
        output_col="pitcher_fc_rate_eb"
    )),
    ("pitcher_st_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_st",
        # total_col="pitch_id",
        output_col="pitcher_st_rate_eb"
    )),
    ("pitcher_fs_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_fs",
        # total_col="pitch_id",
        output_col="pitcher_fs_rate_eb"
    )),
    ("pitcher_cu_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_cu",
        # total_col="pitch_id",
        output_col="pitcher_cu_rate_eb"
    )),
    ("pitcher_kc_rate_eb", PlayerEBRateTransformer(
        player_col="pitcher",
        success_col="is_kc",
        # total_col="pitch_id",
        output_col="pitcher_kc_rate_eb"
    )),
])

In [81]:
class PlayerClusterDistanceTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, player_col, cluster_cols, max_k=10, prefix="cluster"):
        self.player_col = player_col
        self.cluster_cols = cluster_cols
        self.max_k = max_k
        self.prefix = prefix

        self.scaler_ = StandardScaler()
        self.kmeans_ = None
        self.best_k_ = None
        self.distance_map_ = None
        self.dist_cols_ = None

    def fit(self, X, y=None):
        # One row per player
        X_unique = (
            X.groupby(self.player_col)[self.cluster_cols]
            .first()
            .dropna()
        )

        X_scaled = self.scaler_.fit_transform(X_unique)

        # Select best K via Davies-Bouldin
        ks = np.arange(2, self.max_k + 1)
        db_scores = []

        for k in ks:
            km = KMeans(n_clusters=k, n_init=10, random_state=42)
            labels = km.fit_predict(X_scaled)
            db_scores.append(davies_bouldin_score(X_scaled, labels))

        self.best_k_ = ks[np.argmin(db_scores)]

        self.kmeans_ = KMeans(
            n_clusters=self.best_k_,
            n_init=10,
            random_state=42
        ).fit(X_scaled)

        # Compute distances
        distances = self.kmeans_.transform(X_scaled)

        self.dist_cols_ = [
            f"{self.prefix}_center_{i}"
            for i in range(self.best_k_)
        ]

        # Store as mapping instead of DataFrame
        self.distance_map_ = dict(
            zip(X_unique.index, distances)
        )

        return self

    def transform(self, X):
        X_out = X.copy()

        # Map existing players
        def get_dist(player):
            return self.distance_map_.get(player)

        dist_array = X_out[self.player_col].map(get_dist)

        # Handle unseen players
        unseen_mask = dist_array.isna()

        if unseen_mask.any():
            unseen_players = (
                X_out.loc[unseen_mask, [self.player_col] + self.cluster_cols]
                .drop_duplicates()
            )

            scaled = self.scaler_.transform(unseen_players[self.cluster_cols])
            dists = self.kmeans_.transform(scaled)

            for player, dist in zip(unseen_players[self.player_col], dists):
                self.distance_map_[player] = dist

            dist_array = X_out[self.player_col].map(get_dist)

        # Expand into columns
        dist_matrix = np.vstack(dist_array.values)
        dist_df = pd.DataFrame(
            dist_matrix,
            columns=self.dist_cols_,
            index=X_out.index
        )

        return pd.concat([X_out, dist_df], axis=1)
    

    def get_feature_names_out(self, input_features=None):
        return self.dist_cols_



In [82]:
kmeans_pipeline = Pipeline([
    ('pitcher_archetype', PlayerClusterDistanceTransformer(
        player_col='pitcher',
        cluster_cols=['pitcher_strike_rate_eb', 'pitcher_ball_rate_eb', 'pitcher_hit_rate_eb'],
        prefix='pitcher_cluster'
    )),
    ('batter_archetype', PlayerClusterDistanceTransformer(
        player_col='batter',
        cluster_cols=['batter_strike_rate_eb', 'batter_ball_rate_eb', 'batter_hit_rate_eb'],
        prefix='batter_cluster'
    )),
    # ('pitcher_pitch_type_archetype', PlayerKmeansTransformer(
    #     player_col='pitcher',
    #     cluster_cols=["pitcher_sl_rate_eb", "pitcher_si_rate_eb", "pitcher_ch_rate_eb", "pitcher_cu_rate_eb"],
    #     prefix='pitcher_div'
    # ))
])

In [83]:
X_train_eb = eb_pipeline.fit_transform(X_train)
X_train_kmeans = kmeans_pipeline.fit_transform(X_train_eb)
X_train_kmeans.info()

<class 'pandas.core.frame.DataFrame'>
Index: 990656 entries, 278316 to 1070020
Columns: 156 entries, pitch_type to batter_cluster_center_4
dtypes: Float64(42), Int64(59), bool(13), datetime64[ns](1), float64(25), object(16)
memory usage: 1.2+ GB


In [84]:
# 1. Feature Creator (The "Count" String)
def make_count_feature(X):
    X = X.copy()
    X['count'] = X['balls'].astype(str) + "-" + X['strikes'].astype(str)
    return X

count_gen = FunctionTransformer(make_count_feature)



In [85]:
# 2. Categorical Mini-Pipeline
categorical_features=[
    'game_type',
    'stand',
    'p_throws',
    'if_fielding_alignment',
    'of_fielding_alignment'
]

cat_pipeline = Pipeline([
    ("imputer", SimpleImputer(strategy="constant", fill_value="unknown")),
    ("encoder", OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

In [None]:
rate_features = [
    'pitcher_strike_rate_eb', 
    'pitcher_ball_rate_eb', 
    'pitcher_hit_rate_eb',
    'batter_strike_rate_eb', 
    'batter_ball_rate_eb', 
    'batter_hit_rate_eb'
]

base_numeric = [
    'game_year',
    'outs_when_up',
    'inning',
    'at_bat_number',
    'bat_score',
    'fld_score'
]

In [87]:
preprocessor_pipeline = Pipeline([
    ("eb_logic", eb_pipeline),        # EB rate features
    ("count_step", count_gen),        # Adds 'count' string
    ("kmeans_logic", kmeans_pipeline) # Adds cluster distance columns
])

preprocessor_pipeline.fit(X_train)

In [88]:
kmeans_step = preprocessor_pipeline.named_steps["kmeans_logic"]

pitcher_cols = kmeans_step.named_steps["pitcher_archetype"].get_feature_names_out()
batter_cols = kmeans_step.named_steps["batter_archetype"].get_feature_names_out()

kmeans_cols = pitcher_cols + batter_cols

In [89]:
# 3. The Final Processor
# We wait until the VERY end to OHE everything. This keeps the intermediate
# dataframes clean and readable.
processor = ColumnTransformer([

    ("base", "passthrough", base_numeric),

    ("rates", "passthrough", rate_features),

    ("kmeans", "passthrough", kmeans_cols),

    # Encode the standard categories
    ("cat_encoding", cat_pipeline, categorical_features),
    
    # Encode the count string we created in step 1
    ("count_ohe", OneHotEncoder(handle_unknown='ignore', sparse_output=False), ['count']),
    



], remainder="drop")

processor.set_output(transform="pandas")

In [90]:
xgb_clf_best = xgb.XGBClassifier(
    objective=config.objective, 
    n_estimators=config.n_estimators,           # Set higher if using early stopping
    max_depth=config.max_depth, 
    learning_rate=config.learning_rate,           # Lower learning rate often yields better probabilities
    random_state=config.random_state,
    tree_method=config.tree_method,          # Faster training for large MLB datasets
    early_stopping_rounds=config.early_stopping_rounds
)

In [91]:
# 4. The Master Assembly
xgb_model_pipeline = Pipeline([
    ("preprocessing", preprocessor_pipeline),
    ("final_prep", processor),
    ("xgb_model", xgb_clf_best)
])

In [92]:
xgb_model_pipeline.fit(X_train, y_train)

# X_train_transformed.info()

In [93]:
probs = xgb_model_pipeline.predict_proba(X_test)

In [94]:
from sklearn.metrics import log_loss
loss = log_loss(y_test, probs)

print(f"Validation Log Loss: {loss:.4f}")
print(probs*10)

loss_dict = {
    'log loss':loss
}

Validation Log Loss: 1.0973
[[5.0705166  3.8596053  0.9775927  0.09228536]
 [1.6801988  3.093423   2.6241202  2.6022587 ]
 [1.5962558  3.9699984  2.312961   2.1207848 ]
 ...
 [4.855255   3.5850787  1.5356705  0.023996  ]
 [4.8036757  2.7981732  2.3669674  0.03118365]
 [4.125033   2.845312   3.0204012  0.0092537 ]]


In [95]:
joblib.dump(xgb_model_pipeline, 'model.pkl')
model_artifact = wandb.Artifact(name="pitch-odds-model", type="model", metadata=loss_dict)
model_artifact.add_file('model.pkl')
wandb.log_artifact(model_artifact)
wandb.finish()