In [2]:
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
from datetime import date

from pybaseball import statcast, pitching_stats, batting_stats, cache
cache.enable()          

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score, log_loss, brier_score_loss
from sklearn.calibration import CalibratedClassifierCV
from sklearn.model_selection import TimeSeriesSplit

from lightgbm import LGBMClassifier
import joblib, matplotlib.pyplot as plt, seaborn as sns


In [None]:

raw = pd.read_parquet("statcast_2019_2024.parquet")

print(f"Loaded Statcast data: {raw.shape[0]:,} rows, {raw.shape[1]} columns")
raw.head()



In [4]:
# Clean and label data
df = (raw
      .loc[raw.events.notna()]  # only keep pitches with known outcomes
      .assign(hr = lambda x: (x.events == "home_run").astype(int))  # create binary label
      .reset_index(drop=True))

# Filter to rows with valid physical data
df = df.query("release_speed.notna() & plate_x.notna() & plate_z.notna()")


# Optional: check how many rows per season
df['game_date'] = pd.to_datetime(df['game_date'])



In [5]:
# --- location buckets: 5×5 grid ---
# Horizontal strike zone
loc_x_bins = [-2, -1, -0.5, 0, 0.5, 1, 2]

# Vertical strike zone
loc_z_bins = [1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 5.0]

df["loc_x_bin"] = pd.cut(df.plate_x, bins=loc_x_bins, labels=False)
df["loc_z_bin"] = pd.cut(df.plate_z, bins=loc_z_bins, labels=False)

# --- velocity z-score per pitch type & pitcher ---
df["v_rel"] = (df.release_speed - 
               df.groupby(["pitcher","pitch_type"]).release_speed.transform("mean")
              ) / df.groupby(["pitcher","pitch_type"]).release_speed.transform("std")

