# Summary pipeline: `arb_*` outcomes and event-study design

This notebook rebuilds Layer 1 around real `arb_*` outcomes from `tips_treasury_implied_rf_2010` and aligns controls from the repo data structure.


In [None]:
from __future__ import annotations

import hashlib
import json
import logging
import shutil
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
import statsmodels.api as sm
from statsmodels.stats.diagnostic import acorr_ljungbox

from slr_bucket.econometrics.event_study import add_event_time, event_study_regression, jump_estimator
from slr_bucket.io import build_data_catalog, load_any_table, resolve_dataset_path


In [None]:
# CONFIG
# Cell-local import: the notebook's import cell only brings in `datetime` itself.
from datetime import timezone

# Every tunable of the run lives here. The dict is hashed below so that each
# output folder can be traced back to the exact settings that produced it.
CONFIG = {
    "outcomes_source": "tips_treasury_implied_rf_2010",
    "outcome_pattern": "arb_",
    "tenors_required": [2, 5, 10],
    "events": ["2020-04-01", "2021-03-19", "2021-03-31"],
    "windows": [20, 60],
    "event_bins": [(-60,-41),(-40,-21),(-20,-1),(0,0),(1,20),(21,40),(41,60)],
    "total_controls": ["VIX", "HY_OAS", "BAA10Y", "issu_7_bil", "issu_14_bil", "issu_30_bil"],
    "direct_controls": ["VIX", "HY_OAS", "BAA10Y", "issu_7_bil", "issu_14_bil", "issu_30_bil", "SOFR", "spr_tgcr", "spr_effr"],
    "hac_lags": 5,
    "run_layer2": True,
}

# 12-hex-character fingerprint of the configuration (keys sorted so that the
# hash does not depend on dict ordering).
cfg_hash = hashlib.sha256(json.dumps(CONFIG, sort_keys=True).encode()).hexdigest()[:12]

# `datetime.utcnow()` is deprecated since Python 3.12; an aware UTC "now"
# formats to exactly the same stamp.
run_stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")

# All outputs of this run go under outputs/summary_pipeline/<stamp>_<hash>/.
repo_root = Path.cwd()
run_dir = repo_root / "outputs" / "summary_pipeline" / f"{run_stamp}_{cfg_hash}"
for sub in ["figures","tables","data","logs"]:
    (run_dir / sub).mkdir(parents=True, exist_ok=True)
# Only the path is computed here; the folder itself is refreshed at the end of the run.
latest_dir = repo_root / "outputs" / "summary_pipeline" / "latest"

# force=True replaces handlers left over from a previous execution of this cell,
# so re-running the notebook does not duplicate log lines.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
    handlers=[
        logging.FileHandler(run_dir / "logs" / "pipeline.log"),
        logging.StreamHandler(),
    ],
    force=True,
)
logger = logging.getLogger("summary_pipeline")
print(run_dir)


## Data catalog and dataset inventory

The new `data/` layout uses layered folders (`raw`, `intermediate`, `series`); paths below are relative to `data/` unless they already start with it. This run uses:
- Outcomes: `data/series/tips_treasury_implied_rf_2010.(parquet|csv)` (`arb_*`).
- Preferred merged controls: `data/intermediate/analysis_panel.csv` (if valid for required columns).
- Fallback controls from raw inputs:
  - `raw/event_inputs/controls_vix_creditspreads_fred`
  - `raw/event_inputs/repo_rates_combined` (or `repo_rates_fred`)
  - `raw/event_inputs/treasury_issuance_by_tenor_fiscaldata`
- Layer 2 proxies (optional):
  - `raw/event_inputs/primary_dealer_stats_ofr_stfm_nypd_long`
  - `raw/event_inputs/bank_exposure_y9c_agg_daily.csv`


In [None]:
# Inventory the data/ tree and store the catalog next to the other run outputs
# in three formats (CSV, Parquet, Markdown).
catalog = build_data_catalog(repo_root / "data")
catalog_dir = run_dir / "data"
for suffix, writer in (
    ("csv", catalog.to_csv),
    ("parquet", catalog.to_parquet),
    ("md", catalog.to_markdown),
):
    writer(catalog_dir / f"data_catalog.{suffix}", index=False)
catalog.head(10)


