# PCA Hedging Project — End-to-End Notebook (Steps 1→4)

This notebook runs the full pipeline with **one main builder** and shows diagnostics at each step.

**Inputs you need**
- Market data table: rows = dates, columns = tenors (e.g. `2Y, 3Y, ...`).
- (Optional) Sensitivities by tenor (DV01 per bp) to run the PnL tracking backtest.

**Outputs**
- Rolling PCA explained variance and loadings
- Family A anchors and mapping diagnostics
- Family B selected proxy instruments (outrights/spreads/flies)
- Backtest metrics: tracking RMSE/MAE/tails

> Tip: If you don’t have sensitivities yet, the notebook can generate **synthetic sensitivities** to validate the pipeline.


In [None]:
# --- Imports ---
import numpy as np
import pandas as pd

# Display options
pd.set_option("display.max_columns", 50)
pd.set_option("display.width", 140)

# --- Paste the unified pipeline code here (the one we built) ---
# You should paste:
# 1) Step 1 utilities + build_market_matrices
# 2) RollingPCAModel + PCAFitResult
# 3) Family A / Family B / Backtest helpers
# 4) PCAHedgeOptions + build_pca_hedge_model

# After pasting, run this cell.


In [None]:
# =========================
# STEP 1 — Load Market Data
# =========================
# Option A: read from a file
# - CSV: first column is date index (or set date_col)
# - Excel: set sheet_name and date_col if date is a column

DATA_MODE = "path"   # "path" | "raw_df" | "x_changes"
PATH = "curve_timeseries.csv"     # <- change
SHEET_NAME = None                # for Excel
DATE_COL = None                  # e.g. "Date" if date is a column
INDEX_COL = 0                    # if date is first column

# Units:
# - input_unit="pct"  means 2.35 is 2.35%
# - input_unit="dec"  means 0.0235 is 2.35%
# - input_unit="bp"   means 235 is 2.35%
UNITS = RateUnits(input_unit="pct", target_unit="bp")

# Optional tenor restriction (recommended to stabilize early):
# RESTRICT_TENORS = ["2Y","3Y","5Y","7Y","10Y","15Y","20Y","30Y"]
RESTRICT_TENORS = None

opts = PCAHedgeOptions(
    data_mode=DATA_MODE,
    path=PATH,
    sheet_name=SHEET_NAME,
    date_col=DATE_COL,
    index_col=INDEX_COL,
    restrict_tenors=RESTRICT_TENORS,
    units=UNITS,

    # PCA defaults (edit freely)
    n_components=3,
    pca_method="cov",
    pca_estimator="ledoitwolf",
    lookback="2Y",
    min_obs=252,

    # Families
    run_familyA=True,
    run_familyB=True,

    # Family A: set to fixed anchors first, then try auto
    familyA=AnchorSelectionConfig(
        anchors_mode="fixed",
        fixed_anchors=["3Y","5Y","10Y","20Y"],
        k_anchors=4,
        ridge_lambda=1e-6,
        cond_max=1e4,
        min_improvement_to_switch=0.002,
    ),

    # Family B stability selection (start conservative)
    familyB=StabilitySelectionConfig(
        n_subsamples=100,
        subsample_frac=0.6,
        alpha_grid=(0.001, 0.002, 0.005),
        l1_ratio=0.9,
        select_prob_threshold=0.6,
        max_features=1,      # set 2-3 for more robustness
        ridge_alpha=1e-6,
    ),
    familyB_max_spreads=2000,
    familyB_max_flies=4000,
    familyB_min_tenor_gap=2,
    familyB_restrict_by_kind=True,

    # Backtest disabled here; we enable after we load sensitivities
    run_backtest=False,
    verbose=True,
)

# Build model (Steps 1→3)
model = build_pca_hedge_model(opts)

levels = model.get("levels")      # may be None if data_mode="x_changes"
changes = model["changes"]
meta = model["meta"]

print("Meta:", meta)
print("Levels shape:", None if levels is None else levels.shape)
print("Changes shape:", changes.shape)

display(changes.tail())


In [None]:
# =====================
# STEP 2 — Rolling PCA
# =====================
pca_states = model["pca_states"]

print(f"Number of as-of PCA fits: {len(pca_states)}")
last_asof = list(pca_states.keys())[-1]
state = pca_states[last_asof]

print("Last asof:", last_asof)
print("Window:", state.window_start.date(), "→", state.window_end.date(), "n_obs:", state.n_obs)

evr = state.explained_var_ratio
display(evr.head(10))

print("Loadings (first 10 tenors):")
display(state.loadings.head(10))

# Quick look at PC1/PC2/PC3 time series in the last window
scores = state.scores_in_window
display(scores.tail())

