In [1]:
# 1. Imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import re

# modeling
from xgboost import XGBRegressor
from xgboost import XGBClassifier

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from scipy.special import logit, expit

# PyTorch for conversion model
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

# nfl pbp loader
import nfl_data_py as nfl

# reproducibility
np.random.seed(42)
torch.manual_seed(42)

<torch._C.Generator at 0x2d754c27cf0>

In [2]:
# 2a. Download PBP (this can take a few minutes)
seasons = [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2024]
print("Loading play-by-play for seasons:", seasons)
raw_pbp = nfl.import_pbp_data(seasons, downcast=False)  # returns a DataFrame (may be large)

print("Rows loaded:", raw_pbp.shape[0])
raw_pbp.head()

Loading play-by-play for seasons: [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2024]
2015 done.
2016 done.
2017 done.
2018 done.
2019 done.
2020 done.
2021 done.
2022 done.
2024 done.
Rows loaded: 433940


Unnamed: 0,play_id,game_id,old_game_id,home_team,away_team,season_type,week,posteam,posteam_type,defteam,...,was_pressure,route,defense_man_zone_type,defense_coverage_type,offense_names,defense_names,offense_positions,defense_positions,offense_numbers,defense_numbers
0,1.0,2015_01_BAL_DEN,2015091309,DEN,BAL,REG,1,,,,...,,,,,,,,,,
1,36.0,2015_01_BAL_DEN,2015091309,DEN,BAL,REG,1,BAL,away,DEN,...,,,,,,,,,,
2,51.0,2015_01_BAL_DEN,2015091309,DEN,BAL,REG,1,BAL,away,DEN,...,,,,,,,,,,
3,75.0,2015_01_BAL_DEN,2015091309,DEN,BAL,REG,1,BAL,away,DEN,...,,,,,,,,,,
4,96.0,2015_01_BAL_DEN,2015091309,DEN,BAL,REG,1,BAL,away,DEN,...,,,,,,,,,,


In [3]:
def parse_weather(weather_str):
    """
    Parses a weather string into structured features:
        - temp_F: float
        - humidity: float (percentage)
        - wind_mph: float
        - wind_dir: str
        - conditions: str (general description, e.g., 'sunny', 'cloudy', etc.)
    """
    result = {
        "temp_F": None,
        "humidity": None,
        "wind_mph": None,
        "wind_dir": None,
        "conditions": None
    }
    
    if not isinstance(weather_str, str):
        return result
    
    lower_str = weather_str.lower()
    
    # Extract temperature
    temp_match = re.search(r'(\d+)\s*°?\s*f', lower_str)
    if temp_match:
        result['temp_F'] = float(temp_match.group(1))
    
    # Extract humidity
    hum_match = re.search(r'humidity[:\s]*(\d+)%', lower_str)
    if hum_match:
        result['humidity'] = float(hum_match.group(1))
    
    # Extract wind speed and direction
    wind_match = re.search(r'wind[:\s]*([nesw]+)\s*(\d+)\s*mph', lower_str)
    if wind_match:
        result['wind_dir'] = wind_match.group(1).upper()
        result['wind_mph'] = float(wind_match.group(2))
    
    # Extract general conditions
    conditions = []
    for cond in ['sunny', 'cloudy', 'clear', 'rain', 'snow', 'fog', 'drizzle', 'storm', 'windy']:
        if cond in lower_str:
            conditions.append(cond)
    if conditions:
        result['conditions'] = ','.join(conditions)
    
    return result


def deconstruct_weather(df, weather_col='weather'):
    """
    Adds structured weather columns to a DataFrame based on a weather string column.
    
    New columns added:
      - temp_F
      - humidity
      - wind_mph
      - wind_dir
      - conditions
    """
    weather_data = df[weather_col].apply(parse_weather)
    weather_df = pd.DataFrame(weather_data.tolist())
    df = pd.concat([df.reset_index(drop=True), weather_df], axis=1)
    
    # Fill missing wind speeds with 0
    df['wind_mph'] = df['wind_mph'].fillna(0)

    # Fill missing temperatures with 60°F
    df['temp_F'] = df['temp_F'].fillna(60)

    return df

In [4]:
pbp = raw_pbp.copy()
pbp = deconstruct_weather(pbp)

In [5]:
pbp = pbp[pbp.penalty == 0]  # remove penalty plays

# --- Drop rows missing home/away WP ---
wp_df = pbp.dropna(subset=["home_wp_post", "away_wp_post"]).copy()

