In [80]:
import os
import sys
import numpy as np
import pandas as pd

from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import root_mean_squared_error, r2_score

from dotenv import load_dotenv
load_dotenv()

# -----------------------------------------------------------------------------
# Project path setup
# -----------------------------------------------------------------------------
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
print(f"Project Root: {project_root}")
print("Sys Path Before:", sys.path)
if project_root not in sys.path:
    print("Inserting project root to sys.path")
    sys.path.insert(0, project_root)

# Now import internal modules
import utils

# -----------------------------------------------------------------------------
# Constants / Config
# -----------------------------------------------------------------------------
COLUMN_CATEGORIES = utils.STATISTICAL_COLUMNS_BY_CATEGORY
TARGET_INPUTS = utils.TARGETS_TO_INPUTS

ROLLING_PERIOD = 4

CATEGORIES_POSITIONS = {
    "passing": ["QB"],
    "rushing_and_receiving": ["RB", "WR", "TE", "QB"],
    # (kicking not available)
}

# -----------------------------------------------------------------------------
# Load Persistent DataFrames
# -----------------------------------------------------------------------------
print("Loading base data frames...")
player_stats_path = r"C:\Users\bengu\Documents\NFL Data Project\clairvoyent-raven-sports-analysis\data\scrubbed_player_data.xlsx" # os.getenv("NFLVERSE_DATA_PATH")
teams_stats_path = r"C:\Users\bengu\Documents\NFL Data Project\clairvoyent-raven-sports-analysis\data\team_stats.csv" # os.getenv("NFLVERSE_TEAMS_DATA_PATH")
injuries_path = os.getenv("NFLVERSE_INJURIES_PATH")
depth_path = os.getenv("NFLVERSE_DEPTH_CHART_PATH")

all_players_df = pd.read_excel(rf"{player_stats_path}", engine="openpyxl")
all_teams_df = pd.read_csv(rf"{teams_stats_path}")
injuries_df = pd.read_excel(rf"{injuries_path}", engine="openpyxl")
depth_df = pd.read_excel(rf"{depth_path}", engine="openpyxl")
print("-" * 40)
print("\n")


Project Root: c:\Users\bengu\Documents\NFL Data Project\clairvoyent-raven-sports-analysis\src
Sys Path Before: ['c:\\Users\\bengu\\Documents\\NFL Data Project\\clairvoyent-raven-sports-analysis\\src', 'C:\\Users\\bengu\\AppData\\Local\\Programs\\Python\\Python310\\python310.zip', 'C:\\Users\\bengu\\AppData\\Local\\Programs\\Python\\Python310\\DLLs', 'C:\\Users\\bengu\\AppData\\Local\\Programs\\Python\\Python310\\lib', 'C:\\Users\\bengu\\AppData\\Local\\Programs\\Python\\Python310', 'c:\\Users\\bengu\\.virtualenvs\\cfeproj-oIABPDjj', '', 'c:\\Users\\bengu\\.virtualenvs\\cfeproj-oIABPDjj\\lib\\site-packages', 'c:\\Users\\bengu\\.virtualenvs\\cfeproj-oIABPDjj\\lib\\site-packages\\win32', 'c:\\Users\\bengu\\.virtualenvs\\cfeproj-oIABPDjj\\lib\\site-packages\\win32\\lib', 'c:\\Users\\bengu\\.virtualenvs\\cfeproj-oIABPDjj\\lib\\site-packages\\Pythonwin']
Loading base data frames...
----------------------------------------




In [20]:
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def encode_and_filter_injuries_data(injuries_df: pd.DataFrame = injuries_df):
    df_inj = injuries_df.copy()
    if "gsis_id" in df_inj.columns:
        df_inj = df_inj.rename({"gsis_id": "player_id"}, axis=1)

    filtered = df_inj[["season", "week", "player_id", "report_status", "practice_status"]].copy()

    encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
    X_cat = filtered[["report_status", "practice_status"]]
    encoded_data = encoder.fit_transform(X_cat)
    encoded_feature_names = encoder.get_feature_names_out(X_cat.columns)

    encoded_df = pd.DataFrame(encoded_data, columns=encoded_feature_names, index=filtered.index).fillna(0)

    out = pd.concat([filtered[["season", "week", "player_id"]], encoded_df], axis=1)
    return out, encoded_feature_names


