In [None]:
%%writefile src/data/load_data.py
import pandas as pd


def load_raw(path='data/Research Data Project/Research Data Project/exit_velo_project_data.csv'):
    """
    Read the raw exit-velocity CSV into a DataFrame.

    Args:
        path: Location of the CSV file. Defaults to the project data file.

    Returns:
        pd.DataFrame: The file contents as parsed by ``pd.read_csv``.
    """
    return pd.read_csv(path)


if __name__ == "__main__":
    # Smoke test: load the default CSV, preview it, then report missing values.
    path = 'data/Research Data Project/Research Data Project/exit_velo_project_data.csv'
    df = load_raw(path)
    print(df.head())
    print(df.columns)

    # Keep only the columns that actually contain nulls.
    missing = df.isnull().sum()
    missing = missing[missing > 0]
    if not missing.empty:
        print("=== Raw data null counts ===")
        for col, cnt in missing.items():
            print(f" • {col!r}: {cnt} missing")
    else:
        print("✅  No missing values in raw data.")
    

# Feature Engineering

In [None]:
%%writefile src/features/feature_engineering.py
"""Feature engineering utilities for the exit‑velo project.

All helpers are pure functions that take a pandas DataFrame and return a *copy*
with additional engineered columns so we avoid side effects.

Example
-------
>>> from src.data.load_data import load_raw
>>> from src.features.feature_engineering import feature_engineer
>>> df = load_raw()
>>> df_fe = feature_engineer(df)
"""
from __future__ import annotations

import pandas as pd
import numpy as np

###############################################################################
# Helper functions
###############################################################################

def _rolling_stat(
    df: pd.DataFrame,
    group_cols: list[str],
    target: str,
    stat: str = "mean",
    window: int = 50,
    min_periods: int = 10,
) -> pd.Series:
    """Group-wise rolling statistic, returned on ``df``'s original index.

    Rows are rolled in the order they appear in ``df`` within each group
    (assumed to be chronological).  ``groupby`` already preserves that
    within-group order, so no explicit sort is performed: the previous
    ``sort_values(group_cols)`` used the default non-stable sort, which can
    shuffle rows that share a group key and therefore change the windows.

    Parameters
    ----------
    df          : frame holding ``group_cols`` and ``target``
    group_cols  : columns that define the groups
    target      : column the statistic is computed on
    stat        : aggregation name passed to ``Rolling.agg`` (e.g. "mean", "std")
    window      : rolling window length, in rows
    min_periods : minimum observations required for a non-NaN result; it is
                  capped at ``window`` because pandas rejects
                  ``min_periods > window``

    Returns
    -------
    Series indexed like the rows of ``df`` (group levels dropped), ordered by
    group key.  Rows whose window holds fewer than ``min_periods`` values are
    NaN.
    """
    effective_min_periods = min(min_periods, window)
    rolled = (
        df.groupby(group_cols)[target]
        .rolling(window, min_periods=effective_min_periods)
        .agg(stat)
    )
    # Drop the group level(s) so the result aligns with df on assignment.
    return rolled.reset_index(level=group_cols, drop=True)

###############################################################################
# Public API
###############################################################################

def feature_engineer(df: pd.DataFrame, copy: bool = True) -> pd.DataFrame:
    """Return a DataFrame enriched with engineered features.

    Parameters
    ----------
    df   : raw data as returned by ``load_raw``
    copy : if *True* (default) operate on a copy so the original is untouched.

    Returns
    -------
    The frame (a copy unless ``copy=False``) with the engineered columns added.
    """

    if copy:
        df = df.copy()

    # ------------------------------------------------------------------
    # 1. Canonical casing: upper-case every object/string column
    # ------------------------------------------------------------------
    str_cols = df.select_dtypes(include=['object', 'string']).columns
    df[str_cols] = df[str_cols].apply(lambda col: col.str.upper())

    # ------------------------------------------------------------------
    # 2. Age engineering (dynamic quantile bins)
    # ------------------------------------------------------------------
    df["age_sq"] = df["age"] ** 2  # capture non-linear aging curve
    n_age_bins = 4
    df["age_bin"] = pd.qcut(df["age"], q=n_age_bins, duplicates='drop')

    # ------------------------------------------------------------------
    # 3. Height relative to the mean batter height *in this data set*
    #    (a sample mean, not a fixed league constant)
    # ------------------------------------------------------------------
    avg_batter_height = df["batter_height"].mean()
    df["height_diff"] = df["batter_height"] - avg_batter_height

    # ------------------------------------------------------------------
    # 4. Launch / spray angle buckets (dynamic quantile bins) & barrel flag
    # ------------------------------------------------------------------
    n_la_bins = 5
    df["la_bin"] = pd.qcut(df["launch_angle"], q=n_la_bins, duplicates='drop')

    n_spray_bins = 3
    df["spray_bin"] = pd.qcut(df["spray_angle"], q=n_spray_bins, duplicates='drop')

    # Statcast barrel proxy: EV >= 98 and 8 <= LA < 26
    df["is_barrel"] = (
        (df["exit_velo"] >= 98) & (df["launch_angle"] >= 8) & (df["launch_angle"] < 26)
    ).astype("category")

    # ------------------------------------------------------------------
    # 5. Handedness & matchup indicators
    # ------------------------------------------------------------------
    df["same_hand"] = (df["batter_hand"] == df["pitcher_hand"])
    df["hand_match"] = df["batter_hand"] + "_VS_" + df["pitcher_hand"]

    # ------------------------------------------------------------------
    # 6. Pitch-type interactions
    # ------------------------------------------------------------------
    df["pitch_hand_match"] = df["pitch_group"] + "_" + df["hand_match"]

    # ------------------------------------------------------------------
    # 7. Player-level rolling EV mean & SD (window of 50 rows per batter).
    #    Rows without enough history come back NaN and are filled with the
    #    global statistic.
    #    NOTE(review): the rolling window includes the current row's
    #    exit_velo - confirm this is acceptable when exit_velo is the target.
    # ------------------------------------------------------------------
    df["player_ev_mean50"] = _rolling_stat(df, ["batter_id"], "exit_velo", "mean", 50)
    df["player_ev_std50"] = _rolling_stat(df, ["batter_id"], "exit_velo", "std", 50)
    ev_mean_global = df["exit_velo"].mean()
    ev_std_global = df["exit_velo"].std()
    df["player_ev_mean50"] = df["player_ev_mean50"].fillna(ev_mean_global)
    df["player_ev_std50"] = df["player_ev_std50"].fillna(ev_std_global)

    # ------------------------------------------------------------------
    # 8. Hard-hit & barrel-adjacent flags
    # ------------------------------------------------------------------
    df["hard_hit"] = (df["exit_velo"] >= 95).astype("category")
    df["near_barrel"] = (
        (df["exit_velo"].between(95, 98)) &
        (df["launch_angle"].between(5, 30))
    ).astype("category")

    # ------------------------------------------------------------------
    # 9. EV x LA interaction and distance proxy
    # ------------------------------------------------------------------
    df["ev_la_product"] = df["exit_velo"] * (df["launch_angle"] + 90)
    df["est_distance"] = df["exit_velo"] * df["hangtime"]
    # Square root of the (non-negative) product to stabilise variance.
    df["ev_la_sqrt"] = np.sqrt(df["ev_la_product"].clip(lower=0))

    # ------------------------------------------------------------------
    # 10. Pitcher rolling stats
    # ------------------------------------------------------------------
    df["pitcher_ev_mean50"] = _rolling_stat(df, ["pitcher_id"], "exit_velo", "mean", 50)
    # Assign the filled column back: an in-place fillna on a column selection
    # is a chained assignment and does not update df under Copy-on-Write.
    df["pitcher_ev_mean50"] = df["pitcher_ev_mean50"].fillna(ev_mean_global)

    # ------------------------------------------------------------------
    # 11. Outcome encoding - simple value mapping for power/speed signal.
    #     Outcomes not present in the mapping become NaN.
    # ------------------------------------------------------------------
    _OUTCOME_W = {
        "out": 0,
        "single": 1,
        "double": 2,
        "triple": 3,
        "home run": 4,
    }
    df["outcome_val"] = df["outcome"].str.lower().map(_OUTCOME_W)

    # ------------------------------------------------------------------
    # 12. Centred covariates and ordinal level index
    # ------------------------------------------------------------------
    df["age_centered"] = df["age"] - df["age"].median()
    df["season_centered"] = df["season"] - df["season"].median()
    df["level_idx"] = df["level_abbr"].map({"AA": 0, "AAA": 1, "MLB": 2})

    return df

###############################################################################
# CLI entry‑point (quick smoke test)
###############################################################################

if __name__ == "__main__":
    # Quick smoke test: load the raw CSV, report nulls, run the feature
    # pipeline and print before/after shapes.
    from src.data.load_data import load_raw

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    df = load_raw(raw_path)
    print(df.head())
    print(df.columns)

    # --- inspect nulls in the raw data ---
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values in raw data.")
    else:
        print("=== Raw data null counts ===")
        for col, cnt in null_counts.items():
            print(f" • {col!r}: {cnt} missing")

    df_fe = feature_engineer(df)

    print("Raw →", df.shape, "//  Feature‑engineered →", df_fe.shape)
    print(df_fe.head())
    print(df_fe.columns)


# ColumnSchema: separates raw and engineered columns

In [None]:
%%writefile src/data/ColumnSchema.py
""" 
Column schema helper for exit‑velo project.

Centralises every raw and engineered column name in one place and exposes
 type‑safe accessors so downstream code never hard‑codes strings.

Usage
-----
>>> from src.data.ColumnSchema import cols
>>> num_cols = cols.numerical()
>>> ord_cols = cols.ordinal()
>>> all_for_model = cols.model_features()
"""

from functools import lru_cache
from typing import List, Dict
import json

class _ColumnSchema:
    """Container for canonical column lists.

    Every accessor returns a *new* list, so callers can modify what they
    receive without affecting the class-level definitions or later calls.
    """

    # Identifier columns.
    _ID_COLS: List[str] = [
        "season", "batter_id", "pitcher_id",
    ]

    # Categorical columns with a natural order.
    _ORDINAL_CAT_COLS: List[str] = [
        "level_abbr",   # AA < AAA < MLB
        "age_bin",      # 4 quantile bins of age
        "la_bin",       # 5 quantile bins of launch angle
        "spray_bin",    # 3 quantile bins of spray angle
        "outcome_val",  # 0–4 mapping of out→HR
    ]

    # Categorical columns without an order.
    _NOMINAL_CAT_COLS: List[str] = [
        "hit_type",
        "outcome",
        "pitch_group",
        "batter_hand",
        "pitcher_hand",
        "hand_match",
        "pitch_hand_match",
        "same_hand",
        "is_barrel",
        "hard_hit",
        "near_barrel",
    ]

    # Continuous columns (the target column is listed here as well).
    _NUMERICAL_COLS: List[str] = [
        # raw inputs
        "exit_velo",
        "launch_angle",
        "spray_angle",
        "hangtime",
        # engineered continuous
        "ev_la_product",
        "ev_la_sqrt",
        "est_distance",
        "player_ev_mean50",
        "player_ev_std50",
        "pitcher_ev_mean50",
        "level_idx",
        "season_centered",
        "age_centered"
    ]

    _TARGET_COL: str = "exit_velo"

    # ────────────────────────────────────────────────────────────────────
    # Public helpers
    # ────────────────────────────────────────────────────────────────────
    def id(self) -> List[str]:
        """Return a copy of the identifier columns."""
        return self._ID_COLS.copy()

    def ordinal(self) -> List[str]:
        """Return a copy of the ordinal categorical columns."""
        return self._ORDINAL_CAT_COLS.copy()

    def nominal(self) -> List[str]:
        """Return a copy of the nominal categorical columns."""
        return self._NOMINAL_CAT_COLS.copy()

    def categorical(self) -> List[str]:
        """All cat cols (ordinal + nominal)."""
        return self._ORDINAL_CAT_COLS + self._NOMINAL_CAT_COLS

    def numerical(self) -> List[str]:
        """Return a copy of the numerical columns (target included)."""
        return self._NUMERICAL_COLS.copy()

    def target(self) -> str:
        """Return the name of the target column."""
        return self._TARGET_COL

    # ------------------------------------------------------------------
    def model_features(self) -> List[str]:
        """Columns fed into the ML pipeline *after* preprocess.

        Excludes the target but includes derived cols.  Order: numerical
        (minus target), nominal, ordinal.

        The list is rebuilt on every call.  It was previously wrapped in
        ``lru_cache``, which handed every caller the same list object (so a
        caller's mutation leaked into later calls) and kept the instance
        alive in the cache.
        """
        numeric_minus_target = [
            c for c in self._NUMERICAL_COLS if c != self._TARGET_COL
        ]

        return (
            numeric_minus_target
            + self._NOMINAL_CAT_COLS
            + self._ORDINAL_CAT_COLS  # some algos want raw string order
        )

    def all_raw(self) -> List[str]:
        """Return every schema column: id + ordinal + nominal + numerical."""
        return (
            self._ID_COLS
            + self._ORDINAL_CAT_COLS
            + self._NOMINAL_CAT_COLS
            + self._NUMERICAL_COLS
        )

    def as_dict(self) -> Dict[str, List[str]]:
        """Dictionary form – handy for YAML/JSON dumps.

        Keys: "id", "ordinal", "nominal", "numerical", "target".  There is
        no combined "categorical" entry; use :meth:`categorical` for that.
        """
        return {
            "id": self.id(),
            "ordinal": self.ordinal(),
            "nominal": self.nominal(),
            "numerical": self.numerical(),
            "target": [self.target()],
        }


# Singleton instance people can import as `cols`.  It has to be created at
# module level: names bound under the ``__main__`` guard only exist when the
# file is run as a script, never when it is imported.
cols = _ColumnSchema()

__all__ = ["cols"]


if __name__ == "__main__":
    # Script mode: dump every column group for a quick visual check.
    print("ID columns:         ", cols.id())
    print("Ordinal columns:    ", cols.ordinal())
    print("Nominal columns:    ", cols.nominal())
    print("All categorical:    ", cols.categorical())
    print("Numerical columns:  ", cols.numerical())
    print("Target column:      ", cols.target())
    print("Model features:     ", cols.model_features())
    print("All raw columns:    ", cols.all_raw())
    print("\nAs dict (JSON):")
    print(json.dumps(cols.as_dict(), indent=2))



In [None]:
%%writefile src/features/eda.py

import pandas as pd  # type: ignore
import matplotlib.pyplot as plt  # type: ignore
import numpy as np  # type: ignore
from src.data.ColumnSchema import _ColumnSchema

# Optional imports with fallbacks for advanced statistics
try:
    import scipy.stats as stats  # type: ignore
    from statsmodels.nonparametric.smoothers_lowess import (
        lowess  # type: ignore
    )
    from statsmodels.stats.stattools import (
        durbin_watson  # type: ignore
    )
    _HAS_STATS_LIBS = True
except ImportError:
    _HAS_STATS_LIBS = False
    print("Warning: scipy or statsmodels not available. "
          "Some diagnostics will be limited.")

from scipy.stats import f_oneway, ttest_ind



def get_column_groups() -> dict:
    """
    Return a mapping of column-type → list of columns,
    based on the canonical schema class in src.data.ColumnSchema.

    The schema is instantiated here because this module imports only the
    ``_ColumnSchema`` class; no ``cols`` instance is in scope.
    """
    return _ColumnSchema().as_dict()

def check_nulls(df: pd.DataFrame):
    """Print which columns of ``df`` contain at least one null value."""
    has_null = df.isnull().any()
    null_columns = df.columns[has_null].tolist()

    if not null_columns:
        print("No columns with null values.")
        return
    print("Columns with null values:", null_columns)


