We initialize Python imports and opens a DuckDB connection that every later cell reuses

In [1]:
from pathlib import Path
import duckdb
import numpy as np
import pandas as pd

import statsmodels.api as sm
import statsmodels.formula.api as smf

DB_DIR = Path("../db")
DB_DIR.mkdir(parents=True, exist_ok=True)

con = duckdb.connect(str(DB_DIR / "nflpa.duckdb"))

PANEL_TABLE = "team_week_panel"
MODEL_VIEW = "team_week_panel_nextweek_model"

print("db file", (DB_DIR / "nflpa.duckdb").resolve())

existing_views = set(con.execute("SHOW TABLES").df()["name"].tolist())
if MODEL_VIEW not in existing_views:
    raise RuntimeError("Missing model view team_week_panel_nextweek_model, run notebook 11 before step 13")

cols = con.execute("DESCRIBE team_week_panel").df()["column_name"].tolist()
cols_set = set(cols)

if "team_key" in cols_set:
    TEAM_COL = "team_key"
elif "team" in cols_set:
    TEAM_COL = "team"
else:
    raise RuntimeError("Could not find team column in team_week_panel, expected team_key or team")

con.execute("DROP VIEW IF EXISTS team_week_panel_nextweek_model")
con.execute("DROP TABLE IF EXISTS panel_next_week_flags")

con.execute(f"""
CREATE TABLE panel_next_week_flags AS
WITH base AS (
  SELECT
    season,
    week,
    {TEAM_COL} AS team_key
  FROM team_week_panel
)
SELECT
  season,
  week,
  team_key,
  CASE
    WHEN EXISTS (
      SELECT 1
      FROM base b2
      WHERE b2.season = b1.season
        AND b2.team_key = b1.team_key
        AND b2.week = b1.week + 1
    )
    THEN 1
    ELSE 0
  END AS has_next_week
FROM base b1
""")

con.execute(f"""
CREATE VIEW team_week_panel_nextweek_model AS
SELECT
  p.*,
  f.has_next_week
FROM team_week_panel p
JOIN panel_next_week_flags f
  ON p.season = f.season
 AND p.week = f.week
 AND p.{TEAM_COL} = f.team_key
WHERE f.has_next_week = 1
""")

con.execute("SELECT COUNT(*) AS n FROM team_week_panel_nextweek_model").df()
con.execute(f"SELECT COUNT(*) AS n FROM {MODEL_VIEW}").df()

db file /Users/ramko/Desktop/2025-26-NFLPA-Data-Analytics-Case-Competition/db/nflpa.duckdb


Unnamed: 0,n
0,5950


Quick sanity check to confirm that 'has_next_week' is always 1 in the model view and that the view has unique season week team keys

In [2]:
con.execute(f"""
SELECT
  COUNT(*) AS rows_model,
  SUM(CASE WHEN has_next_week != 1 THEN 1 ELSE 0 END) AS bad_has_next_week
FROM {MODEL_VIEW}
""").df()

con.execute(f"""
SELECT
  COUNT(*) AS dup_rows
FROM (
  SELECT
    season,
    week,
    {TEAM_COL} AS team_key,
    COUNT(*) AS n
  FROM {MODEL_VIEW}
  GROUP BY 1,2,3
  HAVING COUNT(*) > 1
) d
""").df()

Unnamed: 0,dup_rows
0,0


We load the modeling dataset from the model view and checks that required columns exist. We also select the safest 'NonScore' shock column for modeling when a rolling version exists

In [3]:
cols = con.execute(f"DESCRIBE {MODEL_VIEW}").df()["column_name"].tolist()
cols_set = set(cols)

TEAM_COL = "team" if "team" in cols_set else "team_key"
SEASON_COL = "season"
WEEK_COL = "week"

OUTCOME_OFF = "Inj_Off_Next_w"
OUTCOME_DEF = "Inj_Def_Next_w"

required_base = [
    SEASON_COL, WEEK_COL, TEAM_COL,
    OUTCOME_OFF, OUTCOME_DEF,
    "blowout_flag_w",
    "ST_Shock_NonScore_w",
    "ST_Vol_NonScore_w",
    "Cum_Shocks_NonScore_w",
    "ST_Shock_NonScore_w_minus_1",
    "ST_Shock_NonScore_w_minus_2",
    "ST_Shock_NonScore_w_minus_3",
    "offensive_snaps_w",
    "defensive_snaps_w",
    "offensive_no_play_snaps_w",
    "defensive_no_play_snaps_w",
    "points_for",
    "points_against",
    "short_week_flag_w",
    "days_rest_w",
    "bye_last_week_flag_w",
    "home_flag_w",
    "Inj_Off_Last_w",
    "Inj_Def_Last_w",
]