def filter_depth_data(depth_df: pd.DataFrame = depth_df):
    # Always start from a concrete base df
    if "gsis_id" in depth_df.columns:
        base = depth_df.rename({"gsis_id": "player_id"}, axis=1).copy()
    else:
        base = depth_df.copy()

    filtered = base[["season", "week", "player_id", "depth_team"]].copy()
    filtered = filtered.dropna(subset=["season", "week"]).reset_index(drop=True)

    filtered["week"] = filtered["week"].astype(int)
    filtered["season"] = filtered["season"].astype(int)

    # Fill NaNs in depth with mean depth (around 1.5/2.0)
    if filtered["depth_team"].isna().any():
        filtered["depth_team"] = filtered["depth_team"].fillna(filtered["depth_team"].mean())

    return filtered


def merge_players_to_depth_and_injury(
    all_players_df: pd.DataFrame,
    filtered_injuries_df: pd.DataFrame,
    filtered_depth_df: pd.DataFrame,
) -> pd.DataFrame:
    df = all_players_df.merge(filtered_injuries_df, how="left", on=["player_id", "season", "week"])
    df = df.merge(filtered_depth_df, how="left", on=["player_id", "season", "week"])
    return df


def filter_by_positional_group(df: pd.DataFrame, category_key_a: str, category_key_b: str = "standard") -> pd.DataFrame:
    new_df = df.loc[:, list(COLUMN_CATEGORIES[category_key_a] | COLUMN_CATEGORIES[category_key_b])].copy()
    new_df = new_df[new_df["position"].isin(CATEGORIES_POSITIONS[category_key_a])]
    return new_df.reset_index(drop=True)


def calculate_rolling_data(
    df: pd.DataFrame,
    sort_values: list,
    input_ref: str,
    groupby: list,
    rolling_period: int = 3,
    min_periods: int = 1,
    shift: int = 1,
) -> pd.DataFrame:
    df = df.sort_values(sort_values)
    cols = TARGET_INPUTS[input_ref]
    df[[f"{c}_roll{rolling_period}_shift" for c in cols]] = (
        df.groupby(groupby)[cols]
          .transform(lambda x: x.rolling(rolling_period, min_periods=min_periods).mean().shift(shift))
    )
    return df


def calculate_cumulative_data(
    df: pd.DataFrame,
    sort_values: list,
    input_ref: str,
    groupby: list,
    prefix: str = "",
    shift: int = 1
) -> pd.DataFrame:
    df = df.sort_values(sort_values)
    cols = TARGET_INPUTS[input_ref]
    # mean
    df[[f"{prefix}{c}_cum_avg" for c in cols]] = (
        df.groupby(groupby)[cols]
          .transform(lambda x: x.expanding().mean().shift(shift))
    )
    # std (sample std by default); adjust min_periods/ddof if you prefer
    df[[f"{prefix}{c}_cum_std" for c in cols]] = (
        df.groupby(groupby)[cols]
          .transform(lambda x: x.expanding().std().shift(shift))
    )
    return df


def get_standard_input_cols(target: str, encoded_feature_names) -> list[str]:
    rolling_cols = [col + f"_roll{ROLLING_PERIOD}_shift" for col in TARGET_INPUTS[target]]
    avg_cum_cols = [col + "_cum_avg" for col in TARGET_INPUTS[target]]
    std_cum_cols = [col + "_cum_std" for col in TARGET_INPUTS[target]]
    opp_avg_cum_cols = ["vs_opponent_" + col + "_cum_avg" for col in TARGET_INPUTS[target]]
    opp_std_cum_cols = ["vs_opponent_" + col + "_cum_std" for col in TARGET_INPUTS[target]]

    return (
        rolling_cols
        + avg_cum_cols
        + std_cum_cols
        + opp_avg_cum_cols
        + opp_std_cum_cols
        + ["depth_team"]
        + list(encoded_feature_names)
    )


def scale_inplace(df: pd.DataFrame, cols: list[str], name: str, scalers: dict | None = None):
    if scalers is None:
        scalers = {}
    scaler = StandardScaler()
    df.loc[:, cols] = scaler.fit_transform(df[cols])
    scalers[name] = scaler
    return df, scalers


