# Complete Learning Guide — ATSAF Course Materials

## Overview

This directory contains comprehensive learning documentation for the ATSAF (Automated Time Series Forecasting) project. All materials are aligned with the actual codebase implementation, verified against source files, and include runnable examples, checkpoints, and exercises.

## Structure

Each chapter markdown follows a consistent template:

1. **Outcomes** — What you can do after completing the chapter
2. **Concepts** — Plain English explanations of key terms
3. **Architecture** — What inputs/outputs/invariants define the system
4. **Files Touched** — Which source files to read and understand
5. **Step-by-Step Walkthrough** — Runnable examples with expected outputs
6. **Metrics & Success Criteria** — How to know you've succeeded
7. **Pitfalls** — Common mistakes and how to avoid them
8. **Mini-Checkpoint** — Self-test questions
9. **Exercises** — Optional hands-on practice (Easy/Medium/Hard)

---

## Chapters

### [Chapter 0: Time Series Objects & Contracts (Python)](chapter0_time_series_objects.md)

**Timeframe**: ~30-45 minutes to read and run

**What you'll learn:**
- How Python represents time-series objects (Series, DatetimeIndex)
- The `unique_id, ds, y` data contract and why StatsForecast depends on it
- How to normalize timestamps to UTC and detect DST edge cases
- How to validate time-series integrity before modeling

**Key files:**
- `src/chapter0/objects.py` (Chapter 0 helpers)
- `src/chapter1/validate.py` (validate_time_index)
- `src/chapter1/eia_data_simple.py` (prepare_for_forecasting, validate_time_series_integrity)
- `src/chapter3/tasks.py` (compute_time_series_integrity)

**Success criteria:**
- Can map R's `ts`, `tsibble`, and `timetk` concepts to their Python equivalents
- Can build and validate a forecasting-ready table
- Can spot timezone/DST risks before modeling

---

### [Chapter 1: Data Ingestion & Preparation](chapter1_ingestion.md)

**Timeframe**: ~2-4 hours to read, understand, and run

**What you'll learn:**
- How to fetch data from the EIA API with pagination and error handling
- How to normalize data to UTC and validate time-series integrity
- Why DST transitions break naive assumptions and how to detect them
- How to transform raw data into forecasting-ready format (unique_id, ds, y)

**Key files:**
- `src/chapter1/eia_data_simple.py` (main orchestrator)
- `src/chapter1/ingest.py`, `prepare.py`, `validate.py` (helper modules)

**Success criteria:**
- Can pull raw data, validate it, and format for forecasting
- Understand time-series integrity checks (duplicates, missing hours, DST)
- Can explain why UTC normalization is non-negotiable

---

### [Chapter 2: Experimentation & Backtesting](chapter2_experimentation.md)

**Timeframe**: ~3-5 hours to read, understand, and experiment

**What you'll learn:**
- How to design rolling-origin cross-validation for time-series
- How to build a leaderboard comparing multiple models
- When metrics like MAPE fail and what to use instead (RMSE, MASE)
- How to calibrate prediction intervals and interpret coverage

**Key files:**
- `src/chapter2/backtesting.py` (rolling/expanding window strategies)
- `src/chapter2/models.py` (model implementations: ARIMA, Prophet, XGBoost)
- `src/chapter2/training.py` (orchestrate CV across splits)
- `src/chapter2/evaluation.py` (compute metrics, rank models)

**Success criteria:**
- Can run CV and reproduce a leaderboard
- Understand temporal leakage and why it's prevented
- Can explain MAPE pitfalls and recommend alternatives

---

### [Chapter 3: Orchestration & Pipeline DAG](chapter3_orchestration.md)

**Timeframe**: ~2-3 hours to read, understand, and deploy

**What you'll learn:**
- How to decompose a forecasting workflow into independent, rerunnable tasks
- Why idempotency matters and how atomic writes prevent corruption
- How to build a DAG (Directed Acyclic Graph) of dependencies
- How to deploy to Airflow for automated scheduling

**Key files:**
- `src/chapter3/tasks.py` (6 tasks: ingest, prepare, validate, train, register, forecast)
- `src/chapter3/dag_builder.py` (Airflow DAG definition)
- `src/chapter3/cli.py` (Typer-based CLI for manual runs)

**Success criteria:**
- Can run pipeline end-to-end from CLI
- Understand why tasks are linear (no branching, yet)
- Can verify idempotency: re-running produces same outputs
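A minimal way to check the idempotency criterion, assuming the pipeline writes its artifacts as files under one output directory (the directory you pass in is your choice; `snapshot_hashes` and `diff_snapshots` are hypothetical helpers, not part of `src/chapter3`): hash every file after one run, re-run with the same arguments, hash again, and compare.

```python
import hashlib
from pathlib import Path


def snapshot_hashes(output_dir: Path) -> dict[str, str]:
    """Map each file's path (relative to output_dir) to its SHA-256 digest."""
    hashes = {}
    for path in sorted(output_dir.rglob("*")):
        if path.is_file():
            digest = hashlib.sha256(path.read_bytes()).hexdigest()
            hashes[str(path.relative_to(output_dir))] = digest
    return hashes


def diff_snapshots(before: dict[str, str], after: dict[str, str]) -> list[str]:
    """Return relative paths that were added, removed, or changed between two runs."""
    keys = set(before) | set(after)
    return sorted(k for k in keys if before.get(k) != after.get(k))
```

An empty `diff_snapshots(first, second)` means the re-run reproduced the same bytes. Files that legitimately record the run time (for example a metadata file with a timestamp field) will always differ; exclude them or compare their fields individually.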

---

### [Chapter 4: Monitoring, Drift Detection & Alerts](chapter4_monitoring.md)

**Timeframe**: ~4-6 hours to read, understand, and implement

**What you'll learn:**
- How to persist forecasts in a queryable database
- How to score forecasts against actuals (as they arrive)
- How to detect model drift using statistical thresholds
- How to monitor data freshness, completeness, and forecast staleness
- When and how to trigger alerts

**Key files:**
- `src/chapter4/db.py` (SQLite schema and CRUD)
- `src/chapter4/forecast_store.py` (persist wide → long format)
- `src/chapter4/scoring.py` (join forecasts with actuals)
- `src/chapter4/drift.py` (threshold-based drift detection)
- `src/chapter4/alerts.py` (alert configuration and checking)
- `src/chapter4/health.py` (data freshness/completeness checks)
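`forecast_store.py` is described as persisting forecasts from wide to long format. A minimal pandas sketch of that reshaping, assuming StatsForecast-style column names (`<model>`, `<model>-lo-<level>`, `<model>-hi-<level>`); the function `wide_to_long` and the output columns `model`, `yhat`, `lo`, `hi` are illustrative, not the repo's actual schema.

```python
import pandas as pd


def wide_to_long(forecast: pd.DataFrame, models: list[str], level: int = 95) -> pd.DataFrame:
    """Reshape one-column-per-model forecasts into one row per (unique_id, ds, model)."""
    frames = []
    for model in models:
        part = forecast[["unique_id", "ds"]].copy()
        part["model"] = model
        part["yhat"] = forecast[model]
        # Interval columns are optional; DataFrame.get falls back to NaN when absent
        part["lo"] = forecast.get(f"{model}-lo-{level}", float("nan"))
        part["hi"] = forecast.get(f"{model}-hi-{level}", float("nan"))
        frames.append(part)
    return pd.concat(frames, ignore_index=True)
```

The long layout makes adding a new model a matter of inserting rows rather than altering the table schema, which suits a SQLite store.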

**Success criteria:**
- Can persist forecasts and score them
- Understand drift thresholds (mean ± k*std from backtest)
- Can interpret alerts and decide when to retrain
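The drift rule above (mean ± k·std of backtest errors) can be sketched as follows, assuming you have one error value (for example RMSE) per backtest window and a recent live error on the same scale. The function names and the default `k=2.0` are illustrative, not the values used in `drift.py`.

```python
import statistics


def drift_threshold(backtest_errors: list[float], k: float = 2.0) -> tuple[float, float]:
    """Return (lower, upper) bounds as mean ± k * sample std of backtest errors."""
    if len(backtest_errors) < 2:
        raise ValueError("Need at least two backtest windows to estimate spread")
    mean = statistics.fmean(backtest_errors)
    std = statistics.stdev(backtest_errors)
    return mean - k * std, mean + k * std


def is_drifting(live_error: float, backtest_errors: list[float], k: float = 2.0) -> bool:
    """Flag drift when the live error exceeds the upper bound."""
    _, upper = drift_threshold(backtest_errors, k)
    return live_error > upper
```

Only the upper bound triggers an alert here, since an error below the backtest range is good news rather than drift.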

---

## Quick Start

### 1) Run the full pipeline
```bash
cd c:/docker_projects/atsaf  # forward slashes work in Git Bash; adjust to your checkout
python -m src.chapter3.cli run \
  --start-date 2023-06-01 \
  --end-date 2023-09-30 \
  --horizon 24
```

### 2) Read the relevant chapter
- New to the objects and contracts? → Read [Chapter 0](chapter0_time_series_objects.md)
- Just ingested data? → Read [Chapter 1](chapter1_ingestion.md)
- Training models? → Read [Chapter 2](chapter2_experimentation.md)
- Running pipelines? → Read [Chapter 3](chapter3_orchestration.md)
- Monitoring forecasts? → Read [Chapter 4](chapter4_monitoring.md)

### 3) Run the chapter's walkthrough
Each chapter has a "Step-by-Step Walkthrough" section with copy-paste examples.

### 4) Attempt the mini-checkpoint
Self-test at the end of each chapter to verify understanding.

### 5) (Optional) Attempt exercises
Exercises range from easy (data exploration) to hard (refactoring for new requirements).

---

## Alignment with Actual Code

### What Changed From Initial Plan

The plan initially suggested implementing MLForecast + conformal prediction intervals, but **the actual codebase uses StatsForecast with model-based intervals**. All learning materials have been rewritten to match the real implementation.

Similarly, Chapter 3 was described as having "branching logic" for no-new-data / validation failures, but the **actual pipeline is linear** (strict sequence). All examples reflect this reality.

### Code Fixes Applied

To ensure learning materials match code exactly, the following issues were corrected:

1. **validate_clean() key mismatch** (Chapter 3, tasks.py)
   - Error message was referencing non-existent keys (`duplicate_count`, `missing_hours_total`)
   - **Fixed**: Changed to use actual keys from `compute_time_series_integrity()` (`duplicate_pairs`, `missing_hours`)

2. **Hardcoded confidence level "95"** (Chapter 1, eia_data_simple.py)
   - `evaluate_forecast()` accepted `confidence_level` parameter but then hardcoded 95 when looking up `{model}-lo-95` / `{model}-hi-95` columns
   - **Fixed**: Parameterized to use `confidence_level` variable (lines 913-914, 933-934)

3. **MAPE with zeros warning** (Chapter 1, eia_data_simple.py)
   - MAPE metric can explode or become infinite when y ≈ 0
   - **Added**: Warning in docstring recommending RMSE/MAE/MASE instead
   - Also documented in Chapter 2 pitfalls section
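The confidence-level fix comes down to building interval column names from the parameter instead of hard-coding 95. A sketch of that lookup plus an empirical coverage calculation, assuming StatsForecast's `<model>-lo-<level>` / `<model>-hi-<level>` naming and an actuals column `y`; `interval_coverage` is a hypothetical helper, not the signature of `evaluate_forecast()`.

```python
import pandas as pd


def interval_coverage(df: pd.DataFrame, model: str, confidence_level: int = 95) -> float:
    """Share of actuals (column 'y') that fall inside the model's prediction interval."""
    lo_col = f"{model}-lo-{confidence_level}"
    hi_col = f"{model}-hi-{confidence_level}"
    missing = [c for c in (lo_col, hi_col) if c not in df.columns]
    if missing:
        # Fail loudly instead of silently reading a different level's columns
        raise KeyError(f"Interval columns not found: {missing}")
    inside = (df["y"] >= df[lo_col]) & (df["y"] <= df[hi_col])
    return float(inside.mean())
```

For well-calibrated intervals, coverage measured over many backtest windows should sit close to `confidence_level / 100`.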

---

## Design Principles Reflected in Materials

### 1) Verifiability
- Every claim about the code is traceable to actual source files
- Every example is runnable (not pseudo-code)
- Success criteria are observable (not vague)

### 2) Fail-Loud Pattern
- Code raises clear errors when assumptions are violated (e.g., validate_clean() raises on integrity failure)
- Learning materials emphasize *why* this matters (prevents bad data from reaching training)

### 3) Idempotency & Atomicity
- Tasks can be re-run and produce same outputs
- Writes are atomic: all-or-nothing (not partial files left behind)
- Learning materials emphasize *when* to exploit this and *how* to verify it works
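One common way to get all-or-nothing writes is to write to a temporary file in the same directory and then rename it over the target with `os.replace`. This is a generic sketch (`atomic_write_bytes` is a hypothetical helper), not necessarily how `src/chapter3/tasks.py` implements it.

```python
import os
import tempfile
from pathlib import Path


def atomic_write_bytes(target: Path, data: bytes) -> None:
    """Write data so readers see either the old file or the new one, never a partial file."""
    target.parent.mkdir(parents=True, exist_ok=True)
    # The temp file must be on the same filesystem as the target for the rename to be atomic
    fd, tmp_name = tempfile.mkstemp(dir=target.parent, prefix=f".{target.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as handle:
            handle.write(data)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, target)  # atomic on POSIX per the Python docs
    except BaseException:
        # Remove the temp file if anything failed before the rename
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
```

For Parquet, the same pattern applies: write the DataFrame to the temporary path with `to_parquet`, then `os.replace` it onto the final name.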

### 4) Metrics Over Promises
- Primary metrics clearly identified (RMSE preferred over MAPE, MASE for drift detection)
- Secondary metrics explained with context (why MAPE fails, why coverage matters)
- Thresholds are data-driven (from backtest, not magic numbers)

---

## Common Learning Paths

### Path 1: "I just want to run forecasts"
1. Read Chapter 0 (10 min) — understand objects + contract
2. Read Chapter 1 (15 min) — understand data
3. Skim Chapter 2 (10 min) — know which models exist
4. Read Chapter 3 (20 min) — run the pipeline
5. **Done!**

### Path 2: "I want to understand the full system"
1. Read Chapter 0 (30 min) + run walkthrough (15 min)
2. Read Chapter 1 (1 hour) + run walkthrough (30 min)
3. Read Chapter 2 (1.5 hours) + run walkthrough (1 hour) + attempt exercises
4. Read Chapter 3 (1 hour) + run walkthrough (30 min)
5. Read Chapter 4 (1.5 hours) + run walkthrough (1 hour)
6. **Total: ~9 hours**

### Path 3: "I want to modify the system"
1. Do Path 2 above
2. For each chapter you want to modify, read "Pitfalls" section carefully
3. Read the actual source code (not just examples)
4. Make changes incrementally, verify idempotency at each step
5. Update this learning guide if you change architecture

---

## FAQ

### Q: Why is Chapter X different from the course plan?
**A:** The course plan described one implementation (MLForecast + branching DAG + conformal intervals), but your actual codebase uses StatsForecast + linear DAG + model-based intervals. Learning materials reflect what you *actually* built, not the original plan.

### Q: How do I know if I've mastered a chapter?
**A:** Complete the "Mini-Checkpoint" questions without looking at the answers. If you can answer all four clearly, you've got it.

### Q: Can I just read the code without the learning guides?
**A:** Yes, but you'll spend more time and miss important context. The guides are condensed versions of "what matters" and "why." The code has all the details; the guides have all the insight.

### Q: Why does the MAPE warning say it's undefined at zero?
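**A:** MAPE = (100/n) · Σ |y_t − ŷ_t| / |y_t|. When an actual value is exactly 0 the term divides by zero and MAPE is undefined; when it is merely close to 0 the ratio explodes and dominates the average. This happens with solar (night = 0), wind (calm = 0), or any fuel with downtime. RMSE and MAE never divide by the actuals, so they don't have this problem.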
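A small numeric illustration with made-up, solar-like values (one near-zero "night" hour): that single tiny actual dominates MAPE, while MAE and MASE stay interpretable. MASE here uses the textbook definition, forecast MAE divided by the in-sample MAE of a seasonal naive forecast with season length `m`; the repo's implementation may differ in detail.

```python
import numpy as np


def mape(y: np.ndarray, yhat: np.ndarray) -> float:
    """Mean absolute percentage error in percent (inf or nan if any actual is exactly 0)."""
    return float(np.mean(np.abs(y - yhat) / np.abs(y)) * 100)


def mae(y: np.ndarray, yhat: np.ndarray) -> float:
    """Mean absolute error, in the units of y."""
    return float(np.mean(np.abs(y - yhat)))


def mase(y: np.ndarray, yhat: np.ndarray, y_train: np.ndarray, m: int = 1) -> float:
    """MAE scaled by the in-sample MAE of a seasonal naive forecast (lag m)."""
    scale = np.mean(np.abs(y_train[m:] - y_train[:-m]))
    return mae(y, yhat) / float(scale)


y = np.array([100.0, 80.0, 0.1, 90.0])  # one near-zero actual
yhat = np.array([95.0, 85.0, 5.0, 92.0])
y_train = np.array([90.0, 100.0, 85.0, 0.2, 95.0, 105.0, 80.0, 0.1])
print(f"MAPE: {mape(y, yhat):.1f}%")  # dominated by the 0.1 actual
print(f"MAE:  {mae(y, yhat):.3f}")
print(f"MASE: {mase(y, yhat, y_train, m=4):.3f}")
```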

### Q: What should I do if my forecasts drift?
**A:** (1) Re-run Chapter 4 `detect_drift()` to confirm the threshold was breached. (2) Re-ingest recent data (Chapter 1) to check for data quality issues. (3) Retrain (Chapters 2 and 3) with recent data. (4) Check for external events (holidays, policy changes, weather patterns).

---

## Next Steps

### Immediate
- [ ] Run full pipeline from Chapter 3 CLI
- [ ] Read Chapter 0 (objects + data contract)
- [ ] Read Chapter 1 (data understanding)
- [ ] Read Chapter 3 (pipeline orchestration)

### Short-term (1-2 weeks)
- [ ] Complete Chapter 2 walkthrough + exercises
- [ ] Deploy to Airflow (Chapter 3, optional)
- [ ] Set up monitoring alerts (Chapter 4, optional)

### Medium-term (1-3 months)
- [ ] Modify pipeline for different respondent/fuel types
- [ ] Implement email/Slack alerts (Chapter 4)
- [ ] Build dashboard (query Chapter 4 database)
- [ ] Retrain on drift (orchestrate Chapter 3 → 4 feedback loop)

### Long-term
- [ ] Implement branching logic (no-new-data → skip training)
- [ ] Add conformal prediction intervals (Chapter 2 extension)
- [ ] Multi-model ensemble (Chapter 2 extension)
- [ ] Automated hyperparameter tuning (Chapter 2 extension)

---

## Feedback & Improvements

These learning materials are **living documentation**. If you find:
- An example that doesn't run
- A concept that's unclear
- A pitfall not mentioned
- A code issue not listed above

Please update the relevant chapter markdown. **Keep materials honest**: if it's not in the code, don't claim it in the guide.

---

**Last updated:** January 11, 2026
**Alignment verified against:** src/chapter[1-4]/, latest commits


In [11]:
import os

# Show the current working directory
cwd = os.getcwd()
print("Current Working Directory:", cwd)

# Change to the project root (c:\docker_projects\atsaf) if not already there
target_dir = r"c:\docker_projects\atsaf"
if os.path.normcase(os.path.abspath(cwd)) != os.path.normcase(os.path.abspath(target_dir)):
    os.chdir(target_dir)
    print("Changed Directory to:", target_dir)

Current Working Directory: c:\docker_projects\atsaf


# Chapter 0 - Time Series Objects & Contracts (Python)

## Outcomes (what I can do after this)

- [ ] I can explain the Python equivalents of R ts / tsibble / timetk
- [ ] I can create a forecasting-ready DataFrame with columns unique_id, ds, y
- [ ] I can normalize timestamps to UTC and reason about DST edge cases
- [ ] I can validate time-series integrity before modeling
- [ ] I can add basic time-based features without leakage

## Concepts (plain English)

- **Datetime vs Timestamp**: Python `datetime` is the standard type; pandas wraps it as `Timestamp`
- **Timezone-aware vs naive**: naive timestamps have no timezone; always normalize to UTC
- **ts (R)**: a single series with a time index -> `pd.Series` with a `DatetimeIndex`
- **tsibble (R)**: tidy time-series table -> DataFrame with `unique_id, ds, y`
- **timetk (R)**: time-based feature helpers -> pandas `.dt`, `shift`, `rolling`, `expanding`
- **Contract**: invariants that every downstream stage assumes (sorted, unique, regular)

## R to Python mapping (where it fits)

- **ts** -> `pd.Series` with `DatetimeIndex` (single series; good for quick checks)
- **tsibble** -> long DataFrame `unique_id, ds, y` (StatsForecast and pipeline contract)
- **timetk** -> pandas feature engineering helpers + optional `statsmodels` diagnostics
- **Validation layer** -> `validate_time_series_integrity()` / `validate_time_index()`

## Architecture (what we're building)

### Inputs
- Single series or tidy table from any source (API, CSV, Parquet)
- Minimum fields: timestamps + values; plus series id for multi-series

### Outputs
- Forecasting-ready DataFrame with columns `[unique_id, ds, y]` (UTC, hourly)

### Invariants (must always hold)
- `ds` is normalized to UTC (timezone-naive in StatsForecast pipeline)
- No duplicate `(unique_id, ds)` pairs
- Regular hourly frequency with no gaps (unless explicitly allowed)
- Data sorted by `unique_id, ds`
- No leakage: features are built from past data only
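The structural invariants above can be checked with plain pandas. This standalone sketch (`check_contract` is a hypothetical name) covers naive UTC `ds`, duplicate pairs, sort order, and missing hours; in the repo, `validate_time_index()` and `compute_time_series_integrity()` play this role.

```python
import pandas as pd


def check_contract(df: pd.DataFrame, freq: str = "h") -> dict[str, object]:
    """Report violations of the [unique_id, ds, y] contract."""
    ds_is_naive = df["ds"].dt.tz is None
    duplicate_pairs = int(df.duplicated(["unique_id", "ds"]).sum())
    # A stable sort leaves the index order unchanged only if rows were already sorted
    is_sorted = df.sort_values(["unique_id", "ds"], kind="mergesort").index.equals(df.index)

    # Count expected timestamps that never appear, per series
    missing_hours = 0
    for _, group in df.groupby("unique_id"):
        ds = pd.DatetimeIndex(group["ds"].drop_duplicates())
        expected = pd.date_range(ds.min(), ds.max(), freq=freq)
        missing_hours += len(expected.difference(ds))

    return {
        "ds_is_naive": ds_is_naive,
        "duplicate_pairs": duplicate_pairs,
        "is_sorted": is_sorted,
        "missing_hours": missing_hours,
        "is_valid": ds_is_naive and duplicate_pairs == 0 and is_sorted and missing_hours == 0,
    }
```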

### Failure modes
- Naive local timestamps (no timezone) -> DST ambiguity
- DST fall-back duplicates (same hour twice) -> duplicate pairs
- Missing hours -> gaps break backtesting
- Mixed frequencies across series -> invalid model input
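Both DST failure modes are easy to reproduce: take consecutive UTC hours across a US transition, convert them to local wall-clock time, and drop the offset. The dates are the 2023 US transitions; `America/New_York` is just an example zone and `local_wall_clock` is a hypothetical helper.

```python
import pandas as pd


def local_wall_clock(start_utc: str, periods: int, tz: str = "America/New_York") -> pd.DatetimeIndex:
    """Consecutive UTC hours rendered as naive local wall-clock times (offset discarded)."""
    utc_hours = pd.date_range(start_utc, periods=periods, freq="h", tz="UTC")
    return utc_hours.tz_convert(tz).tz_localize(None)


# Fall-back (2023-11-05): 01:00 local happens twice -> one duplicate timestamp
fall = local_wall_clock("2023-11-05 04:00", periods=4)
print(fall.duplicated().sum())  # -> 1

# Spring-forward (2023-03-12): 02:00 local never happens -> one missing hour
spring = local_wall_clock("2023-03-12 05:00", periods=4)
expected = pd.date_range(spring.min(), spring.max(), freq="h")
print(len(expected.difference(spring)))  # -> 1
```

The same four UTC hours have neither problem, which is why the contract keeps `ds` in UTC.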

## Files touched

- **`src/chapter0/objects.py`** - Chapter 0 helpers (ts, tsibble, timetk-style)
- **`src/chapter1/prepare.py`** - timezone parsing and normalization
- **`src/chapter1/eia_data_simple.py`** - `prepare_for_forecasting()`, `validate_time_series_integrity()`
- **`src/chapter1/validate.py`** - `validate_time_index()` and report printer
- **`src/chapter3/tasks.py`** - `compute_time_series_integrity()` gate in pipeline

## Step-by-step walkthrough

### 1) Create a single-series "ts" object
```python
import pandas as pd

idx = pd.date_range("2024-01-01", periods=6, freq="h", tz="UTC")
y = pd.Series([100, 102, 98, 101, 103, 99], index=idx)
print(type(y), y.index.tz)
```
- **Expect**: pandas Series with UTC-aware index

### 2) Convert to a "tsibble" style table
```python
df = y.reset_index()
df.columns = ["ds", "y"]
df["ds"] = pd.to_datetime(df["ds"], utc=True).dt.tz_localize(None)
df["unique_id"] = "NG_US48"
df = df[["unique_id", "ds", "y"]]
print(df.head())
```
- **Expect**: columns `[unique_id, ds, y]` with timezone-naive UTC timestamps

### 3) Validate the time-series contract
```python
from src.chapter1.validate import validate_time_index, print_validation_report

report = validate_time_index(df)
print_validation_report(report)
```
- **Expect**: PASS, no duplicates, no missing hours

### 4) Add timetk-style features (safe, leakage-free)
```python
df_features = df.assign(
    hour=df["ds"].dt.hour,
    dayofweek=df["ds"].dt.dayofweek,
    y_lag1=df["y"].shift(1),
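    # Shift before rolling so the mean at time t uses only values up to t-1
    y_roll24=df["y"].shift(1).rolling(24, min_periods=1).mean(),
)
print(df_features.head())
```
- **Expect**: new columns; the first row has NaN for both `y_lag1` and `y_roll24` because no earlier value exists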

## Metrics & success criteria

- `validate_time_index(df).is_valid` is True
- `df["ds"].dt.tz` is None (timezone-naive UTC)
- No duplicate pairs or missing hours

## Pitfalls (things that commonly break)

1. **Naive timestamps**: missing timezone info hides DST issues
2. **DST transitions**: fall-back creates duplicates, spring-forward creates gaps
3. **Wide format**: multiple value columns; StatsForecast expects long format
4. **Sorting**: unsorted `ds` breaks backtesting and feature alignment
5. **Leakage**: rolling/lag features must use past values only

## Mini-checkpoint (prove you learned it)

Answer these questions:

1. **What is the Python equivalent of an R `ts` object?**
2. **Why does the pipeline require `unique_id, ds, y` even for a single series?**
3. **What two DST problems does the integrity check catch?**
4. **Which functions enforce the data contract in this repo?**

**Answers:**
1. A `pd.Series` with a `DatetimeIndex`.
2. StatsForecast is multi-series-first; the contract stays consistent across one or many series.
3. Fall-back duplicates and spring-forward missing hours.
4. `validate_time_index()` and `validate_time_series_integrity()` / `compute_time_series_integrity()`.

## Exercises (optional, but recommended)

### Easy
1. Build a 48-hour series and verify `validate_time_index()` passes.
2. Introduce one duplicate timestamp and confirm the report fails.

### Medium
1. Create a series in `US/Eastern`, convert to UTC, and confirm `ds` is UTC.
2. Introduce a 3-hour gap and compute how many missing hours are reported.

### Hard
1. Write a helper that enforces the `[unique_id, ds, y]` contract and call it before backtesting.
2. Add a safe holiday or calendar feature using only `ds` (no future leakage).
3. Extend the validation to allow missing hours within a known maintenance window.


In [None]:
%%writefile src/chapter0/__init__.py
# file: src/chapter0/__init__.py
"""
Chapter 0: Time Series Objects and Contracts (Python)
"""

from .objects import (
    add_time_features,
    assert_tsibble_contract,
    normalize_to_utc,
    to_ts_series,
    to_tsibble,
    validate_tsibble,
)

__all__ = [
    "add_time_features",
    "assert_tsibble_contract",
    "normalize_to_utc",
    "to_ts_series",
    "to_tsibble",
    "validate_tsibble",
]


In [None]:
%%writefile src/chapter0/objects.py
# file: src/chapter0/objects.py
"""
Chapter 0: Python equivalents for ts / tsibble / timetk concepts.
"""

from __future__ import annotations

from typing import Iterable, Tuple

import pandas as pd

from src.chapter1.validate import validate_time_index


def normalize_to_utc(df: pd.DataFrame, ds_col: str = "ds") -> pd.DataFrame:
    """
    Normalize a datetime column to timezone-naive UTC.

    StatsForecast expects timezone-naive UTC timestamps. Aware values are
    converted to UTC; naive values are assumed to already be in UTC.
    """
    if ds_col not in df.columns:
        raise ValueError(f"Missing required datetime column: {ds_col}")

    normalized = df.copy()
    normalized[ds_col] = (
        pd.to_datetime(normalized[ds_col], errors="raise", utc=True)
        .dt.tz_localize(None)
    )
    return normalized


def to_ts_series(
    df: pd.DataFrame,
    ds_col: str = "ds",
    y_col: str = "y",
) -> pd.Series:
    """
    Create a single-series ts object: pd.Series with a DatetimeIndex (UTC-aware).
    """
    if ds_col not in df.columns or y_col not in df.columns:
        raise ValueError(f"Expected columns: {ds_col}, {y_col}")

    ds = pd.to_datetime(df[ds_col], errors="raise", utc=True)
    series = pd.Series(df[y_col].to_numpy(), index=ds)
    series.index = series.index.tz_convert("UTC")
    return series


def to_tsibble(
    df: pd.DataFrame,
    unique_id_col: str = "unique_id",
    ds_col: str = "ds",
    y_col: str = "y",
) -> pd.DataFrame:
    """
    Create a tidy time-series table with columns [unique_id, ds, y].
    """
    missing = [col for col in (unique_id_col, ds_col, y_col) if col not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    tidy = df[[unique_id_col, ds_col, y_col]].copy()
    tidy.columns = ["unique_id", "ds", "y"]
    tidy = normalize_to_utc(tidy, ds_col="ds")
    return tidy


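def validate_tsibble(df: pd.DataFrame) -> Tuple[bool, str]:
    """
    Validate the tsibble contract using the Chapter 1 integrity check.

    Returns (is_valid, message); on failure the message summarizes what broke.
    """
    result = validate_time_index(df)
    if result.is_valid:
        return True, "valid"
    return False, (
        f"invalid: duplicates={result.n_duplicates}, "
        f"missing_hours={result.n_missing_hours}, "
        f"monotonic={result.is_monotonic}"
    )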


def assert_tsibble_contract(df: pd.DataFrame) -> None:
    """
    Raise a ValueError if the tsibble contract is violated.
    """
    result = validate_time_index(df)
    if not result.is_valid:
        raise ValueError(
            f"Invalid tsibble: duplicates={result.n_duplicates}, "
            f"missing_hours={result.n_missing_hours}, "
            f"monotonic={result.is_monotonic}"
        )


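def add_time_features(
    df: pd.DataFrame,
    ds_col: str = "ds",
    y_col: str = "y",
    lags: Iterable[int] = (1, 24, 168),
    rolling_windows: Iterable[int] = (24, 168),
    id_col: str = "unique_id",
) -> pd.DataFrame:
    """
    Basic timetk-style features using pandas (safe, past-only).

    Lags and rolling means are computed within each series (id_col) when that
    column exists, so values never leak across series boundaries.
    """
    features = normalize_to_utc(df, ds_col=ds_col)  # returns a copy
    has_id = id_col in features.columns
    sort_cols = [id_col, ds_col] if has_id else [ds_col]
    features = features.sort_values(sort_cols).reset_index(drop=True)

    features["hour"] = features[ds_col].dt.hour
    features["dayofweek"] = features[ds_col].dt.dayofweek
    features["month"] = features[ds_col].dt.month

    # Group key: one group per series, or a single group for a lone series
    keys = features[id_col] if has_id else pd.Series(0, index=features.index)
    grouped = features[y_col].groupby(keys)

    for lag in lags:
        features[f"y_lag_{lag}"] = grouped.shift(lag)

    # Shift by 1 first so the rolling mean at time t excludes y_t itself
    shifted = grouped.shift(1)
    for window in rolling_windows:
        features[f"y_roll_mean_{window}"] = shifted.groupby(keys).transform(
            lambda s, w=window: s.rolling(w, min_periods=1).mean()
        )

    return features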


# Chapter 1 — Data Ingestion & Preparation

## Outcomes (what I can do after this)

- [ ] I can pull raw EIA electricity data via REST API with proper pagination and error handling
- [ ] I can validate time-series data for duplicates, missing hours, and DST edge cases
- [ ] I can transform raw data into forecasting-ready format (unique_id, ds, y)
- [ ] I can explain why UTC normalization and data sorting matter for backtesting

## Prerequisite (read first)

- Chapter 0 for the time-series contract (unique_id, ds, y) and UTC rules

## Concepts (plain English)

- **API Pagination**: Splitting large datasets into fixed-size chunks to avoid timeouts
- **Datetime Normalization**: Converting all timestamps to UTC and handling DST transitions
- **Time-series Integrity**: Detecting duplicates, missing hours, and repeated timestamps (DST fall-back)
- **Monotonicity**: Ensuring timestamps are sorted chronologically (required for backtesting)
- **Data Schema**: unique_id (series identifier), ds (datetime), y (numeric value); tsibble analog in Python
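Offset-based pagination can be sketched as a loop that requests fixed-size pages until a short or empty page comes back. `fetch_page` stands in for the real HTTP call (a hypothetical callable taking `offset` and `length`), and the default page size of 5000 is an assumption; the actual request logic lives in `src/chapter1/ingest.py`.

```python
from typing import Callable


def fetch_all(
    fetch_page: Callable[[int, int], list[dict]],
    page_size: int = 5000,  # assumed per-request row limit
    max_pages: int = 1000,
) -> list[dict]:
    """Collect every record by requesting pages at increasing offsets."""
    records: list[dict] = []
    for page_number in range(max_pages):
        page = fetch_page(page_number * page_size, page_size)
        records.extend(page)
        # A short or empty page means nothing is left; an empty page is expected
        # when the total row count is an exact multiple of page_size
        if len(page) < page_size:
            return records
    raise RuntimeError(f"Stopped after {max_pages} pages; check the pagination logic")
```

Offsets only point at the same rows across requests if the server returns rows in a deterministic order, which is why the ingest step asks for a stable sort.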

## Architecture (what we're building)

### Inputs
- EIA API credentials (via environment variable `EIA_API_KEY`)
- Data range (start_date, end_date as YYYY-MM-DD strings)
- Series identifier (respondent, fueltype)

### Outputs
- **raw.parquet**: Unmodified API response (columns: time, value, respondent, fueltype)
- **clean.parquet**: Normalized data (columns: unique_id, ds, y, with UTC timestamps)
- **metadata.json**: Data snapshot (row count, date range, integrity report)

### Invariants (must always hold)
- Timestamps must be strictly increasing within each series (no duplicates, no reversals) with regular hourly spacing (no gaps)
- All values must be numeric and non-negative (electricity generation)
- All timestamps must be in UTC (no local timezones)
- No fabricated rows: clean.parquet never has more rows than raw.parquet (rows dropped during prepare should be rare and are tracked as row loss)

### Failure modes
- API unavailable → retries up to 3 times, then raises RuntimeError
- DST transition creates duplicate hour → integrity check catches it, raises ValueError
- Missing hours in sequence → detected and reported, task fails if threshold exceeded
- Non-numeric values → coerced to NaN during prepare; validation then flags the NaN rows
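The "retry up to 3 times, then raise `RuntimeError`" behaviour follows a common pattern. A generic sketch with exponential backoff; `with_retries`, its parameters, and the backoff schedule are illustrative, not the code in `ingest.py`.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def with_retries(
    call: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Run call(), retrying transient errors with exponential backoff."""
    last_error: Optional[BaseException] = None
    for attempt in range(1, attempts + 1):
        try:
            return call()
        except retry_on as exc:
            last_error = exc
            if attempt < attempts:
                time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise RuntimeError(f"API call failed after {attempts} attempts") from last_error
```

With the `requests` library, its errors don't subclass the built-in `ConnectionError`, so you would pass `retry_on=(requests.exceptions.RequestException,)` instead.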

## Files touched

- **`src/chapter1/eia_data_simple.py`** (1,773 lines) — Main orchestrator (see Methods section)
  - **`EIADataFetcher`** class: Pull, prepare, validate, format for forecasting
  - **`ExperimentConfig`**: Defines horizon, windows, models for downstream chapters
- **`src/chapter1/ingest.py`** — Paginated API calls with stable sort
- **`src/chapter1/prepare.py`** — Datetime parsing and numeric conversion
- **`src/chapter1/validate.py`** — Time-series integrity checks (duplicates, missing hours, DST)
- **`src/chapter1/config.py`** — Settings (API key, date ranges, respondent/fuel types)

## Step-by-step walkthrough

### 1) Initialize the fetcher
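```python
import os

from src.chapter1.eia_data_simple import EIADataFetcher

# os.getenv only reads the process environment; it does not parse .env files
api_key = os.getenv("EIA_API_KEY")
if not api_key:
    raise RuntimeError("EIA_API_KEY is not set")
fetcher = EIADataFetcher(api_key)
```
- **Expect**: No errors. Fetcher is ready to pull data.
- **If it fails**: Make sure `EIA_API_KEY` is set in the environment. If you keep it in a `.env` file, load that file first (for example with python-dotenv's `load_dotenv()`) or export the variable in your shell.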

### 2) Pull raw data
```python
df_raw = fetcher.pull_data(
    start_date="2023-06-01",
    end_date="2023-06-30",
    respondent="NG_US48",
    fueltype="NG"
)
print(f"Raw rows: {len(df_raw)}, Columns: {df_raw.columns.tolist()}")
```
- **Expect**:
  - ~720 rows (30 days × 24 hours)
  - Columns: `['time', 'value', 'respondent', 'fueltype']`
  - time is string (e.g., "2023-06-01T00:00:00-04:00")
  - value is string (e.g., "1234.56")
- **If it fails**:
  - "API Error": Check API credentials and network connectivity
  - "Empty response": Check date range and respondent/fueltype exist in EIA

### 3) Prepare (normalize) data
```python
df_prepared = fetcher.prepare_data(df_raw)
print(df_prepared.head())
print(f"Data types: {df_prepared.dtypes.to_dict()}")
```
- **Expect**:
  - Columns: `['time', 'value', 'respondent', 'fueltype']`
  - `time` is `datetime64[ns, UTC]` (pandas Timestamp)
  - `value` is `float64`
  - No NaN values
- **If it fails**:
  - Conversion error: Check for non-numeric values in API response (may indicate API schema change)

### 4) Validate time-series integrity
```python
is_valid = fetcher.validate_data(df_prepared)
print(f"Basic validation: {is_valid}")

integrity = fetcher.validate_time_series_integrity(df_prepared, unique_id="respondent")
print(f"Integrity status: {integrity['status']}")
print(f"Duplicate pairs: {integrity.get('duplicate_pairs', 0)}")
print(f"Missing-hour gaps: {integrity.get('missing_gaps_count', 0)}")
print(f"Longest gap (hours): {integrity.get('longest_gap_hours', 0)}")
```
- **Expect**:
  - `is_valid` = True
  - `integrity['status']` = "valid"
  - Duplicate and missing hour counts = 0 (or acceptable based on thresholds)
- **If it fails**:
  - "Duplicates detected": DST fall-back (1 hour repeats). Check `integrity['duplicate_pairs']`
  - "Missing hours": Gap in data (e.g., server downtime). Check `integrity['missing_gaps_count']` and `integrity['longest_gap_hours']`

### 5) Format for forecasting
```python
df_forecast = fetcher.prepare_for_forecasting(df_prepared, unique_id="respondent")
print(df_forecast.columns.tolist())
print(df_forecast.head())
```
- **Expect**:
  - Columns: `['unique_id', 'ds', 'y']`
  - `unique_id` = "NG_US48" (constant)
  - `ds` = datetime (UTC, hourly, no gaps)
  - `y` = numeric value (energy generation)
- **If it fails**: Time-series integrity failed (duplicates, gaps). Re-run step 4 to diagnose.

### 6) End-to-end test
```python
df_full = fetcher.full_pipeline(
    start_date="2023-06-01",
    end_date="2023-06-30",
    unique_id_col="respondent"
)
print(f"Output rows: {len(df_full)}, Columns: {df_full.columns.tolist()}")
```
- **Expect**: Same as step 5 (columns: unique_id, ds, y)
- **If it fails**: Check logs for which step failed (pull, prepare, validate, or format)

## Metrics & success criteria

### Primary metric
- **Integrity status**: All records pass validation (no duplicates, no missing hours)

### Secondary metrics
- **Data freshness**: Last record within 24 hours of run time
- **Row count consistency**: Raw ≥ Clean (loss should be minimal, <5%)
- **Timezone correctness**: All timestamps are UTC

### "Good enough" threshold
- No duplicates (count = 0)
- No missing hours within the date range
- All values numeric and ≥ 0

### What would make me re-ingest / re-validate
- API returns unexpected schema (missing columns, wrong data type)
- >5% row loss during prepare (indicates conversion failures)
- New duplicates appear (suggests API bug or DST change)
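The >5% row-loss rule can be expressed as a small check run after prepare; `row_loss_pct` and `needs_reingest` are illustrative names, not functions in `src/chapter1`.

```python
def row_loss_pct(raw_rows: int, clean_rows: int) -> float:
    """Percentage of raw rows that did not survive preparation."""
    if raw_rows <= 0:
        raise ValueError("raw_rows must be positive")
    if clean_rows > raw_rows:
        raise ValueError("clean has more rows than raw; check for duplicated or fabricated rows")
    return 100.0 * (raw_rows - clean_rows) / raw_rows


def needs_reingest(raw_rows: int, clean_rows: int, max_loss_pct: float = 5.0) -> bool:
    """True when prepare dropped more than max_loss_pct of the raw rows."""
    return row_loss_pct(raw_rows, clean_rows) > max_loss_pct
```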

## Pitfalls (things that commonly break)

1. **Timezone confusion**:
   - API returns local time with offset (e.g., "-04:00"), but we convert to UTC
   - If you skip UTC normalization, backtesting will be incorrect

2. **DST transitions** (US Eastern Time, US Central Time):
   - Fall-back (November, the 1:00-1:59 AM hour repeats) → duplicate rows if timestamps are parsed as naive local time or the offset is dropped → integrity check catches it
   - Spring-forward (March, 2:00-2:59 AM is skipped) → missing row in local time → detected as a gap
   - Always run the integrity check; don't assume API data is DST-clean

3. **Empty or sparse data**:
   - Some respondent/fueltype combos may have gaps (e.g., solar only during day)
   - Prepare handles NaN → fill_value; validate reports gaps
   - Don't force forecasting on sparse series without understanding why

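4. **API pagination edge case**:
   - If the dataset size is an exact multiple of the page size, the final request returns an empty page; the fetch loop must treat that as "done", not as an error
   - The ingest code uses a stable sort so offset-based pages don't overlap or skip rows, but still monitor for off-by-one errors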

5. **Numeric conversion failing silently**:
   - If a few rows have non-numeric values, `pd.to_numeric(..., errors='coerce')` sets them to NaN
   - Validate always checks for NaN; if any appear, investigate API response
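Pitfalls 1 and 2 interact: timestamps that keep their UTC offset are unambiguous, so converting them to UTC separates the repeated fall-back hour, while stripping the offset first collapses them into a duplicate. A minimal pandas demonstration with two hand-written ISO strings for the 2023-11-05 fall-back in US Eastern time:

```python
import pandas as pd

# The same wall-clock hour, once in EDT (-04:00) and once in EST (-05:00)
raw_times = pd.Series(["2023-11-05T01:00:00-04:00", "2023-11-05T01:00:00-05:00"])

# Correct: parse with offsets, convert to UTC, then drop tz (pipeline convention)
as_utc = pd.to_datetime(raw_times, utc=True).dt.tz_localize(None)
print(as_utc.tolist())  # 05:00 and 06:00 UTC, distinct
print(as_utc.duplicated().sum())  # -> 0

# Wrong: discard the offset and keep the local wall clock
naive_local = pd.to_datetime(raw_times.str[:19])
print(naive_local.duplicated().sum())  # -> 1
```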

## Mini-checkpoint (prove you learned it)

Answer these questions:

1. **Explain the purpose of UTC normalization**. Why can't we just use the API's local timezone?
2. **What is a DST edge case** and how does our code detect it?
3. **What are the three checks in `validate_time_series_integrity()`**? (duplicates, missing hours, …)
4. **Why do we create a separate `prepare_for_forecasting()` step** instead of just using the raw API columns?

**Answers:**
1. Backtesting assumes monotonic, non-overlapping timestamps. Local timezones have repeated/missing hours during DST transitions. UTC eliminates this ambiguity.
2. DST fall-back repeats 1 hour (e.g., 1:30 AM occurs twice). Our integrity check detects duplicate (unique_id, ds) pairs and reports them.
3. Duplicates (same timestamp twice), missing hours (gaps in sequence), DST repeated hours (detected by duplicate).
4. StatsForecast expects the long `(unique_id, ds, y)` schema and treats any additional columns as exogenous regressors, so leftover API columns must not slip through. Transforming here makes the interface explicit and validates assumptions.

## Exercises (optional, but recommended)

### Easy
1. Pull data for a different respondent (e.g., "NG_CA1") and verify the row count matches your date range.
2. Change start_date to a DST transition date (e.g., "2023-11-05") and inspect the integrity report.

### Medium
1. Intentionally corrupt one row (e.g., set a timestamp to 12:30 instead of 12:00) and run integrity check. What gets reported?
2. Pull a 1-year dataset and plot y vs ds. Look for gaps. Explain what you see (summer maintenance? solar seasonal?).

### Hard
1. Modify `validate_time_series_integrity()` to allow *up to* 2 duplicate pairs and *up to* 3 missing hours. Test that the threshold works correctly.
2. Write a function that estimates "optimal fill strategy" for missing hours: forward-fill vs interpolation vs skip. Test on a 3-month dataset with known gaps.


# Chapter 2 — Experimentation & Backtesting

## Outcomes (what I can do after this)

- [ ] I can set up and run rolling-origin cross-validation on time-series data
- [ ] I can build a leaderboard comparing multiple models and pick a champion
- [ ] I can explain why RMSE is primary and when metrics like MAPE become unreliable
- [ ] I can interpret prediction interval coverage and assess whether intervals are well-calibrated
- [ ] I can explain the difference between rolling-window and expanding-window backtesting strategies

## Prerequisite (read first)

- Chapter 0 for the time-series contract (unique_id, ds, y) and UTC rules

## Concepts (plain English)

- **Backtesting (rolling-origin CV)**: Simulating real-world forecasting by repeating train→test splits forward through time. Prevents temporal leakage.
- **Horizon (h)**: How many time steps ahead we forecast (e.g., 24 hours)
- **Step size**: How far forward we move the training/test boundary each iteration (e.g., 168 hours = 1 week)
- **Expanding window**: Training set grows over time; test set is always fixed size
- **Rolling window**: Both train and test windows slide forward together; train size stays constant
- **Leakage**: Using information from after the forecast cutoff (the test period) to train a model that is then evaluated on that same period (always a bug in backtesting)
- **Coverage**: Percentage of actual values that fall inside the model's [lo, hi] prediction interval (should ≈ confidence level)
- **MASE**: Mean Absolute Scaled Error; normalized by seasonal naive baseline; doesn't blow up at zero like MAPE
- **Feature engineering**: Lagged values and time-based features (used by XGBoostModel)
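The horizon, step-size, and window vocabulary can be made concrete with a small sketch of how rolling-origin cutoffs might be laid out. It assumes the last test window ends at the final observation, which is how we understand StatsForecast's `cross_validation()` to anchor its windows. The function `rolling_origin_splits` and its `train_size` parameter are hypothetical and not part of the project code. `train_size=None` gives an expanding window, and an integer gives a rolling (fixed-size) window.

```python
from typing import List, Optional, Tuple


def rolling_origin_splits(
    n_obs: int,
    horizon: int,
    n_windows: int,
    step_size: int,
    train_size: Optional[int] = None,
) -> List[Tuple[int, int, int]]:
    """Return (train_start, cutoff, test_end) positional indices per window.

    Train rows are [train_start, cutoff); test rows are [cutoff, test_end).
    train_size=None -> expanding window; an int -> rolling (fixed-size) window.
    """
    # Place the last window so that its test period ends at the final observation
    first_cutoff = n_obs - horizon - (n_windows - 1) * step_size
    if first_cutoff <= 0 or (train_size is not None and first_cutoff < train_size):
        raise ValueError("Not enough observations for this horizon/n_windows/step_size")

    splits = []
    for w in range(n_windows):
        cutoff = first_cutoff + w * step_size
        train_start = 0 if train_size is None else cutoff - train_size
        splits.append((train_start, cutoff, cutoff + horizon))
    return splits


# One year of hourly data, 24h horizon, 5 windows one week apart
expanding = rolling_origin_splits(8760, horizon=24, n_windows=5, step_size=168)
rolling = rolling_origin_splits(8760, horizon=24, n_windows=5, step_size=168, train_size=720)
print(expanding[0], expanding[-1])  # (0, 8064, 8088) (0, 8736, 8760)
print(rolling[0], rolling[-1])      # (7344, 8064, 8088) (8016, 8736, 8760)
```

As long as `step_size >= horizon`, the test slices never overlap. The expanding variant always starts training at row 0. The rolling variant keeps exactly `train_size` rows per fit.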

## Where this fits in the learning map (R → Python)

- **Seasonal analysis** → seasonal naive baselines and MSTL models in `src/chapter1/eia_data_simple.py`
- **Correlation analysis** → optional ACF/PACF diagnostics (use statsmodels if needed; not in pipeline by default)
- **Smoothing methods** → `ExponentialSmoothingModel` in `src/chapter2/models.py` and Holt-Winters in the StatsForecast pipeline
- **Decomposition** → MSTL models in `src/chapter1/eia_data_simple.py`
- **Forecasting strategies** → rolling-origin CV in `src/chapter2/backtesting.py` and `EIADataFetcher.cross_validate()`

## Feature engineering (optional ML track)

- The StatsForecast pipeline only uses `unique_id`, `ds`, `y` (no X features).
- `src/chapter2/feature_engineering.py` is for EDA or ML models that consume features.
- For multi-step horizons, avoid leakage by using a forecasting-aware tool (e.g., MLForecast) or recursive feature generation.
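The reason lag features can leak in multi-step forecasting follows from the direct strategy. Under that strategy, one model predicts all h steps from what is known at the cutoff t. The feature `y_lag_k` at target time t + j needs y at t + j - k, which is observed only when k ≥ j. Covering the full horizon therefore requires k ≥ h. The hypothetical helper below is only an illustration and not a function in `feature_engineering.py`. It separates safe lags from leaky ones.

```python
from typing import Iterable, List, Tuple


def split_lags_by_horizon(lags: Iterable[int], horizon: int) -> Tuple[List[int], List[int]]:
    """Split lags into (safe, leaky) for a direct multi-step forecast of `horizon` steps.

    A lag k is safe when y[t + j - k] is already observed at the cutoff t for
    every step j = 1..horizon, which holds exactly when k >= horizon.
    """
    unique_lags = sorted(set(lags))
    safe = [k for k in unique_lags if k >= horizon]
    leaky = [k for k in unique_lags if k < horizon]
    return safe, leaky


# Default lags from build_timetk_features() with a 24-hour horizon
safe, leaky = split_lags_by_horizon((1, 24, 168), horizon=24)
print("safe:", safe, "leaky:", leaky)  # safe: [24, 168] leaky: [1]
```

Recursive generation, or a tool such as MLForecast, avoids dropping short lags by feeding predictions back in as features.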

## Architecture (what we're building)

### Inputs
- **Forecasting-ready DataFrame**: columns `[unique_id, ds, y]`, UTC timestamps, no gaps/duplicates
- **Backtest config**: horizon (h), n_windows, step_size, confidence level
- **Model registry**: list of model classes to train and evaluate

### Outputs
- **cv_results.parquet**: Wide format from StatsForecast.cross_validation()
  - Columns: unique_id, ds, cutoff (last training timestamp of each window), y, [model names], [model]-lo-95, [model]-hi-95
- **leaderboard.parquet**: Model rankings (see below)
- **Metrics for each model**: RMSE, MAE, MAPE, MASE, Coverage (%)

### Invariants (must always hold)
- No temporal leakage: training set ends strictly before test period begins
- No data shuffling: timestamps always sorted ascending
- Test periods do not overlap (no double-counting of actuals); this requires step_size ≥ horizon
- All models trained on identical train/test splits (fair comparison)
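The leakage and overlap invariants can be checked directly on `cv_results`. This assumes the frame carries the `cutoff` column that StatsForecast's `cross_validation()` adds. `check_cv_invariants` is a hypothetical helper for illustration, and the demo frame is synthetic.

```python
import pandas as pd


def check_cv_invariants(cv: pd.DataFrame) -> None:
    """Raise AssertionError if CV output shows leakage or overlapping test windows."""
    # No leakage: every forecasted timestamp lies strictly after its window's cutoff
    if not (cv["ds"] > cv["cutoff"]).all():
        raise AssertionError("Leakage: some test timestamps are at or before their cutoff")
    # No overlap: each (unique_id, ds) actual is evaluated in at most one window
    if cv.duplicated(subset=["unique_id", "ds"]).any():
        raise AssertionError("Overlapping test windows (likely step_size < horizon)")


# Two 3-hour windows whose cutoffs are 3 hours apart (step_size == horizon)
cutoffs = pd.to_datetime(["2023-12-30 00:00", "2023-12-30 03:00"], utc=True)
rows = [
    {"unique_id": "NG_US48", "cutoff": c, "ds": c + pd.Timedelta(hours=j)}
    for c in cutoffs
    for j in range(1, 4)
]
cv_ok = pd.DataFrame(rows)
check_cv_invariants(cv_ok)  # passes silently
```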

### Failure modes
- Sparse data in train window → model fit fails or forecasts come back as NaN → metrics become NaN → leaderboard shows NaN rank
- Sparse or zero values in test → MAPE undefined or infinite → use MASE/RMSE instead
- Horizon too large → forecasts extrapolate far past the last observed value → poor coverage and high error
- n_windows × step_size too large → earliest windows have little training history (or the series is too short) → unstable metrics or errors

## Files touched

- **`src/chapter1/eia_data_simple.py`**
  - `cross_validate()` method: Runs rolling-origin CV with StatsForecast or traditional models
  - `evaluate_forecast()`: Computes RMSE/MAPE/MASE/coverage on holdout split
  - `register_best_model()`: Logs champion model to MLflow
  - `ExperimentConfig`: Dataclass defining horizon, n_windows, step_size, model list

- **`src/chapter2/backtesting.py`**
  - `RollingWindowBacktest` class: Fixed-size training window slides forward
  - `ExpandingWindowBacktest` class: Training window grows; test set fixed size
  - `BacktestingStrategy` interface: Routes to above based on config

- **`src/chapter2/models.py`**
  - `ForecastModel` (abstract base): fit(), predict(), get_name()
  - Concrete implementations:
    - `ExponentialSmoothingModel`: Simple trend-based smoothing (baseline)
    - `ARIMAModel`: statsmodels ARIMA with automatic parameter tuning
    - `ProphetModel`: Facebook Prophet with seasonal decomposition
    - `XGBoostModel`: Lagged features → tree ensemble
  - `ModelFactory`: Create models by name string

- **`src/chapter2/feature_engineering.py`**
  - `build_timetk_features()`: Calendar + lag + rolling features (pandas-only)

- **`src/chapter2/training.py`**
  - `TrainingPipeline`: Orchestrates training across backtesting splits
  - For each split: fit all models, generate forecasts, compute metrics
  - Returns: results DataFrame with all metrics

- **`src/chapter2/evaluation.py`**
  - `ModelSelector` class: select_best_model(), generate_leaderboard()
  - Metrics computation (NaN-aware): RMSE, MAE, MAPE, MASE, Coverage
  - Ranking by primary metric (default: RMSE)

## Step-by-step walkthrough

### 1) Prepare forecasting-ready data
```python
from src.chapter1.eia_data_simple import EIADataFetcher
import os

api_key = os.getenv("EIA_API_KEY")
fetcher = EIADataFetcher(api_key)

df_clean = fetcher.pull_data(
    start_date="2023-01-01",
    end_date="2023-12-31"
).pipe(fetcher.prepare_data)

df_forecast = fetcher.prepare_for_forecasting(
    df_clean, unique_id="NG_US48"
)
print(f"Rows: {len(df_forecast)}, Columns: {df_forecast.columns.tolist()}")
```
- **Expect**: 8,760 rows (365 days × 24 hours), columns: [unique_id, ds, y]
- **If it fails**: Check Chapter 1 prerequisites (API key, date range, data integrity)
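Before moving on, it can help to assert the contract from Chapter 0 explicitly. The hypothetical helper `assert_forecast_ready` checks four things: the column set, UTC timestamps, duplicate keys, and a gap-free hourly grid per series. The small synthetic frame stands in for `df_forecast`.

```python
import pandas as pd


def assert_forecast_ready(df: pd.DataFrame, freq: str = "h") -> None:
    """Raise ValueError if df does not satisfy the (unique_id, ds, y) contract."""
    if list(df.columns) != ["unique_id", "ds", "y"]:
        raise ValueError(f"Expected columns [unique_id, ds, y], got {list(df.columns)}")
    if str(df["ds"].dt.tz) != "UTC":
        raise ValueError("ds must be timezone-aware UTC")
    if df.duplicated(subset=["unique_id", "ds"]).any():
        raise ValueError("Duplicate (unique_id, ds) pairs found")
    for uid, sub in df.groupby("unique_id"):
        # Every grid point between first and last ds must be present, and nothing else
        expected = pd.date_range(sub["ds"].min(), sub["ds"].max(), freq=freq)
        missing = expected.difference(pd.DatetimeIndex(sub["ds"]))
        if len(missing) > 0 or len(sub) != len(expected):
            raise ValueError(f"{uid}: {len(missing)} missing timestamps on the {freq} grid")


ds = pd.date_range("2023-01-01", periods=48, freq="h", tz="UTC")
df_demo = pd.DataFrame({"unique_id": "NG_US48", "ds": ds, "y": range(48)})
assert_forecast_ready(df_demo)  # passes silently
```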

### 2) Define backtest configuration
```python
from src.chapter1.eia_data_simple import ExperimentConfig

config = ExperimentConfig(
    name="baseline_experiment",
    horizon=24,          # Forecast next 24 hours
    n_windows=5,         # 5 train/test splits
    step_size=168,       # Move forward 1 week each time
    confidence_level=95,
    models=["AutoARIMA", "SeasonalNaive", "HoltWinters"],
    metrics=["rmse", "mape", "mase", "coverage"]
)
print(f"Config: {config}")
```
- **Expect**: Config object with all parameters set
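- **If it fails**: A `TypeError` usually means a misspelled field name. The dataclass does not validate model names, so check that each one is supported by `cross_validate()` (the defaults, such as `SeasonalNaive`, `AutoARIMA`, and `MSTL`, are StatsForecast model names)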

### 3) Run cross-validation
```python
cv_results, leaderboard = fetcher.cross_validate(
    df_forecast,
    horizon=config.horizon,
    n_windows=config.n_windows,
    step_size=config.step_size,
    level=[config.confidence_level],
    models_to_train=config.models
)

print("CV Results shape:", cv_results.shape)
print("\nLeaderboard (top 3):")
print(leaderboard.head(3))
```
- **Expect**:
  - `cv_results`: Wide DataFrame with columns like `[unique_id, ds, AutoARIMA, AutoARIMA-lo-95, AutoARIMA-hi-95, ...]`
  - `leaderboard`: Ranked by RMSE (ascending)
    - Columns: model, rmse_mean, rmse_std, mape_mean, mase_mean, coverage_pct, rank
    - Top row = best model (lowest RMSE)
- **If it fails**:
  - NaN in leaderboard: Insufficient data in train window or model failed to converge
  - Memory error: Too many windows or horizon; reduce n_windows or horizon

### 4) Interpret the leaderboard
```python
print(f"Champion model: {leaderboard.iloc[0]['model']}")
print(f"RMSE: {leaderboard.iloc[0]['rmse_mean']:.2f} ± {leaderboard.iloc[0]['rmse_std']:.2f}")
print(f"Coverage: {leaderboard.iloc[0]['coverage_pct']:.1f}%")

# Check if coverage is reasonable (should be near confidence_level)
expected_coverage = 95
actual_coverage = leaderboard.iloc[0]['coverage_pct']
if abs(actual_coverage - expected_coverage) > 5:
    print(f"⚠️  Coverage is {actual_coverage:.1f}%, expected ≈{expected_coverage}%")
```
- **Expect**:
  - Champion RMSE is lowest among all models
  - Coverage ≈ 95% (within ±5 percentage points is acceptable)
  - MASE < 1 means better than the seasonal naive baseline
- **If it fails**:
  - Coverage >> 95%: Intervals too wide; model is underconfident (overestimates uncertainty)
  - Coverage << 95%: Intervals too tight; model is overconfident (underestimates uncertainty)
  - MASE > 1: Model worse than seasonal naive; may need more tuning or a longer training window
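The sketch below shows where numbers like `rmse_mean`, `rmse_std`, and `coverage_pct` can come from. It summarizes one model from a StatsForecast-style `cv_results` frame with columns `cutoff`, `y`, `<model>`, `<model>-lo-<level>`, and `<model>-hi-<level>`. `summarize_model` is hypothetical, and the project's `ModelSelector` may compute these values differently.

```python
import pandas as pd


def summarize_model(cv: pd.DataFrame, model: str, level: int = 95) -> dict:
    """Per-window RMSE (mean/std across cutoffs) plus interval coverage for one model."""
    err = cv["y"] - cv[model]
    # RMSE for each CV window, identified by its cutoff
    rmse_per_window = (err ** 2).groupby(cv["cutoff"]).mean() ** 0.5
    lo, hi = cv[f"{model}-lo-{level}"], cv[f"{model}-hi-{level}"]
    inside = (cv["y"] >= lo) & (cv["y"] <= hi)
    return {
        "model": model,
        "rmse_mean": float(rmse_per_window.mean()),
        "rmse_std": float(rmse_per_window.std()),  # sample std (ddof=1) across windows
        "coverage_pct": float(100 * inside.mean()),
    }


cv_demo = pd.DataFrame({
    "cutoff": [0, 0, 1, 1],
    "y": [10.0, 12.0, 11.0, 13.0],
    "SeasonalNaive": [11.0, 12.0, 11.0, 15.0],
    "SeasonalNaive-lo-95": [9.0, 11.0, 10.0, 14.0],
    "SeasonalNaive-hi-95": [13.0, 13.0, 12.0, 16.0],
})
print(summarize_model(cv_demo, "SeasonalNaive"))  # coverage_pct is 75.0: 3 of 4 actuals inside
```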

### 5) Evaluate on holdout split
```python
# Train on first 80%, test on last 20%
n_test = int(len(df_forecast) * 0.2)
train = df_forecast.iloc[:-n_test]
test = df_forecast.iloc[-n_test:]

# Get champion from leaderboard
champion_model_name = leaderboard.iloc[0]['model']

# Forecast and evaluate
forecast = fetcher.evaluate_forecast(
    train, test,
    model=champion_model_name,
    horizon=config.horizon,
    confidence_level=config.confidence_level
)

print(f"Holdout RMSE: {forecast['rmse']:.2f}")
print(f"Holdout Coverage: {forecast['coverage_pct']:.1f}%")
```
- **Expect**: Holdout metrics consistent with CV results (landing within roughly ±10% of the CV means is normal)
- **If it fails**: Holdout metrics much worse than CV → possible distribution shift or model overfitting to CV splits
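The positional `iloc` split above works for a single series. If `df_forecast` ever stacks several `unique_id` values, slicing by row position would mix series. An alternative is to split on a shared timestamp so every series is cut at the same moment. `time_holdout_split` below is a hypothetical helper that sketches this approach.

```python
import pandas as pd


def time_holdout_split(df: pd.DataFrame, test_fraction: float = 0.2):
    """Split on a shared timestamp so all series share the same train/test boundary."""
    unique_ds = pd.DatetimeIndex(df["ds"].drop_duplicates().sort_values())
    n_test = int(len(unique_ds) * test_fraction)
    if n_test < 1 or n_test >= len(unique_ds):
        raise ValueError("test_fraction must leave at least one train and one test timestamp")
    cutoff = unique_ds[-n_test - 1]  # last training timestamp
    train = df[df["ds"] <= cutoff]
    test = df[df["ds"] > cutoff]
    return train, test, cutoff


ds = pd.date_range("2023-01-01", periods=10, freq="h", tz="UTC")
df_two = pd.concat(
    [pd.DataFrame({"unique_id": uid, "ds": ds, "y": range(10)}) for uid in ["A", "B"]],
    ignore_index=True,
)
train, test, cutoff = time_holdout_split(df_two)
print(len(train), len(test), cutoff)  # 16 4 2023-01-01 07:00:00+00:00
```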

### 6) Register champion in MLflow
```python
model_uri = fetcher.register_best_model(
    leaderboard=leaderboard,
    config=config,
    df_train=df_forecast,
    run_name="experiment_v1"
)

print(f"Model registered: {model_uri}")
```
- **Expect**: MLflow run created with model artifact, metrics logged
- **If it fails**: MLflow server unavailable or config missing; check MLflow setup

## Metrics & success criteria

### Primary metric
- **RMSE** (Root Mean Squared Error): Lower is better. Penalizes large errors heavily. Use as main ranking criterion.

### Secondary metrics
- **MASE** (Mean Absolute Scaled Error): Normalized by seasonal naive; MASE < 1 = better than baseline
- **Coverage %**: Should be ≈ confidence_level (95%); ±5 percentage points is acceptable
- **MAE**: Mean absolute error in the original units of y; less sensitive to occasional large errors than RMSE, so it reads as a "typical" error size
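The secondary metrics fit in a few lines of NumPy. This sketch assumes the common definition of MASE: test MAE divided by the in-sample MAE of a seasonal naive forecast with period `season_length`. `src/chapter2/evaluation.py` may differ in details such as NaN handling or the default period.

```python
import numpy as np


def mae(y_true, y_pred) -> float:
    """Mean absolute error in the units of y."""
    return float(np.mean(np.abs(np.asarray(y_true, float) - np.asarray(y_pred, float))))


def mase(y_true, y_pred, y_train, season_length: int = 24) -> float:
    """MAE scaled by the in-sample seasonal naive MAE; < 1 means better than that baseline."""
    y_train = np.asarray(y_train, dtype=float)
    if len(y_train) <= season_length:
        raise ValueError("y_train must be longer than season_length")
    # Seasonal naive in-sample errors: y[t] - y[t - season_length]
    scale = np.mean(np.abs(y_train[season_length:] - y_train[:-season_length]))
    if scale == 0:
        raise ValueError("Seasonal naive scale is zero; MASE is undefined")
    return mae(y_true, y_pred) / float(scale)


y_train = [1.0, 2.0, 3.0, 4.0]
print(mae([10, 12, 14], [11, 12, 13]))                             # ≈ 0.667
print(mase([10, 12, 14], [11, 12, 13], y_train, season_length=1))  # ≈ 0.667
```

For hourly electricity data, `season_length=24` (daily cycle) or `168` (weekly cycle) are the natural choices. Which one the project uses is not stated here.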

### "Good enough" threshold
- RMSE < [domain-specific baseline] (depends on magnitude of y; compare to naive)
- MASE < 1 (beating seasonal naive)
- Coverage 90% to 98% (if using 95% confidence level)

### What would make me retrain / change strategy
- RMSE increases >20% vs previous experiment → model degradation; check for data drift
- Coverage < 85% or > 99% → intervals miscalibrated; recalibrate or use different method
- MASE > 1.5 → worse than naive; horizon may be too long or train window too short
- Multiple models have RMSE = NaN → insufficient training data; increase train window or reduce horizon

## Pitfalls (things that commonly break)

1. **MAPE with zero or near-zero values**:
   - MAPE = mean(|y_true - y_pred| / |y_true|) × 100 → explodes when y_true ≈ 0
   - Example: Solar generation at night = 0 → MAPE = ∞
   - **Fix**: Always use RMSE/MAE/MASE as primary metrics; report MAPE with caution or only on non-zero subset

2. **Temporal leakage (common mistake)**:
   - If train and test overlap or test comes before train, metrics are meaningless
   - Our code guards against leakage, but if you modify the split logic, a leak will not raise an error; it silently inflates the metrics
   - **Fix**: Always verify: `cutoff_date < test_start_date`

3. **Confidence level hard-coded**:
   - "95" is currently hard-coded in several places, so changing the confidence level in only one place may not propagate (for example, to interval column names such as `-lo-95`)
   - **Fix**: Pass confidence_level through all functions consistently

4. **Too many windows with sparse data**:
   - If n_windows is large and data is sparse, training windows may be nearly empty
   - Model fails to fit → returns NaN → leaderboard shows NaN ranks
   - **Fix**: Check that min_train_size ≥ 100 and each window has data; reduce n_windows if needed

5. **Horizon too large**:
   - If h=168 (1 week ahead), but your model only sees 30-day history, uncertainty is huge
   - Coverage will be poor; consider shortening horizon or lengthening training window
   - **Fix**: Plot y vs ds to understand seasonality; keep the horizon within one seasonal period (e.g., 24 hours for the daily cycle of hourly data)
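Pitfall 1 can be handled in code with a MAPE that scores only the non-zero subset and also reports how many points it skipped, so the exclusion is visible rather than silent. The function name and the `min_abs` threshold in this sketch are illustrative assumptions, not project settings.

```python
import numpy as np


def mape_nonzero(y_true, y_pred, min_abs: float = 1e-8):
    """MAPE (%) over points where |y_true| > min_abs, plus the number of points skipped."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    mask = np.abs(y_true) > min_abs
    if not mask.any():
        return float("nan"), int(len(y_true))
    ape = np.abs(y_true[mask] - y_pred[mask]) / np.abs(y_true[mask])
    return float(100 * ape.mean()), int((~mask).sum())


# Night-time zeros (e.g. solar) would make plain MAPE infinite
y_true = [0.0, 0.0, 100.0, 200.0]
y_pred = [5.0, 0.0, 110.0, 180.0]
score, skipped = mape_nonzero(y_true, y_pred)
print(score, skipped)  # MAPE ≈ 10% on the two non-zero points; 2 points skipped
```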

## Mini-checkpoint (prove you learned it)

Answer these:

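1. **Explain rolling-window vs expanding-window backtesting**. When would you use each?
2. **Why does MAPE fail when y=0**? What metric is more robust?
3. **What does "coverage" mean in the context of prediction intervals?**
4. **If leaderboard shows coverage=85% (below 95%), what does it mean and how would you fix it?**

**Answers:**
1. Both are rolling-origin schemes; they differ only in where the training window starts. Rolling-window: the training window has a fixed size and slides forward, so each fit sees only the most recent history. This is useful when older data may be stale, and it keeps training cost constant. Expanding-window: the training start stays fixed and the window grows, so later fits use all available history, which matches a production job that retrains on everything. If the rolling window clearly beats the expanding one, older data is probably hurting, which is a hint of drift.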
2. MAPE = |error| / |y|; when y≈0, ratio explodes. Use RMSE (absolute scale) or MASE (normalized by baseline) instead.
3. Coverage is the % of test actuals that fall within [lower, upper] prediction interval. Should ≈ confidence level (95% for 95% interval).
4. Coverage=85% means the interval is too tight (underestimating uncertainty). Model may be overconfident. Expand intervals or recalibrate using conformal prediction.
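Answer 4 mentions conformal prediction. The simplest variant is split conformal with absolute residuals. It is shown here as a general technique; the project's code is not known to implement it. The interval half-width is an order statistic of residuals on a held-out calibration window. Its coverage guarantee assumes exchangeable residuals, which time-series errors satisfy only approximately. The function name is hypothetical.

```python
import math

import numpy as np


def split_conformal_interval(cal_y, cal_pred, new_pred, level: float = 95.0):
    """Symmetric interval new_pred ± q, where q is the ceil((n+1)*level/100)-th smallest
    absolute calibration residual (finite-sample split conformal)."""
    scores = np.sort(np.abs(np.asarray(cal_y, float) - np.asarray(cal_pred, float)))
    n = len(scores)
    k = math.ceil((n + 1) * level / 100)
    # With too few calibration points, the guaranteed interval is unbounded
    q = np.inf if k > n else scores[k - 1]
    new_pred = np.asarray(new_pred, float)
    return new_pred - q, new_pred + q


lo, hi = split_conformal_interval([10, 12, 14, 16], [9, 14, 11, 12], [20.0], level=50)
print(lo, hi)  # [17.] [23.]
```

In practice the calibration window should be the most recent out-of-sample period, for example the last CV window, and it needs far more than four points for a 95% interval.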

## Exercises (optional, but recommended)

### Easy
1. Run cross-validation with horizon=12 (instead of 24) and compare RMSE vs horizon=24. Explain the difference.
2. Using `cv_results`, compute one model's RMSE for each of the 5 CV windows (one per forecast origin, the `cutoff` column in StatsForecast output) and plot it. Is it stable or does it spike?

### Medium
1. Run cross-validation with step_size=24 (daily) instead of 168 (weekly). How many windows are created? How do metrics change?
2. Identify the bottom-ranked model in leaderboard. Calculate MASE manually for one test window and verify it's > 1.

### Hard
1. Implement a custom metric (e.g., "% of forecasts within 10% of actual") and add it to leaderboard.
2. Run cross-validation for 3 different series (e.g., different respondents or fueltypes) and compare their leaderboards. Which series is easier/harder to forecast? Why?
3. Modify the confidence level to 80% (instead of 95%) and re-run CV. Compare coverage between 80% and 95% intervals. Verify they differ as expected.


%%writefile src/chapter2/feature_engineering.py
# file: src/chapter2/feature_engineering.py
"""
Chapter 2: Simple timetk-style feature helpers (pandas only).

These helpers are not used by the default StatsForecast pipeline. Use them
for EDA or when you add an ML-based forecasting track that explicitly consumes
X features (e.g., MLForecast or sklearn models).
"""

from __future__ import annotations

from typing import Iterable

import pandas as pd


def add_calendar_features(df: pd.DataFrame, ds_col: str = "ds") -> pd.DataFrame:
    """
    Add basic calendar features from a datetime column.
    """
    features = df.copy()
    if ds_col not in features.columns:
        raise ValueError(f"Missing datetime column: {ds_col}")

    ds = pd.to_datetime(features[ds_col], errors="raise")
    features["hour"] = ds.dt.hour
    features["dayofweek"] = ds.dt.dayofweek
    features["dayofyear"] = ds.dt.dayofyear
    features["month"] = ds.dt.month
    features["is_weekend"] = ds.dt.dayofweek >= 5
    return features


def add_lag_features(
    df: pd.DataFrame,
    y_col: str = "y",
    lags: Iterable[int] = (1, 24, 168),
) -> pd.DataFrame:
    """
    Add lag features using past values only.
    """
    features = df.copy()
    if y_col not in features.columns:
        raise ValueError(f"Missing value column: {y_col}")

    for lag in lags:
        features[f"{y_col}_lag_{lag}"] = features[y_col].shift(lag)
    return features


def add_rolling_features(
    df: pd.DataFrame,
    y_col: str = "y",
    windows: Iterable[int] = (24, 168),
) -> pd.DataFrame:
    """
    Add rolling-window mean features with a 1-step shift to avoid leakage.
    """
    features = df.copy()
    if y_col not in features.columns:
        raise ValueError(f"Missing value column: {y_col}")

    shifted = features[y_col].shift(1)
    for window in windows:
        features[f"{y_col}_roll_mean_{window}"] = shifted.rolling(window, min_periods=1).mean()
    return features


def build_timetk_features(
    df: pd.DataFrame,
    ds_col: str = "ds",
    y_col: str = "y",
    lags: Iterable[int] = (1, 24, 168),
    windows: Iterable[int] = (24, 168),
) -> pd.DataFrame:
    """
    Convenience wrapper: calendar + lag + rolling features.
    """
    features = add_calendar_features(df, ds_col=ds_col)
    features = add_lag_features(features, y_col=y_col, lags=lags)
    features = add_rolling_features(features, y_col=y_col, windows=windows)
    return features


%%writefile src/chapter1/eia_data_simple.py
# file: src/chapter1/eia_data_simple.py
"""
EIA Data Fetcher - Step by Step
=================================

A simplified module for pulling EIA electricity generation data
following the same step-by-step approach as the R script.

This module makes it easy to understand each step and test along the way.

Usage:
    from eia_data_simple import EIADataFetcher
    import os

    # Step 1: Initialize
    api_key = os.getenv("EIA_API_KEY")
    fetcher = EIADataFetcher(api_key)

    # Step 2: Pull raw data
    df_raw = fetcher.pull_data(
        start_date="2023-01-01",
        end_date="2024-12-31"
    )

    # Step 3: Inspect data
    print(f"Rows: {len(df_raw)}")
    print(df_raw.head())

    # Step 4: Prepare (convert types, sort, etc.)
    df_prepared = fetcher.prepare_data(df_raw)

    # Step 5: Validate
    is_valid = fetcher.validate_data(df_prepared)

    # Step 6: Get statistics
    stats = fetcher.get_stats(df_prepared)
    print(stats)
"""

import logging
import os
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import pytz
import requests
from dotenv import load_dotenv

# Optional: pydantic-settings for type-safe config
try:
    from pydantic_settings import BaseSettings
    PYDANTIC_AVAILABLE = True
except ImportError:
    PYDANTIC_AVAILABLE = False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Load environment variables from .env file
load_dotenv()

# Optional: MLflow for experiment tracking
try:
    import mlflow
    from mlflow.models import infer_signature
    MLFLOW_AVAILABLE = True
    logger.info("MLflow is available for experiment tracking")
except ImportError:
    MLFLOW_AVAILABLE = False
    logger.warning("MLflow not available - experiment tracking disabled")


# =============================================================================
# CONFIGURATION CLASSES
# =============================================================================

@dataclass
class ExperimentConfig:
    """
    Configuration for backtesting experiments.

    This defines the "contract" for reproducible experiments:
    - What data range to use
    - How to split for cross-validation
    - Which models and metrics to evaluate

    Example:
        >>> config = ExperimentConfig(
        ...     name="baseline_experiment",
        ...     horizon=24,
        ...     n_windows=5,
        ...     step_size=168,  # Weekly steps
        ... )
    """
    name: str = "default_experiment"
    horizon: int = 24              # Forecast horizon in hours
    n_windows: int = 5             # Number of CV windows
    step_size: int = 168           # Hours between windows (168 = 1 week)
    confidence_level: int = 95     # Prediction interval level
    models: List[str] = field(default_factory=lambda: [
        "SeasonalNaive", "AutoARIMA", "MSTL"
    ])
    metrics: List[str] = field(default_factory=lambda: [
        "rmse", "mape", "mase", "coverage"
    ])


# Pydantic-settings config (if available)
if PYDANTIC_AVAILABLE:
    class Settings(BaseSettings):
        """
        Type-safe configuration using pydantic-settings.

        Automatically reads from environment variables or .env file.
        Validates types and provides defaults.

        Example:
            >>> settings = Settings()
            >>> print(settings.eia_api_key[:8])
        """
        eia_api_key: str
        respondent: str = "US48"
        fueltype: str = "NG"
        start_date: str = "2024-01-01"
        end_date: str = "2024-12-31"

        class Config:
            env_file = ".env"
            env_file_encoding = "utf-8"


class EIADataFetcher:
    """
    Step-by-step EIA data fetcher matching R script workflow.

    Each method represents a step in the data pipeline:
    1. pull_data() - Fetch from API
    2. inspect_data() - View structure
    3. prepare_data() - Clean and convert
    4. validate_data() - Check quality
    5. get_stats() - Calculate summary stats
    """

    def __init__(self, api_key: str):
        """
        Initialize the fetcher with API credentials.

        Args:
            api_key: EIA API key from https://www.eia.gov/opendata/
        """
        self.api_key = api_key
        self.api_url = "https://api.eia.gov/v2/electricity/rto/fuel-type-data/data/"
        logger.info(f"Fetcher initialized (API key length: {len(api_key)})")
        print(f"Step 1: Fetcher initialized (API key length: {len(api_key)})")

    def pull_data(
        self,
        start_date: str = "2023-01-01",
        end_date: str = "2024-12-31",
        respondent: str = "US48",
        fueltype: str = "NG",
        length: int = 5000
    ) -> pd.DataFrame:
        """
        STEP 2: Pull raw data from EIA API with pagination.

        Features:
        - Handles >5000 row datasets with pagination loop
        - Stable sort order (period ascending) for reproducibility
        - Logs pagination details for transparency

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            respondent: Region code (default: US48 = Lower 48 states)
            fueltype: Fuel type code (default: NG = Natural Gas)
            length: Records per request (default: 5000, max allowed)

        Returns:
            DataFrame with raw API response

        Example:
            >>> df_raw = fetcher.pull_data()
            >>> print(f"Retrieved {len(df_raw)} rows")
        """
        print(f"\nStep 2: Pulling data from EIA API...")
        print(f"  Date range: {start_date} to {end_date}")
        print(f"  Respondent: {respondent}")
        print(f"  Fuel type: {fueltype}")

        all_records = []
        offset = 0
        request_count = 0

        try:
            while True:
                # Build API parameters with pagination and STABLE SORT
                # Using sort params ensures pages don't shuffle during pagination
                params = {
                    "api_key": self.api_key,
                    "data[]": "value",
                    "facets[respondent][]": respondent,
                    "facets[fueltype][]": fueltype,
                    "frequency": "hourly",
                    "start": f"{start_date}T00",
                    "end": f"{end_date}T23",
                    "length": length,
                    "offset": offset,
                    # STABLE SORT: Request data in ascending order from the API
                    # This ensures consistent ordering across paginated requests
                    "sort[0][column]": "period",
                    "sort[0][direction]": "asc",
                }

                # Make API request
                logger.info(f"API request: offset={offset}, length={length}")
                # Timeout prevents the loop from hanging forever on a stalled connection
                response = requests.get(self.api_url, params=params, timeout=60)
                response.raise_for_status()

                # Parse response
                data = response.json()
                records = data["response"]["data"]
                request_count += 1

                logger.debug(f"Request {request_count}: received {len(records)} rows")

                if not records:
                    break  # No more data

                all_records.extend(records)
                offset += length

            if not all_records:
                raise ValueError("No data returned from API")

            # Convert to DataFrame
            # Data is already sorted ascending by period from the API (stable sort params)
            # We still verify sort order here as a safety check
            df = pd.DataFrame(all_records)
            df = df.sort_values("period", ascending=True).reset_index(drop=True)

            print(f"  Sending requests...")
            print(f"  [OK] Retrieved {len(df)} total rows across {request_count} request(s)")
            print(f"  Columns: {', '.join(df.columns.tolist())}")

            logger.info(f"Data pull complete: {len(df)} rows in {request_count} API requests")

            return df

        except Exception as e:
            logger.error(f"Data pull failed: {e}", exc_info=True)
            print(f"  [ERROR] {e}")
            raise

    def inspect_data(self, df: pd.DataFrame) -> None:
        """
        STEP 3: Inspect raw data structure.

        Displays:
        - Data shape
        - Column info
        - First few rows
        - Data types

        Example:
            >>> fetcher.inspect_data(df_raw)
        """
        print(f"\nStep 3: Inspecting data structure...")
        print(f"  Shape: {df.shape[0]} rows x {df.shape[1]} columns")
        print(f"\n  Column info:")
        for col in df.columns:
            dtype = df[col].dtype
            non_null = df[col].notna().sum()
            print(f"    - {col}: {dtype} ({non_null} non-null)")

        print(f"\n  First 3 rows:")
        for i, row in df.head(3).iterrows():
            print(f"    Row {i}: {dict(row)}")



    def prepare_data(self, df: pd.DataFrame, timezone_policy: str = "UTC") -> pd.DataFrame:
        """
        STEP 4: Prepare data (clean, convert, sort).

        Performs:
        - Parse datetime from 'period' field
        - Apply timezone policy (UTC normalization for consistency)
        - Convert 'value' to numeric
        - Sort by datetime
        - Standardize column names

        Args:
            df: Raw DataFrame from pull_data()
            timezone_policy: "UTC" (recommended) - normalize all times to UTC

        Returns:
            Cleaned and prepared DataFrame

        Example:
            >>> df_clean = fetcher.prepare_data(df_raw, timezone_policy="UTC")
        """
        print(f"\nStep 4: Preparing data...")

        df = df.copy()

        # Parse period to datetime (fail-loud on parse errors)
        print(f"  - Parsing period field...")
        try:
            df["period"] = pd.to_datetime(df["period"], errors="raise")
        except (ValueError, TypeError) as e:
            logger.error(f"DateTime parsing failed: {e}")
            logger.error(f"Sample period values: {df['period'].head(10).tolist()}")
            raise ValueError(f"Cannot parse period field as datetime: {e}") from e

        # Apply timezone policy (UTC normalization)
        print(f"  - Applying timezone policy: {timezone_policy}")
        if timezone_policy == "UTC":
            # Assume period is in UTC if no timezone info
            if df["period"].dt.tz is None:
                df["period"] = df["period"].dt.tz_localize("UTC")
            else:
                df["period"] = df["period"].dt.tz_convert("UTC")
            logger.info("Timezone policy: UTC normalization applied")

        # Extract date
        print(f"  - Extracting date...")
        df["date"] = df["period"].dt.date

        # Convert value to numeric (fail-loud on coercion)
        print(f"  - Converting value to numeric...")
        df["value_before_coercion"] = df["value"].copy()  # Keep original for audit
        try:
            df["value"] = pd.to_numeric(df["value"], errors="raise")
        except (ValueError, TypeError) as e:
            # Count non-numeric values and provide detailed error
            non_numeric_rows = df[pd.to_numeric(df["value"], errors="coerce").isna()]
            logger.error(f"Numeric conversion failed: {len(non_numeric_rows)} non-numeric rows")
            logger.error(f"Sample non-numeric values: {non_numeric_rows['value'].head(5).tolist()}")
            raise ValueError(
                f"Cannot convert value column to numeric: {len(non_numeric_rows)} unparseable rows. "
                f"This typically indicates upstream schema changes or data quality issues. "
                f"Sample values: {non_numeric_rows['value'].head(3).tolist()}"
            ) from e

        # Verify no coercions occurred
        if df["value"].isna().sum() > 0:
            coercion_count = df["value"].isna().sum()
            logger.error(f"Coercion produced {coercion_count} NaN values during numeric conversion")
            raise ValueError(
                f"Numeric conversion coerced {coercion_count} values to NaN. "
                f"Review original values: {df[df['value'].isna()]['value_before_coercion'].head(5).tolist()}"
            )

        # Sort by datetime
        print(f"  - Sorting by datetime...")
        df = df.sort_values("period").reset_index(drop=True)

        # Standardize column names
        df.columns = [col.lower().replace("-", "_") for col in df.columns]

        # Remove temporary audit column if present
        df = df.drop(columns=["value_before_coercion"], errors="ignore")

        # Select key columns
        key_cols = ["date", "period", "value", "respondent", "fueltype"]
        df = df[[col for col in key_cols if col in df.columns]]

        print(f"  [OK] Data prepared: {df.shape[0]} rows, {df.shape[1]} columns")
        logger.info(f"Data preparation complete: {df.shape[0]} rows")

        return df

    def validate_time_series_integrity(self, df: pd.DataFrame) -> Dict:
        """
        STEP 4B: Comprehensive time series integrity validation.

        Critical checks for production:
        - No duplicates on (unique_id, ds)
        - Regular hourly frequency
        - Missing hours detection
        - DST repeated hours (freq = 0)
        - Complete final hours for backtesting

        Args:
            df: Prepared DataFrame from prepare_for_forecasting()

        Returns:
            Dictionary with integrity report:
            - duplicate_count: Number of duplicate (unique_id, ds) pairs
            - missing_hours_total: Total count of missing hours across all gaps
            - missing_gaps_count: Number of gaps detected
            - longest_gap_hours: Duration of longest gap
            - dst_repeated_hours: Count of repeated hours (DST backward)
            - gaps_detail: List of gap locations with hour counts
            - status: "valid" or "invalid"

        Example:
            >>> df_forecast = fetcher.prepare_for_forecasting(df)
            >>> integrity = fetcher.validate_time_series_integrity(df_forecast)
            >>> print(integrity['status'])
        """
        print(f"\nStep 4B: Validating time series integrity...")

        report = {
            "duplicate_count": 0,
            "missing_hours_total": 0,
            "missing_gaps_count": 0,
            "longest_gap_hours": 0,
            "dst_repeated_hours": 0,
            "gaps_detail": [],
            "status": "valid"
        }

        df_sorted = df.sort_values(["unique_id", "ds"]).reset_index(drop=True)

        # Check 1: Duplicates on (unique_id, ds)
        dups = df_sorted.groupby(["unique_id", "ds"]).size()
        duplicate_rows = (dups > 1).sum()
        report["duplicate_count"] = int(duplicate_rows)

        if duplicate_rows > 0:
            print(f"  [FAIL] Found {duplicate_rows} duplicate (unique_id, ds) pairs")
            logger.error(f"Time series integrity: {duplicate_rows} duplicates")
            report["status"] = "invalid"
            return report
        else:
            print(f"  [OK] No duplicates on (unique_id, ds)")

        # Check 2-4: Frequency, gaps, DST for each series
        for uid in df_sorted["unique_id"].unique():
            sub = df_sorted[df_sorted["unique_id"] == uid].copy()
            sub = sub.sort_values("ds").reset_index(drop=True)

            # Calculate time differences
            time_diffs = sub["ds"].diff()
            expected_freq = pd.Timedelta(hours=1)

            # Missing hours: count actual missing hours per gap (not gap occurrences)
            # For a gap of 5 hours (e.g., 10:00 to 15:00), there are 4 missing hours
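            # Cross-check sketch (illustrative only, not used by this function): the same
            # total can be obtained by comparing against a complete hourly index, e.g.
            #   full = pd.date_range(sub["ds"].min(), sub["ds"].max(), freq="h")
            #   n_missing = len(full.difference(sub["ds"]))
            # With only 10:00 and 15:00 present, full has 6 entries and n_missing == 4
            # (11:00, 12:00, 13:00, 14:00), which matches gap_hours - 1.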
            missing_mask = time_diffs > expected_freq
            if missing_mask.any():
                gap_indices = sub.index[missing_mask].tolist()
                report["missing_gaps_count"] += len(gap_indices)

                for idx in gap_indices:
                    if idx > 0:
                        gap_duration = time_diffs[idx]
                        gap_hours = gap_duration.total_seconds() / 3600
                        # Missing hours = gap duration - 1 (e.g., 5 hour gap = 4 missing)
                        missing_hours_in_gap = int(gap_hours - 1)
                        report["missing_hours_total"] += missing_hours_in_gap

                        report["gaps_detail"].append({
                            "unique_id": uid,
                            "before_ds": sub.loc[idx-1, "ds"],
                            "after_ds": sub.loc[idx, "ds"],
                            "gap_hours": gap_hours,
                            "missing_hours": missing_hours_in_gap
                        })

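            # Repeated timestamps (diff == 0), historically caused by DST fall-back.
            # This count is always 0 here: a zero diff within one unique_id would be a
            # duplicate (unique_id, ds) pair, which Check 1 already rejects. Kept as a safeguard.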
            repeated_mask = time_diffs == pd.Timedelta(0)
            repeated_in_series = repeated_mask.sum()
            report["dst_repeated_hours"] += int(repeated_in_series)

            # Longest gap
            if len(time_diffs) > 0:
                max_gap = time_diffs.max()
                if pd.notna(max_gap):
                    gap_hours = max_gap.total_seconds() / 3600
                    report["longest_gap_hours"] = max(
                        report["longest_gap_hours"],
                        gap_hours
                    )

        # Report findings (duplicates were already rejected by the early return in Check 1)

        if report["missing_hours_total"] > 0:
            print(f"  [FAIL] {report['missing_hours_total']} missing hours detected ({report['missing_gaps_count']} gaps)")
            logger.error(
                f"Time series integrity FAILED: {report['missing_hours_total']} missing hours, "
                f"{report['missing_gaps_count']} gaps, longest gap {report['longest_gap_hours']:.1f} hours"
            )
            gap_summary = "\n".join([
                f"  {g['unique_id']}: {g['before_ds']} → {g['after_ds']} ({g['gap_hours']:.1f} hours, {g['missing_hours']} missing)"
                for g in report["gaps_detail"][:5]  # Show first 5 gaps
            ])
            print(gap_summary)
            logger.error(f"First gaps:\n{gap_summary}")
            report["status"] = "invalid"
            return report
        else:
            print(f"  [OK] No missing hours (complete frequency)")

        if report["dst_repeated_hours"] > 0:
            print(f"  [INFO] {report['dst_repeated_hours']} DST repeated hours (clocks back)")
            logger.info(f"Time series has {report['dst_repeated_hours']} DST repeated hours, which is expected")

        print(f"  [OK] Time series integrity validated")
        logger.info(f"Time series integrity report: {report}")

        return report

    def validate_data(self, df: pd.DataFrame) -> bool:
        """
        STEP 5: Validate data quality.

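        Checks:
        - DataFrame is not empty (failure)
        - 'value' column exists and is numeric (failure)
        - No missing values in 'value' (warning only)
        - 'period' is in chronological order (failure)
        - Values fall in a plausible range, 0 < value < 1,000,000 MWh (warning only)

        Returns:
            bool: True if no hard check failed (warnings still count as passed);
            False as soon as a hard check fails.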

        Example:
            >>> is_valid = fetcher.validate_data(df_clean)
        """
        print(f"\nStep 5: Validating data...")

        checks_passed = 0
        checks_total = 5

        # Check 1: Not empty
        if len(df) > 0:
            print(f"  [OK] Data is not empty: {len(df)} rows")
            checks_passed += 1
        else:
            print(f"  [FAIL] Data is empty")
            return False

        # Check 2: Value column exists and is numeric
        if "value" in df.columns and pd.api.types.is_numeric_dtype(df["value"]):
            print(f"  [OK] Value column is numeric")
            checks_passed += 1
        else:
            print(f"  [FAIL] Value column missing or not numeric")
            return False

        # Check 3: No missing values in value column
        missing = df["value"].isna().sum()
        if missing == 0:
            print(f"  [OK] No missing values in 'value' column")
            checks_passed += 1
        else:
            print(f"  [WARN] {missing} missing values in 'value' column")
            checks_passed += 1  # Warning, not failure

        # Check 4: Dates are in order
        if df["period"].is_monotonic_increasing:
            print(f"  [OK] Dates are in chronological order")
            checks_passed += 1
        else:
            print(f"  [FAIL] Dates are not in order")
            return False

        # Check 5: Value range is reasonable (electricity in MWh)
        if df["value"].min() > 0 and df["value"].max() < 1_000_000:
            print(f"  [OK] Value range is reasonable")
            checks_passed += 1
        else:
            print(f"  [WARN] Value range seems unusual: {df['value'].min():.0f} to {df['value'].max():.0f}")
            checks_passed += 1  # Warning

        print(f"\n  Validation: {checks_passed}/{checks_total} checks passed")
        return True

    def get_stats(self, df: pd.DataFrame) -> Dict:
        """
        STEP 6: Calculate summary statistics.

        Returns:
            Dictionary with:
            - date_range: (start_date, end_date)
            - record_count: Total records
            - value_stats: min, max, mean, std
            - missing_count: Count of NaN values

        Example:
            >>> stats = fetcher.get_stats(df_clean)
            >>> print(f"Date range: {stats['date_range']}")
        """
        print(f"\nStep 6: Calculating statistics...")

        stats = {
            "date_range": (df["date"].min(), df["date"].max()),
            "record_count": len(df),
            "value_stats": {
                "min": df["value"].min(),
                "max": df["value"].max(),
                "mean": df["value"].mean(),
                "std": df["value"].std(),
            },
            "missing_count": df["value"].isna().sum(),
        }

        print(f"  Date range: {stats['date_range'][0]} to {stats['date_range'][1]}")
        print(f"  Record count: {stats['record_count']}")
        print(f"  Value range: {stats['value_stats']['min']:.0f} to {stats['value_stats']['max']:.0f} MWh")
        print(f"  Value mean: {stats['value_stats']['mean']:.0f} MWh")
        print(f"  Value std: {stats['value_stats']['std']:.0f} MWh")
        print(f"  Missing values: {stats['missing_count']}")

        return stats

    def prepare_for_forecasting(
        self, 
        df: pd.DataFrame,
        unique_id: str = "1"
    ) -> pd.DataFrame:
        """
        Prepare data for statsforecast/mlforecast by reformatting columns.

        Statsforecast requires: unique_id, ds (timestamp), y (values)

        Args:
            df: Cleaned DataFrame from prepare_data()
            unique_id: Series identifier (default: "1"; e.g., a fuel type like "NG")

        Returns:
            DataFrame with statsforecast format (unique_id, ds, y)

        Example:
            >>> df_clean = fetcher.prepare_data(df_raw)
            >>> df_forecast = fetcher.prepare_for_forecasting(df_clean)
        """
        # Validation: check required columns
        required_cols = ['period', 'value']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            logger.error(f"Missing required columns: {missing_cols}")
            raise ValueError(
                f"prepare_for_forecasting requires columns {required_cols}, "
                f"but found: {df.columns.tolist()}. "
                f"Ensure prepare_data() completed successfully."
            )

        # Validation: check for NaN values (should not exist after prepare_data)
        nan_count_period = df['period'].isna().sum()
        nan_count_value = df['value'].isna().sum()

        if nan_count_period > 0:
            logger.error(f"Found {nan_count_period} NaN values in period column")
            raise ValueError(
                f"Cannot prepare forecasting data with {nan_count_period} NaN values in period. "
                f"This indicates incomplete datetime parsing. "
                f"Run validate_data() and check prepare_data() error logs."
            )

        if nan_count_value > 0:
            logger.error(f"Found {nan_count_value} NaN values in value column")
            raise ValueError(
                f"Cannot prepare forecasting data with {nan_count_value} NaN values in value. "
                f"This indicates failed numeric conversion. "
                f"Run validate_data() and check prepare_data() error logs."
            )

        df_forecast = df.copy()
        df_forecast.columns = [col.lower() for col in df_forecast.columns]

        # Rename columns for statsforecast
        df_forecast['ds'] = df_forecast['period']
        df_forecast['y'] = df_forecast['value']
        df_forecast['unique_id'] = unique_id

        # CRITICAL: StatsForecast requires timezone-naive UTC datetimes
        # Convert from tz-aware UTC to timezone-naive
        df_forecast['ds'] = pd.to_datetime(df_forecast['ds'], errors='raise', utc=True).dt.tz_localize(None)
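        # Example of the conversion above (illustrative values):
        #   "2024-03-10T03:00:00-04:00" (tz-aware, EDT) -> 2024-03-10 07:00:00 (naive, UTC)
        #   "2024-03-10T07:00:00"       (naive)         -> 2024-03-10 07:00:00 (assumed UTC)
        # Working in UTC avoids the skipped/repeated local hours around DST transitions.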

        # Keep only required columns
        df_forecast = df_forecast[['unique_id', 'ds', 'y']].reset_index(drop=True)

        logger.info(f"Prepared {len(df_forecast)} records for forecasting with unique_id={unique_id}")
        logger.debug(f"ds dtype: {df_forecast['ds'].dtype}, tz={df_forecast['ds'].dt.tz if hasattr(df_forecast['ds'], 'dt') else 'N/A'}")

        return df_forecast

    def train_test_split(
        self,
        df: pd.DataFrame,
        test_hours: int = 72
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Split data into training and testing sets.

        Leaves last N hours for testing, rest for training.

        Args:
            df: DataFrame with 'ds' (datetime) column
            test_hours: Number of hours to reserve for testing (default: 72)

        Returns:
            (train_df, test_df)

        Example:
            >>> train_df, test_df = fetcher.train_test_split(df_forecast, test_hours=168)
            >>> print(f"Train: {len(train_df)}, Test: {len(test_df)}")
        """
        df = df.sort_values('ds').reset_index(drop=True)

        # Calculate split point
        split_point = df['ds'].max() - timedelta(hours=test_hours)
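        # Worked example (illustrative): if df['ds'].max() is 2024-01-10 23:00 and
        # test_hours=72, split_point is 2024-01-07 23:00. Train keeps ds <= split_point;
        # test gets 2024-01-08 00:00 .. 2024-01-10 23:00, i.e. 72 rows per series when
        # every series is complete and ends at the same hour. With several unique_ids,
        # split_point is derived from the global max, not a per-series max.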

        train_df = df[df['ds'] <= split_point].reset_index(drop=True)
        test_df = df[df['ds'] > split_point].reset_index(drop=True)

        print(f"\nTrain/Test Split:")
        print(f"  Training set: {len(train_df)} records")
        print(f"  Testing set: {len(test_df)} records")
        print(f"  Split point: {split_point}")

        return train_df, test_df

    def build_models(self) -> List:
        """
        Build a list of forecasting models for statsforecast.

        Models included:
        - AutoARIMA: Auto-tuned ARIMA model
        - SeasonalNaive: Baseline that repeats the value from the same hour one day earlier (season_length=24)
        - DynamicOptimizedTheta: Theta model with automatic optimization
        - HoltWinters: Exponential smoothing model
        - MSTL_ARIMA: Multi-seasonal trend with ARIMA trend forecaster
        - MSTL_HoltWinters: Multi-seasonal trend with HoltWinters trend forecaster

        Returns:
            List of statsforecast model objects

        Example:
            >>> from statsforecast import StatsForecast
            >>> models = fetcher.build_models()
            >>> sf = StatsForecast(models=models, freq='h')
        """
        from statsforecast.models import (MSTL, AutoARIMA,
                                          DynamicOptimizedTheta, HoltWinters,
                                          SeasonalNaive)

        models = [
            AutoARIMA(season_length=24),
            SeasonalNaive(season_length=24),
            DynamicOptimizedTheta(season_length=24),
            HoltWinters(season_length=24),
            MSTL(season_length=[24, 24 * 7], trend_forecaster=AutoARIMA(), alias="MSTL_ARIMA"),
            MSTL(season_length=[24, 24 * 7], trend_forecaster=HoltWinters(), alias="MSTL_HoltWinters"),
        ]
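        # Seasonal periods for hourly data: 24 = one day, 24 * 7 = 168 = one week.
        # MSTL removes both seasonal components and hands the remaining trend to
        # trend_forecaster, so MSTL_ARIMA and MSTL_HoltWinters differ only in how that
        # trend is extrapolated. MSTL therefore needs several weeks of history to
        # estimate the weekly component (register_best_model uses 30 days for its example).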

        return models

    def forecast(
        self,
        train_df: pd.DataFrame,
        horizon: int = 72,
        confidence_level: int = 95
    ) -> pd.DataFrame:
        """
        Create forecasts using StatsForecast.

        Args:
            train_df: Training DataFrame with unique_id, ds, y columns
            horizon: Number of steps to forecast (default: 72 hours)
            confidence_level: Prediction interval level (default: 95%)

        Returns:
            DataFrame with forecasts and prediction intervals

        Example:
            >>> forecast_df = fetcher.forecast(train_df, horizon=168)
            >>> print(forecast_df.head())
        """
        from statsforecast import StatsForecast
        from statsforecast.models import AutoARIMA

        models = self.build_models()

        # Initialize StatsForecast object
        sf = StatsForecast(
            models=models,
            freq='h',  # Hourly data (lowercase for pandas compatibility)
            fallback_model=AutoARIMA(),
            n_jobs=-1  # Use all available cores
        )

        print(f"\nTraining {len(models)} models...")
        print(f"  Models: {', '.join([getattr(m, 'alias', type(m).__name__) for m in models])}")
        print(f"  Horizon: {horizon} hours")
        print(f"  Confidence level: {confidence_level}%")

        # Generate forecast (note: h comes first in signature)
        forecast_df = sf.forecast(h=horizon, df=train_df, level=[confidence_level])
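        # forecast_df layout for level=[95] (sketch): unique_id, ds, then for each model
        # alias its point forecast plus "<alias>-lo-95" and "<alias>-hi-95", e.g.
        # AutoARIMA, AutoARIMA-lo-95, AutoARIMA-hi-95, ..., MSTL_HoltWinters-hi-95.
        # Rows: `horizon` per unique_id (72 for one series with the default horizon).
        # Depending on the statsforecast version, unique_id may be the index rather
        # than a column. evaluate_forecast() relies on the "-lo-{level}"/"-hi-{level}" names.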

        print(f"  [OK] Forecast generated: {len(forecast_df)} predictions")

        return forecast_df

    def evaluate_forecast(
        self,
        forecast_df: pd.DataFrame,
        test_df: pd.DataFrame,
        train_df: Optional[pd.DataFrame] = None,
        season_length: int = 24,
        confidence_level: int = 95
    ) -> pd.DataFrame:
        """
        Evaluate forecast performance against test data.

        Calculates: MAPE, RMSE, MASE, and prediction interval coverage

        Critical features:
        - Merge on (unique_id, ds) for multi-series correctness
        - All metrics computed on valid rows only (NaN-aware)
        - Coverage denominator = valid rows (not total rows)
        - MASE scales error relative to seasonal naive baseline

        Args:
            forecast_df: DataFrame from forecast() method
            test_df: Test partition from train_test_split()
            train_df: Training data (required for MASE calculation)
            season_length: Seasonal period for MASE (default: 24 hours)
            confidence_level: Prediction interval level (default: 95%)

        Returns:
            DataFrame with performance metrics for each model

        Example:
            >>> metrics = fetcher.evaluate_forecast(
            ...     forecast_df, test_df, train_df, confidence_level=95
            ... )
            >>> print(metrics.sort_values('rmse'))
        """
        from sklearn.metrics import (mean_absolute_percentage_error,
                                     mean_squared_error)

        # Merge forecast with test data on BOTH unique_id and ds (critical for multi-series)
        fc = forecast_df.merge(
            test_df,
            how="left",
            on=["unique_id", "ds"]  # ← BOTH keys for correctness
        )

        logger.info(f"Evaluation merge: {len(forecast_df)} forecast rows, {len(test_df)} test rows, {len(fc)} merged rows")

        # Helper functions for metrics (NaN-aware)
        def mape(y, yhat):
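            """Mean Absolute Percentage Error (ignoring NaNs), returned as a
            fraction (0.05 = 5%) because sklearn's mean_absolute_percentage_error
            does not multiply by 100.

            ⚠️ WARNING: MAPE is undefined when y ≈ 0 (e.g., solar at night).
            For series with zeros, prefer RMSE, MAE, or MASE instead.
            """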
            mask = y.notna() & yhat.notna()
            if mask.sum() == 0:
                return np.nan
            return mean_absolute_percentage_error(y[mask], yhat[mask])

        def rmse(y, yhat):
            """Root Mean Squared Error (ignoring NaNs)"""
            mask = y.notna() & yhat.notna()
            if mask.sum() == 0:
                return np.nan
            return np.sqrt(mean_squared_error(y[mask], yhat[mask]))

        def mase(y, yhat, y_train, season_length=24):
            """
            Mean Absolute Scaled Error.

            Scales the MAE by the MAE of a seasonal naive forecast on training data.
            MASE < 1 means the model beats seasonal naive.
            MASE > 1 means seasonal naive is better.

            Args:
                y: Actual test values
                yhat: Predicted values
                y_train: Training data for computing naive baseline
                season_length: Seasonal period (default: 24 for hourly data)
            """
            if y_train is None or len(y_train) < season_length + 1:
                return np.nan

            mask = y.notna() & yhat.notna()
            if mask.sum() == 0:
                return np.nan

            # MAE of the forecast
            mae_forecast = np.mean(np.abs(y[mask].values - yhat[mask].values))

            # MAE of seasonal naive on training data
            # Seasonal naive: y_t = y_{t - season_length}
            y_train_arr = y_train["y"].values
            naive_errors = np.abs(y_train_arr[season_length:] - y_train_arr[:-season_length])
            mae_naive = np.mean(naive_errors)
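            # Worked example (illustrative): y_train["y"] = [10, 12, 11, 13] with
            # season_length=2 gives naive errors |11 - 10|, |13 - 12| = [1, 1], so
            # mae_naive = 1.0; a forecast MAE of 0.5 then yields MASE = 0.5.
            # Note: y_train["y"] is treated as one array, so with several unique_ids the
            # lagged differences cross series boundaries; the baseline is exact only
            # for a single, ds-sorted series.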

            if mae_naive < 1e-10:  # Avoid division by zero
                return np.nan

            return mae_forecast / mae_naive

        def coverage(y, lower, upper):
            """Prediction interval coverage (ignoring NaNs, denominator = valid rows)"""
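            # Example: y = [5, 7, NaN], lower = [4, 8, 0], upper = [6, 9, 10]
            # -> valid rows = 2 (the NaN row is dropped), within = 1 (only 5 lies in [4, 6]),
            # coverage = 1 / 2 * 100 = 50.0. A well-calibrated 95% interval should land
            # near 95 over enough rows.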
            mask = y.notna() & lower.notna() & upper.notna()
            if mask.sum() == 0:
                return np.nan
            within = ((y[mask] >= lower[mask]) & (y[mask] <= upper[mask])).sum()
            return (within / mask.sum()) * 100  # denominator = valid rows only

        # Get model names (exclude metadata columns and interval bounds)
        model_cols = [col for col in forecast_df.columns
                     if col not in ['unique_id', 'ds'] and
                     not col.endswith(f'-lo-{confidence_level}') and
                     not col.endswith(f'-hi-{confidence_level}')]

        # Calculate metrics for each model
        rows = []
        for model in model_cols:
            y = fc["y"]
            yhat = fc[model]

            # Count valid rows for this model
            mask = y.notna() & yhat.notna()
            valid_count = mask.sum()

            lo_col = f"{model}-lo-{confidence_level}"
            hi_col = f"{model}-hi-{confidence_level}"
            if lo_col in fc.columns and hi_col in fc.columns:
                coverage_value = coverage(
                    y=y,
                    lower=fc[lo_col],
                    upper=fc[hi_col],
                )
            else:
                coverage_value = np.nan

            rows.append({
                "model": model,
                "mape": mape(y=y, yhat=yhat),
                "rmse": rmse(y=y, yhat=yhat),
                "mase": mase(y=y, yhat=yhat, y_train=train_df, season_length=season_length),
                "coverage": coverage_value,
                "valid_rows": valid_count,
            })

        fc_performance = pd.DataFrame(rows).sort_values('rmse')

        # Report merge quality
        valid_total = fc["y"].notna().sum()
        print(f"\nEvaluation Metrics (on {valid_total} valid rows out of {len(fc)} total):")
        print(f"{'Model':<20} {'MAPE':<8} {'RMSE':<8} {'MASE':<8} {'Coverage':<10} {'Valid':<8}")
        print("-" * 62)
        for _, row in fc_performance.iterrows():
            coverage_str = f"{row['coverage']:.1f}%" if pd.notna(row['coverage']) else "N/A"
            mase_str = f"{row['mase']:.3f}" if pd.notna(row['mase']) else "N/A"
            print(f"{row['model']:<20} {row['mape']:.4f}  {row['rmse']:<8.0f} {mase_str:<8} {coverage_str:<10} {int(row['valid_rows']):<8}")

        logger.info(f"Evaluation complete: {valid_total} valid rows, {len(fc_performance)} models evaluated")

        return fc_performance

    def cross_validate(
        self,
        df: pd.DataFrame,
        config: Optional[ExperimentConfig] = None,
        horizon: int = 24,
        n_windows: int = 5,
        step_size: int = 168,
        confidence_level: int = 95
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Run rolling origin cross-validation for robust model evaluation.

        Instead of a single train/test split, this creates multiple windows:
        - Window 1: train on data up to cutoff t1, test on (t1, t1 + horizon]
        - Window 2: train on data up to cutoff t2 = t1 + step_size, test on (t2, t2 + horizon]
        - ... and so on

        Averaging metrics over several cutoffs gives a more stable estimate of
        out-of-sample performance than a single split.

        Args:
            df: DataFrame with unique_id, ds, y columns
            config: ExperimentConfig (overrides other params if provided)
            horizon: Forecast horizon in hours (default: 24)
            n_windows: Number of CV windows (default: 5)
            step_size: Hours between windows (default: 168 = 1 week)
            confidence_level: Prediction interval level (default: 95)

        Returns:
            Tuple of (cv_results_df, leaderboard_df)
            - cv_results_df: Raw predictions for each cutoff
            - leaderboard_df: Aggregated metrics per model

        Example:
            >>> cv_results, leaderboard = fetcher.cross_validate(
            ...     df_forecast,
            ...     horizon=24,
            ...     n_windows=5,
            ...     step_size=168
            ... )
            >>> print(leaderboard)
        """
        from statsforecast import StatsForecast

        # Use config if provided
        if config is not None:
            horizon = config.horizon
            n_windows = config.n_windows
            step_size = config.step_size
            confidence_level = config.confidence_level

        print(f"\n{'='*60}")
        print(f"CROSS-VALIDATION")
        print(f"{'='*60}")
        print(f"  Horizon: {horizon} hours")
        print(f"  Windows: {n_windows}")
        print(f"  Step size: {step_size} hours")

        # Build models
        models = self.build_models()

        # Create StatsForecast object
        sf = StatsForecast(
            models=models,
            freq='h',
            n_jobs=-1,
        )

        # Run cross-validation
        # This is the key statsforecast method for backtesting
        print(f"\n  Running {n_windows} CV windows...")
        cv_df = sf.cross_validation(
            df=df,
            h=horizon,
            step_size=step_size,
            n_windows=n_windows,
            level=[confidence_level],
        )
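        # Cutoff layout (sketch): statsforecast places the windows so the last one ends
        # at the last timestamp T. The first cutoff is T - horizon - step_size * (n_windows - 1);
        # with the defaults (24, 168, 5) that is T - 696 h, and the last cutoff is T - 24 h.
        # cv_df then holds horizon * n_windows rows per unique_id (120 with the defaults),
        # with columns unique_id, ds, cutoff, y, plus each model's point and lo/hi columns.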

        print(f"  [OK] CV complete: {len(cv_df)} total predictions")
        print(f"  Cutoff dates: {cv_df['cutoff'].nunique()} unique")

        # Compute metrics per cutoff per model
        cv_metrics = []
        model_names = [type(m).__name__ if not hasattr(m, 'alias') else m.alias
                       for m in models]

        for cutoff in cv_df["cutoff"].unique():
            window_df = cv_df[cv_df["cutoff"] == cutoff]
            y_true = window_df["y"].values

            for model in model_names:
                if model not in window_df.columns:
                    continue

                y_pred = window_df[model].values
                mask = np.isfinite(y_true) & np.isfinite(y_pred)

                if mask.sum() == 0:
                    continue

                # RMSE
                rmse_val = np.sqrt(np.mean((y_true[mask] - y_pred[mask]) ** 2))

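                # MAPE in percent (evaluate_forecast reports a fraction instead);
                # rows with y_true == 0 are skipped to avoid division by zero
                nonzero = mask & (y_true != 0)
                if nonzero.any():
                    mape_val = 100 * np.mean(np.abs((y_true[nonzero] - y_pred[nonzero]) / y_true[nonzero]))
                else:
                    mape_val = np.nan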

                # Coverage
                lo_col = f"{model}-lo-{confidence_level}"
                hi_col = f"{model}-hi-{confidence_level}"
                coverage_val = np.nan
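                if lo_col in window_df.columns and hi_col in window_df.columns:
                    lo = window_df[lo_col].values
                    hi = window_df[hi_col].values
                    # Denominator = valid rows only, matching evaluate_forecast's coverage
                    valid = np.isfinite(y_true) & np.isfinite(lo) & np.isfinite(hi)
                    if valid.any():
                        within = ((y_true[valid] >= lo[valid]) & (y_true[valid] <= hi[valid])).sum()
                        coverage_val = 100 * within / valid.sum()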

                cv_metrics.append({
                    "cutoff": cutoff,
                    "model": model,
                    "rmse": rmse_val,
                    "mape": mape_val,
                    "coverage": coverage_val,
                })

        cv_metrics_df = pd.DataFrame(cv_metrics)

        # Create leaderboard by aggregating across windows
        leaderboard = cv_metrics_df.groupby("model").agg({
            "rmse": ["mean", "std"],
            "mape": ["mean", "std"],
            "coverage": "mean",
        }).round(2)

        # Flatten column names
        leaderboard.columns = ["_".join(col).strip() for col in leaderboard.columns.values]
        leaderboard = leaderboard.sort_values("rmse_mean").reset_index()

        print(f"\n{'='*60}")
        print(f"LEADERBOARD (aggregated across {n_windows} windows)")
        print(f"{'='*60}")
        print(f"{'Model':<20} {'RMSE':<12} {'MAPE':<12} {'Coverage':<10}")
        print("-" * 54)
        for _, row in leaderboard.iterrows():
            rmse_str = f"{row['rmse_mean']:.0f} ± {row['rmse_std']:.0f}"
            mape_str = f"{row['mape_mean']:.2f} ± {row['mape_std']:.2f}"
            cov_str = f"{row['coverage_mean']:.1f}%"
            print(f"{row['model']:<20} {rmse_str:<12} {mape_str:<12} {cov_str:<10}")

        return cv_df, leaderboard

    def register_best_model(
        self,
        leaderboard: pd.DataFrame,
        model_name: Optional[str] = None,
        experiment_name: str = "eia_forecasting",
        alias: str = "champion",
        train_df: Optional[pd.DataFrame] = None,
        default_horizon: int = 24,
        freq: str = "h",
    ) -> Optional[str]:
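        """
        Register the best model from cross-validation in the MLflow Model Registry.

        Logs a pyfunc MLflow Model at artifact_path="model" so that
        mlflow.register_model("runs:/<run_id>/model", ...) has a real model to register,
        then assigns `alias` to the new version (or a version tag if aliases are unsupported).

        Returns:
            "<registered_model_name>@<alias>" on success, or None if MLflow is not available.
        """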
        if not MLFLOW_AVAILABLE:
            print("  [SKIP] MLflow not available - cannot register model")
            return None

        if train_df is None:
            raise ValueError(
                "register_best_model requires train_df (unique_id/ds/y) so we can log a real MLflow model."
            )
        required = {"unique_id", "ds", "y"}
        if not required.issubset(train_df.columns):
            raise ValueError(f"train_df must have {sorted(required)}, got {train_df.columns.tolist()}")

        # Select best model
        if model_name is None:
            model_name = leaderboard.iloc[0]["model"]

        best_metrics = leaderboard[leaderboard["model"] == model_name].iloc[0]

        print(f"\n{'='*60}")
        print(f"MODEL REGISTRY")
        print(f"{'='*60}")
        print(f"  Best model: {model_name}")
        print(f"  RMSE: {best_metrics['rmse_mean']:.0f}")
        print(f"  Registering with alias: {alias}")

        import mlflow
        import mlflow.pyfunc
        from mlflow.models import infer_signature

        mlflow.set_experiment(experiment_name)

        # --- pyfunc wrapper ---
        class _StatsForecastPyFunc(mlflow.pyfunc.PythonModel):
            def __init__(self, chosen_model: str, confidence_level: int, freq: str, default_h: int):
                self.chosen_model = chosen_model
                self.confidence_level = int(confidence_level)
                self.freq = freq
                self.default_h = int(default_h)

            @staticmethod
            def _build_models():
                from statsforecast.models import (
                    MSTL, AutoARIMA, DynamicOptimizedTheta, HoltWinters, SeasonalNaive
                )
                return [
                    AutoARIMA(season_length=24),
                    SeasonalNaive(season_length=24),
                    DynamicOptimizedTheta(season_length=24),
                    HoltWinters(season_length=24),
                    MSTL(season_length=[24, 24 * 7], trend_forecaster=AutoARIMA(), alias="MSTL_ARIMA"),
                    MSTL(season_length=[24, 24 * 7], trend_forecaster=HoltWinters(), alias="MSTL_HoltWinters"),
                ]

            def predict(self, context, model_input: pd.DataFrame) -> pd.DataFrame:
                import pandas as pd
                from statsforecast import StatsForecast

                df = model_input.copy()

                # Allow passing a horizon in the input (single value repeated is fine)
                if "horizon" in df.columns:
                    h = int(df["horizon"].iloc[0])
                    df = df.drop(columns=["horizon"])
                else:
                    h = self.default_h

                # Expect unique_id/ds/y
                if not {"unique_id", "ds", "y"}.issubset(df.columns):
                    raise ValueError(
                        f"pyfunc input must contain unique_id/ds/y (+ optional horizon). Got: {df.columns.tolist()}"
                    )

                df["ds"] = pd.to_datetime(df["ds"], errors="raise")
                # Optional safety: strip tz if present
                if getattr(df["ds"].dt, "tz", None) is not None:
                    df["ds"] = df["ds"].dt.tz_convert("UTC").dt.tz_localize(None)

                # Pick the chosen model by alias or class name
                chosen = None
                for m in self._build_models():
                    name = getattr(m, "alias", type(m).__name__)
                    if name == self.chosen_model or type(m).__name__ == self.chosen_model:
                        chosen = m
                        break
                if chosen is None:
                    available = [getattr(m, "alias", type(m).__name__) for m in self._build_models()]
                    raise ValueError(f"Unknown chosen_model={self.chosen_model}. Available: {available}")

                sf = StatsForecast(models=[chosen], freq=self.freq, n_jobs=1)
                out = sf.forecast(df=df, h=h, level=[self.confidence_level]).reset_index()
                return out

        with mlflow.start_run(run_name=f"register_{model_name}"):
            # Log CV metrics and the selected model name
            mlflow.log_metric("rmse_mean", float(best_metrics["rmse_mean"]))
            mlflow.log_metric("rmse_std", float(best_metrics["rmse_std"]))
            mlflow.log_metric("mape_mean", float(best_metrics["mape_mean"]))
            mlflow.log_metric("coverage_mean", float(best_metrics["coverage_mean"]))
            mlflow.log_param("model_name", model_name)

            # Input example needs enough history for MSTL weekly season length
            # Use last 30 days (720 hours) to be safe
            example_in = train_df.sort_values("ds").tail(24 * 30)[["unique_id", "ds", "y"]].copy()
            example_in["horizon"] = int(default_horizon)
            # MLflow requires timezone-naive datetimes
            example_in["ds"] = pd.to_datetime(example_in["ds"], errors="raise", utc=True).dt.tz_localize(None)

            # Run once to infer output signature (can be a bit slow, but deterministic)
            pyfunc_model = _StatsForecastPyFunc(
                chosen_model=str(model_name),
                confidence_level=95,  # pipeline default level (coverage_mean is a measured metric, not a level)
                freq=freq,
                default_h=int(default_horizon),
            )
            example_out = pyfunc_model.predict(None, example_in)

            signature = infer_signature(example_in, example_out)

            # Log a real MLflow pyfunc model at artifact_path="model" so runs:/<run_id>/model can be registered
            mlflow.pyfunc.log_model(
                artifact_path="model",
                python_model=pyfunc_model,
                signature=signature,
                input_example=example_in,
                pip_requirements=[
                    f"mlflow=={mlflow.__version__}",
                    "pandas",
                    "numpy",
                    "statsforecast",
                ],
            )

            registered_model_name = f"{experiment_name}_{model_name}"
            run_id = mlflow.active_run().info.run_id
            model_uri = f"runs:/{run_id}/model"

            try:
                result = mlflow.register_model(model_uri, registered_model_name)
                version = result.version

                # Alias (works on newer MLflow); safe fallback if not supported
                client = mlflow.tracking.MlflowClient()
                try:
                    client.set_registered_model_alias(registered_model_name, alias, version)
                except Exception:
                    client.set_model_version_tag(registered_model_name, version, "alias", alias)

                print(f"  [OK] Registered: {registered_model_name} v{version}")
                print(f"  [OK] Alias '{alias}' assigned to v{version}")
                logger.info(f"Model registered: {registered_model_name} v{version} with alias {alias}")
                return f"{registered_model_name}@{alias}"

            except Exception as e:
                logger.warning(f"Model registration failed: {e}")
                print(f"  [WARN] Registration failed: {e}")
                return None


    def _create_plot(self, test_df: pd.DataFrame, forecast_df: pd.DataFrame):
        """
        Create an interactive plotly visualization of forecast vs actuals.

        Args:
            test_df: Test partition with actual values
            forecast_df: Forecast predictions

        Returns:
            Plotly Figure object
        """
        import plotly.graph_objects as go

        # Merge data
        merged = test_df.merge(forecast_df, on=['unique_id', 'ds'])
        merged = merged.sort_values('ds')

        # Create figure
        fig = go.Figure()

        # Add actual values
        fig.add_trace(go.Scatter(
            x=merged['ds'],
            y=merged['y'],
            mode='lines',
            name='Actual',
            line=dict(color='blue', width=2)
        ))

        # Add forecast from best model (MSTL_ARIMA)
        if 'MSTL_ARIMA' in merged.columns:
            fig.add_trace(go.Scatter(
                x=merged['ds'],
                y=merged['MSTL_ARIMA'],
                mode='lines',
                name='MSTL_ARIMA (Best)',
                line=dict(color='red', width=2, dash='dash')
            ))

            # Add the 95% prediction interval if available (StatsForecast's -lo-95/-hi-95 columns).
            # The band is a closed polygon: upper bound left to right, then lower bound right to left.
            if 'MSTL_ARIMA-hi-95' in merged.columns and 'MSTL_ARIMA-lo-95' in merged.columns:
                fig.add_trace(go.Scatter(
                    x=merged['ds'].tolist() + merged['ds'].tolist()[::-1],
                    y=merged['MSTL_ARIMA-hi-95'].tolist() + merged['MSTL_ARIMA-lo-95'].tolist()[::-1],
                    fill='toself',
                    fillcolor='rgba(255, 0, 0, 0.2)',
                    line=dict(color='rgba(255, 0, 0, 0)'),
                    name='95% Prediction Interval'
                ))

        # Update layout
        fig.update_layout(
            title='EIA Electricity Generation: Forecast vs Actual',
            xaxis_title='Date',
            yaxis_title='Generation (MWh)',
            hovermode='x unified',
            height=400,
            template='plotly_white'
        )

        return fig

    def run_experiment(
        self,
        df: pd.DataFrame,
        experiment_name: str,
        test_hours: int = 72,
        models_to_test: Optional[List[str]] = None,
        track_with_mlflow: bool = False
    ) -> Dict:
        """
        Run a complete forecasting experiment with model evaluation.

        Args:
            df: Cleaned DataFrame from full_pipeline()
            experiment_name: Name for this experiment
            test_hours: Hours to reserve for testing (default: 72)
            models_to_test: List of model names to test (None = all)
            track_with_mlflow: Whether to log to MLflow (default: False)

        Returns:
            Dictionary with experiment results including:
            - experiment_name: Name of experiment
            - timestamp: When experiment was run
            - data_shape: (rows, columns) of data used
            - train_size: Number of training records
            - test_size: Number of testing records
            - metrics: Performance metrics for each model
            - best_model: Name of best performing model
            - results: Full results DataFrame

        Example:
            >>> results = fetcher.run_experiment(df, "exp1_baseline")
            >>> print(results['best_model'])
        """
        logger.info(f"Starting experiment: {experiment_name} with test_hours={test_hours}")

        print("\n" + "="*60)
        print(f"EXPERIMENT: {experiment_name}")
        print("="*60)

        # Start MLflow run if enabled
        if track_with_mlflow and MLFLOW_AVAILABLE:
            mlflow.start_run(run_name=experiment_name)
            mlflow.log_param("experiment_name", experiment_name)
            mlflow.log_param("test_hours", test_hours)
            logger.info("MLflow run started for experiment")

        try:
            # Prepare data
            logger.info("Preparing data for experiment")
            print("\nPreparing data for experiment...")
            df_forecast = self.prepare_for_forecasting(df)
            train_df, test_df = self.train_test_split(df_forecast, test_hours=test_hours)

            # Log data info
            logger.info(f"Data shape: {df.shape}, Train: {len(train_df)}, Test: {len(test_df)}")
            print(f"  Data shape: {df.shape}")
            print(f"  Training records: {len(train_df)}")
            print(f"  Testing records: {len(test_df)}")

            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.log_param("data_rows", df.shape[0])
                mlflow.log_param("train_size", len(train_df))
                mlflow.log_param("test_size", len(test_df))

            # Train and forecast
            logger.info("Training models")
            print("\nTraining models...")
            forecast_df = self.forecast(train_df, horizon=len(test_df))

            # Evaluate (pass train_df for MASE calculation)
            logger.info("Evaluating model performance")
            print("\nEvaluating performance...")
            metrics_df = self.evaluate_forecast(forecast_df, test_df, train_df=train_df)

            # Log metrics to MLflow
            if track_with_mlflow and MLFLOW_AVAILABLE:
                for _, row in metrics_df.iterrows():
                    model_name = row['model']
                    mlflow.log_metrics({
                        f"{model_name}_mape": row['mape'],
                        f"{model_name}_rmse": row['rmse'],
                        f"{model_name}_coverage": row['coverage'],
                    })

            # Identify best model
            best_model = metrics_df.iloc[0]['model']
            best_rmse = metrics_df.iloc[0]['rmse']

            logger.info(f"Experiment {experiment_name} complete - Best model: {best_model}, RMSE: {best_rmse:.0f}")

            # Compile results
            experiment_results = {
                "experiment_name": experiment_name,
                "timestamp": datetime.now().isoformat(),
                "data_shape": df.shape,
                "train_size": len(train_df),
                "test_size": len(test_df),
                "metrics": metrics_df,
                "best_model": best_model,
                "best_rmse": best_rmse,
                "results": metrics_df
            }

            # Log summary
            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.log_metric("best_rmse", best_rmse)
                mlflow.log_param("best_model", best_model)

            print(f"\n[OK] Experiment complete!")
            print(f"  Best model: {best_model}")
            print(f"  Best RMSE: {best_rmse:.0f}")

            return experiment_results

        except Exception as e:
            logger.error(f"Experiment {experiment_name} failed: {str(e)}", exc_info=True)
            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.log_param("error", str(e))
            raise

        finally:
            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.end_run()
                logger.info(f"MLflow run ended for experiment {experiment_name}")

    def save_datasets(
        self,
        raw_df: pd.DataFrame,
        clean_df: pd.DataFrame,
        integrity_report: Dict,
        output_dir: str = "data",
        pull_params: Optional[Dict] = None
    ) -> str:
        """
        Save datasets with versioning and metadata for reproducibility.

        Creates:
        - raw.parquet: Unmodified API response
        - clean.parquet: After prepare + validate
        - metadata.json: Pull parameters, timestamps, row counts, integrity report

        This enables:
        - Reproducibility across experiments
        - Data lineage tracking
        - Debugging with raw vs clean comparison
        - Integrity validation history

        Args:
            raw_df: Raw DataFrame from pull_data()
            clean_df: Cleaned DataFrame from prepare_data()
            integrity_report: Report from validate_time_series_integrity()
            output_dir: Directory to save datasets (default: "data")
            pull_params: Dictionary of pull_data() parameters for metadata

        Returns:
            Path to saved metadata file

        Example:
            >>> metadata_path = fetcher.save_datasets(
            ...     raw_df, clean_df, integrity_report,
            ...     pull_params={"start_date": "2024-12-01", "respondent": "US48"}
            ... )
        """
        import json
        import os

        # Create output directory
        os.makedirs(output_dir, exist_ok=True)

        # Generate timestamp for this version
        timestamp = datetime.now(tz=pytz.UTC).isoformat()

        # Build metadata
        metadata = {
            "version": "1.0",
            "timestamp": timestamp,
            "pull_parameters": pull_params or {},
            "raw_row_count": len(raw_df),
            "clean_row_count": len(clean_df),
            "validation_status": "passed" if integrity_report.get("status") == "valid" else "failed",
            "integrity_report": {
                "duplicate_count": integrity_report.get("duplicate_count", 0),
                "missing_hours": integrity_report.get("missing_hours", 0),
                "longest_gap_hours": integrity_report.get("longest_gap_hours", 0),
                "dst_repeated_hours": integrity_report.get("dst_repeated_hours", 0),
            },
            "columns": {
                "raw": list(raw_df.columns),
                "clean": list(clean_df.columns),
            }
        }

        # Save raw data
        raw_path = os.path.join(output_dir, "raw.parquet")
        raw_df.to_parquet(raw_path, index=False)
        print(f"  [OK] Raw data saved: {raw_path}")
        logger.info(f"Raw data saved: {raw_path} ({len(raw_df)} rows)")

        # Save clean data
        clean_path = os.path.join(output_dir, "clean.parquet")
        clean_df.to_parquet(clean_path, index=False)
        print(f"  [OK] Clean data saved: {clean_path}")
        logger.info(f"Clean data saved: {clean_path} ({len(clean_df)} rows)")

        # Save metadata
        metadata_path = os.path.join(output_dir, "metadata.json")
        with open(metadata_path, "w") as f:
            json.dump(metadata, f, indent=2, default=str)
        print(f"  [OK] Metadata saved: {metadata_path}")
        logger.info(f"Metadata saved: {metadata_path}")

        return metadata_path

    def compare_experiments(self, experiments: List[Dict]) -> pd.DataFrame:
        """
        Compare results from multiple experiments.

        Args:
            experiments: List of experiment result dictionaries

        Returns:
            DataFrame comparing best models from each experiment

        Example:
            >>> exp1 = fetcher.run_experiment(df, "exp1")
            >>> exp2 = fetcher.run_experiment(df, "exp2")
            >>> comparison = fetcher.compare_experiments([exp1, exp2])
        """
        logger.info(f"Comparing {len(experiments)} experiments")

        comparison_rows = []

        for exp in experiments:
            best_row = exp['results'].iloc[0]
            comparison_rows.append({
                "experiment": exp["experiment_name"],
                "best_model": exp["best_model"],
                "mape": best_row['mape'],
                "rmse": best_row['rmse'],
                "coverage": best_row['coverage'],
                "timestamp": exp["timestamp"]
            })

        comparison_df = pd.DataFrame(comparison_rows).sort_values('rmse')

        # Log comparison results
        logger.info("Experiment comparison results:")
        for _, row in comparison_df.iterrows():
            logger.info(f"  {row['experiment']}: {row['best_model']} (RMSE: {row['rmse']:.0f}, MAPE: {row['mape']:.4f})")

        best_exp = comparison_df.iloc[0]
        logger.info(f"Best overall: {best_exp['experiment']} with {best_exp['best_model']} (RMSE: {best_exp['rmse']:.0f})")

        print("\n" + "="*60)
        print("EXPERIMENT COMPARISON")
        print("="*60)
        print(f"\n{'Experiment':<25} {'Best Model':<20} {'RMSE':<10} {'MAPE':<10}")
        print("-" * 65)
        for _, row in comparison_df.iterrows():
            print(f"{row['experiment']:<25} {row['best_model']:<20} {row['rmse']:<10.0f} {row['mape']:<10.4f}")

        return comparison_df

    def full_pipeline(
        self,
        start_date: str = "2023-01-01",
        end_date: str = "2024-12-31",
        respondent: str = "US48",
        fueltype: str = "NG",
        track_with_mlflow: bool = False
    ) -> Tuple[pd.DataFrame, Dict]:
        """
        Run the complete pipeline: pull -> prepare -> validate -> stats.

        Args:
            start_date: Start date for data pull (YYYY-MM-DD)
            end_date: End date for data pull (YYYY-MM-DD)
            respondent: Region code (default: US48)
            fueltype: Fuel type code (default: NG)
            track_with_mlflow: Whether to log to MLflow (default: False)

        Returns:
            (cleaned_dataframe, statistics_dict)

        Example:
            >>> df, stats = fetcher.full_pipeline()
        """
        logger.info(f"Starting full pipeline: {start_date} to {end_date}")

        print("\n" + "="*60)
        print("FULL PIPELINE: Pull -> Prepare -> Validate -> Stats")
        print("="*60)

        # Start MLflow run if enabled
        if track_with_mlflow and MLFLOW_AVAILABLE:
            mlflow.start_run(run_name=f"pipeline_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
            mlflow.log_param("start_date", start_date)
            mlflow.log_param("end_date", end_date)
            mlflow.log_param("respondent", respondent)
            mlflow.log_param("fueltype", fueltype)
            logger.info("MLflow run started")

        try:
            # Step 2: Pull
            logger.info("Pulling raw data from EIA API")
            df_raw = self.pull_data(start_date, end_date, respondent, fueltype)

            # Step 3: Inspect
            logger.info(f"Raw data shape: {df_raw.shape}")
            self.inspect_data(df_raw)

            # Step 4: Prepare
            logger.info("Preparing data")
            df_clean = self.prepare_data(df_raw)

            # Step 5: Validate
            logger.info("Validating data")
            is_valid = self.validate_data(df_clean)

            if not is_valid:
                logger.warning("Data validation failed")
                if track_with_mlflow and MLFLOW_AVAILABLE:
                    mlflow.log_param("validation_status", "failed")
            else:
                logger.info("Data validation passed")
                if track_with_mlflow and MLFLOW_AVAILABLE:
                    mlflow.log_param("validation_status", "passed")

            # Step 6: Stats
            logger.info("Computing statistics")
            stats = self.get_stats(df_clean)

            # Log stats to MLflow
            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.log_metric("record_count", stats['record_count'])
                mlflow.log_metric("value_min", stats['value_stats']['min'])
                mlflow.log_metric("value_max", stats['value_stats']['max'])
                mlflow.log_metric("value_mean", stats['value_stats']['mean'])
                mlflow.log_metric("missing_count", stats['missing_count'])
                logger.info("Statistics logged to MLflow")

            logger.info("Full pipeline completed successfully")

            print("\n" + "="*60)
            print("PIPELINE COMPLETE")
            print("="*60 + "\n")

            return df_clean, stats

        except Exception as e:
            logger.error(f"Pipeline failed with error: {str(e)}", exc_info=True)
            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.log_param("error", str(e))
            raise

        finally:
            if track_with_mlflow and MLFLOW_AVAILABLE:
                mlflow.end_run()
                logger.info("MLflow run ended")


# Allow testing individual steps
if __name__ == "__main__":
    api_key = os.getenv("EIA_API_KEY")
    if not api_key:
        print("Error: EIA_API_KEY not found in environment variables or .env file")
        print("Please create a .env file with: EIA_API_KEY=your_api_key_here")
        raise SystemExit(1)

    # Initialize
    fetcher = EIADataFetcher(api_key)

    # Run full pipeline
    df, stats = fetcher.full_pipeline(
        start_date="2024-12-01",
        end_date="2024-12-31"
    )

    print("\n" + "="*60)
    print("RESULTS")
    print("="*60)
    print(f"\nDataFrame shape: {df.shape}")
    print("\nFirst 5 rows:")
    print(df.head())
    print("\nLast 5 rows:")
    print(df.tail())

    print("\n[OK] Data successfully loaded from .env file!")
    print("[OK] Ready for time series analysis and forecasting")

    # FORECASTING WORKFLOW
    print("\n" + "="*60)
    print("FORECASTING WORKFLOW")
    print("="*60)

    # Step 1: Prepare data for forecasting
    df_forecast = fetcher.prepare_for_forecasting(df)
    print("\nData reformatted for forecasting:")
    print(f"  Columns: {list(df_forecast.columns)}")
    print(f"  Shape: {df_forecast.shape}")

    # Step 2: Train/test split (72 hours test set)
    train_df, test_df = fetcher.train_test_split(df_forecast, test_hours=72)

    # Step 3: Train models and create forecast
    forecast_df = fetcher.forecast(train_df, horizon=len(test_df))

    # Step 4: Evaluate performance (pass train_df for MASE)
    metrics = fetcher.evaluate_forecast(forecast_df, test_df, train_df=train_df)

    # Step 5: Visualize results
    print("\n" + "="*60)
    print("VISUALIZATION")
    print("="*60)

    try:
        print("\nGenerating forecast visualization...")

        # Build a Plotly figure of forecast vs. actuals (see _create_plot)
        p = fetcher._create_plot(test_df, forecast_df)

        # Display the figure (inline in Jupyter; otherwise via Plotly's default renderer)
        p.show()

        print("  [OK] Forecast plot displayed")

    except Exception as e:
        print(f"  [INFO] Could not display the Plotly figure: {str(e)[:60]}...")
        print("  (This is optional - forecast metrics are available above)")

    print("\n[OK] Forecasting workflow complete!")
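The registration code above converts `ds` to UTC and then drops the timezone (`pd.to_datetime(..., utc=True).dt.tz_localize(None)`) because MLflow expects timezone-naive datetimes. The following sketch shows why the conversion to UTC has to come *before* the timezone is dropped. On the US DST fall-back day, two readings with the same local wall-clock time map to two distinct UTC instants. The timestamps are made-up examples.

```python
import pandas as pd

# Two readings at the same local wall-clock time (01:30) on the US fall-back day:
# the first is still on daylight time (-04:00), the second on standard time (-05:00).
local = pd.Series(["2024-11-03 01:30:00-04:00", "2024-11-03 01:30:00-05:00"])

# Parse as UTC-aware timestamps (mixed offsets are converted, not rejected) ...
utc = pd.to_datetime(local, utc=True)

# ... then drop the timezone so the values are naive but still expressed in UTC.
naive_utc = utc.dt.tz_localize(None)

print(naive_utc.tolist())
# [Timestamp('2024-11-03 05:30:00'), Timestamp('2024-11-03 06:30:00')]
```

If you dropped the offsets without converting first, both rows would become `01:30` and collide as duplicate `ds` values.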


# Chapter 3 — Orchestration & Pipeline DAG

## Outcomes (what I can do after this)

- [ ] I can run an end-to-end forecasting pipeline from the CLI
- [ ] I can explain how tasks decompose a workflow and why idempotency matters
- [ ] I can deploy the pipeline as an Airflow DAG (if Airflow is installed)
- [ ] I can visualize the pipeline dependency graph and explain task ordering
- [ ] I can modify task configurations without breaking downstream dependencies

## Concepts (plain English)

- **Task**: An atomic unit of work that can be re-run independently (pull data, validate, train, forecast)
- **DAG** (Directed Acyclic Graph): A graph of task dependencies whose edges have a direction and never loop back on themselves (X → Y means X must finish before Y starts)
- **Idempotency**: A task can be re-run multiple times with the same inputs and produce the same outputs (no side effects like duplicate records)
- **Atomic writes**: File writes that either fully complete or fail (no partial files left behind)
- **Linear pipeline**: Tasks execute sequentially in a fixed order (no branching or conditional execution)
- **CLI**: Command-line interface to trigger the pipeline (vs. scheduled in Airflow)
- **Run ID**: A unique identifier for one pipeline execution (timestamps, UUIDs, etc.) used to group artifacts
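The "atomic writes" idea can be sketched with the common write-to-temp-then-rename pattern. `atomic_write_json` is a hypothetical name and not necessarily what `src/chapter3/io_utils.py` exports. The key step is `os.replace`: on the same filesystem it swaps the finished temp file into place in one step. Readers therefore see either the old file or the complete new one, never a half-written file.

```python
import json
import os
import tempfile
from pathlib import Path


def atomic_write_json(obj: dict, path: Path) -> Path:
    """Write obj as JSON to path so the file is either complete or absent."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file in the target directory so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=f".{path.name}.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(obj, f, indent=2, default=str)
            f.flush()
            os.fsync(f.fileno())  # push bytes to disk before the rename
        os.replace(tmp_name, path)  # single-step swap into the final name
    except BaseException:
        # On any failure, remove the partial temp file and leave `path` untouched.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    return path
```

Usage: `atomic_write_json(metadata, Path("data") / "metadata.json")`. If the process dies mid-dump, only a hidden `.metadata.json.*.tmp` file can be left behind; the real `metadata.json` is never partially written.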

## Architecture (what we're building)

### Inputs
- **CLI arguments**:
  - `--start-date` YYYY-MM-DD
  - `--end-date` YYYY-MM-DD
  - `--horizon` integer (hours ahead to forecast)
  - `--overwrite` boolean (re-run all tasks if True)
  - `--output-dir` path (default: "artifacts/")

- **Environment**: `.env` file with `EIA_API_KEY`

### Outputs
- **data/raw.parquet**: Raw API response
- **data/clean.parquet**: Normalized data (UTC, valid schema)
- **data/metadata.json**: Ingestion snapshot (date range, row count, integrity report)
- **artifacts/cv_results.parquet**: Cross-validation forecast table
- **artifacts/leaderboard.parquet**: Model rankings
- **artifacts/predictions.parquet**: Final forecast (trained on all clean data)
- **mlflow/**: MLflow experiment and model artifact

### Pipeline Flow
```
ingest_eia()
    ↓ (produces data/raw.parquet)
prepare_clean()
    ↓ (produces data/clean.parquet)
validate_clean()
    ↓ (validates, produces report)
train_backtest_select()
    ↓ (produces cv_results, leaderboard)
register_champion()
    ↓ (registers model in MLflow)
forecast_publish()
    ↓ (produces predictions)
(optional) Chapter 4 integration
```
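The flow above is a linear chain, so only one valid execution order exists. A sketch using the standard library's `graphlib.TopologicalSorter` (Python 3.9+) makes the "no task starts before its predecessors" rule explicit. The dependency map is written by hand from the diagram, not read from `dag_builder.py`.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
PIPELINE = {
    "ingest_eia": set(),
    "prepare_clean": {"ingest_eia"},
    "validate_clean": {"prepare_clean"},
    "train_backtest_select": {"validate_clean"},
    "register_champion": {"train_backtest_select"},
    "forecast_publish": {"register_champion"},
}

order = list(TopologicalSorter(PIPELINE).static_order())
print(" -> ".join(order))
# ingest_eia -> prepare_clean -> validate_clean -> train_backtest_select -> register_champion -> forecast_publish

# A backward edge (ingest depends on forecast) creates a cycle, which graphlib rejects.
cyclic = {**PIPELINE, "ingest_eia": {"forecast_publish"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError:
    print("cycle detected")
```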

### Invariants (must always hold)
- Each task's output directory exists before writing
- Outputs are written atomically (all or nothing)
- If `--overwrite=False`, skip task if output already exists (idempotent)
- Task ordering is strict: no task runs until all predecessors finish
- All intermediate files include run_id for traceability
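The first three invariants can be combined into one small wrapper. This is a sketch, not the code in `src/chapter3/tasks.py`. `run_idempotent` and its arguments are hypothetical. The task is any function that writes its output to the path it is given.

```python
from pathlib import Path
from typing import Callable


def run_idempotent(output_path: Path, write_fn: Callable[[Path], None], overwrite: bool = False) -> bool:
    """Run write_fn unless output_path already exists. Returns True if the task ran."""
    output_path = Path(output_path)
    if output_path.exists() and not overwrite:
        return False  # cached: skip the work entirely
    output_path.parent.mkdir(parents=True, exist_ok=True)  # invariant: directory exists
    tmp_path = output_path.with_name(output_path.name + ".tmp")
    try:
        write_fn(tmp_path)              # the task writes to a temp file first ...
        tmp_path.replace(output_path)   # ... which is swapped into place in one step
    finally:
        if tmp_path.exists():
            tmp_path.unlink()  # only left behind if write_fn or replace failed
    return True
```

The first call writes, a second call returns `False` without touching the file, and `overwrite=True` forces a rewrite. This mirrors the CLI's `--overwrite` behaviour described in the Inputs section.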

### Failure modes
- API unavailable during ingest_eia() → task fails, pipeline stops, no partial files
- Validation fails (duplicates, missing hours) → pipeline stops before training (prevents bad model)
- Training runs out of memory → no model artifact written (MLflow stays clean)
- Forecast publish fails → no predictions written, but leaderboard is preserved (can retry)

## Files touched

- **`src/chapter3/tasks.py`** (433 lines)
  - `ingest_eia(config, run_id)` → raw.parquet
  - `prepare_clean(raw_path, config, run_id)` → clean.parquet, metadata.json
  - `validate_clean(clean_path, run_id)` → raises ValueError if not valid
  - `train_backtest_select(clean_path, config, run_id)` → cv_results.parquet, leaderboard.parquet
  - `register_champion(leaderboard, config, clean_path, run_id)` → registers in MLflow
  - `forecast_publish(clean_path, config, run_id)` → predictions.parquet

- **`src/chapter3/dag_builder.py`** (121 lines)
  - `build_daily_dag()`: Returns Airflow DAG (if airflow is installed)
  - `build_dag_dot()`: Returns DOT graph string for CLI visualization
  - DAG has schedule_interval = "0 6 * * *" (6 AM UTC daily)

- **`src/chapter3/cli.py`** (69 lines)
  - `run()`: Typer command to execute pipeline from CLI
  - Parses arguments, generates run_id, calls tasks in sequence

- **`src/chapter3/config.py`** (67 lines)
  - `PipelineConfig` dataclass: all configuration parameters

- **`src/chapter3/io_utils.py`** (33 lines)
  - Helpers for atomic parquet/JSON writes

## Step-by-step walkthrough

### 1) Verify setup and prerequisites
```bash
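cd c:/docker_projects/atsaf  # forward slashes: bash treats an unquoted "\" as an escape
python -m pytest tests/ -v  # Run tests to verify environment
# .env is not exported to your shell, so read it with python-dotenv instead of echo $EIA_API_KEY
python -c "from dotenv import load_dotenv; import os; load_dotenv(); print('EIA_API_KEY set:', bool(os.getenv('EIA_API_KEY')))"
```
- **Expect**: Tests pass; the key check prints `EIA_API_KEY set: True` (without echoing the key itself)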
- **If it fails**: Check .env file and install dependencies with `pip install -e .` or `uv sync`

### 2) View pipeline DAG (without running)
```bash
cd c:/docker_projects/atsaf
python -c "
from src.chapter3.dag_builder import build_dag_dot
dot_string = build_dag_dot()
print(dot_string)
"
# Copy output to https://dreampuf.github.io/GraphvizOnline/ to visualize
```
- **Expect**: DOT graph showing 6 tasks and dependencies: ingest → prepare → validate → train → register → forecast
- **If it fails**: Check that dag_builder.py is present and imports are correct
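Here is a standalone sketch of what a Graphviz DOT string for this chain looks like, for when you want to see one without importing the project. `linear_dag_dot` is a hypothetical helper. The real `build_dag_dot()` may name or style nodes differently.

```python
def linear_dag_dot(tasks: list[str], name: str = "atsaf_pipeline") -> str:
    """Return a DOT digraph with one edge between each consecutive pair of tasks."""
    lines = [f"digraph {name} {{", "  rankdir=LR;"]
    for upstream, downstream in zip(tasks, tasks[1:]):
        lines.append(f'  "{upstream}" -> "{downstream}";')
    lines.append("}")
    return "\n".join(lines)


print(linear_dag_dot([
    "ingest_eia", "prepare_clean", "validate_clean",
    "train_backtest_select", "register_champion", "forecast_publish",
]))
```

Six tasks produce five edges. Pasting the output into a Graphviz viewer draws the same left-to-right chain as the Pipeline Flow diagram.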

### 3) Run pipeline end-to-end (CLI)
```bash
cd c:/docker_projects/atsaf
python -m src.chapter3.cli run \
  --start-date 2023-06-01 \
  --end-date 2023-09-30 \
  --horizon 24 \
  --output-dir artifacts/
```
- **Expect**:
  - Logs: "Task: ingest_eia started", "Task: prepare_clean started", etc.
  - Files created: data/raw.parquet, data/clean.parquet, artifacts/cv_results.parquet, etc.
  - Run finishes in 2-10 minutes depending on data size and model training
- **If it fails**:
  - "API Error": Check EIA_API_KEY in .env
  - "Validation failed": Re-run Chapter 1 to diagnose integrity issues
  - "Out of memory": Reduce date range or horizon; reduce n_windows

### 4) Inspect task outputs
```python
import json
from pathlib import Path

import pandas as pd

# Forward slashes work on Windows too. In a normal string literal, backslashes are
# escapes ("\a" is a bell character, "\r" a carriage return), which corrupts the path.
ROOT = Path("c:/docker_projects/atsaf")

# Check raw data
df_raw = pd.read_parquet(ROOT / "data" / "raw.parquet")
print(f"Raw shape: {df_raw.shape}, Columns: {df_raw.columns.tolist()}")

# Check clean data
df_clean = pd.read_parquet(ROOT / "data" / "clean.parquet")
print(f"Clean shape: {df_clean.shape}, Columns: {df_clean.columns.tolist()}")

# Check leaderboard
leaderboard = pd.read_parquet(ROOT / "artifacts" / "leaderboard.parquet")
print(f"\nLeaderboard:\n{leaderboard.head()}")

# Check metadata
with open(ROOT / "data" / "metadata.json") as f:
    metadata = json.load(f)
print(f"\nMetadata: {json.dumps(metadata, indent=2, default=str)}")
```
- **Expect**:
  - raw: columns [time, value, respondent, fueltype]
  - clean: columns [unique_id, ds, y] with UTC timestamps
  - leaderboard: columns [model, rmse_mean, rmse_std, rank]
  - metadata: status='valid', row_count, date_range
- **If it fails**: Check pipeline logs to see which task failed
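You can check the clean table's `unique_id, ds, y` contract in code rather than by eye, using a sketch like the one below. It assumes `ds` holds UTC timestamps (tz-aware UTC, or naive values already expressed in UTC) at an hourly frequency. `check_clean_contract` is a hypothetical helper, and the demo frame is synthetic.

```python
import pandas as pd


def check_clean_contract(df: pd.DataFrame) -> list[str]:
    """Return contract violations for a unique_id/ds/y table (empty list = looks forecast-ready)."""
    missing = {"unique_id", "ds", "y"} - set(df.columns)
    if missing:
        return [f"missing columns: {sorted(missing)}"]
    if not pd.api.types.is_datetime64_any_dtype(df["ds"]):
        return ["ds is not a datetime column"]

    problems = []
    tz = df["ds"].dt.tz
    if tz is not None and str(tz) != "UTC":
        problems.append(f"ds timezone is {tz}, expected UTC")
    if df.duplicated(["unique_id", "ds"]).any():
        problems.append("duplicate (unique_id, ds) rows")
    for uid, group in df.groupby("unique_id"):
        steps = group["ds"].sort_values().diff().dropna()
        if (steps != pd.Timedelta(hours=1)).any():
            problems.append(f"{uid}: non-hourly steps (gaps or repeats)")
    if df["y"].isna().any():
        problems.append("y contains missing values")
    return problems


# Synthetic demo series: four consecutive UTC hours.
demo = pd.DataFrame({
    "unique_id": "US48_NG",  # hypothetical series id
    "ds": pd.date_range("2023-06-01", periods=4, freq=pd.Timedelta(hours=1), tz="UTC"),
    "y": [1.0, 2.0, 3.0, 4.0],
})
print(check_clean_contract(demo))                # []
print(check_clean_contract(demo.drop(index=2)))  # ['US48_NG: non-hourly steps (gaps or repeats)']
```

On the real output, `check_clean_contract(df_clean)` should return an empty list. Anything else points back to the prepare/validate tasks.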

### 5) Re-run with `--overwrite` flag
```bash
cd c:/docker_projects/atsaf
python -m src.chapter3.cli run \
  --start-date 2023-06-01 \
  --end-date 2023-09-30 \
  --horizon 24 \
  --overwrite  # Force re-run all tasks
```
- **Expect**: Same outputs as step 3 (deterministic)
- **If it fails**: Something is non-deterministic (e.g., random seed not set in model)

### 6) Re-run without `--overwrite` (test idempotency)
```bash
cd c:/docker_projects/atsaf
python -m src.chapter3.cli run \
  --start-date 2023-06-01 \
  --end-date 2023-09-30 \
  --horizon 24
  # No --overwrite flag
```
- **Expect**: Pipeline finishes immediately (skips all tasks because files exist)
- **If it fails**: Tasks are not checking for existing outputs; check `if output_path.exists(): return` in each task
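One way to check step 6 mechanically is to snapshot each output file's modification time before the second run and compare afterwards: if every task was skipped, no file should have been rewritten. This is a sketch. The file list mirrors the Outputs section, and `snapshot_mtimes` and `changed_outputs` are hypothetical helpers.

```python
import os
from pathlib import Path
from typing import Dict, List, Optional

# Mirrors the "Outputs" section; adjust if your run writes elsewhere.
EXPECTED_OUTPUTS = [
    "data/raw.parquet",
    "data/clean.parquet",
    "data/metadata.json",
    "artifacts/cv_results.parquet",
    "artifacts/leaderboard.parquet",
    "artifacts/predictions.parquet",
]


def snapshot_mtimes(root: Path, rel_paths: List[str] = EXPECTED_OUTPUTS) -> Dict[str, Optional[int]]:
    """Map each expected output to its mtime in nanoseconds (None if the file is missing)."""
    snap = {}
    for rel in rel_paths:
        p = Path(root) / rel
        snap[rel] = os.stat(p).st_mtime_ns if p.exists() else None
    return snap


def changed_outputs(before: Dict[str, Optional[int]], after: Dict[str, Optional[int]]) -> List[str]:
    """Outputs whose mtime changed (or that appeared or disappeared) between two snapshots."""
    return [rel for rel in before if before[rel] != after.get(rel)]


# Usage: take a snapshot, re-run the CLI without --overwrite, take another snapshot.
#   before = snapshot_mtimes(Path("."))
#   ... python -m src.chapter3.cli run --start-date 2023-06-01 --end-date 2023-09-30 --horizon 24
#   after = snapshot_mtimes(Path("."))
#   assert not changed_outputs(before, after), changed_outputs(before, after)
```

Filesystems with coarse timestamp resolution (e.g. FAT32, about 2 s) can hide a very fast rewrite. Treat an empty result as strong evidence of skipping, not proof.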

### 7) Deploy to Airflow (optional, if Airflow is installed)
```bash
# Set Airflow home
export AIRFLOW_HOME=~/airflow_atsaf

# Initialize DB
airflow db init  # on Airflow 2.7+, use "airflow db migrate" instead

# Create Airflow user
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@example.com

# Copy DAG to Airflow DAGs folder
cp src/chapter3/dag_builder.py ~/airflow_atsaf/dags/atsaf_daily_pipeline.py

# Start scheduler and webserver
airflow scheduler &
airflow webserver --port 8080 &

# Visit http://localhost:8080 to see DAG
```
- **Expect**: DAG appears in Airflow UI; can manually trigger runs
- **If it fails**: Check Airflow logs at ~/airflow_atsaf/logs/

## Metrics & success criteria

### Primary metric
- **Pipeline success rate**: 100% of runs complete without errors

### Secondary metrics
- **Task execution time**: Each task < 5 minutes (ingest < 1min, train < 3min)
- **Idempotency**: Re-running with same inputs produces same outputs
- **File integrity**: All output files valid (non-empty, correct schema)
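To measure the task-execution-time metric, you can wrap each task in a timer. The sketch below assumes tasks are plain Python functions. `timed_task`, the 5-minute budget constant, and the stub task are illustrative and not part of `src/chapter3`.

```python
import functools
import logging
import time

logger = logging.getLogger("atsaf.pipeline")
TASK_BUDGET_SECONDS = 5 * 60  # "each task < 5 minutes" from the secondary metrics


def timed_task(fn):
    """Log a task's wall-clock duration; warn if it exceeds the per-task budget."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            # Runs on success and on failure, so slow failing tasks are timed too.
            elapsed = time.perf_counter() - start
            wrapper.last_duration = elapsed
            level = logging.WARNING if elapsed > TASK_BUDGET_SECONDS else logging.INFO
            logger.log(level, "Task %s took %.2fs", fn.__name__, elapsed)

    wrapper.last_duration = None  # filled in after each call
    return wrapper


@timed_task
def validate_clean_stub(rows: int) -> bool:
    """Hypothetical stand-in for a real task, for the demo only."""
    return rows > 0


logging.basicConfig(level=logging.INFO)
print(validate_clean_stub(100), validate_clean_stub.last_duration is not None)  # True True
```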

### "Good enough" threshold
- Pipeline completes in < 10 minutes for 12-month dataset
- All 6 tasks execute in order with no skipped tasks (on first run)
- The `data/` and `artifacts/` folders contain all expected files (raw, clean, metadata, cv_results, leaderboard, predictions)

### What would make me redesign
- Task execution time > 30 minutes → parallelize or reduce data
- Output files corrupt (can't read as parquet) → implement better error handling
- Idempotency broken (re-run produces different results) → add random seed or determinism

## Pitfalls (things that commonly break)

1. **File path confusion (Windows vs Linux)**:
   - Windows uses `\`, Linux uses `/`
   - Our code uses `pathlib.Path` which handles both, but command-line args may have issues
   - **Fix**: Use forward slashes in CLI args (Windows accepts them too); in Python code, build paths with `pathlib.Path` or use raw strings (`r"path\to\file"`)

2. **Task ordering assumptions**:
   - If you refactor and skip validate_clean(), bad data gets trained
   - Our linear order is strict: ingest → prepare → validate → train → register → forecast
   - **Fix**: Never skip or reorder tasks without understanding downstream impact

3. **Output directory doesn't exist**:
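   - If `artifacts/` or `data/` doesn't exist, writes such as `to_parquet()` raise an `OSError` (often `FileNotFoundError`); if a task swallows that exception, the failure can look silent
   - **Fix**: Create directories inside each task before writing (e.g. `Path(out_dir).mkdir(parents=True, exist_ok=True)`, as `save_datasets()` does with `os.makedirs(..., exist_ok=True)`), or run `mkdir -p artifacts/ data/` first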

4. **API rate limiting**:
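   - The EIA API rate-limits requests (this guide assumes roughly ~50 requests/second; confirm against EIA's current documentation), and large date ranges need many paginated requests, which can be throttled or time out
   - **Fix**: Fetch in date-range chunks, add a delay between requests, and retry throttled or timed-out calls with exponential backoff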

5. **MLflow registration fails silently**:
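   - If the tracking URI or artifact store is misconfigured, registration can fail; the registration code above catches the exception, logs a warning, and returns `None`, so the pipeline keeps running without a registered model
   - **Fix**: Check `mlflow.get_tracking_uri()` before deploying (note that `mlflow.get_artifact_uri()` starts a new run if none is active, so call it inside a run), and treat a `None` return from registration as a failure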

6. **run_id collision**:
   - If two pipelines run simultaneously with same run_id, they may overwrite each other
   - **Fix**: Use UUID or timestamp + hostname in run_id
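Here is a sketch of the run_id fix that combines all three ingredients: a UTC timestamp, the hostname, and a random UUID suffix. `make_run_id` and its format are hypothetical. The pipeline's CLI may build run IDs differently.

```python
import socket
import uuid
from datetime import datetime, timezone


def make_run_id() -> str:
    """Build a run_id like 20230601T060000Z_myhost_1a2b3c4d (UTC time, host, random suffix)."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    # Short hostname, with "_" swapped out so "_" stays a clean field separator.
    host = socket.gethostname().split(".")[0].replace("_", "-") or "host"
    # 8 hex chars of a random UUID make collisions between concurrent runs very unlikely.
    return f"{ts}_{host}_{uuid.uuid4().hex[:8]}"


print(make_run_id())
```

The timestamp alone collides when two runs start in the same second, and the hostname alone collides on one machine. The random suffix covers both cases.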

## Mini-checkpoint (prove you learned it)

Answer these:

1. **Draw the DAG by hand**: 6 boxes for tasks, arrows for dependencies. Explain why no arrows go backward.
2. **What is idempotency and why does prepare_clean() check if output exists before writing?**
3. **If validate_clean() is skipped, what could go wrong?** Give a concrete example.
4. **If horizon=24 is changed to horizon=48 in CLI, which tasks re-run and which are cached?**

**Answers:**
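1. Boxes: [ingest] → [prepare] → [validate] → [train] → [register] → [forecast]. No arrow points backward because each task consumes the output of the one before it (you can't validate data that hasn't been ingested), and a backward arrow would create a cycle, which a DAG by definition forbids.
2. Idempotency means re-running with the same inputs yields the same outputs (no hidden state). prepare_clean() checks whether its output exists and, if so, skips the work (saves time). Skipping is only safe because the task is deterministic and free of side effects: the cached file is exactly what a re-run would produce.
3. Without validation, bad data (duplicates, missing hours) enters training. For example, StatsForecast models count seasonal periods in rows (24 rows for a daily cycle on hourly data), so one missing hour shifts every later observation out of phase with its true hour of day, and a duplicated DST hour shifts them the other way. The model fits a distorted seasonal pattern, forecasts degrade, and in production that bad model can be registered and deployed.
4. Conceptually, only train_backtest_select() and forecast_publish() depend on horizon; ingest, prepare, and validate produce the same files regardless of horizon. However, a skip check based only on file existence cannot see a parameter change: without `--overwrite`, the existing leaderboard and predictions (built for horizon=24) would be reused. For only the horizon-dependent tasks to re-run, their cache key must include horizon (e.g. a parameter hash in the output file name); otherwise pass `--overwrite`.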
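For horizon-dependent tasks to re-run on their own when `--horizon` changes, the skip check needs a cache key built from the parameters each task depends on, not just file existence. The sketch below assumes that approach. `cache_key` is a hypothetical helper, and the parameter subsets are illustrative. A real training key would also include values such as `n_windows` and `step_size`.

```python
import hashlib
import json


def cache_key(params: dict) -> str:
    """Stable short hash of the parameters a task depends on (key order does not matter)."""
    payload = json.dumps(params, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:12]


data_params = {"start_date": "2023-06-01", "end_date": "2023-09-30", "respondent": "US48", "fueltype": "NG"}

# Data tasks key only on the data parameters; training also keys on horizon.
clean_key = cache_key(data_params)
train_key_24 = cache_key({**data_params, "horizon": 24})
train_key_48 = cache_key({**data_params, "horizon": 48})

print(train_key_24 == train_key_48)  # False: a new horizon means a new leaderboard path
# e.g. write artifacts/leaderboard_<train_key>.parquet and skip only if *that* file exists
```

With keys like these, changing the horizon from 24 to 48 produces a new training key while `clean_key` stays the same. Ingest, prepare, and validate stay cached, and train and forecast run again.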

## Exercises (optional, but recommended)

### Easy
1. Run the pipeline with a 1-month date range and measure total execution time. Then run with 3-month range. Does time scale linearly with data size?
2. Manually delete `artifacts/leaderboard.parquet` and re-run without `--overwrite` (the default). Does the pipeline detect the missing file and re-train?

### Medium
1. Modify the CLI to accept a `--model-list` argument (e.g., `--model-list AutoARIMA,HoltWinters`) and pass it to train_backtest_select(). Test with different model combos.
2. Add a new task `save_metrics_csv()` that converts artifacts/leaderboard.parquet to CSV. Update the DAG to place it after forecast_publish().

### Hard
1. Implement `--check-only` flag that runs through ingest/prepare/validate but skips training and forecasting. Useful for data validation without model cost.
2. Modify the pipeline to run for 5 different respondents (e.g., `US48`, `CAL`, `TEX`) in parallel within one `run` call. The tasks currently hard-code `unique_id="NG_US48"` and write to fixed paths, so parameterize both, and track run_id per respondent to avoid collisions.
3. Deploy to Airflow and set up a task retry policy: if a task fails, retry 3 times with 5-minute delays. Test by injecting a temporary API error and verifying retry behavior.


In [13]:
%%writefile src/chapter3/config.py
# file: src/chapter3/config.py
"""
Chapter 3: Pipeline Configuration
"""

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path


@dataclass(frozen=True)
class PipelineConfig:
    # Data parameters
    start_date: str = "2024-01-01"
    end_date: str = "2024-12-31"
    respondent: str = "US48"
    fueltype: str = "NG"

    # IO
    data_dir: str = "data"
    artifacts_dir: str = "artifacts"
    overwrite: bool = False

    # Forecasting / backtest
    horizon: int = 24
    n_windows: int = 5
    step_size: int = 168
    confidence_level: int = 95

    # Scheduling (Airflow)
    schedule: str = "0 6 * * *"
    retries: int = 3
    retry_delay_minutes: int = 5

    # MLflow
    experiment_name: str = "eia_forecasting"
    model_alias: str = "champion"

    def run_id(self) -> str:
        # UTC timestamp with one-second resolution: call once per pipeline run
        # and pass the result to every task (see run_full_pipeline).
        return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

    def data_path(self) -> Path:
        return Path(self.data_dir)

    def artifacts_path(self) -> Path:
        return Path(self.artifacts_dir)

    def raw_path(self) -> Path:
        return self.data_path() / "raw.parquet"

    def clean_path(self) -> Path:
        return self.data_path() / "clean.parquet"

    def metadata_path(self) -> Path:
        return self.data_path() / "metadata.json"

    def leaderboard_path(self) -> Path:
        return self.artifacts_path() / "leaderboard.parquet"

    def cv_results_path(self) -> Path:
        return self.artifacts_path() / "cv_results.parquet"

    def predictions_path(self) -> Path:
        return self.artifacts_path() / "predictions.parquet"


Overwriting src/chapter3/config.py


In [14]:
%%writefile src/chapter3/io_utils.py
# file: src/chapter3/io_utils.py
from __future__ import annotations

import json
import os
from pathlib import Path
from typing import Any, Dict

import pandas as pd


def ensure_dir(path: Path) -> None:
    path.mkdir(parents=True, exist_ok=True)


def atomic_write_parquet(df: pd.DataFrame, path: Path) -> None:
    """
    Atomic parquet write: write to temp in same directory, then replace.
    """
    ensure_dir(path.parent)
    tmp = path.with_suffix(path.suffix + ".tmp")
    df.to_parquet(tmp, index=False)
    os.replace(tmp, path)


def atomic_write_json(payload: Dict[str, Any], path: Path) -> None:
    ensure_dir(path.parent)
    tmp = path.with_suffix(path.suffix + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2, default=str)
    os.replace(tmp, path)




Overwriting src/chapter3/io_utils.py
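`atomic_write_parquet()` and `atomic_write_json()` always use the same temp name (`<file>.tmp`), so two concurrent runs writing the same target can clobber each other's temp file. Below is a sketch of a variant that gives every write its own temp file; `atomic_write_json_unique` is a hypothetical name, not part of `io_utils.py`. The temp file is created in the destination directory because `os.replace()` is only atomic when source and destination are on the same filesystem.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any, Dict


def atomic_write_json_unique(payload: Dict[str, Any], path: Path) -> None:
    """Atomically write JSON via a uniquely named temp file in the same directory."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # mkstemp() creates a brand-new file, so concurrent writers never share a name.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, prefix=path.name + ".", suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, default=str)
            f.flush()
            os.fsync(f.fileno())  # push bytes to disk before the rename
        os.replace(tmp_name, path)  # readers see the old file or the new one, never half
    except BaseException:
        # Remove the temp file if anything failed before the replace.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
```

The same pattern works for parquet: pass `tmp_name` to `df.to_parquet()` after closing the descriptor returned by `mkstemp()`.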


In [None]:
%%writefile src/chapter3/tasks.py
# file: src/chapter3/tasks.py
"""
Chapter 3: Idempotent Pipeline Tasks

These tasks are designed to be:
- deterministic for a given config (historical pulls)
- atomic on write
- safe to rerun (overwrite flag controls)
"""

from __future__ import annotations

import logging
import os
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple

import pandas as pd

from src.chapter3.config import PipelineConfig
from src.chapter3.io_utils import atomic_write_json, atomic_write_parquet, ensure_dir

logger = logging.getLogger(__name__)


def _import_fetcher():
    """
    Supports either:
      - eia_data_simple.py at repo root
      - or src/chapter1/eia_data_simple.py (if you later move it)
    """
    try:
        from eia_data_simple import EIADataFetcher, ExperimentConfig  # type: ignore
        return EIADataFetcher, ExperimentConfig
    except ImportError:  # don't mask real errors raised inside eia_data_simple
        from ..chapter1.eia_data_simple import EIADataFetcher, ExperimentConfig  # type: ignore
        return EIADataFetcher, ExperimentConfig


def _require_api_key() -> str:
    api_key = os.getenv("EIA_API_KEY")
    if not api_key:
        raise EnvironmentError(
            "EIA_API_KEY is missing. Add it to your environment or .env file."
        )
    return api_key


def compute_time_series_integrity(df_forecast: pd.DataFrame) -> Dict:
    """
    Computes the same core invariants as your fetcher’s integrity method,
    but returns a dict reliably (your current method prints but doesn’t return).
    Requires columns: unique_id, ds, y
    """
    if not {"unique_id", "ds", "y"}.issubset(df_forecast.columns):
        raise ValueError(f"Expected unique_id/ds/y, got {df_forecast.columns.tolist()}")

    df = df_forecast.sort_values(["unique_id", "ds"]).reset_index(drop=True)

    # duplicates on (unique_id, ds)
    dup_counts = df.groupby(["unique_id", "ds"]).size()
    duplicate_pairs = int((dup_counts > 1).sum())

    missing_hours = 0
    longest_gap_hours = 0.0
    gaps_detail = []

    for uid in df["unique_id"].unique():
        sub = df[df["unique_id"] == uid].sort_values("ds").reset_index(drop=True)
        diffs = sub["ds"].diff()
        expected = pd.Timedelta(hours=1)

        # gaps > 1 hour; a gap of k hours means k - 1 hourly rows are missing
        miss_mask = diffs > expected
        missing_hours += int(round(((diffs[miss_mask] / expected) - 1).sum()))

        if len(diffs) > 0 and diffs.notna().any():
            max_gap = diffs.max()
            if pd.notna(max_gap):
                gap_h = max_gap.total_seconds() / 3600
                longest_gap_hours = max(longest_gap_hours, gap_h)

        if miss_mask.any():
            idxs = sub.index[miss_mask].tolist()
            for idx in idxs[:10]:
                gaps_detail.append(
                    {
                        "unique_id": uid,
                        "before_ds": sub.loc[idx - 1, "ds"],
                        "after_ds": sub.loc[idx, "ds"],
                        "gap_hours": float((sub.loc[idx, "ds"] - sub.loc[idx - 1, "ds"]).total_seconds() / 3600),
                    }
                )

    status = "valid" if (duplicate_pairs == 0 and missing_hours == 0) else "invalid"
    return {
        "status": status,
        "duplicate_pairs": duplicate_pairs,
        "missing_hours": int(missing_hours),
        "longest_gap_hours": float(longest_gap_hours),
        "gaps_detail": gaps_detail,
        "n_rows": int(len(df)),
    }


def ingest_eia(config: PipelineConfig, run_id: str = "") -> str:
    """
    Task 1: Pull raw data and save data/raw.parquet
    """
    raw_path = config.raw_path()
    ensure_dir(raw_path.parent)

    if raw_path.exists() and not config.overwrite:
        logger.info(f"[ingest] raw exists, skipping: {raw_path}")
        return str(raw_path)

    EIADataFetcher, _ = _import_fetcher()
    api_key = _require_api_key()
    fetcher = EIADataFetcher(api_key)

    df_raw = fetcher.pull_data(
        start_date=config.start_date,
        end_date=config.end_date,
        respondent=config.respondent,
        fueltype=config.fueltype,
    )

    atomic_write_parquet(df_raw, raw_path)
    logger.info(f"[ingest] wrote raw: {raw_path} ({len(df_raw)} rows)")

    # Optional: log to Chapter 4 monitoring
    if run_id:
        try:
            from src.chapter4.run_log import log_run
            from src.chapter4.config import MonitoringConfig
            log_run(
                MonitoringConfig().db_path,
                run_id,
                "success",
                "ingest",
                raw_rows=len(df_raw)
            )
        except Exception as e:
            logger.debug(f"[ingest] Chapter 4 logging failed: {e}")

    return str(raw_path)


def prepare_clean(raw_path: str, config: PipelineConfig, run_id: str = "") -> str:
    """
    Task 2: Prepare clean dataset and save data/clean.parquet + data/metadata.json
    """
    clean_path = config.clean_path()
    ensure_dir(clean_path.parent)

    if clean_path.exists() and not config.overwrite:
        logger.info(f"[prepare] clean exists, skipping: {clean_path}")
        return str(clean_path)

    EIADataFetcher, _ = _import_fetcher()
    api_key = os.getenv("EIA_API_KEY", "dummy")  # no API call in this step
    fetcher = EIADataFetcher(api_key)

    df_raw = pd.read_parquet(raw_path)
    df_clean = fetcher.prepare_data(df_raw, timezone_policy="UTC")

    # Keep metadata for Chapter 4 health checks
    metadata = {
        "pull_timestamp": datetime.now(timezone.utc).isoformat(),
        "respondent": config.respondent,
        "fueltype": config.fueltype,
        "start_date": config.start_date,
        "end_date": config.end_date,
        "raw_rows": int(len(df_raw)),
        "clean_rows": int(len(df_clean)),
    }

    atomic_write_parquet(df_clean, clean_path)
    atomic_write_json(metadata, config.metadata_path())

    logger.info(f"[prepare] wrote clean: {clean_path} ({len(df_clean)} rows)")

    # Optional: log to Chapter 4 monitoring
    if run_id:
        try:
            from src.chapter4.run_log import log_run
            from src.chapter4.config import MonitoringConfig
            log_run(
                MonitoringConfig().db_path,
                run_id,
                "success",
                "prepare",
                clean_rows=len(df_clean)
            )
        except Exception as e:
            logger.debug(f"[prepare] Chapter 4 logging failed: {e}")

    return str(clean_path)


def validate_clean(clean_path: str, run_id: str = "") -> Dict:
    """
    Task 3: Validate time series integrity (duplicates + missing hours).
    """
    EIADataFetcher, _ = _import_fetcher()
    api_key = os.getenv("EIA_API_KEY", "dummy")
    fetcher = EIADataFetcher(api_key)

    df_clean = pd.read_parquet(clean_path)
    df_forecast = fetcher.prepare_for_forecasting(df_clean, unique_id="NG_US48")

    report = compute_time_series_integrity(df_forecast)
    if report["status"] != "valid":
        raise ValueError(
            f"Time-series integrity failed: "
            f"{report['duplicate_pairs']} duplicate (unique_id, ds) pairs; "
            f"{report['missing_hours']} missing hours; "
            f"longest gap={report['longest_gap_hours']:.1f}h"
        )

    logger.info(f"[validate] OK: {report}")

    # Optional: log to Chapter 4 monitoring
    if run_id:
        try:
            from src.chapter4.run_log import log_run
            from src.chapter4.config import MonitoringConfig
            log_run(
                MonitoringConfig().db_path,
                run_id,
                "success",
                "validate"
            )
        except Exception as e:
            logger.debug(f"[validate] Chapter 4 logging failed: {e}")

    return report


def train_backtest_select(clean_path: str, config: PipelineConfig, run_id: str = "") -> pd.DataFrame:
    """
    Task 4: Cross-validate and write artifacts/leaderboard.parquet + artifacts/cv_results.parquet
    Returns leaderboard.
    """
    leaderboard_path = config.leaderboard_path()
    cv_path = config.cv_results_path()
    ensure_dir(leaderboard_path.parent)

    if leaderboard_path.exists() and not config.overwrite:
        logger.info(f"[train] leaderboard exists, skipping: {leaderboard_path}")
        return pd.read_parquet(leaderboard_path)

    EIADataFetcher, ExperimentConfig = _import_fetcher()
    api_key = os.getenv("EIA_API_KEY", "dummy")
    fetcher = EIADataFetcher(api_key)

    df_clean = pd.read_parquet(clean_path)
    df_forecast = fetcher.prepare_for_forecasting(df_clean, unique_id="NG_US48")

    exp = ExperimentConfig(
        name=config.experiment_name,
        horizon=config.horizon,
        n_windows=config.n_windows,
        step_size=config.step_size,
        confidence_level=config.confidence_level,
    )

    cv_results, leaderboard = fetcher.cross_validate(df_forecast, config=exp)

    atomic_write_parquet(cv_results, cv_path)
    atomic_write_parquet(leaderboard, leaderboard_path)

    logger.info(f"[train] wrote cv: {cv_path} ({len(cv_results)} rows)")
    logger.info(f"[train] wrote leaderboard: {leaderboard_path} ({len(leaderboard)} rows)")

    # Optional: log to Chapter 4 monitoring
    if run_id:
        try:
            from src.chapter4.run_log import log_run
            from src.chapter4.config import MonitoringConfig
            log_run(
                MonitoringConfig().db_path,
                run_id,
                "success",
                "train"
            )
        except Exception as e:
            logger.debug(f"[train] Chapter 4 logging failed: {e}")

    return leaderboard


def register_champion(leaderboard: pd.DataFrame, config: PipelineConfig, clean_path: str, run_id: str = "") -> Optional[str]:
    """
    Task 5: Register best model in MLflow (if available).
    """
    EIADataFetcher, _ = _import_fetcher()
    api_key = os.getenv("EIA_API_KEY", "dummy")
    fetcher = EIADataFetcher(api_key)

    df_clean = pd.read_parquet(clean_path)
    df_forecast = fetcher.prepare_for_forecasting(df_clean, unique_id="NG_US48")

    model_uri = fetcher.register_best_model(
        leaderboard=leaderboard,
        experiment_name=config.experiment_name,
        alias=config.model_alias,
        train_df=df_forecast,
        default_horizon=config.horizon,
        freq="h",
    )
    logger.info(f"[register] model_uri={model_uri}")

    # Optional: log to Chapter 4 monitoring
    if run_id:
        try:
            from src.chapter4.run_log import log_run
            from src.chapter4.config import MonitoringConfig
            log_run(
                MonitoringConfig().db_path,
                run_id,
                "success",
                "register"
            )
        except Exception as e:
            logger.debug(f"[register] Chapter 4 logging failed: {e}")

    return model_uri



def forecast_publish(clean_path: str, config: PipelineConfig, run_id: str = "") -> str:
    """
    Task 6: Fit on all clean data and publish artifacts/predictions.parquet
    """
    pred_path = config.predictions_path()
    ensure_dir(pred_path.parent)

    if pred_path.exists() and not config.overwrite:
        logger.info(f"[forecast] predictions exist, skipping: {pred_path}")
        return str(pred_path)

    EIADataFetcher, _ = _import_fetcher()
    api_key = os.getenv("EIA_API_KEY", "dummy")
    fetcher = EIADataFetcher(api_key)

    df_clean = pd.read_parquet(clean_path)
    df_train = fetcher.prepare_for_forecasting(df_clean, unique_id="NG_US48")

    forecast_df = fetcher.forecast(
        train_df=df_train,
        horizon=config.horizon,
        confidence_level=config.confidence_level,
    )

    atomic_write_parquet(forecast_df, pred_path)
    logger.info(f"[forecast] wrote predictions: {pred_path} ({len(forecast_df)} rows)")

    # Optional: persist to Chapter 4 monitoring store
    if run_id:
        try:
            from src.chapter4.forecast_store import persist_forecasts
            from src.chapter4.config import MonitoringConfig

            persist_forecasts(
                db_path=MonitoringConfig().db_path,
                run_id=run_id,
                forecast_df=forecast_df,
                confidence_level=config.confidence_level,
            )
            logger.info(f"[forecast] persisted to Chapter 4 store")
        except ImportError:
            logger.debug("[forecast] Chapter 4 not available, skipping persist")
        except Exception as e:
            logger.warning(f"[forecast] Chapter 4 persist failed: {e}")

    # Optional: log to Chapter 4 monitoring
    if run_id:
        try:
            from src.chapter4.run_log import log_run
            from src.chapter4.config import MonitoringConfig
            log_run(
                MonitoringConfig().db_path,
                run_id,
                "success",
                "forecast"
            )
        except Exception as e:
            logger.debug(f"[forecast] Chapter 4 logging failed: {e}")

    return str(pred_path)


def run_full_pipeline(config: PipelineConfig) -> Dict:
    """
    Runs tasks in order and returns a summary dict.
    """
    logger.info("=" * 60)
    logger.info("START PIPELINE")
    logger.info("=" * 60)

    # Compute run_id once and pass through all tasks (for Chapter 4 monitoring)
    run_id = config.run_id()
    logger.info(f"Pipeline run_id: {run_id}")

    raw = ingest_eia(config, run_id=run_id)
    clean = prepare_clean(raw, config, run_id=run_id)
    integrity = validate_clean(clean, run_id=run_id)
    leaderboard = train_backtest_select(clean, config, run_id=run_id)
    model_uri = register_champion(leaderboard, config, clean, run_id=run_id)
    predictions = forecast_publish(clean, config, run_id=run_id)

    best_model = leaderboard.iloc[0]["model"] if len(leaderboard) else None
    best_rmse = leaderboard.iloc[0]["rmse_mean"] if len(leaderboard) else None

    out = {
        "raw_path": raw,
        "clean_path": clean,
        "integrity": integrity,
        "leaderboard_path": str(config.leaderboard_path()),
        "cv_results_path": str(config.cv_results_path()),
        "predictions_path": predictions,
        "best_model": best_model,
        "best_rmse_mean": best_rmse,
        "model_uri": model_uri,
        "run_id": run_id,
    }

    logger.info("=" * 60)
    logger.info("PIPELINE COMPLETE")
    logger.info("=" * 60)
    return out


In [16]:
%%writefile src/chapter3/dag_builder.py
# file: src/chapter3/dag_builder.py
"""
Chapter 3: DAG Builder

- If Airflow is installed: returns a real Airflow DAG
- If not: provides a DOT graph string for notebook visualization
"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any, Dict, Optional

from src.chapter3.config import PipelineConfig

try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    AIRFLOW_AVAILABLE = True
except Exception:
    AIRFLOW_AVAILABLE = False
    DAG = None
    PythonOperator = None


DEFAULT_ARGS = {
    "owner": "data-team",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}


def build_daily_dag(
    dag_id: str = "eia_daily_pipeline",
    schedule: str = "0 6 * * *",
    start_date: Optional[datetime] = None,
    default_args: Optional[Dict[str, Any]] = None,
    config: Optional[PipelineConfig] = None,
) -> "DAG":
    if not AIRFLOW_AVAILABLE:
        raise ImportError("Airflow is not installed. Install apache-airflow to use build_daily_dag().")

    from src.chapter3.tasks import (
        ingest_eia,
        prepare_clean,
        validate_clean,
        train_backtest_select,
        forecast_publish,
        register_champion,
    )

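    if config is None:
        config = PipelineConfig()
    if start_date is None:
        # Fixed start date: Airflow discourages dynamic values such as "now minus one day".
        # Naive datetimes use Airflow's default timezone (UTC unless configured otherwise).
        start_date = datetime(2024, 1, 1)
    if default_args is None:
        # Take the retry policy from the config instead of duplicating it
        default_args = {
            **DEFAULT_ARGS,
            "retries": config.retries,
            "retry_delay": timedelta(minutes=config.retry_delay_minutes),
        }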

    with DAG(
        dag_id=dag_id,
        default_args=default_args,
        description="EIA Forecasting Pipeline",
        schedule=schedule,  # Airflow >= 2.4; older releases use schedule_interval=
        start_date=start_date,
        catchup=False,
        tags=["eia", "forecasting"],
    ) as dag:

        t_ingest = PythonOperator(
            task_id="ingest",
            python_callable=lambda: ingest_eia(config),
        )

        t_prepare = PythonOperator(
            task_id="prepare",
            python_callable=lambda: prepare_clean(str(config.raw_path()), config),
        )

        t_validate = PythonOperator(
            task_id="validate",
            python_callable=lambda: validate_clean(str(config.clean_path())),
        )

        def _train():
            train_backtest_select(str(config.clean_path()), config)
            # DataFrames are a poor fit for XCom; return the artifact path instead
            return str(config.leaderboard_path())

        t_train = PythonOperator(task_id="train", python_callable=_train)

        def _register():
            import pandas as pd
            lb = pd.read_parquet(config.leaderboard_path())
            return register_champion(lb, config, str(config.clean_path()))


        t_register = PythonOperator(task_id="register", python_callable=_register)

        t_forecast = PythonOperator(
            task_id="forecast",
            python_callable=lambda: forecast_publish(str(config.clean_path()), config),
        )

        t_ingest >> t_prepare >> t_validate >> t_train >> t_register >> t_forecast

    return dag


def build_dag_dot() -> str:
    """
    DOT graph fallback (works without Airflow) for notebook visualization.
    """
    return """digraph EIA_PIPELINE {
  rankdir=LR;
  node [shape=box, style="rounded,filled", fillcolor="#eef2ff"];

  ingest -> prepare -> validate -> train -> register -> forecast;
}"""


Overwriting src/chapter3/dag_builder.py
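Airflow builds the dependency graph from the `>>` chain in `build_daily_dag()` and rejects DAGs that contain cycles. You can check the same property without Airflow using the standard library's `graphlib` (Python 3.9+). This sketch mirrors the six tasks (the `DEPENDENCIES` mapping and `execution_order` helper are hypothetical) and shows that adding a backward arrow raises `CycleError`:

```python
from graphlib import CycleError, TopologicalSorter

# Upstream dependencies per task, mirroring the chain
# ingest >> prepare >> validate >> train >> register >> forecast
DEPENDENCIES = {
    "prepare": {"ingest"},
    "validate": {"prepare"},
    "train": {"validate"},
    "register": {"train"},
    "forecast": {"register"},
}


def execution_order(deps: dict[str, set[str]]) -> list[str]:
    """Return a valid run order; raises CycleError if any arrow points backward."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps: dict[str, set[str]]) -> bool:
    """True if the dependency mapping cannot be ordered."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False
```

For example, `has_cycle(dict(DEPENDENCIES, ingest={"forecast"}))` is `True`: making ingest depend on forecast closes a loop, so no task can run first.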


In [26]:
#%%writefile src/chapter3/cli.py
# file: src/chapter3/cli.py
from __future__ import annotations

import logging
import sys

import typer
from rich.console import Console
from rich.table import Table

from src.chapter3.config import PipelineConfig
from src.chapter3.tasks import run_full_pipeline

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
app = typer.Typer(add_completion=False)
console = Console()


def _strip_ipykernel_args(argv: list[str]) -> list[str]:
    """Jupyter/ipykernel injects `-f <connection_file>` into sys.argv."""
    out = [argv[0]]
    i = 1
    while i < len(argv):
        a = argv[i]
        if a in ("-f", "--f"):
            i += 2  # skip flag + value
            continue
        if a.startswith("--f="):
            i += 1
            continue
        out.append(a)
        i += 1
    return out


@app.command()
def run(
    start_date: str = "2024-01-01",
    end_date: str = "2024-12-31",
    horizon: int = 24,
    respondent: str = "US48",
    fueltype: str = "NG",
    overwrite: bool = False,
):
    cfg = PipelineConfig(
        start_date=start_date,
        end_date=end_date,
        horizon=horizon,
        respondent=respondent,
        fueltype=fueltype,
        overwrite=overwrite,
    )

    results = run_full_pipeline(cfg)

    table = Table(title="Pipeline Results")
    table.add_column("Key", style="cyan")
    table.add_column("Value", style="green")

    for k, v in results.items():
        table.add_row(str(k), str(v))

    console.print(table)


if __name__ == "__main__":
    sys.argv = _strip_ipykernel_args(sys.argv)
    app(standalone_mode=False)  # <-- prevents SystemExit in Jupyter



2026-01-10 15:38:37,513 | INFO | START PIPELINE
2026-01-10 15:38:37,514 | INFO | [ingest] raw exists, skipping: data\raw.parquet
2026-01-10 15:38:37,515 | INFO | [prepare] clean exists, skipping: data\clean.parquet
2026-01-10 15:38:37,516 | INFO | Fetcher initialized (API key length: 40)
2026-01-10 15:38:37,521 | INFO | Prepared 8761 records for forecasting with unique_id=NG_US48
2026-01-10 15:38:37,525 | INFO | [validate] OK: {'status': 'valid', 'duplicate_pairs': 0, 'missing_hours': 0, 'longest_gap_hours': 1.0, 'gaps_detail': [], 'n_rows': 8761}
2026-01-10 15:38:37,525 | INFO | [train] leaderboard exists, skipping: artifacts\leaderboard.parquet


Step 1: Fetcher initialized (API key length: 40)


2026-01-10 15:38:37,528 | INFO | Fetcher initialized (API key length: 40)
2026-01-10 15:38:37,532 | INFO | Prepared 8761 records for forecasting with unique_id=NG_US48


Step 1: Fetcher initialized (API key length: 40)

MODEL REGISTRY
  Best model: MSTL
  RMSE: 10311
  Registering with alias: champion



Hint: Inferred schema contains integer column(s). Integer columns in Python cannot represent missing values. If your input data contains missing values at inference time, it will be encoded as floats and will cause a schema enforcement error. The best way to avoid this problem is to infer the model schema based on a realistic data sample (training dataset) that includes missing values. Alternatively, you can declare integer columns as doubles (float64) whenever these columns may have missing values. See `Handling Integers With Missing Values <https://www.mlflow.org/docs/latest/models.html#handling-integers-with-missing-values>`_ for more details.

2026/01/10 15:38:38 INFO mlflow.pyfunc: Validating input example against model signature



Registered model 'eia_forecasting_MSTL' already exists. Creating a new version of this model...
Created version '4' of model 'eia_forecasting_MSTL'.
2026-01-10 15:38:38,666 | INFO | Model registered: eia_forecasting_MSTL v4 with alias champion
2026-01-10 15:38:38,674 | INFO | [register] model_uri=eia_forecasting_MSTL@champion
2026-01-10 15:38:38,675 | INFO | [forecast] predictions exist, skipping: artifacts\predictions.parquet
2026-01-10 15:38:38,676 | INFO | PIPELINE COMPLETE


  [OK] Registered: eia_forecasting_MSTL v4
  [OK] Alias 'champion' assigned to v4


# Chapter 4 — Monitoring, Drift Detection & Alerts

## Outcomes (what I can do after this)

- [ ] I can persist forecasts and actuals into a queryable database
- [ ] I can compute rolling accuracy metrics and detect model drift
- [ ] I can set alert thresholds based on backtest performance
- [ ] I can run health checks (freshness, completeness, forecast staleness)
- [ ] I can interpret drift reports and decide when to retrain

## Concepts (plain English)

- **Forecast persistence**: Store predictions in a queryable database (SQLite here) so they can be scored later
- **Actuals**: Real observed values, which arrive only after each forecasted hour has passed (for a 24-hour horizon, the last hour is observable about 24 hours after the forecast is made)
- **Scoring**: Comparing predictions vs actuals using metrics (RMSE, MAPE, and coverage, i.e. the share of actuals that fall inside the prediction interval)
- **Drift**: Model performance degrades over time (e.g., MAPE increases from 5% to 8%)
- **Restatement**: Re-scoring recent periods as new or revised actuals arrive (e.g., rescore the last 7 days every hour)
- **Health check**: Monitoring data freshness, completeness, and forecast staleness
- **Alert threshold**: Metric value that triggers an alert (e.g., MAPE > 10% is critical)
- **Rolling window**: Computing metrics over a sliding time window (e.g., last 7 days)
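To make scoring and rolling windows concrete, here is a sketch of rolling RMSE, MAPE, and interval coverage with pandas. It assumes forecasts and actuals are already joined into one frame with hypothetical columns `ds`, `y`, `yhat`, `lo`, `hi`; it illustrates the definitions and is not the implementation in `src/chapter4/scoring.py`.

```python
import numpy as np
import pandas as pd


def rolling_metrics(scored: pd.DataFrame, window: str = "7D") -> pd.DataFrame:
    """Trailing-window RMSE, MAPE (%), and interval coverage, one row per ds."""
    df = scored.sort_values("ds").set_index("ds")  # time-based windows need a sorted index
    err = df["y"] - df["yhat"]
    parts = pd.DataFrame(
        {
            "sq_err": err**2,
            "ape": err.abs() / df["y"].abs() * 100,  # infinite when y == 0
            "covered": ((df["y"] >= df["lo"]) & (df["y"] <= df["hi"])).astype(float),
        },
        index=df.index,
    )
    # Mean of each per-row quantity over the trailing window.
    roll = parts.rolling(window).mean()
    return pd.DataFrame(
        {
            "rmse": np.sqrt(roll["sq_err"]),  # root of the windowed mean squared error
            "mape": roll["ape"],
            "coverage": roll["covered"],
        }
    )
```

Averaging squared errors first and taking the root afterwards matters: averaging per-hour absolute errors instead would give MAE, not RMSE.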

## Architecture (what we're building)

### Inputs
- **Forecasts table** (from Chapter 3):
  - Columns: run_id, created_ts_utc, unique_id, ds, model, yhat, lo, hi
  - One row per (run_id, model, unique_id, ds), i.e. one row per model per hour per pipeline run

- **Actuals** (append-only stream):
  - Same schema as forecasts, but marked as observed
  - Arrives continuously (e.g., hourly updates from Chapter 1 data fetch)

- **Backtest metrics** (from Chapter 2):
  - Historical RMSE, MAPE, coverage from cross-validation
  - Used to compute alert thresholds (mean + k*std; only the upper bound matters, since higher error is worse)

### Outputs
- **Metrics database** (SQLite):
  - `pipeline_runs`: Execution log (when pipeline ran, status, rows processed)
  - `forecasts`: Stored predictions (queryable by model, date range, unique_id)
  - `forecast_scores`: Rolling metrics (RMSE/MAPE/coverage per horizon)
  - `alerts`: Alert events (triggered when threshold breached)

- **Health report**:
  - Data freshness: Hours since last ingest
  - Data completeness: Missing hours in recent window
  - Forecast freshness: Hours since predictions generated

### Invariants (must always hold)
- Forecast stored once, scored multiple times (as actuals arrive)
- Alerts are immutable (never deleted, only logged)
- Health checks don't require actuals (can run in real-time, before scoring)
- Primary key: (run_id, model, unique_id, ds) uniquely identifies a forecast, and none of its columns may be missing
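The "stored once" and primary-key invariants can be enforced by the database itself. The schema below is a hypothetical minimal version of the `forecasts` table (the real one is created by `init_monitoring_db()` and may differ), and `store_forecast_rows` is an illustrative helper. `INSERT OR IGNORE` turns a repeated persist into a no-op instead of an error or a duplicate row.

```python
import sqlite3

# Hypothetical minimal schema, for illustration only.
SCHEMA = """
CREATE TABLE IF NOT EXISTS forecasts (
    run_id         TEXT NOT NULL,
    created_ts_utc TEXT NOT NULL,
    unique_id      TEXT NOT NULL,
    ds             TEXT NOT NULL,
    model          TEXT NOT NULL,
    yhat           REAL,
    lo             REAL,
    hi             REAL,
    PRIMARY KEY (run_id, model, unique_id, ds)
)
"""


def store_forecast_rows(conn: sqlite3.Connection, rows: list[tuple]) -> int:
    """Insert rows, skipping any whose primary key exists; return rows inserted."""
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO forecasts VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows
    )
    conn.commit()
    # Ignored rows do not count as changes, so this is the number actually stored.
    return conn.total_changes - before
```

Running the same persist step twice therefore stores each forecast exactly once.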

### Failure modes
- Forecast not yet scored: yhat exists but no metrics (actuals haven't arrived yet)
- Actuals missing entirely: Can't score forecasts → alert STALE_FORECAST
- Schema change: New or renamed columns appear; scoring fails → alert PIPELINE_FAILURE
- Threshold too sensitive: Generates too many alerts → tune k (std multiplier)

## Files touched

- **`src/chapter4/db.py`** (279 lines)
  - `init_monitoring_db()`: Creates SQLite schema (pipeline_runs, forecasts, forecast_scores, alerts)
  - `MonitoringDB` class: Read/write interface to monitoring database

- **`src/chapter4/forecast_store.py`**
  - `persist_forecasts()`: Unpivots wide forecast table → long table → writes to forecasts table

- **`src/chapter4/scoring.py`**
  - `score_forecasts()`: Join forecasts with actuals, compute RMSE/MAPE/coverage per horizon

- **`src/chapter4/drift.py`**
  - `compute_drift_threshold_from_backtest()`: Threshold = champion's backtest mean metric + k*std
  - `rolling_accuracy()`: Query historical metrics for a model/series/horizon
  - `detect_drift()`: Compare latest metric to threshold, return drift/ok/no_data
  - `write_alert()`: Persist alert to database

- **`src/chapter4/health.py`**
  - `check_freshness()`: Hours since last ingest
  - `check_completeness()`: Missing hours in recent data
  - `check_forecast_freshness()`: Hours since predictions generated
  - `full_health_check()`: Combined report

- **`src/chapter4/alerts.py`** (279 lines)
  - `AlertSeverity` enum: INFO, WARNING, CRITICAL
  - `AlertType` enum: STALE_DATA, MISSING_DATA, STALE_FORECAST, DATA_DRIFT, MODEL_DRIFT, PIPELINE_FAILURE
  - `AlertConfig`: Thresholds for data freshness, completeness, forecast age, drift severity
  - `check_alerts()`: Evaluate health + drift against thresholds
  - `send_alert()`: Route to log/email/slack (currently log-only)

- **`src/chapter4/run_log.py`**
  - `log_run()`: Log pipeline execution (start time, status, row counts, duration)

- **`src/chapter4/config.py`** (21 lines)
  - Configuration for alert thresholds and monitoring

## Step-by-step walkthrough

### 1) Initialize monitoring database
```python
from src.chapter4.db import init_monitoring_db

db_path = "monitoring.db"
init_monitoring_db(db_path)

# Verify schema created (skip SQLite's internal tables such as sqlite_sequence)
import sqlite3
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute(
    "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
)
tables = cursor.fetchall()
conn.close()
print(f"Tables: {[t[0] for t in tables]}")
```
- **Expect**: 4 tables created: `[pipeline_runs, forecasts, forecast_scores, alerts]`
- **If it fails**: Check that the directory containing db_path is writable (the `sqlite3` module ships with Python's standard library, so no separate install is needed)

### 2) Persist forecasts from Chapter 3
```python
import sqlite3

import pandas as pd

from src.chapter4.forecast_store import persist_forecasts

# Load forecast from Chapter 3 (wide format)
forecasts_wide = pd.read_parquet("artifacts/predictions.parquet")
print(f"Wide shape: {forecasts_wide.shape}, Columns: {forecasts_wide.columns.tolist()}")

# Persist to monitoring DB (converts to long format).
# Keyword arguments mirror the call in forecast_publish() (src/chapter3/tasks.py).
persist_forecasts(
    db_path="monitoring.db",
    run_id="run_20240101_060000",
    forecast_df=forecasts_wide,
    confidence_level=95,
)

# Verify in DB
conn = sqlite3.connect("monitoring.db")
df_stored = pd.read_sql_query("SELECT * FROM forecasts LIMIT 5", conn)
conn.close()
print(f"\nStored forecasts:\n{df_stored}")
```
- **Expect**: 5 rows with columns [run_id, created_ts_utc, unique_id, ds, model, yhat, lo, hi]
- **If it fails**: forecasts_wide may have the wrong schema; check for StatsForecast-style columns: `unique_id`, `ds`, one column per model (e.g. `MSTL`), plus `<model>-lo-95` and `<model>-hi-95`
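If you need to inspect the wide-to-long step yourself, this sketch reshapes StatsForecast-style output, assuming columns `unique_id`, `ds`, one column per model, and `<model>-lo-<level>` / `<model>-hi-<level>` interval columns. `wide_to_long` is a hypothetical helper and not the code inside `persist_forecasts()`; if your StatsForecast version returns `unique_id` as the index, call `reset_index()` first.

```python
import pandas as pd


def wide_to_long(forecasts_wide: pd.DataFrame, level: int = 95) -> pd.DataFrame:
    """Reshape wide forecasts into one row per (model, unique_id, ds)."""
    id_cols = ["unique_id", "ds"]
    suffixes = (f"-lo-{level}", f"-hi-{level}")
    # Model columns are everything that is neither an ID nor an interval bound.
    models = [
        c for c in forecasts_wide.columns
        if c not in id_cols and not str(c).endswith(suffixes)
    ]
    frames = []
    for m in models:
        part = forecasts_wide[id_cols].copy()
        part["model"] = m
        part["yhat"] = forecasts_wide[m]
        # .get() returns None when a model has no interval columns.
        part["lo"] = forecasts_wide.get(f"{m}-lo-{level}")
        part["hi"] = forecasts_wide.get(f"{m}-hi-{level}")
        frames.append(part)
    return pd.concat(frames, ignore_index=True)
```

The output columns `unique_id, ds, model, yhat, lo, hi` match the long layout of the `forecasts` table described above, minus `run_id` and `created_ts_utc`.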

### 3) Check data freshness and completeness
```python
import pandas as pd
from src.chapter4.health import full_health_check

report = full_health_check(
    clean_data_path="data/clean.parquet",
    predictions_path="artifacts/predictions.parquet",
    now=pd.Timestamp.now(tz="UTC"),  # tz-aware UTC; Timestamp.utcnow() is deprecated in pandas 2.2+
)

print(f"Health Report:")
print(f"  Data freshness: {report['data_freshness_hours']:.1f} hours old")
print(f"  Data completeness: {report['missing_hours']:.0f} missing hours")
print(f"  Forecast freshness: {report['forecast_freshness_hours']:.1f} hours old")
```
- **Expect**:
  - data_freshness_hours ≈ 0 (just ingested)
  - missing_hours = 0 (complete data)
  - forecast_freshness_hours ≈ 0 (just generated)
- **If it fails**: Check that clean_data_path and predictions_path exist
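The source of `src/chapter4/health.py` is not shown here, so the following is only a sketch of what "freshness" and "missing hours" plausibly measure, assuming hourly data in `unique_id, ds, y` format. Both helper names are hypothetical and may not match how `full_health_check` actually computes its report.

```python
import pandas as pd

def data_freshness_hours(df: pd.DataFrame, now: pd.Timestamp) -> float:
    """Hours between `now` (tz-aware UTC) and the latest observed timestamp."""
    latest = pd.to_datetime(df["ds"], utc=True).max()
    return (now - latest).total_seconds() / 3600.0

def missing_hours(df: pd.DataFrame) -> int:
    """Hourly timestamps absent between each series' first and last observation."""
    total = 0
    for _, g in df.groupby("unique_id"):
        ds = pd.to_datetime(g["ds"], utc=True).drop_duplicates()
        expected = pd.date_range(ds.min(), ds.max(), freq=pd.offsets.Hour())
        total += len(expected) - len(ds)
    return total

now = pd.Timestamp("2024-01-01 12:00", tz="UTC")
clean = pd.DataFrame({
    "unique_id": ["NG_US48"] * 3,
    "ds": ["2024-01-01 07:00", "2024-01-01 08:00", "2024-01-01 10:00"],  # 09:00 is missing
    "y": [100.0, 110.0, 120.0],
})
print(data_freshness_hours(clean, now))  # 2.0
print(missing_hours(clean))              # 1
```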

### 4) Compute drift threshold from backtest
```python
import pandas as pd
from src.chapter4.drift import compute_drift_threshold_from_backtest

leaderboard_path = "artifacts/leaderboard.parquet"

# Inspect the champion row (the function assumes row 0 is the champion)
leaderboard = pd.read_parquet(leaderboard_path)
champion_model = leaderboard.iloc[0]["model"]
backtest_rmse_mean = leaderboard.iloc[0]["rmse_mean"]
backtest_rmse_std = leaderboard.iloc[0]["rmse_std"]

# Threshold = mean + k*std. The function defaults to the MAPE columns,
# so point it at the RMSE columns explicitly.
threshold = compute_drift_threshold_from_backtest(
    leaderboard_path=leaderboard_path,
    metric_col="rmse_mean",
    std_col="rmse_std",
    k=2.0,  # standard deviations above the backtest mean
)

print(f"Champion: {champion_model}")
print(f"Backtest RMSE: {backtest_rmse_mean:.2f} ± {backtest_rmse_std:.2f}")
print(f"Drift alert fires when live RMSE > {threshold:.2f}")
```
- **Expect**: threshold = backtest_rmse_mean + 2*backtest_rmse_std
- **If it fails**: The leaderboard must be sorted best-first and contain `rmse_mean` and `rmse_std`; otherwise the function raises `ValueError`.
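The threshold formula itself is plain arithmetic. The numbers below are invented backtest statistics, chosen so the result matches the 12.5 RMSE threshold used later in this chapter; `drift_threshold` is a hypothetical stand-in for the last line of `compute_drift_threshold_from_backtest`.

```python
def drift_threshold(metric_mean: float, metric_std: float, k: float = 2.0) -> float:
    """One-sided drift threshold: mean + k * std."""
    return metric_mean + k * metric_std

# Invented backtest numbers: RMSE 10.0 ± 1.25 across folds
print(drift_threshold(10.0, 1.25))         # 12.5
print(drift_threshold(10.0, 1.25, k=3.0))  # 13.75
```

Raising `k` by one adds one backtest standard deviation of headroom before an alert fires.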

### 5) Score forecasts vs actuals
```python
import pandas as pd
from src.chapter4.scoring import score_forecasts

# Actuals must be in StatsForecast format: unique_id, ds, y.
# For now, clean data stands in for actuals (in real deployment, actuals come from live ingest).
actuals_df = pd.read_parquet("data/clean.parquet")[["unique_id", "ds", "y"]]

# Scores every stored forecast (all runs and models) that has a matching actual,
# keeping horizons from 1 to max_horizon_hours
scores = score_forecasts(
    db_path="monitoring.db",
    actuals_df=actuals_df,
    max_horizon_hours=72,
)

print(f"Scores shape: {scores.shape}")
print(f"Columns: {scores.columns.tolist()}")
print(f"Sample:\n{scores.head()}")
```
- **Expect**: Columns [scored_ts_utc, run_id, model, unique_id, horizon_hours, rmse, mape, coverage_pct, valid_rows]. Each (run_id, model, unique_id, horizon_hours) group usually holds a single hourly point, so per-row `rmse` is an absolute error and `coverage_pct` is 0 or 100; aggregate across runs (step 8) for stable metrics. `mape` is a fraction (0.05 = 5%).
- **If it fails**: An empty DataFrame means no stored forecast `ds` has a matching actual yet (forecasts created "now" point into the future); this is expected until actuals arrive. A `ValueError` means `actuals_df` lacks `unique_id`, `ds`, or `y`.
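`score_forecasts` derives each point's horizon as `ds - created_ts_utc`, rounded to whole hours, and keeps only horizons from 1 to `max_horizon_hours`. The sketch below reproduces that arithmetic with invented timestamps; `horizon_hours` is a hypothetical helper name.

```python
import pandas as pd

def horizon_hours(ds: pd.Series, created_ts: pd.Series) -> pd.Series:
    """Hours from forecast creation to the forecast timestamp, rounded to integers."""
    return ((ds - created_ts).dt.total_seconds() / 3600.0).round().astype(int)

# Forecast persisted 12 seconds after 06:00
created = pd.Series(pd.to_datetime(["2024-01-01 06:00:12"] * 3))
ds = pd.Series(pd.to_datetime(["2024-01-01 07:00", "2024-01-02 06:00", "2024-01-04 07:00"]))

h = horizon_hours(ds, created)
print(h.tolist())     # [1, 24, 73]
keep = h.between(1, 72)
print(keep.tolist())  # [True, True, False]
```

Because `created_ts_utc` is the time the forecast was persisted rather than the last observed `ds`, a forecast written long after its data cutoff has shifted horizons (and points before the creation time are dropped).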

### 6) Detect drift
```python
from src.chapter4.drift import detect_drift, rolling_accuracy

# Score history for one model/series/horizon (oldest first)
history = rolling_accuracy(
    db_path="monitoring.db",
    model="AutoARIMA",
    unique_id="NG_US48",
    horizon_hours=24,
)
print(f"Scored points: {len(history)}")

# Compares the latest MAPE score against the backtest threshold (mean + k*std)
result = detect_drift(
    db_path="monitoring.db",
    leaderboard_path="artifacts/leaderboard.parquet",
    model="AutoARIMA",
    unique_id="NG_US48",
    horizon_hours=24,
    k=2.0,
)

if result["status"] == "no_data":
    print("No historical metrics; skipping drift check")
else:
    print(f"Latest MAPE: {result['latest_mape']}")
    print(f"Threshold:   {result['threshold']:.4f}")
    print(f"Status:      {result['status']}")
```
- **Expect**: status = "ok" (latest MAPE ≤ threshold), "drift" (latest MAPE > threshold), or "no_data" (nothing scored yet)
- **If it fails**: `detect_drift` reads `mape_mean` and `mape_std` from the leaderboard. Those must use the same units as `score_forecasts`, which stores MAPE as a fraction (0.05 = 5%), not a percentage. "no_data" on the first run is normal.
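The decision rule in `detect_drift` is small enough to restate on a plain list of scores. `drift_status` is a hypothetical helper and the MAPE values are invented; it follows the source's behavior: empty history gives "no_data", a missing latest score counts as "ok", and the comparison is a strict `>`.

```python
import math
from typing import Optional, Sequence

def drift_status(history: Sequence[Optional[float]], threshold: float) -> str:
    """Same decision rule as detect_drift, applied to scores ordered oldest first."""
    if len(history) == 0:
        return "no_data"  # nothing scored yet
    latest = history[-1]
    # A missing (None/NaN) latest score is treated as "ok", as in detect_drift
    if latest is None or math.isnan(latest):
        return "ok"
    return "drift" if latest > threshold else "ok"  # strict >: equality is not drift

print(drift_status([], 0.08))            # no_data
print(drift_status([0.05, 0.06], 0.08))  # ok
print(drift_status([0.05, 0.08], 0.08))  # ok (equal is not drift)
print(drift_status([0.05, 0.11], 0.08))  # drift
```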

### 7) Run health checks and trigger alerts
```python
from src.chapter4.health import full_health_check
from src.chapter4.alerts import check_alerts, AlertConfig
from src.chapter4.db import MonitoringDB

# Set alert config
config = AlertConfig(
    data_freshness_warning_hours=4,
    data_freshness_critical_hours=6,
    missing_data_warning_hours=1,
    missing_data_critical_hours=3,
    forecast_freshness_warning_hours=12,
    forecast_freshness_critical_hours=24,
    drift_warning_threshold=0.1,      # 10% increase
    drift_critical_threshold=0.3,     # 30% increase
    rmse_increase_warning_pct=15,
    rmse_increase_critical_pct=25
)

# Run checks
health = full_health_check(...)
db = MonitoringDB("monitoring.db")

alerts = check_alerts(
    health_report=health,
    config=config,
    db=db,
    model_name="AutoARIMA"
)

print(f"Alerts triggered: {len(alerts)}")
for alert in alerts:
    print(f"  [{alert.severity}] {alert.alert_type}: {alert.message}")
```
- **Expect**: No alerts if system is healthy
- **If it fails**: Check health report values (freshness, completeness) against thresholds
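`AlertConfig` pairs each check with a warning and a critical threshold. The source of `src/chapter4/alerts.py` is not shown, so this sketch only assumes a "higher is worse" metric and an inclusive `>=` comparison; `severity` is a hypothetical helper, and the real `check_alerts` may compare differently.

```python
from typing import Optional

def severity(value: float, warning: float, critical: float) -> Optional[str]:
    """Map a "higher is worse" metric to an alert severity, or None if healthy."""
    if value >= critical:
        return "critical"
    if value >= warning:
        return "warning"
    return None

# Data freshness thresholds from the AlertConfig above: warning 4h, critical 6h
for hours in (3.0, 5.0, 7.0):
    print(hours, severity(hours, warning=4, critical=6))
```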

### 8) Query historical metrics
```python
import pandas as pd
import sqlite3

conn = sqlite3.connect("monitoring.db")

# Leaderboard: average metrics per model (across all dates/unique_ids)
leaderboard = pd.read_sql_query("""
    SELECT
        model,
        ROUND(AVG(rmse), 2) as avg_rmse,
        ROUND(AVG(mape), 2) as avg_mape,
        ROUND(AVG(coverage_pct), 1) as avg_coverage,
        COUNT(*) as n_scores
    FROM forecast_scores
    GROUP BY model
    ORDER BY avg_rmse
""", conn)
print(f"Leaderboard:\n{leaderboard}")

# Time-series of RMSE for champion model
rmse_over_time = pd.read_sql_query("""
    SELECT
        DATE(scored_ts_utc) as date,
        ROUND(AVG(rmse), 2) as daily_rmse
    FROM forecast_scores
    WHERE model = 'AutoARIMA'
    GROUP BY DATE(scored_ts_utc)
    ORDER BY date DESC
    LIMIT 14
""", conn)
print(f"\nRMSE over last 14 days:\n{rmse_over_time}")

# Alerts log
alerts = pd.read_sql_query("""
    SELECT alert_ts_utc, alert_type, severity, message
    FROM alerts
    ORDER BY alert_ts_utc DESC
    LIMIT 10
""", conn)
print(f"\nRecent alerts:\n{alerts}")
```
- **Expect**:
  - Leaderboard sorted by avg_rmse (champion first)
  - RMSE values stable or gradually increasing (if drifting)
  - Alerts only if thresholds breached
- **If it fails**: No data in tables (need to run earlier steps first)

## Metrics & success criteria

### Primary metric
- **Drift detection latency**: Alert triggered within 1 hour of the drifted forecasts being scored (scoring itself must wait for actuals; see pitfall 1)

### Secondary metrics
- **False positive rate**: <5% of alerts are spurious (threshold too sensitive)
- **Coverage tracking**: Prediction intervals stay well-calibrated (coverage ≈ 95%)
- **Data freshness**: Max age of ingested data < 6 hours

### "Good enough" threshold
- Alert thresholds based on backtest performance (one-sided: mean + 2*std)
- Health checks run daily with no critical alerts
- Forecasts are scored within 24 hours of horizon completion

### What would make me retrain / change monitoring
- MAPE increases > 50% vs backtest → drift detected → retrain
- Coverage < 85% or > 99% → intervals miscalibrated → recalibrate or add conformal prediction
- Data freshness > 24 hours → data pipeline broken → investigate ingestion
- >10% of forecasts missing actuals → scoring broken → investigate data join
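The four retrain/monitoring triggers above can be expressed as one check. `retrain_signals` and its arguments are hypothetical (they are not part of `src/chapter4`), and it assumes "MAPE increases > 50%" means live MAPE above 1.5× the backtest MAPE, with both values in the same units.

```python
from typing import List

def retrain_signals(live_mape: float, backtest_mape: float, coverage_pct: float,
                    freshness_hours: float, missing_actuals_pct: float) -> List[str]:
    """Return the triggered actions from the retrain/monitoring criteria."""
    signals = []
    if live_mape > 1.5 * backtest_mape:      # > 50% worse than backtest
        signals.append("drift: retrain")
    if coverage_pct < 85 or coverage_pct > 99:
        signals.append("intervals miscalibrated: recalibrate")
    if freshness_hours > 24:
        signals.append("stale data: investigate ingestion")
    if missing_actuals_pct > 10:
        signals.append("missing actuals: investigate join")
    return signals

print(retrain_signals(0.09, 0.05, 80.0, 2.0, 0.0))
# ['drift: retrain', 'intervals miscalibrated: recalibrate']
```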

## Pitfalls (things that commonly break)

1. **No actuals data initially**:
   - Forecasts arrive immediately, but actuals take 24+ hours to arrive
   - Scoring today's 24-hour forecast finds nothing to score: the inner join on (unique_id, ds) returns no rows because the actuals have not been observed yet
   - **Fix**: Add a delayed (back-scoring) step: the daily ingest fetches the latest actuals and scores earlier forecasts against them

2. **Threshold too aggressive (or too lax)**:
   - If k=1 (mean + 1*std), threshold is too low → constant false alarms
   - If k=3 (mean + 3*std), threshold too high → misses real drift
   - **Recommendation**: Start with k=2, tune based on false positive rate after 2 weeks

3. **Rolling window includes forecast horizon**:
   - If scoring yesterday's forecast with today's data, that's correct (forecast completed)
   - If trying to score today's forecast with today's data, that's wrong (horizon incomplete)
   - **Fix**: Only score forecasts where horizon_date ≤ today - 1 day

4. **Alerts not being sent**:
   - Current implementation logs alerts to database; email/Slack are stubs
   - If you expect email alerts, they won't come
   - **Fix**: Implement send_alert() with real email/Slack integration

5. **No baseline for drift detection**:
   - First few days of scores are too sparse to compare rolling window
   - Drift detection only works after backtest threshold is set
   - **Fix**: Seed threshold from backtest leaderboard (not from live scores)

6. **Duplicate actuals**:
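   - If data ingest is rerun for the same date range, actuals may be duplicated
   - The (unique_id, ds) merge in scoring then yields two rows per forecast point: `valid_rows` doubles, and if a restated value differs, old and new actuals are mixed into one metric
   - **Fix**: Deduplicate actuals on (unique_id, ds) before scoring, keeping the latest value; if actuals are stored in a table, upsert (`INSERT OR REPLACE`) with (unique_id, ds) as the primary key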
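Pitfalls 3 and 6 both come down to filtering before scoring. A minimal sketch with hypothetical helpers: `dedupe_actuals` keeps the last row per (unique_id, ds), assuming rows are in ingest order, and `completed_forecasts` applies the "horizon_date ≤ today - 1 day" rule. The data is invented.

```python
import pandas as pd

def dedupe_actuals(actuals: pd.DataFrame) -> pd.DataFrame:
    """Keep the most recently ingested value per (unique_id, ds); assumes ingest order."""
    return actuals.drop_duplicates(subset=["unique_id", "ds"], keep="last")

def completed_forecasts(forecasts: pd.DataFrame, today: pd.Timestamp) -> pd.DataFrame:
    """Keep forecast rows whose ds falls on yesterday or earlier."""
    cutoff = today.normalize() - pd.Timedelta(days=1)
    return forecasts[forecasts["ds"].dt.normalize() <= cutoff]

actuals = pd.DataFrame({
    "unique_id": ["NG_US48"] * 3,
    "ds": pd.to_datetime(["2024-01-01 07:00", "2024-01-01 07:00", "2024-01-01 08:00"]),
    "y": [100.0, 102.0, 110.0],  # the second row restates 07:00
})
clean_actuals = dedupe_actuals(actuals)

forecasts = pd.DataFrame({
    "unique_id": ["NG_US48"] * 2,
    "ds": pd.to_datetime(["2024-01-01 23:00", "2024-01-02 05:00"]),
    "yhat": [105.0, 95.0],
})
ready = completed_forecasts(forecasts, pd.Timestamp("2024-01-02 10:00"))
print(clean_actuals)
print(ready)  # only the 2024-01-01 23:00 row
```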

## Mini-checkpoint (prove you learned it)

Answer these:

1. **Why can't we score a 24-hour forecast immediately after generating it?** When is it safe to score?
2. **Explain the drift threshold formula**: threshold = mean + k*std. What does k control?
3. **What's the difference between a forecast not being scored yet vs. a forecast drifting?** How does the system tell them apart?
4. **If the threshold is 12.5 RMSE and the latest RMSE is 12.3, should an alert trigger?**

**Answers:**
1. We can't score it immediately because the actual value isn't observed yet (it's 24 hours in the future). Safe to score once the forecast period is complete and actuals are available (next day).
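2. k is the std multiplier. Higher k = higher threshold (fewer alerts, lower false positive rate). Lower k = lower threshold (more alerts, higher false positive rate). k=2 is a common default: if scores are roughly normal, about 97.7% of them fall below mean + 2*std (the familiar 95% figure applies to the two-sided band mean ± 2*std).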
3. Not yet scored: yhat exists, no metric row in forecast_scores (actuals haven't arrived). Drifting: metric row exists, but rmse > threshold. System checks both: if forecast too old without score → STALE_FORECAST alert; if metric exists and rmse > threshold → DRIFT alert.
4. No alert. Latest RMSE (12.3) < threshold (12.5), so status="ok". An alert triggers only if the latest RMSE is strictly greater than 12.5.
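How often a one-sided mean + k*std threshold fires on a healthy model can be estimated with the standard library. This assumes independent, normally distributed scores; real forecast errors are often autocorrelated and heavy-tailed, so treat the numbers as a rough guide for tuning `k`. `false_alarm_rate` is a hypothetical helper.

```python
from statistics import NormalDist

def false_alarm_rate(k: float) -> float:
    """P(score > mean + k*std) for a normally distributed, in-control score."""
    return 1.0 - NormalDist().cdf(k)

for k in (1.0, 2.0, 3.0):
    print(f"k={k}: {false_alarm_rate(k):.4f}")
# k=1.0: 0.1587
# k=2.0: 0.0228
# k=3.0: 0.0013
```

With daily scoring, k=1 would raise a spurious alert roughly every six days, which is why pitfall 2 recommends starting at k=2.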

## Exercises (optional, but recommended)

### Easy
1. Run full health check 3 times (at different times). How do freshness values change? When does data become "critical"?
2. Query the alerts table and count alerts by type and severity. Which type is most common?

### Medium
1. Manually modify one forecast's yhat value (e.g., multiply by 2) and re-score. How much does RMSE change? Does it trigger a drift alert?
2. Set drift threshold very low (k=0.5) and re-run drift detection. How many false alarms do you get?
3. Write a SQL query to find the longest period without any forecast scores (data gap). Investigate why.

### Hard
1. Implement restatement logic: for each day, re-score all forecasts from last 7 days as new actuals arrive. Verify RMSE stabilizes over the 7-day window.
2. Build a dashboard query that shows rolling 7-day RMSE for each model. Export the result to CSV or JSON for visualization.
3. Implement automatic threshold tuning: adjust k based on false positive rate from last 30 days. If >5% alerts are false (later retracted), reduce k; if <2% alerts, increase k.


```python
# file: src/chapter4/config.py
from __future__ import annotations
from dataclasses import dataclass

@dataclass(frozen=True)
class MonitoringConfig:
    db_path: str = "artifacts/monitoring/monitoring.sqlite"

    # Restatement strategy (data can be revised)
    restatement_lookback_hours: int = 336  # 2 weeks

    # Drift windows (rolling accuracy)
    roll_7d_hours: int = 24 * 7
    roll_14d_hours: int = 24 * 14

    # Drift threshold policy (data-driven, computed from backtests):
    # threshold = mean + k * std, with k set here
    drift_std_k: float = 2.0

    # Alert routing (keep simple at first)
    alert_print_only: bool = True
```


```python
# file: src/chapter4/db.py
from __future__ import annotations
import sqlite3
from pathlib import Path

def connect(db_path: str) -> sqlite3.Connection:
    """Open a SQLite connection, creating the parent directory if needed."""
    Path(db_path).parent.mkdir(parents=True, exist_ok=True)
    con = sqlite3.connect(db_path)
    con.execute("PRAGMA journal_mode=WAL;")  # readers (e.g. the dashboard) and the writer don't block each other
    con.execute("PRAGMA synchronous=NORMAL;")
    return con

def init_db(db_path: str) -> None:
    """Create the monitoring tables if they don't exist (safe to re-run)."""
    con = connect(db_path)
    cur = con.cursor()

    # One row per pipeline run, keyed on run_id
    cur.execute("""
    CREATE TABLE IF NOT EXISTS pipeline_runs (
        run_id TEXT PRIMARY KEY,
        ts_utc TEXT NOT NULL,
        status TEXT NOT NULL,
        step TEXT NOT NULL,
        message TEXT,
        raw_rows INTEGER,
        clean_rows INTEGER,
        duration_sec REAL
    );
    """)

    # Long-format forecasts: one row per (run, model, series, timestamp)
    cur.execute("""
    CREATE TABLE IF NOT EXISTS forecasts (
        run_id TEXT NOT NULL,
        created_ts_utc TEXT NOT NULL,
        unique_id TEXT NOT NULL,
        ds TEXT NOT NULL,
        model TEXT NOT NULL,
        yhat REAL,
        lo REAL,
        hi REAL,
        PRIMARY KEY (run_id, model, unique_id, ds)
    );
    """)

    # Accuracy aggregated per (run, model, series, horizon)
    cur.execute("""
    CREATE TABLE IF NOT EXISTS forecast_scores (
        scored_ts_utc TEXT NOT NULL,
        run_id TEXT NOT NULL,
        model TEXT NOT NULL,
        unique_id TEXT NOT NULL,
        horizon_hours INTEGER NOT NULL,
        rmse REAL,
        mape REAL,
        coverage_pct REAL,
        valid_rows INTEGER,
        PRIMARY KEY (run_id, model, unique_id, horizon_hours)
    );
    """)

    # Append-only alert log (no primary key)
    cur.execute("""
    CREATE TABLE IF NOT EXISTS alerts (
        alert_ts_utc TEXT NOT NULL,
        alert_type TEXT NOT NULL,
        severity TEXT NOT NULL,
        message TEXT NOT NULL,
        metadata_json TEXT
    );
    """)

    con.commit()
    con.close()
```


```python
# file: src/chapter4/run_log.py
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
from src.chapter4.db import connect

def now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def log_run(
    db_path: str,
    run_id: str,
    status: str,
    step: str,
    message: Optional[str] = None,
    raw_rows: Optional[int] = None,
    clean_rows: Optional[int] = None,
    duration_sec: Optional[float] = None,
) -> None:
    """Record a pipeline run. run_id is the primary key, so a later call with the
    same run_id replaces the earlier row (only the latest step is kept)."""
    con = connect(db_path)
    con.execute("""
        INSERT OR REPLACE INTO pipeline_runs
        (run_id, ts_utc, status, step, message, raw_rows, clean_rows, duration_sec)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, (run_id, now_utc_iso(), status, step, message, raw_rows, clean_rows, duration_sec))
    con.commit()
    con.close()
```
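Because `pipeline_runs` is keyed on `run_id` alone, `INSERT OR REPLACE` overwrites the previous step of the same run. The in-memory demo below uses a cut-down, hypothetical version of the table (three columns only) to show the effect. If per-step history matters, a composite key such as (run_id, step) would be one option.

```python
import sqlite3

con = sqlite3.connect(":memory:")
# Simplified pipeline_runs: same primary key as the real table, fewer columns
con.execute("CREATE TABLE pipeline_runs (run_id TEXT PRIMARY KEY, step TEXT NOT NULL, status TEXT NOT NULL)")

for step, status in [("ingest", "ok"), ("forecast", "ok"), ("score", "failed")]:
    con.execute(
        "INSERT OR REPLACE INTO pipeline_runs (run_id, step, status) VALUES (?, ?, ?)",
        ("run_20240101_060000", step, status),
    )

rows = con.execute("SELECT run_id, step, status FROM pipeline_runs").fetchall()
con.close()
print(rows)  # [('run_20240101_060000', 'score', 'failed')]
```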


```python
# file: src/chapter4/forecast_store.py
from __future__ import annotations
from datetime import datetime, timezone
import pandas as pd
from src.chapter4.db import connect

def _utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def persist_forecasts(
    db_path: str,
    run_id: str,
    forecast_df: pd.DataFrame,
    confidence_level: int = 95
) -> None:
    """
    Expects StatsForecast output:
      columns include: unique_id, ds, <model1>, <model1>-lo-95, <model1>-hi-95, ...
    Writes long format into SQLite (one row per run/model/series/timestamp).
    """
    required = {"unique_id", "ds"}
    if not required.issubset(forecast_df.columns):
        raise ValueError(f"forecast_df missing {required}, got {forecast_df.columns.tolist()}")

    df = forecast_df.copy()
    df["ds"] = pd.to_datetime(df["ds"], errors="raise", utc=True).dt.tz_localize(None)

    # Model columns = non-metadata and not interval cols
    model_cols = [
        c for c in df.columns
        if c not in ("unique_id", "ds")
        and not c.endswith(f"-lo-{confidence_level}")
        and not c.endswith(f"-hi-{confidence_level}")
    ]
    if not model_cols:
        raise ValueError("forecast_df has no model columns besides unique_id, ds and interval columns")

    rows = []
    created_ts = _utc_iso()

    for m in model_cols:
        lo_col = f"{m}-lo-{confidence_level}"
        hi_col = f"{m}-hi-{confidence_level}"
        has_int = (lo_col in df.columns) and (hi_col in df.columns)

        tmp = df[["unique_id", "ds", m]].rename(columns={m: "yhat"})
        tmp["model"] = m
        tmp["lo"] = df[lo_col] if has_int else None
        tmp["hi"] = df[hi_col] if has_int else None
        tmp["run_id"] = run_id
        tmp["created_ts_utc"] = created_ts

        rows.append(tmp)

    out = pd.concat(rows, ignore_index=True)

    con = connect(db_path)
    # Upsert: re-persisting the same run_id overwrites earlier rows instead of duplicating them
    con.executemany("""
        INSERT OR REPLACE INTO forecasts
        (run_id, created_ts_utc, unique_id, ds, model, yhat, lo, hi)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    """, [
        (
            r.run_id,
            r.created_ts_utc,
            r.unique_id,
            str(r.ds),
            r.model,
            None if pd.isna(r.yhat) else float(r.yhat),
            None if pd.isna(r.lo) else float(r.lo),
            None if pd.isna(r.hi) else float(r.hi),
        )
        for r in out.itertuples(index=False)
    ])
    con.commit()
    con.close()
```


```python
# file: src/chapter4/scoring.py
from __future__ import annotations
from datetime import datetime, timezone
import numpy as np
import pandas as pd
from src.chapter4.db import connect

def _utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def score_forecasts(
    db_path: str,
    actuals_df: pd.DataFrame,
    max_horizon_hours: int = 72
) -> pd.DataFrame:
    """
    actuals_df must be in StatsForecast format: unique_id, ds, y (timezone-naive UTC preferred).

    Scores all forecasts in DB that match actuals_df on (unique_id, ds).
    Aggregates per (run_id, model, unique_id, horizon_hours).

    horizon_hours is computed as ds - created_ts_utc (rounded to whole hours).
    MAPE is returned as a fraction (0.05 = 5%); coverage_pct is a percentage.
    """
    required = {"unique_id", "ds", "y"}
    if not required.issubset(actuals_df.columns):
        raise ValueError(f"actuals_df missing {required}, got {actuals_df.columns.tolist()}")

    act = actuals_df.copy()
    act["ds"] = pd.to_datetime(act["ds"], errors="raise", utc=True).dt.tz_localize(None)

    con = connect(db_path)
    fc = pd.read_sql_query("SELECT * FROM forecasts", con)
    con.close()

    if fc.empty:
        return pd.DataFrame()

    fc["ds"] = pd.to_datetime(fc["ds"], errors="raise")
    fc["created_ts_utc"] = pd.to_datetime(fc["created_ts_utc"], errors="raise", utc=True).dt.tz_localize(None)

    # Only forecast points whose actual has been observed survive the inner join
    merged = fc.merge(act, on=["unique_id", "ds"], how="inner")
    if merged.empty:
        return pd.DataFrame()

    merged["horizon_hours"] = ((merged["ds"] - merged["created_ts_utc"]).dt.total_seconds() / 3600.0).round().astype(int)
    merged = merged[(merged["horizon_hours"] >= 1) & (merged["horizon_hours"] <= max_horizon_hours)]

    def rmse(y, yhat):
        m = np.isfinite(y) & np.isfinite(yhat)
        if m.sum() == 0:
            return np.nan, 0
        return float(np.sqrt(np.mean((y[m] - yhat[m]) ** 2))), int(m.sum())

    def mape(y, yhat):
        m = np.isfinite(y) & np.isfinite(yhat) & (np.abs(y) > 1e-12)
        if m.sum() == 0:
            return np.nan
        return float(np.mean(np.abs((y[m] - yhat[m]) / y[m])))

    def coverage(y, lo, hi):
        m = np.isfinite(y) & np.isfinite(lo) & np.isfinite(hi)
        if m.sum() == 0:
            return np.nan
        return float(100.0 * np.mean((y[m] >= lo[m]) & (y[m] <= hi[m])))

    scored_ts = _utc_iso()  # one timestamp for the whole scoring batch
    rows = []
    for (run_id, model, uid, h), g in merged.groupby(["run_id", "model", "unique_id", "horizon_hours"]):
        # dtype=float turns NULL intervals (None) into NaN so np.isfinite works on them
        y = g["y"].to_numpy(dtype=float)
        yhat = g["yhat"].to_numpy(dtype=float)
        lo = g["lo"].to_numpy(dtype=float)
        hi = g["hi"].to_numpy(dtype=float)

        r, n = rmse(y, yhat)
        rows.append({
            "scored_ts_utc": scored_ts,
            "run_id": run_id,
            "model": model,
            "unique_id": uid,
            "horizon_hours": int(h),
            "rmse": r,
            "mape": mape(y, yhat),
            "coverage_pct": coverage(y, lo, hi),
            "valid_rows": n,
        })

    if not rows:  # every matched point fell outside 1..max_horizon_hours
        return pd.DataFrame()

    scores = pd.DataFrame(rows)

    con = connect(db_path)
    con.executemany("""
        INSERT OR REPLACE INTO forecast_scores
        (scored_ts_utc, run_id, model, unique_id, horizon_hours, rmse, mape, coverage_pct, valid_rows)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, [
        (
            r.scored_ts_utc, r.run_id, r.model, r.unique_id, r.horizon_hours,
            None if pd.isna(r.rmse) else float(r.rmse),
            None if pd.isna(r.mape) else float(r.mape),
            None if pd.isna(r.coverage_pct) else float(r.coverage_pct),
            int(r.valid_rows),
        )
        for r in scores.itertuples(index=False)
    ])
    con.commit()
    con.close()

    return scores
```
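The three metric formulas in `score_forecasts` can be checked by hand on a few invented points, outside the database. This reproduces the same NumPy expressions (without the finite-value masks) so the units are visible: MAPE comes out as a fraction, coverage as a percentage.

```python
import numpy as np

y = np.array([100.0, 200.0, 50.0])      # actuals
yhat = np.array([110.0, 190.0, 50.0])   # point forecasts: errors -10, +10, 0
lo = np.array([95.0, 185.0, 40.0])
hi = np.array([115.0, 199.0, 60.0])     # 200 falls above its upper bound

rmse = float(np.sqrt(np.mean((y - yhat) ** 2)))               # sqrt(200/3) ≈ 8.165
mape = float(np.mean(np.abs((y - yhat) / y)))                 # (0.10 + 0.05 + 0) / 3 = 0.05
coverage_pct = float(100.0 * np.mean((y >= lo) & (y <= hi)))  # 2 of 3 covered ≈ 66.7

print(rmse, mape, coverage_pct)
```

A MAPE of 0.05 here means 5%; comparing it against a leaderboard threshold stored in percent would be off by a factor of 100.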


```python
# file: src/chapter4/drift.py
from __future__ import annotations
import json
from datetime import datetime, timezone
import pandas as pd
from src.chapter4.db import connect

def _utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def compute_drift_threshold_from_backtest(
    leaderboard_path: str,
    metric_col: str = "mape_mean",
    std_col: str = "mape_std",
    k: float = 2.0
) -> float:
    """threshold = mean + k*std for the champion, assumed to be row 0 of the leaderboard."""
    lb = pd.read_parquet(leaderboard_path)
    if lb.empty:
        raise ValueError("Leaderboard is empty; cannot compute drift threshold.")
    if metric_col not in lb.columns or std_col not in lb.columns:
        raise ValueError(f"Expected {metric_col} and {std_col} in leaderboard columns: {lb.columns.tolist()}")
    best = lb.iloc[0]
    return float(best[metric_col] + k * best[std_col])

def rolling_accuracy(
    db_path: str,
    model: str,
    unique_id: str,
    horizon_hours: int = 24
) -> pd.DataFrame:
    """Full score history for one model/series/horizon, oldest first.
    No window is applied here; apply e.g. MonitoringConfig.roll_7d_hours downstream."""
    con = connect(db_path)
    df = pd.read_sql_query("""
        SELECT scored_ts_utc, run_id, model, unique_id, horizon_hours, rmse, mape, coverage_pct, valid_rows
        FROM forecast_scores
        WHERE model = ? AND unique_id = ? AND horizon_hours = ?
        ORDER BY scored_ts_utc ASC
    """, con, params=(model, unique_id, int(horizon_hours)))
    con.close()

    if df.empty:
        return df

    df["scored_ts_utc"] = pd.to_datetime(df["scored_ts_utc"], errors="raise", utc=True)
    return df

def detect_drift(
    db_path: str,
    leaderboard_path: str,
    model: str,
    unique_id: str,
    horizon_hours: int,
    k: float = 2.0,
) -> dict:
    # The threshold comes from the leaderboard champion's MAPE columns (regardless of `model`).
    # Scores store MAPE as a fraction, so the leaderboard must use the same units.
    thr = compute_drift_threshold_from_backtest(leaderboard_path, k=k)
    hist = rolling_accuracy(db_path, model=model, unique_id=unique_id, horizon_hours=horizon_hours)

    if hist.empty:
        return {"status": "no_data", "threshold": thr}

    latest = hist.iloc[-1]
    drifted = (pd.notna(latest["mape"]) and float(latest["mape"]) > thr)

    return {
        "status": "drift" if drifted else "ok",
        "threshold": float(thr),
        "latest_mape": None if pd.isna(latest["mape"]) else float(latest["mape"]),
        "latest_rmse": None if pd.isna(latest["rmse"]) else float(latest["rmse"]),
        "latest_scored_ts": str(latest["scored_ts_utc"]),
        "model": model,
        "unique_id": unique_id,
        "horizon_hours": int(horizon_hours),
    }

def write_alert(db_path: str, alert_type: str, severity: str, message: str, metadata: dict) -> None:
    con = connect(db_path)
    con.execute("""
        INSERT INTO alerts (alert_ts_utc, alert_type, severity, message, metadata_json)
        VALUES (?, ?, ?, ?, ?)
    """, (_utc_iso(), alert_type, severity, message, json.dumps(metadata)))
    con.commit()
    con.close()
```
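Despite its name, `rolling_accuracy` returns the full score history. `MonitoringConfig.roll_7d_hours` (24 * 7) describes a 7-day window, which pandas can apply as a time-based rolling mean over `scored_ts_utc`. `rolling_mean_metric` is a hypothetical helper and the daily scores are invented.

```python
import pandas as pd

def rolling_mean_metric(history: pd.DataFrame, metric: str = "mape", window: str = "7D") -> pd.Series:
    """Time-based rolling mean of a score column, indexed by scored_ts_utc."""
    h = history.sort_values("scored_ts_utc").set_index("scored_ts_utc")
    return h[metric].rolling(window).mean()

history = pd.DataFrame({
    "scored_ts_utc": pd.date_range("2024-01-01", periods=10, freq="D", tz="UTC"),
    "mape": [0.05] * 7 + [0.10] * 3,  # invented scores: accuracy degrades from day 8
})
roll = rolling_mean_metric(history)
print(roll.iloc[-1])  # mean of the last 7 daily scores: (4*0.05 + 3*0.10) / 7 ≈ 0.0714
```

Comparing this smoothed value against the threshold, rather than only the latest score, trades a little detection latency for fewer alerts triggered by a single bad day.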


```python
# file: src/chapter4/dashboard_app.py
import pandas as pd
import streamlit as st
from src.chapter4.db import connect, init_db

st.set_page_config(page_title="EIA Forecast Monitoring", layout="wide")
st.title("EIA Forecast Monitoring")

db_path = st.sidebar.text_input("DB Path", "artifacts/monitoring/monitoring.sqlite")

# Ensure the tables exist so the queries below don't fail on a fresh database
init_db(db_path)
con = connect(db_path)

runs = pd.read_sql_query("SELECT * FROM pipeline_runs ORDER BY ts_utc DESC LIMIT 100", con)
alerts = pd.read_sql_query("SELECT * FROM alerts ORDER BY alert_ts_utc DESC LIMIT 200", con)
scores = pd.read_sql_query("SELECT * FROM forecast_scores ORDER BY scored_ts_utc DESC LIMIT 2000", con)

con.close()

c1, c2 = st.columns(2)
with c1:
    st.subheader("Recent Pipeline Runs")
    st.dataframe(runs, use_container_width=True)

with c2:
    st.subheader("Alerts")
    st.dataframe(alerts, use_container_width=True)

st.subheader("Accuracy (Scored Forecasts)")
if not scores.empty:
    scores["scored_ts_utc"] = pd.to_datetime(scores["scored_ts_utc"], utc=True)
    # Note: mape is a fraction and rmse is in data units, so they share one axis poorly
    st.line_chart(
        scores.sort_values("scored_ts_utc").set_index("scored_ts_utc")[["mape", "rmse"]],
        use_container_width=True
    )
    st.dataframe(scores.head(200), use_container_width=True)
else:
    st.info("No scored forecasts yet. Run the scoring job after actuals arrive.")
```