def quick_pulse_check(
    df: pd.DataFrame,
    velo_col: str = "exit_velo",
    group_col: str = "batter_id",
    level_col: str = "level_abbr"
) -> pd.DataFrame:
    """
    Build, print and return a two-column (Metric, Value) overview table.

    Rows, in order:
      - total rows, unique batters, overall median of `velo_col`
      - median of `velo_col` for each value of `level_col`
      - 25th percentile and median of events per batter
      - number of batters by count of distinct seasons
      - Pearson correlation of `velo_col` with launch_angle and hangtime
    """
    frame = df.copy()

    # Headline numbers.
    rows: list[tuple[str, object]] = [
        ("Total rows", len(frame)),
        ("Unique batters", frame[group_col].nunique()),
        ("Overall median EV", frame[velo_col].median()),
    ]

    # One row per level.
    level_medians = frame.groupby(level_col)[velo_col].median()
    for lvl, med in level_medians.items():
        rows.append((f"Median EV @ {lvl}", med))

    # Spread of events per batter.
    event_quantiles = (
        frame[group_col].value_counts().quantile([0.25, 0.5]).to_dict()
    )
    rows.append(("Events per batter (25th pct)", event_quantiles.get(0.25, "N/A")))
    rows.append(("Events per batter (median)", event_quantiles.get(0.5, "N/A")))

    # How many batters appear in 1, 2, ... distinct seasons.
    season_hist = (
        frame.groupby(group_col)["season"]
        .nunique()
        .value_counts()
        .sort_index()
        .to_dict()
    )
    for n_seasons, n_batters in season_hist.items():
        rows.append((f"Batters with {n_seasons} season(s)", n_batters))

    # Correlation of the velocity column with the two physical covariates.
    velo_corr = (
        frame[[velo_col, "launch_angle", "hangtime"]]
        .corr()[velo_col]
        .drop(velo_col)
    )
    rows.append(("ρ(exit_velo, launch_angle)", velo_corr.get("launch_angle", "N/A")))
    rows.append(("ρ(exit_velo, hangtime)", velo_corr.get("hangtime", "N/A")))

    table = pd.DataFrame({
        "Metric": [name for name, _ in rows],
        "Value": [value for _, value in rows],
    })

    print(table.to_string(index=False))
    return table


def red_flag_small_samples(df: pd.DataFrame,
                           group_col: str = "batter_id",
                           threshold: int = 15) -> pd.Series:
    """
    Report the groups that have fewer than `threshold` rows.

    Prints how many such groups exist (plus up to five of their ids) and
    returns their row counts as a Series indexed by `group_col` value.
    """
    per_group = df[group_col].value_counts()
    small = per_group.loc[per_group < threshold]

    print(f"> Batters with fewer than {threshold} events: {len(small)}")
    if len(small) > 0:
        preview = ', '.join(str(label) for label in small.index[:5])
        print(f"  First few: {preview}")
    return small


def red_flag_level_effect(df: pd.DataFrame,
                          level_col: str = "level_abbr",
                          velo_col: str = "exit_velo") -> tuple:
    """
    One-way ANOVA of `velo_col` across the values of `level_col`.

    Rows with a missing level are ignored and levels left without any
    non-null observation are skipped, so a single NaN label no longer turns
    the whole test into NaN.

    Returns (F-statistic, p-value).  Returns (None, None) when the stats
    libraries are unavailable (a per-level summary is printed instead) or
    when fewer than two levels have data.
    """
    if not _HAS_STATS_LIBS:
        print("> ANOVA on exit_velo by level: scipy not available")
        print("> Basic level summary instead:")
        summary = df.groupby(level_col)[velo_col].agg(['mean', 'std', 'count'])
        print(summary)
        return None, None

    # groupby drops NaN keys; empty samples are filtered out explicitly.
    samples = [
        values.dropna()
        for _, values in df.groupby(level_col)[velo_col]
    ]
    samples = [s for s in samples if not s.empty]
    if len(samples) < 2:
        print(f"> ANOVA on {velo_col} by {level_col}: "
              "fewer than two levels with data")
        return None, None

    F, p = stats.f_oneway(*samples)
    print(f"> ANOVA on {velo_col} by {level_col}: F={F:.3f}, p={p:.3e}")
    return F, p


# ------------------------------------------------------------------
#  REPLACE old red_flag_level_effect  → clearer name & doc
# ------------------------------------------------------------------
def league_level_effect(
    df: pd.DataFrame,
    level_col: str = "level_abbr",
    velo_col: str = "exit_velo",
) -> tuple[float | None, float | None]:
    """
    One-way ANOVA of `velo_col` across the values of `level_col`.

    Why it matters: a significant level effect supports modelling
    level-specific (hierarchical) terms.

    Rows with a missing level are ignored and levels without any non-null
    observation are skipped, so the statistic is not forced to NaN by an
    empty group.

    Returns (F, p).  Returns (None, None) when the stats libraries are
    unavailable (a per-level ``describe`` is printed instead) or when fewer
    than two levels have data.
    """
    if not _HAS_STATS_LIBS:
        print("> SciPy unavailable – falling back to group summary")
        print(df.groupby(level_col)[velo_col].describe())
        return None, None

    # groupby drops NaN keys; empty samples are filtered out explicitly.
    samples = [
        values.dropna()
        for _, values in df.groupby(level_col)[velo_col]
    ]
    samples = [s for s in samples if not s.empty]
    if len(samples) < 2:
        print("> Level effect ANOVA: fewer than two levels with data")
        return None, None

    f_val, p_val = stats.f_oneway(*samples)
    print(f"> Level effect ANOVA: F={f_val:.3f}, p={p_val:.3e}")
    return f_val, p_val



def diag_age_effect(df: pd.DataFrame,
                    age_col: str = "age_centered",
                    velo_col: str = "exit_velo") -> np.ndarray | None:
    """
    Fit and plot a LOWESS curve of `velo_col` against `age_col`.

    Returns the array produced by ``lowess`` (its first column is plotted
    on the x axis, its second on the y axis).  When the stats libraries are
    missing, prints the plain correlation of the two columns and returns
    None.
    """
    if _HAS_STATS_LIBS:
        fitted = lowess(df[velo_col], df[age_col])

        # Raw points in the background, smoothed curve on top.
        plt.figure(figsize=(6, 3))
        plt.scatter(df[age_col], df[velo_col], alpha=0.1, s=1, color='gray')
        x_fit, y_fit = fitted[:, 0], fitted[:, 1]
        plt.plot(x_fit, y_fit, 'r-', linewidth=2, label="LOWESS fit")
        plt.xlabel(age_col)
        plt.ylabel(velo_col)
        plt.title("Age effect (LOWESS)")
        plt.legend()
        plt.tight_layout()

        return fitted

    # Fallback path: no smoothing available.
    print("> Age effect analysis: statsmodels not available")
    print("> Basic correlation instead:")
    pair_corr = df[[age_col, velo_col]].corr()
    corr = pair_corr.iloc[0, 1]
    print(f"Correlation between {age_col} and {velo_col}: {corr:.3f}")
    return None


def diag_time_series_dw(
    df: pd.DataFrame,
    time_col: str = "season",
    group_col: str = "batter_id",
    velo_col: str = "exit_velo"
) -> pd.Series | None:
    """
    Compute a Durbin–Watson statistic per batter on the series of
    per-season mean `velo_col`.

    Durbin–Watson is defined on residuals, so each batter's series is
    centred on its own mean first.  Feeding the raw values (all far from
    zero) drives the statistic towards 0 regardless of the data.

    Only batters with at least three seasons of data are included; seasons
    without data are dropped, so the remaining seasons are treated as
    consecutive.

    Returns a Series of DW statistics indexed by batter, or None if the
    stats libraries are unavailable or no batter qualifies.
    """
    if not _HAS_STATS_LIBS:
        print("> Time series analysis: statsmodels not available")
        return None

    # Rows = batters, columns = seasons, values = mean velo (NaN if absent).
    pivot = (
        df
        .groupby([group_col, time_col])[velo_col]
        .mean()
        .unstack()
    )

    # Keep batters with at least 3 non-missing seasons.
    eligible = pivot.dropna(thresh=3)
    if eligible.empty:
        print("> No batters with sufficient seasons for Durbin-Watson test")
        return None

    dw_stats = {}
    for batter, row in eligible.iterrows():
        series = row.dropna()
        residuals = series - series.mean()
        dw_stats[batter] = durbin_watson(residuals)

    dw_series = pd.Series(dw_stats)
    print(
        f"> Mean Durbin–Watson across {len(dw_series)} batters: "
        f"{dw_series.mean():.3f}"
    )
    print("> DW < 1.5 suggests positive autocorrelation")
    print("> DW > 2.5 suggests negative autocorrelation")
    print("> DW ≈ 2.0 suggests no autocorrelation")

    return dw_series


# ------------------------------------------------------------------
#  REPLACE old diag_time_series_dw WITH optional helper
# ------------------------------------------------------------------
def _optional_dw_check(
    df: pd.DataFrame,
    time_col: str = "season",
    group_col: str = "batter_id",
    velo_col: str = "exit_velo",
) -> pd.Series | None:
    """
    (OPTIONAL) Durbin–Watson residual autocorrelation **per batter**.

    Each batter's per-season mean `velo_col` series is centred on its own
    mean before the statistic is computed, because Durbin–Watson is defined
    on residuals; raw values would push it towards 0.

    Only batters with at least three seasons of data are used.  Returns a
    Series of statistics indexed by batter, or None when the stats libraries
    are unavailable or no batter qualifies.
    """
    if not _HAS_STATS_LIBS:
        return None

    pivot = (
        df.groupby([group_col, time_col])[velo_col]
          .mean().unstack()
    )

    stats_out = {}
    for idx, row in pivot.dropna(thresh=3).iterrows():
        series = row.dropna()
        stats_out[idx] = durbin_watson(series - series.mean())

    if not stats_out:
        print("> DW check: no eligible batters")
        return None

    s = pd.Series(stats_out)
    print(f"DW mean={s.mean():.2f} (<1.5→pos autocorr, >2.5→neg)")
    return s




def check_red_flags(df: pd.DataFrame, 
                    sample_threshold: int = 15) -> dict:
    """
    Run the small-sample check and the level-effect ANOVA on `df`.

    Returns a dict with:
      'small_samples' – result of ``red_flag_small_samples``
      'level_effect'  – {'f_statistic': ..., 'p_value': ...} from
                        ``red_flag_level_effect``
    """
    # Small samples first, then the level effect, so output order is stable.
    undersized = red_flag_small_samples(df, threshold=sample_threshold)
    f_stat, p_val = red_flag_level_effect(df)

    return {
        'small_samples': undersized,
        'level_effect': {
            'f_statistic': f_stat,
            'p_value': p_val,
        },
    }


def plot_distributions(df: pd.DataFrame,
                       velo_col: str = "exit_velo",
                       by: str = "level_abbr"):
    """
    Histogram of `velo_col` faceted by `by` (one stacked panel per group).

    Missing group labels are ignored and missing values are dropped before
    binning.  Each panel title reports the group's row count.

    Returns the Matplotlib figure so callers can save or show it.

    Raises:
        ValueError: if `by` contains no non-null value (nothing to plot).
    """
    groups = df[by].dropna().unique()
    if len(groups) == 0:
        raise ValueError(f"no non-null values in {by!r} to facet by")

    # squeeze=False keeps `axes` a 2-D array even for a single group;
    # otherwise a bare Axes object comes back and cannot be iterated.
    fig, axes = plt.subplots(len(groups), 1,
                             figsize=(6, 2.8 * len(groups)),
                             sharex=True,
                             squeeze=False)
    for ax, grp in zip(axes.ravel(), groups):
        subset = df.loc[df[by] == grp, velo_col]
        ax.hist(subset.dropna(), bins=30, alpha=0.75)
        ax.set_title(f"{by} = {grp} (n={len(subset)})")
        ax.set_xlabel(velo_col)
    fig.tight_layout()
    return fig


def plot_correlations(df: pd.DataFrame, cols: list[str]):
    """
    Draw the Pearson correlation matrix of `cols` as a heat-map.

    The figure is square and grows with the number of columns; colours are
    fixed to the [-1, 1] range.  Returns the Matplotlib figure.
    """
    n_cols = len(cols)
    side = 0.6 * n_cols + 2
    corr_matrix = df[cols].corr()

    fig, ax = plt.subplots(figsize=(side, side))
    heat = ax.imshow(corr_matrix, vmin=-1, vmax=1, cmap="coolwarm")
    fig.colorbar(heat, ax=ax, shrink=0.8)

    # One tick per column on both axes, labelled with the column name.
    positions = range(n_cols)
    ax.set_xticks(positions, cols, rotation=90)
    ax.set_yticks(positions, cols)

    fig.tight_layout()
    return fig


def plot_time_trends(df: pd.DataFrame,
                     time_col: str = "season",
                     group_col: str = "batter_id",
                     velo_col: str = "exit_velo",
                     sample: int = 50):
    """
    Line plot of mean `velo_col` per `time_col` for randomly chosen batters.

    At most `sample` distinct `group_col` values are drawn without
    replacement via ``np.random.choice``; each gets one translucent line.
    Returns the Matplotlib figure.
    """
    all_ids = df[group_col].unique()
    n_pick = min(sample, len(all_ids))
    picked = np.random.choice(all_ids, n_pick, replace=False)

    fig, ax = plt.subplots(figsize=(8, 4))
    for batter in picked:
        rows = df.loc[df[group_col] == batter]
        trend = rows.groupby(time_col)[velo_col].mean()
        ax.plot(trend.index, trend.values, alpha=0.3)

    ax.set_xlabel(time_col)
    ax.set_ylabel(velo_col)
    ax.set_title("Sample batter exit-velo over time")
    fig.tight_layout()
    return fig


def summarize_numeric_vs_target(
    df: pd.DataFrame,
    numeric_cols: list[str] | None = None,
    target_col: str = "exit_velo",
) -> pd.DataFrame:
    """
    Summarise each numeric predictor against the target.

    Returns a DataFrame indexed by feature with:
      n          – number of non‑null pairs
      pearson_r  – Pearson correlation coefficient
    """
    # --- Pull fresh lists from the schema every time -----------------
    groups = cols.as_dict()

    if numeric_cols is None:
        numeric_cols = groups.get("numerical", [])

    # --- Clean the list ---------------------------------------------
    numeric_cols = [
        c for c in numeric_cols
        if c != target_col and c in df.columns      # ❶ exclude target, ❷ guard
    ]

    records = []
    for col in numeric_cols:
        sub = df[[col, target_col]].dropna()
        if sub.empty:               # skip columns that are all‑NA
            continue
        r = sub[col].corr(sub[target_col])
        records.append({"feature": col, "n": len(sub), "pearson_r": r})

    result = (
        pd.DataFrame.from_records(records)
        .set_index("feature")
        .sort_values("pearson_r", ascending=False)
    )

    print("\n=== Numeric vs target correlations ===")
    print(result)

    return result


def plot_numeric_vs_target(
    df: pd.DataFrame,
    numeric_cols: list[str] | None = None,
    target_col: str = "exit_velo",
):
    """
    Show one scatter plot per numeric predictor against the target.

    The predictors and their Pearson r come from
    ``summarize_numeric_vs_target``; r is printed in each plot title.
    """
    corr_table = summarize_numeric_vs_target(df, numeric_cols, target_col)
    pairs = zip(corr_table.index, corr_table["pearson_r"])

    for feature, r_value in pairs:
        plt.figure(figsize=(6, 4))
        plt.scatter(df[feature], df[target_col],
                    alpha=0.3, s=5, edgecolors="none")
        plt.title(f"{feature} vs {target_col}  (r = {r_value:.2f})")
        plt.xlabel(feature)
        plt.ylabel(target_col)
        plt.tight_layout()
        plt.show()



def summarize_categorical_vs_target(
    df: pd.DataFrame,
    cat_cols: list[str] | None = None,
    target_col: str = "exit_velo"
) -> dict[str, pd.DataFrame]:
    """
    For each categorical feature, returns a DataFrame of:
      count, mean, median, std of the target by category.
    """
    groups = get_column_groups()
    if cat_cols is None:
        cat_cols = groups.get("categorical", [])

    summaries: dict[str, pd.DataFrame] = {}
    for col in cat_cols:
        stats = (
            df
            .groupby(col)[target_col]
            .agg(count="count", mean="mean", median="median", std="std")
            .sort_values("count", ascending=False)
        )
        print(f"\n=== {col} vs {target_col} summary ===")
        print(stats)
        summaries[col] = stats
    return summaries