missing = [c for c in required_base if c not in cols_set]
if missing:
    raise RuntimeError("Missing required columns in model view, " + ", ".join(missing))

SHOCK_COL_MAIN = "ST_Shock_NonScore_Roll_w" if "ST_Shock_NonScore_Roll_w" in cols_set else "ST_Shock_NonScore_w"
Z_COL_MAIN = "Z_ST_NonScore_Roll_w" if "Z_ST_NonScore_Roll_w" in cols_set else ("Z_ST_NonScore_w" if "Z_ST_NonScore_w" in cols_set else None)

print("team column", TEAM_COL)
print("shock column main", SHOCK_COL_MAIN)
print("z column main", Z_COL_MAIN)

team column team
shock column main ST_Shock_NonScore_w
z column main Z_ST_NonScore_w


We build the modeling frame, fixed effect keys, and the blowout interaction term by using 'SHOCK_COL_MAIN' when available. We also create an expanding 'NonScore' shock alternative when 'ST_Load_NonScore_w' exists.

In [4]:
select_cols = [
    SEASON_COL, WEEK_COL, TEAM_COL,
    OUTCOME_OFF, OUTCOME_DEF,
    "blowout_flag_w",
    SHOCK_COL_MAIN,
    "ST_Vol_NonScore_w",
    "Cum_Shocks_NonScore_w",
    "ST_Shock_NonScore_w_minus_1",
    "ST_Shock_NonScore_w_minus_2",
    "ST_Shock_NonScore_w_minus_3",
    "offensive_snaps_w",
    "defensive_snaps_w",
    "offensive_no_play_snaps_w",
    "defensive_no_play_snaps_w",
    "points_for",
    "points_against",
    "short_week_flag_w",
    "days_rest_w",
    "bye_last_week_flag_w",
    "home_flag_w",
    "Inj_Off_Last_w",
    "Inj_Def_Last_w",
]

if Z_COL_MAIN is not None:
    select_cols.append(Z_COL_MAIN)

has_st_load_nonscore = "ST_Load_NonScore_w" in cols_set
if has_st_load_nonscore:
    select_cols.append("ST_Load_NonScore_w")

df = con.execute(f"SELECT {', '.join(select_cols)} FROM {MODEL_VIEW}").df()

df[TEAM_COL] = df[TEAM_COL].astype(str)
df[SEASON_COL] = df[SEASON_COL].astype(int)
df[WEEK_COL] = df[WEEK_COL].astype(int)

df["season_week"] = (df[SEASON_COL] * 100 + df[WEEK_COL]).astype(int)
df["team_season"] = (df[TEAM_COL].astype(str) + "_" + df[SEASON_COL].astype(str)).astype(str)

df["shock_nonscore"] = df[SHOCK_COL_MAIN].fillna(0).astype(int)
df["blowout_flag_w"] = df["blowout_flag_w"].fillna(0).astype(int)
df["shock_x_blowout"] = (df["shock_nonscore"] * df["blowout_flag_w"]).astype(int)

lag_cols = [
    "ST_Shock_NonScore_w_minus_1",
    "ST_Shock_NonScore_w_minus_2",
    "ST_Shock_NonScore_w_minus_3",
]
for c in lag_cols:
    df[c] = df[c].fillna(0).astype(int)

df["ST_Vol_NonScore_w"] = df["ST_Vol_NonScore_w"].fillna(0).astype(float)
df["Cum_Shocks_NonScore_w"] = df["Cum_Shocks_NonScore_w"].fillna(0).astype(float)

numeric_fill = {
    "offensive_no_play_snaps_w": 0,
    "defensive_no_play_snaps_w": 0,
    "short_week_flag_w": 0,
    "bye_last_week_flag_w": 0,
    "home_flag_w": 0,
    "Inj_Off_Last_w": 0,
    "Inj_Def_Last_w": 0,
}
for k, v in numeric_fill.items():
    df[k] = df[k].fillna(v)

