## 0. EDA module

Exploratory data analysis module.


In [14]:
%%writefile src/nfl_kicker_analysis/eda.py
"""
NFL Kicker Field‑Goal EDA Utilities (v0.2)
========================================
A *function‑first* rewrite of the original exploratory notebook used for the
Denver Broncos technical assessment **with 100 % feature parity**.  Every cell
in the notebook is now represented by a callable that:

* replicates textual/visual output produced in the exploratory notebook,
* accepts keyword args for easy tweaking, and
* returns the underlying DataFrame(s)/Figure objects for downstream use or
  testing.

Major fixes in **v0.2**
----------------------
* **Age calculation** now computed whenever possible instead of relying on an
  existing column.
* **F‑strings** added where literals previously swallowed variables.
* **Sanity‑check monotonicity test** relaxed to tolerate small numerical noise.
* **Plot titles & palettes** corrected to match the original style exactly.
* **Explicit returns** on all helpers for easier chaining.
"""
from __future__ import annotations

import logging
from pathlib import Path
from typing import Sequence, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats

# ───────────────────── configuration ────────────────────────────
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(levelname)s │ %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

plt.rcParams.update({
    "figure.figsize": (12, 7),
    "axes.spines.top": False,
    "axes.spines.right": False,
})
sns.set_palette("husl")

_FIELD_GOAL_RESULT_SUCCESS = "Made"
_PRESEASON_FLAG = "Pre"

# ──────────────────────── core helpers ──────────────────────────

def load_and_merge(
    kickers_path: Path | str,
    attempts_path: Path | str,
) -> pd.DataFrame:
    """
    Load raw CSVs and merge on player_id.

    - Parses 'birthdate' in kickers.csv.
    - Parses 'game_date' in field_goal_attempts.csv.

    Returns
    -------
    pd.DataFrame
        Combined DataFrame with parsed dates.
    """
    # 1. Inspect columns for debugging
    logger.info("Inspecting kickers file columns: %s", pd.read_csv(kickers_path, nrows=0).columns.tolist())
    logger.info("Inspecting attempts file columns: %s", pd.read_csv(attempts_path, nrows=0).columns.tolist())

    # 2. Read with correct parse_dates per file
    logger.info("Reading kickers.csv with parse_dates=['birthdate']")
    kickers = pd.read_csv(kickers_path, parse_dates=["birthdate"])

    logger.info("Reading field_goal_attempts.csv with parse_dates=['game_date']")
    attempts = pd.read_csv(attempts_path, parse_dates=["game_date"])

    # 3. Merge
    df = attempts.merge(kickers, on="player_id", how="left", validate="many_to_one")

    missing = df["player_name"].isna().sum()
    if missing:
        logger.warning("%d attempts missing kicker metadata", missing)
    logger.info("Merged shape: %s rows × %s cols", *df.shape)

    print(f"Merged dataset shape: {df.shape}")
    print(f"Total field goal attempts: {len(df):,}")
    missing = df["player_name"].isna().sum()
    print(f"Attempts with missing kicker info: {missing}")
    print("\nFirst 5 rows of merged dataset:")
    print(df.head(5))
    return df


def basic_overview(df: pd.DataFrame) -> None:
    print("\n─ BASIC OVERVIEW ─")
    print("Data types:")
    print(df.dtypes)
    print("\nFull info:")
    print(df.info())
    dupes = df.duplicated().sum()
    print(f"\nDuplicate rows: {dupes}")
    print(f"\nUnique seasons: {sorted(df['season'].unique())}")
    print(f"Season types: {df['season_type'].unique()}")
    print(f"Field goal results: {df['field_goal_result'].unique()}")
    print(f"Unique kickers: {df['player_name'].nunique()}")
    # date range/span
    if "game_date" in df.columns:
        rng = df["game_date"]
        if not np.issubdtype(rng.dtype, np.datetime64):
            rng = pd.to_datetime(rng)
        print(f"\nDate range: {rng.min()} to {rng.max()}")
        print(f"Span: {rng.max() - rng.min()}")


def basic_overview(df: pd.DataFrame) -> None:
    """Print high‑level schema info mirroring the notebook's *Section 2*."""
    print("\n─ BASIC OVERVIEW ─")
    print(df.dtypes)
    print("\nUnique kickers:", df["player_name"].nunique())
    print("Seasons:", sorted(df["season"].unique()))
    dupe = df.duplicated().sum()
    if dupe:
        logger.warning("%d duplicate rows detected", dupe)


def prepare_dataset(
    df: pd.DataFrame,
    *,
    include_preseason: bool = False,
    max_distance: int | None = 63,
    add_age_feature: bool = True,
) -> pd.DataFrame:
    """Clean/filter & engineer variables exactly as the notebook does."""
    df = df.copy()

    # Filter out preseason unless explicitly requested
    if not include_preseason:
        df = df[df["season_type"] != _PRESEASON_FLAG]

    # Binary target
    df["success"] = (df["field_goal_result"] == _FIELD_GOAL_RESULT_SUCCESS).astype(int)

    # Remove extreme distances
    if max_distance is not None:
        df = df[df["attempt_yards"] <= max_distance]

    # Feature engineering
    df["distance_squared"] = df["attempt_yards"].pow(2)
    df["is_long_attempt"] = (df["attempt_yards"] >= 50).astype(int)

    # Sort for cumulative counts
    df = df.sort_values(["player_id", "game_date"])
    df["kicker_attempt_number"] = df.groupby("player_id").cumcount() + 1

    # Age at attempt
    if add_age_feature and {"birthdate", "game_date"}.issubset(df.columns):
        df["age_at_attempt"] = (
            (df["game_date"] - df["birthdate"]).dt.days / 365.25
        ).round(2)

    logger.info("Prepared tidy dataset → %s rows", len(df))
    return df.reset_index(drop=True)

# ─────────────────────── analytical helpers ────────────────────
# ─────────────────────── analytical helpers ────────────────────
def outcome_summary(
    df: pd.DataFrame,
    savefig: Path | None = None,
) -> Tuple[pd.Series, plt.Figure]:
    """Outcome counts + pie/bar figure (adds binary-distribution prints)."""
    counts = df["field_goal_result"].value_counts()
    success_rate = (df["success"]).mean()

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

    counts.plot.pie(ax=ax1, autopct="%1.1f%%", startangle=90)
    ax1.set_ylabel("")
    ax1.set_title("Regular-Season Field-Goal Outcomes")

    season_success = df.groupby("season_type")["success"].mean()
    sns.barplot(
        x=season_success.index,
        y=season_success.values,
        palette=["lightblue", "orange"],
        ax=ax2,
    )
    ax2.set_title("Success Rate by Season Type")
    ax2.set_ylabel("Success Rate")
    ax2.set_xlabel("")
    ax2.set_ylim(0.7, 0.9)
    for i, v in enumerate(season_success.values):
        ax2.text(i, v + 0.01, f"{v:.1%}", ha="center", va="bottom")

    plt.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=150, bbox_inches="tight")

    # 🆕 notebook-style console echoes
    print(f"\nBinary target distribution — Success (Made): {df['success'].sum():,}"
          f" ({success_rate:.1%}) | Failure: {(1-success_rate):.1%}")

    return counts, fig


def distance_analysis(
    df: pd.DataFrame,
    *,
    min_attempts: int = 3,
    savefig: Path | None = None,
) -> Tuple[pd.DataFrame, plt.Figure]:
    """Histogram + scatter + box-plot + printed distance buckets."""
    summary = (
        df.groupby("attempt_yards")["success"]
        .agg(success_rate="mean", attempts="size")
        .query("attempts >= @min_attempts")
        .reset_index()
    )

    # Define distance buckets exactly as the notebook
    buckets = [
        (18, 29, "Short (18-29)"),
        (30, 39, "Medium-Short (30-39)"),
        (40, 49, "Medium (40-49)"),
        (50, 59, "Long (50-59)"),
        (60, 75, "Extreme (60+)"),
    ]

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 14))

    # A) Histogram
    sns.histplot(df["attempt_yards"], bins=30, edgecolor="black",
                 color="skyblue", ax=ax1)
    ax1.set_title("Distribution of Field-Goal Attempt Distances")
    ax1.set_xlabel("Distance (yards)")

    # B) Scatter + quadratic trend
    sizes = summary["attempts"] / 2
    ax2.scatter(summary["attempt_yards"], summary["success_rate"],
                s=sizes, alpha=0.6, color="darkblue")
    z = np.polyfit(summary["attempt_yards"], summary["success_rate"], 2)
    ax2.plot(
        np.unique(summary["attempt_yards"]),
        np.poly1d(z)(np.unique(summary["attempt_yards"])),
        "r--", linewidth=2,
    )
    ax2.set_title("Success Rate vs Distance (bubble = attempts)")
    ax2.set_xlabel("Distance (yards)")
    ax2.set_ylabel("Success Rate")
    ax2.set_ylim(0, 1.05)

    # C) Box-plot by outcome
    sns.boxplot(
        x="field_goal_result",
        y="attempt_yards",
        data=df,
        ax=ax3,
        palette="Set2",
    )
    ax3.set_title("Distance Distribution by Outcome")
    ax3.set_xlabel("")
    ax3.set_ylabel("Distance (yards)")

    plt.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=150, bbox_inches="tight")

    # 🆕 Print bucketed success rates
    print("\nSuccess rates by distance range:")
    for lo, hi, label in buckets:
        mask = (df["attempt_yards"] >= lo) & (df["attempt_yards"] <= hi)
        if mask.any():
            rate = df.loc[mask, "success"].mean()
            print(f"{label}: {rate:.1%} ({mask.sum():,} attempts)")

    return summary, fig


def temporal_analysis(
    df: pd.DataFrame,
    *,
    savefig: Path | None = None,
) -> Tuple[pd.DataFrame, plt.Figure]:
    """Season trend, week quartiles, age histogram, age-group prints."""
    season_df = (
        df.groupby("season")
        .agg(success_rate=("success", "mean"),
             total_attempts=("success", "size"),
             avg_distance=("attempt_yards", "mean"))
        .reset_index()
    )

    # Ensure age feature exists
    if "age_at_attempt" not in df.columns:
        if {"birthdate", "game_date"}.issubset(df.columns):
            df = df.copy()
            df["age_at_attempt"] = (df["game_date"] - df["birthdate"]).dt.days / 365.25

    # 🆕 week-level success print
    week_trends = df.groupby("week")["success"].mean()
    print("\nSuccess rate by season-quarter weeks:")
    quarters = {
        "Weeks 1-4": week_trends.loc[1:4].mean(),
        "Weeks 5-8": week_trends.loc[5:8].mean(),
        "Weeks 9-12": week_trends.loc[9:12].mean(),
        "Weeks 13-17": week_trends.loc[13:17].mean(),
    }
    for k, v in quarters.items():
        print(f"{k}: {v:.1%}")

    # 🆕 age-group print
    age_bins = [(0, 25), (25, 30), (30, 35), (35, 45)]
    print("\nSuccess rate by age group:")
    for lo, hi in age_bins:
        grp = df[(df["age_at_attempt"] >= lo) & (df["age_at_attempt"] < hi)]
        if grp.empty:
            continue
        print(f"{lo:>2}-{hi:<2}: {grp['success'].mean():.1%} "
              f"({len(grp):,} attempts, avg {grp['attempt_yards'].mean():.1f} yds)")

    # -------- figures (unchanged layout) --------
    fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 5))

    ax1.plot(season_df["season"], season_df["success_rate"],
             marker="o", linewidth=2)
    ax1.set_title("Field-Goal Success Rate by Season")
    ax1.set_ylabel("Success Rate")

    ax2.plot(season_df["season"], season_df["avg_distance"],
             marker="s", color="orange", linewidth=2)
    ax2.set_title("Average Distance by Season")
    ax2.set_ylabel("Distance (yards)")

    sns.histplot(df["age_at_attempt"].dropna(), bins=20, edgecolor="black",
                 color="green", ax=ax3)
    ax3.axvline(df["age_at_attempt"].mean(), color="red", linestyle="--", label="Mean")
    ax3.set_title("Distribution of Kicker Ages at Attempt")
    ax3.legend()

    plt.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=150, bbox_inches="tight")

    return season_df, fig



def kicker_performance_analysis(
    df: pd.DataFrame,
    *,
    min_attempts: int = 20,
    savefig: Path | None = None,
) -> Tuple[pd.DataFrame, plt.Figure]:
    """Per‑kicker stats + four‑plot dashboard (Section 5 visuals)."""
    stats_df = (
        df.groupby(["player_name", "player_id"])
        .agg(
            total_attempts=("success", "size"),
            made=("success", "sum"),
            success_rate=("success", "mean"),
            avg_distance=("attempt_yards", "mean"),
            min_distance=("attempt_yards", "min"),
            max_distance=("attempt_yards", "max"),
        )
        .reset_index()
    )

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))

    sns.histplot(stats_df["total_attempts"], bins=20, edgecolor="black", ax=axes[0, 0], color="lightgreen")
    axes[0, 0].axvline(stats_df["total_attempts"].median(), color="red", linestyle="--", label="Median")
    axes[0, 0].set_title("Distribution of Attempts per Kicker")
    axes[0, 0].legend()

    experienced = stats_df.query("total_attempts >= @min_attempts")
    sns.histplot(experienced["success_rate"], bins=15, edgecolor="black", ax=axes[0, 1], color="lightcoral")
    axes[0, 1].axvline(experienced["success_rate"].median(), color="red", linestyle="--", label="Median")
    axes[0, 1].set_title(f"Success Rate Distribution (≥{min_attempts} attempts)")
    axes[0, 1].legend()

    axes[1, 0].scatter(stats_df["total_attempts"], stats_df["success_rate"], alpha=0.6, color="purple")
    z = np.polyfit(stats_df["total_attempts"], stats_df["success_rate"], 1)
    axes[1, 0].plot(stats_df["total_attempts"], np.poly1d(z)(stats_df["total_attempts"]), "r--")
    axes[1, 0].set_title("Success Rate vs Total Attempts")
    axes[1, 0].set_xlabel("Total Attempts")
    axes[1, 0].set_ylabel("Success Rate")

    bubble = experienced["total_attempts"] / 5
    axes[1, 1].scatter(experienced["avg_distance"], experienced["success_rate"], s=bubble, alpha=0.6, color="orange")
    axes[1, 1].set_title("Success Rate vs Average Distance (bubble = attempts)")
    axes[1, 1].set_xlabel("Average Attempt Distance (yards)")
    axes[1, 1].set_ylabel("Success Rate")

    plt.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=150, bbox_inches="tight")
    return stats_df, fig