In [None]:
# Outcomes: arb_* only
# Load the outcomes table and reshape its `<prefix><tenor>` columns into a long
# (date, outcome, y, tenor) frame sorted by tenor and date.
outcome_path = resolve_dataset_path(CONFIG["outcomes_source"], expected_dir=repo_root / "data" / "series")
out = load_any_table(outcome_path)
out["date"] = pd.to_datetime(out["date"], errors="coerce")

# Parse the tenor once per column. A prefixed column whose next token is not a
# plain integer is skipped (and reported) instead of breaking the sort with a
# ValueError.
outcome_prefix = CONFIG["outcome_pattern"]
tenor_by_col = {}
ignored_cols = []
for col in out.columns:
    if not (isinstance(col, str) and col.startswith(outcome_prefix)):
        continue
    token = col[len(outcome_prefix):].split("_")[0]
    if token.isascii() and token.isdecimal():
        tenor_by_col[col] = int(token)
    else:
        ignored_cols.append(col)
if ignored_cols:
    logger.warning("Ignoring %s* columns without a numeric tenor: %s", outcome_prefix, ignored_cols)

# Columns ordered by numeric tenor (ties keep their order in the source table).
arb_cols = sorted(tenor_by_col, key=tenor_by_col.get)
if not arb_cols:
    raise ValueError("No arb_* columns found in outcomes source")

arb_long = out[["date", *arb_cols]].melt(id_vars=["date"], var_name="outcome", value_name="y")
arb_long["tenor"] = arb_long["outcome"].map(tenor_by_col).astype("Int64")
arb_long = arb_long.dropna(subset=["date","y","tenor"]).sort_values(["tenor","date"]).reset_index(drop=True)

# The configuration lists tenors the analysis expects; surface gaps early rather
# than discovering them as missing rows in the result tables.
tenors_found = sorted(arb_long["tenor"].dropna().unique().tolist())
tenors_missing = sorted(set(CONFIG["tenors_required"]) - set(tenors_found))
if tenors_missing:
    logger.warning("Required tenors missing from outcomes: %s", tenors_missing)

# Rough unit heuristic based on the median absolute value of the outcomes.
val_abs_q = arb_long["y"].abs().quantile([0.5,0.9,0.99]).to_dict()
unit_note = "Values look like bps" if val_abs_q.get(0.5,0) > 0.2 else "Values look like decimals"
logger.info("Loaded outcomes from %s with tenors=%s", outcome_path, tenors_found)
{"outcome_path": str(outcome_path), "arb_columns": arb_cols, "value_quantiles_abs": val_abs_q, "unit_note": unit_note}