# --- Define features ---
wp_df["score_time_ratio"] = wp_df["score_differential"].abs() / (wp_df["game_seconds_remaining"] + 1)
wp_features = [
    "yardline_100",
    "down",
    "ydstogo",
    "game_seconds_remaining",
    "half_seconds_remaining",
    "score_differential",
    "posteam_timeouts_remaining",
    "defteam_timeouts_remaining",
    "score_time_ratio",
    "temp_F",
    "wind_mph"
]

# --- Define posteam WP target ---
wp_df["wp_target"] = np.where(
    wp_df["posteam"] == wp_df["home_team"],
    wp_df["home_wp_post"],
    wp_df["away_wp_post"]
)

X_wp = wp_df[wp_features]
y_wp = wp_df["wp_target"]

# --- Clip target to avoid exact 0/1 ---
epsilon = 1e-6
y_wp_clipped = y_wp.clip(epsilon, 1 - epsilon)

# --- Train-test split ---
Xwp_train, Xwp_test, ywp_train, ywp_test = train_test_split(
    X_wp, y_wp_clipped, test_size=0.2, random_state=42
)

# --- Monotone constraints (time variables unconstrained now) ---
monotone_constraints_dict = {
    "yardline_100": -1,              # closer to opponent endzone → WP ↑
    "down": -1,                       # higher down (worse) → WP ↓
    "ydstogo": -1,                    # more yards to go → WP ↓
    "score_differential": 1,          # lead → WP ↑
    "posteam_timeouts_remaining": 1,  # more TOs → WP ↑
    "defteam_timeouts_remaining": -1  # opponent TOs → WP ↓
}

# --- Train XGBoost regressor ---
wp_model = XGBRegressor(
    n_estimators=200,
    max_depth=5,
    learning_rate=0.05,
    subsample=0.8,
    verbosity=0,
    monotone_constraints=monotone_constraints_dict
)

wp_model.fit(Xwp_train, ywp_train, eval_set=[(Xwp_test, ywp_test)], verbose=False)

# --- Evaluate test RMSE ---
preds = wp_model.predict(Xwp_test)
rmse = np.sqrt(np.mean((preds - y_wp.loc[Xwp_test.index].values)**2))
print("WP test RMSE:", rmse)

# --- Helper function ---
def predict_wp(state_df):
    # Compute score_time_ratio if missing
    if "score_time_ratio" not in state_df:
        state_df["score_time_ratio"] = state_df["score_differential"].abs() / (state_df["game_seconds_remaining"] + 1)
    
    preds = wp_model.predict(state_df[wp_features])
    return np.clip(preds, 0.0, 1.0)

WP test RMSE: 0.059866887648509975


In [6]:
def flip_possession(state):
    """
    Flip possession for WP prediction.
    Returns a new state dict from the perspective of the new posteam.
    """
    new_state = state.copy()
    # Negate score differential
    new_state["score_differential"] = -state["score_differential"]
    # Swap timeouts
    new_state["posteam_timeouts_remaining"] = state["defteam_timeouts_remaining"]
    new_state["defteam_timeouts_remaining"] = state["posteam_timeouts_remaining"]
    return new_state

In [7]:
# Filter to punts only
punt_df = pbp[pbp.play_type == "punt"].dropna(subset=["kick_distance", "return_yards"]).copy()

# Compute net punt yardage: kick distance minus return yards, adjust for touchbacks (if available)
# Assuming touchback puts ball at 20-yard line
punt_df["net_punt"] = punt_df["kick_distance"] - punt_df["return_yards"]
punt_df.loc[punt_df["touchback"] == 1, "net_punt"] = punt_df["yardline_100"] - 20

# Features to predict net punt
punt_features = [
    "yardline_100", 
    "game_seconds_remaining", 
    "half_seconds_remaining",
    "score_differential",
    "posteam_timeouts_remaining",
    "defteam_timeouts_remaining",
    "temp_F",
    "wind_mph"
]

X_punt = punt_df[punt_features]
y_punt = punt_df["net_punt"]

# Train-test split
Xp_train, Xp_test, yp_train, yp_test = train_test_split(
    X_punt, y_punt, test_size=0.2, random_state=42
)

# Fit XGBoost regressor
punt_model = XGBRegressor(
    n_estimators=200,
    max_depth=4,
    learning_rate=0.05,
    subsample=0.8,
    verbosity=0
)
punt_model.fit(Xp_train, yp_train, eval_set=[(Xp_test, yp_test)], verbose=False)