def handle_merge(
    left_df: pd.DataFrame,
    right_df: pd.DataFrame,
    how: str = "left",
    left_on: list = ["opponent_team", "season", "week"],
    right_on: list = ["team", "season", "week"],
) -> pd.DataFrame:
    return left_df.merge(right_df, how=how, left_on=left_on, right_on=right_on)


def generate_target_dataframe_struct(
    encoded_feature_names,
    rushing_and_receiving_df: pd.DataFrame,
    passing_df: pd.DataFrame,
    all_teams_df: pd.DataFrame,
) -> dict[str, pd.DataFrame]:
    standard_inputs = [
        "season", "week", "player_id", "position",
        "player_name", "team", "opponent_team", "depth_team",
    ]
    enc = list(encoded_feature_names)

    target_data_struct: dict[str, pd.DataFrame] = {}

    rushing_cats = ["rsh_yd", "rsh_td", "rc_yd", "rc_td", "rc"]
    passing_cats = ["p_yd", "p_td", "intcpt"]
    both_cats = ["rsh_fmbls", "rc_fmbls"]

    for target in TARGET_INPUTS:
        if target in rushing_cats:
            target_data_struct[target] = rushing_and_receiving_df[TARGET_INPUTS[target] + standard_inputs + enc].copy()
        elif target in passing_cats:
            target_data_struct[target] = passing_df[TARGET_INPUTS[target] + standard_inputs + enc].copy()
        elif target in both_cats:
            both = pd.concat([rushing_and_receiving_df, passing_df], ignore_index=True)
            target_data_struct[target] = both[TARGET_INPUTS[target] + standard_inputs + enc].copy()

    # Defensive (opponent) stats handled separately
    target_data_struct["def"] = all_teams_df[TARGET_INPUTS["def"] + ["season", "week", "team"]].copy()

    return target_data_struct


def get_input_cols_by_target(target_data_struct: dict[str, pd.DataFrame], encoded_feature_names) -> dict[str, list[str]]:
    input_cols_by_target: dict[str, list[str]] = {}
    for target in target_data_struct:
        if target == "def":
            input_cols_by_target[target] = [col + f"_roll{ROLLING_PERIOD}_shift" for col in TARGET_INPUTS[target]]
        else:
            input_cols_by_target[target] = get_standard_input_cols(target, encoded_feature_names)
    return input_cols_by_target


def calculate_rolling_and_cumulative_data(
    target_data_struct: dict[str, pd.DataFrame],
    rolling_period: int = ROLLING_PERIOD,
) -> dict[str, pd.DataFrame]:
    for target, df in target_data_struct.items():
        if target == "def":
            tmp = calculate_rolling_data(df, ["season", "week", "team"], target, ["season", "team"], rolling_period=rolling_period)
            tmp = calculate_cumulative_data(tmp, ["season", "week", "team"], target, ["season", "team"])
            target_data_struct[target] = tmp
        else:
            tmp = calculate_rolling_data(df, ["season", "week", "player_id"], target, ["season", "player_id"], rolling_period=rolling_period)
            tmp = calculate_cumulative_data(tmp, ["season", "week", "player_id"], target, ["season", "player_id"])
            tmp = calculate_cumulative_data(tmp, ["season", "week", "player_id"], target, ["opponent_team", "player_id"], prefix="vs_opponent_")
            target_data_struct[target] = tmp
    return target_data_struct


def scale_target_data(target_data_struct: dict[str, pd.DataFrame], target_input_cols: dict[str, list[str]]) -> dict[str, pd.DataFrame]:
    for target, df in target_data_struct.items():
        cols = target_input_cols.get(target, [])
        if not cols:
            continue
        target_data_struct[target], _ = scale_inplace(df, cols, target)  # discard scalers for now
    return target_data_struct


