# Baseball Analytics Technical Questions

This notebook contains solutions to baseball analytics interview questions covering:
- Q1: Streaming pitch data with rolling features
- Q2: Model serving with batched inference
- Q3: SQL deduplication queries
- Q4: System design for ML pipeline
- Q5: In-game win probability updates

---
## Q1 – Streaming Pitch Data → Rolling Features

**Problem**: Maintain per-pitcher rolling statistics over the last 100 pitches from a stream of pitch events.

**Requirements**:
- Rolling average velocity
- Rolling zone rate (is_in_zone)
- Rolling whiff rate (is_whiff over swings)
- O(1) amortized update time
- Bounded memory
- Handle multiple pitchers interleaved

In [8]:
"""
Q1 - Streaming Pitch Data → Rolling Features

Maintain per-pitcher rolling statistics over the last 100 pitches.
O(1) amortized updates with bounded memory.
"""

from collections import deque, defaultdict
import random


class PitcherRollingStats:
    """Per-pitcher rolling statistics over each pitcher's most recent pitches.

    Each pitcher gets a bounded deque of recent events plus running totals
    (velocity sum, in-zone count, swing count, whiffs-on-swings count).
    An update touches only the incoming event and, when the window is full,
    the single event the deque evicts. That makes updates O(1) and memory
    O(window) per pitcher.
    """

    def __init__(self, window=100):
        """
        Initialize the rolling stats tracker.

        Args:
            window: Number of recent pitches to track per pitcher
                (default: 100). Must be at least 1.

        Raises:
            ValueError: if ``window`` is less than 1.
        """
        if window < 1:
            raise ValueError(f"window must be >= 1, got {window!r}")
        self.window = window
        self.pitch_data = defaultdict(lambda: deque(maxlen=self.window))
        # Running totals per pitcher, kept in sync with pitch_data:
        # [velocity_sum, in_zone_count, swing_count, whiff_on_swing_count]
        self._totals = defaultdict(lambda: [0.0, 0, 0, 0])

    @staticmethod
    def _contribution(event):
        """Return the amounts one event adds to (velocity, zone, swing, whiff) totals."""
        return (
            event['velocity'],
            event['is_in_zone'],
            event['is_swing'],
            # Count a whiff only on a swing, so whiff_rate can never exceed 1
            # even if the source data is inconsistent.
            bool(event['is_swing'] and event['is_whiff']),
        )

    def update(self, event):
        """
        Ingest one pitch event and return current rolling stats for that pitcher.

        Runs in O(1) time: the totals are adjusted incrementally rather than
        re-summing the window.

        Args:
            event: dict with keys: game_id, pitcher_id, batter_id, inning,
                   pitch_number, pitch_type, velocity, spin_rate, is_in_zone,
                   is_swing, is_ball_in_play, is_strike, is_whiff

        Returns:
            dict: Rolling statistics for the pitcher (pitcher_id,
            mean_velocity, zone_rate, whiff_rate, pitch_count). whiff_rate is
            whiffs-on-swings / swings, or 0.0 when there are no swings.
        """
        pitcher_id = event['pitcher_id']
        dq = self.pitch_data[pitcher_id]
        totals = self._totals[pitcher_id]

        # A full deque drops its oldest event on append. Remove that event's
        # contribution first so the totals always describe the current window.
        if len(dq) == dq.maxlen:
            for i, amount in enumerate(self._contribution(dq[0])):
                totals[i] -= amount
        dq.append(event)
        for i, amount in enumerate(self._contribution(event)):
            totals[i] += amount

        # Note: the incremental float velocity sum can differ from a fresh
        # sum in the last few bits over very long streams.
        count = len(dq)
        vel_sum, in_zone, swings, whiffs = totals
        return {
            'pitcher_id': pitcher_id,
            'mean_velocity': vel_sum / count,
            'zone_rate': in_zone / count,
            'whiff_rate': (whiffs / swings) if swings > 0 else 0.0,
            'pitch_count': count
        }


