# NBA Analytics ETL / DAGs Overview

## Project Purpose
This project ingests, validates, enriches, and surfaces advanced NBA data for downstream analytics. Key inputs include NBA Stats API endpoints (Synergy play types, Player Estimated Metrics, roster/box-score), Basketball-Reference advanced stats, injury reports, Spotrac financial/contract data, and defensive metrics. The pipelines combine Airflow TaskFlow and classic operator patterns and emphasize auditability, null-legitimacy validation, identity enrichment, and incremental history preservation, with final outputs consumed analytically via DuckDB/Parquet.

## Primary Data Sources
- **Synergy Play Types (NBA Stats API)**  
  Granular offensive/defensive play-type metrics per player (e.g., Isolation, Transition, Spotup) with rate/volume stats. Includes null-legitimacy validation to distinguish absent play types from data loss. (`eda/synergy_playtype_pull.py`)
- **Player Estimated Metrics (NBA Stats API)**  
  Estimated advanced metrics per player-season (e.g., offensive/defensive ratings, usage, pace). Raw JSON, request parameters, and DataFrames are archived, and records are enriched with canonical player identity. (`eda/nba_em_connector.py`)
- **Roster & Box Score Data (nba_api_ingest)**  
  Hourly ingestion of roster and box-score data to keep the lake near-real-time. Supports incremental pulls with fallback to full season scrape. (`dags/nba_api_ingest.py`)
- **Advanced Season-Level Metrics (Basketball-Reference)**  
  Daily scrape of season-level advanced stats, with seasonal completeness logic that fails fast when data should exist but is missing. (`dags/nba_advanced_ingest.py`)
- **Injury Reports**  
  Aggregates daily NBA injury report feeds (e.g., ESPN/statsurge) with incremental ingestion, cleansing, and historical merging. (`dags/new_injury_reports.py`, `eda/combining_injury_reports.py`)
- **Spotrac Misc Assets**  
  Salary cap history, extensions, tax tracker, transactions, and multi-year cap/cash data, with schema drift detection, logical column-requirement resolution, and asset validation. (`dags/spotrac_misc_assets.py`)
- **Defensive Metrics**  
  Custom defensive metrics collection, season-aware, cached, and merged into a canonical dataset. (`dags/defensive_metrics_collect.py`)
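The "fail fast when data should exist" rule for the Basketball-Reference scrape can be pictured as a small date check. This is a minimal sketch, not the code in `dags/nba_advanced_ingest.py`: it assumes the November through June active window described for that DAG, while the July 1 season rollover and the helper names `season_label`, `data_expected`, and `check_completeness` are hypothetical.

```python
import datetime as dt

# Assumption: season data is expected only in the active window described for
# nba_advanced_ingest (November through June).
ACTIVE_MONTHS = {11, 12, 1, 2, 3, 4, 5, 6}


def season_label(day: dt.date) -> str:
    """Label the NBA season a date belongs to, e.g. 2025-01-15 -> '2024-25'.

    Hypothetical rule: a new season label starts on July 1.
    """
    start = day.year if day.month >= 7 else day.year - 1
    return f"{start}-{(start + 1) % 100:02d}"


def data_expected(day: dt.date) -> bool:
    """True when an empty scrape should be treated as a failure."""
    return day.month in ACTIVE_MONTHS


def check_completeness(day: dt.date, rows_scraped: int) -> str:
    """Fail fast when the active window says data should exist but none arrived."""
    if rows_scraped > 0:
        return "ok"
    if not data_expected(day):
        return "off-season: empty scrape tolerated"
    raise RuntimeError(
        f"No advanced stats for {season_label(day)} on {day.isoformat()} "
        "even though the season is active"
    )
```

Tolerating empty scrapes in the off-season while raising during the active window keeps false alarms out of summer runs without hiding real gaps in-season.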

## High-Level Pipeline Components

### Connectors / ETL Modules
- `eda/synergy_playtype_pull.py`: 
  - Fetches Synergy play-type tables, pivots long ➜ wide, validates null legitimacy, merges offensive/defensive, and builds season/master wide tables.
- `eda/nba_em_connector.py`: 
  - Fetches Player Estimated Metrics; archives raw, normalizes, enriches (player identity), computes null diagnostics, and builds historical master via smoke tests.
- `eda/nba_basic_advanced_stats`: 
  - Scrapes advanced metrics per season for rate-based analytical enrichment. (`nba_advanced_ingest.py` wraps this.)
- `eda/nba_api_ingest.py` logic: 
  - Ingests roster and box-score data hourly with incremental logic and fallbacks to full season pulls.
- `eda/new_injury_reports.py` & `eda/combining_injury_reports.py`: 
  - Injury ingestion, normalization, deduplication, and historical merge.
- `eda/spotrac_connector` (used by `spotrac_misc_assets.py`): 
  - Retrieves Spotrac financial/contract assets with validation helpers.
- `eda/defensive_metrics.py` (used by `defensive_metrics_collect.py`): 
  - Collects defensive analytics and consolidates per-season outputs.
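The null-legitimacy check described for `eda/synergy_playtype_pull.py` hinges on one distinction: a wide-table null is legitimate only if the long table had no row for that player and play type. The sketch below illustrates that rule with plain dicts rather than pandas. The column names `PLAYER_ID`, `PLAY_TYPE`, and `POSS` and all function names are assumptions, not the module's actual API.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, object]


def pivot_long_to_wide(rows: Iterable[Row], value_col: str) -> Dict[object, Dict[str, object]]:
    """Pivot long (one row per player x play type) into wide (one dict per player)."""
    wide: Dict[object, Dict[str, object]] = {}
    for r in rows:
        wide.setdefault(r["PLAYER_ID"], {})[r["PLAY_TYPE"]] = r[value_col]
    return wide


def find_illegitimate_nulls(wide, rows: Iterable[Row], play_types: List[str]) -> List[Tuple[object, str]]:
    """Null wide cells whose (player, play type) pair DID appear in the long data."""
    present = {(r["PLAYER_ID"], r["PLAY_TYPE"]) for r in rows}
    return [
        (pid, pt)
        for pid, cells in wide.items()
        for pt in play_types
        if cells.get(pt) is None and (pid, pt) in present
    ]


def fill_legitimate_nulls(wide, rows: List[Row], play_types: List[str]):
    """Zero-fill play types a player never ran; raise on any other null."""
    lost = find_illegitimate_nulls(wide, rows, play_types)
    if lost:
        raise ValueError(f"{len(lost)} illegitimate nulls after pivot, sample: {lost[:5]}")
    for cells in wide.values():
        for pt in play_types:
            if cells.get(pt) is None:
                cells[pt] = 0.0  # legitimate absence: no long row existed
    return wide
```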

### DAGs (Airflow)
- `nba_player_estimated_metrics.py`  
  Drives the Player Estimated Metrics pipeline end to end: determines missing seasons, fetches raw data, normalizes it, merges into a final Parquet file, and validates the output.
- `nba_api_ingest.py`  
  Hourly ingestion of roster + box-score data from `nba_api`. Uses incremental pull logic to keep data fresh while avoiding redundant full scrapes; respects existing partitions and backfills as needed. (`dags/nba_api_ingest.py`)
- `nba_advanced_ingest.py`  
  Daily scrape of Basketball-Reference season-level advanced stats with logic to surface missing expected data during active windows (Nov–Jun). (`dags/nba_advanced_ingest.py`)
- `nba_data_loader.py`  
  Consumes upstream artifacts (player, advanced, injury), validates them, and loads into DuckDB. Builds materialized views joining player, advanced, and injury data for analytical querying. (`dags/nba_data_loader.py`)
- `new_injury_reports.py`  
  Dynamically maps over dates to pull, upsert, validate, and combine injury report datasets (DAG id `injury_reports_daily`).
- `spotrac_misc_assets.py`  
  Fetches and validates Spotrac static and rolling assets; handles schema drift, merges yearly shards, and enforces logical column requirements.
- `defensive_metrics_collect.py`  
  Season-aware collection and incremental merge of defensive metrics with cache logic.
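The incremental-with-fallback behavior of `nba_api_ingest.py` amounts to a per-run decision between a delta pull and a full-season rescrape. A minimal sketch, assuming a stored last-ingested timestamp and a hypothetical three-day staleness threshold (`MAX_INCREMENTAL_GAP`); the real DAG's criteria may differ.

```python
import datetime as dt
from typing import Optional

# Hypothetical threshold: if the last successful pull is older than this,
# treat incremental state as unreliable and rescrape the whole season.
MAX_INCREMENTAL_GAP = dt.timedelta(days=3)


def plan_pull(last_ingested: Optional[dt.datetime], now: dt.datetime,
              force_full: bool = False) -> dict:
    """Decide between an incremental pull and a full-season fallback."""
    if force_full or last_ingested is None:
        return {"mode": "full_season", "reason": "forced" if force_full else "no prior state"}
    gap = now - last_ingested
    if gap > MAX_INCREMENTAL_GAP:
        return {"mode": "full_season", "reason": f"gap of {gap} exceeds threshold"}
    # Recent state exists: only fetch what changed since the last pull
    return {"mode": "incremental", "since": last_ingested.isoformat()}
```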

## Directory Layout (key slices)

```
api/src/airflow_project/
├── dags/ # Airflow DAG definitions
│ ├── nba_player_estimated_metrics.py
│ ├── nba_api_ingest.py
│ ├── nba_advanced_ingest.py
│ ├── nba_data_loader.py
│ ├── new_injury_reports.py
│ ├── spotrac_misc_assets.py
│ └── defensive_metrics_collect.py
├── eda/ # Extract/Transform/Enrich modules
│ ├── synergy_playtype_pull.py
│ ├── nba_em_connector.py
│ ├── nba_basic_advanced_stats/ # roster/box-score and advanced scrape logic
│ └── ... # Other domain-specific ingestion logic
├── data/
│ ├── new_processed/ # Partitioned, near-real-time ingest (e.g., season=.../part.parquet)
│ ├── raw/ # Raw archived JSON/Parquet (per-source)
│ ├── silver/ # Normalized/enriched intermediate layers
│ └── final/ # Canonical merged outputs for analytics
├── utils/ # Shared helpers: storage, enrichment, config, incremental utils, validation
└── config.py / constants # Paths, directories (e.g., FINAL_DATASET_DIR, DATA_DIR, INJURY_DIR)
```

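The `season=.../part.parquet` layout under `data/new_processed/` follows the Hive-style `key=value` directory convention. The helpers below sketch how such paths could be built and parsed with `pathlib` alone; the function names and the fixed `part.parquet` file name are illustrative assumptions.

```python
from pathlib import Path
from typing import Dict


def partition_path(root: Path, file_name: str = "part.parquet", **keys: str) -> Path:
    """Build a Hive-style path such as root/season=2024-25/part.parquet."""
    path = root
    for key, value in keys.items():  # keyword order defines directory nesting
        path = path / f"{key}={value}"
    return path / file_name


def parse_partition(path: Path) -> Dict[str, str]:
    """Recover partition keys from a path, e.g. {'season': '2024-25'}."""
    return dict(part.split("=", 1) for part in path.parts if "=" in part)
```

If pandas with the pyarrow engine is available, `DataFrame.to_parquet(root, partition_cols=["season"])` writes a similar `season=<value>/` tree, although the engine chooses the file names.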

## Core Concepts & Guarantees
- **Auditability:** Full JSON + request parameters are preserved for every API fetch.
- **Null-legitimacy Validation:** Applied chiefly to Synergy play types, where only legitimate absences are zero-filled and any data loss during the pivot raises an error with detailed diagnostics.
- **Identity Enrichment:** Player (and optionally team) names/IDs are normalized via canonical joins (`enrich_join_by`) to ensure consistency across sources.
- **Incremental History:** DAGs compare requested seasons/dates against existing cached outputs, process only missing slices, and merge new shards without duplicate reprocessing unless `force_full` is specified.
- **Near Real-Time Ingestion:** Roster/box-score data is pulled hourly to keep the lake fresh; advanced metrics and other slower-moving data have daily or seasonal cadences.
- **Schema Drift Detection:** Spotrac and other asset pipelines persist prior schema/stats to detect column additions/removals or mean drifts.
- **Smoke Tests & Health Checks:** Critical endpoints (e.g., Player Estimated Metrics) have historical smoke tests to validate coverage and data integrity prior to publishing.
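Schema drift detection, as described above, compares a snapshot persisted by a previous run against the current fetch. A minimal standard-library sketch, assuming records arrive as dicts; the 25% relative-tolerance default and all names are hypothetical, and the project's Spotrac pipeline may track different statistics.

```python
import json
from pathlib import Path
from statistics import fmean
from typing import Dict, List


def snapshot(rows: List[dict]) -> dict:
    """Record the column set and the mean of every numeric column."""
    columns = sorted({c for r in rows for c in r})
    means: Dict[str, float] = {}
    for c in columns:
        vals = [r[c] for r in rows
                if isinstance(r.get(c), (int, float)) and not isinstance(r.get(c), bool)]
        if vals:
            means[c] = fmean(vals)
    return {"columns": columns, "means": means}


def detect_drift(prev: dict, curr: dict, rel_tol: float = 0.25) -> dict:
    """Report added/removed columns and means that moved by more than rel_tol."""
    shifted = {}
    for col, old in prev["means"].items():
        new = curr["means"].get(col)
        if new is not None and abs(new - old) / (abs(old) or 1.0) > rel_tol:
            shifted[col] = (old, new)
    return {
        "added": sorted(set(curr["columns"]) - set(prev["columns"])),
        "removed": sorted(set(prev["columns"]) - set(curr["columns"])),
        "mean_shift": shifted,
    }


def save_snapshot(path: Path, snap: dict) -> None:
    """Persist a snapshot so the next run can compare against it."""
    path.write_text(json.dumps(snap, indent=2))


def load_snapshot(path: Path) -> dict:
    return json.loads(path.read_text())
```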

## Typical Usage
1. **Trigger DAGs** via Airflow (UI/CLI). Supply params such as seasons, dates, or `force_full`.
2. **Monitor logs** for diagnostics: missing-season logic, enrichment summaries, null/coverage reports, schema drift alerts.
3. **Consume final datasets** from `data/final/*.parquet` or query via DuckDB materialized views populated by `nba_data_loader`. Example view: `v_player_full_<season>` combines player, advanced, and injury data.
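A per-season view such as `v_player_full_<season>` could be generated from a template. The sketch below only builds the DDL string. The source tables `players`, `advanced`, and `injury_summary` (pre-aggregated to one row per player and season) are hypothetical, as is replacing the hyphen in the season so the view name is a valid unquoted identifier.

```python
import re


def player_full_view_sql(season: str) -> str:
    """Return DuckDB DDL for a per-season view such as v_player_full_2024_25."""
    # Validate before interpolating, since the season ends up inside SQL text
    if not re.fullmatch(r"\d{4}-\d{2}", season):
        raise ValueError(f"season must look like '2024-25', got {season!r}")
    suffix = season.replace("-", "_")  # '2024-25' is not a valid bare identifier
    return (
        f"CREATE OR REPLACE VIEW v_player_full_{suffix} AS\n"
        "SELECT *\n"
        "FROM players AS p\n"
        "LEFT JOIN advanced AS a USING (player_id, season)\n"
        "LEFT JOIN injury_summary AS i USING (player_id, season)\n"
        f"WHERE p.season = '{season}'"
    )
```

With the `duckdb` package installed, the statement could be run via `duckdb.connect(str(DUCKDB_FILE)).execute(player_full_view_sql("2024-25"))`.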

## Validation & Health Checks
- Each DAG includes a validation task asserting required fields, deduplication consistency, and basic sanity (e.g., row counts, top-N rating listings).
- Enrichment stages log before/after null diagnostics and surface identity mismatches (e.g., missing team info).
- Schema snapshots enable drift detection over time. 
- `nba_data_loader` ensures upstream dependencies complete via sensors before loading into DuckDB and builds analytical views.
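The validation tasks above check the same three things in each DAG: required fields, natural-key uniqueness, and row counts. A framework-free sketch of those checks, assuming records as a list of dicts; `validate_records` is a hypothetical name, and it collects every problem instead of stopping at the first.

```python
from collections import Counter
from typing import Iterable, List, Sequence


def validate_records(records: List[dict], required: Iterable[str],
                     key_cols: Sequence[str], min_rows: int = 1) -> List[str]:
    """Return every problem found; an empty list means the dataset passed."""
    problems: List[str] = []
    required = list(required)  # allow generators; iterated once per record

    # Row-count sanity
    if len(records) < min_rows:
        problems.append(f"expected >= {min_rows} rows, got {len(records)}")

    # Required fields must be present on every record
    missing = sorted({c for r in records for c in required if c not in r})
    if missing:
        problems.append(f"missing required fields: {missing}")

    # Deduplication consistency: the natural key must be unique
    counts = Counter(tuple(r.get(c) for c in key_cols) for r in records)
    dupes = [k for k, n in counts.items() if n > 1]
    if dupes:
        problems.append(f"{len(dupes)} duplicate key(s) on {list(key_cols)}, e.g. {dupes[:3]}")
    return problems
```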

## Failure Modes / Remediation
- **Stale/Missing API Data:** Missing-season tracking with `force_full` override allows re-fetching; smoke tests surface unexpected gaps.
- **Illegitimate Nulls:** Pivot validation will raise with sampled context, prompting investigation of upstream transformation logic.
- **Schema Drift:** Incoming column changes are detected and logged; update logical mappings accordingly.
- **Enrichment Failures:** Logged explicitly with context; may need updates to canonical reference tables or fuzzy matching logic.
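When enrichment fails because a name is spelled differently across sources, a normalized exact match followed by a fuzzy fallback is one common remedy. This sketch uses the standard library's `difflib.get_close_matches`; the normalization rules, the 0.85 cutoff, and the toy reference table with placeholder IDs are assumptions, not the project's `enrich_join_by` logic.

```python
import difflib
import unicodedata
from typing import Dict, Optional


def normalize_name(name: str) -> str:
    """Strip accents and periods so 'P.J. Washington' equals 'PJ Washington'."""
    ascii_name = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode()
    return " ".join(ascii_name.lower().replace(".", "").split())


def match_player(name: str, canonical: Dict[str, int], cutoff: float = 0.85) -> Optional[int]:
    """Exact match on the normalized name first, then a difflib fuzzy fallback."""
    index = {normalize_name(k): v for k, v in canonical.items()}
    key = normalize_name(name)
    if key in index:
        return index[key]
    hits = difflib.get_close_matches(key, list(index), n=1, cutoff=cutoff)
    return index[hits[0]] if hits else None  # None -> log as an enrichment failure
```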

## Running Locally
- Install dependencies (`nba_api`, `pandas`, `airflow`, `duckdb`, `pendulum`, internal utils).
- Ensure configuration (`utils.config`, `nba_basics_config`, etc.) points to correct root directories (e.g., `DATA_DIR`, `FINAL_DATASET_DIR`).
- Kick off DAGs manually for desired season/date ranges.
- Example: Run the Player Estimated Metrics smoke test:
  ```python
  from eda.nba_em_connector import _smoke_test_all_seasons
  results = _smoke_test_all_seasons(start_season="2015-16", end_season="2024-25")
  ```

%%writefile api/src/airflow_project/utils/config.py
"""
Central configuration for the NBA‑Player‑Valuation project.
All magic values live here so they can be tweaked without code edits.
"""
from pathlib import Path
import os

# ── Core directories ───────────────────────────────────────────────────────────
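def find_project_root(name: str = "airflow_project") -> Path:
    """
    Walk up from this file (or from the cwd when __file__ is undefined, e.g. in
    a notebook) and return the first directory that is named `name` or that
    contains a `.git` directory. Fall back to the cwd if none is found.
    """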
    try:
        p = Path(__file__).resolve()
    except NameError:
        p = Path.cwd()
    # walk through p and its parents
    for parent in (p, *p.parents):
        if parent.name == name or (parent / ".git").is_dir():
            return parent
    # no match → fallback
    return Path.cwd()

# find_project_root() always returns a Path, and a Path is always truthy, so
# one call is enough; the former else-branch could never run.
PROJECT_ROOT: Path = find_project_root().resolve()


DATA_DIR: Path = PROJECT_ROOT / "data"
LOG_DIR: Path = PROJECT_ROOT / "logs"
DUCKDB_FILE: Path = DATA_DIR / "nba.duckdb"

# NBA stats API
NBA_API_RPM: int = int(os.getenv("NBA_API_RPM", "12"))  # requests per minute

# Spotrac scraping
SPOTRAC_BASE: str = "https://www.spotrac.com/nba"
SPOTRAC_FREE_AGENTS: str = f"{SPOTRAC_BASE}/free-agents/{{year}}/"
SPOTRAC_CAP_TRACKER: str = f"{SPOTRAC_BASE}/cap/{{year}}/"
SPOTRAC_TAX_TRACKER: str = f"{SPOTRAC_BASE}/tax/_/year/{{year}}/"


# Injury sources
NBA_OFFICIAL_INJURY_URL: str = "https://cdn.nba.com/static/json/injury/injury_report_{date}.json"  # plain string: single braces for .format(date=...)
ROTOWIRE_RSS: str = "https://www.rotowire.com/rss/news.php?sport=NBA"

# StatsD (optional)
STATSD_HOST: str = os.getenv("STATSD_HOST", "localhost")
STATSD_PORT: int = int(os.getenv("STATSD_PORT", "8125"))

# ── Data ranges ────────────────────────────────────────────────────────────────
SEASONS: range = range(2015, 2026)  # 2015 through 2025 (range's stop is exclusive); matches Spotrac sample

# Thread pools & concurrency
MAX_WORKERS: int = int(os.getenv("NPV_MAX_WORKERS", "8")) 


# ── Core data directories ───────────────────────────────────────────────────────────
RAW_DIR      : Path = DATA_DIR / "raw"
DEBUG_DIR    : Path = DATA_DIR / "debug"
EXPORTS_DIR  : Path = DATA_DIR / "exports"
INJURY_DIR   : Path = DATA_DIR / "injury_reports"
INJURY_DATASETS_DIR : Path = DATA_DIR / "injury_datasets"
NBA_BASIC_ADVANCED_STATS_DIR : Path = DATA_DIR / "nba_basic_advanced_stats"
ADVANCED_METRICS_DIR: Path = DATA_DIR / "new_processed" / "advanced_metrics"
NBA_BASE_DATA_DIR : Path = DATA_DIR / "nba_processed"
DEFENSE_DATA_DIR : Path = DATA_DIR / "defense_metrics"
FINAL_DATASET_DIR : Path = DATA_DIR / "merged_final_dataset"
PLAY_TYPES_DIR : Path = DATA_DIR / "synergyplay_types"
SPOTRAC_DIR  : Path = DATA_DIR / "spotrac_contract_data"
SILVER_DIR   : Path = SPOTRAC_DIR / "silver"
FINAL_DIR    : Path = SPOTRAC_DIR / "final"
SPOTRAC_DEBUG_DIR    : Path = SPOTRAC_DIR / "debug"
SPOTRAC_RAW_DIR      : Path = SPOTRAC_DIR / "raw"

# ── Helper functions (single source of truth) ────────────────────────────────
def get_injury_base_dir() -> Path:
    """
    Return the canonical injury_reports root.
    ENV `INJURY_DATA_DIR` wins; otherwise we fall back to INJURY_DIR.
    Always creates the dir so callers can assume it exists.
    """
    base = Path(os.getenv("INJURY_DATA_DIR", INJURY_DIR)).resolve()
    base.mkdir(parents=True, exist_ok=True)  # honor the docstring's guarantee
    return base

def injury_path(*parts: str) -> Path:
    """Shorthand for get_injury_base_dir().joinpath(*parts).resolve()."""
    return get_injury_base_dir().joinpath(*parts).resolve()

# ── One-shot: ensure all declared dirs exist at import time ───────────────────
for _p in (
    DATA_DIR, RAW_DIR, DEBUG_DIR, EXPORTS_DIR, INJURY_DIR, INJURY_DATASETS_DIR,
    SPOTRAC_DIR, SILVER_DIR, FINAL_DIR, SPOTRAC_DEBUG_DIR, SPOTRAC_RAW_DIR,
    NBA_BASIC_ADVANCED_STATS_DIR, NBA_BASE_DATA_DIR, ADVANCED_METRICS_DIR,
    DEFENSE_DATA_DIR, FINAL_DATASET_DIR,
    PLAY_TYPES_DIR,  # synergy play-types folder
):
    _p.mkdir(parents=True, exist_ok=True)



if __name__ == "__main__":
    # Print resolved paths only when run directly, so importing this module
    # during DAG parsing does not emit this output.
    print("all directories:")
    for _name in (
        "PROJECT_ROOT", "DATA_DIR", "RAW_DIR", "DEBUG_DIR", "EXPORTS_DIR",
        "INJURY_DIR", "INJURY_DATASETS_DIR", "NBA_BASIC_ADVANCED_STATS_DIR",
        "NBA_BASE_DATA_DIR", "ADVANCED_METRICS_DIR", "DEFENSE_DATA_DIR",
        "FINAL_DATASET_DIR", "PLAY_TYPES_DIR", "SPOTRAC_DIR", "SILVER_DIR",
        "FINAL_DIR", "SPOTRAC_DEBUG_DIR", "SPOTRAC_RAW_DIR",
    ):
        print(f"{_name}: {globals()[_name]}")




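The Spotrac URL constants above are f-strings whose doubled braces defer a `{year}` placeholder for later filling. The snippet below reproduces two of those constants and `SEASONS` to show how that works, assuming callers fill templates with `str.format`. It contrasts this with a plain (non-f) string, where doubled braces stay escaped and `.format()` emits a literal `{date}` instead of substituting. `cap_tracker_url` is a hypothetical helper.

```python
SPOTRAC_BASE = "https://www.spotrac.com/nba"

# In an f-string, "{{year}}" is an escaped brace pair, so once the f-string is
# evaluated the template holds a literal "{year}" placeholder.
SPOTRAC_CAP_TRACKER = f"{SPOTRAC_BASE}/cap/{{year}}/"


def cap_tracker_url(year: int) -> str:
    """Fill the deferred placeholder for one season year."""
    return SPOTRAC_CAP_TRACKER.format(year=year)


# In a plain string, "{{" stays escaped; .format() turns "{{date}}" into the
# literal text "{date}" instead of substituting a value.
PLAIN_TEMPLATE = "injury_report_{{date}}.json"
SINGLE_BRACE_TEMPLATE = "injury_report_{date}.json"

SEASONS = range(2015, 2026)  # stop is exclusive, so the last year is 2025
```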
# utils

%%writefile api/src/airflow_project/utils/airflow_helpers.py
"""
Shared utilities for Airflow DAGs, factored out so common patterns are not re-implemented in every DAG file.
"""
from pathlib import Path
from typing import List, Optional, Union
import pandas as pd
import re


def format_season_range(start_year: int, end_year: int) -> List[str]:
    """Generate season strings like '2015-16', '2016-17' from year range."""
    return [f"{y}-{(y+1)%100:02d}" for y in range(start_year, end_year + 1)]



def normalize_xcom_input(xcom_value) -> Union[str, List[str], None]:
    """Handle Airflow's LazyXComSequence and other XCom types consistently."""
    if xcom_value is None:
        return None
    
    # Handle LazyXComSequence (Airflow 3+)
    try:
        from airflow.sdk.execution_time.lazy_sequence import LazyXComSequence
        if isinstance(xcom_value, LazyXComSequence):
            xcom_value = list(xcom_value)
    except ImportError:
        pass
    
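    # Materialize other iterables, but leave strings, bytes, paths, and dicts
    # intact (iterating a dict would keep only its keys)
    if hasattr(xcom_value, "__iter__") and not isinstance(xcom_value, (str, bytes, Path, dict)):
        xcom_value = list(xcom_value)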
    
    # If it's a list, return the first element (common pattern)
    if isinstance(xcom_value, list):
        return xcom_value[0] if xcom_value else None
    
    return xcom_value


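def load_cached_seasons(parquet_path: Path, force_full: bool) -> List[str]:
    """Load existing seasons from cache, respecting force_full flag.

    Accepts a lowercase `season` or an uppercase `SEASON` column, since
    datasets in this project use both conventions.
    """
    if force_full or not parquet_path.exists():
        return []
    df = pd.read_parquet(parquet_path)
    for col in ("season", "SEASON"):
        if col in df.columns:
            return df[col].astype(str).unique().tolist()
    raise KeyError(f"{parquet_path} has no 'season' or 'SEASON' column")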


def merge_parquet_shards(shard_paths: List[str], final_path: Path, 
                        force_full: bool = False, 
                        dedup_cols: Optional[List[str]] = None) -> str:
    """Merge parquet shards with caching logic."""
    clean_paths = [p for p in shard_paths if p]
    
    if not clean_paths:
        if final_path.exists():
            print(f"No new shards; using existing {final_path}")
            return str(final_path)
        raise RuntimeError("No shards provided and no cache present")
    
    fresh_dfs = [pd.read_parquet(p) for p in clean_paths]
    merged_fresh = pd.concat(fresh_dfs, ignore_index=True)
    
    if final_path.exists() and not force_full:
        cached = pd.read_parquet(final_path)
        merged = pd.concat([cached, merged_fresh], ignore_index=True)
    else:
        merged = merged_fresh

    # Deduplicate in both branches so overlapping shards never double-count rows
    if dedup_cols:
        merged = merged.drop_duplicates(subset=dedup_cols)
    
    final_path.parent.mkdir(parents=True, exist_ok=True)
    merged.to_parquet(final_path, index=False)
    return str(final_path)


def validate_required_columns(df: pd.DataFrame, required_cols: set, name: str = "dataframe"):
    """Validate that required columns exist in dataframe."""
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"{name} missing required columns: {missing}")


def normalize_header(header: str) -> str:
    """Normalize header for robust matching."""
    return re.sub(r'[^a-z]', '', header.lower())


def extract_year_from_path(path: Union[str, Path]) -> Optional[int]:
    """Extract year from filename like 'advanced_2024-25.parquet' -> 2024."""
    match = re.search(r'(\d{4})', Path(path).stem)
    return int(match.group(1)) if match else None 



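A few edge cases in these helpers are worth knowing before relying on them. The block below copies the bodies of three of them so the behavior can be checked in isolation. Season labels roll over correctly at a century boundary. `normalize_header` drops digits as well as punctuation, so `"2024-25 Cap"` and `"Cap"` normalize to the same key. `extract_year_from_path` returns the first four-digit run in the file stem.

```python
import re
from pathlib import Path
from typing import List, Optional, Union


# Copies of three helpers from utils/airflow_helpers.py, reproduced so the
# edge cases can run without the project on sys.path.
def format_season_range(start_year: int, end_year: int) -> List[str]:
    return [f"{y}-{(y+1)%100:02d}" for y in range(start_year, end_year + 1)]


def normalize_header(header: str) -> str:
    return re.sub(r'[^a-z]', '', header.lower())


def extract_year_from_path(path: Union[str, Path]) -> Optional[int]:
    match = re.search(r'(\d{4})', Path(path).stem)
    return int(match.group(1)) if match else None
```

If year-prefixed headers must stay distinct, a pattern such as `[^a-z0-9]` would keep digits; whether that suits the Spotrac alias maps is an assumption to verify.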
# DAGs

%%writefile api/src/airflow_project/dags/nba_player_estimated_metrics.py
"""
Pull NBA Player Estimated Metrics from stats.nba.com via nba_api.
"""


from __future__ import annotations
from pathlib import Path
from typing import Dict, List

import pendulum
import pandas as pd
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

from eda.nba_em_connector import (
    fetch_player_estimated_metrics,
    normalize_player_estimated_metrics,
)
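from utils.storage import write_final_dataset
from utils.airflow_helpers import format_season_range, normalize_xcom_input, load_cached_seasons
from utils.config import DATA_DIR

# Config
SEASON_START_DEFAULT = "2015-16"
SEASON_END_DEFAULT = "2024-25"
SEASON_TYPE_DEFAULT = "Regular Season"

# Anchor on the configured data directory instead of a cwd-relative path,
# which breaks when the scheduler or worker starts from another directory.
BASE_DIR = DATA_DIR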
RAW_DIR = BASE_DIR / "raw" / "player_est_metrics"
SILVER_DIR = BASE_DIR / "silver" / "player_est_metrics"
FINAL_DIR = BASE_DIR / "final"

FINAL_PARQUET = FINAL_DIR / "player_est_metrics_all.parquet"

REQ_COLS = {
    "PLAYER_ID", "PLAYER_NAME", "E_OFF_RATING", "E_DEF_RATING", 
    "E_NET_RATING", "E_USG_PCT", "E_PACE", "E_AST_RATIO", 
    "SEASON", "SEASON_TYPE",
}


@dag(
    dag_id="nba_player_estimated_metrics",
    start_date=pendulum.datetime(2025, 7, 1, tz="America/New_York"),
    schedule="15 4 * * *",
    catchup=False,
    tags=["nba", "player_estimated_metrics", "nba_api"],
    default_args={"retries": 2},
    params={
        "seasons": None,
        "season_start": SEASON_START_DEFAULT,
        "season_end": SEASON_END_DEFAULT,
        "season_type": SEASON_TYPE_DEFAULT,
        "force_full": False,
    },
)
def nba_player_estimated_metrics_dag():

    @task
    def determine_seasons() -> Dict[str, List[str] | bool | str]:
        """Figure out which seasons to process."""
        ctx = get_current_context()
        p = ctx["params"]

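        if p.get("seasons"):
            seasons = p["seasons"]
            if isinstance(seasons, str):  # a single season, e.g. "2023-24"
                seasons = [seasons]
            requested = [str(s) for s in seasons]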
        else:
            start_year = int(p.get("season_start", SEASON_START_DEFAULT).split("-")[0])
            end_year = int(p.get("season_end", SEASON_END_DEFAULT).split("-")[0])
            requested = format_season_range(start_year, end_year)

        season_type = str(p.get("season_type", SEASON_TYPE_DEFAULT))
        force_full = bool(p.get("force_full", False))

        existing = load_cached_seasons(FINAL_PARQUET, force_full)
        missing = sorted(set(requested) - set(existing))

        print(f"[determine_seasons] requested={requested}, existing={existing}, missing={missing}, force_full={force_full}")

        return {
            "requested": requested,
            "missing": missing,
            "season_type": season_type,
            "force_full": force_full,
        }

    @task
    def build_fetch_payloads(season_info: Dict[str, List[str] | bool | str]) -> List[Dict[str, str]]:
        """Build payloads for expand_kwargs."""
        season_type = str(season_info["season_type"])
        payloads = [{"season": s, "season_type": season_type} for s in season_info["missing"]]
        print(f"[build_fetch_payloads] {len(payloads)} payloads")
        return payloads

    @task
    def fetch_one_season(season: str, season_type: str) -> Dict[str, str]:
        """Fetch raw data for a single season."""
        RAW_DIR.mkdir(parents=True, exist_ok=True)

        result = fetch_player_estimated_metrics(
            season=season,
            season_type=season_type,
            raw_dir=RAW_DIR
        )
        print(f"[fetch_one_season] {season} saved {list(result.keys())}")
        return {k: str(v) for k, v in result.items()}

    @task
    def collect_fetched(fetched):
        """Materialize LazyXComSequence to list."""
        return list(fetched)

    @task(multiple_outputs=True)
    def normalize_shards(saved_paths_dicts):
        """Normalize shards and combine into temp parquet."""
        # collect_fetched already returns a list with one {name: path} dict per
        # season; normalize_xcom_input would keep only the first element.
        saved_paths_dicts = list(saved_paths_dicts or [])

        SILVER_DIR.mkdir(parents=True, exist_ok=True)

        final_paths_all = []
        df_frames = []

        for paths in saved_paths_dicts:
            norm_paths, df = normalize_player_estimated_metrics(
                {k: Path(v) for k, v in paths.items()},
                silver_dir=SILVER_DIR
            )
            final_paths_all.extend([str(p) for p in norm_paths])
            if not df.empty:
                df_frames.append(df)

        tmp = pd.concat(df_frames, ignore_index=True) if df_frames else pd.DataFrame()
        tmp_path = SILVER_DIR / "_tmp_combined_current_run.parquet"
        tmp.to_parquet(tmp_path, index=False)

        print(f"[normalize_shards] combined {len(tmp)} rows")
        return {"final_paths": final_paths_all, "combined_tmp": str(tmp_path)}

    @task
    def merge_to_final(tmp_path: str, season_info: Dict[str, List[str] | bool | str]) -> str:
        """Merge temp parquet into final dataset."""
        tmp_path = Path(tmp_path) if tmp_path else None

        if tmp_path and tmp_path.exists():
            fresh = pd.read_parquet(tmp_path)
        else:
            fresh = pd.DataFrame()

        if fresh.empty and FINAL_PARQUET.exists() and not season_info["force_full"]:
            print("[merge_to_final] No new data; using cached final.")
            return str(FINAL_PARQUET)

        if FINAL_PARQUET.exists() and not season_info["force_full"]:
            cached = pd.read_parquet(FINAL_PARQUET)
            merged = pd.concat([cached, fresh], ignore_index=True)
        else:
            merged = fresh

        if not merged.empty:
            dedup_cols = ["PLAYER_ID", "SEASON", "SEASON_TYPE"]
            missing_cols = set(dedup_cols) - set(merged.columns)
            if missing_cols:
                raise ValueError(f"Missing required columns for deduplication: {missing_cols}")
            merged = merged.drop_duplicates(subset=dedup_cols)

        FINAL_DIR.mkdir(parents=True, exist_ok=True)
        final_path = write_final_dataset(merged, FINAL_PARQUET)
        print(f"[merge_to_final] final {len(merged)} rows")
        return str(final_path)

    @task
    def validate_outputs(final_pq: str) -> None:
        """Validate final dataset."""
        df = pd.read_parquet(final_pq)

        missing = REQ_COLS - set(df.columns)
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        print("Rows by SEASON:")
        print(df.groupby("SEASON").size())
        print("Rows by SEASON_TYPE:")
        print(df.groupby("SEASON_TYPE").size())
        print("Top 5 E_NET_RATING by season:")
        print(df.sort_values("E_NET_RATING", ascending=False)
              .groupby("SEASON")
              .head(5)[["PLAYER_NAME", "E_NET_RATING", "SEASON"]]
              .to_string(index=False))

    # Workflow
    season_info = determine_seasons()
    payloads = build_fetch_payloads(season_info)
    fetched = fetch_one_season.expand_kwargs(payloads)
    collected = collect_fetched(fetched)
    norm_res = normalize_shards(collected)
    final_pq = merge_to_final(tmp_path=norm_res["combined_tmp"], season_info=season_info)
    validate_outputs(final_pq)

nba_player_estimated_metrics_dag()








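The season-selection logic in `determine_seasons` and `build_fetch_payloads` can be dry-run without Airflow. The sketch below mirrors it as plain functions; the names are hypothetical, and accepting a bare season string is an extra assumption rather than documented DAG behavior.

```python
from typing import Dict, Iterable, List


def resolve_requested(params: Dict) -> List[str]:
    """Explicit season list if given, else a range from season_start to season_end."""
    seasons = params.get("seasons")
    if seasons:
        if isinstance(seasons, str):  # tolerate a single season string
            seasons = [seasons]
        return [str(s) for s in seasons]
    start = int(params.get("season_start", "2015-16").split("-")[0])
    end = int(params.get("season_end", "2024-25").split("-")[0])
    return [f"{y}-{(y + 1) % 100:02d}" for y in range(start, end + 1)]


def plan_payloads(requested: Iterable[str], cached: Iterable[str],
                  season_type: str = "Regular Season",
                  force_full: bool = False) -> List[Dict[str, str]]:
    """Seasons not yet cached (all of them under force_full), shaped for expand_kwargs."""
    existing = set() if force_full else set(cached)
    missing = sorted(set(requested) - existing)
    return [{"season": s, "season_type": season_type} for s in missing]
```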
%%writefile api/src/airflow_project/dags/new_injury_reports.py
"""
## NBA Injury Reports ETL (ESPN + others)

Daily pipeline that:
1. Determines which dates to ingest (default: yesterday -> today).
2. Fetches and stores injury reports (one task per day via dynamic mapping).
3. Runs a simple validation.

Implements the TaskFlow API and dynamic task mapping.
"""

from __future__ import annotations
from pathlib import Path
import pandas as pd
from airflow.sdk.execution_time.lazy_sequence import LazyXComSequence
import datetime as dt
import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

# Project modules
from eda.new_injury_reports import run_daily_ingestion, clean_and_normalize, incremental_store, make_sources
from eda.combining_injury_reports import build_full_dataset
from utils.config import injury_path

# -------- Config defaults (override with DAG Params) -----------------
# Canonical injury_reports directory from config: $INJURY_DATA_DIR if set,
# otherwise data/injury_reports.
DEFAULT_OUTPUT = injury_path("")

@dag(
    dag_id="injury_reports_daily",
    start_date=pendulum.datetime(2025, 7, 1, tz="America/New_York"),
    schedule="0 5 * * *",             # 05:00 ET daily pull
    catchup=False,
    max_active_runs=1,
    tags=["nba", "injuries", "espn"],
    default_args={"retries": 1},
    params={
        "start_date": None,           # ISO yyyy-mm-dd
        "end_date": None,             # ISO yyyy-mm-dd
        "days_back": 1,               # if no start/end given
        "force_full": False,          # not used yet, kept for parity
        "fix_return_year": True,
        "season_start_year": 2024,    # first calendar year of the season being ingested
        "allowed_years": [2024, 2025], # informational only; fetch_one_day derives it from season_start_year
        "statsurge_root": "data/statsurge",
        "parquet_path": str(DEFAULT_OUTPUT / "injuries_primary.parquet"),
    },
)
def injury_reports_daily():

    @task
    def determine_dates() -> list[str]:
        """
        Decide which dates to fetch.
        Priority:
          1. explicit start_date/end_date params
          2. days_back param (today plus the N preceding days)
        Returns list of ISO date strings.
        """
        ctx = get_current_context()
        p = ctx["params"]

        if p.get("start_date") and p.get("end_date"):
            start = dt.date.fromisoformat(p["start_date"])
            end = dt.date.fromisoformat(p["end_date"])
        else:
            days_back = int(p.get("days_back", 1))
            end = dt.date.today()
            start = end - dt.timedelta(days=days_back)

        dates = [ (start + dt.timedelta(days=i)).isoformat()
                  for i in range((end - start).days + 1) ]

        print(f"[determine_dates] start={start} end={end} -> {len(dates)} days")
        return dates

    @task
    def fetch_one_day(date_str: str) -> dict[str, str]:
        """
        Ingest one day's injuries and upsert to parquet.
        RETURNS a dict so downstream can safely access by string key:
          {"parquet_path": "<path/to/file.parquet>"}
        """
        ctx = get_current_context()
        p = ctx["params"]

        date = dt.date.fromisoformat(date_str)
        # parquet_path = Path(p["parquet_path"])
        parquet_path = injury_path("injuries_primary.parquet")

        # Season wiring ----------------------------------------------------------
        season_start = int(p.get("season_start_year", 2024))
        allowed_years = (season_start, season_start + 1)

        # Let run_daily_ingestion build sources internally to avoid duplication
        df_day = run_daily_ingestion(
            date=date,
            sources=None,  # Let it build sources internally
            cleaner=clean_and_normalize,
            store_fn=lambda d: incremental_store(d, path=parquet_path),
            drop_duplicates_on=["player_name","team","status","body_part","report_date","est_return_date"],
            fix_return_year=bool(p.get("fix_return_year", True)),
            allowed_years=allowed_years,
            statsurge_root=p.get("statsurge_root", "data/statsurge"),
        )
        print(f"[fetch_one_day] {date} rows={len(df_day)} -> {parquet_path}")

        return {"parquet_path": str(parquet_path)}

    @task
    def final_check(parquet_path: str | list[str] | None = None) -> None:
        """
        Basic validation. Handles Airflow LazyXComSequence / list / str safely.
        """

        print(f"[final_check] Received parquet_path type: {type(parquet_path)}")
        print(f"[final_check] Received parquet_path value: {parquet_path}")

        if parquet_path is None:
            raise ValueError("final_check received no parquet_path")

        # Normalize lazy proxies or other iterables to a list
        if isinstance(parquet_path, LazyXComSequence) or (
            not isinstance(parquet_path, (str, Path)) and hasattr(parquet_path, "__iter__")
        ):
            parquet_path = list(parquet_path)

        # If we ended up with a list, pick the first element (or iterate/validate all)
        if isinstance(parquet_path, list):
            if not parquet_path:
                raise ValueError("final_check received an empty list for parquet_path")
            parquet_path = parquet_path[0]

        pq = Path(parquet_path)
        if not pq.exists():
            raise FileNotFoundError(f"Parquet file not found: {pq}")

        df = pd.read_parquet(pq)
        req = {"player_name", "team", "status", "report_date", "source"}
        miss = req - set(df.columns)
        if miss:  # explicit raise; `assert` is skipped under `python -O`
            raise ValueError(f"Missing columns: {miss}")

        print(f"[final_check] Successfully validated {len(df)} rows")
        print("Rows by report_date:\n", df.groupby("report_date").size())

    @task
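    def combine_full_dataset(parquet_path: str | list[str] | LazyXComSequence | None = None) -> str:
        """Merge the daily primary parquet with the 1951-2023 historical injury file."""
        if parquet_path is None:
            raise ValueError("combine_full_dataset received no parquet_path")

        # 1. Normalize lazy proxies or other iterables to a list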
        if isinstance(parquet_path, LazyXComSequence) \
        or (not isinstance(parquet_path, (str, Path)) and hasattr(parquet_path, "__iter__")):
            parquet_path = list(parquet_path)

        # 2. If it’s a list, pick the first element
        if isinstance(parquet_path, list):
            if not parquet_path:
                raise ValueError("combine_full_dataset received an empty list for parquet_path")
            parquet_path = parquet_path[0]

        # 3. Now it’s safe to build a Path
        file_a = Path(parquet_path).resolve()
        file_b = injury_path("NBA Player Injury Stats(1951 - 2023).parquet")
        out_dir = injury_path("")

        full_path = build_full_dataset(
            file_a=file_a,
            file_b=file_b,
            out_dir=out_dir,
            full_out_name="historical_injuries_1951_2025_clean.parquet",
            duplicate_strategy="keep_first",
            write_intermediates=False,
        )
        return str(full_path)


    # ---- DAG flow ----
    date_list = determine_dates()
    paths = fetch_one_day.expand(date_str=date_list)

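    # String-key lookup on the mapped task's dict output (no int indexing).
    # Because fetch_one_day is mapped, this resolves to a lazy sequence with one
    # path per mapped instance, not a single deduplicated value; both
    # downstream tasks normalize it and use the first element.
    checked = final_check(parquet_path=paths["parquet_path"])
    combined = combine_full_dataset(parquet_path=paths["parquet_path"])
    checked >> combined  # only build the combined dataset after validation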

injury_reports_daily()


Overwriting api/src/airflow_project/dags/new_injury_reports.py
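`final_check` and `combine_full_dataset` repeat the same normalization of a mapped XCom value (a `LazyXComSequence`, a list, or a single string) down to one path. A minimal sketch of factoring that into a shared helper, assuming it lives next to the tasks; the name `first_path` is hypothetical, and it treats any non-string iterable as a sequence so it can be exercised without Airflow installed.

```python
from __future__ import annotations

from collections.abc import Iterable
from pathlib import Path


def first_path(value: str | Path | Iterable[str] | None, *, task: str = "task") -> Path:
    """Reduce a mapped-XCom value to a single resolved Path (hypothetical helper).

    Strings and Paths are used as-is; any other iterable (for example a
    LazyXComSequence pulled from a mapped task) is materialized and its
    first element is taken.
    """
    if value is None:
        raise ValueError(f"{task} received no parquet_path")
    if not isinstance(value, (str, Path)):
        items = list(value)  # materialize lazy proxies / generators
        if not items:
            raise ValueError(f"{task} received an empty list for parquet_path")
        value = items[0]
    return Path(value).resolve()
```

Inside the DAG, each task body would then start with something like `pq = first_path(parquet_path, task="final_check")`.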


In [11]:
%%writefile api/src/airflow_project/dags/spotrac_misc_assets.py

"""
Fetch Spotrac misc assets and save to final directory.
"""


from __future__ import annotations

import os
from collections.abc import Callable
from pathlib import Path

import pandas as pd
import pendulum
from airflow.decorators import dag, task
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import get_current_context

from eda.spotrac_connector import (
    fetch_spotrac_salary_cap_history,
    fetch_spotrac_multi_year_tracker,
    fetch_spotrac_cash_tracker,
    fetch_spotrac_extensions,
    fetch_spotrac_tax_tracker,
    fetch_spotrac_transactions,
)
from utils.storage import write_final_dataset
from utils.config import RAW_DIR, DEBUG_DIR, FINAL_DIR, SPOTRAC_RAW_DIR, SPOTRAC_DEBUG_DIR
from utils.airflow_helpers import normalize_header, extract_year_from_path

BASE_DIR = RAW_DIR.parent


def resolve_column_aliases(
    df: pd.DataFrame,
    column_map: dict[str, list[str]],
    extra_fixers: dict[str, Callable[[pd.DataFrame], pd.DataFrame]] | None = None,
) -> tuple[pd.DataFrame, list[str], dict[str, str]]:
    """Rename columns to their logical names using header aliases.

    For each target in ``column_map`` that is not already present, the first
    alias whose normalized header matches an existing column is renamed to the
    target. If no alias matches, an optional fixer from ``extra_fixers`` may
    derive the column instead (e.g. a ``year`` taken from the file path).

    Returns the (possibly renamed) frame, the targets still missing, and a
    ``{old_name: new_name}`` log of the renames applied.
    """
    rename_log = {}
    cols_raw = list(df.columns)
    cols_norm = {normalize_header(c): c for c in cols_raw}

    for target, aliases in column_map.items():
        if target in df.columns:
            continue
        found = None
        for alias in aliases:
            norm = normalize_header(alias)
            if norm in cols_norm:
                found = cols_norm[norm]
                break
        if found:
            df = df.rename(columns={found: target})
            rename_log[found] = target
        elif extra_fixers and target in extra_fixers:
            df = extra_fixers[target](df)

    missing = [k for k in column_map if k not in df.columns]
    return df, missing, rename_log


@dag(
    dag_id="spotrac_misc_assets",
    start_date=pendulum.datetime(2025, 7, 1, tz="America/New_York"),
    schedule="15 4 * * *",
    catchup=False,
    tags=["nba", "spotrac", "misc"],
    default_args={"retries": 2},
    params={
        "season_start": 2015,
        "season_end": 2025,
        "recent_years": 3,
        "txn_days": 7,
        "force_full": False,
    },
)
def spotrac_misc_assets():

    @task
    def determine_years() -> dict:
        """Build year lists and flags from the DAG params."""
        params = get_current_context()["params"]
        yr_all = list(range(int(params["season_start"]), int(params["season_end"]) + 1))
        recent_n = int(params["recent_years"])
        info = {
            "all_years": yr_all,
            # yr_all[-0:] would return every year, so treat recent_n <= 0 as "none"
            "recent_years": yr_all[-recent_n:] if recent_n > 0 else [],
            "txn_days": int(params["txn_days"]),
            "force_full": bool(params["force_full"]),
        }
        print(f"[determine_years] {info}")
        return info

    @task
    def get_recent_years(info: dict) -> list[int]:
        """Extract recent years for mapping."""
        yrs = info["recent_years"]
        print(f"[get_recent_years] {yrs}")
        return yrs

    # Static assets
    @task
    def fetch_cba() -> str:
        """Fetch CBA history."""
        out_path, df, _ = fetch_spotrac_salary_cap_history(
            raw_dir=SPOTRAC_RAW_DIR,
            debug_dir=SPOTRAC_DEBUG_DIR,
        )
        abs_out = str(Path(out_path).resolve())
        print(f"[fetch_cba] rows={len(df)} cols={df.shape[1]} -> {abs_out}")
        return abs_out

    @task
    def fetch_multi_cap() -> str:
        """Fetch the multi-year salary-cap tracker."""
        out, df, _ = fetch_spotrac_multi_year_tracker(
            raw_dir=SPOTRAC_RAW_DIR,
            debug_dir=SPOTRAC_DEBUG_DIR,
        )
        print(f"[fetch_multi_cap] rows={len(df)} cols={df.shape[1]} -> {out}")
        return str(out)

    @task
    def fetch_multi_cash() -> str:
        """Fetch the multi-year cash tracker."""
        out, df, _ = fetch_spotrac_cash_tracker(
            raw_dir=SPOTRAC_RAW_DIR,
            debug_dir=SPOTRAC_DEBUG_DIR,
        )
        print(f"[fetch_multi_cash] rows={len(df)} cols={df.shape[1]} -> {out}")
        return str(out)

    # Yearly extensions/tax
    @task
    def fetch_extension(year: int) -> str | None:
        """Fetch extensions for a single year."""
        try:
            out, df, _ = fetch_spotrac_extensions(
                year,
                raw_dir=SPOTRAC_RAW_DIR,
                debug_dir=SPOTRAC_DEBUG_DIR,
            )
            print(f"[fetch_extension] year={year} rows={len(df)} -> {out}")
            return str(out)
        except RuntimeError as e:
            print(f"[fetch_extension] year={year} failed: {e}")
            raise AirflowSkipException(f"No extensions table for {year}")

    @task
    def fetch_tax(year: int) -> str:
        """Fetch the tax tracker for a single year."""
        out, df, _ = fetch_spotrac_tax_tracker(
            year,
            raw_dir=SPOTRAC_RAW_DIR,
            debug_dir=SPOTRAC_DEBUG_DIR,
        )
        print(f"[fetch_tax] year={year} rows={len(df)} -> {out}")
        return str(out)

    # Rolling transactions
    @task
    def fetch_txn(info: dict) -> str:
        """Pull transactions for rolling window."""
        from datetime import date, timedelta
        end = date.today()
        start = end - timedelta(days=info["txn_days"])
        out, df, _ = fetch_spotrac_transactions(
            start.isoformat(),
            end.isoformat(),
            raw_dir=SPOTRAC_RAW_DIR,
            debug_dir=SPOTRAC_DEBUG_DIR,
        )
        if out:
            print(f"[fetch_txn] {start}->{end} rows={len(df)} -> {out}")
            return str(out)
        print(f"[fetch_txn] {start}->{end} no data")
        return ""

    # Merge helpers
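    @task(trigger_rule="none_failed")
    def merge_yearly(paths: list[str | None], name: str) -> str:
        """Merge parquet shards from mapped tasks.

        ``none_failed`` lets the merge run even when some mapped fetches were
        skipped (``fetch_extension`` raises ``AirflowSkipException``); with the
        default ``all_success`` this task and ``validate_assets`` would be skipped.
        """
        # paths may be None/empty when nothing was mapped or every shard skipped
        clean = [p for p in (paths or []) if p]
        final = FINAL_DIR / f"{name}_all.parquet"
        print(f"[merge_yearly] merging {len(clean)} shards of {name}")

        if clean:
            dfs = [pd.read_parquet(p) for p in clean]
            merged = pd.concat(dfs, ignore_index=True)
            write_final_dataset(merged, final)
            print(f"[merge_yearly] wrote {final}")
            return str(final)

        if final.exists():
            print(f"[merge_yearly] using existing {final}")
            return str(final)

        print("[merge_yearly] no data available")
        return ""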

    @task
    def validate_assets(cba: str, cap: str, cash: str,
                        ext: str, tax: str, txn: str) -> None:
        """Validate all produced parquet assets."""
        import json

        input_paths = {
            "cba": cba,
            "multi_cap": cap,
            "multi_cash": cash,
            "extensions": ext,
            "tax": tax,
            "txn": txn
        }
        mandatory = {"cba", "multi_cap", "multi_cash"}

        # Check existence
        valid_paths = {}
        for label, p in input_paths.items():
            if not p or not os.path.isfile(p):
                if label in mandatory:
                    raise FileNotFoundError(f"{label} file missing: {p}")
                print(f"[validate_assets] {label} skipped")
            else:
                valid_paths[label] = p

        print(f"[validate_assets] validating: {list(valid_paths.keys())}")

        # Column mapping
        column_maps = {
            "cba": {"year": ["season", "yr"]},
            "multi_cap": {"team": ["team", "teamname"]},
            "multi_cash": {"team": ["team", "teamname"]},
            "extensions": {
                "player": ["players"],
                "total": ["total value", "value", "amount", "totalvalue"],
                "year": ["yr", "season"],
            },
            "tax": {
                "team": ["team", "teamname"],
                "tax": ["tax bill", "taxbill", "luxurytax", "taxthreshold"],
                "year": ["yr", "season"],
            },
            "txn": {
                "date": ["transactiondate"],
                "player": ["players", "player(s)"],
                "type": ["details", "move", "transactiontype"],
            },
        }

        # Setup caching
        cache_dir = DEBUG_DIR / "validate"
        cache_dir.mkdir(parents=True, exist_ok=True)
        schema_cache = cache_dir / "validate_schema_cache.json"
        stats_cache = cache_dir / "validate_stats_cache.json"

        old_schema = {}
        old_stats = {}
        if schema_cache.exists():
            old_schema = pd.read_json(schema_cache, typ="series").to_dict()
        if stats_cache.exists():
            old_stats = json.loads(stats_cache.read_text())

        drift_report = {}
        row_drift = {}
        current_stats = {}

        for label, p in valid_paths.items():
            df = pd.read_parquet(p)

            # Schema drift check
            prev_cols = set(old_schema.get(label, []))
            curr_cols = set(df.columns.tolist())
            added = sorted(curr_cols - prev_cols)
            removed = sorted(prev_cols - curr_cols)
            if added or removed:
                drift_report[label] = {"added": added, "removed": removed}

            # Resolve column aliases
            logical_req = column_maps[label]

            def add_year_if_missing(dfi: pd.DataFrame) -> pd.DataFrame:
                y = extract_year_from_path(p)
                return dfi.assign(year=y) if y is not None else dfi

            dfn, missing, renames = resolve_column_aliases(
                df, logical_req, extra_fixers={"year": add_year_if_missing}
            )
            if missing:
                raise AssertionError(f"{label} missing cols: {set(missing)}")

            # Numeric stats
            num_cols = [c for c in dfn.columns if pd.api.types.is_numeric_dtype(dfn[c])]
            means = {c: float(dfn[c].dropna().mean()) for c in num_cols}
            current_stats[label] = {
                "rows": int(len(dfn)),
                "cols": int(len(dfn.columns)),
                "means": means
            }

            # Compare with previous stats
            if label in old_stats:
                prev = old_stats[label]
                if prev["rows"] != current_stats[label]["rows"]:
                    row_drift[label] = {"old": prev["rows"], "new": current_stats[label]["rows"]}
                
                mean_drift = {c: (prev["means"].get(c), means.get(c)) 
                             for c in set(prev["means"]) | set(means)}
                flagged = {}
                for c, (old_m, new_m) in mean_drift.items():
                    if old_m is None or new_m is None:
                        continue
                    if old_m == 0 and new_m != 0:
                        flagged[c] = (old_m, new_m)
                    else:
                        rel = abs(new_m - old_m) / (abs(old_m) + 1e-9)
                        if rel > 0.05:
                            flagged[c] = (old_m, new_m)
                if flagged:
                    print(f"[validate_assets] {label} mean drift >5%: {flagged}")

            print(f"[validate_assets] {label}: rows={len(dfn)} OK; renames={renames}")

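        # Persist snapshots. Merge with the previous ones so an optional asset
        # that was skipped this run keeps its baseline and does not report
        # spurious drift when it reappears.
        new_schema = {
            lbl: pd.read_parquet(p).columns.tolist()
            for lbl, p in valid_paths.items()
        }
        pd.Series({**old_schema, **new_schema}).to_json(schema_cache)
        stats_cache.write_text(json.dumps({**old_stats, **current_stats}, indent=2))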

        if drift_report:
            print("[validate_assets] SCHEMA DRIFT:")
            for lbl, rep in drift_report.items():
                print(f"  - {lbl}: added={rep['added']} removed={rep['removed']}")

        if row_drift:
            print("[validate_assets] ROW COUNT DRIFT:")
            for lbl, rep in row_drift.items():
                print(f"  - {lbl}: old={rep['old']} new={rep['new']}")

    # DAG graph
    info = determine_years()
    recent_list = get_recent_years(info)

    cba_p = fetch_cba()
    cap_p = fetch_multi_cap()
    cash_p = fetch_multi_cash()

    ext_paths = fetch_extension.expand(year=recent_list)
    tax_paths = fetch_tax.expand(year=recent_list)

    ext_final = merge_yearly(ext_paths, name="extensions")
    tax_final = merge_yearly(tax_paths, name="tax")

    txn_p = fetch_txn(info)

    validate_assets(cba_p, cap_p, cash_p, ext_final, tax_final, txn_p)

spotrac_misc_assets()




Overwriting api/src/airflow_project/dags/spotrac_misc_assets.py
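The mean-drift rule inside `validate_assets` is easier to reason about (and unit-test) in isolation. A minimal sketch that reproduces that rule as a pure function; the name `flag_mean_drift` is hypothetical, and the 5% threshold and `1e-9` guard are the values used in the task above.

```python
from __future__ import annotations


def flag_mean_drift(
    prev_means: dict[str, float],
    curr_means: dict[str, float],
    threshold: float = 0.05,
    eps: float = 1e-9,
) -> dict[str, tuple[float, float]]:
    """Return {column: (old_mean, new_mean)} for columns whose mean moved too far.

    Columns present in only one snapshot are ignored; a zero mean that becomes
    non-zero is always flagged; otherwise the relative change
    |new - old| / (|old| + eps) must exceed `threshold`.
    """
    flagged: dict[str, tuple[float, float]] = {}
    for col in prev_means.keys() & curr_means.keys():
        old_m, new_m = prev_means[col], curr_means[col]
        if old_m == 0:
            if new_m != 0:
                flagged[col] = (old_m, new_m)
            continue
        rel = abs(new_m - old_m) / (abs(old_m) + eps)
        if rel > threshold:
            flagged[col] = (old_m, new_m)
    return flagged
```

Because the comparison is relative to `|old|`, a shift of 4 on a mean of 100 passes while any move away from an exact zero is always reported.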


In [12]:
%%writefile api/src/airflow_project/dags/defensive_metrics_collect.py

"""
Collect NBA defensive metrics into parquet using TaskFlow + dynamic mapping.

**Params supported**
- seasons:        explicit list, e.g. ["2022-23", "2023-24"] (overrides everything)
- season_start:   first season if `seasons` not given
- season_end:     last  season if `seasons` not given
- force_full:     if True, ignore the cache and rebuild all seasons

Runs daily at 04:30 ET.
"""
from __future__ import annotations
from pathlib import Path
from typing import Dict, List

import pandas as pd
import pendulum

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context

from eda.defensive_metrics import DefensiveMetricsCollector
from utils.storage import write_final_dataset

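# ---- Config ----
# NOTE: BASE_DIR is relative, so it resolves against the worker's working directory.
BASE_DIR   = Path("api/src/airflow_project/data")
RAW_DIR    = BASE_DIR / "raw"
FINAL_DIR  = BASE_DIR / "final"
MIN_SEASON = "2018-19"        # default first season (params are not clamped to it)
MAX_SEASON = "2024-25"        # default last season; keep in sync with collector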

# ------------------------------------------------------------------ DAG
@dag(
    dag_id="defensive_metrics_collect",
    start_date=pendulum.datetime(2025, 7, 1, tz="America/New_York"),
    schedule="30 4 * * *",
    catchup=False,
    tags=["nba", "defense", "metrics"],
    default_args={"retries": 2},
    params={
        "seasons": None,                # explicit list
        "season_start": MIN_SEASON,
        "season_end":   MAX_SEASON,
        "force_full":   False,
    },
)
def defensive_metrics_collect():
    # ---------- helpers ----------
    @task
    def determine_seasons() -> Dict[str, List[str] | bool]:
        """
        Decide which seasons to fetch by comparing DAG params to cached parquet.
        """
        ctx   = get_current_context()
        p     = ctx["params"]

        # build requested list
        if p.get("seasons"):
            requested: List[str] = list(map(str, p["seasons"]))
        else:
            start = str(p.get("season_start", MIN_SEASON))
            end   = str(p.get("season_end",   MAX_SEASON))
            # assume season format 'YYYY-YY'
            start_year = int(start.split("-")[0])
            end_year   = int(end.split("-")[0])
            requested  = [f"{y}-{str(y+1)[-2:]}" for y in range(start_year, end_year + 1)]

        force_full: bool = bool(p.get("force_full", False))

        cache = FINAL_DIR / "def_metrics_all.parquet"
        if cache.exists() and not force_full:
            existing = pd.read_parquet(cache, columns=["season"])["season"].unique().tolist()
        else:
            existing = []

        missing = sorted(set(requested) - set(existing))

        print("[determine_seasons] requested:", requested)
        print("[determine_seasons] existing :", existing)
        print("[determine_seasons] missing  :", missing)
        print("[determine_seasons] force_full:", force_full)

        return {
            "requested": requested,
            "missing":   missing,
            "force_full": force_full,
        }

    # ---------- dynamic fetch ----------
    @task
    def seasons_for_mapping(meta: dict) -> List[str]:
        """Return only the seasons we still need (for .expand)."""
        return list(meta["missing"])

    @task
    def fetch_one_season(season: str) -> str:
        """
        Pull a single season and write a parquet shard into RAW_DIR.
        """
        df = DefensiveMetricsCollector.collect_metrics_for_seasons([season])
        out = RAW_DIR / f"def_metrics_{season}.parquet"
        out.parent.mkdir(parents=True, exist_ok=True)
        df.to_parquet(out, index=False)
        print(f"[fetch_one_season] season={season} rows={len(df)} → {out}")
        return str(out)

    # ---------- merge / overwrite ----------
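    @task(trigger_rule="none_failed")
    def merge_shards(shard_paths: List[str], meta: dict) -> str:
        """
        Merge all shards (new or cached) and overwrite the final parquet.

        ``none_failed`` is needed because mapping over an empty season list
        marks ``fetch_one_season`` as skipped; with the default ``all_success``
        this task (and validation) would be skipped instead of reusing the cache.
        """
        final_path = FINAL_DIR / "def_metrics_all.parquet"
        # shard_paths may be empty (or None) when nothing was mapped
        fresh_dfs  = [pd.read_parquet(p) for p in (shard_paths or []) if p]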

        if fresh_dfs:
            merged_fresh = pd.concat(fresh_dfs, ignore_index=True)
            if final_path.exists() and not meta["force_full"]:
                cached = pd.read_parquet(final_path)
                merged = pd.concat([cached, merged_fresh], ignore_index=True).drop_duplicates(
                    subset=["PLAYER_NAME", "season"]
                )
            else:
                merged = merged_fresh
            final_path = write_final_dataset(merged, final_path)
        else:
            if final_path.exists():
                print("[merge_shards] No new seasons; using cached file", final_path)
            else:
                raise RuntimeError("No shards provided and no cache present.")

        return str(final_path)

    # ---------- final assertions ----------
    @task
    def validate_outputs(final_pq: str) -> None:
        df = pd.read_parquet(final_pq)
        must_have = {
            "PLAYER_NAME", "season", "DEF_RATING", "dbpm", "dws", "PLUSMINUS"
        }
        missing = must_have - set(df.columns)
        if missing:  # explicit raise: asserts are stripped under `python -O`
            raise ValueError(f"Missing cols: {missing}")
        print("Rows/season:\n", df.groupby("season").size())

    # ---------------- DAG orchestration ----------------
    meta          = determine_seasons()
    seasons_list  = seasons_for_mapping(meta)
    shards        = fetch_one_season.expand(season=seasons_list)
    final_path    = merge_shards(shards, meta)
    validate_outputs(final_path)

defensive_metrics_collect()


Overwriting api/src/airflow_project/dags/defensive_metrics_collect.py
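`determine_seasons` does two things: it expands `season_start`/`season_end` into `'YYYY-YY'` labels and subtracts the seasons already in the cached parquet. A minimal sketch of that logic as pure functions, so the empty-result case (which leaves `fetch_one_season.expand` with nothing to map) can be tested directly; the helper names are hypothetical.

```python
from __future__ import annotations


def season_label(start_year: int) -> str:
    """2023 -> '2023-24' (the 'YYYY-YY' format used by determine_seasons)."""
    return f"{start_year}-{str(start_year + 1)[-2:]}"


def seasons_between(start: str, end: str) -> list[str]:
    """Inclusive list of season labels, e.g. '2018-19'..'2020-21'."""
    start_year = int(start.split("-")[0])
    end_year = int(end.split("-")[0])
    return [season_label(y) for y in range(start_year, end_year + 1)]


def missing_seasons(requested: list[str], cached: list[str], force_full: bool = False) -> list[str]:
    """Seasons still to fetch: all requested when force_full, else requested minus cached."""
    if force_full:
        return sorted(set(requested))
    return sorted(set(requested) - set(cached))
```

Note that the two-digit suffix wraps correctly at a century boundary (`1999` becomes `"1999-00"`).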


In [13]:
%%writefile api/src/airflow_project/dags/nba_api_ingest.py
# dags/nba_api_ingest.py
"""
Pulls roster + box‑score data from nba_api once per hour and writes Parquet
partitions under data/new_processed/season=<YYYY-YY>/part.parquet.

Why hourly?
• The NBA Stats endpoints update within minutes after a game ends.
• Hourly keeps your lake near‑real‑time without hammering the API.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import sys

# ── Use centralized config ─────────────────────────────────────────────────────
from utils.nba_basics_config import PROJECT_ROOT, DATA_DIR
from utils.incremental_utils import (
    get_next_date_to_pull,
    incremental_pull,
    should_pull_incremental,
)

# Allow `nba_basic_advanced_stats` imports
sys.path.insert(0, str(PROJECT_ROOT / "api" / "src"))

default_args = {
    "owner": "data_eng",
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "depends_on_past": False,      # explicit
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(hours=1),
}

with DAG(
    dag_id="nba_api_ingest",
    start_date=datetime(2025, 7, 1),
    schedule="@hourly",            # unified scheduling API (Airflow ≥ 2.4)
    catchup=False,
    default_args=default_args,
    max_active_runs=1,             # avoid overlapping pulls
    tags=["nba", "api", "ingest"],
    params={
        "start_year": 2024,  # first season to pull
        "end_year":   2025,  # last season to pull
    },
) as dag:

    def pull_incremental(**context):
        """
        Determine the next date to pull based on existing data and perform incremental pull.
        Falls back to full season pull if no existing data is found.
        """
        p = context["params"]
        sy = int(p["start_year"])
        ey = int(p["end_year"])

        # Incremental mode tracks a single season: the one starting in start_year
        # (e.g. start_year=2024 -> "2024-25"). end_year is only used by the
        # full-season fallback below.
        season = f"{sy}-{str(sy+1)[-2:]}"

        print(f"[pull_incremental] Checking for incremental pull for season {season}")

        # Check whether existing data supports an incremental pull
        if should_pull_incremental(DATA_DIR, season):
            # Get the next date to pull
            next_date = get_next_date_to_pull(DATA_DIR, season)
            if next_date:
                print(f"[pull_incremental] Pulling incremental data for {next_date}")
                incremental_pull(
                    data_dir=DATA_DIR,
                    season=season,
                    date_to_pull=next_date,
                    workers=8,
                    debug=False
                )
            else:
                print("[pull_incremental] No next date found, skipping")
        else:
            # Fall back to a full pull of start_year..end_year
            print("[pull_incremental] No existing data found, doing full season pull")
            from eda.nba_basic_advanced_stats.main import main as pull_main
            pull_main(
                start_year=sy,
                end_year=ey,
                small_debug=True,
                workers=8,
                overwrite=False,
                output_base=str(DATA_DIR),
            )

    PythonOperator(
        task_id="scrape_incremental_data",
        python_callable=pull_incremental,
    ) 


Overwriting api/src/airflow_project/dags/nba_api_ingest.py
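The body of `get_next_date_to_pull` lives in `utils.incremental_utils` and is not shown here. One plausible rule, offered purely as an assumption: pull the day after the latest ingested date, and return nothing once that day is still in the future so the hourly run becomes a no-op. The function below is a hypothetical stand-in, not the project's implementation.

```python
from __future__ import annotations

from datetime import date, timedelta


def next_date_to_pull(latest_ingested: date | None, today: date) -> date | None:
    """Hypothetical stand-in for utils.incremental_utils.get_next_date_to_pull.

    Returns the day after the latest ingested date, or None when that day is
    still in the future. A None `latest_ingested` means no data exists yet,
    which the DAG handles with a full-season pull instead.
    """
    if latest_ingested is None:
        return None
    candidate = latest_ingested + timedelta(days=1)
    return candidate if candidate <= today else None
```

Passing `today` explicitly (rather than calling `date.today()` inside) keeps the rule deterministic under test; in a DAG it could be fed from the run's logical date.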


In [14]:
%%writefile api/src/airflow_project/dags/nba_advanced_ingest.py
# dags/nba_advanced_ingest.py
"""
Daily scrape of Basketball‑Reference season‑level advanced metrics.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import sys

# ── Use centralized config ─────────────────────────────────────────────────────
from utils.nba_basics_config import PROJECT_ROOT, ADVANCED_METRICS_DIR

sys.path.insert(0, str(PROJECT_ROOT / "api" / "src"))
from eda.nba_basic_advanced_stats.scrape_utils import _season_advanced_df

default_args = {
    "owner": "data_eng",
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(hours=1),
}

with DAG(
    dag_id="nba_advanced_ingest",
    start_date=datetime(2025, 7, 1),
    schedule="@daily",
    catchup=False,
    max_active_runs=1,
    default_args=default_args,
    tags=["nba", "advanced", "ingest"],
    params={
        "start_year": 2024,  # first season to scrape
        "end_year":   2025,  # last season to scrape
    },
) as dag:

    def scrape_adv(**ctx):
        """
        Loop through start_year..end_year, fetch each season's
        advanced stats, and write them to parquet.

        - If a season fetch fails, log a warning and skip it.
        - After the loop, list all missing seasons in a single warning.
        - If any missing season falls inside its data window (Nov-Jun),
          raise an error so the DAG fails (the data should exist).
        """

        p = ctx["params"]
        start_year = int(p["start_year"])
        end_year   = int(p["end_year"])

        missing = []

        # 1️⃣ Fetch each season
        for y in range(start_year, end_year + 1):
            season = f"{y}-{str(y+1)[-2:]}"
            try:
                df = _season_advanced_df(season)
            except RuntimeError as err:
                print(f"⚠️  [WARNING] Unable to fetch advanced stats for {season}: {err}")
                missing.append(season)
                continue

            out_dir = ADVANCED_METRICS_DIR
            out_dir.mkdir(parents=True, exist_ok=True)
            df.to_parquet(out_dir / f"advanced_{season}.parquet", index=False)

        # 2️⃣ Summary missing seasons
        if missing:
            print("\n⚠️⚠️⚠️  Missing advanced data for seasons:", ", ".join(missing), "⚠️⚠️⚠️\n")

            # 3️⃣ If it's currently Nov–Jun for any missing season, fail
            now = datetime.now()
            should_exist = []
            for season in missing:
                sy = int(season[:4])
                in_window = (now.year == sy and now.month >= 11) or (
                    now.year == sy + 1 and now.month <= 6
                )
                if in_window:
                    should_exist.append(season)

            if should_exist:
                raise RuntimeError(
                    f"Advanced stats pages _should_ exist for: {', '.join(should_exist)} "
                    f"(current month: {now.month}). Aborting DAG."
                )

    PythonOperator(
        task_id="scrape_advanced_metrics",
        python_callable=scrape_adv,
    ) 







Overwriting api/src/airflow_project/dags/nba_advanced_ingest.py
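The fail-fast condition in `scrape_adv` decides whether a missing season is a warning or a hard failure. A minimal sketch of that rule as a pure function, with the current time passed in so it can be tested; the name `season_should_exist` is hypothetical.

```python
from __future__ import annotations

from datetime import datetime


def season_should_exist(season: str, now: datetime) -> bool:
    """True if `now` falls in the Nov-Jun window of a 'YYYY-YY' season.

    Same rule as scrape_adv: November/December of the starting year, or
    January through June of the following year.
    """
    start_year = int(season[:4])
    return (now.year == start_year and now.month >= 11) or (
        now.year == start_year + 1 and now.month <= 6
    )
```

The window is the DAG's own heuristic: it opens in November even though regular seasons typically tip off in late October, so a miss during October only warns.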


In [15]:
%%writefile api/src/airflow_project/dags/nba_data_loader.py
# dags/nba_data_loader.py
"""
Load processed NBA data into DuckDB for analysis.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
import duckdb
import pandas as pd

from pathlib import Path
import sys

# ── Use centralized config ─────────────────────────────────────────────────────
from utils.config import (
    PROJECT_ROOT, DATA_DIR, INJURY_DIR, NBA_BASE_DATA_DIR
)

# Ensure our code can import the eda package
sys.path.insert(0, str(PROJECT_ROOT / "api" / "src"))
from eda.nba_basic_advanced_stats.data_utils import validate_data

# Use centralized data root from config
DATA_ROOT = DATA_DIR

default_args = {
    "owner": "data_eng",
    "email": ["alerts@example.com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(hours=3),
}

with DAG(
    dag_id="nba_data_loader",
    start_date=datetime(2025, 7, 1),
    schedule="@daily",
    catchup=False,
    max_active_runs=1,
    default_args=default_args,
    tags=["nba", "loader", "duckdb"],
    params={"season": "2024-25"},
) as dag:

    # ─── sensors (one per upstream DAG) ────────────────────────────────
    sensor_args = dict(
        poke_interval=300,
        mode="reschedule",   # avoids tying up a worker slot
    )
    wait_api = ExternalTaskSensor(
        task_id="wait_api_ingest",
        external_dag_id="nba_api_ingest",
        external_task_id="scrape_incremental_data",
        timeout=3600,
        **sensor_args,
    )
    wait_adv = ExternalTaskSensor(
        task_id="wait_advanced_ingest",
        external_dag_id="nba_advanced_ingest",
        external_task_id="scrape_advanced_metrics",
        timeout=3600,
        **sensor_args,
    )
    wait_injury = ExternalTaskSensor(
        task_id="wait_injury_etl",
        external_dag_id="injury_etl",
        external_task_id="process_injury_data",
        timeout=7200,
        poke_interval=600,
        mode="reschedule",
    )

    # ─── loader task ───────────────────────────────────────────────────
    def load_to_duckdb(**ctx):
        season = ctx["params"]["season"]
        suffix = season.replace("-", "_")  # "2024-25" -> "2024_25" (valid SQL identifier)
        # ▶ use centralized DATA_DIR for everything
        db_path = DATA_DIR / "nba_stats.duckdb"
        con = duckdb.connect(str(db_path))

        sources = {
            f"player_{season}": NBA_BASE_DATA_DIR / f"season={season}/part.parquet",
            f"advanced_{season}": DATA_DIR / f"new_processed/advanced_metrics/advanced_{season}.parquet",
            "injury_master": INJURY_DIR / "injury_master.parquet",
        }

        try:
            loaded = set()
            for alias, path in sources.items():
                if not path.exists():
                    print(f"[load_to_duckdb] source missing, skipping: {path}")
                    continue
                if alias.startswith("player"):
                    df = pd.read_parquet(path)
                    validate_data(df, name=alias, save_reports=True)
                table = alias.replace("-", "_")
                con.execute(
                    f"CREATE OR REPLACE TABLE {table} AS "
                    f"SELECT * FROM read_parquet('{path}')"
                )
                loaded.add(table)

            base = f"player_{suffix}"
            if base not in loaded:
                raise FileNotFoundError(f"Player parquet missing: {sources[f'player_{season}']}")

            # Plain (non-materialized) view. DuckDB validates the query when the
            # view is created, so join only the tables actually loaded this run.
            joins = ""
            if f"advanced_{suffix}" in loaded:
                joins += f" LEFT JOIN advanced_{suffix} a USING(player, season)"
            if "injury_master" in loaded:
                joins += " LEFT JOIN injury_master i USING(player, season)"
            con.execute(
                f"CREATE OR REPLACE VIEW v_player_full_{suffix} AS "
                f"SELECT * FROM {base} p{joins}"
            )
        finally:
            con.close()

    loader = PythonOperator(
        task_id="validate_and_load",
        python_callable=load_to_duckdb,
    )

    [wait_api, wait_adv, wait_injury] >> loader 


Overwriting api/src/airflow_project/dags/nba_data_loader.py
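The loader skips any source file that does not exist, but DuckDB checks a view's referenced tables when the view is created, so a view that names a table that was never loaded fails. A minimal sketch of building the view SQL from only the tables that were loaded; `build_player_view_sql` is a hypothetical helper, and the `player`/`season` join keys are taken from the DAG and assumed to exist in every table.

```python
from __future__ import annotations


def build_player_view_sql(season: str, loaded: set[str]) -> str:
    """Build a CREATE VIEW statement joining only the tables that exist.

    `season` is 'YYYY-YY'; hyphens are replaced so table names are valid
    unquoted identifiers (player_2024_25, advanced_2024_25).
    """
    suffix = season.replace("-", "_")
    base = f"player_{suffix}"
    if base not in loaded:
        raise ValueError(f"base table {base} was not loaded")
    optional = [(f"advanced_{suffix}", "a"), ("injury_master", "i")]
    joins = [
        f"LEFT JOIN {table} {alias} USING (player, season)"
        for table, alias in optional
        if table in loaded
    ]
    return " ".join(
        [f"CREATE OR REPLACE VIEW v_player_full_{suffix} AS SELECT * FROM {base} p", *joins]
    )
```

Building the statement as a string keeps the join decision testable without a database; the result is passed to `con.execute(...)` as in the loader.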