def plot_categorical_vs_target(
    df: pd.DataFrame,
    cat_cols: list[str] | None = None,
    target_col: str = "exit_velo"
):
    """
    For each categorical feature, draw a box‑plot of the target by category.

    Parameters
    ----------
    df         : data holding the features and the target
    cat_cols   : features to plot; when None, the schema's categorical
                 columns (ordinal + nominal) that exist in `df` are used
    target_col : name of the target column
    """
    # The schema's dict form has no "categorical" key, so the default list
    # is taken from the schema's categorical() accessor instead.
    if cat_cols is None:
        cat_cols = [
            c for c in _ColumnSchema().categorical() if c in df.columns
        ]

    for col in cat_cols:
        # Hand pandas the axes explicitly; without `ax`, boxplot(by=...)
        # opens its own figure and the pre-made one is left blank.
        fig, ax = plt.subplots(figsize=(6, 4))
        df.boxplot(column=target_col, by=col, vert=False,
                   grid=False, patch_artist=True, ax=ax)
        ax.set_title(f"{target_col} by {col}")
        fig.suptitle("")           # remove pandas' automatic suptitle
        ax.set_xlabel(target_col)
        fig.tight_layout()
        plt.show()



def examine_and_filter_by_sample_size(
    df: pd.DataFrame,
    count_col: str = "exit_velo",
    group_col: str = "batter_id",
    season_col: str = "season",
    percentile: float = 0.05,
    min_count: int | None = None,
    filter_df: bool = False,
) -> tuple[dict[int, pd.DataFrame], pd.DataFrame | None]:
    """
    For each season:
      - compute per-batter count, mean, std of `count_col`
      - pick cutoff: min_count if provided, else the `percentile` quantile
      - print diagnostics
      - plot histograms *safely* (drops NaNs first)
    Returns:
      - summaries: dict season → per-batter summary DataFrame
      - filtered_df: if filter_df, the original df filtered to batters ≥ cutoff
    """
    summaries: dict[int, pd.DataFrame] = {}
    mask_keep: list[pd.Series] = []

    for season, sub in df.groupby(season_col):
        # 1) per-batter summary (count *non-NA* exit_velo)
        summary = (
            sub
            .groupby(group_col)[count_col]
            .agg(count="count", mean="mean", std="std")
            .sort_values("count")
        )
        summaries[season] = summary

        # 2) determine cutoff
        cutoff = min_count if min_count is not None else int(summary["count"].quantile(percentile))
        small = summary[summary["count"] < cutoff]
        large = summary[summary["count"] >= cutoff]

        # 3) diagnostics
        print(f"\n=== Season {season} (cutoff = {cutoff}) ===")
        print(f"  small (<{cutoff} events): {len(small)} batters")
        print(small[["count","mean","std"]].describe(), "\n")
        print(f"  large (≥{cutoff} events): {len(large)} batters")
        print(large[["count","mean","std"]].describe())

        # 4) **safe plotting**: drop NaNs, skip if nothing to plot
        small_means = small["mean"].dropna()
        large_means = large["mean"].dropna()

        if small_means.empty and large_means.empty:
            print(f"  ⚠️  Season {season}: no valid per-batter means to plot")
        else:
            plt.figure(figsize=(8, 3))
            if not small_means.empty:
                plt.hist(small_means, bins=30, alpha=0.6, label=f"n<{cutoff}")
            if not large_means.empty:
                plt.hist(large_means, bins=30, alpha=0.6, label=f"n≥{cutoff}")
            plt.title(f"Season {season}: per-batter EV means")
            plt.xlabel("Mean exit_velo")
            plt.legend()
            plt.tight_layout()
            plt.show()

        # 5) build mask to keep only large-sample batters
        if filter_df:
            keep_ids = large.index
            mask_keep.append(
                (df[season_col] == season) &
                (df[group_col].isin(keep_ids))
            )

    # 6) combine masks and filter
    filtered_df = None
    if filter_df and mask_keep:
        combined = pd.concat(mask_keep, axis=1).any(axis=1)
        filtered_df = df[combined].copy()

    return summaries, filtered_df



def hypothesis_test(df, feature, target="exit_velo", test_type="anova"):
    """
    Test whether `target` differs across the values of `feature`.

    Rows with a missing `target` are dropped first, because NaNs would make
    the SciPy statistics NaN.

    Parameters
    ----------
    df        : data holding `feature` and `target`
    feature   : grouping column
    target    : numeric column being compared
    test_type : "anova" – one-way ANOVA across all non-null values of
                          `feature` that have data
                "ttest" – two-sample t-test between rows where
                          `feature == 0` and rows where `feature == 1`

    Returns (statistic, p-value) and prints them.

    Raises:
        ValueError: if `test_type` is neither "anova" nor "ttest".
    """
    data = df[[feature, target]].dropna(subset=[target])

    if test_type == "anova":
        # groupby skips NaN feature values; unobserved groups are dropped.
        groups = [
            values for _, values in data.groupby(feature)[target]
            if len(values) > 0
        ]
        F, p = f_oneway(*groups)
        print(f"ANOVA: F={F:.3f}, p={p:.3e}")
        return F, p

    if test_type == "ttest":
        group1 = data.loc[data[feature] == 0, target]
        group2 = data.loc[data[feature] == 1, target]
        t, p = ttest_ind(group1, group2)
        print(f"T-test: t={t:.3f}, p={p:.3e}")
        return t, p

    # An unknown test type used to fall through and return None silently.
    raise ValueError(
        f"unknown test_type {test_type!r}; expected 'anova' or 'ttest'"
    )


# ------------------------------------------------------------------
#  NEW: robust outlier flagging
# ------------------------------------------------------------------
def flag_outliers_iqr(
    df: pd.DataFrame,
    velo_col: str = "exit_velo",
    iqr_mult: float = 1.5,
) -> pd.Series:
    """
    Flag suspect exit-velocity outliers with the classic IQR fence rule.

    A value is flagged when it lies below ``Q1 - iqr_mult * IQR`` or above
    ``Q3 + iqr_mult * IQR``, where the quartiles are taken over
    ``df[velo_col]``.  The number of flagged rows and the fence values are
    printed.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing ``velo_col``.
    velo_col : str
        Column to screen.
    iqr_mult : float
        Fence multiplier ``k`` applied to the inter-quartile range.

    Returns
    -------
    pd.Series
        Boolean Series aligned with ``df`` (True = suspect outlier).
        Missing values compare False against both fences, so they are
        never flagged.
    """
    values = df[velo_col]
    first_quartile = values.quantile(0.25)
    third_quartile = values.quantile(0.75)

    spread = third_quartile - first_quartile
    lower = first_quartile - iqr_mult * spread
    upper = third_quartile + iqr_mult * spread

    is_outlier = values.lt(lower) | values.gt(upper)
    n = int(is_outlier.sum())
    print(f"> Outlier flag ({velo_col}): {n} rows outside [{lower:.1f}, {upper:.1f}]")
    return is_outlier



# ------------------------------------------------------------------
#  NEW: EV distribution summary + QQ plot
# ------------------------------------------------------------------
def ev_distribution_summary(
    df: pd.DataFrame,
    velo_col: str = "exit_velo",
    bins: int = 40,
):
    """
    Summarise the shape of the exit-velocity distribution.

    Prints the skewness and (Pearson, i.e. non-excess) kurtosis of the
    non-missing values of ``df[velo_col]`` and draws a three-panel figure:
    density histogram with a KDE overlay, the empirical CDF, and a
    normal QQ-plot.  The intent is to help decide between likelihoods
    such as skew-normal or Student-t.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing ``velo_col``.
    velo_col : str
        Column to summarise.
    bins : int
        Number of histogram bins.

    Returns
    -------
    matplotlib.figure.Figure
        The figure holding the three panels (not shown automatically).
    """
    sample = df[velo_col].dropna()

    skewness = stats.skew(sample)
    kurt = stats.kurtosis(sample, fisher=False)
    print(f"Skewness = {skewness:.2f},  Kurtosis = {kurt:.2f}")

    fig, ax = plt.subplots(1, 3, figsize=(12, 3))
    hist_ax, cdf_ax, qq_ax = ax

    # Panel 1: density histogram with the KDE drawn on the same axes.
    hist_ax.hist(sample, bins=bins, density=True, alpha=0.7)
    sample.plot(kind="kde", ax=hist_ax, linewidth=2)
    hist_ax.set_title("Histogram & KDE")

    # Panel 2: empirical CDF - sorted values against their cumulative share.
    ordered = np.sort(sample)
    n_obs = len(ordered)
    cdf_ax.plot(ordered, np.arange(1, n_obs + 1) / n_obs)
    cdf_ax.set_title("Empirical CDF")

    # Panel 3: quantiles of the sample against a normal distribution.
    from scipy import stats as _st
    _st.probplot(sample, dist="norm", plot=qq_ax)
    qq_ax.set_title("QQ‑plot vs Normal")

    plt.tight_layout()
    return fig


# ------------------------------------------------------------------
#  NEW: Year/era trend diagnostic
# ------------------------------------------------------------------
def year_trend_ev(
    df: pd.DataFrame,
    season_col: str = "season",
    velo_col: str = "exit_velo",
    ci: bool = True,
):
    """
    Tabulate and plot the per-season mean and median exit velocity.

    Intended as a diagnostic for season-to-season shifts in the baseline
    (the original note cites ball-era changes) before forecasting.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing ``season_col`` and ``velo_col``.
    season_col : str
        Column identifying the season; used as the grouping key.
    velo_col : str
        Column whose seasonal mean/median is reported.
    ci : bool
        When True, shade ``mean ± 1.96 * SEM`` around the mean line.

    Returns
    -------
    tuple
        ``(stats_df, fig)`` where ``stats_df`` has columns ``mean``,
        ``median`` and ``n`` indexed by season, and ``fig`` is the
        matplotlib figure.
    """
    by_season = df.groupby(season_col)[velo_col]
    stats_df = by_season.agg(mean="mean", median="median", n="count")
    print("\n=== Exit‑velo by season ===")
    print(stats_df)

    fig, ax = plt.subplots(figsize=(7, 3))
    for column, marker, label in (
        ("mean", "o", "Mean EV"),
        ("median", "s", "Median EV"),
    ):
        stats_df[column].plot(ax=ax, marker=marker, label=label)

    if ci:
        # Normal-approximation band built from the per-season standard error.
        half_width = 1.96 * by_season.sem()
        centre = stats_df["mean"]
        ax.fill_between(
            stats_df.index,
            centre - half_width,
            centre + half_width,
            alpha=0.2,
            label="95% CI (mean)",
        )

    ax.set_ylabel(velo_col)
    ax.set_title("Seasonal trend in exit velocity")
    ax.legend()
    plt.tight_layout()
    return stats_df, fig



# ------------------------------------------------------------------
#  REPLACE old red_flag_level_effect  → clearer name & doc
# ------------------------------------------------------------------
def league_level_effect(
    df: pd.DataFrame,
    level_col: str = "level_abbr",
    velo_col: str = "exit_velo",
) -> tuple[float | None, float | None]:
    """
    One-way ANOVA of ``velo_col`` across the levels of ``level_col``.

    Used to check whether exit velocity differs between competition levels,
    which would justify level effects in the hierarchical model.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing ``level_col`` and ``velo_col``.
    level_col : str
        Grouping column (one ANOVA group per observed level).
    velo_col : str
        Numeric column compared across levels.

    Returns
    -------
    tuple
        ``(F, p)`` from the ANOVA.  ``(None, None)`` is returned when
        ``_HAS_STATS_LIBS`` is falsy (a group summary is printed instead) or
        when fewer than two levels have any non-missing values.
    """
    if not _HAS_STATS_LIBS:
        print("> SciPy unavailable – falling back to group summary")
        print(df.groupby(level_col)[velo_col].describe())
        return None, None

    # groupby skips rows whose level is NaN.  Filtering with
    # `df[level_col] == nan` instead gives an empty group, which makes
    # f_oneway return NaN for the whole test.
    groups = [
        values.dropna()
        for _, values in df.groupby(level_col, observed=True)[velo_col]
    ]
    # A level whose velocities are all missing contributes nothing either.
    groups = [g for g in groups if len(g) > 0]

    if len(groups) < 2:
        print("> Level effect ANOVA skipped – fewer than two non-empty levels")
        return None, None

    f_val, p_val = stats.f_oneway(*groups)
    print(f"> Level effect ANOVA: F={f_val:.3f}, p={p_val:.3e}")
    return f_val, p_val











if __name__ == "__main__":
    # Smoke run: load the raw CSV, engineer features, then walk through
    # every EDA helper once so a broken diagnostic is caught early.
    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    df = load_raw(raw_path)
    print(df.head())
    print(df.columns)

    # --- inspect nulls in the raw data ---
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values in raw data.")
    else:
        print("=== Raw data null counts ===")
        for col, cnt in null_counts.items():
            print(f" • {col!r}: {cnt} missing")
    df_fe = feature_engineer(df)

    print("Raw →", df.shape, "//  Feature‑engineered →", df_fe.shape)
    print(df_fe.head())

    # Column schema used to pick the feature groups below.
    cols = _ColumnSchema()

    print("ID columns:         ", cols.id())
    print("Ordinal columns:    ", cols.ordinal())
    print("Nominal columns:    ", cols.nominal())
    print("All categorical:    ", cols.categorical())
    print("Numerical columns:  ", cols.numerical())
    print("Model features:     ", cols.model_features())
    print("Target columns:  ", cols.target())
    print("All raw columns:    ", cols.all_raw())
    numericals = cols.numerical()

    # Drop the target(s) from the numerical features.  The target is
    # normalised to a set first: `c not in "<target name>"` on a plain
    # string is a *substring* test and would also drop any column whose
    # name happens to be contained in the target name.
    target = cols.target()
    target_cols = {target} if isinstance(target, str) else set(target)
    numericals_without_y = [c for c in numericals if c not in target_cols]

    print("\n===== check on small samples =====")
    summaries, _ = examine_and_filter_by_sample_size(df, percentile=0.05)
    summaries, df_filtered = examine_and_filter_by_sample_size(
        df, percentile=0.05, min_count=15, filter_df=False
    )

    # Example usage
    print("\n===== NULLS CHECK =====")
    check_nulls(df_fe)

    print("\n===== QUICK PULSE CHECK =====")
    quick_pulse_check(df_fe)

    print("\n===== RED FLAGS CHECK =====")
    check_red_flags(df_fe)

    print("\n===== AGE EFFECT ANALYSIS =====")
    diag_age_effect(df_fe, age_col="age")

    print("\n===== TIME SERIES ANALYSIS =====")
    diag_time_series_dw(df_fe)

    print("\n===== PLOTTING =====")
    fig1 = plot_distributions(df_fe, by="hit_type")
    fig2 = plot_correlations(df_fe, numericals)  # numerical columns from the schema
    fig3 = plot_time_trends(df_fe, sample=20)

    # — Numeric features —
    num_summary = summarize_numeric_vs_target(df_fe)
    plot_numeric_vs_target(df_fe)

    # — Categorical features —
    cat_summary = summarize_categorical_vs_target(df_fe)
    plot_categorical_vs_target(df_fe)

    # Example: test whether the age bin has a significant effect
    hypothesis_test(df_fe, feature="age_bin", test_type="anova")

    league_level_effect(df_fe)
    year_trend_ev(df_fe)
    flag_outliers_iqr(df_fe)
    ev_distribution_summary(df_fe)
# _optional_dw_check(df_fe)   # only if you still care


In [None]:
%%writefile src/features/data_prep.py
import pandas as pd
import numpy as np


# ─── 2. optional clipping utility (call ONLY in model‑fit script) ──
def clip_extreme_ev(df, velo_col="exit_velo", lower=51.2, upper=127.4):
    """
    Return a copy of ``df`` with ``velo_col`` clipped to ``[lower, upper]``.

    The input frame is left untouched; only the named column differs in the
    returned copy.  Per the section note, this is meant to be called only
    from the model-fit script.
    """
    clipped_velo = df[velo_col].clip(lower, upper)
    # `assign` builds a new frame, so the caller's DataFrame is not mutated.
    return df.assign(**{velo_col: clipped_velo})