# Quick evaluation
preds = punt_model.predict(Xp_test)
rmse = np.sqrt(np.mean((preds - yp_test.values)**2))
print("Punt model test RMSE:", rmse)

# Utility function
def predict_punt(state_df):
    """state_df: pandas DataFrame with punt_features as columns"""
    raw_pred = punt_model.predict(state_df)
    # Cap net punt at yardline_100
    capped_pred = np.minimum(raw_pred, state_df["yardline_100"].values)
    return capped_pred

Punt model test RMSE: 10.031953656367254


In [8]:
def evaluate_punt(state):
    """
    Evaluate a punt attempt.
    Returns expected WP for the punting team after the punt.
    """
    df = pd.DataFrame([state])
    wp_current = predict_wp(df)[0] 

    # Predict net punt
    net_punt = predict_punt(df[punt_features])[0]

    # Compute receiving team's field position
    if net_punt >= state["yardline_100"]:
        new_yardline_100 = 80  # Touchback → opponent at own 20
    else:
        new_yardline_100 = 100 - (state["yardline_100"] - net_punt)

    # Construct post-punt state for opponent
    punt_result_state = state.copy()
    punt_result_state["yardline_100"] = new_yardline_100
    punt_result_state["down"] = 1
    punt_result_state["ydstogo"] = 10

    # Flip possession
    punt_result_state_flipped = flip_possession(punt_result_state)
    punt_result_state_flipped['half_seconds_remaining'] = max(0, punt_result_state_flipped['half_seconds_remaining'] - 8)
    punt_result_state_flipped['game_seconds_remaining'] = max(0, punt_result_state_flipped['game_seconds_remaining'] - 8)

    # Opponent WP
    wp_opponent = predict_wp(pd.DataFrame([punt_result_state_flipped]))[0]

    # Punting team's expected WP
    ewp_punt = 1 - wp_opponent
    
    # Account for late-game scenario
    if state['score_differential'] <= -1:
        # Assume each timeout saves 42 seconds
        if state['game_seconds_remaining'] <= 42 * (3 - state['posteam_timeouts_remaining']):
                ewp_punt = 0

    return {
        "wp_current": round(wp_current, 4),
        "net_punt": round(net_punt, 4),
        "ewp_punt": round(ewp_punt, 4)
    }

In [9]:
# --- Filter to field goal attempts only ---
fg_df = pbp[pbp.play_type == "field_goal"].dropna(subset=["field_goal_result"]).copy()
fg_df = fg_df[fg_df.field_goal_result.isin(['made', 'missed', 'blocked'])]
fg_df = fg_df[fg_df.kick_distance >= 17]

# --- Map result to 1/0 ---
fg_df["fg_made"] = (fg_df["field_goal_result"] == "made").astype(int)

# Field goal
fg_df["score_time_ratio"] = fg_df["score_differential"].abs() / (fg_df["game_seconds_remaining"] + 1)
fg_features = [
    "kick_distance",
    "game_seconds_remaining",
    "half_seconds_remaining",
    "score_differential",
    "score_time_ratio",
    "temp_F",
    "wind_mph"
]

X_fg = fg_df[fg_features]
y_fg = fg_df["fg_made"]

# --- Train-test split ---
Xfg_train, Xfg_test, yfg_train, yfg_test = train_test_split(
    X_fg, y_fg, test_size=0.2, random_state=42
)

# --- Logistic Regression model ---
fg_model_lr = LogisticRegression(
    solver='lbfgs',
    max_iter=1000
)

fg_model_lr.fit(Xfg_train, yfg_train)

# --- Quick evaluation ---
preds = fg_model_lr.predict_proba(Xfg_test)[:, 1]
rmse = np.sqrt(np.mean((preds - yfg_test.values)**2))
print("FG (LogReg) test RMSE:", rmse)

# --- Prediction function ---
def predict_fg(state_dict):
    # Convert dict to dataframe
    df = pd.DataFrame([state_dict])
    
    if "score_time_ratio" not in df:
        df["score_time_ratio"] = df["score_differential"].abs() / (df["game_seconds_remaining"] + 1)
    
    p_make = fg_model_lr.predict_proba(df[fg_features])[:, 1][0]
    
    # Smoothly decay for very long kicks
    decay_threshold = 55
    kick_distance = state_dict["kick_distance"]
    if kick_distance >= decay_threshold:
        p_make *= max(0, (70 - kick_distance) / (70 - decay_threshold))
    
    return np.clip(p_make, 0.0, 1.0)