def generate_sample_pitch():
    """Build one synthetic pitch event using the module-level ``random`` RNG.

    The ``pitch_number`` field is a placeholder (0) that the caller overwrites.
    A whiff and a ball in play are only ever drawn for pitches that were swung at.
    """
    swung = random.random() < 0.45
    whiffed = swung and (random.random() < 0.25)

    # Draws happen in the same order the dict fields are laid out below.
    game_no = random.randint(1, 5)
    pitcher_no = random.randint(1, 3)
    batter_no = random.randint(1, 20)
    inning_no = random.randint(1, 9)
    kind = random.choice(('FF', 'SL', 'CH', 'CU', 'SI'))
    mph = round(random.uniform(85, 98), 1)
    spin = random.randint(2000, 2800)
    in_zone = random.random() < 0.48
    in_play = swung and (random.random() < 0.75)
    strike = random.random() < 0.52

    event = dict(
        game_id=f"G{game_no}",
        pitcher_id=f"P{pitcher_no}",
        batter_id=f"B{batter_no}",
        inning=inning_no,
        pitch_number=0,  # filled in by the caller
        pitch_type=kind,
        velocity=mph,
        spin_rate=spin,
        is_in_zone=in_zone,
        is_swing=swung,
        is_ball_in_play=in_play,
        is_strike=strike,
        is_whiff=whiffed,
    )
    return event


def main():
    """Stream 300 synthetic pitches through PitcherRollingStats and print a per-pitcher summary."""
    banner = "=" * 60
    print("Baseball Analytics Q1 - Rolling Pitch Statistics")
    print(banner)

    tracker = PitcherRollingStats(window=100)

    total = 300
    print(f"\nProcessing {total} pitches from 3 pitchers...")

    # Only the most recent snapshot per pitcher is kept for the final report.
    latest = {}
    for seq in range(total):
        event = generate_sample_pitch()
        event['pitch_number'] = seq
        snapshot = tracker.update(event)
        latest[snapshot['pitcher_id']] = snapshot

    print("\n" + banner)
    print("Final Rolling Statistics (last 100 pitches per pitcher)")
    print(banner)

    for pid, snap in sorted(latest.items()):
        print(f"\n{pid}:")
        print(f"  Pitch Count:     {snap['pitch_count']}")
        print(f"  Avg Velocity:    {snap['mean_velocity']:.1f} mph")
        print(f"  Zone Rate:       {snap['zone_rate']:.1%}")
        print(f"  Whiff Rate:      {snap['whiff_rate']:.1%}")

    print("\n" + banner)
    print("[OK] All pitchers processed with O(1) amortized updates")
    print("[OK] Memory bounded at 100 pitches per pitcher")


if __name__ == "__main__":
    main()


Baseball Analytics Q1 - Rolling Pitch Statistics

Processing 300 pitches from 3 pitchers...

Final Rolling Statistics (last 100 pitches per pitcher)

P1:
  Pitch Count:     100
  Avg Velocity:    91.4 mph
  Zone Rate:       37.0%
  Whiff Rate:      18.4%

P2:
  Pitch Count:     89
  Avg Velocity:    91.7 mph
  Zone Rate:       48.3%
  Whiff Rate:      20.5%

P3:
  Pitch Count:     100
  Avg Velocity:    91.5 mph
  Zone Rate:       52.0%
  Whiff Rate:      22.7%

[OK] All pitchers processed with O(1) amortized updates
[OK] Memory bounded at 100 pitches per pitcher


---
## Q2 – Model Serving: Batched Inference API with Timeouts

You’re serving an xG-style model that takes fixed-length feature vectors for shots and returns a probability. You want to batch requests for GPU efficiency, but you also need latency under 100ms per request.

Design a simple Python API layer (no need for full FastAPI boilerplate) that:

- Accepts individual prediction requests from multiple threads.
- Batches them into tensors of up to 128 requests.
- Ensures no request waits longer than 50 ms in the queue before it is run.

Show:

- How you’d structure the worker thread and queue.
- A `predict(features)` function that a caller would use.
- How you’d handle shutdown cleanly.

(This is exactly the kind of “bridge between Baseball Analytics models and Baseball Systems tools” that the interviewing team cares about.)

In [10]:
from queue import Queue, Empty
from threading import Thread, Event
import time

class PredictionRequest:
    """One pending prediction: the caller's features plus slots for the outcome.

    The worker fills ``result`` (or ``error``) and then sets ``event`` to wake
    the waiting caller.
    """

    def __init__(self, features):
        self.features = features
        self.event = Event()
        self.result = None
        # Exception raised while scoring this request's batch, if any.
        self.error = None
        # Monotonic enqueue time; the batching deadline is measured from here.
        self.enqueued_at = time.monotonic()