# --- break deltas relative to league at that velo band ---
velo_band = (df.release_speed // 5 * 5)     # 80-84, 85-89, ...
for axis, col in zip(["x","z"], ["pfx_x","pfx_z"]):
    mu = df.groupby(["pitch_type", velo_band])[col].transform("mean")
    df[f"{col}_rel"] = df[col] - mu

# Keep only feature columns we plan to model
feat_cols = ["loc_x_bin","loc_z_bin","v_rel","pfx_x_rel","pfx_z_rel",
             "release_extension","balls","strikes","pitch_type",
             "stand","p_throws"]          # batter & pitcher handedness
df = df[feat_cols + ["hr","game_date","pitcher","batter"]]



In [6]:
from pybaseball import pitching_stats, batting_stats, playerid_reverse_lookup
import pandas as pd

# 1. Pull FanGraphs stats for each year
seasons = range(2019, 2025)
pitch_stats = []
bat_stats = []

for yr in seasons:
    print(f"Pulling stats for {yr}...")
    p_stats = pitching_stats(yr, qual=0)[["IDfg", "ERA", "WHIP"]].copy()
    b_stats = batting_stats(yr, qual=0)[["IDfg", "ISO", "BB%", "K%"]].copy()
    
    p_stats = p_stats.rename(columns={"IDfg": "pitcher_fg"}).assign(season=yr)
    b_stats = b_stats.rename(columns={"IDfg": "batter_fg"}).assign(season=yr)
    
    pitch_stats.append(p_stats)
    bat_stats.append(b_stats)

pitch_stats = pd.concat(pitch_stats, ignore_index=True)
bat_stats   = pd.concat(bat_stats,   ignore_index=True)

# 2. Map MLBAM (Statcast) IDs → FanGraphs IDs
unique_ids = pd.unique(df[["pitcher", "batter"]].values.ravel())
id_map = playerid_reverse_lookup(unique_ids)
id_map = id_map[["key_mlbam", "key_fangraphs"]].dropna().astype(int)
id_map = id_map.rename(columns={"key_mlbam": "mlbam_id", "key_fangraphs": "fg_id"})

# 3. Add FanGraphs IDs to df
df = df.rename(columns={"pitcher": "mlbam_id"})
df = df.merge(id_map.rename(columns={"fg_id": "pitcher_fg"}), on="mlbam_id", how="left")
df = df.rename(columns={"mlbam_id": "pitcher"})

df = df.rename(columns={"batter": "mlbam_id"})
df = df.merge(id_map.rename(columns={"fg_id": "batter_fg"}), on="mlbam_id", how="left")
df = df.rename(columns={"mlbam_id": "batter"})

# 4. Add season column (if not already there)
df["season"] = pd.to_datetime(df["game_date"]).dt.year

# 5. Merge season stats using FanGraphs IDs
df = df.merge(pitch_stats, on=["pitcher_fg", "season"], how="left")
df = df.merge(bat_stats,   on=["batter_fg", "season"],  how="left")

# 6. Final rename — use actual column names if they exist
# We'll auto-detect from the merge result
rename_map = {}
for col in df.columns:
    if col == "ERA": rename_map["ERA"] = "ERA_pitcher"
    if col == "WHIP": rename_map["WHIP"] = "WHIP_pitcher"
    if col == "ISO": rename_map["ISO"] = "ISO_batter"
    if col == "BB%": rename_map["BB%"] = "BB%_batter"
    if col == "K%": rename_map["K%"] = "K%_batter"
df = df.rename(columns=rename_map)

# 7. Display final check
print("Merged shape:", df.shape)
print("Available stat columns:", [col for col in df.columns if "ERA" in col or "WHIP" in col or "ISO" in col or "BB%" in col or "K%" in col])
df[["ERA_pitcher", "WHIP_pitcher", "ISO_batter", "BB%_batter", "K%_batter"]].describe()


Pulling stats for 2019...
Pulling stats for 2020...
Pulling stats for 2021...
Pulling stats for 2022...
Pulling stats for 2023...
Pulling stats for 2024...
Merged shape: (993929, 23)
Available stat columns: ['ERA_pitcher', 'WHIP_pitcher', 'ISO_batter', 'BB%_batter', 'K%_batter']


Unnamed: 0,ERA_pitcher,WHIP_pitcher,ISO_batter,BB%_batter,K%_batter
count,975852.0,975852.0,982061.0,982061.0,982061.0
mean,4.351166,1.317542,0.165757,0.084799,0.227955
std,2.132945,0.328138,0.06352,0.032445,0.070676
min,0.0,0.0,0.0,0.0,0.0
25%,3.3,1.14,0.124,0.063,0.181
50%,4.07,1.28,0.164,0.083,0.223
75%,4.99,1.44,0.207,0.104,0.268
max,162.0,21.0,1.5,1.0,1.0


In [None]:
# ╔════════════════════════════════════════════════════════════════════╗
# ║  LIGHTGBM TRAINING: Fewer False Positives for HR Classification   ║
# ╚════════════════════════════════════════════════════════════════════╝
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import roc_auc_score, log_loss
from sklearn.calibration import CalibratedClassifierCV
from lightgbm import LGBMClassifier
import numpy as np, joblib

# ─────────────────────────────────────────────────────────────────────
# 1. FILTER unreliable rows
# ─────────────────────────────────────────────────────────────────────
df = (
    df[df["WHIP_pitcher"].notna() & (df["WHIP_pitcher"] > 0)]
      .loc[df["ERA_pitcher"].notna()]
      .loc[df["ISO_batter"].notna()]
      .loc[df["BB%_batter"].notna()]
      .loc[df["K%_batter"].notna()]
)

# ─────────────────────────────────────────────────────────────────────
# 2. CLIP / FILL extreme numeric values
# ─────────────────────────────────────────────────────────────────────
df["ERA_pitcher"]  = df["ERA_pitcher"].clip(1, 10).fillna(5)
df["WHIP_pitcher"] = df["WHIP_pitcher"].clip(0.8, 2.0).fillna(1.3)
df["ISO_batter"]   = df["ISO_batter"].clip(0, 1.0).fillna(0.15)
df["BB%_batter"]   = df["BB%_batter"].clip(0, 0.30).fillna(0.08)
df["K%_batter"]    = df["K%_batter"].clip(0, 0.50).fillna(0.20)

# ─────────────────────────────────────────────────────────────────────
# 3. FEATURE MATRIX
# ─────────────────────────────────────────────────────────────────────
feature_cols = [
    "loc_x_bin","loc_z_bin","v_rel","pfx_x_rel","pfx_z_rel","release_extension",
    "balls","strikes","pitch_type","stand","p_throws",
    "ERA_pitcher","WHIP_pitcher","ISO_batter","BB%_batter","K%_batter"
]
X = df[feature_cols]
y = df["hr"]

cat_cols = ["loc_x_bin","loc_z_bin","pitch_type","stand","p_throws"]
num_cols = [c for c in feature_cols if c not in cat_cols]

preprocessor = ColumnTransformer([
    ("num", StandardScaler(),               num_cols),
    ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols)
])