FG (LogReg) test RMSE: 0.3398156053648387


In [10]:
def evaluate_field_goal(state):
    """Evaluates a field goal attempt."""
    df = pd.DataFrame([state])
    wp_current = predict_wp(df)[0]
    
    fg_state = state.copy()
    fg_state['kick_distance'] = fg_state['yardline_100'] + 17

    # Predict FG make probability
    p_make = predict_fg(fg_state)

    # State after successful FG
    fg_success_state = state.copy()
    fg_success_state["down"] = 1
    fg_success_state["ydstogo"] = 10
    fg_success_state['yardline_100'] = 75
    fg_success_state["score_differential"] += 3
    fg_success_state = flip_possession(fg_success_state)
    fg_success_state['half_seconds_remaining'] = max(0, fg_success_state['half_seconds_remaining'] - 5)
    fg_success_state['game_seconds_remaining'] = max(0, fg_success_state['game_seconds_remaining'] - 5)
    wp_success = 1 - predict_wp(pd.DataFrame([fg_success_state]))[0]

    # State after missed FG (opponent gets ball)
    fg_fail_state = state.copy()
    fg_fail_state["down"] = 1
    fg_fail_state["ydstogo"] = 10
    fg_fail_state["yardline_100"] = 100 - (state["yardline_100"] + 7)
    fg_fail_state = flip_possession(fg_fail_state)
    fg_fail_state['half_seconds_remaining'] = max(0, fg_fail_state['half_seconds_remaining'] - 5)
    fg_fail_state['game_seconds_remaining'] = max(0, fg_fail_state['game_seconds_remaining'] - 5)
    wp_fail_for_us = 1 - predict_wp(pd.DataFrame([fg_fail_state]))[0]
    
    # Account for late-game scenario
    if state['score_differential'] <= -4:
        # Assume each timeout saves 42 seconds
        if state['game_seconds_remaining'] <= 42 * (3 - state['posteam_timeouts_remaining']):
                wp_success = 0
                wp_fail_for_us = 0
    
    # Expected WP
    ewp_field_goal = p_make * wp_success + (1 - p_make) * wp_fail_for_us

    return {
        "wp_current": round(wp_current,4),
        "p_make": round(p_make,4),
        "wp_success": round(wp_success,4),
        "wp_fail": round(wp_fail_for_us,4),
        "ewp_field_goal": round(ewp_field_goal,4)
    }

In [11]:
# Filter to 4th-down go-for-it plays
go_df = pbp[
    (pbp['down'] == 4) &
    (pbp['play_type'].isin(['run', 'pass']))  # filters out punts/FGs
].copy()

# Target: did the team convert?
# EPA models use "first_down" but check yours
go_df = go_df.dropna(subset=['first_down'])

# Go-for-it conversion
go_df["score_time_ratio"] = go_df["score_differential"].abs() / (go_df["game_seconds_remaining"] + 1)

go_df["success"] = (
    (go_df["first_down"] == 1) |
    (go_df["touchdown"] == 1)
).astype(int)

X_conv = go_df[wp_features]
y_conv = go_df["success"]

monotone_constraints = {
    "ydstogo": -1,          # longer → worse
    "yardline_100": -1,     # farther from EZ → worse
    "down": 0,
    "score_differential": 0,
    "posteam_timeouts_remaining": 0,
    "defteam_timeouts_remaining": 0
}

conv_model = XGBClassifier(
    n_estimators=300,
    max_depth=4,
    learning_rate=0.05,
    subsample=0.8,
    eval_metric="logloss",
    monotone_constraints=monotone_constraints,
    verbosity=0
)

Xc_train, Xc_test, yc_train, yc_test = train_test_split(
    X_conv, y_conv, test_size=0.2, random_state=42
)

conv_model.fit(Xc_train, yc_train)

def predict_conv(state_df):
    if "score_time_ratio" not in state_df:
        state_df["score_time_ratio"] = state_df["score_differential"].abs() / (state_df["game_seconds_remaining"] + 1)
    
    preds = conv_model.predict_proba(state_df[conv_features])[:, 1]
    return np.clip(preds, 0.0, 1.0)

