# 01 Data Pipeline

This notebook builds a reproducible S&P 500 daily dataset (2003-2024) for volatility modeling.

Pipeline outputs:
- Raw OHLCV data (`data/raw/sp500_raw.csv`)
- Processed log-return dataset (`data/processed/sp500_log_returns.csv`)
- Rolling split registry (`data/processed/rolling_splits.csv`)

Design constraints:
- Log returns are the core series for both model inputs and targets
- Strict time-ordered splits to avoid lookahead bias
- Deterministic transformations for reproducibility


In [None]:
from __future__ import annotations

from pathlib import Path
import sys

import matplotlib.pyplot as plt
import pandas as pd

def find_project_root(start: Path, marker: str = "src") -> Path:
    """Locate the project root by searching upward from ``start``.

    Args:
        start: Directory to begin the search from (typically the notebook's
            working directory).
        marker: Name of the file or directory that identifies the project
            root. Defaults to the ``src`` package folder.

    Returns:
        The nearest directory at or above ``start`` that contains ``marker``.
        If no ancestor contains it, ``start.parent`` is returned, which keeps
        the previous "assume we are one level below the root" guess.
    """
    start = Path(start).resolve()
    # Check the starting directory first, then each ancestor in turn, so the
    # notebook also works when launched from a nested folder.
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    return start.parent


PROJECT_ROOT = find_project_root(Path.cwd())
# Make the project's ``src`` package importable; skip if already registered so
# re-running the cell does not grow sys.path.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

from src.data import PipelineConfig, generate_rolling_splits, run_data_pipeline
from src.utils import set_seed

# Seed first, before anything else runs. `set_seed` comes from src.utils;
# presumably it seeds every RNG the project relies on — confirm there.
set_seed(42)

# Show up to 50 columns and 120 characters per line when rendering frames.
pd.options.display.max_columns = 50
pd.options.display.width = 120


In [None]:
# Single source of truth for the experiment settings shared across notebooks.
TICKER = "^GSPC"
START_DATE, END_DATE = "2003-01-01", "2024-12-31"
# NOTE(review): whether END_DATE is inclusive depends on the downloader used by
# run_data_pipeline — confirm the final trading day of 2024 is not dropped.

# Output folders, both anchored at the project root.
RAW_DIR = PROJECT_ROOT.joinpath("data", "raw")
PROCESSED_DIR = PROJECT_ROOT.joinpath("data", "processed")

config = PipelineConfig(
    raw_dir=RAW_DIR,
    processed_dir=PROCESSED_DIR,
    ticker=TICKER,
    start_date=START_DATE,
    end_date=END_DATE,
)
config


In [None]:
# Run the pipeline once; it hands back both frames and the two file paths.
raw_df, processed_df, raw_path, processed_path = run_data_pipeline(config)

# Report row counts and output locations, one item per line.
print(
    f"Raw rows: {len(raw_df):,}",
    f"Processed rows: {len(processed_df):,}",
    f"Raw file: {raw_path}",
    f"Processed file: {processed_path}",
    sep="\n",
)

processed_df.head()


In [None]:
# Invariants the downstream notebooks rely on; fail loudly if any is violated.
assert processed_df["date"].is_monotonic_increasing, "Dates must be sorted ascending."
assert not processed_df["date"].duplicated().any(), "Dates must be unique."
assert not processed_df["log_return"].isna().any(), "Log returns must be non-null after cleaning."
assert processed_df["adj_close"].gt(0).all(), "Adjusted close must be positive."

# Descriptive statistics, one row per column for easier reading.
summary = processed_df.loc[:, ["log_return", "sq_return", "rv_21d"]].describe().transpose()
summary


In [None]:
# Split geometry; the "~" durations assume roughly 252 rows per year.
split_params = {
    "min_train_size": 756,    # ~3 years
    "val_size": 252,          # ~1 year
    "test_size": 21,          # ~1 month test block
    "step_size": 21,          # monthly step; equals test_size so test blocks should not overlap
    "expanding_train": True,
}
splits_df = generate_rolling_splits(processed_df, **split_params)

# Persist the registry next to the processed dataset.
splits_path = PROCESSED_DIR / "rolling_splits.csv"
splits_df.to_csv(splits_path, index=False)

print(
    f"Generated {len(splits_df)} rolling splits",
    f"Saved split registry: {splits_path}",
    sep="\n",
)
splits_df.head()


In [None]:
# Two stacked panels sharing the date axis: price on top, returns below.
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 8), sharex=True)
price_ax, return_ax = axes

price_ax.plot(processed_df["date"], processed_df["adj_close"], color="tab:blue")
price_ax.set(title="S&P 500 Adjusted Close", ylabel="Price")
price_ax.grid(alpha=0.25)

# Thinner line for the returns series.
return_ax.plot(
    processed_df["date"],
    processed_df["log_return"],
    color="tab:red",
    linewidth=0.8,
)
return_ax.set(title="Daily Log Returns", ylabel="Log Return", xlabel="Date")
return_ax.grid(alpha=0.25)

fig.tight_layout()
plt.show()


## Next

Use `data/processed/sp500_log_returns.csv` and `data/processed/rolling_splits.csv` as the common input contract for:
- `02_garch_baseline.ipynb`
- `03_lstm_model.ipynb`
- `04_gru_model.ipynb`
- `05_hybrid_models.ipynb`
- `06_evaluation_qlike.ipynb`
- `07_gate_visualization.ipynb`