# Conservative LightGBM: penalize false positives more
model = LGBMClassifier(
    objective="binary",
    n_estimators=800,
    learning_rate=0.03,          # weight inverse of class freq
    class_weight={0: 2.0, 1: 1.0},          # ↑ penalize false positives more
    num_leaves=48,
    min_data_in_leaf=300,
    reg_lambda=3.0,
    random_state=42
)

pipe = Pipeline([
    ("prep", preprocessor),
    ("clf",  model)
])

# ─────────────────────────────────────────────────────────────────────
# 4. TIME-SERIES CV + ISOTONIC CALIBRATION
# ─────────────────────────────────────────────────────────────────────
tscv = TimeSeriesSplit(n_splits=5)
aucs, losses = [], []

for fold, (tr, val) in enumerate(tscv.split(X)):
    pipe.fit(X.iloc[tr], y.iloc[tr])

    calib = CalibratedClassifierCV(pipe, method="isotonic", cv="prefit")
    calib.fit(X.iloc[val], y.iloc[val])

    p = calib.predict_proba(X.iloc[val])[:,1]
    aucs.append(roc_auc_score(y.iloc[val], p))
    losses.append(log_loss(y.iloc[val], p))
    print(f"Fold {fold+1}: AUC={aucs[-1]:.4f}  LogLoss={losses[-1]:.4f}")

print(f"\n✅ Avg AUC  = {np.mean(aucs):.4f}")
print(f"✅ Avg LL   = {np.mean(losses):.4f}")

# ─────────────────────────────────────────────────────────────────────
# 5. FINAL TRAINING with 10% HOLD-OUT FOR CALIBRATION
# ─────────────────────────────────────────────────────────────────────
cut = df["game_date"].quantile(0.90)
mask_cal = df["game_date"] >= cut

pipe.fit(X[~mask_cal], y[~mask_cal])   # final fit
final_model = CalibratedClassifierCV(
    estimator=pipe,
    method="isotonic",
    cv="prefit"
).fit(X[mask_cal], y[mask_cal])

joblib.dump(final_model, "hr_pitch_model_calibrated.joblib")
print("✅  Model saved → hr_pitch_model_calibrated.joblib")


In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.utils import compute_sample_weight
import joblib

# Hold out 2% most recent pitches
hold_mask = df["game_date"] >= df["game_date"].quantile(0.98)
X_hold, y_hold = X[hold_mask], y[hold_mask]

# Raw probabilities from calibrated model
p_raw = final_model.predict_proba(X_hold)[:, 1].reshape(-1, 1)

# 💡 Assign higher weight to negative class (non-HR)
sample_weights = compute_sample_weight(class_weight={0: 2.0, 1: 1.0}, y=y_hold)


# Fit temperature model with weight bias against false positives
temp_model = LogisticRegression(
    penalty=None,
    solver="lbfgs",
    max_iter=1000
).fit(p_raw, y_hold, sample_weight=sample_weights)