must_not_be_null = [
    OUTCOME_OFF, OUTCOME_DEF,
    "offensive_snaps_w", "defensive_snaps_w",
    "points_for", "points_against",
    "days_rest_w",
]
before = len(df)
df = df.dropna(subset=must_not_be_null).reset_index(drop=True)
after = len(df)

df["shock_nonscore_expanding"] = np.nan
df["shock_x_blowout_expanding"] = np.nan
df["z_nonscore_expanding"] = np.nan

if has_st_load_nonscore:
    df = df.sort_values([TEAM_COL, SEASON_COL, WEEK_COL]).reset_index(drop=True)

    def expanding_z(x: pd.Series) -> pd.Series:
        m = x.expanding().mean().shift(1)
        s = x.expanding().std(ddof=1).shift(1)
        return (x - m) / s

    df["z_nonscore_expanding"] = df.groupby([TEAM_COL, SEASON_COL])["ST_Load_NonScore_w"].transform(expanding_z)
    df["shock_nonscore_expanding"] = ((df["z_nonscore_expanding"] >= 1).fillna(False)).astype(int)
    df["shock_x_blowout_expanding"] = (df["shock_nonscore_expanding"] * df["blowout_flag_w"]).astype(int)

print("rows before dropna", before)
print("rows after dropna", after)
print("has ST_Load_NonScore_w", bool(has_st_load_nonscore))
df.head(3)

rows before dropna 5950
rows after dropna 5950
has ST_Load_NonScore_w True


Unnamed: 0,season,week,team,Inj_Off_Next_w,Inj_Def_Next_w,blowout_flag_w,ST_Shock_NonScore_w,ST_Vol_NonScore_w,Cum_Shocks_NonScore_w,ST_Shock_NonScore_w_minus_1,...,Inj_Def_Last_w,Z_ST_NonScore_w,ST_Load_NonScore_w,season_week,team_season,shock_nonscore,shock_x_blowout,shock_nonscore_expanding,shock_x_blowout_expanding,z_nonscore_expanding
0,2012,1,ARI,0.0,0.0,0,0,0.0,0.0,0,...,0.0,-1.732051,14.0,201201,ARI_2012,0,0,0,0,
1,2012,2,ARI,0.0,0.0,0,0,2.828427,0.0,0,...,0.0,0.0,18.0,201202,ARI_2012,0,0,0,0,
2,2012,3,ARI,0.0,0.0,1,0,2.309401,0.0,0,...,0.0,-1.732051,14.0,201203,ARI_2012,0,0,0,0,-0.707107


We produce outcome distribution checks and simple overdispersion indicators for each outcome. We will then use these checks to inform us whether Poisson or Negative Binomial is needed

In [5]:
def outcome_dispersion_stats(y: pd.Series) -> dict:
    y = y.astype(float)
    mean = float(y.mean())
    var = float(y.var(ddof=1))
    share_zero = float((y == 0).mean())
    return {
        "mean": mean,
        "var": var,
        "var_over_mean": (var / mean) if mean > 0 else np.nan,
        "share_zero": share_zero,
        "max": float(y.max()),
    }

stats_off = outcome_dispersion_stats(df[OUTCOME_OFF])
stats_def = outcome_dispersion_stats(df[OUTCOME_DEF])

print("offense outcome", OUTCOME_OFF, stats_off)
print("defense outcome", OUTCOME_DEF, stats_def)

material_overdispersion_off = (not np.isnan(stats_off["var_over_mean"])) and (stats_off["var_over_mean"] >= 1.5)
material_overdispersion_def = (not np.isnan(stats_def["var_over_mean"])) and (stats_def["var_over_mean"] >= 1.5)

print("material overdispersion offense", bool(material_overdispersion_off))
print("material overdispersion defense", bool(material_overdispersion_def))

offense outcome Inj_Off_Next_w {'mean': 1.9201680672268908, 'var': 2.1612169830110557, 'var_over_mean': 1.125535321568121, 'share_zero': 0.17714285714285713, 'max': 9.0}
defense outcome Inj_Def_Next_w {'mean': 2.083865546218487, 'var': 2.380761656150105, 'var_over_mean': 1.1424737361152615, 'share_zero': 0.15529411764705883, 'max': 10.0}
material overdispersion offense False
material overdispersion defense False