if __name__ == "__main__":
    # Smoke run: load the raw CSV, engineer features and print the schema.
    from src.data.ColumnSchema import _ColumnSchema  # not imported at module level here
    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    df = load_raw(raw_path)
    print(df.head())
    print(df.columns)

    # --- inspect nulls in the raw data ---
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values in raw data.")
    else:
        print("=== Raw data null counts ===")
        for col, cnt in null_counts.items():
            print(f" • {col!r}: {cnt} missing")
    df_fe = feature_engineer(df)

    print("Raw →", df.shape, "//  Feature‑engineered →", df_fe.shape)
    print(df_fe.head())

    # Column schema used to list the feature groups below.
    cols = _ColumnSchema()

    print("ID columns:         ", cols.id())
    print("Ordinal columns:    ", cols.ordinal())
    print("Nominal columns:    ", cols.nominal())
    print("All categorical:    ", cols.categorical())
    print("Numerical columns:  ", cols.numerical())
    print("Model features:     ", cols.model_features())
    print("Target columns:  ", cols.target())
    print("All raw columns:    ", cols.all_raw())
    numericals = cols.numerical()

    # Drop the target(s) from the numerical features.  Normalise to a set
    # first: `c not in "<target name>"` on a plain string is a substring
    # test, not an equality test.
    target = cols.target()
    target_cols = {target} if isinstance(target, str) else set(target)
    numericals_without_y = [c for c in numericals if c not in target_cols]
    





# Preprocessing

In [None]:
%%writefile src/features/preprocess.py
"""
Preprocessing module for exit velocity pipeline.
Supports multiple model types (linear, XGBoost, PyMC, etc.) with
automatic ordinal-category detection from the data.
"""
import pandas as pd
import numpy as np
from sklearn.experimental import enable_iterative_imputer  # noqa: F401
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer, IterativeImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder, StandardScaler

from src.data.ColumnSchema import _ColumnSchema
from sklearn.model_selection import train_test_split

from pandas.api.types import is_categorical_dtype
# ───────────────────────────────────────────────────────────────────────
# Numeric & nominal pipelines (unchanged)
# ───────────────────────────────────────────────────────────────────────
def _impute_then_scale(imputer) -> Pipeline:
    """Chain ``imputer`` (step 'impute') with a StandardScaler (step 'scale')."""
    return Pipeline(steps=[("impute", imputer), ("scale", StandardScaler())])


# Numeric columns, linear models: median imputation plus missing-indicator
# columns, then standardisation.
numeric_linear = _impute_then_scale(
    SimpleImputer(strategy="median", add_indicator=True)
)

# Numeric columns, other model types: iterative imputation plus
# missing-indicator columns, then standardisation.
numeric_iterative = _impute_then_scale(
    IterativeImputer(random_state=0, add_indicator=True)
)

# Nominal columns: missing values become the literal 'MISSING' category,
# then one-hot encoding with the first level of each feature dropped.
nominal_pipe = Pipeline(
    steps=[
        ("impute", SimpleImputer(strategy="constant", fill_value="MISSING")),
        ("encode", OneHotEncoder(drop="first", handle_unknown="ignore")),
    ]
)

# ───────────────────────────────────────────────────────────────────────
# Dynamic preprocess functions
# ───────────────────────────────────────────────────────────────────────
# src/features/preprocess.py

def fit_preprocessor(
    df: pd.DataFrame,
    model_type: str = "linear",
    debug: bool = False,
) -> tuple[np.ndarray, pd.Series, ColumnTransformer]:
    """
    Build and fit the preprocessing ColumnTransformer on training data.

    Rows whose ``hit_type`` is a bunt (case-insensitive) or whose target is
    missing are dropped before fitting.

    Parameters
    ----------
    df : pd.DataFrame
        Feature-engineered training data containing ``hit_type``, the target
        and every numerical / ordinal / nominal column in ``_ColumnSchema``.
    model_type : str
        ``"linear"`` selects the median-imputation numeric pipeline; any
        other value selects the iterative-imputation pipeline.
    debug : bool
        Print the detected ordinal categories when True.

    Returns
    -------
    tuple
        ``(X_matrix, y, fitted_transformer)``.  ``X_matrix`` is a dense
        NumPy array; ``y`` keeps the index of the rows that survived the
        filtering.
    """
    cols = _ColumnSchema()
    TARGET = cols.target()

    # ------------------------------------------------------------
    # 1. filter rows & coerce numerics
    # ------------------------------------------------------------
    df = df[df["hit_type"].str.upper() != "BUNT"].copy()
    df = df.dropna(subset=[TARGET])
    num_feats = [c for c in cols.numerical() if c != TARGET]
    df[num_feats] = df[num_feats].apply(pd.to_numeric, errors="coerce")

    # ------------------------------------------------------------
    # 2. Prepare X, y as DATAFRAMES (keeps column names)
    # ------------------------------------------------------------
    ord_feats = cols.ordinal()
    nom_feats = cols.nominal()
    # Explicit copy: X is modified below, and assigning into a column slice
    # of `df` triggers pandas' SettingWithCopyWarning.
    X = df[num_feats + ord_feats + nom_feats].copy()
    y = df[TARGET]

    # force all ordinal columns to string so categories are comparable,
    # while keeping genuine NaNs as NaN (astype(str) would turn them into
    # the literal "nan") so the imputer can still see them
    ord_present = X[ord_feats].notna()
    X[ord_feats] = X[ord_feats].astype(str).where(ord_present, other=np.nan)

    # ------------------------------------------------------------
    # 3. Compute *global* ordinal category lists
    # ------------------------------------------------------------
    # Categories are listed in order of first appearance, with the imputer's
    # 'MISSING' placeholder appended so it is always a known category.
    ordinal_categories = []
    for c in ord_feats:
        cats = X[c].dropna().unique().tolist()
        if "MISSING" not in cats:
            cats.append("MISSING")
        ordinal_categories.append(cats)

    if debug:
        print("Detected ordinal categories:", list(zip(ord_feats, ordinal_categories)))

    # ------------------------------------------------------------
    # 4. Build pipelines
    # ------------------------------------------------------------
    ordinal_pipe = Pipeline(
        [
            ("impute", SimpleImputer(strategy="constant", fill_value="MISSING")),
            (
                "encode",
                OrdinalEncoder(
                    categories=ordinal_categories,
                    handle_unknown="use_encoded_value",
                    unknown_value=-1,
                    dtype="int32",
                ),
            ),
        ]
    )

    numeric_pipe = (
        numeric_linear if model_type == "linear" else numeric_iterative
    )

    ct = ColumnTransformer(
        [
            ("num", numeric_pipe, num_feats),
            ("ord", ordinal_pipe, ord_feats),
            ("nom", nominal_pipe, nom_feats),
        ],
        remainder="drop",
        # The one-hot block is sparse; with the default threshold the stacked
        # result can come back as a SciPy sparse matrix.  0.0 forces a dense
        # ndarray, which is what the return annotation and callers expect.
        sparse_threshold=0.0,
        verbose_feature_names_out=False,
    )

    X_mat = ct.fit_transform(X, y)
    return X_mat, y, ct




def transform_preprocessor(
    df: pd.DataFrame,
    transformer: ColumnTransformer,
) -> tuple[np.ndarray, pd.Series]:
    """
    Apply an already-fitted transformer to a new DataFrame.

    Rows with a missing target are dropped.  Missing ordinal values are
    replaced by the literal ``'MISSING'`` before transforming; ordinal values
    not seen during fitting are passed through unchanged and left to the
    encoder's own unknown-value handling.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing the target and every numerical / ordinal / nominal
        column in ``_ColumnSchema``.
    transformer : ColumnTransformer
        Transformer returned by ``fit_preprocessor``.

    Returns
    -------
    tuple
        ``(X_matrix, y)`` where ``y`` keeps the index of the retained rows.

    Notes
    -----
    NOTE(review): unlike ``fit_preprocessor``, bunt rows are *not* removed
    here - confirm this asymmetry is intended.
    """
    cols = _ColumnSchema()
    TARGET = cols.target()
    num_feats = [c for c in cols.numerical() if c != TARGET]
    ord_feats = cols.ordinal()
    nom_feats = cols.nominal()

    df = df.dropna(subset=[TARGET]).copy()
    df[num_feats] = df[num_feats].apply(pd.to_numeric, errors="coerce")

    # Explicit copy: X is modified below, and assigning into a column slice
    # of `df` triggers pandas' SettingWithCopyWarning.
    X = df[num_feats + ord_feats + nom_feats].copy()
    y = df[TARGET]

    # stringify ordinals; missing values become 'MISSING'
    ord_present = X[ord_feats].notna()
    X[ord_feats] = X[ord_feats].astype(str).where(ord_present, other="MISSING")

    X_mat = transformer.transform(X)
    return X_mat, y



def inverse_transform_preprocessor(
    X_trans: np.ndarray,
    transformer: ColumnTransformer
) -> pd.DataFrame:
    """
    Map a transformed matrix back to the original feature columns.

    Each block of a fitted ColumnTransformer is sliced out of ``X_trans`` and
    inverted with the last step of its pipeline (for the ``'num'`` block only
    the scaler is inverted, so imputed values stay imputed).

    Parameters
    ----------
    X_trans : np.ndarray
        Output of ``transformer.transform`` / ``fit_transform``.  A SciPy
        sparse matrix is densified first.
    transformer : ColumnTransformer
        The fitted transformer that produced ``X_trans``.

    Returns
    -------
    pd.DataFrame
        One column per original feature, in the order the features were
        given to the transformer, with a fresh ``0..n-1`` index.
    """
    # The scaler cannot invert sparse input, so work on a dense array.
    if hasattr(X_trans, "toarray"):
        X_trans = X_trans.toarray()

    n_rows = X_trans.shape[0]
    orig_features: list[str] = []
    parts = []

    for name, trans, cols in transformer.transformers_:
        # `transformers_` holds (name, fitted_transformer, columns); a dropped
        # block is marked by the *transformer* being the string 'drop'.
        if isinstance(trans, str) and trans == "drop":
            continue
        cols = list(cols)
        if not cols:
            # an empty selection produced no output and was never fitted
            continue
        orig_features.extend(cols)

        # `output_indices_` records the exact slice each block occupies in
        # the stacked output, so the width never has to be guessed.
        block = X_trans[:, transformer.output_indices_[name]]

        if isinstance(trans, str) and trans == "passthrough":
            inv = block
        elif name == "num":
            # The scaler was fitted on features plus the imputer's indicator
            # columns; the first len(cols) outputs are presumably the
            # features themselves - TODO confirm for all-NaN columns, which
            # the imputer may drop.
            inv_full = trans.named_steps["scale"].inverse_transform(block)
            inv = inv_full[:, :len(cols)]
        elif isinstance(trans, Pipeline):
            # only the final step (the encoder) is inverted
            inv = trans.steps[-1][1].inverse_transform(block)
        else:
            inv = trans.inverse_transform(block)

        parts.append(pd.DataFrame(inv, columns=cols, index=range(n_rows)))

    df_orig = pd.concat(parts, axis=1)
    return df_orig[orig_features]