# Save it
joblib.dump(temp_model, "temp_scaler.joblib")
print("✅ Temperature scaler saved with weight against false positives")


In [23]:
# ╔═════════════════════════════════════════════════════════════════════╗
# ║  FEATURE-IMPORTANCE (global)  – sums OHE columns back to originals ║
# ╚═════════════════════════════════════════════════════════════════════╝

import pandas as pd
import re

# 1)  Grab the fitted pieces
pipe      = final_model.estimator                 # the original Pipeline
prep      = pipe.named_steps["prep"]              # ColumnTransformer
lgbm      = pipe.named_steps["clf"]               # LightGBMClassifier

# 2)  Get full transformed column list
full_names = prep.get_feature_names_out()         # e.g. 'num__ERA_pitcher' or 'cat__pitch_type_FF'

# 3)  Strip the 'num__' / 'cat__' prefixes for cleaner labels
clean = [re.sub(r"^(num|cat)__", "", n) for n in full_names]

# 4)  Pair with LightGBM's raw importances
raw_imp = pd.Series(lgbm.feature_importances_, index=clean)

# 5)  Collapse one-hot columns: sum per original feature (text before first '_'
#     for pitch_type_FF → pitch_type, stand_R → stand, etc.)
def root(col):
    if col.startswith("loc_x") or col.startswith("loc_z"):
        return "Pitch location (grid)"
    if col.startswith("pfx_x") or col.startswith("pfx_z"):
        return "Movement delta (pfx)"
    if col.startswith("pitch_type"):
        return "Pitch type"
    if col.startswith("stand"):
        return "Batter stand (L/R)"
    if col.startswith("p_throws"):
        return "Pitcher throws (L/R)"
    if col == "v_rel":
        return "Velocity z-score"
    if col == "release_extension":
        return "Release extension"
    if col in ["ERA_pitcher","WHIP_pitcher"]:
        return col  # keep as is
    if col in ["ISO_batter","BB%_batter","K%_batter"]:
        return col  # keep as is
    return col          # fallback

agg_imp = raw_imp.groupby(raw_imp.index.map(root)).sum()

# 6)  Rank and display
ranked = agg_imp.sort_values(ascending=False).to_frame("importance")
display(ranked.head(20))        # top 20, change or drop .head() to see all


Unnamed: 0,importance
Movement delta (pfx),7214
Pitch location (grid),4649
Velocity z-score,3797
ISO_batter,3723
K%_batter,3424
ERA_pitcher,3087
BB%_batter,2951
WHIP_pitcher,2288
Release extension,1969
Pitch type,1604


In [None]:
# ╔══════════════════════════════════════════════════════════════════════════╗
# ║  GAME-LEVEL HR-PROB PREDICTOR  – swing-only risk, pitch-type breakdown  ║
# ╚══════════════════════════════════════════════════════════════════════════╝
import pandas as pd, numpy as np, datetime as dt, joblib, warnings, sys
from collections import defaultdict
from pathlib import Path
import warnings
from pybaseball import (
    statcast_pitcher, statcast_batter, playerid_reverse_lookup,
    batting_stats, pitching_stats
)

warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)

pd.set_option('future.no_silent_downcasting', True)
# ───────────────────────── CONFIG ──────────────────────────
BATTER_ID   = 666969          # Marcell Ozuna
PITCHER_ID  = 669372          # Matthew Boyd
TARGET_DATE = dt.date.today()

LOOKBACK_DAYS = 30
N_PA          = 4             # projected PA for this batter
MC            = 1000          # Monte-Carlo runs
MODEL_PATH    = "hr_pitch_model_calibrated.joblib"
TEMP_PATH     = "temp_scaler.joblib"
STATS_PATH    = "statcast_2019_2024_with_stats.parquet"
MANUAL_BAT    = "R"           # override if lookup fails
MANUAL_THROW  = "R"
# ───────────────────────────────────────────────────────────

