# Data Pipeline & Alpha Factors - Chapters 02-05

This notebook demonstrates the Puffin data pipeline, covering the full workflow from
data acquisition through alpha factor computation to portfolio optimization.

**Chapters covered:**
- **Ch 02 - Data Pipeline**: Data providers, caching, storage, and preprocessing
- **Ch 03 - Alternative Data**: Web scraping, transcript parsing (not covered here)
- **Ch 04 - Alpha Factors**: Momentum, volatility, technical indicators, Kalman filter, factor evaluation
- **Ch 05 - Portfolio Optimization**: Mean-variance, risk parity, hierarchical risk parity, tearsheets

**Key concepts:**
- Strategy pattern for swappable `DataProvider` implementations
- SQLite caching to avoid redundant API calls
- Parquet/HDF5 persistent storage via `MarketDataStore`
- Multi-horizon momentum and volatility factor construction
- Kalman filter for signal denoising and trend extraction
- Mean-variance, risk parity, and HRP portfolio construction

## 1. Data Providers

Puffin uses a `DataProvider` abstract base class with concrete implementations for different
data sources. `YFinanceProvider` fetches free historical OHLCV data from Yahoo Finance.
The strategy pattern means you can swap in `AlpacaProvider` or `IBKRDataProvider` without
changing downstream code.
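
To make the strategy pattern concrete, here is a minimal standalone sketch. It does not use Puffin: `PriceSource`, `StaticSource`, `ScaledSource`, and `last_return` are hypothetical names invented for illustration, and the prices are placeholder values.

```python
from abc import ABC, abstractmethod


class PriceSource(ABC):
    """Hypothetical provider interface (an illustration, not Puffin's DataProvider)."""

    @abstractmethod
    def fetch_close(self, symbol: str) -> list[float]:
        """Return closing prices for one symbol, oldest first."""


class StaticSource(PriceSource):
    """Serves prices from an in-memory dict, e.g. for tests."""

    def __init__(self, prices: dict[str, list[float]]):
        self._prices = prices

    def fetch_close(self, symbol: str) -> list[float]:
        return list(self._prices[symbol])


class ScaledSource(PriceSource):
    """Wraps another source and rescales its prices (e.g. a currency conversion)."""

    def __init__(self, inner: PriceSource, factor: float):
        self._inner = inner
        self._factor = factor

    def fetch_close(self, symbol: str) -> list[float]:
        return [p * self._factor for p in self._inner.fetch_close(symbol)]


def last_return(source: PriceSource, symbol: str) -> float:
    """Downstream code depends only on the abstract interface."""
    prices = source.fetch_close(symbol)
    return prices[-1] / prices[-2] - 1.0


static = StaticSource({"AAPL": [100.0, 110.0]})  # placeholder prices
scaled = ScaledSource(static, factor=2.0)
r_static = last_return(static, "AAPL")
r_scaled = last_return(scaled, "AAPL")
```

`last_return` never checks which concrete class it receives, so either source can be passed in without touching the function body.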

In [None]:
from puffin.data import DataProvider, YFinanceProvider

# YFinanceProvider implements the DataProvider interface
provider = YFinanceProvider()
print(f"Supported assets: {provider.get_supported_assets()}")

# Fetch historical daily data for AAPL
aapl = provider.fetch_historical("AAPL", start="2022-01-01", end="2024-01-01")
print(f"\nShape: {aapl.shape}")
print(f"Index levels: {aapl.index.names}")
print(f"Columns: {list(aapl.columns)}")
aapl.head()

### Multi-Ticker Download

The same `fetch_historical` method handles multiple symbols, returning a MultiIndex
DataFrame indexed by `(Date, Symbol)`.
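
The layout can be previewed with plain pandas. The sketch below builds a small `(Date, Symbol)` frame by hand (the prices are made-up placeholders, not fetched data) and shows two common ways to slice it.

```python
import pandas as pd

dates = pd.to_datetime(["2024-01-02", "2024-01-03"])
index = pd.MultiIndex.from_product(
    [dates, ["AAPL", "MSFT"]], names=["Date", "Symbol"]
)
# Placeholder prices, ordered (d1, AAPL), (d1, MSFT), (d2, AAPL), (d2, MSFT)
long_df = pd.DataFrame({"Close": [185.0, 370.0, 184.0, 371.0]}, index=index)

# Cross-section: all dates for a single symbol (drops the Symbol level)
aapl_close = long_df.xs("AAPL", level="Symbol")["Close"]

# Wide format: one row per date, one column per symbol
wide = long_df["Close"].unstack("Symbol")
```

The wide form is the shape used later in this notebook for factor computation and return calculations.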

In [None]:
# Fetch a small universe for portfolio construction later
symbols = ["AAPL", "MSFT", "GOOGL", "AMZN", "META"]
multi = provider.fetch_historical(symbols, start="2022-01-01", end="2024-01-01")
print(f"Multi-ticker shape: {multi.shape}")
print(f"Symbols: {multi.index.get_level_values('Symbol').unique().tolist()}")
multi.tail()

## 2. Data Caching

`DataCache` stores OHLCV data in a local SQLite database so that repeated fetches
for the same symbol and date range hit the cache instead of the API. This is essential
for backtesting workflows where you repeatedly iterate on the same data.
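
The read-through idea behind such a cache can be sketched with the standard-library `sqlite3` module. This is not `DataCache`'s implementation: the table schema, `fake_api_fetch`, and `get_close` are assumptions made for illustration, and the lookup is deliberately naive (any cached row in the range counts as a full hit).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE IF NOT EXISTS ohlcv (
        symbol   TEXT NOT NULL,
        interval TEXT NOT NULL,
        date     TEXT NOT NULL,
        close    REAL NOT NULL,
        PRIMARY KEY (symbol, interval, date)
    )
    """
)

calls = {"api": 0}  # counts how often the (fake) remote API is hit


def fake_api_fetch(symbol, start, end):
    """Hypothetical remote fetch returning placeholder (date, close) rows."""
    calls["api"] += 1
    return [("2023-06-01", 180.0), ("2023-06-02", 181.5)]


def get_close(symbol, start, end, interval="1d"):
    """Return cached rows if any exist; otherwise fetch, store, and re-read."""
    query = (
        "SELECT date, close FROM ohlcv "
        "WHERE symbol = ? AND interval = ? AND date BETWEEN ? AND ? "
        "ORDER BY date"
    )
    params = (symbol, interval, start, end)
    rows = conn.execute(query, params).fetchall()
    if rows:
        return rows  # cache hit: no API call
    fetched = fake_api_fetch(symbol, start, end)
    conn.executemany(
        "INSERT OR REPLACE INTO ohlcv VALUES (?, ?, ?, ?)",
        [(symbol, interval, d, c) for d, c in fetched],
    )
    conn.commit()
    return conn.execute(query, params).fetchall()


first = get_close("AAPL", "2023-06-01", "2023-06-30")   # miss: fetches and stores
second = get_close("AAPL", "2023-06-01", "2023-06-30")  # hit: served from SQLite
```

ISO-formatted date strings sort lexicographically, which is why the `BETWEEN` filter works on a `TEXT` column here. A production cache would also need to detect partially covered date ranges.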

In [None]:
import os
import tempfile
from puffin.data import DataCache, MarketDataStore

# Create a temporary cache for demonstration
tmp_dir = tempfile.mkdtemp()
cache = DataCache(db_path=os.path.join(tmp_dir, "demo_cache.db"))

# Extract single-symbol data (drop the Symbol level for cache compatibility)
aapl_single = aapl.droplevel("Symbol")

# Store in cache
cache.put("AAPL", aapl_single, interval="1d")
print("Stored AAPL data in SQLite cache")

# Retrieve from cache
cached = cache.get("AAPL", start="2023-06-01", end="2023-12-31", interval="1d")
print(f"Retrieved {len(cached)} rows from cache")
cached.head()

### Persistent Storage with MarketDataStore

`MarketDataStore` provides Parquet or HDF5 file-based storage with metadata tracking.
This is more suitable for large datasets that need to persist across sessions.

In [None]:
# Create a MarketDataStore in Parquet format
store_dir = os.path.join(tmp_dir, "market_store")
store = MarketDataStore(store_dir, format="parquet")

# Save OHLCV data with metadata
store.save_ohlcv("AAPL", aapl_single, source="yfinance", frequency="1d")
print(f"Stored symbols: {store.list_symbols()}")

# Inspect metadata
print("\nMetadata:")
store.get_metadata("AAPL")

## 3. Preprocessing

The `preprocess()` function handles missing values (forward-fill, interpolation, or drop),
clips extreme return outliers, and validates OHLCV constraints (e.g., High >= Low,
non-negative prices and volume).
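
The three steps can be approximated in plain pandas. The sketch below is an assumption about what such a pipeline might look like, not the body of `preprocess()`; the frame contains made-up values, and treating a negative volume as missing is one possible policy among several.

```python
import numpy as np
import pandas as pd

frame = pd.DataFrame(
    {
        "High": [10.2, 10.6, 10.9, 11.0],
        "Low": [9.8, 9.9, 10.4, 10.5],
        "Close": [10.0, np.nan, 10.8, 10.9],      # one missing close
        "Volume": [1000.0, 1200.0, -100.0, 900.0],  # one impossible volume
    }
)

# 1. Treat impossible values as missing, then forward-fill the gaps.
cleaned = frame.copy()
cleaned.loc[cleaned["Volume"] < 0, "Volume"] = np.nan
cleaned = cleaned.ffill()

# 2. Clip returns that lie more than `outlier_std` standard deviations from zero.
outlier_std = 5.0
returns = cleaned["Close"].pct_change()
limit = outlier_std * returns.std()
clipped_returns = returns.clip(lower=-limit, upper=limit)

# 3. Validate basic OHLCV constraints row by row.
valid = (
    (cleaned["High"] >= cleaned["Low"])
    & (cleaned[["High", "Low", "Close"]] >= 0).all(axis=1)
    & (cleaned["Volume"] >= 0)
)
```

Forward-filling only uses past values, so it does not leak future information into a backtest, which is one reason it is a common default over interpolation.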

In [None]:
import numpy as np
import pandas as pd
from puffin.data import preprocess

# Demonstrate preprocessing on data with injected issues
dirty = aapl_single.copy()
dirty.iloc[10:13, dirty.columns.get_loc("Close")] = np.nan  # inject NaN
dirty.iloc[50, dirty.columns.get_loc("Volume")] = -100       # inject negative volume

print("Before preprocessing:")
print(f"  NaN count: {dirty.isna().sum().sum()}")
print(f"  Negative volume rows: {(dirty['Volume'] < 0).sum()}")

clean = preprocess(dirty, fill_method="ffill", remove_outliers=True, outlier_std=5.0)

print("\nAfter preprocessing:")
print(f"  NaN count: {clean.isna().sum().sum()}")
print(f"  Negative volume rows: {(clean['Volume'] < 0).sum()}")
print(f"  Shape preserved: {dirty.shape} -> {clean.shape}")

## 4. Alpha Factors

Alpha factors quantify signals that predict future returns. Puffin provides:
- **Momentum factors**: Returns over multiple horizons (5d, 21d, 63d, 252d)
- **Volatility factors**: Realized, Parkinson, and Garman-Klass estimators
- **`compute_all_factors()`**: Computes momentum + volatility in one call

Factors are returned with a `(date, symbol)` MultiIndex for cross-sectional analysis.
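
As a rough guide to what these factors measure, the sketch below computes a trailing-return momentum factor and a close-to-close realized volatility factor in plain pandas, then reshapes them to a `(date, symbol)` MultiIndex. The definitions (simple return over the window, annualized rolling standard deviation of log returns) are assumptions and may differ from Puffin's; the helper names and prices are made up.

```python
import numpy as np
import pandas as pd

dates = pd.bdate_range("2024-01-01", periods=8, name="date")
close = pd.DataFrame(
    {
        "AAA": [100.0, 101.0, 103.0, 102.0, 104.0, 107.0, 106.0, 109.0],
        "BBB": [50.0, 50.5, 50.0, 49.5, 50.0, 50.5, 51.0, 50.5],
    },
    index=dates,
)
close.columns.name = "symbol"


def momentum(prices, window):
    """Trailing simple return over `window` rows."""
    return prices.pct_change(window)


def realized_vol(prices, window, periods_per_year=252):
    """Annualized rolling standard deviation of daily log returns."""
    log_ret = np.log(prices).diff()
    return log_ret.rolling(window).std() * np.sqrt(periods_per_year)


def to_long(wide):
    """Reshape a wide (date x symbol) frame to a (date, symbol) Series."""
    return wide.unstack().swaplevel().sort_index()


factors = pd.concat(
    {
        "mom_5": to_long(momentum(close, 5)),
        "vol_5": to_long(realized_vol(close, 5)),
    },
    axis=1,
).dropna()  # drop the warm-up rows where a window is not yet full
```

Each row of `factors` is one `(date, symbol)` observation, so a cross-section for a single date can be pulled out with `factors.xs(some_date, level="date")`.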

In [None]:
from puffin.factors import (
    compute_momentum_factors,
    compute_volatility_factors,
    compute_all_factors,
)

# Pivot multi-ticker data to wide format for factor computation
close_prices = multi["Close"].unstack("Symbol")
print(f"Close prices shape: {close_prices.shape}")

# Compute momentum factors
mom_factors = compute_momentum_factors(close_prices, windows=[5, 21, 63])
print(f"\nMomentum factors shape: {mom_factors.shape}")
print(f"Columns: {list(mom_factors.columns)}")
mom_factors.dropna().tail(10)

In [None]:
# Compute volatility factors
vol_factors = compute_volatility_factors(close_prices, windows=[21, 63])
print(f"Volatility factors shape: {vol_factors.shape}")
print(f"Columns: {list(vol_factors.columns)}")
vol_factors.dropna().tail(10)

In [None]:
# Compute all factors at once
all_factors = compute_all_factors(
    close_prices,
    momentum_windows=[5, 21, 63],
    volatility_windows=[21, 63],
)
print(f"Combined factors shape: {all_factors.shape}")
print(f"All columns: {list(all_factors.columns)}")

## 5. Technical Indicators

`TechnicalIndicators` provides a unified interface for computing overlap studies (SMA, EMA,
Bollinger Bands), momentum indicators (RSI, MACD, Stochastic), volume indicators (OBV, A/D),
and volatility indicators (ATR). It uses TA-Lib when available, with pure Python fallbacks.
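
For intuition about what a pure Python fallback might compute, here is a pandas sketch of SMA and RSI. It is not Puffin's or TA-Lib's code: the RSI below uses exponential smoothing with `alpha = 1 / window` as an approximation of Wilder's method, and its warm-up seeding differs from TA-Lib, so early values will not match exactly.

```python
import pandas as pd


def sma(series, window):
    """Simple moving average over the last `window` observations."""
    return series.rolling(window).mean()


def rsi(series, window=14):
    """Relative Strength Index using Wilder-style exponential smoothing."""
    delta = series.diff()
    gain = delta.clip(lower=0.0)        # positive moves, zero otherwise
    loss = (-delta).clip(lower=0.0)     # magnitude of negative moves
    avg_gain = gain.ewm(alpha=1.0 / window, min_periods=window, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1.0 / window, min_periods=window, adjust=False).mean()
    rs = avg_gain / avg_loss            # becomes inf when there are no losses
    return 100.0 - 100.0 / (1.0 + rs)


rising = pd.Series(range(1, 31), dtype=float)
mixed = pd.Series([10.0, 11.0, 10.5, 11.5, 12.0, 11.0, 11.2, 12.5, 12.1, 13.0])

sma_mixed = sma(mixed, 3)
rsi_rising = rsi(rising)          # no down moves, so RSI saturates at 100
rsi_mixed = rsi(mixed, window=3)  # values stay within [0, 100]
```

Both functions return `NaN` during the warm-up period, which is why the cell below calls `dropna()` before displaying results.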

In [None]:
from puffin.factors import TechnicalIndicators

# Prepare OHLCV dict for a single symbol
ohlcv = {
    "open": aapl_single["Open"],
    "high": aapl_single["High"],
    "low": aapl_single["Low"],
    "close": aapl_single["Close"],
    "volume": aapl_single["Volume"],
}

ti = TechnicalIndicators()
indicators = ti.compute_all(ohlcv, categories=["overlap", "momentum"])
print(f"Technical indicators shape: {indicators.shape}")
print(f"Columns: {list(indicators.columns)}")
indicators[["sma_20", "sma_50", "rsi", "macd"]].dropna().tail()

## 6. Kalman Filter for Trend Extraction

The `KalmanFilter` provides recursive state estimation (optimal in the mean-squared-error sense for linear-Gaussian models) for:
- **Signal denoising**: Removing noise from price series
- **Trend extraction**: Isolating the underlying trend via forward-backward smoothing
- **Dynamic hedge ratios**: Time-varying beta estimation for pairs trading

The `extract_trend()` convenience function applies a Kalman smoother with configurable
process and observation noise parameters.
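
The role of the two noise parameters is easiest to see in a local-level model, where the hidden trend follows a random walk with variance `Q` and each observation adds noise with variance `R`. The sketch below is a standalone NumPy implementation of a forward Kalman filter followed by a Rauch-Tung-Striebel backward pass. It is an assumption about the kind of model involved, not the code behind `extract_trend()`, and the initialization (prior centered on the first observation) is one simple choice among several.

```python
import numpy as np


def local_level_smoother(y, process_variance, observation_variance):
    """Filter and smooth y under x_t = x_{t-1} + w_t, y_t = x_t + v_t."""
    y = np.asarray(y, dtype=float)
    n = len(y)
    x_pred, p_pred = np.empty(n), np.empty(n)
    x_filt, p_filt = np.empty(n), np.empty(n)

    # Prior: centered on the first observation (a simple, hypothetical choice).
    x, p = y[0], observation_variance
    for t in range(n):
        if t > 0:
            p = p + process_variance          # predict: random walk adds Q
        x_pred[t], p_pred[t] = x, p
        gain = p / (p + observation_variance)  # Kalman gain
        x = x + gain * (y[t] - x)              # update with the innovation
        p = (1.0 - gain) * p
        x_filt[t], p_filt[t] = x, p

    # Backward (RTS) pass: refine each estimate using later observations.
    x_smooth = x_filt.copy()
    for t in range(n - 2, -1, -1):
        back_gain = p_filt[t] / p_pred[t + 1]
        x_smooth[t] = x_filt[t] + back_gain * (x_smooth[t + 1] - x_pred[t + 1])
    return x_filt, x_smooth


rng = np.random.default_rng(0)
noisy = np.linspace(0.0, 10.0, 200) + rng.normal(0.0, 1.0, 200)

_, smooth_small_q = local_level_smoother(noisy, 1e-4, 1.0)
_, smooth_large_q = local_level_smoother(noisy, 1.0, 1.0)
```

A small `Q` relative to `R` tells the filter that the trend moves slowly, so it averages over many observations; a large `Q` lets the estimate track each new observation closely.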

In [None]:
import matplotlib.pyplot as plt
from puffin.factors import KalmanFilter, extract_trend

close = aapl_single["Close"]

# Extract trend with different smoothing levels
trend_smooth = extract_trend(close, process_variance=1e-6, observation_variance=1e-1)
trend_responsive = extract_trend(close, process_variance=1e-3, observation_variance=1e-1)

fig, ax = plt.subplots(figsize=(12, 5))
close.plot(ax=ax, alpha=0.5, label="Raw Close")
trend_smooth.plot(ax=ax, label="Smooth trend (Q=1e-6)", linewidth=2)
trend_responsive.plot(ax=ax, label="Responsive trend (Q=1e-3)", linewidth=2)
ax.set_title("Kalman Filter Trend Extraction - AAPL")
ax.legend()
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

## 7. Factor Evaluation

`FactorEvaluator` measures the predictive power of alpha factors using:
- **Information Coefficient (IC)**: Cross-sectional correlation between factor values and forward returns, computed for each date
- **Factor returns**: Long-short portfolio returns by quantile
- **Turnover analysis**: How often factor rankings change
- **Full tearsheet**: All metrics combined
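
The Information Coefficient in particular is simple enough to compute by hand. The sketch below calculates a per-date Spearman IC in plain pandas by ranking both inputs across assets and taking the Pearson correlation of the ranks. It assumes wide `(date x asset)` inputs and made-up values; `spearman_ic` is a hypothetical helper, not a `FactorEvaluator` method.

```python
import pandas as pd

dates = pd.to_datetime(["2024-01-02", "2024-01-03", "2024-01-04"])
assets = ["A", "B", "C", "D"]

# Made-up factor values and next-period returns, one row per date.
factor = pd.DataFrame(
    [[1.0, 2.0, 3.0, 4.0], [4.0, 3.0, 2.0, 1.0], [1.0, 2.0, 4.0, 3.0]],
    index=dates,
    columns=assets,
)
forward_returns = pd.DataFrame(
    [[0.01, 0.02, 0.03, 0.05]] * 3, index=dates, columns=assets
)


def spearman_ic(factor_wide, forward_wide):
    """Per-date Spearman IC: Pearson correlation of cross-sectional ranks."""
    factor_ranks = factor_wide.rank(axis=1)
    return_ranks = forward_wide.rank(axis=1)
    return factor_ranks.corrwith(return_ranks, axis=1)


ic = spearman_ic(factor, forward_returns)
ic_mean = ic.mean()
ic_ir = ic.mean() / ic.std()  # information ratio of the IC series
```

On the first date the factor ranks assets in the same order as their returns (IC of 1), on the second in exactly the reverse order (IC of -1), and on the third with one adjacent swap (IC of 0.8).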

In [None]:
from puffin.factors import FactorEvaluator

# Prepare a momentum factor for evaluation
# The factor must be a Series with (date, asset) MultiIndex
mom_21 = mom_factors["mom_21"].dropna()
print(f"Factor shape: {mom_21.shape}")
print(f"Index names: {mom_21.index.names}")

# Evaluate the 21-day momentum factor
evaluator = FactorEvaluator(quantiles=5, periods=[1, 5, 21])

# Compute Information Coefficient
forward_returns_1d = close_prices.pct_change(1).shift(-1)
ic = evaluator.compute_ic(mom_21, forward_returns_1d, method="spearman")
print(f"\nMean IC (Spearman): {ic.mean():.4f}")
print(f"IC Std: {ic.std():.4f}")
print(f"IC IR (IC Mean / IC Std): {ic.mean() / (ic.std() + 1e-8):.4f}")

## 8. Portfolio Optimization

Puffin implements three portfolio construction methods:
- **Mean-Variance (Markowitz)**: Classic optimization maximizing Sharpe ratio or minimizing variance
- **Risk Parity**: Each asset contributes equally to total portfolio risk
- **Hierarchical Risk Parity (HRP)**: Clustering-based allocation for more stable weights
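
Two of these ideas have simple closed forms that help build intuition. The NumPy sketch below shows unconstrained minimum-variance weights and inverse-volatility weights (a naive form of risk parity that ignores correlations), plus a helper that reports each asset's share of portfolio variance. These are simplified stand-ins, not the algorithms inside `MeanVarianceOptimizer` or `risk_parity_weights`, and the covariance matrix is made up.

```python
import numpy as np

# Made-up covariance matrix for three assets.
cov = np.array(
    [
        [0.040, 0.006, 0.000],
        [0.006, 0.090, 0.000],
        [0.000, 0.000, 0.010],
    ]
)


def min_variance_weights(cov):
    """Closed form w = inv(C) 1 / (1' inv(C) 1); short positions are allowed."""
    ones = np.ones(cov.shape[0])
    raw = np.linalg.solve(cov, ones)
    return raw / raw.sum()


def inverse_vol_weights(cov):
    """Naive risk parity: weights proportional to 1 / volatility."""
    inv_vol = 1.0 / np.sqrt(np.diag(cov))
    return inv_vol / inv_vol.sum()


def risk_contributions(weights, cov):
    """Share of portfolio variance attributable to each asset (sums to 1)."""
    marginal = cov @ weights
    return weights * marginal / (weights @ marginal)


w_mv = min_variance_weights(cov)
w_iv = inverse_vol_weights(cov)
w_eq = np.ones(3) / 3.0

rc_iv = risk_contributions(w_iv, cov)

# With uncorrelated assets, inverse-vol weights equalize risk contributions.
diag_cov = np.diag(np.diag(cov))
rc_diag = risk_contributions(inverse_vol_weights(diag_cov), diag_cov)
```

When correlations are non-zero, inverse-volatility weights only approximate equal risk contributions, which is why full risk parity is usually solved numerically.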

In [None]:
from puffin.portfolio import MeanVarianceOptimizer, risk_parity_weights, hrp_weights

# Compute daily returns for the universe
returns_df = close_prices.pct_change().dropna()

# --- Mean-Variance: Maximum Sharpe Ratio ---
mvo = MeanVarianceOptimizer()
max_sharpe = mvo.max_sharpe(returns_df)
print("=== Max Sharpe Portfolio ===")
for sym, w in zip(returns_df.columns, max_sharpe["weights"]):
    print(f"  {sym}: {w:.2%}")
print(f"  Expected Return: {max_sharpe['return'] * 252:.2%}")
print(f"  Volatility: {max_sharpe['risk'] * np.sqrt(252):.2%}")
print(f"  Sharpe: {max_sharpe['sharpe']:.3f}")

In [None]:
# --- Risk Parity ---
rp_weights = risk_parity_weights(returns_df)
print("=== Risk Parity Portfolio ===")
for sym, w in zip(returns_df.columns, rp_weights):
    print(f"  {sym}: {w:.2%}")

# --- Hierarchical Risk Parity ---
hrp_w = hrp_weights(returns_df, linkage_method="single")
print("\n=== HRP Portfolio ===")
for sym, w in zip(returns_df.columns, hrp_w):
    print(f"  {sym}: {w:.2%}")

# Compare all three methods
comparison = pd.DataFrame({
    "Max Sharpe": max_sharpe["weights"],
    "Risk Parity": rp_weights,
    "HRP": hrp_w,
}, index=returns_df.columns)
print("\n=== Weight Comparison ===")
comparison

## 9. Performance Tearsheet

`generate_tearsheet()` computes comprehensive portfolio statistics including annualized
return, Sharpe ratio, Sortino ratio, maximum drawdown, VaR/CVaR, and win rate.
`plot_returns()` visualizes cumulative performance against an optional benchmark.
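
Several of these statistics can be reproduced in a few lines, which is useful for sanity-checking a tearsheet. The sketch below is a hypothetical `summary_stats` helper, not `generate_tearsheet()` itself; it assumes daily returns, 252 periods per year, and a zero risk-free rate.

```python
import numpy as np
import pandas as pd


def summary_stats(returns, periods_per_year=252):
    """Basic performance statistics from a series of periodic simple returns."""
    returns = pd.Series(returns, dtype=float)
    equity = (1.0 + returns).cumprod()  # growth of 1 unit of capital
    years = len(returns) / periods_per_year

    # Drawdown relative to the running peak, counting initial capital as a peak.
    running_peak = equity.cummax().clip(lower=1.0)
    drawdown = equity / running_peak - 1.0

    return {
        "annual_return": equity.iloc[-1] ** (1.0 / years) - 1.0,
        "annual_vol": returns.std() * np.sqrt(periods_per_year),
        "sharpe": returns.mean() / returns.std() * np.sqrt(periods_per_year),
        "max_drawdown": drawdown.min(),
        "win_rate": (returns > 0).mean(),
    }


# Made-up returns: the equity curve peaks at 1.10, then falls to 0.55.
stats = summary_stats([0.10, -0.50, 0.20, 0.05])
```

In this toy series the worst peak-to-trough loss is 50%, and three of the four periods are positive, so `max_drawdown` is -0.5 and `win_rate` is 0.75.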

In [None]:
from puffin.portfolio import generate_tearsheet, plot_returns, print_tearsheet_summary

# Simulate portfolio returns using HRP weights
portfolio_returns = (returns_df * hrp_w).sum(axis=1)
portfolio_returns.name = "HRP Portfolio"

# Equal-weight benchmark
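# Size the weights from the return columns so they stay aligned even if a symbol was dropped
n_assets = returns_df.shape[1]
equal_weights = np.ones(n_assets) / n_assets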
benchmark_returns = (returns_df * equal_weights).sum(axis=1)
benchmark_returns.name = "Equal Weight"

# Generate tearsheet
tearsheet = generate_tearsheet(portfolio_returns, benchmark=benchmark_returns)
print_tearsheet_summary(tearsheet)

# Plot cumulative returns
fig = plot_returns(portfolio_returns, benchmark=benchmark_returns)
plt.show()

## Exercises

1. **Expand the universe**: Add 5 more symbols and re-run the portfolio optimization. How do the HRP weights change?
2. **Factor decay**: Use `FactorEvaluator.compute_factor_returns()` with periods `[1, 5, 21]` to see how momentum factor returns decay over longer horizons.
3. **Kalman crossover**: Use `kalman_ma_crossover()` from `puffin.factors` to generate trading signals and backtest the strategy.
4. **Compare volatility estimators**: Compute Parkinson and Garman-Klass volatility by passing OHLC data as a dict to `compute_volatility_factors()`. Plot all three estimators against each other.