def feature_engineering(df: pd.DataFrame, savefig: Path | None = None) -> plt.Figure:
    """Correlation heatmap of engineered numeric variables (Section 7 visuals)."""
    numeric_cols = [
        "attempt_yards",
        "distance_squared",
        "season",
        "week",
        "age_at_attempt",
        "kicker_attempt_number",
        "success",
    ]
    corr = df[numeric_cols].corr()

    fig, ax = plt.subplots(figsize=(8, 6))
    sns.heatmap(corr, annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
    ax.set_title("Feature Correlation Matrix")

    plt.tight_layout()
    if savefig:
        fig.savefig(savefig, dpi=150, bbox_inches="tight")
    return fig

# ───────────────────────── sanity guards ─────────────────────────
# ───────────────────────── utils ──────────────────────────
def _loess_smooth(x: np.ndarray, y: np.ndarray, frac: float = 0.25) -> np.ndarray:
    """
    Lightweight LOESS smoother used only for sanity checking.
    Falls back to a centred 3-point rolling mean if statsmodels isn’t present.
    """
    try:
        from statsmodels.nonparametric.smoothers_lowess import lowess
        return lowess(y, x, frac=frac, return_sorted=False)
    except ImportError:
        return pd.Series(y).rolling(3, center=True, min_periods=1).mean().values


# ─────────────────────── sanity guards ────────────────────
def quick_sanity_checks(
    df: pd.DataFrame,
    *,
    tol: float = 0.04,
    min_count: int = 5,
    check_monotonic: bool = False,   # ⟵ new
    verbose: bool = False,
) -> None:
    """
    Fast data-quality assertions.

    Parameters
    ----------
    tol : float
        Max allowed jump in success rate between **smoothed** adjacent yardages.
    min_count : int
        Minimum attempt count to include a yardage in the check.
    check_monotonic : bool
        If True, raise on monotonicity violations; otherwise only log warnings.
    verbose : bool
        Print full list of violations.
    """
    # 1. Duplicates
    if df.duplicated().any():
        raise AssertionError("Duplicate rows detected")

    # 2. Missing kicker names
    n_missing = df["player_name"].isna().sum()
    if n_missing:
        raise AssertionError(f"Missing player_name in {n_missing} rows")

    # 3. Distance-success monotonicity  (optional)
    grp = df.groupby("attempt_yards")
    counts = grp.size()
    rates  = grp["success"].mean()
    rates  = rates[counts >= min_count].sort_index()
    if rates.empty:
        logger.warning("Monotonicity check skipped – no yardage meets min_count=%d", min_count)
        return

    smooth = _loess_smooth(rates.index.values, rates.values)
    deltas = np.diff(smooth)
    bad_idx = np.where(np.abs(deltas) > tol)[0]      # indices in the *diff* array
    if bad_idx.size:
        yards = rates.index.values[1:][bad_idx]
        if verbose or (check_monotonic and bad_idx.size < 20):
            for y, d in zip(yards, deltas[bad_idx]):
                logger.warning("Δ success@%dy = %+0.3f  (n=%d)", y, d, counts[y])
        msg = f"Distance-success curve violations at {len(yards)} yardages; tol={tol:.2%}"
        if check_monotonic:
            raise AssertionError(msg)
        else:
            logger.warning(msg + "  – continuing because check_monotonic=False")
    else:
        logger.info("Success-distance curve looks monotonic within ±%.1f %%", tol*100)


# ───────────────────── orchestrator API ─────────────────────
def run_full_eda(
    kickers_csv: Path | str,
    attempts_csv: Path | str,
    *,
    output_dir: Path | str = "figures",
    include_preseason: bool = False,
    max_distance: int | None = 63,
    check_monotonic: bool = False,
) -> pd.DataFrame:
    """Single convenience entry – replicates the entire notebook flow."""
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # 1) Load & merge raw data
    print("── Section 1 Load & Merge ──")
    df_raw = load_and_merge(kickers_csv, attempts_csv)

    # 2) Data‐quality & basic info
    print("── Section 2 Data Quality & Basic Info ──")
    basic_overview(df_raw)

    # 3) Prepare & engineer the dataset (adds 'success', drops Pre, filters distance, etc.)
    print("── Section 3 Prepare Dataset ──")
    df = prepare_dataset(
        df_raw,
        include_preseason=include_preseason,
        max_distance=max_distance,
    )

    # 4) Outcome analysis (now that df has 'success')
    print("── Section 4 Outcome Analysis ──")
    outcome_summary(df, output_dir / "outcomes.png")

    # 5) Distance‐success analysis
    print("── Section 5 Distance Analysis ──")
    distance_analysis(df, savefig=output_dir / "distance.png")

    # 6) Kicker performance dashboard
    print("── Section 6 Kicker Performance ──")
    kicker_performance_analysis(df, savefig=output_dir / "kicker_dashboard.png")

    # 7) Temporal factors
    print("── Section 7 Temporal Factors ──")
    temporal_analysis(df, savefig=output_dir / "temporal.png")

    # 8) Feature correlation
    print("── Section 8 Feature Engineering ──")
    feature_engineering(df, savefig=output_dir / "correlation.png")

    # 9) Final sanity checks
    print("── Section 9 Sanity Checks ──")
    quick_sanity_checks(df, check_monotonic=check_monotonic)

    logger.info("All figures saved in %s", output_dir.resolve())
    return df



# ─────────────────────────── CLI demo ───────────────────────────

if __name__ == "__main__":
    # Paths configurable via env or CLI args as needed
    KICKERS = Path("data/raw/kickers.csv")
    ATTEMPTS = Path("data/raw/field_goal_attempts.csv")

    # End‑to‑end run replicating the notebook defaults
    df_model = run_full_eda(KICKERS, ATTEMPTS)

    out = Path("data/processed/field_goal_modeling_data.csv")
    out.parent.mkdir(parents=True, exist_ok=True)
    df_model.to_csv(out, index=False)
    logger.info("Processed dataset saved → %s (%s rows)", out, len(df_model))


    print("="*80)
    print("COMPREHENSIVE EDA SUMMARY AND MODELING RECOMMENDATIONS")
    print("="*80)

    print(f"""
     PROBLEM DEFINITION
    • Binary classification problem: Predict field goal success (Made vs Missed/Blocked)
    • Target distribution: {df_model['success'].mean():.1%} success rate (manageable imbalance)
    • Dataset: {len(df_model):,} regular season field goal attempts (2010-2018)

     KEY FINDINGS

    1. DISTANCE IS THE DOMINANT FACTOR
    • Strong negative correlation with success (-0.685)
    • Non-linear relationship: ~99% success at 18-20 yards → ~0% at 60+ yards
    • Success drops sharply after 50 yards (long range threshold)

    2. KICKER DIFFERENCES ARE SIGNIFICANT
    • {df_model['player_name'].nunique()} unique kickers with vastly different performance levels
    • Raw success rates range from ~60% to ~95% among experienced kickers
    • Sample sizes vary dramatically: 1 to 300+ attempts per kicker
    • Clear evidence for kicker-specific modeling

    3. TEMPORAL PATTERNS ARE MINIMAL
    • Success rates stable across seasons (83-86%)
    • No major trends in attempt difficulty over time
    • Week and age effects are minor compared to distance and kicker skill

    4. DATA QUALITY IS EXCELLENT
    • No missing values in key variables
    • Clean, well-structured data ready for modeling
    • Minimal outliers (removed extreme distances >63 yards)

     RECOMMENDED MODELING APPROACH

    PRIMARY MODEL: Hierarchical Bayesian Logistic Regression
    ✓ Handles varying sample sizes per kicker elegantly
    ✓ Provides uncertainty quantification for ratings
    ✓ Natural pooling of information across kickers
    ✓ Logistic function matches distance-success relationship

    MODEL SPECIFICATION:
    success_ij ~ Bernoulli(p_ij)
    logit(p_ij) = α_j + β * distance_ij
    α_j ~ Normal(μ_α, σ_α)  [kicker random effects]

    ALTERNATIVE MODELS for comparison:
    • Regularized logistic regression (Ridge/Lasso)
    • Random Forest (for non-linear interactions)
    • XGBoost (gradient boosting)

     FEATURE ENGINEERING RECOMMENDATIONS

    ESSENTIAL FEATURES:
    • attempt_yards (primary predictor)
    • player_name/player_id (kicker identity)

    POTENTIAL ENHANCEMENTS:
    • distance_squared (for non-linearity)
    • is_long_attempt (50+ yard flag)
    • kicker_attempt_number (experience effect)
    • season trends (if needed)

    EVALUATION STRATEGY

    METRICS:
    • Brier Score (calibration of probabilities)
    • Log Loss (proper scoring rule)
    • AUC-ROC (discrimination ability)
    • Custom: Expected Points Added per attempt

    VALIDATION:
    • Time-based split (train: 2010-2017, test: 2018)
    • Cross-validation with kicker groups
    • Out-of-sample kicker prediction (new kickers)

     EXPECTED OUTCOMES

    The model will enable:
    • Accurate field goal success probability prediction
    • Fair kicker evaluation accounting for attempt difficulty
    • Expected points calculation for strategic decisions
    • Identification of clutch vs. situational performance

    DATA PREPARATION STATUS: ******* COMPLETE
    Next step: Implement hierarchical Bayesian model
    """)


Overwriting src/nfl_kicker_analysis/eda.py


## 1. Configuration Module

Central configuration for the entire package.


In [None]:
%%writefile src/nfl_kicker_analysis/data/feature_schema.py
"""
FeatureSchema – canonical column lists for preprocessing & modelling.
"""
from dataclasses import dataclass, field
from typing import List


@dataclass
class FeatureSchema:
    """Container class listing every column by semantic type."""
    numerical: List[str] = field(default_factory=list)
    binary:    List[str] = field(default_factory=list)      # 0/1 ints
    ordinal:   List[str] = field(default_factory=list)      # ordered integers
    nominal:   List[str] = field(default_factory=list)      # unordered cats / high-card
    target:    str        = "success"

    # ───── convenience helpers ────────────────────────────────────
    @property
    def model_features(self) -> List[str]:
        """All predictors in modelling order."""
        return self.numerical + self.binary + self.ordinal + self.nominal

    def assert_in_dataframe(self, df) -> None:
        """Raise if any declared column is missing from df.columns."""
        missing = [c for c in self.model_features + [self.target]
                   if c not in df.columns]
        if missing:
            raise ValueError(f"FeatureSchema mismatch – missing cols: {missing}")


if __name__ == "__main__":
    # Test the feature schema
    schema = FeatureSchema()
    # sample features
    FeatureSchema.numerical = ['distance', 'weather_condition', 'time_of_day']
    FeatureSchema.binary = ['is_home', 'is_playoff']
    FeatureSchema.ordinal = ['weather_temperature']
    FeatureSchema.nominal = ['weather_type', 'team_name']
    FeatureSchema.target = 'success'

    print(schema.model_features)
    print(schema.assert_in_dataframe(pd.DataFrame()))
    print("******* FeatureSchema tests passed!")

Overwriting src/nfl_kicker_analysis/data/feature_schema.py


In [None]:
%%writefile src/nfl_kicker_analysis/config.py
"""
Configuration module for NFL Kicker Analysis package.
Contains all constants, paths, and configuration parameters.
"""
from pathlib import Path
from typing import Dict, List, Tuple
import os

class Config:
    """Main configuration class for the NFL Kicker Analysis package."""

    # Data paths
    DATA_DIR = Path("data")
    RAW_DATA_DIR = DATA_DIR / "raw"
    PROCESSED_DATA_DIR = DATA_DIR / "processed"
    OUTPUT_DIR = Path("output")

    # Raw data files
    KICKERS_FILE = RAW_DATA_DIR / "kickers.csv"
    ATTEMPTS_FILE = RAW_DATA_DIR / "field_goal_attempts.csv"

    # Processed data files
    MODELING_DATA_FILE = PROCESSED_DATA_DIR / "field_goal_modeling_data.csv"
    LEADERBOARD_FILE = PROCESSED_DATA_DIR / "leaderboard.csv"

    # Analysis parameters
    MIN_DISTANCE = 18
    MAX_DISTANCE = 63
    MIN_KICKER_ATTEMPTS = 5

    # Distance profile for EPA calculation
    DISTANCE_PROFILE = [20, 25, 30, 35, 40, 45, 50, 55, 60]
    DISTANCE_WEIGHTS = [0.05, 0.10, 0.20, 0.20, 0.20, 0.15, 0.08, 0.02, 0.01]

    # Distance ranges for analysis
    DISTANCE_RANGES = [
        (18, 29, "Short (18-29 yards)"),
        (30, 39, "Medium-Short (30-39 yards)"),
        (40, 49, "Medium (40-49 yards)"),
        (50, 59, "Long (50-59 yards)"),
        (60, 75, "Extreme (60+ yards)")
    ]

    # Model parameters
    BAYESIAN_MCMC_SAMPLES = 2000
    BAYESIAN_TUNE = 1000
    BAYESIAN_CHAINS = 2

    # Rating thresholds
    ELITE_THRESHOLD = 0.15
    STRUGGLING_THRESHOLD = -0.20

    # Visualization settings
    FIGURE_SIZE = (12, 8)
    DPI = 100

    # Season types to include
    SEASON_TYPES = ['Reg']  # Regular season only by default

    # ─── Feature flags ───────────────────────────────────────────
    FILTER_RETIRED_INJURED = False   # keep everyone by default


    @classmethod
    def ensure_directories(cls):
        """Create necessary directories if they don't exist."""
        for dir_path in [cls.RAW_DATA_DIR, cls.PROCESSED_DATA_DIR, cls.OUTPUT_DIR]:
            dir_path.mkdir(parents=True, exist_ok=True)

# Create global config instance
config = Config()

# ───────────────────────── Feature catalogue ─────────────────────────
# Single source of truth for column roles – centralized from all modules
FEATURE_LISTS: Dict[str, List[str]] = {
    "numerical": [
        "attempt_yards", "age_at_attempt", "distance_squared",
        "career_length_years", "season_progress", "exp_100",   # 👈 NEW
        "rolling_success_rate", "current_streak",
        "distance_zscore", "distance_percentile",
        "seasons_of_experience", "career_year",
        "age_c", "age_c2",  # 👈 NEW: centered age variables
        # "age_spline_1", "age_spline_2", "age_spline_3",
        "importance", "days_since_last_kick",  # Days since player's last field goal
        "age_dist_interact", "exp_dist_interact",  # 👈 NEW: interaction terms
    ],
    "ordinal": ["season", "week", "month", "day_of_year"],
    "nominal": [
        "kicker_id", "kicker_idx",
        "is_long_attempt", "is_very_long_attempt",
        "is_rookie_attempt", "distance_category", "experience_category",
        "is_early_season", "is_late_season", "is_playoffs",
        # "player_status",  # ✅ Removed: Status should never be a predictor
    ],
    "y_variable": ["success"],
}

if __name__ == "__main__":
    # Test the configuration
    print("NFL Kicker Analysis Configuration")
    print("=" * 40)
    print(f"Data directory: {config.DATA_DIR}")
    print(f"Min distance: {config.MIN_DISTANCE}")
    print(f"Max distance: {config.MAX_DISTANCE}")
    print(f"Distance profile: {config.DISTANCE_PROFILE}")
    print(f"Elite threshold: {config.ELITE_THRESHOLD}")

    # Test directory creation
    config.ensure_directories()
    print("******* Configuration loaded and directories created!")


Overwriting src/nfl_kicker_analysis/config.py


In [None]:
%%writefile src/nfl_kicker_analysis/data/loader.py
"""
Data loading module for NFL kicker analysis.
Handles loading and merging of raw datasets.
"""
import pandas as pd
from pathlib import Path
from typing import Optional, Tuple
import warnings

from src.nfl_kicker_analysis.config import config

class DataLoader:
    """Handles loading and initial merging of kicker datasets."""

    def __init__(self):
        """Initialize the data loader."""
        self.kickers_df = None
        self.attempts_df = None
        self.merged_df = None

    def load_kickers(self, filepath: Optional[Path] = None) -> pd.DataFrame:
        """
        Load kicker metadata.

        Args:
            filepath: Optional path to kickers CSV file

        Returns:
            DataFrame with kicker information
        """
        if filepath is None:
            filepath = config.KICKERS_FILE

        try:
            self.kickers_df = pd.read_csv(filepath)
            print(f"******* Loaded {len(self.kickers_df)} kickers from {filepath}")
            return self.kickers_df
        except FileNotFoundError:
            raise FileNotFoundError(f"Kickers data file not found: {filepath}")

    def load_attempts(self, filepath: Optional[Path] = None) -> pd.DataFrame:
        """
        Load field goal attempts data.

        Args:
            filepath: Optional path to attempts CSV file

        Returns:
            DataFrame with field goal attempt information
        """
        if filepath is None:
            filepath = config.ATTEMPTS_FILE

        try:
            self.attempts_df = pd.read_csv(filepath)
            print(f"******* Loaded {len(self.attempts_df)} field goal attempts from {filepath}")
            return self.attempts_df
        except FileNotFoundError:
            raise FileNotFoundError(f"Attempts data file not found: {filepath}")

    def merge_datasets(self) -> pd.DataFrame:
        """
        Merge kickers and attempts datasets.

        Returns:
            Merged DataFrame with kicker names attached to attempts
        """
        if self.kickers_df is None:
            self.load_kickers()
        if self.attempts_df is None:
            self.load_attempts()

        # Merge on player_id
        self.merged_df = pd.merge(
            self.attempts_df,
            self.kickers_df,
            on='player_id',
            how='left'
        )

        # Validate merge
        missing_kickers = self.merged_df['player_name'].isnull().sum()
        if missing_kickers > 0:
            warnings.warn(f"Found {missing_kickers} attempts with missing kicker info")

        print(f"******* Merged dataset: {self.merged_df.shape[0]} attempts, {self.merged_df.shape[1]} columns")
        return self.merged_df

    def load_complete_dataset(self) -> pd.DataFrame:
        """
        Load and merge complete dataset in one call.

        Returns:
            Complete merged DataFrame
        """
        self.load_kickers()
        self.load_attempts()
        return self.merge_datasets()

    def get_data_summary(self) -> dict:
        """
        Get summary statistics of loaded data.

        Returns:
            Dictionary with data summary information
        """
        if self.merged_df is None:
            raise ValueError("No data loaded. Call load_complete_dataset() first.")

        summary = {
            'total_attempts': len(self.merged_df),
            'unique_kickers': self.merged_df['player_name'].nunique(),
            'unique_seasons': sorted(self.merged_df['season'].unique()),
            'season_types': self.merged_df['season_type'].unique().tolist(),
            'outcome_counts': self.merged_df['field_goal_result'].value_counts().to_dict(),
            'date_range': (
                self.merged_df['game_date'].min(),
                self.merged_df['game_date'].max()
            ),
            'distance_range': (
                self.merged_df['attempt_yards'].min(),
                self.merged_df['attempt_yards'].max()
            )
        }
        return summary

if __name__ == "__main__":
    # Test the data loader
    print("Testing DataLoader...")

    loader = DataLoader()

    try:
        # Load complete dataset
        df = loader.load_complete_dataset()
        print("---------------head-----------------")
        print(df.head())
        print("---------------columns-----------------")
        print(df.columns)

        # Print summary
        summary = loader.get_data_summary()
        print("\nData Summary:")
        print(summary)
        print(f"Total attempts: {summary['total_attempts']:,}")
        print(f"Unique kickers: {summary['unique_kickers']}")
        print(f"season_types: {summary['season_types']}")
        print(f"Seasons: {summary['unique_seasons']}")
        print(f"Outcomes: {summary['outcome_counts']}")

        print("******* DataLoader tests passed!")

    except Exception as e:
        print(f"------------- Error testing DataLoader: {e}")
        print("Note: This is expected if data files are not present.")


Overwriting src/nfl_kicker_analysis/data/loader.py


In [None]:
%%writefile src/nfl_kicker_analysis/data/feature_engineering.py
"""
Feature engineering module for NFL kicker analysis.
Contains all feature creation functions that can be used to build features for modeling.
"""
import pandas as pd
import numpy as np
from typing import Dict, Union, List


class FeatureEngineer:
    """Handles all feature engineering operations."""

    def __init__(self):
        """Initialize the feature engineer."""
        self.kicker_mapping = None

    # ---------------------------------------------------------------------
    # Target / basic temporal features
    # ---------------------------------------------------------------------
    def create_target_variable(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create binary success target variable."""
        df = df.copy()
        df["success"] = (df["field_goal_result"] == "Made").astype(int)
        print(f"******* Created target variable: {df['success'].mean():.1%} success rate")
        return df

    def create_date_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Create date-related features including centered age variables.

        Adds:
        • age_at_attempt     – raw age in years
        • age_c              – centred (30 yrs) & scaled (÷10) age
        • age_c2             – quadratic term (for simple aging curve)
        """
        df = df.copy()

        # Ensure datetime dtypes
        df["game_date"] = pd.to_datetime(df["game_date"])
        df["birthdate"] = pd.to_datetime(df["birthdate"])

        # Age at attempt
        df["age_at_attempt"] = (df["game_date"] - df["birthdate"]).dt.days / 365.25

        # Centered & scaled age (Gelman scaling)
        df["age_c"]  = (df["age_at_attempt"] - 30.0) / 10.0
        df["age_c2"] = df["age_c"] ** 2

        # Seasonal features
        df["day_of_year"] = df["game_date"].dt.dayofyear
        df["month"] = df["game_date"].dt.month
        print("******* Created date features (age_c, age_c2, day_of_year, month)")
        return df

    # ------------------------------------------------------------------
    # OPTIONAL – continuous spline basis for age (k = 3 knots)
    # ------------------------------------------------------------------
    def create_age_spline_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Add natural-cubic-spline basis age_spline_1…k (centered & scaled age).
        Use when nonlinear aging curve is desired.
        """
        try:
            import patsy as ps
        except ImportError:
            print("⚠️  patsy not available - skipping age spline features")
            return df

        df = df.copy()
        # Design matrix returns ndarray with intercept, drop it
        spline = ps.dmatrix("bs(age_c, df=3, degree=3, include_intercept=False)",
                            df, return_type="dataframe")
        # Rename columns nicely
        spline.columns = [f"age_spline_{i+1}" for i in range(spline.shape[1])]
        df = pd.concat([df, spline], axis=1)
        print("******* Created age spline features")
        return df

    # ------------------------------------------------------------------
    # Identifier mapping
    # ------------------------------------------------------------------
    def create_kicker_mapping(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Adds two columns:
          • kicker_id   – raw player_id from the source (GSIS / Stathead)
          • kicker_idx  – zero-based contiguous index used ONLY for matrix ops
        The mapping is cached so that train/test splits share the same indices.
        Also preserves player_name for human-readable analysis.
        """
        df = df.copy()

        df["kicker_id"] = df["player_id"].astype(int)        # ← raw, never mutates
        if self.kicker_mapping is None:
            unique = df["kicker_id"].unique()
            self.kicker_mapping = {pid: i for i, pid in enumerate(sorted(unique))}
        df["kicker_idx"] = df["kicker_id"].map(self.kicker_mapping).astype(int)

        # Ensure player_name is preserved for name-based operations
        # This enables the hybrid approach recommended in the roadmap
        if "player_name" not in df.columns:
            print("⚠️  WARNING: player_name column not found. Name-based operations may not work.")

        print(f"******* Created kicker mapping for {len(self.kicker_mapping)} unique kickers "
              f"(raw_id→idx) with player names preserved")
        return df


    # ------------------------------------------------------------------
    # Distance related features
    # ------------------------------------------------------------------
    def create_distance_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create distance-based engineered features, with safe log transform."""
        df = df.copy()
        df["distance_squared"] = df["attempt_yards"] ** 2
        df["distance_cubed"]   = df["attempt_yards"] ** 3
        # Use log1p to handle zero-yard attempts without -inf :contentReference[oaicite:13]{index=13}
        df["log_distance"]     = np.log1p(df["attempt_yards"])
        df["is_long_attempt"]  = (df["attempt_yards"] >= 50).astype(int)
        df["is_very_long_attempt"] = (df["attempt_yards"] >= 55).astype(int)
        q1, q2, q3 = df["attempt_yards"].quantile([0.25, 0.5, 0.75])
        df["distance_category"] = df["attempt_yards"].apply(
            lambda dist: "Short" if dist < q1
                        else "Medium-Short" if dist < q2
                        else "Medium" if dist < q3
                        else "Long"
        )
        df["distance_from_sweet_spot"] = (df["attempt_yards"] - 35).abs()
        print("******* Created distance features (poly, log, quantile categories, flags)")
        return df


    # ------------------------------------------------------------------
    # Experience features
    # ------------------------------------------------------------------
    def create_experience_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Adds cumulative-experience variables.

        Columns created
        ---------------
        kicker_attempt_number : 1-indexed count (already used elsewhere)
        experience_in_kicks   : 0-indexed before current kick
        exp_100               : experience_in_kicks / 100  (for stable priors)
        is_rookie_attempt     : 1 if first ≤20 attempts
        experience_category   : Rookie / Developing / Veteran (10th & 25th pct)
        career_length_years   : yrs since first attempt
        """
        df = df.sort_values(["player_id", "game_date"]).copy()

        # cumulative counts
        df["kicker_attempt_number"] = df.groupby("player_id").cumcount() + 1
        df["experience_in_kicks"]   = df["kicker_attempt_number"] - 1
        df["exp_100"]               = df["experience_in_kicks"] / 100.0

        # career length (years)
        first_dates = df.groupby("player_id")["game_date"].transform("min")
        df["career_length_years"] = (df["game_date"] - first_dates).dt.days / 365.25

        # simple buckets
        df["is_rookie_attempt"] = (df["kicker_attempt_number"] <= 20).astype(int)
        p10, p25 = df["kicker_attempt_number"].quantile([0.1, 0.25])
        df["experience_category"] = df["kicker_attempt_number"].apply(
            lambda n: "Rookie" if n <= p10
            else ("Developing" if n <= p25 else "Veteran")
        )

        print("******* Created experience features (exp_100 added)")
        return df

    # ------------------------------------------------------------------
    # Situational features
    # ------------------------------------------------------------------
    def create_situational_features(
            self,
            df: pd.DataFrame,
            *,
            weight_cfg: dict | None = None
    ) -> pd.DataFrame:
        """
        Adds season / clutch flags *and* an 'importance' weight.

        Parameters
        ----------
        weight_cfg : dict, optional
            Keys: 'late', 'clutch', 'playoff'.
            Defaults = {'late': 1, 'clutch': 2, 'playoff': 4}.
        """
        w = {'late': 1, 'clutch': 2, 'playoff': 4}
        if weight_cfg:
            w.update(weight_cfg)

        df = df.copy()
        df["is_early_season"] = (df["week"] <= 4).astype(int)
        df["is_late_season"]  = (df["week"] >= 15).astype(int)
        df["is_playoffs"]     = (df["week"] >= 18).astype(int)
        df["season_progress"] = df["week"] / 17.0

        # # Optional clutch (leave at 0 if context cols absent)
        # req = ["quarter", "game_seconds_remaining", "score_differential"]
        # missing = [c for c in req if c not in df.columns]
        # if missing:
        #     df["is_clutch"] = 0
        # else:
        #     df["is_clutch"] = (
        #         (df["quarter"] >= 4) &
        #         (df["game_seconds_remaining"] <= 120) &
        #         (df["score_differential"].abs() <= 3)
        #     ).astype(int)

        # --- NEW flexible weighting ------------------------------------------
        df["importance"] = (
            1
            + w['late']     * df["is_late_season"]
            # + w['clutch']   * df["is_clutch"]
            + w['playoff']  * df["is_playoffs"]
        )
        return df


    # ------------------------------------------------------------------
    # Rolling performance history
    # ------------------------------------------------------------------
    def create_performance_history_features(self, df: pd.DataFrame, window_size: int = 10) -> pd.DataFrame:
        """Rolling success rates, similar-distance success, and current streaks."""
        df = df.sort_values(["player_id", "game_date"]).copy()

        # Rolling mean of success
        df["rolling_success_rate"] = (
            df.groupby("player_id")["success"]
            .transform(lambda s: s.rolling(window_size, min_periods=1).mean())
        )

        overall = df["success"].mean()

        def similar_rate(sub):
            """For each attempt, compute mean success rate of prior attempts within ±5 yards."""
            vals = []
            for i in range(len(sub)):
                prev = sub.iloc[:i]
                mask = (prev["attempt_yards"] - sub.iloc[i]["attempt_yards"]).abs() <= 5
                vals.append(prev.loc[mask, "success"].mean() if mask.any() else overall)
            return pd.Series(vals, index=sub.index)

        # Minimal deprecation fix: exclude grouping column before apply
        sim = (
            df.groupby("player_id", group_keys=False)
            .apply(similar_rate, include_groups=False)
            .reset_index(level=0, drop=True)
        )
        df["rolling_similar_distance_success"] = sim

        # Current streak length
        df["current_streak"] = (
            df.groupby("player_id")["success"]
            .transform(lambda x: x.groupby((x != x.shift()).cumsum()).cumcount() + 1)
        )

        print("******* Created performance history features (rolling & streaks)")
        return df



    # ------------------------------------------------------------------
    # Statistical interaction features
    # ------------------------------------------------------------------
    def create_statistical_features(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        df["distance_zscore"] = (
            (df["attempt_yards"] - df["attempt_yards"].mean()) /
            df["attempt_yards"].std()
        )
        df["distance_percentile"] = df["attempt_yards"].rank(pct=True)

        # Cleaner interaction names using centered age
        df["age_dist_interact"] = df["age_c"] * df["attempt_yards"]
        df["exp_dist_interact"] = df["exp_100"] * df["attempt_yards"]

        print("******* Created statistical interaction features (age × distance, exp × distance)")
        return df

    def create_player_status_features(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Create player status categorization based on recent activity relative to dataset's latest date.

        Categories:
        - Retired/Injured: 2+ years since last kick (730+ days)
        - Not Playing/Potentially Playable: 1-2 years since last kick (365-729 days)
        - Playable: Less than 1 year since last kick (0-364 days)
        """
        df = df.copy()

        # Ensure datetime format
        df["game_date"] = pd.to_datetime(df["game_date"])

        # Find the latest date in the dataset (reference point)
        latest_date = df["game_date"].max()

        # Calculate last kick date for each player
        last_kick_by_player = df.groupby("player_id")["game_date"].max().reset_index()
        last_kick_by_player.columns = ["player_id", "last_kick_date"]

        # Calculate days since last kick
        last_kick_by_player["days_since_last_kick"] = (
            latest_date - last_kick_by_player["last_kick_date"]
        ).dt.days

        # Categorize player status
        def categorize_status(days_since_last):
            if days_since_last >= 730:  # 2+ years
                return "Retired/Injured"
            elif days_since_last >= 365:  # 1-2 years
                return "Not Playing/Potentially Playable"
            else:  # < 1 year
                return "Playable"

        last_kick_by_player["player_status"] = last_kick_by_player["days_since_last_kick"].apply(categorize_status)

        # Merge back to main dataframe
        df = df.merge(
            last_kick_by_player[["player_id", "player_status", "days_since_last_kick"]],
            on="player_id",
            how="left"
        )

        # Add summary statistics
        status_counts = last_kick_by_player["player_status"].value_counts()
        print("******* Created player status features based on recent activity")
        print(f"   Reference date: {latest_date.strftime('%Y-%m-%d')}")
        print(f"   Playable: {status_counts.get('Playable', 0)} players (<1 year)")
        print(f"   Not Playing/Potentially Playable: {status_counts.get('Not Playing/Potentially Playable', 0)} players (1-2 years)")
        print(f"   Retired/Injured: {status_counts.get('Retired/Injured', 0)} players (2+ years)")

        return df

    # ------------------------------------------------------------------
    # Orchestration: build *all* features
    # ------------------------------------------------------------------
    def create_all_features(
        self,
        df: pd.DataFrame,
        include_performance_history: bool = True,
        performance_window: int = 10,
        include_player_status: bool = True
    ) -> pd.DataFrame:
        print("Creating all features...")
        df = (
            df.pipe(self.create_target_variable)
              .pipe(self.create_date_features)
              .pipe(self.create_kicker_mapping)
              .pipe(self.create_distance_features)
              .pipe(self.create_experience_features)
              .pipe(self.create_situational_features)
              .pipe(self.create_statistical_features)
        )
        if include_performance_history:
            df = self.create_performance_history_features(df, performance_window)
        if include_player_status:
            df = self.create_player_status_features(df)
        print(f"******* All features created! Dataset shape: {df.shape}")
        return df

    # ------------------------------------------------------------------
    # Dynamic feature catalogue helper
    # ------------------------------------------------------------------
    def get_available_features(
        self,
        df: pd.DataFrame,
        include_unique: bool = True,
        max_unique_values: int = 20
    ) -> Dict[str, Union[List[str], Dict[str, List[Union[str, int, float]]]]]:
        """Return feature categories -> features (optionally with uniques) after engineering."""
        base_catalog = {
            "target": ["success"],
            "basic": ["attempt_yards", "age_at_attempt", "kicker_attempt_number", "importance"],
            "distance": [
                "distance_squared", "distance_cubed", "log_distance",
                "distance_from_sweet_spot", "distance_zscore", "distance_percentile"
            ],
            "distance_flags": ["is_long_attempt", "is_very_long_attempt"],
            "distance_categories": ["distance_category"],
            "temporal": ["day_of_year", "month", "week", "season", "season_progress"],
            "situational": ["is_early_season", "is_late_season", "is_playoffs"],
            "experience": ["career_length_years", "is_rookie_attempt", "experience_category"],
            "performance_history": [
                "rolling_success_rate", "rolling_similar_distance_success", "current_streak"
            ],
            "interactions": ["age_distance_interaction", "experience_distance_interaction"],
            "identifiers": ["kicker_id", "kicker_idx", "player_name"]
        }
        catalog: Dict[str, Union[List[str], Dict[str, List[Union[str, int, float]]]]] = {}
        for cat, feats in base_catalog.items():
            present = [f for f in feats if f in df.columns]
            if include_unique:
                detail: Dict[str, List[Union[str, int, float]]] = {}
                for f in present:
                    if df[f].dtype == "object" or df[f].nunique() <= max_unique_values:
                        detail[f] = sorted(df[f].dropna().unique().tolist())
                    else:
                        detail[f] = []
                catalog[cat] = detail
            else:
                catalog[cat] = present
        return catalog

# Function to create a summary DataFrame
def summarize(df):
    summary = pd.DataFrame({
        'dtype': df.dtypes.astype(str),
        'missing': df.isnull().sum(),
        'unique': df.nunique(),
        'sample_values': df.apply(lambda col: col.dropna().unique()[:5] if col.nunique() > 5 else col.unique())
    })
    return summary

if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader

    loader = DataLoader()
    df_raw = loader.load_complete_dataset()
    engineer = FeatureEngineer()
    df_feat = engineer.create_all_features(df_raw)

    summary_fg = summarize(df_feat)
    print("---------------summary_fg-----------------")
    print(summary_fg)


Overwriting src/nfl_kicker_analysis/data/feature_engineering.py


In [None]:
%%writefile src/nfl_kicker_analysis/data/feature_selection.py
"""
New in v0.2.0
-------------
* Added Random-Forest impurity importance (`compute_rf_importance`)
* Added tri-modal merge and multicollinearity pruning
* Re-worked `select_final_features` to call these helpers

New in v0.3.0
-------------
* Added mutable `FEATURE_LISTS` dictionary for flexible schema management
* Added `DynamicSchema` class to replace hardcoded `_ColumnSchema`
* Added `make_feature_matrix` helper for consistent X/y construction
* Updated functions to accept explicit schema parameter
"""

import pandas as pd
import numpy as np

# ── NEW: model and importance imports
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
from sklearn.preprocessing import minmax_scale
import shap
from pathlib import Path
import shapiq
from sklearn.utils import resample
from itertools import combinations
import json

# ── NEW: dataclass and typing imports for DynamicSchema
from dataclasses import dataclass, field
from typing import List, Dict



# ------------------------------------------------------------------
# 🧩 Lightweight runtime schema object
# ------------------------------------------------------------------
@dataclass
class DynamicSchema:
    lists: Dict[str, List[str]] = field(default_factory=dict)

    @property
    def numerical(self):   return self.lists.get("numerical", [])
    @property
    def ordinal(self):     return self.lists.get("ordinal", [])
    @property
    def nominal(self):     return self.lists.get("nominal", [])
    @property
    def target(self):      return self.lists.get("y_variable", [])[0]
    def all_features(self): return (
        self.numerical + self.ordinal + self.nominal
    )

# ── Schema-aware utilities ──────────────────────────────────────────────
def restrict_to_numerical(df: pd.DataFrame, schema: DynamicSchema) -> pd.DataFrame:
    """
    Return a view of `df` that contains only the columns listed under
    schema.numerical. Trust the schema's numerical list as the source of truth.
    """
    return df[schema.numerical].copy()


def update_schema_numerical(schema: DynamicSchema, new_numericals: list[str]) -> None:
    """
    In-place replacement of the numerical list inside the DynamicSchema.
    Keeps a copy of the old list for logging/debugging.
    """
    old = schema.numerical
    schema.lists["numerical"] = sorted(new_numericals)
    added   = sorted(set(new_numericals) - set(old))
    removed = sorted(set(old)            - set(new_numericals))
    print(f"🔄  Schema update → numerical features now = {len(new_numericals)} columns")
    if added:   print(f"   ➕ added   : {added}")
    if removed: print(f"   ➖ removed : {removed}")


def make_feature_matrix(df: pd.DataFrame,
                        schema: DynamicSchema,
                        numeric_only: bool = True
                       ) -> tuple[pd.DataFrame, pd.Series]:
    """Return X (features) and y (target) based on the supplied schema."""
    X: pd.DataFrame = restrict_to_numerical(df, schema) if numeric_only else df[schema.all_features()].copy()
    y: pd.Series = df[schema.target]
    return X, y

def train_baseline_model(X, y):
    """
    Fit a RandomForestRegressor on X, y.
    Returns the fitted model.
    """
    # You can adjust hyperparameters as needed
    model = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
    model.fit(X, y)
    return model



def compute_permutation_importance(
    model,
    X: pd.DataFrame,
    y: pd.Series,
    n_repeats: int = 10,
    n_jobs: int = 1,
    max_samples: float | int | None = None,
    random_state: int = 42,
    verbose: bool = True
) -> pd.DataFrame:
    """
    Compute permutation importances with controlled resource usage.

    Parameters
    ----------
    model : estimator
        Fitted model implementing .predict and .score.
    X : pd.DataFrame
        Features.
    y : pd.Series or array
        Target.
    n_repeats : int
        Number of shuffles per feature.
    n_jobs : int
        Number of parallel jobs (avoid -1 on Windows).
    max_samples : float or int, optional
        If float in (0,1], fraction of rows to sample.
        If int, absolute number of rows to sample.
    random_state : int
        Seed for reproducibility.
    verbose : bool
        Print debug info if True.

    Returns
    -------
    pd.DataFrame
        Columns: feature, importance_mean, importance_std.
        Sorted descending by importance_mean.
    """
    if verbose:
        print(f"⏳ Computing permutation importances on {X.shape[0]} rows × {X.shape[1]} features")
        print(f"   n_repeats={n_repeats}, n_jobs={n_jobs}, max_samples={max_samples}")

    # Subsample if requested
    X_sel, y_sel = X, y
    if max_samples is not None:
        if isinstance(max_samples, float):
            nsamp = int(len(X) * max_samples)
        else:
            nsamp = int(max_samples)
        if verbose:
            print(f"   Subsampling to {nsamp} rows for speed")
        X_sel, y_sel = resample(X, y, replace=False, n_samples=nsamp, random_state=random_state)

    try:
        result = permutation_importance(
            model,
            X_sel, y_sel,
            n_repeats=n_repeats,
            random_state=random_state,
            n_jobs=n_jobs,
        )
    except OSError as e:
        # Graceful fallback to single job
        if verbose:
            print(f"⚠️  OSError ({e}). Retrying with n_jobs=1")
        result = permutation_importance(
            model,
            X_sel, y_sel,
            n_repeats=n_repeats,
            random_state=random_state,
            n_jobs=1,
        )

    # Build and sort DataFrame
    importance_df = (
        pd.DataFrame({
            "feature": X.columns,
            "importance_mean": result.importances_mean,
            "importance_std": result.importances_std,
        })
        .sort_values("importance_mean", ascending=False)
        .reset_index(drop=True)
    )
    if verbose:
        print("✅ Permutation importances computed.")
    return importance_df


def compute_shap_importance(model, X, nsamples=100):
    """
    Compute mean absolute SHAP values per feature.
    Returns a DataFrame sorted by importance.
    """
    # DeepExplainer or TreeExplainer for tree-based models
    explainer = shap.TreeExplainer(model)
    # sample for speed
    X_sample = X.sample(n=min(nsamples, len(X)), random_state=42)
    shap_values = explainer.shap_values(X_sample)
    # For regression, shap_values is a 2D array
    mean_abs_shap = pd.DataFrame({
        "feature": X_sample.columns,
        "shap_importance": np.abs(shap_values).mean(axis=0),
    })
    mean_abs_shap = mean_abs_shap.sort_values("shap_importance", ascending=False).reset_index(drop=True)
    return mean_abs_shap


def compute_rf_importance(model, feature_names: list[str]) -> pd.DataFrame:
    """Return impurity-based RF importances."""
    if not hasattr(model, "feature_importances_"):
        raise AttributeError("Model has no feature_importances_ attribute")
    return (
        pd.DataFrame(
            {"feature": feature_names,
             "rf_importance": model.feature_importances_}
        )
        .sort_values("rf_importance", ascending=False)
        .reset_index(drop=True)
    )


def merge_and_score_importances(perm_df, shap_df, rf_df) -> pd.DataFrame:
    """Merge the three importance tables and add a combined_score column."""
    merged = (
        perm_df.merge(shap_df, on="feature", how="outer")
               .merge(rf_df,  on="feature", how="outer")
               .fillna(0.0)
    )
    # Min-max normalise each column so weights are comparable
    for col in ["importance_mean", "shap_importance", "rf_importance"]:
        merged[f"norm_{col}"] = minmax_scale(merged[col].values)
    merged["combined_score"] = merged[
        ["norm_importance_mean", "norm_shap_importance", "norm_rf_importance"]
    ].mean(axis=1)
    return merged.sort_values("combined_score", ascending=False).reset_index(drop=True)


def drop_multicollinear(X: pd.DataFrame,
                        ranked_feats: pd.DataFrame,
                        corr_threshold: float = 0.85,
                        method: str = "pearson") -> list[str]:
    """
    Remove the lower-scoring feature from each highly correlated pair.
    """
    corr = X[ranked_feats["feature"]].corr().abs()
    to_drop = set()
    for f1, f2 in combinations(ranked_feats["feature"], 2):
        if corr.loc[f1, f2] > corr_threshold:
            # keep the one with higher combined_score
            better = f1 if ranked_feats.set_index("feature").loc[f1,"combined_score"] \
                     >= ranked_feats.set_index("feature").loc[f2,"combined_score"] else f2
            worse  = f2 if better == f1 else f1
            to_drop.add(worse)
    return [f for f in ranked_feats["feature"] if f not in to_drop]


def filter_permutation_features(
    importance_df: pd.DataFrame,
    threshold: float
) -> list[str]:
    """
    Return features whose permutation importance_mean exceeds threshold.
    """
    kept = importance_df.loc[
        importance_df["importance_mean"] > threshold, "feature"
    ]
    return kept.tolist()


def filter_shap_features(
    importance_df: pd.DataFrame,
    threshold: float
) -> list[str]:
    """
    Return features whose shap_importance exceeds threshold.
    """
    kept = importance_df.loc[
        importance_df["shap_importance"] > threshold, "feature"
    ]
    return kept.tolist()


def select_final_features(perm_df,
                          shap_df,
                          rf_df,
                          X: pd.DataFrame,
                          schema: DynamicSchema,
                          perm_thresh: float = 0.01,
                          shap_thresh: float = 0.01,
                          rf_thresh: float = 0.01,
                          corr_threshold: float = 0.85) -> list[str]:
    """
    Return the final feature list after:
      1. Individual thresholding on each importance type
      2. Union/intersection logic (here: *intersection*)
      3. Combined-score ranking
      4. Multicollinearity pruning
    """
    # --- step 1: individual screens ---
    keep_perm = perm_df.loc[perm_df["importance_mean"] > perm_thresh, "feature"]
    keep_shap = shap_df.loc[shap_df["shap_importance"]   > shap_thresh, "feature"]
    keep_rf   = rf_df.loc[rf_df["rf_importance"]         > rf_thresh,   "feature"]
    intersect = set(keep_perm) & set(keep_shap) & set(keep_rf)

    # --- step 2: combine & rank only those ---
    merged = merge_and_score_importances(
        perm_df[perm_df["feature"].isin(intersect)],
        shap_df[shap_df["feature"].isin(intersect)],
        rf_df[rf_df["feature"].isin(intersect)]
    )

    # --- step 3: drop multicollinear ---
    final_feats = drop_multicollinear(X, merged, corr_threshold)

    return final_feats


# ============== BACKWARD COMPATIBILITY HELPERS ==============

def select_final_features_legacy(
    perm_feats: list[str],
    shap_feats: list[str],
    mode: str = "intersection"
) -> list[str]:
    """
    DEPRECATED: Legacy version for backward compatibility.
    Use select_final_features with DataFrame inputs for enhanced functionality.
    """
    import warnings
    warnings.warn(
        "select_final_features_legacy is deprecated. Use select_final_features with DataFrame inputs.",
        DeprecationWarning,
        stacklevel=2
    )
    set_perm = set(perm_feats)
    set_shap = set(shap_feats)
    if mode == "union":
        final = set_perm | set_shap
    else:
        final = set_perm & set_shap
    return sorted(final)


# ==============================================================


def load_final_features(
    file_path: str = "data/models/features/final_features.txt"
) -> list[str]:
    """
    Read the newline-delimited feature names file and return as a list.
    """
    with open(file_path, "r") as fp:
        return [line.strip() for line in fp if line.strip()]


def filter_to_final_features(df: pd.DataFrame,
                             final_feats_file: str,
                             schema: DynamicSchema,
                             id_cols: list[str] = None
                            ) -> pd.DataFrame:
    """Return df[id + final + y] using the dynamic schema."""
    final_feats = load_final_features(final_feats_file)
    id_cols = id_cols or []
    missing = set(id_cols + final_feats + [schema.target]) - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns after filtering: {missing}")
    return df[id_cols + final_feats + [schema.target]].copy()


if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader
    from src.nfl_kicker_analysis.data.feature_engineering import FeatureEngineer
    import os

    # ------------------------------------------------------------------
    # 🔧 Single source of truth for column roles – edit freely
    # ------------------------------------------------------------------
    FEATURE_LISTS = {
        "numerical": [
            "attempt_yards", "age_at_attempt", "distance_squared",
            "career_length_years", "season_progress", "rolling_success_rate",
            "current_streak", "distance_zscore", "distance_percentile",
        ],
        "ordinal":  ["season", "week", "month", "day_of_year"],
        "nominal":  [
            "kicker_id", "is_long_attempt", "is_very_long_attempt",
            "is_rookie_attempt", "distance_category", "experience_category",
        ],
        "y_variable": ["success"],
    }

    # ➊  Build schema from the dict
    schema = DynamicSchema(FEATURE_LISTS)

    # ➋  Load & feature-engineer
    loader = DataLoader()
    df_raw = loader.load_complete_dataset()
    engineer = FeatureEngineer()
    df_feat = engineer.create_all_features(df_raw)
    print("---------------df_feat---------------")
    print(df_feat.head())
    print("---------------df_feat.columns---------------")
    print(df_feat.columns)
    print("---------------schema.all_features()---------------")
    print(schema.all_features())

    # ➌  Make X / y using the new helper
    X, y = make_feature_matrix(df_feat, schema)

    print(f"\n📊 Starting enhanced feature selection with {X.shape[1]} features")
    print(f"   Schema contains {len(schema.all_features())} total features defined")

    # ➍  Run the tri-modal importance pipeline exactly as before
    print("\n🌲 Training Random Forest model...")
    model = train_baseline_model(X, y)

    print("\n⚡ Computing permutation importance...")
    perm_df = compute_permutation_importance(model, X, y, max_samples=0.3)

    print("\n🔍 Computing SHAP importance...")
    shap_df = compute_shap_importance(model, X, nsamples=100)

    print("\n🌳 Computing Random Forest importance...")
    rf_df = compute_rf_importance(model, X.columns.tolist())

    # 📊 Display top 10 features for each importance metric
    print("\n📈 Top 10 Features by Importance Metric:")
    print("\nPermutation Importance:")
    print(perm_df.head(10)[["feature", "importance_mean"]].to_string(index=False))

    print("\nSHAP Importance:")
    print(shap_df.head(10)[["feature", "shap_importance"]].to_string(index=False))

    print("\nRandom Forest Importance:")
    print(rf_df.head(10)[["feature", "rf_importance"]].to_string(index=False))

    # 🔬 Select & de-correlate with detailed output
    print("\n🔍 Analyzing feature correlations...")
    corr = X[X.columns].corr().abs()

    # Find highly correlated pairs before feature selection
    high_corr_pairs = []
    for f1, f2 in combinations(X.columns, 2):
        if corr.loc[f1, f2] > 0.85:  # Using same threshold as drop_multicollinear
            high_corr_pairs.append({
                'feature1': f1,
                'feature2': f2,
                'correlation': corr.loc[f1, f2]
            })

    if high_corr_pairs:
        print("\n📊 Highly correlated feature pairs (correlation > 0.85):")
        for pair in sorted(high_corr_pairs, key=lambda x: x['correlation'], reverse=True):
            print(f"\n{pair['feature1']} ↔️ {pair['feature2']}")
            print(f"Correlation: {pair['correlation']:.3f}")

            # Get importance scores for both features
            f1_scores = {
                'perm': float(perm_df[perm_df.feature == pair['feature1']].importance_mean.iloc[0]),
                'shap': float(shap_df[shap_df.feature == pair['feature1']].shap_importance.iloc[0]),
                'rf': float(rf_df[rf_df.feature == pair['feature1']].rf_importance.iloc[0])
            }
            f2_scores = {
                'perm': float(perm_df[perm_df.feature == pair['feature2']].importance_mean.iloc[0]),
                'shap': float(shap_df[shap_df.feature == pair['feature2']].shap_importance.iloc[0]),
                'rf': float(rf_df[rf_df.feature == pair['feature2']].rf_importance.iloc[0])
            }

            print(f"{pair['feature1']} importance scores:")
            print(f"  Permutation: {f1_scores['perm']:.4f}")
            print(f"  SHAP: {f1_scores['shap']:.4f}")
            print(f"  RF: {f1_scores['rf']:.4f}")

            print(f"{pair['feature2']} importance scores:")
            print(f"  Permutation: {f2_scores['perm']:.4f}")
            print(f"  SHAP: {f2_scores['shap']:.4f}")
            print(f"  RF: {f2_scores['rf']:.4f}")

            # Calculate average importance
            f1_avg = sum(f1_scores.values()) / 3
            f2_avg = sum(f2_scores.values()) / 3

            keeper = pair['feature1'] if f1_avg >= f2_avg else pair['feature2']
            dropped = pair['feature2'] if keeper == pair['feature1'] else pair['feature1']
            print(f"\n➡️ Decision: Keep {keeper} (avg importance: {max(f1_avg, f2_avg):.4f})")
            print(f"❌ Drop {dropped} (avg importance: {min(f1_avg, f2_avg):.4f})")
    else:
        print("No highly correlated feature pairs found.")

    # Run feature selection
    final_features = select_final_features(
        perm_df, shap_df, rf_df, X, schema,
        perm_thresh=0.005, shap_thresh=0.005, rf_thresh=0.005
    )
    print(f"---------------final_features---------------")
    print(final_features)
    # output final_features to final_features.txt
    with open("data/models/features/final_features.txt", "w") as f:
        for feat in final_features:
            f.write(feat + "\n")


    # read final_features.txt
    with open("data/models/features/final_features.txt", "r") as f:
        final_features = [line.strip() for line in f]
    print(f"---------------final_features---------------")
    print(final_features)
    numeric_final = [f for f in final_features if f in schema.numerical]

    print(f"\n✨ Final feature count: {len(numeric_final)}")
    print("Selected features:")
    for feat in numeric_final:
        print(f"  • {feat}")

    # 🔄 Push into schema so every later stage sees the new list
    update_schema_numerical(schema, numeric_final)

    # output final_features from schema
    FEATURE_LISTS = schema.lists
    print(f"---------------FEATURE_LISTS---------------")
    print(FEATURE_LISTS)


Overwriting src/nfl_kicker_analysis/data/feature_selection.py


## 3. Data Preprocessing Module

Handles data cleaning, feature engineering, and preparation for modeling.


In [None]:
%%writefile src/nfl_kicker_analysis/data/preprocessor.py
"""
Data preprocessing module for NFL kicker analysis.
Handles filtering, feature selection, and preparation for modeling.

New in v0.4.0
--------------
* Added **inverse‑preprocessing** utilities so that any matrix produced by the
  fitted `ColumnTransformer` can be projected back into human‑readable feature
  space.  This is handy for debugging, error analysis, or piping model outputs
  into post‑processing code that expects the raw‑scale feature values.
"""
from __future__ import annotations

import pandas as pd
import numpy as np
from typing import Optional, Tuple, Dict, List, Union, cast, Any, TypeVar, Protocol
from datetime import datetime

from sklearn.preprocessing import FunctionTransformer, StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from scipy import sparse

from src.nfl_kicker_analysis.data.feature_engineering import FeatureEngineer
from src.nfl_kicker_analysis.data.feature_schema import FeatureSchema
from src.nfl_kicker_analysis import config


# Type variables for better type hints
T = TypeVar('T')
MatrixType = Union[np.ndarray, sparse.spmatrix]


class Transformer(Protocol):
    """Protocol for scikit-learn transformers."""
    def fit_transform(self, X: Any, y: Any = None) -> MatrixType: ...
    def transform(self, X: Any) -> MatrixType: ...
    def inverse_transform(self, X: MatrixType) -> np.ndarray: ...


class DataPreprocessor:
    """Handles data preprocessing, feature engineering, and *now* inversion."""

    def __init__(self):
        """Create a blank preprocessor – remember to inject config / features."""
        # Data‑filtering hyper‑parameters (set via :pymeth:`update_config`).
        self.MIN_DISTANCE: int | None = None
        self.MAX_DISTANCE: int | None = None
        self.MIN_KICKER_ATTEMPTS: int | None = None
        self.SEASON_TYPES: list[str] | None = None

        # Feature‑role lists (set via :pymeth:`update_feature_lists`).
        self.NUMERICAL_FEATURES: list[str] = []
        self.ORDINAL_FEATURES: list[str] = []
        self.NOMINAL_FEATURES: list[str] = []

        # Target column
        self.TARGET: str = "success"

        # Feature‑engineering toggles (again, set via config).
        self.INCLUDE_PERFORMANCE_HISTORY: bool | None = None
        self.INCLUDE_STATISTICAL_FEATURES: bool | None = None
        self.INCLUDE_PLAYER_STATUS: bool | None = None
        self.PERFORMANCE_WINDOW: int | None = None

        # Runtime artefacts ------------------------------------------------
        self.feature_engineer = FeatureEngineer()
        self.raw_data: pd.DataFrame | None = None
        self.processed_data: pd.DataFrame | None = None
        self.schema: FeatureSchema | None = None

        # ➕ NEW – fitted column transformer for forward & inverse transforms
        self.column_transformer_: ColumnTransformer | None = None
        self._feature_cols_: List[str] | None = None  # Cache feature columns after fitting

    def update_feature_lists(self,
                           numerical: Optional[List[str]] = None,
                           ordinal: Optional[List[str]] = None,
                           nominal: Optional[List[str]] = None,
                           y_variable: Optional[List[str]] = None):
        """
        Update feature lists for easy experimentation.

        Args:
            numerical: List of numerical features to use
            ordinal: List of ordinal features to use
            nominal: List of nominal categorical features to use
            y_variable: List containing target variable name
        """
        if numerical is not None:
            self.NUMERICAL_FEATURES = numerical
        if ordinal is not None:
            self.ORDINAL_FEATURES = ordinal
        if nominal is not None:
            self.NOMINAL_FEATURES = nominal
        if y_variable is not None:
            self.TARGET = y_variable[0]  # Single target assumed

        print("******* Feature lists updated")

    def update_config(self,
                     min_distance: Optional[int] = None,
                     max_distance: Optional[int] = None,
                     min_kicker_attempts: Optional[int] = None,
                     season_types: Optional[List[str]] = None,
                     include_performance_history: Optional[bool] = None,
                     include_statistical_features: Optional[bool] = None,
                     include_player_status: Optional[bool] = None,
                     performance_window: Optional[int] = None):
        """
        Update preprocessing configuration.

        Args:
            min_distance: Minimum field goal distance to include
            max_distance: Maximum field goal distance to include
            min_kicker_attempts: Minimum attempts required per kicker
            season_types: List of season types to include
            include_performance_history: Whether to include performance history features
            include_statistical_features: Whether to include statistical features
            performance_window: Window size for rolling performance features
        """
        if min_distance is not None:
            self.MIN_DISTANCE = min_distance
        if max_distance is not None:
            self.MAX_DISTANCE = max_distance
        if min_kicker_attempts is not None:
            self.MIN_KICKER_ATTEMPTS = min_kicker_attempts
        if season_types is not None:
            self.SEASON_TYPES = season_types
        if include_performance_history is not None:
            self.INCLUDE_PERFORMANCE_HISTORY = include_performance_history
        if include_statistical_features is not None:
            self.INCLUDE_STATISTICAL_FEATURES = include_statistical_features
        if include_player_status is not None:
            self.INCLUDE_PLAYER_STATUS = include_player_status
        if performance_window is not None:
            self.PERFORMANCE_WINDOW = performance_window

        print("******* Configuration updated")

    def _validate_config(self):
        """Validate that required configuration is set."""
        missing = []
        if self.MIN_DISTANCE is None:
            missing.append("MIN_DISTANCE")
        if self.MAX_DISTANCE is None:
            missing.append("MAX_DISTANCE")
        if self.MIN_KICKER_ATTEMPTS is None:
            missing.append("MIN_KICKER_ATTEMPTS")
        if self.SEASON_TYPES is None:
            missing.append("SEASON_TYPES")
        if self.INCLUDE_PERFORMANCE_HISTORY is None:
            missing.append("INCLUDE_PERFORMANCE_HISTORY")
        if self.INCLUDE_STATISTICAL_FEATURES is None:
            missing.append("INCLUDE_STATISTICAL_FEATURES")
        if self.INCLUDE_PLAYER_STATUS is None:
            missing.append("INCLUDE_PLAYER_STATUS")
        if self.PERFORMANCE_WINDOW is None:
            missing.append("PERFORMANCE_WINDOW")

        if missing:
            raise ValueError(f"Configuration not set. Please call update_config() first. Missing: {missing}")

    def filter_season_type(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter data by season type."""
        self._validate_config()
        assert self.SEASON_TYPES is not None  # guaranteed after validation
        filtered_df = cast(pd.DataFrame, df[df['season_type'].isin(self.SEASON_TYPES)].copy())
        print(f"******* Filtered to {self.SEASON_TYPES} season(s): {len(filtered_df):,} attempts")
        return filtered_df

    def filter_blocked_field_goals(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter out blocked field goals since they don't reflect kicker skill."""
        filtered_df = cast(pd.DataFrame, df[df['field_goal_result'] != 'Blocked'].copy())
        removed = len(df) - len(filtered_df)
        print(f"******* Filtered out blocked field goals")
        print(f"   Removed {removed} blocked attempts, kept {len(filtered_df):,}")
        return filtered_df

    def filter_retired_injured_players(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Optionally drop Retired/Injured attempts (status created upstream).
        Controlled by config.FILTER_RETIRED_INJURED.
        """
        if "player_status" not in df.columns or not config.FILTER_RETIRED_INJURED:
            return df                # no-op if flag is False or column missing

        keep_mask = df["player_status"] != "Retired/Injured"
        dropped   = (~keep_mask).sum()
        players   = df.loc[~keep_mask, "player_name"].nunique()

        print(f"🗑️  Filtered {dropped} attempts from {players} retired/injured players")
        return cast(pd.DataFrame, df[keep_mask].copy())

    def filter_distance_range(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter data by distance range to remove outliers."""
        self._validate_config()
        filtered_df = cast(pd.DataFrame, df[
            (df['attempt_yards'] >= self.MIN_DISTANCE) &
            (df['attempt_yards'] <= self.MAX_DISTANCE)
        ].copy())

        removed = len(df) - len(filtered_df)
        print(f"******* Filtered distance range {self.MIN_DISTANCE}-{self.MAX_DISTANCE} yards")
        print(f"   Removed {removed} extreme attempts, kept {len(filtered_df):,}")
        return filtered_df

    def filter_min_attempts(self, df: pd.DataFrame) -> pd.DataFrame:
        """Filter out kickers with too few attempts."""
        self._validate_config()
        # Count attempts per kicker
        kicker_counts = df['player_name'].value_counts()
        valid_kickers = kicker_counts[kicker_counts >= self.MIN_KICKER_ATTEMPTS]
        # Convert index to list safely using pandas Series methods
        valid_kicker_names = cast(pd.Series, valid_kickers).index.tolist()

        # Filter to valid kickers only
        filtered_df = cast(pd.DataFrame, df[df['player_name'].isin(valid_kicker_names)].copy())

        removed_kickers = len(kicker_counts) - len(valid_kickers)
        print(f"******* Filtered kickers with <{self.MIN_KICKER_ATTEMPTS} attempts")
        print(f"   Removed {removed_kickers} kickers, kept {len(valid_kickers)}")
        print(f"   Final dataset: {len(filtered_df):,} attempts")

        return filtered_df

    def _get_selected_features(self) -> List[str]:
        """Get the complete list of selected features based on configuration."""
        features = []

        # Include all feature categories
        features.extend(self.NUMERICAL_FEATURES)
        features.extend(self.ORDINAL_FEATURES)
        features.extend(self.NOMINAL_FEATURES)

        # Remove duplicates while preserving order
        return list(dict.fromkeys(features))

    def _build_schema(self, df: pd.DataFrame) -> FeatureSchema:
        """Build the feature schema based on selected features."""
        selected_features = self._get_selected_features()

        # Filter feature lists to only include features that exist in the dataframe
        available_features = set(df.columns)

        numerical = [f for f in self.NUMERICAL_FEATURES if f in available_features]
        ordinal = [f for f in self.ORDINAL_FEATURES if f in available_features]
        nominal = [f for f in self.NOMINAL_FEATURES if f in available_features]

        schema = FeatureSchema(
            numerical=numerical,
            binary=[],  # Binary features are now in nominal
            ordinal=ordinal,
            nominal=nominal,
            target=self.TARGET,
        )

        # Validate schema
        try:
            schema.assert_in_dataframe(df)
        except AssertionError as e:
            print(f"Warning: Schema validation failed: {e}")
            print("Available features:", list(available_features))
            print("Requested features:", selected_features)

        return schema

    def make_column_transformer(self) -> ColumnTransformer:
        """
        Return a scikit-learn ColumnTransformer based on the feature schema.
        Call after `preprocess_complete()`.
        """
        if self.schema is None:
            raise AttributeError("Run preprocess_complete() before building transformers.")

        # Numeric pipeline - standard scaling
        numeric_pipe = Pipeline(
            steps=[
                ("scale", StandardScaler())
            ]
        )

        # Build transformers list
        transformers = []

        if self.schema.numerical:
            transformers.append(("num_scaled", numeric_pipe, self.schema.numerical))

        if self.schema.binary:
            transformers.append(("binary_passthrough", "passthrough", self.schema.binary))

        if self.schema.ordinal:
            transformers.append(("ordinal_passthrough", "passthrough", self.schema.ordinal))

        if self.schema.nominal:
            transformers.append(("nominal_onehot",
                               OneHotEncoder(handle_unknown="ignore", sparse_output=True),
                               self.schema.nominal))

        # Categorical features are now included in nominal features
        # No separate categorical handling needed

        ct = ColumnTransformer(
            transformers=transformers,
            remainder="drop",
            verbose_feature_names_out=False,
        )
        return ct

    # ── src/nfl_kicker_analysis/data/preprocessor.py ─────────────
    def preprocess_slice(self, raw_df: pd.DataFrame) -> pd.DataFrame:
        """Run the exact notebook filters on an arbitrary slice *without* mutating
           global state (so train/test can differ)."""
        self._validate_config()

        df = raw_df.copy()

        # Check if features are already engineered (to avoid double processing)
        features_already_present = 'success' in df.columns and 'player_status' in df.columns

        if not features_already_present:
            # Only run feature engineering if not already done
            df = self.feature_engineer.create_all_features(
                df,
                include_performance_history=self.INCLUDE_PERFORMANCE_HISTORY,
                performance_window=self.PERFORMANCE_WINDOW,
                include_player_status=self.INCLUDE_PLAYER_STATUS,
            )
            if self.INCLUDE_STATISTICAL_FEATURES:
                df = self.feature_engineer.create_statistical_features(df)

        # Apply filtering steps
        df = self.filter_season_type(df)
        df = self.filter_blocked_field_goals(df)    # ⬅ NEW: filter blocked attempts
        df = self.filter_distance_range(df)
        df = self.filter_min_attempts(df)           # ⬅ identical min-att logic

        # Filter retired/injured players (needs player_status column)
        df = self.filter_retired_injured_players(df)

        self._build_schema(df)                      # rebuild OHE categories
        return df


    def preprocess_complete(self, raw_df: pd.DataFrame, *, inplace: bool = True) -> pd.DataFrame:
        """
        Complete preprocessing pipeline.

        Args:
            raw_df: Raw merged DataFrame
            inplace: If True (default), store results in instance attributes.
                    If False, behaves like preprocess_slice() and leaves internal state untouched.

        Returns:
            Fully preprocessed DataFrame ready for modeling
        """
        if inplace:
            # Validate configuration first
            self._validate_config()

            self.raw_data = raw_df.copy()

            print("Starting complete preprocessing pipeline...")
            print(f"Configuration: {self.MIN_DISTANCE}-{self.MAX_DISTANCE} yards, "
                  f"min {self.MIN_KICKER_ATTEMPTS} attempts, {self.SEASON_TYPES} seasons")

            self.processed_data = self.preprocess_slice(raw_df)
            self.schema = self._build_schema(self.processed_data)

            print(f"******* Preprocessing complete: {len(self.processed_data):,} attempts ready for modeling")
            print(f"******* Features selected: {len(self._get_selected_features())} total")
            return self.processed_data
        else:
            return self.preprocess_slice(raw_df)

    def get_feature_summary(self) -> Dict:
        """Get summary of selected features by category."""
        if self.processed_data is None:
            raise ValueError("No processed data available")

        selected_features = self._get_selected_features()
        available_features = set(self.processed_data.columns)

        summary = {
            'total_selected': len(selected_features),
            'total_available': len(available_features),
            'numerical': [f for f in self.NUMERICAL_FEATURES if f in available_features],
            'ordinal': [f for f in self.ORDINAL_FEATURES if f in available_features],
            'nominal': [f for f in self.NOMINAL_FEATURES if f in available_features],
            'missing_features': [f for f in selected_features if f not in available_features],
        }
        return summary

    def get_preprocessing_summary(self) -> Dict:
        """Get summary of preprocessing steps and results."""
        if self.processed_data is None:
            raise ValueError("No processed data available")

        summary = {
            'original_size': len(self.raw_data) if self.raw_data is not None else 0,
            'final_size': len(self.processed_data),
            'unique_kickers': self.processed_data['player_name'].nunique(),
            'success_rate': self.processed_data[self.TARGET].mean(),
            'distance_range': (
                self.processed_data['attempt_yards'].min(),
                self.processed_data['attempt_yards'].max()
            ),
            'config': {
                'min_distance': self.MIN_DISTANCE,
                'max_distance': self.MAX_DISTANCE,
                'min_kicker_attempts': self.MIN_KICKER_ATTEMPTS,
                'season_types': self.SEASON_TYPES,
                'include_performance_history': self.INCLUDE_PERFORMANCE_HISTORY,
                'include_statistical_features': self.INCLUDE_STATISTICAL_FEATURES,
            },
            'features': self.get_feature_summary()
        }
        return summary

    def fit_transform_features(self) -> Tuple[MatrixType, np.ndarray]:
        """Fit the internal :class:`ColumnTransformer` **and** return X / y.

        Returns:
            Tuple containing:
                - X: Transformed feature matrix (sparse or dense)
                - y: Target array

        Raises:
            ValueError: If preprocess_complete() hasn't been called yet
        """
        if self.processed_data is None:
            raise ValueError("Run preprocess_complete() before fitting the transformer.")

        # Build & fit transformer
        self.column_transformer_ = self.make_column_transformer()
        feature_cols = self._get_selected_features()
        self._feature_cols_ = feature_cols  # Cache for inverse transform
        X = self.column_transformer_.fit_transform(self.processed_data[feature_cols])
        y = cast(np.ndarray, self.processed_data[self.TARGET].values)
        return cast(Tuple[MatrixType, np.ndarray], (X, y))

    def transform_features(self, df: pd.DataFrame) -> MatrixType:
        """Transform a *new* DataFrame using the already‑fitted transformer.

        Args:
            df: DataFrame to transform, must have same columns as training data

        Returns:
            Transformed feature matrix (sparse or dense)

        Raises:
            ValueError: If transformer hasn't been fitted yet
        """
        if self.column_transformer_ is None:
            raise ValueError("Transformer not fitted – call fit_transform_features() first.")

        feature_cols = self._get_selected_features()
        X = self.column_transformer_.transform(df[feature_cols])
        return cast(MatrixType, X)

    def invert_preprocessing(self, X_transformed: MatrixType) -> pd.DataFrame:
        """
        Version-safe inverse transform.
        Works on scikit-learn ≥0.24 (native) and ≤0.23 (manual fallback).
        """
        if self.column_transformer_ is None or self._feature_cols_ is None:
            raise ValueError("Transformer not fitted – call fit_transform_features() first.")

        # --- 1 ▸ Modern sklearn: just call the native helper -----------------
        if hasattr(self.column_transformer_, "inverse_transform"):
            X_inv = self.column_transformer_.inverse_transform(X_transformed)
            return pd.DataFrame(X_inv, columns=self._feature_cols_)

        # --- 2 ▸ Legacy sklearn: manual reconstruction -----------------------
        X_dense = X_transformed.toarray() if sparse.issparse(X_transformed) else np.asarray(X_transformed)

        col_arrays, current = [], 0
        for name, trans, cols in self.column_transformer_.transformers_:
            # -------- width calculation WITHOUT touching the transformer -----
            if name == "num_scaled":
                width = len(cols)  # numeric slice = number of original columns
            elif name == "nominal_onehot":
                enc = cast(OneHotEncoder, trans)
                width = sum(len(c) for c in enc.categories_)
            else:                           # passthrough groups
                width = len(cols)

            slice_ = X_dense[:, current: current + width]
            current += width

            # -------- inverse for each block ---------------------------------
            if name == "num_scaled":
                scaler = cast(StandardScaler, trans.named_steps["scale"])
                slice_inv = (slice_ * scaler.scale_) + scaler.mean_     # docs :contentReference[oaicite:2]{index=2}
                col_arrays.append(slice_inv)

            elif name == "nominal_onehot":
                enc = cast(OneHotEncoder, trans)
                slice_inv = enc.inverse_transform(slice_)               # docs :contentReference[oaicite:3]{index=3}
                col_arrays.append(slice_inv)

            else:                                                       # passthrough
                col_arrays.append(slice_)

        X_inv_full = np.column_stack(col_arrays)
        return pd.DataFrame(X_inv_full, columns=self._feature_cols_)



if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader
    from src.nfl_kicker_analysis.data.feature_selection import (
        DynamicSchema,
        filter_to_final_features,
        update_schema_numerical,
    )

    # ─── 1 Load raw data ──────────────────────────────────────────
    loader = DataLoader()
    df_raw = loader.load_complete_dataset()

    # ─── 2 Feature engineering pass ───────────────────────────────
    engineer = FeatureEngineer()
    df_feat = engineer.create_all_features(df_raw)

    for category, details in engineer.get_available_features(df_feat).items():
        print(f"-- {category} --")
        for feat, uniques in details.items():
            print(f"   {feat}: {len(uniques)} unique | sample {uniques[:5] if uniques else '...'}")

    # ─── 3 Define all tunables in one place ─────────────────────
    CONFIG = {
        'min_distance': 20,
        'max_distance': 60,
        'min_kicker_attempts': 8,
        'season_types': ['Reg', 'Post'],  # now include playoffs
        'include_performance_history': True,
        'include_statistical_features': False,
        'include_player_status': True,  # ✅ FIX: Added missing parameter
        'performance_window': 12,
    }


    # ------------------------------------------------------------------
    # 🔧 Single source of truth for column roles – edit freely
    # ------------------------------------------------------------------
    FEATURE_LISTS = {
        "numerical": [
            "attempt_yards", "age_at_attempt", "distance_squared",
            "career_length_years", "season_progress", "rolling_success_rate",
            "current_streak", "distance_zscore", "distance_percentile",
        ],
        "ordinal":  ["season", "week", "month", "day_of_year"],
        "nominal":  [
            "kicker_id", "kicker_idx", "is_long_attempt", "is_very_long_attempt",
            "is_rookie_attempt", "distance_category", "experience_category",
        ],
        "y_variable": ["success"],
    }

    # ➊  Build schema from the dict
    schema = DynamicSchema(FEATURE_LISTS)

    # read final_features.txt
    with open("data/models/features/final_features.txt", "r") as f:
        final_features = [line.strip() for line in f]
    print(f"---------------final_features---------------")
    print(final_features)
    numeric_final = [f for f in final_features if f in schema.numerical]

    print(f"\n✨ Final feature count: {len(numeric_final)}")
    print("Selected features:")
    for feat in numeric_final:
        print(f"  • {feat}")

    # 🔄 Push into schema so every later stage sees the new list
    update_schema_numerical(schema, numeric_final)

    # output final_features from schema
    FEATURE_LISTS = schema.lists
    print(f"---------------FEATURE_LISTS---------------")
    print(FEATURE_LISTS)

    pre = DataPreprocessor()
    pre.update_config(**CONFIG)
    pre.update_feature_lists(**FEATURE_LISTS)
    _ = pre.preprocess_complete(df_feat)
    X, y = pre.fit_transform_features()

    print("First 5 rows after inverse‑transform round‑trip →")
    print(pre.invert_preprocessing(X[:5]).head())


Overwriting src/nfl_kicker_analysis/data/preprocessor.py


## 4. Package Init Files

Creating __init__.py files for all modules.


In [21]:
%%writefile src/nfl_kicker_analysis/__init__.py
"""
NFL Kicker Analysis Package
A comprehensive toolkit for analyzing NFL field goal kicker performance.
"""

__version__ = "1.0.0"
__author__ = "NFL Analytics Team"

# Import main classes for easy access
from .config import config
from .data.loader import DataLoader
from .data.preprocessor import DataPreprocessor

__all__ = [
    'config',
    'DataLoader',
    'DataPreprocessor'
]


Overwriting src/nfl_kicker_analysis/__init__.py


In [22]:
%%writefile src/nfl_kicker_analysis/data/__init__.py
"""
Data module for NFL kicker analysis.
"""

from .loader import DataLoader
from .preprocessor import DataPreprocessor

__all__ = ['DataLoader', 'DataPreprocessor']


Overwriting src/nfl_kicker_analysis/data/__init__.py


## 5. Metrics and Utilities Module

Core utilities for EPA calculations and performance metrics.

1. EPACalculator

    Purpose
    Compute an Expected Points Added (EPA)–style rating for each kicker, based on how their made-field-goal rate at various distances compares to the league average.

    Key Methods

        calculate_empirical_success_rate(data, kicker_name, distance, …)
        Looks up a kicker’s historical success rate in a ±distance_range window around a given yard line; if they don’t have enough attempts there, it falls back to the league average.

        calculate_league_average_epa(data)
        Computes the league-wide EPA per kick by weighting each distance’s success rate (from config.DISTANCE_PROFILE and config.DISTANCE_WEIGHTS) times 3 points per make.

        calculate_kicker_epa_plus(data, kicker_name)
        For one kicker, sums up their distance-weighted EPA and subtracts the league average to produce an “EPA-FG+” number.

        calculate_all_kicker_ratings(data)
        Loops over every kicker in the dataset and builds a leaderboard DataFrame of EPA-FG+ and rank.

    How to use it
    After you’ve loaded and preprocessed your DataFrame (so it has at least these columns:

'player_name', 'player_id', 'attempt_yards', 'success'

), you would call:

    from nfl_kicker_analysis.utils.metrics import EPACalculator
    epa = EPACalculator()
    leaderboard = epa.calculate_all_kicker_ratings(processed_df)

    That gives you a standalone table of kicker ratings.

2. ModelEvaluator

    Purpose
    Compute standard classification metrics and compare different models’ results in a tidy DataFrame.

    Key Methods

        calculate_classification_metrics(y_true, y_pred_proba)
        Returns a dict with

            auc_roc

            brier_score

            accuracy (thresholded at 0.5)

            log_loss

        compare_models(models_results)
        Takes a dict of { model_name: metrics_dict } and turns it into a DataFrame, sorted by AUC.

    How to use it
    After fitting your model(s) and getting back prediction probabilities:

    from nfl_kicker_analysis.utils.metrics import ModelEvaluator

    evaluator = ModelEvaluator()
    metrics = evaluator.calculate_classification_metrics(y_test, y_pred_probs)
    results = {
        'MyModel': metrics,
        'Baseline': other_metrics,
    }
    comparison_df = evaluator.compare_models(results)
    print(comparison_df)




In [None]:
%%writefile src/nfl_kicker_analysis/utils/metrics.py
"""
Metrics utilities for NFL kicker analysis.
"""
import numpy as np
import pandas as pd
from sklearn.metrics import (
    roc_auc_score,
    average_precision_score,     # AUC-PR
    log_loss,
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    brier_score_loss,
)
try:
    from sklearn.calibration import calibration_curve
except ImportError:
    from sklearn.metrics import calibration_curve
from typing import Dict, Optional, Union, List, Any, Callable, Tuple
from numpy.typing import NDArray
import arviz as az

from src.nfl_kicker_analysis.config import config

class EPACalculator:
    """Calculates EPA-based metrics for kickers."""

    def __init__(self):
        """Initialize the EPA calculator."""
        self.baseline_probs: Dict[int, float] = {}
        self.distance_profile = config.DISTANCE_PROFILE
        self.distance_weights = config.DISTANCE_WEIGHTS

    def calculate_baseline_probs(self, data: pd.DataFrame) -> Dict[int, float]:
        """
        Calculate baseline success probabilities by distance.

        Args:
            data: DataFrame with attempt_yards and success

        Returns:
            Dictionary mapping distance to success probability
        """
        baseline = data.groupby('attempt_yards')['success'].mean()
        self.baseline_probs = baseline.to_dict()
        return self.baseline_probs

    def calculate_epa_fg_plus(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Calculate EPA-FG+ for each kicker.

        Args:
            data: DataFrame with kicker attempts

        Returns:
            DataFrame with kicker EPA-FG+ ratings
        """
        if not self.baseline_probs:
            self.baseline_probs = self.calculate_baseline_probs(data)

        # Calculate expected points
        data = data.copy()
        data.loc[:, 'expected_points'] = data['attempt_yards'].map(lambda x: self.baseline_probs.get(x, 0.5)) * 3
        data.loc[:, 'actual_points'] = data['success'] * 3
        data.loc[:, 'epa'] = data['actual_points'] - data['expected_points']

        # Calculate EPA-FG+ per kicker
        kicker_stats = data.groupby('player_name').agg({
            'epa': ['count', 'mean', 'sum'],
            'player_id': 'first'  # Keep player ID
        })

        kicker_stats.columns = ['attempts', 'epa_per_kick', 'total_epa', 'player_id']
        kicker_stats.loc[:, 'epa_fg_plus'] = kicker_stats['epa_per_kick']

        # Add rank
        kicker_stats.loc[:, 'rank'] = kicker_stats['epa_fg_plus'].rank(ascending=False, method='min')

        return kicker_stats.reset_index()

    def calculate_clutch_rating_with_shrinkage(
        self,
        data: pd.DataFrame,
        prior_a: float = 8.0,
        prior_b: float = 2.0
    ) -> pd.DataFrame:
        """
        Calculate clutch field goal rating with beta-binomial shrinkage.

        Args:
            data: DataFrame with kicker attempts including is_clutch column
            prior_a: Beta prior alpha parameter (favors success)
            prior_b: Beta prior beta parameter (favors failure)

        Returns:
            DataFrame with clutch ratings per kicker
        """
        # Default prior centers on 80% (8/(8+2))

        results = []
        for player_name, grp in data.groupby('player_name'):
            if 'is_clutch' in grp.columns:
                clutch_attempts = grp[grp['is_clutch'] == 1]
            else:
                # Fallback if no clutch column
                clutch_attempts = grp[grp['success'] == 0]  # Empty fallback

            made_clutch = clutch_attempts['success'].sum() if len(clutch_attempts) > 0 else 0
            miss_clutch = len(clutch_attempts) - made_clutch if len(clutch_attempts) > 0 else 0

            # Beta-binomial posterior
            post_a = prior_a + made_clutch
            post_b = prior_b + miss_clutch
            clutch_rate_shrunk = post_a / (post_a + post_b)

            # Raw clutch rate for comparison
            raw_clutch_rate = clutch_attempts['success'].mean() if len(clutch_attempts) > 0 else 0.0

            results.append({
                'player_name': player_name,
                'player_id': int(grp['player_id'].iat[0]),
                'total_attempts': len(grp),
                'clutch_attempts': len(clutch_attempts),
                'clutch_made': made_clutch,
                'raw_clutch_rate': raw_clutch_rate,
                'clutch_rate_shrunk': clutch_rate_shrunk,
                'shrinkage_applied': abs(clutch_rate_shrunk - raw_clutch_rate)
            })

        df = pd.DataFrame(results)
        df['clutch_rank'] = df['clutch_rate_shrunk'].rank(ascending=False, method='min')
        return df.sort_values('clutch_rank')

    # ────────────────────────────────────────────────────────────────────
    # NEW helper ─ bootstrap EPA-FG⁺ draws for ONE kicker
    # ────────────────────────────────────────────────────────────────────
    def _bootstrap_kicker_epa(
        self,
        grp: pd.DataFrame,
        *,
        n_draws: int = 2_000,
        rng: np.random.Generator,
    ) -> NDArray[np.float_]:
        """
        Non-parametric bootstrap: resample the kicker's attempts (with replacement)
        and recompute mean EPA-FG⁺.  Returns an array of shape (n_draws,).

        Args:
            grp: DataFrame with kicker's attempts
            n_draws: Number of bootstrap draws
            rng: Random number generator

        Returns:
            Array of bootstrap EPA-FG+ draws
        """
        # pre-compute baseline EPA for every attempt in the group
        base_pts = grp["attempt_yards"].map(
            lambda x: self.baseline_probs.get(x, 0.5)
        ).to_numpy(np.float_) * 3.0
        actual   = grp["success"].to_numpy(np.int_) * 3.0
        diff     = actual - base_pts                    # vector of EPA per attempt

        if diff.size == 0:
            return np.array([np.nan])                   # safeguard – should not happen

        boot = rng.choice(diff, size=(n_draws, diff.size), replace=True)
        return boot.mean(axis=1)                       # average per draw

    # ────────────────────────────────────────────────────────────────────
    # PUBLIC – EPA-FG⁺ with 95 % interval & certainty flag
    # ────────────────────────────────────────────────────────────────────
    def calculate_epa_fg_plus_ci(
        self,
        data: pd.DataFrame,
        *,
        n_draws: int = 2_000,
        alpha: float = 0.05,
        random_state: int | None = 42,
    ) -> pd.DataFrame:
        """
        Bootstrap EPA-FG⁺ per kicker, returning mean, lower, upper bounds and a
        qualitative certainty label (high | medium | low).

        Args:
            data: DataFrame with kicker attempts
            n_draws: Number of bootstrap draws
            alpha: Significance level for confidence intervals
            random_state: Random seed for reproducibility

        Returns:
            DataFrame with bootstrap EPA-FG+ ratings and confidence intervals

        Notes
        -----
        * Uses the **Jeffreys-prior** interpretation of a (1–alpha) credible
          interval via percentile bootstrap.
        * CI width thresholds (empirical 33rd & 66th percentiles) define the
          certainty bands as recommended in sports-analytics literature.
        """
        if not self.baseline_probs:
            self.calculate_baseline_probs(data)

        rng = np.random.default_rng(random_state)

        records: list[dict[str, Any]] = []
        for player, grp in data.groupby("player_name"):
            draws = self._bootstrap_kicker_epa(grp, n_draws=n_draws, rng=rng)
            mean  = float(np.nanmean(draws))
            lower = float(np.nanpercentile(draws, 100 * alpha / 2))
            upper = float(np.nanpercentile(draws, 100 * (1 - alpha / 2)))
            width = upper - lower
            records.append({
                "player_name": player,
                "player_id":   int(grp["player_id"].iat[0]),
                "attempts":    int(len(grp)),
                "epa_fg_plus_mean":  mean,
                "hdi_lower":   lower,
                "hdi_upper":   upper,
                "ci_width":    width,
            })

        tbl = pd.DataFrame(records).set_index("player_name")

        # Certainty levels by tercile of CI-width
        q33, q66 = tbl["ci_width"].quantile([.33, .66])
        tbl["certainty"] = np.where(
            tbl["ci_width"] <= q33, "high",
            np.where(tbl["ci_width"] <= q66, "medium", "low")
        )

        tbl["rank"] = tbl["epa_fg_plus_mean"].rank(ascending=False, method="min")
        return tbl.sort_values("rank")

    def calculate_all_kicker_ratings(self, data: pd.DataFrame, include_ci: bool = False) -> pd.DataFrame:
        """
        Calculate complete kicker ratings, optionally with uncertainty intervals.

        Args:
            data: Complete dataset
            include_ci: Whether to include bootstrap confidence intervals

        Returns:
            DataFrame with kicker ratings
        """
        print("Calculating EPA-FG+ ratings...")

        if include_ci:
            ratings = self.calculate_epa_fg_plus_ci(data)
            metric_col = "epa_fg_plus_mean"
        else:
            ratings = self.calculate_epa_fg_plus(data)
            metric_col = "epa_fg_plus"

        print(f"\nTop 5 kickers by EPA-FG+:")
        display_cols = ['attempts', metric_col, 'rank']
        if include_ci:
            display_cols.extend(['hdi_lower', 'hdi_upper', 'certainty'])

        top_5 = ratings.head(5) if include_ci else ratings.nlargest(5, metric_col)
        if include_ci:
            print(top_5[display_cols].to_string(index=True))
        else:
            print(top_5[['player_name'] + display_cols].to_string(index=False))

        return ratings

class ModelEvaluator:
    """Compute a rich set of discrimination & calibration metrics."""

    # ---------- single-metric helpers ----------
    @staticmethod
    def calculate_auc(y, p) -> float:
        return float(roc_auc_score(y, p))

    @staticmethod
    def calculate_auc_pr(y, p) -> float:
        return float(average_precision_score(y, p))  # AUC-PR

    @staticmethod
    def calculate_log_loss(y, p) -> float:
        return float(log_loss(y, p))

    @staticmethod
    def calculate_brier_score(y, p) -> float:
        return float(brier_score_loss(y, p))

    @staticmethod
    def calculate_threshold_metrics(y, p, thresh: float = 0.5) -> Tuple[float, float, float, float]:
        """Return accuracy, precision, recall, F1 at a chosen threshold."""
        pred = (p >= thresh).astype(int)
        acc = float(accuracy_score(y, pred))
        prec = float(precision_score(y, pred, zero_division="warn"))
        rec = float(recall_score(y, pred, zero_division="warn"))
        f1 = float(f1_score(y, pred, zero_division="warn"))
        return acc, prec, rec, f1

    # ---------- calibration helpers ----------
    @staticmethod
    def calculate_ece(y, p, n_bins: int = 10) -> float:
        """
        Expected Calibration Error (ECE) using equally spaced probability bins.

        Handles the fact that `sklearn.calibration.calibration_curve` drops
        empty bins, which otherwise causes a length-mismatch when you try to
        multiply by fixed-length weights.
        """
        try:
            # 1 – Compute reliability data.  The function *silently* removes
            #     any empty bin, so prob_true/pred may be < n_bins long.
            prob_true, prob_pred = calibration_curve(
                y, p, n_bins=n_bins, strategy="uniform"
            )

            # 2 – Re-create the original histogram and keep *only* non-empty bins
            bin_counts, _ = np.histogram(p, bins=n_bins, range=(0, 1))
            non_empty_mask = bin_counts > 0

            # Make sure shapes now align
            bin_counts = bin_counts[non_empty_mask]
            prob_true  = np.asarray(prob_true)
            prob_pred  = np.asarray(prob_pred)

            # Sanity check – all arrays must have identical length
            assert len(prob_true) == len(bin_counts) == len(prob_pred), (
                "Length mismatch after masking empty bins"
            )

            # 3 – Compute weighted absolute gap
            weights = bin_counts / bin_counts.sum()
            ece = np.sum(np.abs(prob_true - prob_pred) * weights)
            return float(ece)

        except Exception as exc:
            # Fallback – simple loop (same logic as original fallback)
            bin_edges = np.linspace(0.0, 1.0, n_bins + 1)
            ece = 0.0
            for lo, hi in zip(bin_edges[:-1], bin_edges[1:]):
                in_bin = (p > lo) & (p <= hi)
                if in_bin.any():
                    acc_bin  = y[in_bin].mean()
                    conf_bin = p[in_bin].mean()
                    ece      += np.abs(acc_bin - conf_bin) * in_bin.mean()
            return float(ece)


    @staticmethod
    def reliability_curve(y, p, n_bins: int = 10) -> pd.DataFrame:
        """Return a DataFrame for plotting a reliability diagram."""
        try:
            prob_true, prob_pred = calibration_curve(y, p, n_bins=n_bins, strategy="uniform")
            return pd.DataFrame({"prob_pred": prob_pred, "prob_true": prob_true})
        except (ImportError, TypeError):
            # Fallback implementation
            bin_boundaries = np.linspace(0, 1, n_bins + 1)
            bin_centers = (bin_boundaries[:-1] + bin_boundaries[1:]) / 2
            bin_lowers = bin_boundaries[:-1]
            bin_uppers = bin_boundaries[1:]

            prob_true = []
            prob_pred = []

            for bin_lower, bin_upper, bin_center in zip(bin_lowers, bin_uppers, bin_centers):
                in_bin = (p > bin_lower) & (p <= bin_upper)
                if np.sum(in_bin) > 0:
                    prob_true.append(y[in_bin].astype(float).mean())
                    prob_pred.append(bin_center)

            return pd.DataFrame({"prob_pred": prob_pred, "prob_true": prob_true})

    # ---------- public aggregator ----------
    def calculate_classification_metrics(self, y_true, y_pred_proba) -> Dict[str, float]:
        """Return a full metric dictionary."""
        metrics: Dict[str, float] = {
            "auc_roc":   self.calculate_auc(y_true, y_pred_proba),
            "auc_pr":    self.calculate_auc_pr(y_true, y_pred_proba),
            "log_loss":  self.calculate_log_loss(y_true, y_pred_proba),
            "brier":     self.calculate_brier_score(y_true, y_pred_proba),
            "ece":       self.calculate_ece(y_true, y_pred_proba),
        }
        acc, prec, rec, f1 = self.calculate_threshold_metrics(y_true, y_pred_proba)
        metrics.update({
            "accuracy":  acc,
            "precision": prec,
            "recall":    rec,
            "f1":        f1,
        })
        return metrics  # 10 metrics total

    # ---------- comparison helper ----------
    def compare_models(self, results: Dict[str, Dict[str, float]]) -> pd.DataFrame:
        """
        Turn {model: metric_dict} into a tidy table ordered by AUC-ROC.
        """
        df = pd.DataFrame(results).T  # one row per model
        # guarantee consistent column order
        desired_cols: List[str] = [
            "auc_roc", "auc_pr", "log_loss", "brier", "ece",
            "accuracy", "precision", "recall", "f1"
        ]
        for c in desired_cols:
            if c not in df.columns:
                df[c] = np.nan
        return df[desired_cols].sort_values("auc_roc", ascending=False)

class BayesianEvaluator:
    """Wrap ArviZ to calculate WAIC and PSIS-LOO in one call."""
    def information_criteria(self, trace, pointwise: bool = False) -> Dict[str, float]:
        """
        Return WAIC and PSIS-LOO for a fitted PyMC model.

        Parameters
        ----------
        trace : arviz.InferenceData
        pointwise : bool
            If True, also return the pointwise arrays for advanced analysis.
        """
        waic_res = az.waic(trace, scale="deviance")
        loo_res  = az.loo(trace,  scale="deviance")
        info = {
            "waic":      float(waic_res.waic),
            "waic_se":   float(waic_res.waic_se),
            "psis_loo":  float(loo_res.loo),
            "psis_loo_se": float(loo_res.loo_se),
        }
        if pointwise:
            info["waic_i"] = waic_res.waic_i
            info["loo_i"]  = loo_res.loo_i
        return info

if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader
    # Test the data loader
    print("Testing DataLoader...")

    loader = DataLoader()

    try:
        # Load complete dataset
        df = loader.load_complete_dataset()

        # Print summary
        summary = loader.get_data_summary()
        print("\nData Summary:")
        print(f"Total attempts: {summary['total_attempts']:,}")
        print(f"Unique kickers: {summary['unique_kickers']}")
        print(f"Seasons: {summary['unique_seasons']}")
        print(f"Outcomes: {summary['outcome_counts']}")

        print("******* DataLoader tests passed!")

    except Exception as e:
        print(f"-------------- Error testing DataLoader: {e}")
        print("Note: This is expected if data files are not present.")



    # Test the metrics module
    print("Testing EPA Calculator...")


    # Test EPA calculator
    epa_calc = EPACalculator()

    try:
        # Test league average calculation
        league_avg = epa_calc.calculate_league_average_epa(df)
        print(f"League average EPA: {league_avg:.3f}")

        # Test individual kicker rating
        rating = epa_calc.calculate_kicker_epa_plus(df, 'Player A')
        print(f"Player A EPA-FG+: {rating['epa_fg_plus']:.3f}")

        # Test all kicker ratings
        all_ratings = epa_calc.calculate_all_kicker_ratings(df)
        print(f"Calculated ratings for {len(all_ratings)} kickers")

        # Test model evaluator
        evaluator = ModelEvaluator()
        y_true = np.random.choice([0, 1], 100)
        y_pred = np.random.random(100)

        metrics = evaluator.calculate_classification_metrics(y_true, y_pred)
        print(f"Sample model AUC: {metrics['auc']:.3f}")

        print("******* Metrics module tests passed!")

    except Exception as e:
        print(f"-------------- Error testing metrics: {e}")


Overwriting src/nfl_kicker_analysis/utils/metrics.py


## 6. Traditional Models Module

Implementation of traditional machine learning models for comparison.


In [None]:
%%writefile src/nfl_kicker_analysis/models/tree_based_bayes_optimized_models.py
"""
Traditional ML models for NFL kicker analysis.
Includes simple logistic regression, ridge logistic regression, and random forest.
Each model can be optionally tuned using Bayesian optimization with Optuna.
"""
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import TimeSeriesSplit
from typing import Dict, Tuple, Optional, Union, Any
import optuna
from xgboost import XGBClassifier
from catboost import CatBoostClassifier
from numpy.typing import NDArray

from src.nfl_kicker_analysis.utils.metrics import ModelEvaluator

class TreeBasedModelSuite:
    """Suite of traditional ML models with optional Bayesian optimization."""

    def __init__(self):
        """Initialize the model suite."""
        self.fitted_models: Dict[str, Union[LogisticRegression, RandomForestClassifier, XGBClassifier, CatBoostClassifier]] = {}
        self.evaluator = ModelEvaluator()
        self._tss = TimeSeriesSplit(n_splits=3)  # 3-fold CV that respects time order

    def prepare_features(self, data: pd.DataFrame) -> Tuple[NDArray[np.float_], NDArray[np.float_], NDArray[np.int_], OneHotEncoder]:
        """
        Prepare feature matrices for modeling.

        Args:
            data: DataFrame with attempt_yards and kicker_id

        Returns:
            Tuple of:
            - Distance-only features
            - Combined features (distance + one-hot kicker)
            - Kicker IDs
            - OneHotEncoder for kickers
        """
        # Distance features
        X_distance = data['attempt_yards'].values.astype(np.float_).reshape(-1, 1)

        # Kicker IDs for tree models
        kicker_ids = data['kicker_id'].values.astype(np.int_).reshape(-1, 1)

        # One-hot encode kickers for linear models
        encoder = OneHotEncoder(sparse_output=True)
        kicker_onehot = encoder.fit_transform(kicker_ids)
        X_combined = np.hstack([X_distance, kicker_onehot.toarray()])

        return X_distance, X_combined, kicker_ids, encoder

    def create_time_split(self, data: pd.DataFrame) -> Tuple[NDArray[np.int_], NDArray[np.int_]]:
        """
        Create train/test split by time.

        Args:
            data: DataFrame with game_date

        Returns:
            Train and test indices
        """
        train_mask = data['season'] <= 2017
        test_mask = data['season'] == 2018

        train_idx = np.where(train_mask)[0]
        test_idx = np.where(test_mask)[0]

        print(f"Train: {len(train_idx):,} attempts ({train_mask.mean():.1%})")
        print(f"Test: {len(test_idx):,} attempts ({test_mask.mean():.1%})")

        return train_idx, test_idx

    def _tune_simple_logistic_optuna(
        self,
        X: NDArray[np.float_],
        y: NDArray[np.int_],
        n_trials: int = 50,
    ) -> LogisticRegression:
        """
        Bayesian-optimize and fit a simple LogisticRegression.
        Returns the fitted best model.
        """
        def objective(trial: optuna.Trial) -> float:
            params = {
                "C": trial.suggest_float("C", 1e-5, 100, log=True),
                "max_iter": 1000,
                "random_state": 42,
            }
            aucs = []
            for tr_idx, val_idx in self._tss.split(X):
                model = LogisticRegression(**params)
                model.fit(X[tr_idx], y[tr_idx])
                preds = model.predict_proba(X[val_idx])[:, 1].astype(np.float_)
                aucs.append(self.evaluator.calculate_auc(y[val_idx], preds))
            return float(np.mean(aucs))

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        best_params = study.best_params
        best_params.update(dict(max_iter=1000, random_state=42))
        best_model = LogisticRegression(**best_params)
        best_model.fit(X, y)
        return best_model

    def _tune_ridge_logistic_optuna(
        self,
        X: NDArray[np.float_],
        y: NDArray[np.int_],
        n_trials: int = 50,
    ) -> LogisticRegression:
        """
        Bayesian-optimize and fit a ridge LogisticRegression.
        Returns the fitted best model.
        """
        def objective(trial: optuna.Trial) -> float:
            params = {
                "C": trial.suggest_float("C", 1e-5, 100, log=True),
                "l1_ratio": trial.suggest_float("l1_ratio", 0, 1),
                "penalty": "elasticnet",
                "solver": "saga",
                "max_iter": 1000,
                "random_state": 42,
            }
            aucs = []
            for tr_idx, val_idx in self._tss.split(X):
                model = LogisticRegression(**params)
                model.fit(X[tr_idx], y[tr_idx])
                preds = model.predict_proba(X[val_idx])[:, 1].astype(np.float_)
                aucs.append(self.evaluator.calculate_auc(y[val_idx], preds))
            return float(np.mean(aucs))

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        best_params = study.best_params
        best_params.update(dict(penalty="elasticnet", solver="saga", max_iter=1000, random_state=42))
        best_model = LogisticRegression(**best_params)
        best_model.fit(X, y)
        return best_model

    def _tune_random_forest_optuna(
        self,
        X: NDArray[np.float_],
        y: NDArray[np.int_],
        n_trials: int = 50,
    ) -> RandomForestClassifier:
        """
        Bayesian-optimize and fit a RandomForestClassifier.
        Returns the fitted best model.
        """
        def objective(trial: optuna.Trial) -> float:
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 100, 500),
                "max_depth": trial.suggest_int("max_depth", 3, 10),
                "min_samples_split": trial.suggest_int("min_samples_split", 2, 20),
                "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 10),
                "max_features": trial.suggest_float("max_features", 0.1, 1.0),
                "n_jobs": -1,
                "random_state": 42,
            }
            aucs = []
            for tr_idx, val_idx in self._tss.split(X):
                model = RandomForestClassifier(**params)
                model.fit(X[tr_idx], y[tr_idx])
                preds = model.predict_proba(X[val_idx])[:, 1].astype(np.float_)
                aucs.append(self.evaluator.calculate_auc(y[val_idx], preds))
            return float(np.mean(aucs))

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        best_params = study.best_params
        best_params.update(dict(n_jobs=-1, random_state=42))
        best_model = RandomForestClassifier(**best_params)
        best_model.fit(X, y)
        return best_model

    def _tune_xgboost_optuna(
        self,
        X: NDArray[np.float_],
        y: NDArray[np.int_],
        n_trials: int = 50,
    ) -> XGBClassifier:
        """
        Bayesian-optimize and fit an XGBClassifier.
        Returns the fitted best model.
        """
        def objective(trial: optuna.Trial) -> float:
            params = {
                "n_estimators": trial.suggest_int("n_estimators", 200, 600),
                "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
                "max_depth": trial.suggest_int("max_depth", 3, 10),
                "subsample": trial.suggest_float("subsample", 0.5, 1.0),
                "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
                "gamma": trial.suggest_float("gamma", 0.0, 5.0),
                "reg_lambda": trial.suggest_float("reg_lambda", 0.0, 10.0),
                "objective": "binary:logistic",
                "eval_metric": "logloss",
                "n_jobs": -1,
                "random_state": 42,
            }
            aucs = []
            for tr_idx, val_idx in self._tss.split(X):
                model = XGBClassifier(**params)
                model.fit(X[tr_idx], y[tr_idx])
                preds = model.predict_proba(X[val_idx])[:, 1].astype(np.float_)
                aucs.append(self.evaluator.calculate_auc(y[val_idx], preds))
            return float(np.mean(aucs))

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        best_params = study.best_params
        best_params.update(dict(objective="binary:logistic", eval_metric="logloss", n_jobs=-1, random_state=42))
        best_model = XGBClassifier(**best_params)
        best_model.fit(X, y)
        return best_model

    def _tune_catboost_optuna(
        self,
        df_train: pd.DataFrame,          # ←  NOW a DataFrame, not ndarray
        y_train: NDArray[np.int_],
        n_trials: int = 50,
    ) -> CatBoostClassifier:
        """
        Bayesian-optimise and fit a CatBoostClassifier on a mixed-dtype DataFrame.
        The column ``'kicker_id'`` is treated as categorical automatically.
        """
        cat_cols = ["kicker_id"]         # name-based is safer than index-based

        def objective(trial: optuna.Trial) -> float:
            params = {
                "iterations":        trial.suggest_int ("iterations",        300, 800),
                "depth":             trial.suggest_int ("depth",             4,   10),
                "learning_rate":     trial.suggest_float("learning_rate",    0.01, 0.3, log=True),
                "l2_leaf_reg":       trial.suggest_float("l2_leaf_reg",      1e-3, 10.0, log=True),
                "bagging_temperature": trial.suggest_float("bagging_temperature", 0.0, 1.0),
                "random_strength":   trial.suggest_float("random_strength",  0.1, 10.0),
                "border_count":      trial.suggest_int ("border_count",      32,  255),
                "loss_function":     "Logloss",
                "eval_metric":       "AUC",
                "verbose":           False,
                "random_seed":       42,
            }
            aucs = []
            for tr_idx, val_idx in self._tss.split(df_train):
                model = CatBoostClassifier(**params)
                model.fit(
                    df_train.iloc[tr_idx],
                    y_train[tr_idx],
                    cat_features=cat_cols,
                    verbose=False,
                )
                preds = model.predict_proba(df_train.iloc[val_idx])[:, 1].astype(np.float_)
                aucs.append(self.evaluator.calculate_auc(y_train[val_idx], preds))
            return float(np.mean(aucs))

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials, show_progress_bar=False)

        best_params = study.best_params
        best_params.update(dict(loss_function="Logloss",
                                eval_metric="AUC",
                                verbose=False,
                                random_seed=42))
        best_model = CatBoostClassifier(**best_params)
        best_model.fit(df_train, y_train, cat_features=cat_cols, verbose=False)
        return best_model

    def fit_all_models(self, data: pd.DataFrame) -> Dict[str, Dict[str, float]]:
        """
        Fit all traditional + boosted models and return their metrics.
        """
        print("Fitting traditional & boosted ML models with Bayesian optimization...")

        # Feature prep
        X_distance, X_combined, kicker_ids, kicker_encoder = self.prepare_features(data)
        y = data["success"].values.astype(np.int_)

        # Time-based split
        train_idx, test_idx = self.create_time_split(data)
        X_dist_train, X_dist_test = X_distance[train_idx], X_distance[test_idx]
        X_comb_train, X_comb_test = X_combined[train_idx], X_combined[test_idx]
        kicker_train, kicker_test = kicker_ids[train_idx], kicker_ids[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        results = {}

        # 1. Simple Logistic with Optuna
        print("  ▸ simple logistic + Optuna")
        simple_lr = self._tune_simple_logistic_optuna(X_dist_train, y_train)
        self.fitted_models["simple_logistic"] = simple_lr
        y_pred = simple_lr.predict_proba(X_dist_test)[:, 1].astype(np.float_)
        results["Simple Logistic"] = self.evaluator.calculate_classification_metrics(y_test, y_pred)

        # 2. Ridge Logistic with Optuna
        print("  ▸ ridge logistic + Optuna")
        ridge_lr = self._tune_ridge_logistic_optuna(X_comb_train, y_train)
        self.fitted_models["ridge_logistic"] = ridge_lr
        y_pred = ridge_lr.predict_proba(X_comb_test)[:, 1].astype(np.float_)
        results["Ridge Logistic"] = self.evaluator.calculate_classification_metrics(y_test, y_pred)

        # 3. Random Forest with Optuna
        print("  ▸ random forest + Optuna")
        X_rf_train = np.column_stack([X_dist_train, kicker_train]).astype(np.float_)
        X_rf_test = np.column_stack([X_dist_test, kicker_test]).astype(np.float_)
        rf_model = self._tune_random_forest_optuna(X_rf_train, y_train)
        self.fitted_models["random_forest"] = rf_model
        y_pred = rf_model.predict_proba(X_rf_test)[:, 1].astype(np.float_)
        results["Random Forest"] = self.evaluator.calculate_classification_metrics(y_test, y_pred)

        # 4. XGBoost with Optuna
        print("  ▸ xgboost + Optuna")
        xgb_model = self._tune_xgboost_optuna(X_rf_train, y_train)
        self.fitted_models["xgboost"] = xgb_model
        y_pred = xgb_model.predict_proba(X_rf_test)[:, 1].astype(np.float_)
        results["XGBoost"] = self.evaluator.calculate_classification_metrics(y_test, y_pred)

        # 5. CatBoost with Optuna  ←-- NEW implementation
        print("  ▸ catboost + Optuna")
        df_cat_train = pd.DataFrame({
            "attempt_yards": X_dist_train.ravel().astype(np.float32),
            "kicker_id":     kicker_train.ravel().astype("int32"),
        })
        df_cat_test = pd.DataFrame({
            "attempt_yards": X_dist_test.ravel().astype(np.float32),
            "kicker_id":     kicker_test.ravel().astype("int32"),
        })

        cat_model = self._tune_catboost_optuna(df_cat_train, y_train)
        self.fitted_models["catboost"] = cat_model
        y_pred = cat_model.predict_proba(df_cat_test)[:, 1].astype(np.float_)
        results["CatBoost"] = self.evaluator.calculate_classification_metrics(y_test, y_pred)

        print("***** all models fitted & scored *****")
        return results

    def predict(self, model_name: str, data: pd.DataFrame) -> NDArray[np.float_]:
        """
        Predict probabilities with any fitted model in the suite.
        """
        if model_name not in self.fitted_models:
            raise ValueError(f"Model {model_name} not fitted")

        model = self.fitted_models[model_name]

        if model_name == "simple_logistic":
            X = data["attempt_yards"].values.astype(np.float_).reshape(-1, 1)
        elif model_name in {"ridge_logistic"}:
            # Distance + one-hot kicker – need encoder state (TODO: save encoder)
            raise NotImplementedError("Pass encoded matrix for ridge predictions")
        elif model_name in {"random_forest", "xgboost"}:
            X = np.column_stack([
                data["attempt_yards"].values.astype(np.float_),
                data["kicker_id"].values.astype(np.float_)  # Convert to float for tree models
            ])
        elif model_name == "catboost":
            X = pd.DataFrame({
                "attempt_yards": data["attempt_yards"].values.astype(np.float32),
                "kicker_id": data["kicker_id"].values.astype("int32"),
            })
        else:
            raise ValueError(f"Unknown model: {model_name}")

        return model.predict_proba(X)[:, 1].astype(np.float_)

    def get_feature_importance(self, model_name: str) -> Optional[NDArray[np.float_]]:
        """
        Get feature importance for tree-based models.
        """
        if model_name not in self.fitted_models:
            return None

        model = self.fitted_models[model_name]

        if model_name in {"catboost"}:
            return model.get_feature_importance().astype(np.float_)
        elif hasattr(model, "feature_importances_"):
            return model.feature_importances_.astype(np.float_)
        else:
            return None

if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader
    from src.nfl_kicker_analysis.data.feature_selection import (
        DynamicSchema,
        filter_to_final_features,
        update_schema_numerical,
    )
    from src.nfl_kicker_analysis.data.preprocessor import DataPreprocessor
    from src.nfl_kicker_analysis.data.feature_engineering import FeatureEngineer

    # ─── 1 Load raw data ──────────────────────────────────────────
    loader = DataLoader()
    df_raw = loader.load_complete_dataset()

    # ─── 2 Feature engineering pass ───────────────────────────────
    engineer = FeatureEngineer()
    df_feat = engineer.create_all_features(df_raw)

    for category, details in engineer.get_available_features(df_feat).items():
        print(f"-- {category} --")
        for feat, uniques in details.items():
            print(f"   {feat}: {len(uniques)} unique | sample {uniques[:5] if uniques else '...'}")

    # ─── 3 Define all tunables in one place ─────────────────────
    CONFIG = {
        'min_distance': 20,
        'max_distance': 60,
        'min_kicker_attempts': 8,
        'season_types': ['Reg', 'Post'],  # now include playoffs
        'include_performance_history': True,
        'include_statistical_features': False,
        'include_player_status': True,  # ✅ FIX: Added missing parameter
        'performance_window': 12,
    }


    # ------------------------------------------------------------------
    # 🔧 Single source of truth for column roles – edit freely
    # ------------------------------------------------------------------
    FEATURE_LISTS = {
        "numerical": [
            "attempt_yards", "age_at_attempt", "distance_squared",
            "career_length_years", "season_progress", "rolling_success_rate",
            "current_streak", "distance_zscore", "distance_percentile",
        ],
        "ordinal":  ["season", "week", "month", "day_of_year"],
        "nominal":  [
            "kicker_id", "is_long_attempt", "is_very_long_attempt",
            "is_rookie_attempt", "distance_category", "experience_category",
        ],
        "y_variable": ["success"],
    }

    # ➊  Build schema from the dict
    schema = DynamicSchema(FEATURE_LISTS)

    # read final_features.txt
    with open("data/models/features/final_features.txt", "r") as f:
        final_features = [line.strip() for line in f]
    print(f"---------------final_features---------------")
    print(final_features)
    numeric_final = [f for f in final_features if f in schema.numerical]

    print(f"\n✨ Final feature count: {len(numeric_final)}")
    print("Selected features:")
    for feat in numeric_final:
        print(f"  • {feat}")

    # 🔄 Push into schema so every later stage sees the new list
    update_schema_numerical(schema, numeric_final)

    # output final_features from schema
    FEATURE_LISTS = schema.lists
    print(f"---------------FEATURE_LISTS---------------")
    print(FEATURE_LISTS)

    pre = DataPreprocessor()
    pre.update_config(**CONFIG)
    pre.update_feature_lists(**FEATURE_LISTS)
    _ = pre.preprocess_complete(df_feat)
    X, y = pre.fit_transform_features()

    print("First 5 rows after inverse‑transform round‑trip →")
    print(pre.invert_preprocessing(X[:5]).head())

    # ─── 4  Fit traditional models on real data ───────────────────────────────
    print("Testing Traditional Models…")
    model_suite = TreeBasedModelSuite()

    try:
        results = model_suite.fit_all_models(pre.processed_data)
        print("\n***** Metrics *****")
        for model, metric_dict in results.items():
            print(f"{model}:")
            for k, v in metric_dict.items():
                print(f"   {k:>12}: {v:.4f}")
        print("******* Traditional models tests passed!")

    except Exception as e:
        print(f"------------- Error testing traditional models: {e}")


Overwriting src/nfl_kicker_analysis/models/tree_based_bayes_optimized_models.py


In [None]:
# %%writefile src/nfl_kicker_analysis/models/bayesian.py
"""
Bayesian models for NFL kicker analysis.
Provides hierarchical Bayesian logistic regression and evaluation utilities using PyMC.
"""
from __future__ import annotations

import numpy as np
import pandas as pd
import pymc as pm
import arviz as az
import matplotlib.pyplot as plt        # NEW – for PPC plot
from scipy import stats                # NEW – ECDF sampling for EPA
from typing import Dict, Any, TYPE_CHECKING

from src.nfl_kicker_analysis.utils.metrics import ModelEvaluator
from src.nfl_kicker_analysis.config import FEATURE_LISTS, config

if TYPE_CHECKING:
    from src.nfl_kicker_analysis.data.preprocessor import DataPreprocessor

__all__ = [
    "BayesianModelSuite",
]


class BayesianModelSuite:
    """Hierarchical Bayesian logistic‑regression models for kicker analysis."""

    def __init__(
        self,
        *,
        draws: int = 1_000,
        tune: int = 1_000,
        target_accept: float = 0.95,  # Increased from 0.9 to reduce divergences
        include_random_slope: bool = False,
        random_seed: int | None = 42,
    ) -> None:
        self.draws = draws
        self.tune = tune
        self.target_accept = target_accept
        self.include_random_slope = include_random_slope
        self.random_seed = random_seed

        # Model components - set during fit()
        self._model = None
        self._trace = None
        self._kicker_map = {}
        self._distance_mu = 0.0
        self._distance_sigma = 1.0
        self.baseline_probs = {}  # For consistent EPA baselines with EPACalculator
        self.evaluator = ModelEvaluator()

    def _bootstrap_distances(
        self,
        distances: np.ndarray,
        n_samples: int,
        rng: np.random.Generator,
        weights: np.ndarray | None = None
    ) -> np.ndarray:
        """
        Bootstrap helper using `np.random.Generator.choice`.

        Simple, testable wrapper for bootstrap resampling of field-goal distances.
        Keeps all distance logic in one place and follows NumPy best practices.
        """
        if distances.size == 0:
            raise ValueError("No distances available for bootstrap.")
        return rng.choice(distances, size=n_samples, replace=True, p=weights)

    # ---------------------------------------------------------------------
    # 🛠️  Helper utilities
    # ---------------------------------------------------------------------
    def _standardize(self, x: np.ndarray, *, fit: bool = False) -> np.ndarray:
        if fit:
            self._distance_mu = float(x.mean())
            self._distance_sigma = float(x.std())
        return (x - self._distance_mu) / self._distance_sigma

    def _encode_kicker(self, raw_ids: np.ndarray, *, fit: bool = False,
                       unknown_action: str = "average") -> np.ndarray:
        """
        Map raw kicker IDs → compact indices (kicker_idx).
        """
        if fit:
            self._kicker_map = {pid: i for i, pid in enumerate(np.unique(raw_ids))}
        idx = np.array([self._kicker_map.get(pid, -1) for pid in raw_ids], int)

        if (idx == -1).any():
            n_unseen = (idx == -1).sum()
            msg = f"{n_unseen} unseen kicker IDs – mapped to league mean."
            if unknown_action == "raise":
                raise ValueError(msg)
            elif unknown_action == "warn":
                print("⚠️ ", msg)

        return idx




    # ---------------------------------------------------------------------
    # 🔨  Model construction
    # ---------------------------------------------------------------------
    def _build_model(
        self,
        distance_std: np.ndarray,
        age_c: np.ndarray,           # <-- NEW: centered age
        age_c2: np.ndarray,          # <-- NEW: quadratic age
        exp_std: np.ndarray,
        success: np.ndarray,
        kicker_idx: np.ndarray,
        n_kickers: int,
    ) -> pm.Model:
        with pm.Model() as model:
            # Population-level effects
            alpha = pm.Normal("alpha", 1.5, 1.0)
            beta_dist = pm.Normal("beta_dist", -1.5, 0.8)

            # Age effects (linear + quadratic)
            beta_age  = pm.Normal("beta_age",  0.0, 0.5)
            beta_age2 = pm.Normal("beta_age2", 0.0, 0.5)
            beta_exp  = pm.Normal("beta_exp",  0.0, 0.5)

            # Per-kicker random intercepts (non-centered)
            σ_u   = pm.HalfNormal("sigma_u", 0.8)
            u_raw = pm.Normal("u_raw", 0.0, 1.0, shape=n_kickers)
            u     = pm.Deterministic("u", σ_u * u_raw)

            # Per-kicker random aging slopes (optional enhancement)
            if self.include_random_slope:
                σ_age = pm.HalfNormal("sigma_age", 0.5)
                a_raw = pm.Normal("a_raw", 0.0, 1.0, shape=n_kickers)
                a_k   = pm.Deterministic("a_k", σ_age * a_raw)
                age_slope_effect = a_k[kicker_idx] * age_c
            else:
                age_slope_effect = 0.0

            # Linear predictor
            lin_pred = (
                alpha
                + (beta_dist * distance_std)
                + (beta_age * age_c) + age_slope_effect
                + (beta_age2 * age_c2)
                + (beta_exp * exp_std)
                + u[kicker_idx]
            )

            θ = pm.Deterministic("theta", pm.invlogit(lin_pred))
            pm.Bernoulli("obs", p=θ, observed=success)
        return model

    # ---------------------------------------------------------------------
    # 📈  Public API
    # ---------------------------------------------------------------------
    def fit(self, df, *, preprocessor=None):
        # ------------------------------------------------------------------
        # 0️⃣  Exactly one preprocessing pass
        if df.attrs.get("engineered", False):
            processed = df.copy()
        elif preprocessor is not None:
            processed = preprocessor.preprocess_slice(df)
        else:
            # 🎯 AUTO-CREATE BAYESIAN-MINIMAL PREPROCESSOR
            from src.nfl_kicker_analysis.data.preprocessor import DataPreprocessor
            from src.nfl_kicker_analysis import config

            bayes_preprocessor = DataPreprocessor()
            bayes_preprocessor.bayes_minimal = True  # Enable minimal preprocessing
            bayes_preprocessor.update_config(
                min_distance=20, max_distance=60,
                min_kicker_attempts=5,
                season_types=['Reg', 'Post'],
                include_performance_history=False,  # Not needed for Bayesian
                include_statistical_features=False,  # Avoid complex features
                include_player_status=True,  # Enable player status features
                performance_window=12
            )
            processed = bayes_preprocessor.preprocess_slice(df)
        # ------------------------------------------------------------------
        # 1️⃣  Predictors
        dist_std = self._standardize(processed["attempt_yards"].to_numpy(float), fit=True)

        # Age variables (centered & scaled)
        age_c  = processed["age_c"].to_numpy(float) if "age_c" in processed.columns else np.zeros(len(processed), dtype=float)
        age_c2 = processed["age_c2"].to_numpy(float) if "age_c2" in processed.columns else np.zeros(len(processed), dtype=float)

        # Handle experience standardization
        if "exp_100" in processed.columns:
            exp_std = (
                (processed["exp_100"] - processed["exp_100"].mean()) /
                processed["exp_100"].std()
            ).to_numpy(float)
        else:
            exp_std = np.zeros(len(processed), dtype=float)

        success    = processed["success"].to_numpy(int)
        kicker_idx = self._encode_kicker(processed["kicker_id"].to_numpy(int), fit=True)
        n_kickers  = len(self._kicker_map)

        # ---- model & sampling -------------------------------------------
        self._model = self._build_model(
            dist_std, age_c, age_c2, exp_std, success, kicker_idx, n_kickers
        )

        # ── FIX: ensure pm.sample knows which model to use ────────────
        with self._model:
            self._trace = pm.sample(
                draws=self.draws, tune=self.tune,
                chains=4, target_accept=self.target_accept,
                random_seed=self.random_seed,
                return_inferencedata=True
            )

    def predict(
        self,
        df: pd.DataFrame,
        *,
        return_ci: bool = False,
        preprocessor=None
    ):
        if self._trace is None or self._model is None:
            raise RuntimeError("Model not yet fitted.")

        # -- preprocessing --------------------------------------------------
        if preprocessor is not None:
            df = preprocessor.preprocess_slice(df)

        dist_std = (
            df["distance_zscore"].to_numpy(float)
            if "distance_zscore" in df.columns
            else self._standardize(df["attempt_yards"].to_numpy(float), fit=False)
        )
        kid_idx = self._encode_kicker(df["kicker_id"].to_numpy(int),
                                    fit=False, unknown_action="average")

        # -- posterior samples (non-centered model) ---------------------
        a = self._trace.posterior["alpha"].values.flatten()
        b = self._trace.posterior["beta_dist"].values.flatten()
        u = self._trace.posterior["u"].values.reshape(a.size, -1)  # use u not u_raw

        # Age and experience effects (if available)
        age_effect = 0.0
        if "age_c" in df.columns:
            age_c = df["age_c"].to_numpy(float)
            age_c2 = df["age_c2"].to_numpy(float) if "age_c2" in df.columns else np.zeros_like(age_c)

            beta_age = self._trace.posterior["beta_age"].values.flatten()
            beta_age2 = self._trace.posterior["beta_age2"].values.flatten()

            age_effect = (beta_age[:, None] * age_c +
                         beta_age2[:, None] * age_c2)

            # Add random age slopes if available
            if "a_k" in self._trace.posterior:
                a_k = self._trace.posterior["a_k"].values.reshape(a.size, -1)
                a_k = np.pad(a_k, ((0, 0), (0, 1)), constant_values=0.0)
                idx_age = np.where(kid_idx == -1, a_k.shape[1] - 1, kid_idx)
                age_effect += a_k[:, idx_age] * age_c

        exp_effect = 0.0
        if "exp_100" in df.columns:
            exp_std = ((df["exp_100"] - df["exp_100"].mean()) / df["exp_100"].std()).to_numpy(float)
            beta_exp = self._trace.posterior["beta_exp"].values.flatten()
            exp_effect = beta_exp[:, None] * exp_std

        # Pad league-mean column for unseen kickers
        u = np.pad(u, ((0, 0), (0, 1)), constant_values=0.0)
        idx = np.where(kid_idx == -1, u.shape[1] - 1, kid_idx)

        lin = (a[:, None] + b[:, None] * dist_std +
               age_effect + exp_effect + u[:, idx])
        theta = 1 / (1 + np.exp(-lin))
        mean = theta.mean(axis=0)

        if not return_ci:
            return mean

        lower, upper = np.percentile(theta, [2.5, 97.5], axis=0)
        return mean, lower, upper


    def evaluate(
        self,
        df: pd.DataFrame,
        *,
        preprocessor=None
    ) -> Dict[str, float]:
        """Compute AUC, Brier score & log‑loss on provided data.

        Args:
            df: Data to evaluate on
            preprocessor: Optional DataPreprocessor instance. If provided, will
                         use it to preprocess the data before evaluation.
        """
        # Apply preprocessing if provided
        if preprocessor is not None:
            df = preprocessor.preprocess_slice(df)

        y_true = df["success"].to_numpy(dtype=int)
        y_pred_result = self.predict(df)  # predict() will handle its own preprocessing if needed

        # Handle both single prediction and CI tuple returns
        if isinstance(y_pred_result, tuple):
            y_pred = y_pred_result[0]  # Just use mean predictions for evaluation
        else:
            y_pred = y_pred_result

        return self.evaluator.calculate_classification_metrics(y_true, y_pred)


    def diagnostics(self, *, return_scalars: bool = False) -> Dict[str, Any]:
        """
        Compute and return MCMC diagnostics.

        Parameters
        ----------
        return_scalars : bool, default False
            If True, also include convenience keys
            ``rhat_max`` and ``ess_min`` for quick threshold checks.

        Returns
        -------
        dict
            Keys: rhat, ess (xarray.Dataset), rhat_vals, ess_vals (np.ndarray),
            summary_ok (bool), and optionally rhat_max, ess_min (float).
        """
        if self._trace is None:
            raise RuntimeError("Model not yet fitted.")

        # ArviZ calls (collapse chain/draw)
        rhats = az.rhat(self._trace)
        ess   = az.ess(self._trace)

        # Flatten → numpy for easy thresholding
        rhat_vals = rhats.to_array().values.ravel()
        ess_vals  = ess.to_array().values.ravel()

        summary_ok = (rhat_vals <= 1.01).all() and (ess_vals >= 100).all()
        if not summary_ok:
            print("⚠️  Sampling diagnostics outside recommended thresholds.")

        out = {
            "rhat": rhats,
            "ess": ess,
            "rhat_vals": rhat_vals,
            "ess_vals": ess_vals,
            "summary_ok": summary_ok,
        }
        if return_scalars:
            out["rhat_max"] = rhat_vals.max()
            out["ess_min"] = ess_vals.min()
        return out




    # -----------------------------------------------------------------
    # 🌟  NEW 1: kicker-level credible interval
    # -----------------------------------------------------------------
    def kicker_interval(
        self,
        kicker_id: int,
        distance: float | None = None,
        ci: float = 0.95,
    ) -> Dict[str, float]:
        """
        Return mean, lower, upper success probability for a *single* kicker.

        Args
        ----
        kicker_id : raw ID as in dataframe
        distance  : yards; if None, uses the empirical mean distance of
                    training data, transformed with stored μ/σ.
        ci        : central credible-interval mass (default 0.95)
        """
        if self._trace is None:
            raise RuntimeError("Model must be fitted first")

        # 1 → index or league-mean column
        k_idx = self._kicker_map.get(kicker_id, -1)
        pad_col = len(self._kicker_map)   # after pad in predict()

        # 2 → choose distance
        if distance is None:
            distance_std = 0.0            # z-score of mean is 0
        else:
            distance_std = (distance - self._distance_mu) / self._distance_sigma

        a = self._trace.posterior["alpha"].values.flatten()

        # Robust lookup for the distance slope parameter (handles naming changes)
        slope_name = "beta_dist" if "beta_dist" in self._trace.posterior else "beta"
        b = self._trace.posterior[slope_name].values.flatten()

        u = self._trace.posterior["u"].values.reshape(a.size, -1)

        # pad league-mean
        u = np.pad(u, ((0, 0), (0, 1)), constant_values=0.0)
        idx = pad_col if k_idx == -1 else k_idx

        logit_p = a + b * distance_std + u[:, idx]
        p = 1 / (1 + np.exp(-logit_p))

        lower, upper = np.quantile(p, [(1-ci)/2, 1-(1-ci)/2])
        return {"mean": p.mean(), "lower": lower, "upper": upper,
                "n_draws": p.size, "distance_std": distance_std}

    # -----------------------------------------------------------------
    # 🌟  NEW 2: posterior-predictive plot across 5-yd bins
    # -----------------------------------------------------------------
    def plot_distance_ppc(
        self,
        df: pd.DataFrame,
        *,
        bin_width: int = 5,
        preprocessor = None,
        ax = None
    ):
        """
        Bin attempts by distance and overlay actual vs posterior mean make-rate.
        """
        if preprocessor is not None:
            df = preprocessor.preprocess_slice(df)

        # 1 Actual success by bin
        df = df.copy()
        df["bin"] = (df["attempt_yards"] // bin_width) * bin_width
        actual = df.groupby("bin")["success"].mean()

        # 2 Posterior mean per attempt → group
        preds = self.predict(df)
        df["pred"] = preds
        posterior = df.groupby("bin")["pred"].mean()

        # 3 Plot
        if ax is None:
            fig, ax = plt.subplots(figsize=(8, 4))
        ax.plot(actual.index, actual.values, marker="o",
                label="Actual", linewidth=2)
        ax.plot(posterior.index, posterior.values, marker="s",
                label="Posterior mean", linestyle="--")
        ax.set_xlabel("Distance bin (yards)")
        ax.set_ylabel("FG make probability")
        ax.set_title("Posterior-Predictive Check ({}-yd bins)".format(bin_width))
        ax.legend()
        plt.tight_layout()
        return ax

    # -----------------------------------------------------------------
    # 🌟  NEW 3: age-binned posterior-predictive check
    # -----------------------------------------------------------------
    def plot_age_ppc(
        self,
        df: pd.DataFrame,
        *,
        bin_width: float = 2.0,
        preprocessor = None,
        ax = None
    ):
        """
        Bin attempts by age and overlay actual vs posterior mean make-rate.
        """
        if preprocessor is not None:
            df = preprocessor.preprocess_slice(df)

        # Use raw age for binning (more interpretable)
        age_col = "age_at_attempt" if "age_at_attempt" in df.columns else "age_c"
        df = df.copy()

        if age_col == "age_c":
            # Convert back to raw age for binning
            df["age_bin"] = ((df["age_c"] * 10 + 30) // bin_width) * bin_width
        else:
            df["age_bin"] = (df[age_col] // bin_width) * bin_width

        # Actual success by age bin
        actual = df.groupby("age_bin")["success"].mean()

        # Posterior mean per attempt → group by age
        preds = self.predict(df)
        df["pred"] = preds
        posterior = df.groupby("age_bin")["pred"].mean()

        # Plot
        if ax is None:
            fig, ax = plt.subplots(figsize=(8, 4))
        ax.plot(actual.index, actual.values, marker="o",
                label="Actual", linewidth=2)
        ax.plot(posterior.index, posterior.values, marker="s",
                label="Posterior mean", linestyle="--")
        ax.set_xlabel("Age bin (years)")
        ax.set_ylabel("FG make probability")
        ax.set_title("Age-Based Posterior-Predictive Check ({:.1f}-yr bins)".format(bin_width))
        ax.legend()
        plt.tight_layout()
        return ax

    # ───────────────────────────────────────────────────────────
    # Helper: draw-level EPA simulation  (fully replaced)
    # ───────────────────────────────────────────────────────────
    def _epa_fg_plus_draws(
        self,
        league_df: pd.DataFrame,
        *,
        kicker_ids: np.ndarray,
        n_samples: int,
        rng: np.random.Generator,
        distance_strategy: str = "kicker",
        τ: float = 20.0,
        compute_league_avg_only: bool = False,
        full_league_avg: float | None = None,
    ) -> np.ndarray:
        """
        Monte-Carlo simulate expected points per attempt for every posterior
        draw *and* every kicker in *kicker_ids*.

        Parameters
        ----------
        distance_strategy : {"kicker","league"}
            • "kicker" (default) ─ bootstrap distances **from the kicker's own
              historical attempts** – fairer when kick distributions differ.
            • "league" ─ previous behaviour (single shared pool).
        τ : float, default 20.0
            Empirical-Bayes shrinkage parameter. Higher values = less shrinkage.
        compute_league_avg_only : bool, default False
            If True, only compute the league average without shrinkage.
        full_league_avg : float or None, default None
            If provided, use this as the league average for shrinkage calculation.

        Returns
        -------
        np.ndarray
            Shape = (n_draws, len(kicker_ids)); each entry is the expected
            points per attempt for one posterior draw.
        """
        if self._trace is None:
            raise RuntimeError("Model not yet fitted.")

        if distance_strategy not in {"kicker", "league"}:
            raise ValueError("distance_strategy must be 'kicker' or 'league'")

        n_kickers = len(kicker_ids)

        # ------------------------------------------------------------------
        # 1️⃣ choose a distance pool for each kicker
        # ------------------------------------------------------------------
        if distance_strategy == "league":
            pool = league_df["attempt_yards"].to_numpy(float)
            distance_sets = [pool] * n_kickers
        else:  # "kicker"
            distance_sets = [
                league_df.loc[league_df["kicker_id"] == kid, "attempt_yards"]
                          .to_numpy(float)
                for kid in kicker_ids
            ]

        # ------------------------------------------------------------------
        # 2️⃣ build the simulated attempt frame
        # ------------------------------------------------------------------
        sampled_distances = []
        sampled_kicker_ids = []
        for kid, dists in zip(kicker_ids, distance_sets):
            if dists.size == 0:
                raise ValueError(f"No distance data for kicker_id={kid}")
            sampled = rng.choice(dists, size=n_samples, replace=True)
            sampled_distances.append(sampled)
            sampled_kicker_ids.append(np.full(n_samples, kid, int))

        sim = pd.DataFrame({
            "attempt_yards": np.concatenate(sampled_distances),
            "kicker_id":     np.concatenate(sampled_kicker_ids),
        })

        # ------------------------------------------------------------------
        # 3️⃣ forward pass through the posterior
        # ------------------------------------------------------------------
        dist_std = self._standardize(sim["attempt_yards"].to_numpy(float),
                                     fit=False)
        kid_idx  = self._encode_kicker(sim["kicker_id"].to_numpy(int),
                                       fit=False, unknown_action="average")

        a = self._trace.posterior["alpha"].values.flatten()

        # Robust lookup for the distance slope parameter (handles naming changes)
        slope_name = "beta_dist" if "beta_dist" in self._trace.posterior else "beta"
        b = self._trace.posterior[slope_name].values.flatten()

        u = self._trace.posterior["u"].values.reshape(a.size, -1)
        u = np.pad(u, ((0, 0), (0, 1)), constant_values=0.0)  # league-mean slot
        idx = np.where(kid_idx == -1, u.shape[1] - 1, kid_idx)

        lin   = a[:, None] + b[:, None] * dist_std + u[:, idx]
        theta = 1 / (1 + np.exp(-lin))              # shape (draws , K·S)
        theta = theta.reshape(a.size, n_kickers, n_samples)
        pts   = theta.mean(axis=-1) * 3.0                    # draws × K

        # ----- Empirical-Bayes shrink toward league avg --------------------
        # Use pre-computed full league average or compute from current data
        if full_league_avg is not None:
            league_avg = full_league_avg                     # Use pre-computed full league average
        else:
            league_avg = pts.mean()                          # Fallback: compute from current data

        # If only computing league average, return early
        if compute_league_avg_only:
            return pts  # Return raw points for league average calculation

        # Get attempt counts for shrinkage calculation
        n_i = league_df.groupby("kicker_id")["success"].size().reindex(kicker_ids).to_numpy(float)

        # Standard Empirical Bayes shrinkage toward league average:
        # shrunk = B * raw + (1-B) * prior, where B = n/(n+τ) and prior = league_avg
        B = n_i / (n_i + τ)                                  # shrinkage weights (K,)

        # Apply shrinkage: each kicker's points shrunk toward league average
        shrunk_pts = (B[np.newaxis, :] * pts +               # B * raw_points
                      (1 - B)[np.newaxis, :] * league_avg)   # (1-B) * league_avg

        # Return EPA as deviation from league average
        return shrunk_pts - league_avg                        # EPA relative to league avg

    # ───────────────────────────────────────────────────────────
    # Public: EPA-FG⁺ leaderboard  (patched 2025-06-30)
    # ───────────────────────────────────────────────────────────
    def epa_fg_plus(
        self,
        league_df: pd.DataFrame,
        *,
        n_samples: int = 1_000,
        min_attempts: int = config.MIN_KICKER_ATTEMPTS,
        seed: int | None = None,
        weights: np.ndarray | None = None,
        cred_mass: float = 0.95,
        return_ci: bool = True,
        distance_strategy: str = "kicker",
        τ: float = 20.0,
    ) -> pd.DataFrame:
        """
        Bayesian EPA-FG⁺ leaderboard with minimum attempts filter.

        Parameters
        ----------
        min_attempts : int, default config.MIN_KICKER_ATTEMPTS
            Minimum attempts required for kicker inclusion in leaderboard
        distance_strategy : {"kicker","league"}
            Sampling scheme for the synthetic attempt set:
            • "kicker" (default) ─ bootstrap distances **from the kicker's own
              historical attempts** – fairer when kick distributions differ.
            • "league" ─ previous behaviour (single shared pool).
        τ : float, default 20.0
            Empirical-Bayes shrinkage parameter. Higher values = less shrinkage.

        Returns
        -------
        pandas.DataFrame
            Kicker leaderboard with EPA-FG⁺ and credible intervals.
        """
        # ❶ Baseline must be computed on full-league (reuse EPACalculator for consistency)
        if not self.baseline_probs:
            from src.nfl_kicker_analysis.utils.metrics import EPACalculator
            epa_calc = EPACalculator()
            self.baseline_probs = epa_calc.calculate_baseline_probs(league_df)

        # ❷ Compute full league average FIRST (before any filtering)
        all_kicker_ids = league_df["kicker_id"].unique()
        rng = np.random.default_rng(seed)

        # Get league average from ALL kickers
        full_league_draws = self._epa_fg_plus_draws(
            league_df,  # FULL league data
            kicker_ids=all_kicker_ids,
            n_samples=n_samples,
            rng=rng,
            distance_strategy=distance_strategy,
            τ=τ,
            compute_league_avg_only=True  # New flag to only compute league avg
        )
        full_league_avg = full_league_draws.mean()

        # ❷ NOW apply minimum attempts filter
        qual_ids = (
            league_df.groupby("kicker_id")["success"]
                .size()
                .loc[lambda s: s >= min_attempts]
                .index
        )
        filtered_df = league_df[league_df["kicker_id"].isin(qual_ids)].copy()

        # ❸ Continue with existing logic on filtered data
        kicker_ids = filtered_df["kicker_id"].unique()

        # Kicker metadata
        meta = (
            filtered_df.groupby("kicker_id")["player_name"]
                .first()
                .to_frame()
        )

        # Monte Carlo simulation with pre-computed league average
        draws = self._epa_fg_plus_draws(
            filtered_df,  # Use filtered data for draws
            kicker_ids=kicker_ids,
            n_samples=n_samples,
            rng=rng,
            distance_strategy=distance_strategy,
            τ=τ,
            full_league_avg=full_league_avg  # Pass the pre-computed league average
        )
        assert draws.shape[1] == len(kicker_ids), "shape mismatch in posterior draws"

        # draws now contain EPA values directly (already relative to league average)
        epa_draws    = draws.T       # transpose to get (kickers, draws)
        mean_pts     = draws.mean(axis=0)  # keep for compatibility

        if not return_ci:
            out = pd.DataFrame({
                "epa_fg_plus_mean": epa_draws.mean(axis=1),
            }, index=kicker_ids)
            return out.join(meta).sort_values("epa_fg_plus_mean", ascending=False)

        hdi = az.hdi(epa_draws.T, hdi_prob=cred_mass)        #  (K,2)
        width = hdi[:, 1] - hdi[:, 0]
        q33, q66 = np.quantile(width, [0.33, 0.66])
        certainty = np.where(width < q33, "high",
                     np.where(width < q66, "medium", "low"))

        tbl = pd.DataFrame({
            "epa_fg_plus_mean": epa_draws.mean(axis=1),
            "hdi_lower": hdi[:, 0],
            "hdi_upper": hdi[:, 1],
            "certainty_level": certainty,
            "expected_pts_per_att": mean_pts,
        }, index=kicker_ids)

        return (
            tbl.join(meta)                      # add player_name
               .sort_values("epa_fg_plus_mean", ascending=False)
        )

    # ---------------------------------------------------------------------
    # 🔍  Helper methods for kicker ID/name conversion
    # ---------------------------------------------------------------------
    def get_kicker_id_by_name(self, df: pd.DataFrame, player_name: str) -> int | None:
        """
        Get kicker_id for a given player_name from the dataset.

        Args:
            df: DataFrame containing kicker_id and player_name columns
            player_name: Name of the kicker to look up

        Returns:
            kicker_id if found, None otherwise
        """
        matches = df[df["player_name"] == player_name]["kicker_id"].unique()
        return matches[0] if len(matches) > 0 else None

    def get_kicker_name_by_id(self, df: pd.DataFrame, kicker_id: int) -> str | None:
        """
        Get player_name for a given kicker_id from the dataset.

        Args:
            df: DataFrame containing kicker_id and player_name columns
            kicker_id: ID of the kicker to look up

        Returns:
            player_name if found, None otherwise
        """
        matches = df[df["kicker_id"] == kicker_id]["player_name"].unique()
        return matches[0] if len(matches) > 0 else None

    def kicker_interval_by_name(
        self,
        df: pd.DataFrame,
        player_name: str,
        distance: float | None = None,
        ci: float = 0.95,
    ) -> Dict[str, float]:
        """
        Return mean, lower, upper success probability for a kicker by name.

        Args:
            df: DataFrame containing kicker mappings
            player_name: Name of the kicker
            distance: yards; if None, uses empirical mean
            ci: central credible-interval mass (default 0.95)
        """
        kicker_id = self.get_kicker_id_by_name(df, player_name)
        if kicker_id is None:
            raise ValueError(f"Kicker '{player_name}' not found in dataset")
        return self.kicker_interval(kicker_id, distance, ci)






if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader
    from src.nfl_kicker_analysis.data.feature_engineering import FeatureEngineer
    from src.nfl_kicker_analysis.data.preprocessor import DataPreprocessor
    from src.nfl_kicker_analysis.models.bayesian import BayesianModelSuite
    import numpy as np
    import arviz as az
    import matplotlib.pyplot as plt

    import jax
    print(jax.devices())        # should list "GpuDevice" (CUDA) or "METAL" (Apple)
    print(jax.default_backend()) # should print 'gpu', not 'cpu'

    print("🏈 Bayesian Model Suite with DataPreprocessor Integration Demo")
    print("=" * 60)

    # 1️⃣ Loading and engineering features
    print("\n1️⃣ Loading and engineering features...")
    loader = DataLoader()
    df_raw = loader.load_complete_dataset()
    engineer = FeatureEngineer()
    df_feat = engineer.create_all_features(df_raw)
    print(f"   Raw data shape: {df_raw.shape}")
    print(f"   Engineered data shape: {df_feat.shape}")

    # 2️⃣ Configuring preprocessing
    print("\n2️⃣ Configuring preprocessing...")
    CONFIG = {
        'min_distance': 20,
        'max_distance': 60,
        'min_kicker_attempts': 8,
        'season_types': ['Reg', 'Post'],
        'include_performance_history': True,
        'include_statistical_features': False,
        'include_player_status': True,  # ✅ FIX: Added missing parameter
        'performance_window': 12,
    }

    preprocessor = DataPreprocessor()
    preprocessor.update_config(**CONFIG)
    preprocessor.update_feature_lists(**FEATURE_LISTS)

    # 3️⃣ Method A: Manual preprocessing then passing to BayesianModelSuite
    print("\n3️⃣ Method A: Manual preprocessing then passing to BayesianModelSuite")
    print("-" * 50)
    df_processed = preprocessor.preprocess_complete(df_feat)
    print(f"   Processed data shape: {df_processed.shape}")
    train_data = df_processed[df_processed["season"] <= 2017]
    test_data = df_processed[df_processed["season"] == 2018]
    print(f"   Train data shape: {train_data.shape}")
    print(f"   Test data shape: {test_data.shape}")
    suite_a = BayesianModelSuite(draws=1000,
                                 tune=1000,
                                 include_random_slope=False,
                                 random_seed=42)
    suite_a.fit(train_data)
    metrics_a = suite_a.evaluate(test_data)
    print("   Method A Results:")
    for metric, value in metrics_a.items():
        print(f"     {metric}: {value:.4f}")

    # 4️⃣ Method B: Automatic preprocessing within BayesianModelSuite
    print("\n4️⃣ Method B: Automatic preprocessing within BayesianModelSuite")
    print("-" * 50)
    train_raw = df_feat[df_feat["season"] <= 2017]
    test_raw = df_feat[df_feat["season"] == 2018]
    suite_b = BayesianModelSuite(draws=1000,
                                 tune=1000,
                                 include_random_slope=False,
                                 random_seed=42)
    suite_b.fit(train_raw, preprocessor=preprocessor)
    metrics_b = suite_b.evaluate(test_raw, preprocessor=preprocessor)
    print("   Method B Results:")
    for metric, value in metrics_b.items():
        print(f"     {metric}: {value:.4f}")

    # 5️⃣ Comparing both methods
    print("\n5️⃣ Comparing both methods")
    print("-" * 50)
    print("Both methods should produce identical results since they use the same preprocessing pipeline and the same random seed.")
    print("\nMethod A vs Method B comparison:")
    for metric in metrics_a.keys():
        diff = abs(metrics_a[metric] - metrics_b[metric])
        print(f"   {metric}: {diff:.6f} (difference)")

    print("\n✅ Integration complete! The BayesianModelSuite now supports both:")
    print("   • Direct use with preprocessed data")
    print("   • Automatic preprocessing with DataPreprocessor integration")
    print("   • Consistent results across all model families")
    print("   • No performance penalty from preprocessing in MCMC loop")

    # ------------------
    # ▶️ Validation Checks
    # ------------------
    print("\n✅ Running validation checks...")

    # 1️⃣ Credible interval sanity
    cid = suite_b.kicker_interval_by_name(df_feat, "JUSTIN TUCKER", distance=40)
    assert cid["lower"] <= cid["mean"] <= cid["upper"], \
        f"Credible interval ordering failed: {cid}"
    print("• Credible interval check passed.")

    # 2️⃣ Posterior-Predictive Check correlation
    ax = suite_b.plot_distance_ppc(test_data, bin_width=5, preprocessor=preprocessor)
    df_ppc = preprocessor.preprocess_slice(test_data).copy()
    df_ppc["bin"] = (df_ppc["attempt_yards"] // 5) * 5
    actual = df_ppc.groupby("bin")["success"].mean().values
    posterior = df_ppc.assign(pred=suite_b.predict(df_ppc)) \
                   .groupby("bin")["pred"].mean().values
    corr = np.corrcoef(actual, posterior)[0, 1]
    assert corr > 0.9, f"PPC correlation too low: {corr:.3f}"
    print(f"• PPC correlation check passed (r={corr:.3f}).")

    # 3️⃣ EPA-FG+ leaderboard consistency
    epa_tbl = suite_b.epa_fg_plus(df_feat, n_samples=500, return_ci=True)
    assert {"hdi_lower", "hdi_upper", "certainty_level"}.issubset(epa_tbl.columns)
    print(epa_tbl.head())

    # Check that Justin Tucker is among the top kickers using name lookup
    justin_tucker_id = suite_b.get_kicker_id_by_name(df_feat, "JUSTIN TUCKER")
    top_kickers = epa_tbl.index.tolist()[:3]
    if justin_tucker_id is not None:
        assert justin_tucker_id in top_kickers, \
            f"Expected JUSTIN TUCKER (ID: {justin_tucker_id}) in top 3: {top_kickers}"
        print("• EPA-FG+ leaderboard check passed.")
    else:
        print("• WARNING: JUSTIN TUCKER not found in dataset, skipping leaderboard check.")

    # 4️⃣ MCMC diagnostics
    diag = suite_b.diagnostics(return_scalars=True)
    assert diag["summary_ok"], (
        f"R-hat max={diag['rhat_max']:.3f}, "
        f"ESS min={diag['ess_min']:.0f}"
    )
    print("• Diagnostics check passed (R-hat ≤1.01, ESS ≥100).")

    print("\n🎉 All validation checks passed!")


ValueError: invalid truth value '' for environment 'JAX_ENABLE_X64'

# 7b.Bayesian Time Series Model

In [None]:
%%writefile src/nfl_kicker_analysis/models/bayesian_timeseries.py
"""
Time-Series Bayesian Models for NFL Kicker Analysis
==================================================
NEW IN v1.1.0  (2025-06-30)

* Hierarchical dynamic-linear model (level + trend) **or** SARIMA
  built on PyMC 5 (+ pymc-experimental).
* Mirrors the API of BayesianModelSuite so the rest of the pipeline
  stays unchanged.
"""
from __future__ import annotations
import warnings
from typing import Dict, Optional, List, Union

import numpy as np
import pandas as pd
import pymc as pm
import arviz as az

import jax  # keep for device detection
import numpyro

# Silence ArviZ HDI FutureWarnings
warnings.filterwarnings("ignore", category=FutureWarning, module="arviz")

# Optional fast Kalman-filter backend
try:
    from pymc_experimental.statespace import SARIMA   # type: ignore
except ImportError:  # graceful fallback for CI
    SARIMA = None

# -------------------------------------------------------------- #
# utils – choose chain layout safely                              #
# -------------------------------------------------------------- #
def _choose_chain_config(requested: int, use_jax: bool) -> dict:
    """
    Return a dict of kwargs for PyMC/JAX samplers guaranteeing ≥2 chains.
    On a single-device JAX setup we vectorise chains; on CPU we parallelise.
    """
    if use_jax:
        # run all requested chains on the same GPU
        return dict(chains=max(2, requested), chain_method="vectorized")
    else:
        # let PyMC fork processes; fall back to sequential if memory is tight
        return dict(chains=max(2, requested), cores=min(4, max(2, requested)))

# ------------------------------------------------------------------ #
# Helper – aggregate attempt-level rows → weekly counts per kicker   #
# ------------------------------------------------------------------ #
def _prep_series(df: pd.DataFrame, freq: str) -> pd.DataFrame:
    """
    Collapse attempt-level data to (attempts, made) per kicker & period.

    Also preserves age-related columns by taking the mean within each period.

    * Ensures ``game_date`` is datetime64[ns] so `.dt` accessors work.
    * Returns a tidy frame sorted by kicker & date_key.
    """
    if "game_date" not in df.columns:
        raise KeyError("Column 'game_date' required for time-series modelling")

    if not np.issubdtype(df["game_date"].dtype, np.datetime64):
        df = df.copy()
        df["game_date"] = pd.to_datetime(df["game_date"])

    # Build aggregation dictionary
    agg_dict = {
        "success": ["size", "sum"]  # attempts = size, made = sum
    }

    # Add age-related columns if they exist
    age_cols = ["age_at_attempt", "career_year", "seasons_of_experience"]
    for col in age_cols:
        if col in df.columns:
            agg_dict[col] = ["mean"]  # Take mean within period

    out = (
        df.assign(date_key=df["game_date"].dt.to_period(freq).dt.start_time)
          .groupby(["player_name", "player_id", "date_key"], sort=False)
          .agg(agg_dict)
          .reset_index()
    )

    # Flatten multi-level columns and rename
    if isinstance(out.columns, pd.MultiIndex):
        new_cols = []
        for col in out.columns:
            if col[0] == "success" and col[1] == "size":
                new_cols.append("attempts")
            elif col[0] == "success" and col[1] == "sum":
                new_cols.append("made")
            elif col[1] == "":
                new_cols.append(col[0])  # groupby columns
            else:
                new_cols.append(col[0])  # age columns
        out.columns = new_cols

    out["success_rate"] = out["made"] / out["attempts"]
    return out.sort_values(["player_name", "date_key"], ignore_index=True)

# ------------------------------------------------------------------ #
# Main class                                                         #
# ------------------------------------------------------------------ #
class TimeSeriesBayesianModelSuite:
    """
    Hierarchical DLM / SARIMA wrapper for weekly kicker make-rates.

    Parameters
    ----------
    freq          : str, default "W-MON"
    draws, tune,
    target_accept : passed to `pm.sample`
    random_seed   : int | None
    use_sarima    : bool – if True **and** pymc_experimental is installed,
                    approximates the latent process via SARIMA for speed.
    """

    def __init__(
        self,
        *,
        freq: str = "W-MON",
        draws: int = 1_000,
        tune: int = 1_000,
        target_accept: float = 0.9,
        random_seed: int | None = 42,
        use_sarima: bool = False,
        debug: bool = False,
        diag_vars: Optional[List[str]] = None,
        use_jax: Optional[bool] = None,
        chains: int = 4,                          # NEW: desired chains
        init_sigma: float = 5.0,                  # NEW: σ for initial state
    ):
        self.freq, self.draws, self.tune = freq, draws, tune
        self.target_accept, self.random_seed = target_accept, random_seed
        self.use_sarima = use_sarima and SARIMA is not None
        self.debug = debug
        self.diag_vars = diag_vars or ["sigma_level", "sigma_trend", "lvl", "trd"]

        self.init_sigma = init_sigma
        self.chains = chains
        # Auto-detect GPU: use JAX if any CUDA devices available
        self.use_jax = bool(jax.devices()) if use_jax is None else use_jax

        self._model: Optional[pm.Model] = None
        self._trace: Optional[az.InferenceData] = None
        self._meta: Optional[pd.DataFrame] = None

    # -------------------------------------------------------------- #
    # tiny helper so we never sprinkle print() everywhere
    def _log(self, *msg):
        if self.debug:
            print("[TS-Bayes]", *msg)

    # -------------------------------------------------------------- #
    # Fit                                                            #
    # -------------------------------------------------------------- #
    def fit(
        self,
        df: pd.DataFrame,
        *,
        preprocessor=None,
        min_attempts: int = 5,
    ) -> None:
        """
        Fit the hierarchical time-series model on attempt-level ``df``.

        Parameters
        ----------
        df            : long DataFrame (one row per FG attempt)
        preprocessor  : optional ``DataPreprocessor`` to mirror BayesianModelSuite
        min_attempts  : kickers with < ``min_attempts`` total are dropped
                        to keep the latent state-space well-conditioned.
        """
        # 0️⃣ optional slice-level preprocessing – keeps pipeline symmetry
        if preprocessor is not None:
            df = preprocessor.preprocess_slice(df)

        # 1️⃣ drop ultra-low-volume kickers
        keep = df.groupby("player_name")["success"].size().loc[lambda s: s >= min_attempts].index
        df = df[df["player_name"].isin(keep)]

        # 2️⃣ aggregate to weekly (or monthly) counts
        ts = _prep_series(df, self.freq)
        self._meta = (
            ts[["player_name", "player_id"]].drop_duplicates().set_index("player_name")
        )
        players = ts["player_name"].unique()
        P       = len(players)

        # 3️⃣ build rectangular player × time grid
        full_idx = (
            ts.groupby("player_name", sort=False)["date_key"]
            .apply(lambda s: pd.date_range(s.min(), s.max(), freq=self.freq))
            .explode()
            .reset_index()
            .rename(columns={0: "date_key"})
        )
        ts_full = full_idx.merge(ts, how="left", on=["player_name", "date_key"])
        ts_full[["attempts", "made"]] = ts_full[["attempts", "made"]].fillna(0.0)

        y = ts_full["made"].to_numpy(int)        # successes
        n = ts_full["attempts"].to_numpy(int)    # trials

        player_idx = pd.Categorical(ts_full["player_name"], categories=players).codes
        time_idx   = ts_full.groupby("player_name").cumcount()
        T_max      = int(time_idx.max()) + 1

        # 4️⃣ Static covariates ------------------------------------
        #   Enhanced age modeling: prefer career_year over age, then age, then zeros
        if "career_year" in ts_full.columns:
            # Use career_year for smooth aging curves (preferred)
            age_covariate = ts_full["career_year"].to_numpy(float)
            age_covariate_name = "career_year"
            self._log("Using career_year for age modeling")
        elif "age" in ts_full.columns:
            # Fallback to age_at_attempt
            age_covariate = ts_full["age"].to_numpy(float)
            age_covariate_name = "age"
            self._log("Using age for age modeling")
        else:
            # No age information available
            age_covariate = np.zeros(len(ts_full), dtype=float)
            age_covariate_name = "none"
            self._log("No age information available - using zeros")

        # 5️⃣ build PyMC model
        with pm.Model() as self._model:
            if self.use_sarima:
                if SARIMA is None:
                    raise RuntimeError(
                        "pymc-experimental not installed – "
                        "set use_sarima=False or install the extra dependency."
                    )
                sarima = SARIMA(
                    name="sarima",
                    endog=y,
                    exog=None,
                    order=(0, 1, 1),
                    seasonal_order=(0, 0, 0, 0),
                    measurement_error=True,
                )
                mu_mat = sarima.states.reshape((1, -1))  # flattened for indexing
            else:
                init_0 = pm.Normal.dist(0.0, self.init_sigma)  # explicit init_dist
                σ_lvl = pm.Exponential("sigma_level", 1.0)
                σ_trd = pm.Exponential("sigma_trend", 1.0)
                lvl = pm.GaussianRandomWalk(
                    "lvl", sigma=σ_lvl, init_dist=init_0, shape=(P, T_max)
                )
                trd = pm.GaussianRandomWalk(
                    "trd", sigma=σ_trd, init_dist=init_0, shape=(P, T_max)
                )
                # identical trailing dimension – avoids broadcast crash
                step_frac = pm.math.constant(np.arange(T_max) / T_max)
                mu_mat = lvl + trd * step_frac          # ✅ broadcast-safe

            β_age = pm.Normal("beta_age", 0.0, 1.0)          # NEW: age/career_year effect

            # Apply age effect directly at observation level
            mu_obs = mu_mat[player_idx, time_idx] + β_age * age_covariate

            theta = pm.Deterministic(
                "theta", pm.math.invlogit(mu_obs)
            )
            pm.Binomial("obs", n=n, p=theta, observed=y)

            # -------- sampling ------------------------------------
            chain_cfg = _choose_chain_config(self.chains, self.use_jax)

            if self.use_jax:
                numpyro.set_host_device_count(chain_cfg["chains"])
                from pymc.sampling.jax import sample_numpyro_nuts
                self._trace = sample_numpyro_nuts(
                    draws=self.draws,
                    tune=self.tune,
                    target_accept=self.target_accept,
                    random_seed=self.random_seed,
                    progressbar=True,
                    **chain_cfg,
                )
            else:
                self._trace = pm.sample(
                    draws=self.draws,
                    tune=self.tune,
                    target_accept=self.target_accept,
                    random_seed=self.random_seed,
                    idata_kwargs={"log_likelihood": False},
                    progressbar=True,
                    **chain_cfg,
                )


    # -------------------------------------------------------------- #
    # Forecast                                                       #
    # -------------------------------------------------------------- #
    def forecast(self, *, steps: int = 6, hdi_prob: float = 0.9) -> pd.DataFrame:
        """
        Monte-Carlo simulate `steps` weeks ahead for **all** fitted kickers.

        Returns
        -------
        DataFrame with columns ['player_name', 'step', 'p_mean', 'hdi_lower', 'hdi_upper']
        """
        if any(obj is None for obj in (self._trace, self._meta, self._model)):
            raise RuntimeError("Call `.fit()` before forecasting")

        players = self._meta.index.tolist()
        lvl = self._trace.posterior["lvl"].values
        trd = self._trace.posterior["trd"].values
        chains, draws, P, T = lvl.shape
        lvl_last = lvl[:, :, :, -1].reshape(chains*draws, P)
        trd_last = trd[:, :, :, -1].reshape(chains*draws, P)

        rng = np.random.default_rng(self.random_seed)
        records: List[Dict[str, Union[str, int, float]]] = []
        for p_idx, name in enumerate(players):
            noise = rng.normal(0, 0.1, size=(chains*draws, steps)).cumsum(axis=1)
            lvl_path = lvl_last[:, p_idx][:, None] + noise
            trd_path = trd_last[:, p_idx][:, None]
            mu = lvl_path + trd_path * np.arange(1, steps+1)
            theta = 1 / (1 + np.exp(-mu))
            mean = theta.mean(axis=0)
            hdi  = az.hdi(theta, hdi_prob=hdi_prob)
            for s, (m, lo, hi) in enumerate(zip(mean, hdi[:,0], hdi[:,1]), 1):
                records.append({"player_name": name,
                                "step": s,
                                "p_mean": float(m),
                                "hdi_lower": float(lo),
                                "hdi_upper": float(hi)})
        return pd.DataFrame(records)

    # -------------------------------------------------------------- #
    # Diagnostics                                                    #
    # -------------------------------------------------------------- #
    def diagnostics(self, *, thin: int = 5) -> Dict[str, float]:
        if self._trace is None:
            raise RuntimeError("Model not fitted")

        if self._trace.posterior.dims["chain"] < 2:
            # r̂ undefined → return sentinel values and warn once
            warnings.warn("Only one chain present – r̂ unavailable; run ≥2 chains for convergence diagnostics.")
            return {"rhat_max": float("nan"), "ess_min": float("nan")}

        rhat_max, ess_min = -np.inf, np.inf
        for var in self.diag_vars:
            if var not in self._trace.posterior:
                continue
            data = self._trace.posterior[var]
            if thin > 1 and data.ndim > 2:
                slc = (slice(None), slice(None)) + (slice(None, None, thin),) * (data.ndim - 2)
                data = data[slc]
            rhat_max = max(rhat_max, az.rhat(data).to_array().max().item())
            ess_min  = min(ess_min,  az.ess(data, method="bulk").to_array().min().item())
        return {"rhat_max": float(rhat_max), "ess_min": float(ess_min)}

    # -------------------------------------------------------------- #
    # Quick plot                                                     #
    # -------------------------------------------------------------- #
    def plot_forecast(self, player_name: str, *, ax=None):
        """
        Plot historical weekly make-probability with 90 % HDI for one kicker.
        """
        import matplotlib.pyplot as plt
        if self._trace is None or self._meta is None:
            raise RuntimeError("Fit the model first")
        if player_name not in self._meta.index:
            raise ValueError(f"Unknown player '{player_name}'")

        p_idx = list(self._meta.index).index(player_name)
        lvl = self._trace.posterior["lvl"].sel(lvl_dim_0=p_idx).stack(draws=("chain","draw"))
        trd = self._trace.posterior["trd"].sel(trd_dim_0=p_idx).stack(draws=("chain","draw"))
        T = lvl.shape[-1]
        x = np.arange(T)
        mu = lvl + trd * (x / T)
        p  = 1 / (1 + np.exp(-mu))
        mean = p.mean("draws")
        hdi  = az.hdi(p, hdi_prob=0.9)

        if ax is None:
            _, ax = plt.subplots(figsize=(8,4))
        ax.plot(x, mean, label="p_mean")
        ax.fill_between(x, hdi.sel(hdi="lower"), hdi.sel(hdi="upper"), alpha=0.3,
                        label="90 % HDI")
        ax.set_xlabel("Week index")
        ax.set_ylabel("Make probability")
        ax.set_title(f"Historical make-probability – {player_name}")
        ax.legend()
        return ax

# ------------------------------------------------------------------ #
# Smoke-test CLI                                                     #
# ------------------------------------------------------------------ #
if __name__ == "__main__":
    from src.nfl_kicker_analysis.data.loader import DataLoader
    from src.nfl_kicker_analysis.data.feature_engineering import FeatureEngineer

    print("🏈  Time-Series Bayesian demo (weekly make-rates)")
    df_raw  = DataLoader().load_complete_dataset()
    df_feat = FeatureEngineer().create_all_features(df_raw)

    ts = TimeSeriesBayesianModelSuite(freq="W-MON",
                                      draws=250,
                                      tune=250,
                                      use_sarima=True,
                                      target_accept=.85,
                                      )
    ts.fit(df_feat[df_feat["season"] <= 2018])             # train through 2019
    fcst = ts.forecast(steps=6)                            # next six weeks
    print(fcst.head())

    diag = ts.diagnostics()
    print(f"R-hat ≤ 1.01 ? {diag['rhat_max']:.3f} | ESS ≥ 100 ? {diag['ess_min']:.0f}")


Writing src/nfl_kicker_analysis/models/bayesian_timeseries.py


## 8. Remaining Init Files and Unit Tests

Creating the remaining __init__.py files and comprehensive unit tests.


In [27]:
%%writefile src/nfl_kicker_analysis/models/__init__.py
"""Models module for NFL kicker analysis."""

from .tree_based_bayes_optimized_models import TreeBasedModelSuite
from .bayesian_timeseries import TimeSeriesBayesianModelSuite

__all__ = ['TreeBasedModelSuite', 'TimeSeriesBayesianModelSuite']


Writing src/nfl_kicker_analysis/models/__init__.py


In [28]:
%%writefile src/nfl_kicker_analysis/utils/__init__.py
"""Utils module for NFL kicker analysis."""

from .metrics import EPACalculator, ModelEvaluator

__all__ = ['EPACalculator', 'ModelEvaluator']


Writing src/nfl_kicker_analysis/utils/__init__.py