# 0 ░ LOAD MODELS & LOOKUP TABLE ░
model = joblib.load(MODEL_PATH)
if Path(TEMP_PATH).exists():
    temp_model  = joblib.load(TEMP_PATH)
    temp_scaled = lambda p: temp_model.predict_proba(p.reshape(-1,1))[:,1]
else:
    warnings.warn("No temp_scaler.joblib – probabilities un-shrunk")
    temp_scaled = lambda p: p

stats_df = pd.read_parquet(STATS_PATH)

SEASON = TARGET_DATE.year
if SEASON not in stats_df["season"].unique():
    SEASON = int(stats_df["season"].max())
    warnings.warn(f"No rows for season {TARGET_DATE.year}; fallback to {SEASON}")

# 1 ░ PLAYER NAMES & SIDES ░
def name_side(pid):
    row = playerid_reverse_lookup([pid]).iloc[0]
    return f"{row['name_first']} {row['name_last']}", row.get("bats"), row.get("throws")

batter_name, bat_side, _ = name_side(BATTER_ID)
pitcher_name, _, p_throw = name_side(PITCHER_ID)

def fix_side(orig, manual, role, pid, col):
    if manual: return manual
    if pd.notna(orig) and orig != "": return orig
    try:
        samp = statcast_batter(f"{SEASON-1}-06-01", f"{SEASON-1}-06-15",
                               pid, verbose=False).head(1)
        if not samp.empty:
            return samp[col].iloc[0]
    except Exception:
        pass
    warnings.warn(f"No {role} side for {pid}; defaulting to 'R'")
    return "R"

bat_side = fix_side(bat_side, MANUAL_BAT,   "bat",   BATTER_ID, "stand")
p_throw  = fix_side(p_throw,  MANUAL_THROW, "throw", PITCHER_ID, "p_throws")

# 2 ░ FETCH RECENT PITCHES ░
def pitcher_pitches():
    end, start = TARGET_DATE, TARGET_DATE - dt.timedelta(days=LOOKBACK_DAYS)
    df = statcast_pitcher(start.isoformat(), end.isoformat(), PITCHER_ID)
    df = df[df["stand"] == bat_side]
    if len(df) < 200:
        s1, s2 = f"{SEASON}-03-01", f"{SEASON}-10-31"
        df = statcast_pitcher(s1, s2, PITCHER_ID)
        df = df[df["stand"] == bat_side]
    df = df[df["plate_x"].notna() & df["plate_z"].notna()]
    df["pa_id"] = df["game_pk"].astype(str) + "_" + df["at_bat_number"].astype(str)
    return df

pitch_df = pitcher_pitches()

# count-state HR weights
cnt_hr = (pitch_df.groupby(["balls","strikes"])["events"]
                     .apply(lambda s: (s=="home_run").mean())
                     .clip(lower=0.001))
pitch_df["w_cnt"] = pitch_df.set_index(["balls","strikes"]).index.map(cnt_hr)