class BatchedPredictor:
    """Thread-safe front end that micro-batches requests for ``model.predict``.

    A single daemon worker thread pulls requests off a queue and groups them
    into batches of at most ``max_batch_size``. A batch is dispatched as soon
    as it is full, or once its oldest request has been queued for
    ``max_wait_ms``, whichever comes first.

    Args:
        model: object whose ``predict(list_of_features)`` returns one
            prediction per input, in the same order.
        max_batch_size: upper bound on requests per ``model.predict`` call.
        max_wait_ms: longest time (ms) the oldest request in a batch is held
            back waiting for more requests to join.
    """

    # How often (seconds) an idle worker wakes up to check for shutdown.
    _POLL_S = 0.05

    def __init__(self, model, max_batch_size=128, max_wait_ms=50):
        from threading import Lock

        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.queue = Queue()
        self._stop = Event()
        # Serializes predict()'s "stopped?" check + enqueue with shutdown(), so
        # nothing can be enqueued once the worker has begun its final drain.
        self._lock = Lock()
        self.worker = Thread(target=self._batch_worker, daemon=True)
        self.worker.start()

    def predict(self, features, timeout=None):
        """Score one input, blocking until its batch has run.

        Args:
            features: one input for ``model.predict``.
            timeout: optional maximum number of seconds to wait. ``None`` waits
                indefinitely. On timeout the request is still scored later,
                but its result is discarded.

        Returns:
            The model's prediction for ``features``.

        Raises:
            RuntimeError: if ``shutdown()`` has already been called.
            TimeoutError: if ``timeout`` elapses before the result is ready.
            Exception: whatever ``model.predict`` raised for this batch, or
                ValueError if it returned the wrong number of predictions.
        """
        req = PredictionRequest(features)
        with self._lock:
            if self._stop.is_set():
                raise RuntimeError("BatchedPredictor has been shut down")
            self.queue.put(req)
        if not req.event.wait(timeout):
            raise TimeoutError(f"prediction not ready after {timeout}s")
        if req.error is not None:
            raise req.error
        return req.result

    def _batch_worker(self):
        """Worker loop: collect and run batches until shutdown, then drain the queue."""
        while True:
            try:
                first = self.queue.get(timeout=self._POLL_S)
            except Empty:
                # shutdown() sets _stop under the same lock predict() holds
                # while enqueuing. So once _stop is seen, the queue can only
                # shrink, and an empty queue means all work is done.
                if self._stop.is_set() and self.queue.empty():
                    return
                continue

            batch = [first]
            deadline = first.enqueued_at + self.max_wait_ms / 1000
            while len(batch) < self.max_batch_size:
                remaining = deadline - time.monotonic()
                try:
                    if remaining > 0:
                        batch.append(self.queue.get(timeout=remaining))
                    else:
                        # Deadline reached: take only requests already waiting.
                        batch.append(self.queue.get_nowait())
                except Empty:
                    break
            self._run_batch(batch)

    def _run_batch(self, batch):
        """Score ``batch`` with one model call and wake every waiting caller.

        A model exception, or a wrong number of outputs, is delivered to every
        caller in the batch instead of killing the worker thread.
        """
        try:
            preds = list(self.model.predict([r.features for r in batch]))
            if len(preds) != len(batch):
                raise ValueError(
                    f"model returned {len(preds)} predictions "
                    f"for a batch of {len(batch)}"
                )
        except Exception as exc:  # worker boundary: hand the error to callers
            for req in batch:
                req.error = exc
                req.event.set()
            return
        for req, pred in zip(batch, preds):
            req.result = pred
            req.event.set()

    def shutdown(self):
        """Stop accepting requests, finish every queued one, and join the worker.

        Safe to call more than once.
        """
        with self._lock:
            self._stop.set()
        self.worker.join()


# Smoke test: simple dummy model
class DummyModel:
    """Stand-in model whose "prediction" for a feature vector is the vector's sum."""

    def predict(self, batch):
        """Return a list holding ``sum(features)`` for every entry of *batch*, in order."""
        return list(map(sum, batch))

