Data Collection:

-Using PyBaseball package to collect pitch data from 2022

In [4]:
import sys
print(sys.executable)
from pybaseball import statcast



c:\Users\cpim1\AppData\Local\Programs\Python\Python313\python.exe


In [5]:
from datetime import date, timedelta
import os
import time
from typing import List, Tuple
import io
import sys
import contextlib
import warnings

import numpy as np
import pandas as pd
from pybaseball import statcast


In [7]:
# -------------------------------
# Quiet mode
# -------------------------------
# Disable tqdm progress bars used internally
os.environ["TQDM_DISABLE"] = "1"
# Silence pybaseball/pandas FutureWarnings etc.
warnings.filterwarnings("ignore", module="pybaseball")
warnings.filterwarnings("ignore", category=FutureWarning)

@contextlib.contextmanager
def _suppress_stdout_stderr():
    """Temporarily redirect stdout/stderr to devnull (tqdm writes to stderr)."""
    with open(os.devnull, 'w') as devnull:
        old_out, old_err = sys.stdout, sys.stderr
        try:
            sys.stdout = devnull
            sys.stderr = devnull
            yield
        finally:
            sys.stdout = old_out
            sys.stderr = old_err


# -------------------------------
# Config
# -------------------------------
TRAIN_RANGES: List[Tuple[date, date]] = [
    (date(2021, 4, 1),  date(2021, 10, 3)),
    (date(2022, 4, 7),  date(2022, 10, 5)),
    (date(2023, 3, 30), date(2023, 10, 1)),
    (date(2024, 3, 28), date(2024, 9, 29)),
]
TEST_RANGES: List[Tuple[date, date]] = [
    (date(2025, 3, 27), date(2025, 8, 31)),
]

CHUNK_DAYS = 5
REQUEST_SLEEP_SEC = 1.0
MAX_RETRIES = 3
RETRY_SLEEP_SEC = 3.0

OUTPUT_DIR = "output"
TRAIN_OUT = os.path.join(OUTPUT_DIR, "train.csv")
TEST_OUT  = os.path.join(OUTPUT_DIR, "test.csv")

In [8]:
# -------------------------------
# Utilities
# -------------------------------
def generate_chunks(start_date: date, end_date: date, span_days: int = CHUNK_DAYS) -> List[Tuple[date, date]]:
    chunks = []
    cur = start_date
    delta = timedelta(days=span_days - 1)
    while cur <= end_date:
        nxt = min(cur + delta, end_date)
        chunks.append((cur, nxt))
        cur = nxt + timedelta(days=1)
    return chunks


def fetch_statcast_window(start_dt: str, end_dt: str) -> pd.DataFrame:
    last_err = None
    for _ in range(MAX_RETRIES):
        try:
            with _suppress_stdout_stderr():
                return statcast(start_dt=start_dt, end_dt=end_dt)
        except Exception as e:
            last_err = e
            time.sleep(RETRY_SLEEP_SEC)
    # On permanent failure return empty frame silently
    return pd.DataFrame()


def collect_ranges(ranges: List[Tuple[date, date]]) -> pd.DataFrame:
    frames = []
    for s, e in ranges:
        for a, b in generate_chunks(s, e, CHUNK_DAYS):
            df = fetch_statcast_window(a.strftime("%Y-%m-%d"), b.strftime("%Y-%m-%d"))
            if df is not None and not df.empty:
                frames.append(df)
            time.sleep(REQUEST_SLEEP_SEC)
    if not frames:
        return pd.DataFrame()
    out = pd.concat(frames, ignore_index=True)
    if "game_date" in out.columns:
        out["game_date"] = pd.to_datetime(out["game_date"], errors="coerce")
    return out