# Optional plot (matplotlib)
import matplotlib.pyplot as plt

plt.figure()
scores["PC1"].plot()
plt.title("PC1 score (last calibration window)")
plt.show()

plt.figure()
scores["PC2"].plot()
plt.title("PC2 score (last calibration window)")
plt.show()

plt.figure()
scores["PC3"].plot()
plt.title("PC3 score (last calibration window)")
plt.show()


In [None]:
# =============================
# STEP 3A — Family A (Anchors)
# =============================
familyA = model.get("familyA", {})
print("Family A days:", len(familyA))

if familyA:
    lastA = familyA[list(familyA.keys())[-1]]
    print("Last asof:", lastA.asof)
    print("Anchors:", lastA.anchors)
    print(f"Reconstruction score={lastA.score:.6f} rmse={lastA.rmse:.4f} cond={lastA.cond:.1f}")

    print("Mapping B (anchors -> tenors) excerpt:")
    display(lastA.mapping_B.iloc[:, :10])

    # Show anchor evolution (how often they change)
    anchors_series = pd.Series({d: tuple(v.anchors) for d, v in familyA.items()})
    turnover = (anchors_series != anchors_series.shift(1)).sum()
    print(f"Anchor set changes over time: {int(turnover)} (out of {len(anchors_series)})")

    display(anchors_series.tail(15))


In [None]:
# =====================================
# STEP 3B — Family B (Sparse Proxies)
# =====================================
familyB = model.get("familyB", {})
print("Family B days:", len(familyB))

if familyB:
    last_date = list(familyB.keys())[-1]
    fits = familyB[last_date]
    print("Last asof:", last_date)

    for pc, fit in fits.items():
        print(f"{pc}: selected={fit.selected}")
        # Show top 10 selection probabilities
        display(fit.selection_prob.head(10))

    # Track how often selections change (union of selected instruments)
    union_sel = pd.Series({
        d: tuple(sorted(set(inst for f in day.values() for inst in f.selected)))
        for d, day in familyB.items()
    })
    changes_count = (union_sel != union_sel.shift(1)).sum()
    print(f"Union instrument set changes: {int(changes_count)} (out of {len(union_sel)})")
    display(union_sel.tail(15))


In [None]:
# =========================
# STEP 4 — Backtest (PnL)
# =========================
# Provide sensitivities in one of two ways:
#   A) sens_by_tenor: DataFrame (dates x tenors)
#   B) sens_vec:      Series (tenors)

USE_SYNTHETIC_SENS = True

if USE_SYNTHETIC_SENS:
    # Synthetic book for end-to-end validation (replace once desk sensitivities arrive)
    # Concentrated around 10Y, smooth across curve
    sens_vec = make_synthetic_sensitivities(changes, center_tenor="10Y", width=2.0, total_risk=1e6)
    sens_by_tenor = None
else:
    # Example: load real sensitivities
    # sens_by_tenor = pd.read_csv("sensitivities.csv", index_col=0, parse_dates=True)
    # sens_vec = None
    sens_by_tenor = None
    sens_vec = None

# Rebuild model with backtest enabled
opts_bt = opts
opts_bt.run_backtest = True
opts_bt.sens_by_tenor = sens_by_tenor
opts_bt.sens_vec = sens_vec
opts_bt.backtest = BacktestConfig(lookback=opts.lookback, min_obs=opts.min_obs, ridge_alpha=1e-6)

model_bt = build_pca_hedge_model(opts_bt)

print("Backtest keys:", [k for k in model_bt.keys() if k.startswith("backtest") or k.startswith("stats")])

if "statsA" in model_bt:
    print("Family A stats:", model_bt["statsA"])
if "statsB" in model_bt:
    print("Family B stats:", model_bt["statsB"])

# Plot PnL comparison (if available)
import matplotlib.pyplot as plt

if "backtestA" in model_bt:
    pnl_full = model_bt["backtestA"]["pnl_full"]
    pnl_A = model_bt["backtestA"]["pnl_A"]

    plt.figure()
    (pnl_full.cumsum()).plot(label="Full")
    (pnl_A.cumsum()).plot(label="Family A hedge")
    plt.legend()
    plt.title("Cumulative PnL: Full vs Family A hedge")
    plt.show()

if "backtestB" in model_bt:
    pnl_full = model_bt["backtestB"]["pnl_full"]
    pnl_B = model_bt["backtestB"]["pnl_B"]

    plt.figure()
    (pnl_full.cumsum()).plot(label="Full")
    (pnl_B.cumsum()).plot(label="Family B hedge")
    plt.legend()
    plt.title("Cumulative PnL: Full vs Family B hedge")
    plt.show()