# 3 ░ FEATURE ENGINEERING HELPER ░
x_bins = [-2,-1,-0.5,0,0.5,1,2]
z_bins = [1,1.5,2,2.5,3,3.5,5]
def engineer(df):
    df = df.copy()
    df["loc_x_bin"] = pd.cut(df["plate_x"], x_bins, labels=False)
    df["loc_z_bin"] = pd.cut(df["plate_z"], z_bins, labels=False)
    grp = df.groupby("pitch_type")
    df["v_rel"] = (df["release_speed"] - grp["release_speed"].transform("mean"))/grp["release_speed"].transform("std")
    velo_band = (df["release_speed"] // 5 * 5)
    for raw, rel in [("pfx_x","pfx_x_rel"),("pfx_z","pfx_z_rel")]:
        df[rel] = df[raw] - df.groupby(["pitch_type", velo_band])[raw].transform("mean")
    df["stand"]    = bat_side
    df["p_throws"] = p_throw
    return df

# 4 ░ SEASON-STAT LOOKUP ░
def pull(col, id_col, pid, default):
    row = stats_df.loc[(stats_df[id_col]==pid)&(stats_df["season"]==SEASON)]
    if not row.empty and pd.notna(row.iloc[0][col]): return row.iloc[0][col]
    row = stats_df.loc[stats_df[id_col]==pid].sort_values("season", ascending=False).head(1)
    if not row.empty and pd.notna(row.iloc[0][col]): return row.iloc[0][col]
    try:
        fg_tbl = (batting_stats if id_col=="batter" else pitching_stats)(SEASON,qual=0).set_index("IDfg")
        raw = col.split("_")[0].upper()
        if pid in fg_tbl.index and pd.notna(fg_tbl.loc[pid, raw]):
            return fg_tbl.loc[pid, raw]
    except Exception:
        pass
    return default

ERA_val  = pull("ERA_pitcher","pitcher",PITCHER_ID,4.30)
WHIP_val = pull("WHIP_pitcher","pitcher",PITCHER_ID,1.30)
ISO_val  = pull("ISO_batter","batter",BATTER_ID,0.150)
BB_val   = pull("BB%_batter","batter",BATTER_ID,0.08)
K_val    = pull("K%_batter","batter",BATTER_ID,0.22)

# 5 ░ MONTE-CARLO  (swing-only risk + pitch-type tallies) ░
pitch_type_totals = defaultdict(float)
game_probs        = []
swing_regex       = r"swing|foul|hit"

for _ in range(MC):
    pa_ids = np.random.choice(pitch_df["pa_id"].unique(), N_PA, replace=True)
    pa_df  = pitch_df[pitch_df["pa_id"].isin(pa_ids)]
    pa_df  = pa_df.sample(frac=1, weights="w_cnt", replace=True)

    feats = engineer(pa_df).reset_index(drop=True)
    feats[["ERA_pitcher","WHIP_pitcher"]] = ERA_val, WHIP_val
    feats[["ISO_batter","BB%_batter","K%_batter"]] = ISO_val, BB_val, K_val
    feats.replace([np.inf,-np.inf],0,inplace=True); feats.fillna(0,inplace=True)
    if "batter"  in model.feature_names_in_: feats["batter"]  = BATTER_ID
    if "pitcher" in model.feature_names_in_: feats["pitcher"] = PITCHER_ID

    pp_raw  = model.predict_proba(feats[model.feature_names_in_])[:,1]
    swing_m = feats["description"].str.contains(swing_regex, case=False, na=False)
    feats["prob"] = temp_scaled(pp_raw) * swing_m.values.astype(float)

    # aggregate by PA
    pa_probs = feats.groupby("pa_id")["prob"].apply(lambda x: 1 - np.prod(1 - x))
    game_probs.append(1 - np.prod(1 - pa_probs.values))

    # accumulate pitch-type contribution
    for pt, grp in feats.groupby("pitch_type"):
        pt_prob = 1 - np.prod(1 - grp["prob"].values)
        pitch_type_totals[pt] += pt_prob

# 6 ░ REPORT ░
p_mean = np.mean(game_probs)
p_lo, p_hi = np.percentile(game_probs, [10, 90])

if pitch_type_totals:
    total = sum(pitch_type_totals.values())
    pitch_rank = sorted([(pt, v/total) for pt,v in pitch_type_totals.items()],
                        key=lambda x: x[1], reverse=True)
    top_pitch, top_share = pitch_rank[0]
    pitch_msg = f"Most likely pitch type for the HR: {top_pitch}  ({top_share*100:.1f}% share)"
else:
    pitch_msg = "Pitch-type breakdown unavailable (no HR risk accumulated)"

print(f"\n📊  {batter_name.title()} ({bat_side}) vs {pitcher_name.title()} ({p_throw}) — {TARGET_DATE}")
print(f"    HR probability: {p_mean*100:.1f}%  (80% CI {p_lo*100:.1f}–{p_hi*100:.1f})")
print(f"    {pitch_msg}")
print(f"    Based on {MC:,} Monte-Carlo runs, {N_PA} PA each.")