In [12]:
def evaluate_go_for_it(state):
    """Evaluates a 4th down go-for-it attempt."""
    df = pd.DataFrame([state])
    wp_current = predict_wp(df)[0]

    p_conv = conv_model.predict_proba(df[wp_features])[:, 1][0]

    # WP if conversion succeeds
    go_success_state = state.copy()
    go_success_state["down"] = 1
    go_success_state["yardline_100"] = state["yardline_100"] - state['ydstogo']
    go_success_state['ydstogo'] = min(10, go_success_state['yardline_100']) # Account for 1st & goal
    go_success_state['half_seconds_remaining'] = max(0, go_success_state['half_seconds_remaining'] - 5) # Assume play lenght is 5 seconds
    go_success_state['game_seconds_remaining'] = max(0, go_success_state['game_seconds_remaining'] - 5)
    wp_success = predict_wp(pd.DataFrame([go_success_state]))[0]

    # WP if conversion fails
    go_fail_state = state.copy()
    go_fail_state["down"] = 1
    go_fail_state["ydstogo"] = 10
    go_fail_state["yardline_100"] = 100 - state["yardline_100"]
    go_fail_state = flip_possession(go_fail_state)
    go_fail_state['half_seconds_remaining'] = max(0, go_fail_state['half_seconds_remaining'] - 5)
    go_fail_state['game_seconds_remaining'] = max(0, go_fail_state['game_seconds_remaining'] - 5)
    wp_fail_for_us = 1 - predict_wp(pd.DataFrame([go_fail_state]))[0]

    # Expected WP
    ewp_go_for_it = p_conv * wp_success + (1 - p_conv) * wp_fail_for_us

    return {
        "wp_current": round(wp_current,4),
        "p_conv": round(p_conv,4),
        "wp_success": round(wp_success,4),
        "wp_fail": round(wp_fail_for_us,4),
        "ewp_go_for_it": round(ewp_go_for_it,4)
    }


In [13]:
def evaluate_play_options(state):
    """Evaluates all 3 options on a 4th down."""
    # WP from current state
    wp_current = predict_wp(pd.DataFrame([state]))[0]

    # Evaluate each option
    punt_result = evaluate_punt(state)
    fg_result = evaluate_field_goal(state)
    go_result = evaluate_go_for_it(state)
    
    # Compute WPAs
    wpa_punt = punt_result["ewp_punt"] - wp_current
    wpa_field_goal = fg_result["ewp_field_goal"] - wp_current
    wpa_go_for_it = go_result["ewp_go_for_it"] - wp_current
    
    # Choose best play
    wpa_dict = {
        "punt": wpa_punt,
        "field_goal": wpa_field_goal,
        "go_for_it": wpa_go_for_it
    }
    recommended_play = max(wpa_dict, key=wpa_dict.get)

    # Consolidate results
    results = {
        "wp_current": round(wp_current, 4),
        "ewp_punt": round(punt_result["ewp_punt"], 4),
        "ewp_field_goal": round(fg_result["ewp_field_goal"], 4),
        "ewp_go_for_it": round(go_result["ewp_go_for_it"], 4),
        "wpa_punt": round(wpa_punt, 4),
        "wpa_field_goal": round(wpa_field_goal, 4),
        "wpa_go_for_it": round(wpa_go_for_it, 4),
        "recommended_play": recommended_play
    }

    return results

In [22]:
state = {
    "yardline_100": 25,             
    "down": 4,
    "ydstogo": 5,                   
    "game_seconds_remaining": 900,   
    "half_seconds_remaining": 900,
    "score_differential": 0,       
    "posteam_timeouts_remaining": 3,
    "defteam_timeouts_remaining": 3,
    "temp_F" : 60,
    "wind_mph" : 0
}

evaluate_play_options(state)

{'wp_current': 0.5593,
 'ewp_punt': 0.5159,
 'ewp_field_goal': 0.5902,
 'ewp_go_for_it': 0.5596,
 'wpa_punt': -0.0434,
 'wpa_field_goal': 0.0309,
 'wpa_go_for_it': 0.0003,
 'recommended_play': 'field_goal'}

In [23]:
evaluate_punt(state)

{'wp_current': 0.5593, 'net_punt': 24.4149, 'ewp_punt': 0.5159}

In [24]:
evaluate_field_goal(state)

{'wp_current': 0.5593,
 'p_make': 0.8617,
 'wp_success': 0.6103,
 'wp_fail': 0.465,
 'ewp_field_goal': 0.5902}

In [25]:
evaluate_go_for_it(state)

{'wp_current': 0.5593,
 'p_conv': 0.4196,
 'wp_success': 0.6608,
 'wp_fail': 0.4865,
 'ewp_go_for_it': 0.5596}