def merge_target_data_to_defense(target_data_struct: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
    # merge 'def' onto each offensive df
    def_df = target_data_struct["def"]
    for target, df in target_data_struct.items():
        if target == "def":
            continue
        target_data_struct[target] = handle_merge(df, def_df)
    return target_data_struct

def train_and_validate_model(
    target_data_struct: dict[str, pd.DataFrame],
    target_input_cols: dict[str, list[str]],
    season_holdout: int = 2024,
):
    model_results: dict[str, dict] = {}
    models: dict[str, LinearRegression] = {}

    for target, df in target_data_struct.items():
        if target == "def":
            continue  # no target variable for defense

        input_cols = target_input_cols[target]
        def_input_cols = target_input_cols["def"]

        # hold out a season for validation split (and avoid leakage)
        mask = df["season"] != season_holdout

        # Build a single feature matrix with aligned rows, then dropna once
        feature_cols = input_cols + def_input_cols
        XY = df.loc[mask, feature_cols + [target]].dropna().reset_index(drop=True)

        if XY.empty:
            model_results[target] = {"validation_rmse": "nan", "r2": "nan"}
            continue

        X = XY[feature_cols].values
        y = XY[target].values

        X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)

        reg = LinearRegression().fit(X_train, y_train)
        preds = reg.predict(X_valid)

        rmse = root_mean_squared_error(y_valid, preds)
        r2 = r2_score(y_valid, preds)

        model_results[target] = {"validation_rmse": f"{rmse:.4f}", "r2": f"{r2:.3f}"}
        models[target] = reg

    return models, model_results

In [81]:
# 1) Prepare injuries/depth and merge with players first
print("Filtering and merging injuries and depth charts...")
filtered_injuries_df, encoded_feature_names = encode_and_filter_injuries_data()
filtered_depth_df = filter_depth_data()
merged_players = merge_players_to_depth_and_injury(all_players_df, filtered_injuries_df, filtered_depth_df)
print("-" * 40)
print("\n")

Filtering and merging injuries and depth charts...
----------------------------------------




In [82]:
# 2) Build positional dataframes FROM the merged players df
print("Generating positional dataframes...")
passing_df = filter_by_positional_group(merged_players, "passing")
rushing_and_receiving_df = filter_by_positional_group(merged_players, "rushing_and_receiving")
print(f"Generated positional dataframes with {passing_df.size} rows and {rushing_and_receiving_df.size} rows")
print("-" * 40)
print("\n")

# Optional: quick null check utility (add .alias only if used)
passing_df.alias = "passing_df"
rushing_and_receiving_df.alias = "rushing_and_receiving_df"

Generating positional dataframes...
Generated positional dataframes with 836418 rows and 8713458 rows
----------------------------------------




In [83]:
# 3) Create per-target dataframes
print("Generating target data structure and input columns by statistical category...")
target_data_struct = generate_target_dataframe_struct(
    encoded_feature_names, rushing_and_receiving_df, passing_df, all_teams_df
)
target_input_cols = get_input_cols_by_target(target_data_struct, encoded_feature_names)
print("-" * 40)
print("\n")

Generating target data structure and input columns by statistical category...
----------------------------------------




In [84]:
# 4) Feature engineering: rolling / cumulative (season & vs-opponent), then scale, then merge defense
print("Engineering features for cumulative and rolling data...")
target_data_struct = calculate_rolling_and_cumulative_data(target_data_struct)
target_data_struct = scale_target_data(target_data_struct, target_input_cols)
target_data_struct = merge_target_data_to_defense(target_data_struct)
print("-" * 40)
print("\n")

Engineering features for cumulative and rolling data...
----------------------------------------




In [96]:
target_translation = {
           "rsh_yd": "rushing_yards", 
           "rsh_td": "rushing_tds", 
           "rc_yd": "receiving_yards", 
           "rc_td": "receiving_tds", 
           "rc": "receptions", 
           "p_yd": "passing_yards", 
           "p_td": "passing_tds", 
           "intcpt": "passing_interceptions", 
           "rsh_fmbls": "rushing_fumbles_lost", 
           "rc_fmbls": "receiving_fumbles_lost"
}