if __name__ == "__main__":
    # Ad-hoc smoke test for BatchedPredictor: several caller threads each
    # submit one feature vector and record (input id, prediction).
    import threading

    # Small batch size and short wait so a handful of threads exercises batching.
    scorer = BatchedPredictor(DummyModel(), max_batch_size=4, max_wait_ms=20)

    # (i, prediction) pairs, appended from the caller threads in completion order.
    results = []
    def worker(i):
        # DummyModel sums the vector, so the expected prediction is 3*i + 3.
        out = scorer.predict([i, i+1, i+2])
        results.append((i, out))

    # Happy path: less than batch size
    t1 = threading.Thread(target=worker, args=(1,))
    t2 = threading.Thread(target=worker, args=(7,))
    t1.start(); t2.start(); t1.join(); t2.join()
    print("Happy path:", results)

    # Edge: lots of requests, check batching and no more than batch size per batch
    # NOTE(review): nothing here asserts the batch-size bound or the values;
    # results are only printed, and their order depends on thread scheduling.
    results.clear()
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
    for t in threads: t.start()
    for t in threads: t.join()
    print("Stress test:", results)

    # Stops the worker thread.
    scorer.shutdown()


Happy path: [(1, 6), (7, 24)]
Stress test: [(0, 3), (1, 6), (2, 9), (3, 12), (4, 15), (5, 18), (6, 21), (7, 24), (8, 27), (9, 30)]


---
## Q3 – SQL: Deduplicate and Choose Latest Model Version

**Problem**: Query the `model_runs` table to find the latest run for each (model_family, model_version) pair, then pick the best version in each family by ELPD (expected log predictive density; higher is better).

**Schema**:
```sql
model_runs(
  model_family   TEXT,
  model_version  TEXT,
  run_ts         TIMESTAMP,
  elpd           DOUBLE PRECISION
);
```

In [None]:
# TODO: Write SQL queries
# Query 1: Latest run per (family, version)
# Query 2: Best version by ELPD for each model_family
import duckdb




-- Latest run per (model_family, model_version).
-- NULLS LAST stops a row with a missing run_ts from being picked as "latest"
-- (PostgreSQL sorts NULLs first under DESC by default). The elpd tiebreak
-- makes the pick deterministic when two runs share the same run_ts.
SELECT
  model_family,
  model_version,
  run_ts,
  elpd
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY model_family, model_version
      ORDER BY run_ts DESC NULLS LAST, elpd DESC NULLS LAST
    ) AS rn
  FROM model_runs
) t
WHERE rn = 1;

-- Best model_version per model_family by ELPD, judged on each version's latest run.
-- NULLS LAST keeps a version with a missing ELPD from being ranked "best"
-- (PostgreSQL sorts NULLs first under DESC by default). The trailing sort keys
-- make ties deterministic: prefer the more recent run, then the lowest model_version.
WITH latest_per_version AS (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY model_family, model_version
      ORDER BY run_ts DESC NULLS LAST, elpd DESC NULLS LAST
    ) AS rn
  FROM model_runs
)
SELECT
  model_family,
  model_version,
  run_ts,
  elpd
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY model_family
      ORDER BY elpd DESC NULLS LAST, run_ts DESC NULLS LAST, model_version
    ) AS r_best
  FROM latest_per_version
  WHERE rn = 1
) x
WHERE r_best = 1;



---
## Q4 – System Design: End-to-End ML Pipeline

**Problem**: Design a production ML pipeline for player projection models.

**Requirements**:
- Nightly data pulls from Snowflake
- Model training/refresh based on drift/schedule
- Evaluation vs current production model
- Automated promotion and API exposure
- Versioning, rollback, and monitoring

In [None]:
# TODO: Design document or architecture diagram
# Include: tools, versioning strategy, rollback plan, monitoring approach

---
## Q5 – Baseball-Flavored Coding: In-Game Win Probability Update

**Problem**: Update win probability during a game based on plate appearance outcomes.

**Inputs**:
- Run expectancy table: (outs, base_state) → expected_runs_remaining
- Before/after game states from a plate appearance

In [13]:
import csv
import math