def prepare_for_mixed_and_hierarchical(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean rows for the hierarchical and mixed-effects models.

    Returns a new frame (the input is not modified) in which bunts
    (case-insensitive ``hit_type``) and rows with a missing target have been
    removed and ``batter_id`` is stored as a pandas categorical, ready to be
    used as a grouping factor.
    """
    target_col = _ColumnSchema().target()

    not_bunt = df["hit_type"].str.upper() != "BUNT"
    cleaned = df.loc[not_bunt].dropna(subset=[target_col]).copy()

    # categorical coding for the later random effects
    cleaned["batter_id"] = cleaned["batter_id"].astype("category")
    return cleaned





# debugs:
def summarize_categorical_missingness(df: pd.DataFrame) -> pd.DataFrame:
    """
    Report missingness for every categorical (ordinal + nominal) column.

    For each column listed by ``_ColumnSchema`` the summary contains:

    - ``original_null_count`` / ``original_null_pct``: nulls in the data.
    - ``imputed_missing_count`` / ``imputed_missing_pct``: rows that equal
      ``'MISSING'`` once nulls are filled with that placeholder (this also
      counts values that were already the literal ``'MISSING'``).

    Percentages are fractions of ``len(df)``.

    Parameters
    ----------
    df : pd.DataFrame
        Data containing every ordinal and nominal schema column.

    Returns
    -------
    pd.DataFrame
        One row per categorical column.
    """
    cols = _ColumnSchema()
    cat_cols = cols.ordinal() + cols.nominal()
    summary = []
    n = len(df)

    for col in cat_cols:
        ser = df[col]
        orig_null = ser.isna().sum()

        # A pandas Categorical rejects fill values outside its categories, so
        # 'MISSING' has to be registered first.  The dtype check replaces the
        # deprecated `is_categorical_dtype`, and the membership test avoids
        # the ValueError add_categories raises for an existing category.
        if (
            isinstance(ser.dtype, pd.CategoricalDtype)
            and "MISSING" not in ser.cat.categories
        ):
            ser = ser.cat.add_categories(["MISSING"])

        # Count rows that would become 'MISSING'
        imputed_missing = ser.fillna("MISSING").eq("MISSING").sum()

        summary.append({
            'column': col,
            'original_null_count':   orig_null,
            'original_null_pct':     orig_null / n,
            'imputed_missing_count': imputed_missing,
            'imputed_missing_pct':   imputed_missing / n,
        })

    return pd.DataFrame(summary)





# ───────────────────────────────────────────────────────────────────────
# 6. Smoke test (only run when module executed directly)
# ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke run: load, engineer, split, fit the preprocessor on the training
    # split, transform the test split and round-trip the training matrix.
    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer
    from src.features.data_prep import clip_extreme_ev

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    df = load_raw(raw_path)
    print(df.head())
    print(df.columns)

    # --- inspect nulls in the raw data ---
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values in raw data.")
    else:
        print("=== Raw data null counts ===")
        for col, cnt in null_counts.items():
            print(f" • {col!r}: {cnt} missing")
    df_fe = feature_engineer(df)

    print("Raw →", df.shape, "//  Feature‑engineered →", df_fe.shape)
    print(df_fe.head())

    # Column schema used to list the feature groups below.
    cols = _ColumnSchema()

    print("ID columns:         ", cols.id())
    print("Ordinal columns:    ", cols.ordinal())
    print("Nominal columns:    ", cols.nominal())
    print("All categorical:    ", cols.categorical())
    print("Numerical columns:  ", cols.numerical())
    print("Model features:     ", cols.model_features())
    print("Target columns:  ", cols.target())
    print("All raw columns:    ", cols.all_raw())
    numericals = cols.numerical()

    # Drop the target(s) from the numerical features.  Normalise to a set
    # first: `c not in "<target name>"` on a plain string is a substring
    # test, not an equality test.
    target = cols.target()
    target_cols = {target} if isinstance(target, str) else set(target)
    numericals_without_y = [c for c in numericals if c not in target_cols]

    summary_df = summarize_categorical_missingness(df_fe)
    print(summary_df.to_markdown(index=False))

    # check nulls
    print("🛠️  Nulls in X before fit_transform:")
    null_counts = df_fe.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values after feature engineering.")
    else:
        print("=== Null counts post-engineering ===")
        print(null_counts)

    train_df, test_df = train_test_split(df_fe, test_size=0.2, random_state=42)

    # Clip extremes on the training split only (linear/XGB); the test split
    # keeps its raw target values.
    train_df = clip_extreme_ev(train_df)

    # run with debug prints
    X_train, y_train, tf = fit_preprocessor(train_df, model_type='linear', debug=True)
    X_test,  y_test      = transform_preprocessor(test_df, tf)

    print("Processed shapes:", X_train.shape, X_test.shape)

    # Example of inverse transform:
    print("==========Example of inverse transform:==========")
    df_back = inverse_transform_preprocessor(X_train, tf)
    print("\n✅ Inverse‐transformed head (should mirror your original X_train):")
    print(df_back.head())
    # X_train here is the *transformed* matrix, so label it as such.
    print("Shape:", df_back.shape, "→ transformed X_train shape:", X_train.shape)
    


# Feature Selection

In [None]:
%%writefile src/features/feature_selection.py
import pandas as pd

# ── NEW: model and importance imports
from sklearn.ensemble import RandomForestRegressor
from sklearn.inspection import permutation_importance
import shap

from pathlib import Path
from src.data.load_data import load_raw
from src.features.feature_engineering import feature_engineer
from src.data.ColumnSchema import _ColumnSchema
# ── NEW: shapash and shapiq imports
from shapash import SmartExplainer
import shapiq
from sklearn.utils import resample

def train_baseline_model(X, y):
    """
    Fit the baseline RandomForestRegressor used for importance estimates.

    The forest uses 100 trees, a fixed seed (42) for reproducibility and all
    available cores.  Returns the fitted model.
    """
    # Hyperparameters are deliberately plain; tune here if needed.
    forest_params = {"n_estimators": 100, "random_state": 42, "n_jobs": -1}
    return RandomForestRegressor(**forest_params).fit(X, y)



def compute_permutation_importance(
    model,
    X: pd.DataFrame,
    y: pd.Series,
    n_repeats: int = 10,
    n_jobs: int = 1,
    max_samples: float | int | None = None,
    random_state: int = 42,
    verbose: bool = True
) -> pd.DataFrame:
    """
    Compute permutation importances with controlled resource usage.

    Parameters
    ----------
    model : estimator
        Fitted model implementing .predict and .score.
    X : pd.DataFrame
        Features.
    y : pd.Series or array
        Target.
    n_repeats : int
        Number of shuffles per feature.
    n_jobs : int
        Number of parallel jobs (avoid -1 on Windows).
    max_samples : float or int, optional
        If float, fraction of rows to sample.
        If int, absolute number of rows to sample.
        The resulting row count is clamped to ``[1, len(X)]``.
        ``None`` uses every row.
    random_state : int
        Seed for reproducibility.
    verbose : bool
        Print debug info if True.

    Returns
    -------
    pd.DataFrame
        Columns: feature, importance_mean, importance_std.
        Sorted descending by importance_mean.
    """
    # Debug info
    if verbose:
        print(f"⏳ Computing permutation importances on {X.shape[0]} rows × {X.shape[1]} features")
        print(f"   n_repeats={n_repeats}, n_jobs={n_jobs}, max_samples={max_samples}")

    # Subsample if requested
    X_sel, y_sel = X, y
    if max_samples is not None:
        n_rows = len(X)
        if isinstance(max_samples, float):
            nsamp = int(n_rows * max_samples)
        else:
            nsamp = int(max_samples)
        # A tiny fraction truncates to 0 rows and an oversized request
        # exceeds what sampling without replacement can deliver; keep the
        # request inside the valid range instead of failing downstream.
        nsamp = max(1, min(nsamp, n_rows))
        if verbose:
            print(f"   Subsampling to {nsamp} rows for speed")
        X_sel, y_sel = resample(X, y, replace=False, n_samples=nsamp, random_state=random_state)

    # Arguments shared by the first attempt and the single-job retry.
    shared_kwargs = {"n_repeats": n_repeats, "random_state": random_state}

    try:
        result = permutation_importance(
            model, X_sel, y_sel, n_jobs=n_jobs, **shared_kwargs
        )
    except OSError as e:
        # Graceful fallback to single job
        if verbose:
            print(f"⚠️  OSError ({e}). Retrying with n_jobs=1")
        result = permutation_importance(
            model, X_sel, y_sel, n_jobs=1, **shared_kwargs
        )

    # Build and sort DataFrame
    importance_df = (
        pd.DataFrame({
            "feature": X_sel.columns,
            "importance_mean": result.importances_mean,
            "importance_std": result.importances_std,
        })
        .sort_values("importance_mean", ascending=False)
        .reset_index(drop=True)
    )
    if verbose:
        print("✅ Permutation importances computed.")
    return importance_df


def compute_shap_importance(model, X, nsamples=100):
    """
    Compute the mean absolute SHAP value per feature.

    Parameters
    ----------
    model : fitted tree-based estimator
        Passed to ``shap.TreeExplainer``.
    X : pd.DataFrame
        Feature matrix; at most ``nsamples`` rows are sampled (seed 42) to
        keep the computation fast.
    nsamples : int
        Upper bound on the number of rows explained.

    Returns
    -------
    pd.DataFrame
        Columns ``feature`` and ``shap_importance``, sorted descending by
        importance.
    """
    # numpy is not imported at the top of this module, so bring it into
    # scope here; the bare `np` reference used to raise NameError.
    import numpy as np

    explainer = shap.TreeExplainer(model)
    # sample for speed
    X_sample = X.sample(n=min(nsamples, len(X)), random_state=42)
    shap_values = explainer.shap_values(X_sample)
    # For regression, shap_values is a 2D array (rows × features), so the
    # column-wise mean of absolute values gives one score per feature.
    mean_abs_shap = pd.DataFrame({
        "feature": X_sample.columns,
        "shap_importance": np.abs(shap_values).mean(axis=0),
    })
    mean_abs_shap = mean_abs_shap.sort_values("shap_importance", ascending=False).reset_index(drop=True)
    return mean_abs_shap



def filter_permutation_features(
    importance_df: pd.DataFrame,
    threshold: float
) -> list[str]:
    """
    List the features whose ``importance_mean`` is strictly above ``threshold``.

    The order of ``importance_df`` is preserved in the returned list.
    """
    above_threshold = importance_df["importance_mean"].gt(threshold)
    return importance_df[above_threshold]["feature"].tolist()


def filter_shap_features(
    importance_df: pd.DataFrame,
    threshold: float
) -> list[str]:
    """
    List the features whose ``shap_importance`` is strictly above ``threshold``.

    The order of ``importance_df`` is preserved in the returned list.
    """
    above_threshold = importance_df["shap_importance"].gt(threshold)
    return importance_df[above_threshold]["feature"].tolist()


def select_final_features(
    perm_feats: list[str],
    shap_feats: list[str],
    mode: str = "intersection"
) -> list[str]:
    """
    Merge the permutation-based and SHAP-based feature lists.

    ``mode="union"`` keeps features present in either list; any other value
    (including the default ``"intersection"``) keeps only features present
    in both.  The result is sorted so repeated runs give the same order.
    """
    from_perm, from_shap = set(perm_feats), set(shap_feats)
    if mode == "union":
        chosen = from_perm.union(from_shap)
    else:
        chosen = from_perm.intersection(from_shap)
    return sorted(chosen)



def load_final_features(
    file_path: str = "data/models/features/final_features.txt"
) -> list[str]:
    """
    Load feature names from a text file holding one name per line.

    Surrounding whitespace is stripped from every line and blank lines are
    skipped.
    """
    with open(file_path, "r") as handle:
        stripped_lines = (raw_line.strip() for raw_line in handle)
        return [name for name in stripped_lines if name]


def filter_to_final_features(
    df: pd.DataFrame,
    file_path: str = "data/models/features/final_features.txt"
) -> pd.DataFrame:
    """
    Restrict a feature-engineered frame to the saved final feature set.

    The returned copy holds, in order, the schema's ID columns, the feature
    names read from ``file_path`` and the target column.

    Raises
    ------
    ValueError
        If any of those columns is absent from ``df``.
    """
    selected = load_final_features(file_path)
    schema = _ColumnSchema()

    wanted = [*schema.id(), *selected, schema.target()]
    absent = set(wanted) - set(df.columns)
    if absent:
        raise ValueError(f"Cannot filter: missing columns {absent}")
    return df.loc[:, wanted].copy()





if __name__ == "__main__":
    # End-to-end demo: load → engineer → preprocess → fit a baseline forest →
    # rank features by permutation and SHAP importance → save the shortlist.
    raw_path = Path("data/Research Data Project/Research Data Project/exit_velo_project_data.csv")
    df = load_raw(raw_path)
    print(df.head())
    print(df.columns)
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values in raw data.")
    else:
        print("=== Raw data null counts ===")
        for col, cnt in null_counts.items():
            print(f" • {col!r}: {cnt} missing")
    df_fe = feature_engineer(df)
    print("Raw →", df.shape, "//  Feature-engineered →", df_fe.shape)
    print(df_fe.head())

    cols = _ColumnSchema()
    print("ID columns:         ", cols.id())
    print("Ordinal columns:    ", cols.ordinal())
    print("Nominal columns:    ", cols.nominal())
    print("All categorical:    ", cols.categorical())
    print("Numerical columns:  ", cols.numerical())
    print("Model features:     ", cols.model_features())
    print("Target columns:     ", cols.target())
    print("All raw columns:    ", cols.all_raw())
    numericals = cols.numerical()
    # Normalise the target to a set: `c not in "<target name>"` on a plain
    # string is a substring test, not an equality test.
    target = cols.target()
    target_cols = {target} if isinstance(target, str) else set(target)
    numericals_without_y = [c for c in numericals if c not in target_cols]

    # ── STEP 1: fully preprocess the engineered DataFrame ──
    from src.features.preprocess import fit_preprocessor, inverse_transform_preprocessor

    # fit_preprocessor returns (X_matrix, y, fitted_transformer)
    X_np, y, preproc = fit_preprocessor(df_fe, model_type='linear', debug=False)

    # Use the same index that y carries (only non-bunt, non-NA rows)
    idx = y.index

    # turn that into a DataFrame with the same column names:
    feat_names = preproc.get_feature_names_out()
    X = pd.DataFrame(X_np, columns=feat_names, index=idx)
    print(f"✅ Preprocessed feature matrix: {X.shape[0]} rows × {X.shape[1]} cols")

    # (optional) confirm inverse transform lines up:
    df_back = inverse_transform_preprocessor(X_np, preproc)
    df_back.index = idx
    print("✅ inverse_transform round-trip (head):")
    print(df_back.head())

    # ── STEP 2: train & compute importances on *that* X ──
    print("\nTraining baseline model…")
    model = train_baseline_model(X, y)

    print("\n🔍 Permutation Importances:")
    perm_imp = compute_permutation_importance(
        model, X, y,
        n_repeats=10,
        n_jobs=2,            # test small parallelism
        max_samples=0.5,     # test subsampling
        verbose=True
    )
    print(perm_imp)

    print("\n🔍 SHAP Importances:")
    shap_imp = compute_shap_importance(model, X)
    print(shap_imp)

    # ── STEP 3: threshold & select your final features ──
    perm_thresh = 0.01
    shap_thresh = 0.01
    perm_feats = filter_permutation_features(perm_imp, perm_thresh)
    shap_feats = filter_shap_features(shap_imp, shap_thresh)
    final_feats = select_final_features(perm_feats, shap_feats, mode="intersection")
    print(f"\nFinal preprocessed feature list ({len(final_feats)}):")
    print(final_feats)

    # ── STEP 4: build & save a dataset with just those features + target + IDs ──
    # X only covers the rows that survived preprocessing, so the ID and target
    # columns are restricted to the same index; concatenating the unfiltered
    # frame would add the dropped rows back with NaN features.
    df_final = pd.concat([
        df_fe.loc[idx, cols.id()],
        df_fe.loc[idx, [cols.target()]],
        X[final_feats]
    ], axis=1)
    print("Final dataset shape:", df_final.shape)

    features_file = Path("data/models/features/final_features.txt")
    # write_text does not create missing directories
    features_file.parent.mkdir(parents=True, exist_ok=True)
    features_file.write_text("\n".join(final_feats))
    print("✔️ Saved feature list to final_features.txt")

    # Demo: filter the full df_fe back to just those features.  The saved
    # names are *preprocessed* names (e.g. one-hot or indicator columns), so
    # some may not exist in df_fe; report that instead of crashing the demo.
    try:
        df_filtered = filter_to_final_features(df_fe)
    except ValueError as err:
        print(f"⚠️  Skipping filter demo: {err}")
    else:
        print("Filtered to final features shape:", df_filtered.shape)


# Model Choices

See modelling_choices.txt.

In [None]:
%%writefile src/models/linear.py

"""
Fast linear baselines (OLS and Ridge).

Usage
-----
>>> from src.models.linear import fit_ridge
>>> fitted, rmse = fit_ridge(X_train, y_train, X_test, y_test)
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline


def _split_xy(df: pd.DataFrame):
    """Split ``df`` into (features, target) on the ``exit_velo`` column."""
    target_name = "exit_velo"
    return df.drop(columns=[target_name]), df[target_name]


def fit_ridge(X_tr: pd.DataFrame,
              y_tr: pd.Series | pd.DataFrame,
              X_te: pd.DataFrame,
              y_te: pd.Series | pd.DataFrame,
              alpha: float = 1.0):
    """
    Fit a Ridge regression on the training split and score it on the test split.

    Parameters
    ----------
    X_tr, y_tr
        Training design matrix and target.
    X_te, y_te
        Held-out design matrix and target; used only for scoring.
    alpha
        Regularisation strength forwarded to ``Ridge``.

    Returns
    -------
    (sklearn Pipeline, RMSE on test set).
    """
    model = Pipeline(
        [("reg", Ridge(alpha=alpha, random_state=0))]
    ).fit(X_tr, y_tr)

    # Flatten both sides to 1-D before subtracting: a single-column DataFrame
    # target (or the (n, 1) predictions it produces) would otherwise be
    # aligned/broadcast against the other operand instead of being compared
    # element by element.
    pred = np.asarray(model.predict(X_te)).ravel()
    truth = np.asarray(y_te).ravel()
    rmse = np.sqrt(np.mean((pred - truth) ** 2))
    return model, rmse



if __name__ == "__main__":
    # Smoke test: load -> feature-engineer -> preprocess -> fit Ridge -> report RMSE.
    from pathlib import Path
    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer
    from src.data.ColumnSchema import _ColumnSchema
    # clip_extreme_ev is called after the train/test split below; without this
    # import the script stops there with a NameError.
    from src.features.data_prep import clip_extreme_ev
    from sklearn.model_selection import train_test_split
    from src.features.preprocess import summarize_categorical_missingness
    from src.features.preprocess import fit_preprocessor, transform_preprocessor, inverse_transform_preprocessor
    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    df = load_raw(raw_path)
    print(df.head())
    print(df.columns)

    # --- inspect nulls in the raw data ---
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values in raw data.")
    else:
        print("=== Raw data null counts ===")
        for col, cnt in null_counts.items():
            print(f" • {col!r}: {cnt} missing")
    df_fe = feature_engineer(df)

    print("Raw →", df.shape, "//  Feature‑engineered →", df_fe.shape)
    print(df_fe.head())

    # singleton instance people can import as `cols`
    cols = _ColumnSchema()

    __all__ = ["cols"]
    print("ID columns:         ", cols.id())
    print("Ordinal columns:    ", cols.ordinal())
    print("Nominal columns:    ", cols.nominal())
    print("All categorical:    ", cols.categorical())
    print("Numerical columns:  ", cols.numerical())
    print("Model features:     ", cols.model_features())
    print("Target columns:  ", cols.target())
    print("All raw columns:    ", cols.all_raw())
    numericals = cols.numerical()
    # use list‐comprehension to drop target(s) from numerical features
    numericals_without_y = [c for c in numericals if c not in cols.target()]

    summary_df = summarize_categorical_missingness(df_fe)
    print(summary_df.to_markdown(index=False))


    # check nulls
    print("🛠️  Nulls in X before fit_transform:")
    null_counts = df_fe.isnull().sum()
    null_counts = null_counts[null_counts > 0]
    if null_counts.empty:
        print("✅  No missing values after feature engineering.")
    else:
        print("=== Null counts post-engineering ===")
        print(null_counts)

    train_df, test_df = train_test_split(df_fe, test_size=0.2, random_state=42)

    # Clip only the training rows; the test rows stay untouched for scoring.
    train_df = clip_extreme_ev(train_df)

    # run with debug prints
    X_train, y_train, tf = fit_preprocessor(train_df, model_type='linear', debug=True)
    X_test,  y_test      = transform_preprocessor(test_df, tf)


    print("Processed shapes:", X_train.shape, X_test.shape)

    # Example of inverse transform:
    print("==========Example of inverse transform:==========")
    df_back = inverse_transform_preprocessor(X_train, tf)
    print("\n✅ Inverse‐transformed head (should mirror your original X_train):")
    print(df_back.head())
    print("Shape:", df_back.shape, "→ original X_train shape before transform:", X_train.shape)


    # === Train & evaluate Ridge regression ===
    model_ridge, rmse_ridge = fit_ridge(X_train, y_train, X_test,  y_test)
    print(f"Ridge regression RMSE: {rmse_ridge:.4f}")


In [None]:
%%writefile src/models/gbm.py

"""
Gradient‑boosting baseline (XGBoost regressor).
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from xgboost import XGBRegressor
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import optuna
from sklearn.model_selection import cross_val_score

def _split_xy(df: pd.DataFrame):
    """Return ``(X, y)``: every column except ``exit_velo``, and ``exit_velo`` itself."""
    y = df["exit_velo"]
    X = df.drop(columns=["exit_velo"])
    return X, y


# At the top of src/models/gbm.py, after your imports:

from xgboost.core import XGBoostError

# ————— Detect GPU support —————
def _detect_gpu_support() -> bool:
    """Return True when a one-tree GPU-configured fit runs without XGBoostError.

    Merely constructing ``XGBRegressor`` stores the parameters without
    touching the device, so a constructor-only check reports GPU support on
    every machine.  Training a single tree on two rows makes the library
    actually initialise the requested tree method.
    """
    try:
        probe = XGBRegressor(
            n_estimators=1,
            tree_method="gpu_hist",
            predictor="gpu_predictor",
        )
        probe.fit(np.zeros((2, 1)), np.zeros(2))
    except XGBoostError:
        return False
    return True


GPU_SUPPORT = _detect_gpu_support()

# ————— Updated tune_gbm —————
def tune_gbm(X, y, n_trials: int = 50):
    """
    Search XGBRegressor hyperparameters with Optuna.

    Each trial is scored by the mean 3-fold cross-validated RMSE on ``(X, y)``
    and the study minimises that value.  The tree method / predictor pair is
    chosen from the module-level ``GPU_SUPPORT`` flag.

    Returns the parameter dict of the best trial.
    """
    def objective(trial):
        # Sampled hyperparameters (suggestion order kept stable).
        n_estimators = trial.suggest_int("n_estimators", 100, 1000)
        learning_rate = trial.suggest_float("learning_rate", 1e-3, 0.3, log=True)
        max_depth = trial.suggest_int("max_depth", 3, 10)
        subsample = trial.suggest_float("subsample", 0.5, 1.0)
        colsample_bytree = trial.suggest_float("colsample_bytree", 0.5, 1.0)

        # Device-dependent settings.
        tree_method = "gpu_hist" if GPU_SUPPORT else "hist"
        predictor = "gpu_predictor" if GPU_SUPPORT else "cpu_predictor"

        regressor = XGBRegressor(
            n_estimators=n_estimators,
            learning_rate=learning_rate,
            max_depth=max_depth,
            subsample=subsample,
            colsample_bytree=colsample_bytree,
            random_state=0,
            n_jobs=-1,
            tree_method=tree_method,
            predictor=predictor,
        )

        # The scorer returns negative RMSE, so flip the sign to minimise RMSE.
        fold_scores = cross_val_score(
            regressor, X, y,
            scoring="neg_root_mean_squared_error",
            cv=3, n_jobs=-1
        )
        return -fold_scores.mean()

    search = optuna.create_study(direction="minimize")
    search.optimize(objective, n_trials=n_trials)
    return search.best_trial.params

# ————— Updated fit_gbm —————
def fit_gbm(X_tr, y_tr, X_te, y_te, **gbm_kw):
    """
    Train an XGBRegressor with early stopping and report its RMSE.

    Parameters
    ----------
    X_tr, y_tr
        Training features and target.
    X_te, y_te
        Evaluation features and target.  They serve both as the
        early-stopping ``eval_set`` and as the set the RMSE is computed on.
    **gbm_kw
        Overrides for the default XGBRegressor keyword arguments
        (e.g. the dict returned by ``tune_gbm``).

    Returns
    -------
    (model, RMSE).
    """
    # Default settings
    gbm_default = dict(
        n_estimators=600,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.7,
        colsample_bytree=0.6,
        random_state=0,
        n_jobs=-1,
        early_stopping_rounds=50,
    )

    # GPU vs CPU: only add the correct keys
    if GPU_SUPPORT:
        gbm_default.update({
            "tree_method": "gpu_hist",
            "predictor": "gpu_predictor",
        })
    else:
        gbm_default.update({
            "tree_method": "hist",
            "predictor": "cpu_predictor",
        })

    # Override with any user-passed kw
    gbm_default.update(gbm_kw)

    # early_stopping_rounds is already part of gbm_default and therefore set
    # on the estimator; repeating it in fit() is deprecated and rejected by
    # newer XGBoost releases.
    model = XGBRegressor(**gbm_default)
    model.fit(
        X_tr, y_tr,
        eval_set=[(X_te, y_te)],
        verbose=False
    )
    preds = model.predict(X_te)
    # Take the square root explicitly instead of using the `squared=False`
    # keyword, which newer scikit-learn releases no longer accept.
    rmse = float(np.sqrt(mean_squared_error(y_te, preds)))
    return model, rmse




if __name__ == "__main__":
    # Smoke test: load -> feature-engineer -> preprocess -> tune -> fit -> report RMSE.
    from pathlib import Path

    from sklearn.model_selection import train_test_split

    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer
    from src.data.ColumnSchema import _ColumnSchema
    from src.features.data_prep import clip_extreme_ev
    from src.features.preprocess import (
        summarize_categorical_missingness,
        fit_preprocessor,
        transform_preprocessor,
        inverse_transform_preprocessor,
    )

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    raw_df = load_raw(raw_path)
    print(raw_df.head())
    print(raw_df.columns)

    # Report every raw column that has at least one missing value.
    raw_missing = raw_df.isnull().sum().loc[lambda counts: counts > 0]
    if not raw_missing.empty:
        print("=== Raw data null counts ===")
        for column, n_missing in raw_missing.items():
            print(f" • {column!r}: {n_missing} missing")
    else:
        print("✅  No missing values in raw data.")

    engineered = feature_engineer(raw_df)
    print("Raw →", raw_df.shape, "//  Feature‑engineered →", engineered.shape)
    print(engineered.head())

    # Column schema accessor; dump each column group for inspection.
    cols = _ColumnSchema()
    __all__ = ["cols"]
    schema_report = (
        ("ID columns:         ", cols.id),
        ("Ordinal columns:    ", cols.ordinal),
        ("Nominal columns:    ", cols.nominal),
        ("All categorical:    ", cols.categorical),
        ("Numerical columns:  ", cols.numerical),
        ("Model features:     ", cols.model_features),
        ("Target columns:  ", cols.target),
        ("All raw columns:    ", cols.all_raw),
    )
    for label, getter in schema_report:
        print(label, getter())

    # Numerical feature names with the target column(s) filtered out.
    numericals = cols.numerical()
    numericals_without_y = [name for name in numericals if name not in cols.target()]

    missingness_summary = summarize_categorical_missingness(engineered)
    print(missingness_summary.to_markdown(index=False))

    # Same missing-value report, now on the engineered frame.
    print("🛠️  Nulls in X before fit_transform:")
    engineered_missing = engineered.isnull().sum().loc[lambda counts: counts > 0]
    if not engineered_missing.empty:
        print("=== Null counts post-engineering ===")
        print(engineered_missing)
    else:
        print("✅  No missing values after feature engineering.")

    train_df, test_df = train_test_split(engineered, test_size=0.2, random_state=42)

    # Clipping is applied to the training rows only.
    train_df = clip_extreme_ev(train_df)

    # Fit the preprocessor on train (with debug prints), then apply it to test.
    X_train, y_train, tf = fit_preprocessor(train_df, model_type='linear', debug=True)
    X_test, y_test = transform_preprocessor(test_df, tf)
    print("Processed shapes:", X_train.shape, X_test.shape)

    # Round-trip the training matrix through the inverse transform.
    print("==========Example of inverse transform:==========")
    restored = inverse_transform_preprocessor(X_train, tf)
    print("\n✅ Inverse‐transformed head (should mirror your original X_train):")
    print(restored.head())
    print("Shape:", restored.shape, "→ original X_train shape before transform:", X_train.shape)

    # === Hyperparameter tuning ===
    tuned_params = tune_gbm(X_train, y_train, n_trials=50)
    print("Tuned params:", tuned_params)

    # === Train & evaluate ===
    gbm_model, rmse = fit_gbm(X_train, y_train, X_test, y_test, **tuned_params)
    print(f"Tuned XGBoost RMSE: {rmse:.4f}")



In [None]:
%%writefile src/models/mixed.py

"""
Frequentist mixed‑effects model using statsmodels MixedLM.

Formula implemented:
    exit_velo ~ 1 + level_ord + age_centered
              + (1 | batter_id)

We rely on columns already produced by preprocess():
    • level_idx  (0,1,2)   – ordinal
    • age_centered
"""
from __future__ import annotations
import numpy as np
import pandas as pd
import statsmodels.formula.api as smf


def fit_mixed(train: pd.DataFrame,
              test: pd.DataFrame):
    """Fit a random-intercept MixedLM on *train* and score it on *test*.

    The model is ``exit_velo ~ 1 + level_ord + age_centered`` grouped by
    ``batter_id``, fitted by maximum likelihood (``reml=False``).

    Returns ``(fitted results, RMSE on test)``.
    """
    n_train = len(train)

    # Stack both splits so the derived column is created once, then recover
    # each split by row position.
    stacked = pd.concat([train, test], axis=0)
    stacked["level_ord"] = stacked["level_idx"].astype(int)
    fit_rows = stacked.iloc[:n_train]
    holdout_rows = stacked.iloc[n_train:]

    fitted = smf.mixedlm(
        formula="exit_velo ~ 1 + level_ord + age_centered",
        data=fit_rows,
        groups=fit_rows["batter_id"]
    ).fit(reml=False)

    # Score the held-out rows against the observed exit velocities.
    predicted = fitted.predict(exog=holdout_rows)
    observed = test["exit_velo"].values
    squared_error = (predicted - observed) ** 2
    return fitted, np.sqrt(np.mean(squared_error))

if __name__ == "__main__":
    # Smoke test: load -> feature-engineer -> prepare -> fit MixedLM -> report.
    from sklearn.model_selection import train_test_split

    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer
    from src.features.preprocess import prepare_for_mixed_and_hierarchical

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    engineered = feature_engineer(load_raw(raw_path))

    # Prepare and split
    model_frame = prepare_for_mixed_and_hierarchical(engineered)
    train_part, test_part = train_test_split(model_frame, test_size=0.2, random_state=42)

    # Fit mixed-effects
    fitted_mixed, holdout_rmse = fit_mixed(train_part, test_part)
    print(f"Mixed-effects model RMSE: {holdout_rmse:.4f}")

    # The summary table includes the coefficient p-values.
    print("Mixed-effects model summary:\n", fitted_mixed.summary())

In [None]:
%%writefile src/utils/bayesian_explainability.py
import arviz as az
import shap, numpy as np
import matplotlib.pyplot as plt

# ---------------- Posterior summaries -----------------
def plot_parameter_forest(idata, var_names=None, hdi_prob=0.95):
    """Caterpillar/forest plot of posterior estimates.

    Parameters
    ----------
    idata
        Inference data whose ``posterior`` group is plotted.
    var_names
        Variables to include; ``None`` plots every posterior variable.
    hdi_prob
        Probability mass of the plotted HDI.

    Returns
    -------
    Whatever ``az.plot_forest`` returns.
    """
    # Figure height scales with the number of variables shown.
    n_rows = len(var_names or idata.posterior.data_vars)
    return az.plot_forest(
        idata,
        var_names=var_names,
        combined=True,
        hdi_prob=hdi_prob,
        # ArviZ names this plot type "forestplot"; "forest" is rejected.
        kind="forestplot",
        figsize=(6, n_rows * 0.4),
    )

def posterior_table(idata, round_to=2):
    """
    Summarise *idata* with a 95% HDI, rounded to *round_to* decimals.

    Adds a boolean ``significant`` column that is True when the rounded
    interval lies entirely above or entirely below zero.
    """
    table = az.summary(idata, hdi_prob=0.95).round(round_to)
    entirely_positive = table["hdi_2.5%"] > 0
    entirely_negative = table["hdi_97.5%"] < 0
    table["significant"] = entirely_positive | entirely_negative
    return table

# ---------------- Posterior‑predictive checks ---------
def plot_ppc(idata, kind="overlay"):
    """Visual PPC (over-laid densities by default).

    ``kind`` is forwarded to ``az.plot_ppc``.  The legacy default
    ``"overlay"`` is not a plot type ArviZ accepts, so it is translated to
    ``"kde"``, the overlaid-density plot the default was meant to select.
    """
    plot_kind = "kde" if kind == "overlay" else kind
    return az.plot_ppc(idata, kind=plot_kind, alpha=0.1)

# ---------------- SHAP-based feature importances ------
def shap_explain(predict_fn, background_df, sample_df):
    """
    Run Kernel SHAP for *predict_fn* and draw a summary plot.

    ``predict_fn(df)`` must return a 1-D numpy array of predictions.
    *background_df* seeds the explainer; *sample_df* holds the rows to explain
    (200 samples per row).  The plot is drawn without being shown.

    Returns the SHAP values for *sample_df*.
    """
    kernel = shap.KernelExplainer(predict_fn, background_df)
    contributions = kernel.shap_values(sample_df, nsamples=200)

    shap.summary_plot(contributions, sample_df, show=False)
    plt.tight_layout()
    return contributions


In [None]:
%%writefile src/models/hierarchical_utils.py

import arviz as az

def save_model(idata, file_path: str, overwrite: bool = True):
    """Write *idata* to *file_path* as NetCDF (h5netcdf engine) and log the path."""
    idata.to_netcdf(
        file_path,
        engine="h5netcdf",
        overwrite_existing=overwrite,
    )
    print(f"✔︎ saved model → {file_path}")

def load_model(file_path: str):
    """Read an ArviZ InferenceData object from the NetCDF file at *file_path*."""
    loaded = az.from_netcdf(file_path, engine="h5netcdf")
    print(f"✔︎ loaded model ← {file_path}")
    return loaded

if __name__ == "__main__":
    # === Editable settings ===
    # Saved model location (NetCDF)
    MODEL_PATH = "data/models/saved_models/model.nc"
    # Raw exit-velocity CSV (not read by this smoke test)
    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    # Predictions CSV path, or None to print to console (not used by this smoke test)
    OUTPUT_PREDS_2024 = "data/predictions/predictions_2024.csv"

    # Round trip: load the saved model, then write it back to the same path.
    model = load_model(MODEL_PATH)
    save_model(model, MODEL_PATH)


In [None]:
%%writefile src/utils/posterior.py
# src/utils/posterior.py
import numpy as np
import pandas as pd
import arviz as az

# ── REPLACEMENT: paste over the whole function ─────────────────────────
import json, pathlib, numpy as np, pandas as pd, arviz as az

JSON_GLOBAL = pathlib.Path("data/models/saved_models/global_effects.json")

def posterior_to_frame(idata: az.InferenceData) -> pd.DataFrame:
    """
    Returns a batter-level summary **AND** writes global effects to JSON.

    The returned frame has one row per entry of the last axis of the
    posterior variable ``u`` with its mean, sd and 2.5/50/97.5 percentiles.

    File written  ➜  data/models/saved_models/global_effects.json
    (its parent directory is created when missing).
    """
    post = idata.posterior

    # ------- per-batter u summaries -----------------------------------
    u = post["u"]                       # presumably (chain, draw, batter)
    u_values = u.values
    stats = {
        "u_mean"  : u.mean(("chain", "draw")).values,
        "u_sd"    : u.std(("chain", "draw")).values,
        "u_q2.5"  : np.percentile(u_values,  2.5, axis=(0, 1)),
        "u_q50"   : np.percentile(u_values, 50.0, axis=(0, 1)),
        "u_q97.5" : np.percentile(u_values, 97.5, axis=(0, 1)),
    }
    df = pd.DataFrame({"batter_idx": np.arange(u.shape[-1]), **stats})

    # ------- global effects ------------------------------------------
    mu_mean = post["mu"].mean().item()

    # β_age is taken as the last entry of the beta vector.
    # NOTE(review): this relies on age_centered being the last selected
    # feature column – confirm against the preprocessor's column order.
    beta = post["beta"]
    feat_dim = [d for d in beta.dims if d not in ("chain", "draw")][0]
    beta_age = beta.isel({feat_dim: -1}).mean().item()

    beta_level = post["beta_level"].mean(("chain", "draw")).values.tolist()
    sigma_b    = post["sigma_b"].mean().item()
    sigma_e    = post["sigma_e"].mean().item()

    global_eff = dict(
        mu_mean=mu_mean,
        beta_age=beta_age,
        beta_level=beta_level,
        sigma_b=sigma_b,
        sigma_e=sigma_e,
        # float() keeps the value JSON-serialisable even when the attribute
        # comes back as a numpy scalar or one-element array.
        median_age=float(idata.attrs.get("median_age", 26.0)),
    )

    # ➜  write side-car JSON (overwrite every run); make sure the target
    #    directory exists so a fresh checkout does not fail here.
    JSON_GLOBAL.parent.mkdir(parents=True, exist_ok=True)
    JSON_GLOBAL.write_text(json.dumps(global_eff, indent=2))
    print(f"✔︎ wrote global effects → {JSON_GLOBAL}")

    return df





def align_batter_codes(df_roster: pd.DataFrame,
                       train_cats: pd.Index) -> pd.Series:
    """
    Encode ``df_roster["batter_id"]`` as positions within *train_cats*.

    Each id is replaced by its categorical code under the category order
    given by *train_cats*; ids absent from *train_cats* receive ``-1``.
    The result keeps the roster's index.
    """
    roster_ids = df_roster["batter_id"]
    encoded = pd.Categorical(roster_ids, categories=train_cats)
    codes = encoded.codes
    return pd.Series(codes, index=df_roster.index)


In [None]:
# %%writefile src/models/hierarchical.py

import pymc as pm
import arviz as az
import numpy as np
from src.data.ColumnSchema import _ColumnSchema
from src.features.preprocess import transform_preprocessor
# ── Attempt to import JAX ──────────────────────────────────────
USE_JAX = True
try:
    import jax
    # Debug: confirm what module is loaded
    print(f"🔍 JAX module: {jax!r}")
    print(f"🔍 JAX path:   {getattr(jax, '__file__', 'builtin')}")
    # Ensure version attribute exists (guards circular-import)
    if not hasattr(jax, "__version__"):
        raise ImportError("jax.__version__ missing—possible circular import")
    print(f"✅ JAX version: {jax.__version__}")
    # Enable 64-bit floats on GPU/CPU
    jax.config.update("jax_enable_x64", True)
except Exception as e:
    USE_JAX = False
    print(f"⚠️  Warning: could not import JAX ({e}). Falling back to CPU sampling.")

import pymc as pm
import arviz as az
import numpy as np
import pandas as pd

import logging

# Everything below touches `jax`, so it only runs when the guarded import
# above succeeded; otherwise the module would fail with a NameError and the
# fallback flag would never be used.
pmjax = None
if USE_JAX:
    print("JAX version:", jax.__version__)
    print("JAX devices:", jax.devices())
    try:
        gpu_count = jax.device_count("gpu")
    except RuntimeError:
        # Raised when no GPU backend is present (CPU-only install).
        gpu_count = 0
    print("GPU count:", gpu_count)
    print("Default backend:", jax.default_backend())

    try:
        import pymc.sampling.jax as pmjax
    except ImportError as e:
        # PyMC's JAX samplers need extra packages; without them fall back
        # to the default sampler instead of failing at import time.
        USE_JAX = False
        print(f"⚠️  Warning: could not import pymc.sampling.jax ({e}). Falling back to CPU sampling.")


# ── NEW: fit_bayesian_hierarchical with timing & ETAs ────────────────
import time
from contextlib import contextmanager
from tqdm.auto import tqdm          # auto‑selects rich bar in Jupyter / CLI

@contextmanager
def _timed_section(label: str):
    """Time the wrapped block and print ``[label] finished in N s`` on normal exit."""
    started = time.time()
    yield
    elapsed = time.time() - started
    print(f"[{label}] finished in {elapsed:,.1f} s")
    



# ------------------------------------------------------------------
def fit_bayesian_hierarchical(
    df_raw,
    transformer,
    batter_idx: np.ndarray,
    level_idx: np.ndarray,
    *,
    feature_list: list[str] | None = None,
    mu_mean: float  = 88.0,      # ⚙︎ realistic centre
    mu_sd:   float  = 30.0,      # ⚙︎ very weak info
    sigma_prior: float = 10.0,   # ⚙︎ broader σ priors
    draws: int      = 200,       # ⚙︎ dev-friendly; raise to 1000 in prod
    tune:  int      = 200,
    target_accept: float = 0.9,
    verbose: bool   = True,      # ⚙︎ quick sanity prints
    sampler: str    = "jax",     # keep default
):
    """
    Hierarchical EV model with:
      • weak-info intercept (μ ~ Normal(mu_mean, mu_sd))
      • per-level offsets and linear feature effects
      • non-centred batter random effects (u = u_raw · σ_b)

    Parameters
    ----------
    df_raw
        Frame passed to ``transform_preprocessor`` to obtain the design
        matrix and target.
    transformer
        Fitted preprocessor; must provide ``get_feature_names_out()``.
    batter_idx, level_idx
        Integer codes per observation used to index the batter and level
        effects; the number of groups is taken as ``max() + 1``.
    feature_list
        Transformed column names to keep; defaults to the schema's model
        features.
    mu_mean, mu_sd, sigma_prior
        Prior hyperparameters for the intercept and the two HalfNormal scales.
    draws, tune, target_accept
        Forwarded to ``pm.sample`` (4 chains).
    verbose
        Print the first posterior-predictive samples after fitting.
    sampler
        ``"jax"`` requests the NumPyro NUTS sampler when JAX is available;
        anything else (or JAX missing) uses PyMC's own NUTS.

    Returns
    -------
    InferenceData with the posterior and posterior-predictive ``y_obs``.
    """
    cols = _ColumnSchema()
    if feature_list is None:
        feature_list = cols.model_features()

    # 1) design-matrix
    X_all, y_ser = transform_preprocessor(df_raw, transformer)
    names = transformer.get_feature_names_out()
    # NOTE(review): selection is by exact name match against the transformer's
    # output names – confirm these are not prefixed, or X ends up empty.
    X     = X_all[:, np.isin(names, feature_list)]
    y     = y_ser.values
    n_bat, n_lvl, n_feat = batter_idx.max() + 1, level_idx.max() + 1, X.shape[1]

    # "nuts" is not a sampler name pm.sample accepts; PyMC's built-in NUTS
    # implementation is selected with "pymc".
    nuts_backend = "numpyro" if (sampler == "jax" and USE_JAX) else "pymc"

    with pm.Model() as model:
        # ── priors ─────────────────────────────────────────
        mu         = pm.Normal("mu", mu_mean, mu_sd)
        beta_level = pm.Normal("beta_level", 0, 5,   shape=n_lvl)   # wider
        beta       = pm.Normal("beta",       0, 5,   shape=n_feat)
        sigma_b    = pm.HalfNormal("sigma_b", sigma_prior)
        u_raw      = pm.Normal("u_raw", 0, 1, shape=n_bat)
        u          = pm.Deterministic("u", u_raw * sigma_b)

        # ── likelihood ────────────────────────────────────
        theta    = mu + beta_level[level_idx] + pm.math.dot(X, beta) + u[batter_idx]
        sigma_e  = pm.HalfNormal("sigma_e", sigma_prior)
        pm.Normal("y_obs", theta, sigma_e, observed=y)

        # ── sampling ──────────────────────────────────────
        idata = pm.sample(
            draws=draws,
            tune=tune,
            chains=4,
            target_accept=target_accept,
            nuts_sampler=nuts_backend,
            progressbar=True,
        )
        idata.extend(pm.sample_posterior_predictive(idata, var_names=["y_obs"]))

    # quick visual sanity
    if verbose:
        print("⚡ First 5 posterior‑pred EV samples:", idata.posterior_predictive["y_obs"].stack(s=("chain","draw")).values[:5])

    return idata
# ------------------------------------------------------------------





# ───────────────────────────────────────────────────────────────────────
# 6. Smoke test (only run when module executed directly)
# Data used in the hierarchical model
#
# Exit-velocity measurements
#   The outcome variable y is each batted ball's exit velocity as recorded by
#   Statcast, i.e. the speed (mph) at which the ball leaves the bat
#   (source: baseballsavant.com). Statcast began tracking exit velocity
#   league-wide in 2015, using high-speed cameras and radar to measure every
#   play (source: Wikipedia).
#
# Covariates: age and competition level
#   Each player's centered age (age_centered) captures how batting strength
#   changes with age. Centering (subtracting the median) improves model
#   convergence and interpretability. The discrete competition levels
#   (level_idx: AA=0, AAA=1, MLB=2) let the model estimate systematic
#   differences in exit velocity between minor-league and major-league play.
#
# Random effects: batter identity
#   Treating batter_id as a categorical random effect (u[batter_idx]) gives
#   each hitter his own baseline deviation from the global mean. This
#   "partial pooling" borrows strength across batters, shrinking estimates
#   for low-sample hitters toward the overall mean, so rarely observed
#   batters are not grossly over- or under-estimated (source: PyMC docs).
# ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Training script: prep data, fit the hierarchical model, persist artefacts.
    from pathlib import Path

    import pandas as pd, numpy as np, arviz as az

    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer
    from src.features.preprocess import (fit_preprocessor,
                                        prepare_for_mixed_and_hierarchical)
    from src.models.hierarchical import fit_bayesian_hierarchical
    from src.models.hierarchical_utils import save_model
    from src.utils.posterior import posterior_to_frame

    # Input CSV and the two output artefacts.
    RAW = Path("data/Research Data Project/Research Data Project/exit_velo_project_data.csv")
    OUT_NC = Path("data/models/saved_models/exitvelo_hmc.nc")
    OUT_POST = Path("data/models/saved_models/posterior_summary.parquet")

    # 1 · prep
    df_model = prepare_for_mixed_and_hierarchical(feature_engineer(load_raw(RAW)))
    _, _, tf = fit_preprocessor(df_model, model_type="linear", debug=False)

    # Integer group codes for the batter and level effects.
    b_idx = df_model["batter_id"].cat.codes.values
    l_idx = df_model["level_idx"].values

    # Very short run: the same small count is used for draws and tuning steps.
    draws_and_tune = 5
    target_accept = 0.9

    # 2 · fit
    idata = fit_bayesian_hierarchical(
        df_model,
        tf,
        b_idx,
        l_idx,
        sampler="jax",
        draws=draws_and_tune,
        tune=draws_and_tune,
        target_accept=target_accept,
    )

    # Stored on the inference data so later steps can read it back.
    idata.attrs["median_age"] = df_model["age"].median()

    # 3 · persist
    save_model(idata, OUT_NC)
    posterior_to_frame(idata).to_parquet(OUT_POST)
    print("✅ training complete – artefacts written:")
    for artefact in (OUT_NC, OUT_POST):
        print("   •", artefact)



In [None]:
# %%writefile src/models/hierarchical_predict.py

import json
from pathlib import Path

import arviz as az
import pandas as pd
import numpy as np

from src.features.feature_engineering import feature_engineer
from src.features.preprocess import prepare_for_mixed_and_hierarchical
from src.utils.posterior import align_batter_codes


def predict_from_idata(
    df_raw: pd.DataFrame,
    idata: az.InferenceData
) -> pd.DataFrame:
    """
    Generate population-level predictions from a fitted hierarchical model.

    Parameters
    ----------
    df_raw : pd.DataFrame
        Raw roster DataFrame containing at least ['season','batter_id','age','level'].
    idata : arviz.InferenceData
        Fitted model inference data.  Its posterior must contain ``mu`` and
        ``beta_level``, plus either ``beta_age`` or a ``beta`` vector.

    Returns
    -------
    pd.DataFrame
        The prepared model frame with an added column:
        - pred_mean (μ + β_level[level_idx] + β_age·age_centered)

    Raises
    ------
    KeyError
        If a required posterior variable is missing.
    """
    # 1) feature engineering & preprocessing
    df_feat = feature_engineer(df_raw)
    df_model = prepare_for_mixed_and_hierarchical(df_feat)

    # 2) extract posterior-means of the fixed effects
    post_mean = idata.posterior.mean(dim=("chain", "draw"))
    # mu is a scalar parameter, so it is added as a constant rather than
    # being indexed per row.
    mu = float(post_mean["mu"].values)
    beta_level = np.asarray(post_mean["beta_level"].values)
    if "beta_age" in post_mean:
        beta_age = float(post_mean["beta_age"].values)
    else:
        # The hierarchical model stores feature slopes in one `beta` vector;
        # the age slope is read from its last entry, the same convention
        # posterior_to_frame uses.
        # NOTE(review): confirm age_centered is the last selected feature.
        beta_age = float(np.asarray(post_mean["beta"].values).ravel()[-1])

    # 3) compute point predictions
    df_model["pred_mean"] = (
        mu
        + beta_level[df_model["level_idx"].values]
        + beta_age * df_model["age_centered"].values
    )

    return df_model


def predict_from_summaries(
    roster_csv: Path,
    posterior_parquet: Path,
    global_effects_json: Path,
    output_csv: Path
) -> pd.DataFrame:
    """
    Load your saved summaries + raw roster, merge in random effects,
    compute full point & interval predictions, and write to CSV.

    Parameters
    ----------
    roster_csv
        CSV with at least ``season``, ``batter_id`` and ``age`` columns.
    posterior_parquet
        Per-batter summary with ``batter_idx``, ``u_mean`` and ``u_sd``.
    global_effects_json
        JSON holding ``mu_mean``, ``beta_age``, ``beta_level`` and
        ``median_age``.
    output_csv
        Destination CSV; its parent directory is created when missing.

    Returns the full merged DataFrame (with contrib_* and pred_lo95/hi95).
    """
    # 1) load data
    df_post = pd.read_parquet(posterior_parquet)   # tidy posterior summary
    df_roster = pd.read_csv(roster_csv)            # season, batter_id, age

    # 2) global (static) effects
    glob = json.loads(Path(global_effects_json).read_text())
    post_mu    = glob["mu_mean"]
    beta_age   = glob["beta_age"]
    beta_level = glob["beta_level"][2]      # MLB index
    med_age    = glob["median_age"]

    # 3) merge random effects
    # NOTE(review): the categories passed here are the summary's batter_idx
    # values, while the roster column holds batter_id – confirm the two share
    # the same numbering, otherwise the merge attaches the wrong batter.
    df_roster["batter_idx"] = align_batter_codes(df_roster, df_post["batter_idx"])
    df = df_roster.merge(df_post, on="batter_idx", how="left")
    # Rows without a match fall back to a zero effect and the average spread.
    global_sigma_b = df_post["u_sd"].mean()
    df["u_mean"] = df["u_mean"].fillna(0.0)
    df["u_sd"]   = df["u_sd"].fillna(global_sigma_b)

    # 4) compute predictions + contributions + 95% intervals
    df["age_centered"] = df["age"] - med_age
    df["pred_mean"] = (
        post_mu
        + beta_level
        + beta_age   * df["age_centered"]
        + df["u_mean"]
    )
    df["contrib_age"]   = beta_age   * df["age_centered"]
    df["contrib_level"] = beta_level
    df["contrib_u"]     = df["u_mean"]
    z95 = 1.96
    df["pred_lo95"] = df["pred_mean"] - z95 * df["u_sd"]
    df["pred_hi95"] = df["pred_mean"] + z95 * df["u_sd"]

    # 5) export – create the destination folder first so a fresh checkout
    #    does not fail on the write.
    Path(output_csv).parent.mkdir(parents=True, exist_ok=True)
    df[["season","batter_id","pred_mean","pred_lo95","pred_hi95"]].to_csv(output_csv, index=False)
    print(f"📄 Predictions written → {output_csv}")

    return df


if __name__ == "__main__":
    # Example run producing the 2024 predictions from saved summaries.
    BASE = Path("data/models/saved_models")
    P_SUMMARY = BASE / "posterior_summary.parquet"
    P_GLOBAL = BASE / "global_effects.json"
    POSTERIOR = Path("data/models/saved_models/posterior_summary.parquet")
    ROSTER_INPUT = Path("data/Research Data Project/Research Data Project/exit_velo_validate_data.csv")
    OUTPUT_CSV = Path("data/predictions/exitvelo_predictions_2024.csv")

    predict_df = predict_from_summaries(
        roster_csv=ROSTER_INPUT,
        posterior_parquet=P_SUMMARY,
        global_effects_json=P_GLOBAL,
        output_csv=OUTPUT_CSV,
    )

    # Show the prediction next to its three additive contributions.
    preview_columns = ["batter_id", "pred_mean",
                       "contrib_level", "contrib_age", "contrib_u"]
    print(predict_df.head()[preview_columns])


In [None]:
%%writefile src/utils/validation.py
"""
Generic K‑fold validator.

• Works for sklearn Pipelines *or* PyMC idata.
• Decides how to extract predictions based on
  the object returned by `fit_func`.
"""

from __future__ import annotations

from typing import Callable, List, Union

import arviz as az
import numpy as np
import pandas as pd
import statsmodels as sm
from sklearn.model_selection import KFold

def _split_xy(df: pd.DataFrame):
    """Return the feature frame (``exit_velo`` dropped) and the ``exit_velo`` column."""
    label = "exit_velo"
    return df.drop(columns=[label]), df[label]


def _rmse(a, b):
    """Root-mean-square error between *a* and *b*."""
    residual = a - b
    mean_squared = np.mean(residual ** 2)
    return np.sqrt(mean_squared)


# helper to score a *single* train/test split for idata
def rmse_pymc(idata: az.InferenceData, test_df: pd.DataFrame) -> float:
    """RMSE of the posterior-predictive mean of ``y_obs`` against ``test_df["exit_velo"]``.

    The posterior-predictive draws in *idata* must correspond row-for-row to
    *test_df*.
    """
    pred = (
        idata.posterior_predictive["y_obs"]
        .mean(("chain", "draw"))
        .values
    )
    return _rmse(pred, test_df["exit_velo"].values)


def run_kfold_cv(
    fit_func: Callable[[pd.DataFrame, pd.DataFrame], tuple],
    df: pd.DataFrame,
    k: int = 5,
    random_state: int = 0,
    **fit_kw
) -> List[float]:
    """
    Shuffle-split *df* into *k* folds and collect one held-out RMSE per fold.

    ``fit_func(train_df, test_df, **fit_kw)`` is called once per fold and may
    return either

    * ``(model_or_idata, rmse)`` – the RMSE is used as given, or
    * a bare InferenceData – it is scored with ``rmse_pymc`` against the
      test fold, so its posterior-predictive ``y_obs`` must be for the test
      rows.

    This module used to define ``run_kfold_cv`` twice; the later definition
    replaced the earlier one at import time and relied on an ArviZ function
    that does not exist.  Both behaviours now live in this single function.
    """
    kf = KFold(n_splits=k, shuffle=True, random_state=random_state)
    rmses: List[float] = []

    for train_idx, test_idx in kf.split(df):
        train, test = df.iloc[train_idx], df.iloc[test_idx]
        result = fit_func(train, test, **fit_kw)
        if isinstance(result, tuple):
            _, rmse = result
        else:
            rmse = rmse_pymc(result, test)
        rmses.append(rmse)

    return rmses

def posterior_predictive_check(idata, df, batter_idx):
    """
    Draw side-by-side histograms of observed and simulated exit velocity.

    Observed values come from ``df["exit_velo"]``; simulated values are all
    posterior-predictive ``y_obs`` draws flattened together.  ``batter_idx``
    is accepted but not used.  Returns the matplotlib figure.
    """
    import matplotlib.pyplot as plt

    observed = df["exit_velo"].values
    stacked_draws = idata.posterior_predictive["y_obs"].stack(samples=("chain","draw"))
    simulated = stacked_draws.values.flatten()

    fig, axes = plt.subplots(1,2,figsize=(8,3))
    panels = ((observed, "Observed"), (simulated, "Simulated"))
    for axis, (values, title) in zip(axes, panels):
        axis.hist(values, bins=30)
        axis.set_title(title)
    fig.tight_layout()
    return fig




def prediction_interval(model, X, alpha=0.05, method='linear'):
    """
    Compute prediction intervals for a model.

    Parameters
    ----------
    model : fitted model; what is required depends on `method`:
        - 'linear'   : object exposing `get_prediction` (statsmodels-style
                       regression results, fitted with a constant column).
        - 'bayesian' : InferenceData with a `posterior_predictive` group that
                       contains `y_obs`.
        - 'gbm'      : object exposing `best_iteration` and
                       `predict(X, pred_contribs=..., iteration_range=...)`.
    X : features to predict for; ignored by the 'bayesian' method.
    alpha : miscoverage level, e.g. 0.05 for a 95% interval.
    method : one of 'linear', 'bayesian', 'gbm'.

    Returns
    -------
    'linear'   -> (predicted_mean, lower, upper)
    'bayesian' -> (lower, upper) highest-density bounds per observation
    'gbm'      -> (lower, upper); both are the same point prediction, because
                  no quantile models are available here (placeholder).

    Raises
    ------
    ValueError
        If `method` is not one of the supported names.
    """
    if method == 'linear':
        # For OLS and Ridge
        X_const = sm.add_constant(X)
        preds = model.get_prediction(X_const)
        # obs=True asks for the interval of a new observation; the default
        # (obs=False) is only the confidence interval of the mean.
        pred_int = preds.conf_int(obs=True, alpha=alpha)
        return preds.predicted_mean, pred_int[:, 0], pred_int[:, 1]
    elif method == 'bayesian':
        # For Bayesian models: az.hdi takes `hdi_prob`, works on the posterior
        # group unless told otherwise, and labels the bounds "lower"/"higher".
        hdi = az.hdi(
            model,
            hdi_prob=1 - alpha,
            group="posterior_predictive",
            var_names=["y_obs"],
        )
        y_obs_hdi = hdi["y_obs"]
        return y_obs_hdi.sel(hdi="lower"), y_obs_hdi.sel(hdi="higher")
    elif method == 'gbm':
        # iteration_range is half-open, so +1 is needed to include the best
        # iteration itself.
        point = model.predict(
            X,
            pred_contribs=False,
            iteration_range=(0, model.best_iteration + 1),
        )
        # TODO: replace with real quantile-regression models; until then both
        # bounds are the point prediction (copied so they do not alias).
        return point, np.copy(point)
    else:
        raise ValueError("Method not supported")

# Example for bootstrapping GBM
def bootstrap_prediction_interval(model, X, n_bootstraps=1000, alpha=0.05, random_state=None):
    """
    Percentile bounds of `model.predict` over row-resampled copies of `X`.

    Parameters
    ----------
    model : object with a `predict` method returning one value per input row.
    X : 2-D NumPy array or pandas DataFrame/Series of feature rows.
    n_bootstraps : number of resampled copies of `X` to predict on.
    alpha : miscoverage level; bounds are the alpha/2 and 1-alpha/2 percentiles.
    random_state : optional int seed. When None the global `np.random` state
        is used, so callers that seed it with `np.random.seed` are unaffected.

    Returns
    -------
    (lower, upper) : two arrays of length `X.shape[0]`.

    NOTE(review): rows are resampled but the model is not refit, so position j
    of each bootstrap prediction belongs to a random row rather than to row j
    of `X`. The bounds therefore describe the spread of predictions across
    rows, not a per-row predictive interval -- confirm this is intended.
    """
    n_rows = X.shape[0]
    rng = np.random if random_state is None else np.random.RandomState(random_state)
    # pandas objects need positional selection; plain X[indices] would be
    # interpreted as a label/column lookup.
    take_rows = X.iloc if hasattr(X, "iloc") else X

    preds = np.zeros((n_bootstraps, n_rows))
    for i in range(n_bootstraps):
        indices = rng.choice(n_rows, n_rows, replace=True)
        preds[i] = model.predict(take_rows[indices])

    lower = np.percentile(preds, 100 * alpha / 2, axis=0)
    upper = np.percentile(preds, 100 * (1 - alpha / 2), axis=0)
    return lower, upper


# ───────────────────────────────────────────────────────────────────────
# 6. Smoke test (only run when module executed directly)
# ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Smoke test: load -> feature-engineer -> prepare -> fit the hierarchical
    # model -> posterior predictive plot -> Bayesian interval summary.
    from src.data.load_data import load_raw
    from src.features.feature_engineering import feature_engineer
    from src.features.preprocess import prepare_for_mixed_and_hierarchical

    raw_path = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"
    # NOTE(review): not used by this smoke test; the doubled ".csv.csv" suffix
    # may be a typo -- confirm against the file on disk.
    predict_path = "data/Research Data Project/Research Data Project/exit_velo_validate_data.csv.csv"
    df = load_raw(raw_path)
    df_fe = feature_engineer(df)

    # Prepare the DataFrame
    df_model = prepare_for_mixed_and_hierarchical(df_fe)

    # Extract arrays for PyMC
    batter_idx   = df_model["batter_id"].cat.codes.values
    level_idx    = df_model["level_idx"].values
    age_centered = df_model["age_centered"].values

    # Fit the Bayesian hierarchical model
    idata = fit_bayesian_hierarchical(
        df_model, batter_idx, level_idx, age_centered,
        mu_prior=90, sigma_prior=5,
        sampler="jax",   #  <-- GPU NUTS
        draws=1000, tune=1000
    )

    print(idata)

    # Reuse the codes extracted above instead of recomputing them.
    posterior_predictive_check(idata, df_model, batter_idx)

    # Bayesian interval on the fitted rows. No separate test frame exists in
    # this script (the previous `test_df` was never defined), and the
    # 'bayesian' method does not read X anyway.
    lower, upper = prediction_interval(idata, df_model, method='bayesian')
    # float(...) turns the scalar reductions into plain numbers for formatting.
    print(f"Bayesian 95% Prediction Interval: {float(lower.mean()):.2f}–{float(upper.mean()):.2f} mph")

    # The Ridge / method='linear' demo was removed: it referenced `model_ridge`
    # and `X_test`, neither of which is created in this script, so it raised
    # NameError. Exercise that path where a fitted linear model is available.

In [None]:
%%writefile src/train.py
"""
Train / compare four families on a 70‑30 split.

Run:
    python -m src.train
"""
from __future__ import annotations
import pandas as pd
from sklearn.model_selection import train_test_split

from src.data.load_data import load_raw
from src.features.preprocess import preprocess

from src.models.linear import fit_ridge
from src.models.gbm   import fit_gbm
from src.models.mixed import fit_mixed
from src.models.hierarchical import fit_bayesian_hierarchical

RAW_PATH = "data/Research Data Project/Research Data Project/exit_velo_project_data.csv"


def main():
    """Fit the four model families on one stratified 70/30 split and print each RMSE.

    The raw CSV at ``RAW_PATH`` is loaded and preprocessed, then split with
    ``level_abbr`` as the stratification column. Ridge, gradient boosting and
    the mixed-effects model each return ``(model, rmse)``; the Bayesian
    hierarchical model is sampled briefly and scored with ``rmse_pymc``.

    NOTE(review): the Bayesian model is fitted on ``train_df`` only and then
    scored against ``test_df`` -- confirm that ``idata`` really carries
    predictions for the test rows.
    """
    cleaned = preprocess(load_raw(RAW_PATH))

    train_df, test_df = train_test_split(
        cleaned,
        test_size=0.30,
        random_state=42,
        stratify=cleaned["level_abbr"],
    )

    # A-C: families whose fit function already returns (model, rmse)
    point_models = (
        (fit_ridge, "Ridge RMSE ……  "),
        (fit_gbm, "XGBoost RMSE … "),
        (fit_mixed, "Mixed‑LM RMSE  "),
    )
    for fit, label in point_models:
        _, score = fit(train_df, test_df)
        print(f"{label}{score:5.2f} mph")

    # D: Bayesian hierarchical model, short chains for a quick demo
    batter_codes = train_df.batter_id.astype("category").cat.codes.values
    level_codes = train_df.level_idx.values
    centered_age = train_df.age_centered.values
    idata = fit_bayesian_hierarchical(
        train_df,
        batter_idx=batter_codes,
        level_idx=level_codes,
        age_centered=centered_age,
        mu_prior=90,
        sigma_prior=5,
        draws=500,
        tune=500,
    )
    from src.utils.validation import rmse_pymc

    bayes_score = rmse_pymc(idata, test_df)
    print(f"PyMC RMSE ……  {bayes_score:5.2f} mph")


if __name__ == "__main__":
    main()



In [None]:
# %%writefile src/models/model_shap_reports.py
# ── NEW: shapash and shapiq imports
from shapash import SmartExplainer
import shapiq

def generate_shapash_report(
    model,
    X,
    y,
    features_dict: dict | None = None,
    preprocessing: object | None = None,
    report_path: str = "shapash_report.html",
    project_info_file: str | None = None,
):
    """
    Instantiate a Shapash SmartExplainer, compile it with the data, and
    write a standalone HTML report.

    The interactive app is not launched here; call `run_app()` on the
    returned explainer if it is wanted.

    Parameters:
    - model: trained ML model (supports .predict)
    - X: pd.DataFrame, input features
    - y: pd.Series or array, true target values
    - features_dict: optional mapping {col: label} for display
    - preprocessing: optional transformer with inverse_transform
    - report_path: file path to save HTML report
    - project_info_file: optional path forwarded to `generate_report`;
      omitted from the call when None.
      NOTE(review): Shapash appears to require this argument -- confirm.

    Returns:
    - the compiled SmartExplainer
    """
    import pandas as pd  # local import: this cell does not import pandas itself

    # 1️⃣ Create the explainer
    xpl = SmartExplainer(
        model=model,
        features_dict=features_dict or {c: c for c in X.columns},
        preprocessing=preprocessing
    )

    # 2️⃣ Compile dataset for Shapash.
    # Predictions and targets are passed as pandas objects indexed like X,
    # so a raw array from model.predict (or from the caller) is wrapped first.
    y_pred = pd.DataFrame(model.predict(X), columns=["pred"], index=X.index)
    if isinstance(y, (pd.Series, pd.DataFrame)):
        y_target = y
    else:
        y_target = pd.Series(y, index=X.index, name="target")
    xpl.compile(
        x=X,
        y_pred=y_pred,
        y_target=y_target,
        additional_data=None
    )

    # 3️⃣ Generate standalone HTML report.
    # y_test must be the targets; the feature matrix was passed here before.
    report_kwargs = dict(
        output_file=report_path,
        title_story="Model Explainability Report",
        title_description="Auto-generated by Shapash",
        x_train=None, y_train=None, y_test=y_target, metrics=[]
    )
    if project_info_file is not None:
        report_kwargs["project_info_file"] = project_info_file
    xpl.generate_report(**report_kwargs)

    # Return the explainer for further interaction
    return xpl

def compute_shapiq_interactions(
    model,
    X,
    sample_size: int = 100,
    max_order: int = 2,
    budget: int = 256,
):
    """
    Use shapiq to compute Shapley Interaction values up to `max_order`
    for ONE observation: the first row of a random sample of `X`.

    The sample (at most `sample_size` rows, fixed seed 42) is handed to the
    explainer as its `data`; only its first row is explained.

    Parameters:
    - model: trained ML model
    - X: pd.DataFrame of features
    - sample_size: maximum number of rows drawn from X for the explainer
    - max_order: maximum interaction order (e.g., 2 for pairwise)
    - budget: value passed as `budget` to `explainer.explain`

    Returns:
    - interaction_values: result of `explainer.explain` for the first sampled row

    Raises:
    - ValueError: if X has no rows (there would be nothing to explain)
    """
    if len(X) == 0:
        raise ValueError("X must contain at least one row to explain")

    # 1️⃣ Sample data for performance
    n_rows = min(len(X), sample_size)
    X_sample = X.sample(n=n_rows, random_state=42).to_numpy()

    # 2️⃣ Instantiate the explainer
    explainer = shapiq.TabularExplainer(
        model=model,
        data=X_sample,
        index="k-SII",   # or "SV" for standard Shapley values
        max_order=max_order
    )

    # 3️⃣ Explain the first sample
    interaction_values = explainer.explain(X_sample[0], budget=budget)
    return interaction_values



if __name__ == "__main__":