def train_and_validate_model(
    target_data_struct: dict[str, pd.DataFrame],
    target_input_cols: dict[str, list[str]],
    season_holdout: int = 2024,
):
    model_results: dict[str, dict] = {}
    models: dict[str, LinearRegression] = {}
    predictions: dict[str, np.array] = {}
    trues: dict[str, np.array] = {}

    for target, df in target_data_struct.items():
        if target == "def":
            continue  # no target variable for defense

        input_cols = target_input_cols[target]
        def_input_cols = target_input_cols["def"]

        # hold out a season for validation split (and avoid leakage)
        mask = df["season"] != season_holdout

        # Build a single feature matrix with aligned rows, then dropna once
        feature_cols = input_cols + def_input_cols
        XY = df.loc[mask, feature_cols + [target_translation[target]]].fillna(0) #.dropna().reset_index(drop=True)

        if XY.empty:
            model_results[target] = {"validation_rmse": "nan", "r2": "nan"}
            continue

        X = XY[feature_cols].values
        y = XY[target_translation[target]].values

        X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)

        reg = LinearRegression().fit(X_train, y_train)
        preds = reg.predict(X_valid)

        rmse = root_mean_squared_error(y_valid, preds)
        r2 = r2_score(y_valid, preds)

        model_results[target] = {"validation_rmse": f"{rmse:.4f}", "r2": f"{r2:.3f}"}
        models[target] = reg
        predictions[target] = preds
        trues[target] = y_valid

    return models, model_results, trues, predictions

In [86]:
p_df = target_data_struct["p_yd"]

In [97]:
# 5) Train & validate models
print("Training and validating model...")
models, model_results, trues, preds = train_and_validate_model(target_data_struct, target_input_cols, season_holdout=2025)
print("-" * 40) 
print("\n")
  
# 6) Report
print("\nMODEL RESULTS:")
for target, metrics in model_results.items():
    print(f"{target}: {metrics}")

Training and validating model...
----------------------------------------



MODEL RESULTS:
rsh_yd: {'validation_rmse': '15.6698', 'r2': '0.581'}
rsh_td: {'validation_rmse': '0.2668', 'r2': '0.205'}
rsh_fmbls: {'validation_rmse': '0.1341', 'r2': '0.122'}
rc_yd: {'validation_rmse': '23.0212', 'r2': '0.449'}
rc_td: {'validation_rmse': '0.3577', 'r2': '0.161'}
rc: {'validation_rmse': '1.6301', 'r2': '0.486'}
rc_fmbls: {'validation_rmse': '0.1036', 'r2': '0.051'}
p_yd: {'validation_rmse': '308.0146', 'r2': '-7.710'}
p_td: {'validation_rmse': '2.8312', 'r2': '-5.348'}
intcpt: {'validation_rmse': '1.1975', 'r2': '-0.625'}


In [99]:
for true, pred in zip(trues["p_yd"], preds["p_yd"]):
    print(f"True: {true}, Pred: {pred}")

True: 192, Pred: 213.94155390531452
True: 267, Pred: 167.61434269770047
True: 177, Pred: 159.7330602993719
True: 245, Pred: 216.7510712812182
True: 160, Pred: 156.1728007231182
True: 219, Pred: 234.4533599484878
True: 244, Pred: 134.97364083968907
True: 282, Pred: 250.3377501561422
True: 157, Pred: 245.42656297153857
True: 0, Pred: 34.05290175687557
True: 237, Pred: 265.2549345416831
True: 230, Pred: 221.1639460883174
True: 272, Pred: 124.64098407422574
True: 170, Pred: 179.40313173997626
True: 212, Pred: 220.0873136383808
True: 207, Pred: 109.27972559654862
True: 364, Pred: 224.14042004724863
True: 271, Pred: 244.7373620338504
True: 22, Pred: 160.40922657226432
True: 227, Pred: 105.98155304756128
True: 302, Pred: 246.06594863447486
True: 285, Pred: 271.5822827020394
True: 94, Pred: 145.29769063117715
True: 0, Pred: 110.38522851608164
True: 209, Pred: 232.97247657513356
True: 270, Pred: 272.1140366347542
True: 89, Pred: 190.32877225300757
True: 166, Pred: 246.04394157326237
True: 247, 

In [17]:
def save_and_store_model_weights(models: dict, path: str = "models"):
    """Save the coefficients and intercept for the linear regression models to models/ folder."""
    os.makedirs(path, exist_ok=True)

    for name, lr in models.items():
        np.savez(
            os.path.join(path, f"{name}_linreg_weights.npz"),
            coef=lr.coef_,
            intercept=lr.intercept_,
            n_features_in_=lr.n_features_in_,
            feature_names_in_=getattr(lr, "feature_names_in_", None),
        )
        
save_and_store_model_weights(models)