def load_run_expectancy(path):
    """Load a run-expectancy table from a CSV file.

    The CSV must have ``outs``, ``base_state`` and ``expected_runs`` columns.

    Args:
        path: path to the CSV file.

    Returns:
        dict mapping ``(outs as int, base_state as str)`` to expected runs (float).
    """
    re_dict = {}
    # utf-8-sig decodes plain UTF-8 and also strips a leading byte-order mark,
    # which spreadsheet exports often add. Without it the first header name
    # would carry a stray "\ufeff" and lookups like row["outs"] would fail.
    with open(path, newline="", encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            key = (int(row["outs"]), row["base_state"])
            re_dict[key] = float(row["expected_runs"])
    return re_dict

def run_value(re, before, after, runs_scored):
    """Return the run value of a plate appearance.

    Computed as (expected runs after + runs scored) - expected runs before,
    where expected runs are looked up in *re* by ``(outs, base_state)``.
    Any combination missing from *re* counts as 0.0 expected runs.
    """
    def expectancy(state):
        return re.get((state["outs"], state["base_state"]), 0.0)

    start = expectancy(before)
    end = expectancy(after)
    return runs_scored + end - start

def features_for_wp(state, cum_run_value):
    """Build the win-probability feature vector for *state*.

    Order: [intercept 1.0, inning, top-of-inning flag as 0/1, score_diff,
    cum_run_value].
    """
    inning = state["inning"]
    is_top = int(state["top"])
    diff = state["score_diff"]
    return [1.0, inning, is_top, diff, cum_run_value]

def sigmoid(x):
    """Return the logistic function 1 / (1 + e^-x) without overflowing.

    ``math.exp(-x)`` raises OverflowError once -x exceeds about 709. For
    negative x the algebraically equal form e^x / (1 + e^x) is used instead,
    because its exponent is never positive.
    """
    if x >= 0:
        return 1 / (1 + math.exp(-x))
    e = math.exp(x)
    return e / (1 + e)

def win_prob(beta, state, cum_run_value):
    """Return the logistic-model win probability for *state*.

    Args:
        beta: coefficients, one per entry of ``features_for_wp(...)`` and in
            the same order.
        state: game-state dict read by ``features_for_wp``.
        cum_run_value: cumulative run value fed in as the last feature.

    Raises:
        ValueError: if ``beta`` does not have exactly one coefficient per
            feature. ``zip`` would otherwise silently drop the extras and
            return a wrong probability.
    """
    x = features_for_wp(state, cum_run_value)
    coefs = list(beta)
    if len(coefs) != len(x):
        raise ValueError(
            f"beta has {len(coefs)} coefficients but there are {len(x)} features"
        )
    z = sum(b * xi for b, xi in zip(coefs, x))
    return sigmoid(z)

# SMOKE TEST
# Toy run-expectancy table holding only the two states used below:
# (outs, base_state) -> expected runs remaining.
RE_EXAMPLE = {
    (1, "100"): 0.88,
    (1, "000"): 0.28,
}

# One plate appearance: one out before and after, a run scores, and score_diff
# goes from -1 to 0. base_state "100" presumably encodes a runner on first
# only; confirm the encoding.
before = {"inning": 7, "top": True, "score_diff": -1, "outs": 1, "base_state": "100"}
after  = {"inning": 7, "top": True, "score_diff": 0,  "outs": 1, "base_state": "000"}
runs_scored = 1

rv = run_value(RE_EXAMPLE, before, after, runs_scored)
print(f"Run Value: {rv:.2f}")  # Should be (0.28 + 1) - 0.88 = 0.40

# Coefficients in features_for_wp order:
# [intercept, inning, top flag, score_diff, cum_run_value]
beta = [0.2, 0.05, -0.05, 0.3, 0.1]  # Fake model weights
wp = win_prob(beta, after, cum_run_value=rv)
print(f"Win Probability: {wp:.2%}")

# Edge case: missing base_state/out combo
# (1, "999") is absent from RE_EXAMPLE, so run_value falls back to 0.0 for the
# after-state: (0.0 + 1) - 0.88 = 0.12.
after_bad = after.copy()
after_bad["base_state"] = "999"  # Not in RE table
rv_bad = run_value(RE_EXAMPLE, before, after_bad, runs_scored)
print("Run Value (bad base_state):", rv_bad)

# Stress-ish: multiple inning WP updates
# Bottom of innings 7-9: score_diff grows 0, 1, 2 and the cumulative run value
# grows by 0.15 per inning.
cum_rv = 0.0
for inning in range(7, 10):
    state = {"inning": inning, "top": False, "score_diff": inning - 7, "outs": 1, "base_state": "000"}
    cum_rv += 0.15
    print(f"Inning {inning}: WP = {win_prob(beta, state, cum_run_value=cum_rv):.2%}")


Run Value: 0.40
Win Probability: 63.18%
Run Value (bad base_state): 0.12
Inning 7: WP = 63.76%
Inning 8: WP = 71.71%
Inning 9: WP = 78.50%
