# 03 - Dynamic Hedging with Kalman Filter

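This notebook analyzes time-varying hedge ratios estimated with a Kalman state-space model, compares them against a static Ridge baseline on the test window, and discusses the trade-off between hedge-ratio stability and responsiveness.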

In [None]:
from __future__ import annotations

import sys
from pathlib import Path

import matplotlib.pyplot as plt
import pandas as pd
import yaml

# Locate the project root: keep the working directory when it already holds
# `src`; otherwise step up one level if the parent directory holds it.
_launch_dir = Path.cwd()
_use_parent = not (_launch_dir / "src").exists() and (_launch_dir.parent / "src").exists()
PROJECT_ROOT = _launch_dir.parent if _use_parent else _launch_dir
SRC_ROOT = PROJECT_ROOT / "src"

# Put `src` at the front of the import path, skipping the insert when it is
# already present so re-running the cell does not add duplicates.
_src_entry = str(SRC_ROOT)
if _src_entry not in sys.path:
    sys.path.insert(0, _src_entry)

from costs.transaction_costs import TransactionCostConfig
from data.loaders import load_proxy_hedging_prices
from data.preprocess import MissingDataPolicy, preprocess_prices_and_returns
from evaluation.metrics import evaluate_hedge_performance
from hedging.engine import HedgeConstraints, run_hedging_engine
from models.baselines import RidgeHedgeEstimator
from models.kalman import KalmanMultiProxyHedgeEstimator


In [None]:
# Load the project configuration and pull out the data section.
cfg = yaml.safe_load((PROJECT_ROOT / "src/config/default.yaml").read_text(encoding="utf-8"))
data_cfg = cfg["data"]
target_name = data_cfg["target"]["name"]
proxy_names = [item["name"] for item in data_cfg["proxies"]]
proxy_files = {item["name"]: item["file"] for item in data_cfg["proxies"]}

# Read the raw target and proxy price series.
prices = load_proxy_hedging_prices(
    raw_dir=PROJECT_ROOT / data_cfg["raw_dir"],
    target_file=data_cfg["target"]["file"],
    proxy_files=proxy_files,
    target_name=target_name,
    date_column=data_cfg.get("date_column", "Date"),
    price_column=data_cfg.get("price_column", "Price"),
)

# A YAML key that is present but left empty parses to None, in which case the
# `.get(key, {})` default is never used. `or {}` covers both the missing-key
# and the empty-key case.
date_range_cfg = cfg.get("date_range") or {}
missing_policy_cfg = cfg.get("missing_data_policy") or {}

# Only the simple-returns output is used in this notebook.
_, simple_returns, _ = preprocess_prices_and_returns(
    prices=prices,
    frequency=cfg.get("frequency", "B"),
    start_date=date_range_cfg.get("start"),
    end_date=date_range_cfg.get("end"),
    missing_data_policy=MissingDataPolicy(**missing_policy_cfg),
)
returns = simple_returns.dropna()

# Slice the first walk-forward split into train / validation / test windows.
split = cfg["walk_forward"]["splits"][0]
train = returns.loc[split["train_start"]:split["train_end"]]
val = returns.loc[split["val_start"]:split["val_end"]]
test = returns.loc[split["test_start"]:split["test_end"]]

# Fail here with a clear message instead of letting an empty window surface
# later as an unrelated error inside calibration, fitting, or plotting.
empty_splits = [
    label
    for label, window in (("train", train), ("val", val), ("test", test))
    if window.empty
]
if empty_splits:
    raise ValueError(
        f"Empty walk-forward window(s) {empty_splits}: split dates {split} do not "
        f"overlap the available returns ({len(returns)} rows after dropna)."
    )

train_val = pd.concat([train, val]).sort_index()


In [None]:
# Hedging-engine settings shared by both strategies.
constraints = HedgeConstraints(**cfg["hedging"]["constraints"])
cost_cfg = TransactionCostConfig.from_dict(cfg["transaction_costs"])
scenario = cfg["hedging"].get("cost_scenario", "med")

# PyYAML resolves exponent literals written without a decimal point (e.g.
# `1e-4`) as strings, so every noise value is cast to float. The grid entries
# get the same cast as the scalar settings.
kal_cfg = cfg["models"]["kalman"]
kalman = KalmanMultiProxyHedgeEstimator(
    target_column=target_name,
    proxy_columns=tuple(proxy_names),
    process_noise=float(kal_cfg.get("process_noise", 1e-4)),
    observation_noise=float(kal_cfg.get("observation_noise", 1e-3)),
    process_noise_grid=tuple(
        float(q) for q in kal_cfg.get("process_noise_grid", [1e-6, 1e-5, 1e-4])
    ),
    observation_noise_grid=tuple(
        float(r) for r in kal_cfg.get("observation_noise_grid", [1e-4, 1e-3, 1e-2])
    ),
)

# Calibrate on train/validation, then fit over the full train+val+test span
# with calibration switched off, and keep the test-window hedge ratios.
# NOTE(review): this assumes the ratio reported for date t uses no information
# after t (filtered, not smoothed) and that the engine applies it without
# look-ahead - confirm against models.kalman and hedging.engine.
kalman.calibrate(train, val)
kalman.fit(pd.concat([train, val, test]).sort_index()[[target_name, *proxy_names]], calibrate=False)
kalman_ratios_test = kalman.hedge_ratio_time_series().loc[test.index, proxy_names]

# Static baseline: Ridge fitted once on train+val, ratios requested on the test index.
ridge = RidgeHedgeEstimator(target_column=target_name, proxy_columns=tuple(proxy_names)).fit(train_val[[target_name, *proxy_names]])
ridge_ratios_test = ridge.hedge_ratio_time_series(index=test.index)

# Both strategies go through the engine with identical inputs; only the
# hedge-ratio series differs.
engine_kwargs = dict(
    target_returns=test[target_name],
    proxy_returns=test[proxy_names],
    rebalance_frequency=cfg["hedging"].get("rebalance_frequency", "daily"),
    constraints=constraints,
    cost_config=cost_cfg,
    cost_scenario=scenario,
)
kalman_res = run_hedging_engine(hedge_ratios=kalman_ratios_test, **engine_kwargs)
ridge_res = run_hedging_engine(hedge_ratios=ridge_ratios_test, **engine_kwargs)


In [None]:
# Hedge-ratio stability diagnostics
# Top panel: Kalman vs Ridge hedge ratios per proxy over the test window.
# Bottom panel: rolling standard deviation of the Kalman ratios.
ROLLING_WINDOW = 20  # observations per rolling window; also used in the panel title

fig, axes = plt.subplots(2, 1, figsize=(10, 7), sharex=True)
ax_ratios, ax_vol = axes

for proxy in proxy_names:
    ax_ratios.plot(kalman_ratios_test.index, kalman_ratios_test[proxy], label=f"Kalman {proxy}")
    ax_ratios.plot(ridge_ratios_test.index, ridge_ratios_test[proxy], linestyle="--", label=f"Ridge {proxy}")
ax_ratios.set_title("Hedge Ratios: Dynamic (Kalman) vs Static (Ridge)")
ax_ratios.set_ylabel("Hedge ratio")
ax_ratios.legend(ncol=2)
ax_ratios.grid(alpha=0.25)

# The first ROLLING_WINDOW - 1 points are NaN and therefore not drawn.
rolling_std = kalman_ratios_test.rolling(ROLLING_WINDOW).std(ddof=1)
for proxy in proxy_names:
    ax_vol.plot(rolling_std.index, rolling_std[proxy], label=f"Kalman vol {proxy}")
ax_vol.set_title(f"{ROLLING_WINDOW}-day Rolling Std of Kalman Betas")
ax_vol.set_ylabel("Rolling std of hedge ratio")
ax_vol.set_xlabel("Date")
ax_vol.legend()
ax_vol.grid(alpha=0.25)

plt.tight_layout()
plt.show()


In [None]:
# Score both engine runs with the same metric function and stack them into a
# table with one row per strategy.
engine_results = {"kalman": kalman_res, "ridge": ridge_res}
comparison = pd.DataFrame(
    {
        label: evaluate_hedge_performance(
            result.unhedged_pnl,
            result.hedged_pnl_gross,
            result.hedged_pnl_net,
            result.turnover,
            result.transaction_cost,
        )
        for label, result in engine_results.items()
    }
).T

# Show only the headline metrics.
headline_metrics = ["hedge_effectiveness", "hedged_net_sharpe_annualized_no_rf", "total_transaction_cost"]
comparison[headline_metrics]


## Stability Discussion

- Kalman betas adapt gradually through time, capturing local covariance shifts.
- If beta volatility is high, turnover and cost drag can offset gross risk reduction.
- Stability can be tuned through process/observation noise calibration (a lower process-to-observation noise ratio yields smoother, slower-moving betas) and through leverage constraints.