In [None]:
# Controls: prefer intermediate analysis_panel if valid, else fallback build from raw.
def build_controls_panel() -> tuple[pd.DataFrame, str]:
    """Assemble the daily controls table used by every regression layer.

    The intermediate ``analysis_panel`` is used when it can be loaded and
    contains every column in ``CONFIG["direct_controls"]``. Otherwise the
    controls are rebuilt from the raw ``event_inputs`` tables (FRED controls,
    repo rates, Treasury issuance by tenor).

    Returns:
        A ``(controls, source)`` pair. ``source`` is the path of the
        analysis panel when it was used, else ``"raw_event_inputs_fallback"``.
    """
    needed = set(CONFIG["direct_controls"])
    raw_inputs = repo_root / "data" / "raw" / "event_inputs"

    # Best effort: any problem with the intermediate panel falls through to the
    # raw rebuild below, but the reason is always logged.
    try:
        p = resolve_dataset_path("analysis_panel", expected_dir=repo_root / "data" / "intermediate")
        panel = load_any_table(p)
        panel["date"] = pd.to_datetime(panel["date"], errors="coerce")
        absent = sorted(needed - set(panel.columns))
        if not absent:
            logger.info("Using controls from intermediate analysis_panel: %s", p)
            return panel[["date", *sorted(needed)]].copy(), str(p)
        logger.warning("analysis_panel at %s lacks required controls %s, using raw fallback", p, absent)
    except Exception as exc:
        logger.warning("analysis_panel unavailable/invalid (%s), using raw fallback", exc)

    fred = load_any_table(resolve_dataset_path("controls_vix_creditspreads_fred", expected_dir=raw_inputs))
    fred["date"] = pd.to_datetime(fred["date"], errors="coerce")

    # Repo rates: the combined file is preferred, the FRED-only file is the backup.
    try:
        repo = load_any_table(resolve_dataset_path("repo_rates_combined", expected_dir=raw_inputs))
    except FileNotFoundError:
        repo = load_any_table(resolve_dataset_path("repo_rates_fred", expected_dir=raw_inputs))
    repo["date"] = pd.to_datetime(repo["date"], errors="coerce")
    repo = repo.rename(columns={"TGCR":"tgcr", "EFFR":"effr"})
    # Derive the SOFR spreads only when the file does not already provide them.
    if "spr_tgcr" not in repo.columns and {"SOFR","tgcr"}.issubset(repo.columns):
        repo["spr_tgcr"] = pd.to_numeric(repo["SOFR"], errors="coerce") - pd.to_numeric(repo["tgcr"], errors="coerce")
    if "spr_effr" not in repo.columns and {"SOFR","effr"}.issubset(repo.columns):
        repo["spr_effr"] = pd.to_numeric(repo["SOFR"], errors="coerce") - pd.to_numeric(repo["effr"], errors="coerce")

    issu = load_any_table(resolve_dataset_path("treasury_issuance_by_tenor_fiscaldata", expected_dir=raw_inputs))
    if "issue_date" not in issu.columns:
        logger.warning("Issuance table has no 'issue_date' column; issuance controls will be empty")
    issu["date"] = pd.to_datetime(issu.get("issue_date"), errors="coerce")
    issu["tenor_bucket"] = pd.to_numeric(issu["tenor_bucket"], errors="coerce")
    # Divide by 1e9 to match the *_bil column names.
    issu["issuance_amount"] = pd.to_numeric(issu["issuance_amount"], errors="coerce") / 1e9
    d = issu.pivot_table(index="date", columns="tenor_bucket", values="issuance_amount", aggfunc="sum").reset_index()
    d = d.rename(columns={7.0:"issu_7_bil", 10.0:"issu_10_bil", 14.0:"issu_14_bil", 20.0:"issu_20_bil", 30.0:"issu_30_bil"})
    if "issu_14_bil" not in d.columns:
        # No 14 bucket in the data: approximate it by the 10 and 20 buckets.
        # The pivot leaves NaN where a bucket had no issuance on a date, so a
        # plain `+` would blank every date on which only one of them issued.
        parts = [d[name] for name in ("issu_10_bil", "issu_20_bil") if name in d.columns]
        if not parts:
            d["issu_14_bil"] = 0
        elif len(parts) == 1:
            d["issu_14_bil"] = parts[0]
        else:
            d["issu_14_bil"] = parts[0].add(parts[1], fill_value=0)

    repo_cols = [c for c in ["date","SOFR","spr_tgcr","spr_effr"] if c in repo.columns]
    issu_cols = [c for c in ["date","issu_7_bil","issu_14_bil","issu_30_bil"] if c in d.columns]
    # Outer join keeps dates present in either FRED or repo data; issuance is
    # attached to those dates only.
    controls = (
        fred.merge(repo[repo_cols], on="date", how="outer")
        .merge(d[issu_cols], on="date", how="left")
    )
    still_missing = sorted(needed - set(controls.columns))
    if still_missing:
        logger.warning("Raw fallback could not provide controls: %s", still_missing)
    logger.info("Built controls from raw event_inputs")
    return controls, "raw_event_inputs_fallback"

controls, controls_source = build_controls_panel()
controls = controls.sort_values("date")
controls.head()


In [None]:
# Attach the controls to every outcome row by date; outcome rows without a
# matching control date are kept (left join).
panel_long = arb_long.merge(controls, how="left", on="date")
# Force the configured controls that made it into the panel to numeric dtype;
# unparseable values become NaN.
present_controls = [name for name in CONFIG["direct_controls"] if name in panel_long.columns]
for name in present_controls:
    panel_long[name] = pd.to_numeric(panel_long[name], errors="coerce")
panel_long.to_parquet(run_dir / "data" / "arb_panel_long.parquet", index=False)
panel_long.head()