def compute_vaa_row(row: pd.Series) -> float:
    for k in ("vy0", "ay", "vz0", "az"):
        if pd.isna(row.get(k)):
            return np.nan
    vy0, ay, vz0, az = row["vy0"], row["ay"], row["vz0"], row["az"]
    if pd.isna(vy0) or pd.isna(ay) or pd.isna(vz0) or pd.isna(az) or ay == 0:
        return np.nan
    y0, yf = 50.0, 17.0 / 12.0
    vy_f_sq = vy0**2 - 2.0 * ay * (y0 - yf)
    if vy_f_sq < 0:
        return np.nan
    vy_f = -np.sqrt(vy_f_sq)
    t = (vy_f - vy0) / ay
    if not np.isfinite(t) or t <= 0:
        return np.nan
    vz_f = vz0 + az * t
    if vy_f == 0:
        return np.nan
    return float(-np.degrees(np.arctan(vz_f / vy_f)))


def add_vaa(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    df["vaa"] = df.apply(compute_vaa_row, axis=1)
    return df


def sort_and_dedup(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    sort_cols = [c for c in ["game_date", "game_pk", "at_bat_number", "pitch_number"] if c in df.columns]
    if sort_cols:
        df = df.sort_values(sort_cols, kind="mergesort")
    dedup_keys = [c for c in ["game_pk", "at_bat_number", "pitch_number"] if c in df.columns]
    if dedup_keys:
        df = df.drop_duplicates(subset=dedup_keys, keep="first")
    else:
        df = df.drop_duplicates(keep="first")
    return df.reset_index(drop=True)


def ensure_dir(path: str):
    os.makedirs(os.path.dirname(path), exist_ok=True)

In [9]:
# -------------------------------
# Main
# -------------------------------
def main():
    ensure_dir(TRAIN_OUT)
    ensure_dir(TEST_OUT)

    # TRAIN
    train_df = collect_ranges(TRAIN_RANGES)
    train_df = add_vaa(train_df)
    train_df = sort_and_dedup(train_df)
    train_df.to_csv(TRAIN_OUT, index=False)
    print(f"✅ Wrote {len(train_df):,} rows to {TRAIN_OUT}")

    # TEST
    test_df = collect_ranges(TEST_RANGES)
    test_df = add_vaa(test_df)
    test_df = sort_and_dedup(test_df)
    test_df.to_csv(TEST_OUT, index=False)
    print(f"✅ Wrote {len(test_df):,} rows to {TEST_OUT}")


if __name__ == "__main__":
    main()

✅ Wrote 2,844,586 rows to output\train.csv
✅ Wrote 599,491 rows to output\test.csv


In [10]:
from pathlib import Path
print("CWD:", Path.cwd())
p = Path("output/train.csv").resolve()
print("File:", p)
print("Exists?", p.exists())

CWD: c:\Users\cpim1\Downloads
File: C:\Users\cpim1\Downloads\output\train.csv
Exists? True


In [60]:
import pandas as pd

df = pd.read_csv("output/train.csv")
print(df.columns.to_list())

['pitch_type', 'game_date', 'release_speed', 'release_pos_x', 'release_pos_z', 'player_name', 'batter', 'pitcher', 'events', 'description', 'spin_dir', 'spin_rate_deprecated', 'break_angle_deprecated', 'break_length_deprecated', 'zone', 'des', 'game_type', 'stand', 'p_throws', 'home_team', 'away_team', 'type', 'hit_location', 'bb_type', 'balls', 'strikes', 'game_year', 'pfx_x', 'pfx_z', 'plate_x', 'plate_z', 'on_3b', 'on_2b', 'on_1b', 'outs_when_up', 'inning', 'inning_topbot', 'hc_x', 'hc_y', 'tfs_deprecated', 'tfs_zulu_deprecated', 'umpire', 'sv_id', 'vx0', 'vy0', 'vz0', 'ax', 'ay', 'az', 'sz_top', 'sz_bot', 'hit_distance_sc', 'launch_speed', 'launch_angle', 'effective_speed', 'release_spin_rate', 'release_extension', 'game_pk', 'fielder_2', 'fielder_3', 'fielder_4', 'fielder_5', 'fielder_6', 'fielder_7', 'fielder_8', 'fielder_9', 'release_pos_y', 'estimated_ba_using_speedangle', 'estimated_woba_using_speedangle', 'woba_value', 'woba_denom', 'babip_value', 'iso_value', 'launch_speed_a

In [61]:

unique_pitches = sorted(df["pitch_type"].dropna().unique().tolist())
print(unique_pitches)


['CH', 'CS', 'CU', 'EP', 'FA', 'FC', 'FF', 'FO', 'FS', 'KC', 'KN', 'PO', 'SC', 'SI', 'SL', 'ST', 'SV']


In [62]:
# Make "is_[pitchtype]" columns


PITCH_TYPE_FLAGS = [
    "is_CH","is_CS","is_CU","is_EP","is_FA","is_FC","is_FF","is_FO","is_FS",
    "is_KC","is_KN","is_PO","is_SC","is_SI","is_SL","is_ST","is_SV"
]

# optional: map ambiguous labels -> your canonical set
_PITCH_MAP = {
    "FA": "FF",   # four-seam
    "ST": "SV",   # sweeper
}

def add_pitch_type_flags(df: pd.DataFrame, col: str = "pitch_type") -> pd.DataFrame:
    # clean source column
    pt = (df[col].astype("string").str.strip().str.upper())
    pt = pt.replace(_PITCH_MAP)  # normalize aliases

    # one-hot to is_* names
    dummies = pd.get_dummies(pt, prefix="is", dtype="uint8")

    # ensure every desired flag column exists
    for flag in PITCH_TYPE_FLAGS:
        if flag not in dummies.columns:
            dummies[flag] = 0

    # keep only desired order
    dummies = dummies[PITCH_TYPE_FLAGS]

    # drop any existing is_* to avoid duplicates, then join fresh
    df = df.drop(columns=[c for c in df.columns if c.startswith("is_")], errors="ignore").join(dummies)

    return df

# ----- usage -----
df = pd.read_csv("output/train.csv")
df = add_pitch_type_flags(df)

# quick check
print(PITCH_TYPE_FLAGS)  # your exact list/order
print(df[PITCH_TYPE_FLAGS].sum().sort_values(ascending=False).head(10))
# e.g. count of changeups:
print("CH count:", int(df["is_CH"].sum()))


['is_CH', 'is_CS', 'is_CU', 'is_EP', 'is_FA', 'is_FC', 'is_FF', 'is_FO', 'is_FS', 'is_KC', 'is_KN', 'is_PO', 'is_SC', 'is_SI', 'is_SL', 'is_ST', 'is_SV']
is_FF    947500.0
is_SL    463669.0
is_SI    439449.0
is_CH    309410.0
is_FC    216938.0
is_CU    196785.0
is_SV    143853.0
is_FS     60817.0
is_KC     59991.0
is_EP      1769.0
dtype: float64
CH count: 309410


In [63]:
df["is_CH"].sum()  # changeups

np.uint64(309410)

In [64]:
# Make columns for pitcher throwing hand

# normalize text first
pt = (df["p_throws"]
        .astype("string")
        .str.strip()
        .str.upper())

# binary flags (uint8 saves memory)
df["is_R"] = (pt == "R").astype("uint8")
df["is_L"] = (pt == "L").astype("uint8")

# quick sanity checks
print(df[["p_throws","is_R","is_L"]].head(10))
print("R count:", int(df["is_R"].sum()), " | L count:", int(df["is_L"].sum()))


  p_throws  is_R  is_L
0        R     1     0
1        R     1     0
2        R     1     0
3        R     1     0
4        R     1     0
5        R     1     0
6        R     1     0
7        R     1     0
8        R     1     0
9        R     1     0
R count: 2060796  | L count: 783790


In [65]:
# Make column for batter handedness
st = (df["stand"]
        .astype("string")
        .str.strip()
        .str.upper())

df["bat_is_R"] = (st == "R").astype("uint8")
df["bat_is_L"] = (st == "L").astype("uint8")
print(df[["bat_is_R","is_R","is_L"]].head(10))


   bat_is_R  is_R  is_L
0         1     1     0
1         1     1     0
2         1     1     0
3         1     1     0
4         1     1     0
5         1     1     0
6         0     1     0
7         1     1     0
8         1     1     0
9         1     1     0