In [None]:
# Layer 1A: summary stats by tenor and regime
# Regime boundaries are inclusive on both ends.
regimes = {
    "pre": (pd.Timestamp("2019-01-01"), pd.Timestamp("2020-03-31")),
    "relief": (pd.Timestamp("2020-04-01"), pd.Timestamp("2021-03-31")),
    "post": (pd.Timestamp("2021-04-01"), pd.Timestamp.max),
}

rows = []
for tenor, g in panel_long.groupby("tenor"):
    g = g.sort_values("date")
    for regime, (start, end) in regimes.items():
        # Keep the dates alongside the values: the frame's index holds row
        # labels, not dates, so sample_start/sample_end must come from the
        # "date" column.
        in_regime = g.loc[g["date"].between(start, end), ["date", "y"]].dropna(subset=["y"])
        s = in_regime["y"]
        if s.empty:
            continue
        lb_p = np.nan
        try:
            # At most 10 lags, scaled down to one lag per five observations for short samples.
            lb = acorr_ljungbox(s, lags=[min(10, max(1, len(s)//5))], return_df=True)
            lb_p = float(lb["lb_pvalue"].iloc[0])
        except Exception as exc:
            # Best effort: the table keeps NaN for this cell, but the reason is recorded.
            logger.warning("Ljung-Box test failed for tenor=%s regime=%s: %s", tenor, regime, exc)
        rows.append({
            "tenor": int(tenor), "regime": regime,
            "sample_start": in_regime["date"].min(), "sample_end": in_regime["date"].max(), "N": int(s.shape[0]),
            "mean": s.mean(), "std": s.std(),
            "p1": s.quantile(0.01), "p5": s.quantile(0.05), "p50": s.quantile(0.5), "p95": s.quantile(0.95), "p99": s.quantile(0.99),
            "autocorr1": s.autocorr(lag=1), "ljungbox_pvalue": lb_p,
        })
summary_stats = pd.DataFrame(rows)
summary_stats.to_csv(run_dir / "tables" / "summary_stats.csv", index=False)
summary_stats.head()


In [None]:
# Layer 1B: jump regressions (TOTAL vs DIRECT)
# One row per (event, window, tenor, spec) with a 1.96-SE confidence band.
spec_controls = [("TOTAL", CONFIG["total_controls"]), ("DIRECT", CONFIG["direct_controls"])]
jump_rows = []
for event in CONFIG["events"]:
    for w in CONFIG["windows"]:
        for tenor, g in panel_long.groupby("tenor"):
            for spec, controls_list in spec_controls:
                est, se, n = jump_estimator(
                    g,
                    y_col="y",
                    event_date=event,
                    window=w,
                    controls=controls_list,
                    hac_lags=CONFIG["hac_lags"],
                )
                # The band is only defined when both the estimate and its SE exist.
                if pd.notna(est) and pd.notna(se):
                    ci_low = est - 1.96*se
                    ci_high = est + 1.96*se
                else:
                    ci_low = np.nan
                    ci_high = np.nan
                jump_rows.append({
                    "event": event,
                    "window": w,
                    "tenor": int(tenor),
                    "spec": spec,
                    "estimate": est,
                    "se": se,
                    "ci_low": ci_low,
                    "ci_high": ci_high,
                    "N": n,
                })
jump_results = pd.DataFrame(jump_rows)
jump_results.to_csv(run_dir / "tables" / "jump_results.csv", index=False)
jump_results.head()


In [None]:
# Layer 1C: binned event-study + plots
import matplotlib.pyplot as plt


def _save_event_path_figure(es_table, event, tenor, spec):
    """Plot the estimates with their confidence band, ordered by term, and save the PNG."""
    p = es_table.sort_values("term")
    fig, ax = plt.subplots(figsize=(8,4))
    ax.plot(p["term"], p["estimate"], marker="o")
    ax.fill_between(p["term"], p["ci_low"], p["ci_high"], alpha=0.2)
    ax.axhline(0, color="black", lw=1)
    ax.tick_params(axis='x', rotation=45)
    ax.set_title(f"Event study path | event={event} tenor={int(tenor)} spec={spec}")
    fig.tight_layout()
    fig.savefig(run_dir / "figures" / f"event_path_arb_{int(tenor)}y_{event}_{spec.lower()}.png", dpi=150)
    # Close explicitly so figures do not pile up across the nested loops.
    plt.close(fig)


bin_rows = []
for event in CONFIG["events"]:
    for tenor, g in panel_long.groupby("tenor"):
        for spec, controls_list in (("TOTAL", CONFIG["total_controls"]), ("DIRECT", CONFIG["direct_controls"])):
            es = event_study_regression(
                g,
                y_col="y",
                event_date=event,
                bins=CONFIG["event_bins"],
                controls=controls_list,
                hac_lags=CONFIG["hac_lags"],
            )
            if es.empty:
                continue
            # Tag the coefficient table so the stacked output stays identifiable.
            es = es.assign(event=event, tenor=int(tenor), spec=spec)
            bin_rows.append(es)
            _save_event_path_figure(es, event, tenor, spec)

# Stack every per-combination table; an empty frame is written when nothing was estimated.
if bin_rows:
    eventstudy_bins = pd.concat(bin_rows, ignore_index=True)
else:
    eventstudy_bins = pd.DataFrame()
eventstudy_bins.to_csv(run_dir / "tables" / "eventstudy_bins.csv", index=False)
eventstudy_bins.head()


In [None]:
# Layer 1D pooled regression with tenor FE + stargazer export
from statsmodels.formula.api import ols

# Names that already play a fixed role in the formula and must not be repeated as controls.
reserved_terms = {"y", "post", "tenor"}

pooled_rows = []
stargazer_models = []
for event in CONFIG["events"]:
    # Restrict to event_time within [-60, 60] and flag observations at or after the event.
    work = add_event_time(panel_long, event)
    work = work.loc[work["event_time"].between(-60, 60)].copy()
    work["post"] = (work["event_time"] >= 0).astype(int)
    for spec, controls_list in (("TOTAL", CONFIG["total_controls"]), ("DIRECT", CONFIG["direct_controls"])):
        available = [c for c in controls_list if c in work.columns]
        reg = work[["y", "post", "tenor", *available]].dropna().copy()
        if reg.empty:
            continue
        # Jump dummy + tenor fixed effects, then whichever controls exist in the panel.
        rhs = "post + C(tenor)"
        if available:
            rhs = rhs + " + " + " + ".join(c for c in available if c not in reserved_terms)
        res = ols(f"y ~ {rhs}", data=reg).fit()
        robust = res.get_robustcov_results(cov_type="HAC", maxlags=CONFIG["hac_lags"])
        stargazer_models.append(robust)
        # Locate the "post" coefficient by position in the design matrix.
        exog_names = robust.model.exog_names
        if "post" in exog_names:
            post_pos = exog_names.index("post")
            post_coef = robust.params[post_pos]
            post_se = robust.bse[post_pos]
        else:
            post_coef = np.nan
            post_se = np.nan
        pooled_rows.append({
            "event": event,
            "spec": spec,
            "N": int(robust.nobs),
            "post": post_coef,
            "se": post_se,
        })

pooled_table = pd.DataFrame(pooled_rows)
pooled_table.to_csv(run_dir / "tables" / "pooled_jump_results.csv", index=False)

# The HTML table is optional: any failure (including a missing stargazer
# package) is written into the file instead of stopping the notebook.
html_out = run_dir / "tables" / "regression_table.html"
try:
    from stargazer.stargazer import Stargazer
    if not stargazer_models:
        html_text = "<html><body><p>No pooled models available.</p></body></html>"
    else:
        sg = Stargazer(stargazer_models)
        sg.title("Pooled jump regressions (HAC SE)")
        html_text = sg.render_html()
    html_out.write_text(html_text, encoding="utf-8")
except Exception as exc:
    html_out.write_text(f"<html><body><p>Stargazer unavailable: {exc}</p></body></html>", encoding="utf-8")

pooled_table


In [None]:
# Layer 2 mechanism (weekly), skip gracefully if required data missing
# Regresses the weekly cross-tenor mean outcome on a relief-period dummy, its
# interactions with standardized exempt share and lagged dealer utilization,
# and the direct controls (HAC standard errors, 2 lags).
layer2_note = ""
if not CONFIG.get("run_layer2", True):
    # Honor the configuration switch instead of always attempting Layer 2.
    layer2_note = "Layer 2 disabled via CONFIG['run_layer2']."
else:
    try:
        raw_inputs = repo_root / "data" / "raw" / "event_inputs"
        pd_long = load_any_table(resolve_dataset_path("primary_dealer_stats_ofr_stfm_nypd_long", expected_dir=raw_inputs))
        bank = load_any_table(resolve_dataset_path("bank_exposure_y9c_agg_daily", expected_dir=raw_inputs))

        pd_long["date"] = pd.to_datetime(pd_long["date"], errors="coerce")
        bank["date"] = pd.to_datetime(bank["date"], errors="coerce")

        # Dealer series: one column per mnemonic, weekly (Friday) means, summed
        # into a single index that enters the regression lagged by one week.
        pd_w = pd_long.pivot_table(index="date", columns="mnemonic", values="value", aggfunc="mean").resample("W-FRI").mean()
        pd_w["utilization_index"] = pd_w.sum(axis=1, min_count=1)
        pd_w["utilization_lag1w"] = pd_w["utilization_index"].shift(1)

        # Select the single needed column BEFORE averaging: taking the mean of
        # the whole table raises on non-numeric columns in recent pandas.
        # Rows whose date failed to parse cannot be placed in a weekly bucket.
        b_w = (
            bank.dropna(subset=["date"])
            .set_index("date")[["agg_exempt_share"]]
            .resample("W-FRI")
            .mean()
        )
        # Weekly mean per tenor first, then the average across tenors.
        y_w = panel_long.set_index("date").groupby("tenor")["y"].resample("W-FRI").mean().reset_index()
        y_w = y_w.groupby("date", as_index=False)["y"].mean().set_index("date")
        c_w = panel_long.set_index("date")[CONFIG["direct_controls"]].resample("W-FRI").mean()

        mech = y_w.join([b_w, pd_w[["utilization_lag1w"]], c_w], how="inner").dropna()
        mech["relief"] = ((mech.index >= "2020-04-01") & (mech.index <= "2021-03-31")).astype(int)
        # z-scores are computed over the estimation sample.
        mech["z_exempt"] = (mech["agg_exempt_share"] - mech["agg_exempt_share"].mean()) / mech["agg_exempt_share"].std()
        mech["z_util_l1"] = (mech["utilization_lag1w"] - mech["utilization_lag1w"].mean()) / mech["utilization_lag1w"].std()
        mech["relief_x_exempt"] = mech["relief"] * mech["z_exempt"]
        mech["relief_x_util"] = mech["relief"] * mech["z_util_l1"]

        xcols = ["relief", "relief_x_exempt", "relief_x_util", *CONFIG["direct_controls"]]
        reg = mech[["y", *xcols]].dropna()
        X = sm.add_constant(reg[xcols], has_constant="add")
        res = sm.OLS(reg["y"], X).fit(cov_type="HAC", cov_kwds={"maxlags":2})
        mech_out = pd.DataFrame({"term":res.params.index, "coef":res.params.values, "se":res.bse.values})
        mech_out.to_csv(run_dir / "tables" / "layer2_mechanism_weekly.csv", index=False)
        layer2_note = "Layer 2 executed successfully."
    except Exception as exc:
        # Deliberately broad: Layer 2 is optional and must never stop the run,
        # but the reason is also written to the pipeline log.
        layer2_note = f"Layer 2 skipped gracefully due to missing/unusable inputs: {exc}"
        logger.warning(layer2_note)

print(layer2_note)


In [None]:
# Refresh latest and write run metadata
# Provenance of this run, embedded as JSON in the run folder's README.
notes = {
    "run_dir": str(run_dir),
    "controls_source": controls_source,
    "outcomes_source": str(outcome_path),
    "arb_columns": arb_cols,
    "unit_note": unit_note,
    "layer2_note": layer2_note,
}
# Newlines are written as escapes: a double-quoted string literal cannot span
# physical lines.
readme_text = "# Summary pipeline run\n\n```json\n" + json.dumps(notes, indent=2) + "\n```\n"
(run_dir / "README.md").write_text(readme_text, encoding="utf-8")

# Mirror the finished run into `latest` only after the README exists, so the
# copy is complete.
if latest_dir.exists():
    shutil.rmtree(latest_dir)
shutil.copytree(run_dir, latest_dir)
notes
