In [1]:
pip install numpy pandas tqdm statsmodels scikit-learn


Collecting pandas
  Downloading pandas-2.2.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (89 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m89.9/89.9 kB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting statsmodels
  Downloading statsmodels-0.14.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.2 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.6.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (18 kB)
Collecting pytz>=2020.1 (from pandas)
  Downloading pytz-2025.2-py2.py3-none-any.whl.metadata (22 kB)
Collecting tzdata>=2022.7 (from pandas)
  Downloading tzdata-2025.2-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting scipy!=1.9.2,>=1.8 (from statsmodels)
  Downloading scipy-1.15.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.0/62.0 kB[0m [31m1.4 MB/s[0m eta [36m0:00:0

In [4]:
# rolling_yield_forecasts_multi_horizon.py
"""
Rolling multi-horizon yield‑curve forecasting pipeline
with high‑parallel throughput on large machines.

Key features
------------
• Forecast models implemented
    – Random Walk (RW)
    – AR(1) per‑maturity via Yule–Walker (AR_yw)
    – Full‑curve VAR(1) via Yule–Walker (VAR_yw)
    – 2‑step DNS + VAR(1) via Yule–Walker (DNS_VAR_yw)

• Forecast horizons: [1, 5, 21, 63, 252] trading days
• Rolling 3‑year (756‑day) estimation window
• **Chunk‑based multiprocessing**
    – 4 chunks per (model, horizon) → 20 × 4 = 80 tasks
    – Up to 80 worker processes (leave headroom)
    – `maxtasksperchild=1` prevents memory bloat
• Each chunk writes a temporary CSV → auto‑merged
• Final merged files: ``<model>_h<horizon>_fixed_window_results.csv``
"""

# ---------------------- Imports ---------------------- #
from __future__ import annotations
import json
from pathlib import Path
from typing import List, Dict, Tuple
from multiprocessing import Pool, cpu_count
import uuid

import numpy as np
import pandas as pd
from tqdm import tqdm
from statsmodels.tsa.api import VAR
from sklearn.metrics import mean_absolute_error

# ---------------------- DNS / Utility ---------------------- #

def DNS_formula(x: np.ndarray, f: np.ndarray, lambb: float) -> np.ndarray:
    l1, s1, c1 = f
    term1 = (1 - np.exp(-lambb * x)) / (lambb * x)
    term2 = term1 - np.exp(-lambb * x)
    return l1 + s1 * term1 + c1 * term2


def DNS_OLS(data: np.ndarray, tau_in: np.ndarray, lamb_i: float) -> np.ndarray:
    tau = tau_in.reshape(-1, 1)
    dummy = lamb_i * tau
    col2 = (1 - np.exp(-dummy)) / dummy
    col3 = col2 - np.exp(-dummy)
    X = np.hstack([np.ones_like(tau), col2, col3])
    XtX_inv_Xt = np.linalg.pinv(X.T @ X) @ X.T
    return (XtX_inv_Xt @ data.T).T  # (T × 3)

# ---------------------- Forecast Functions ---------------------- #

def forecast_RW_fct(da: np.ndarray, pred: int = 1) -> np.ndarray:
    return np.tile(da[-1], (pred, 1))


def forecast_AR_yw(da: np.ndarray, pred: int) -> np.ndarray:
    T, n = da.shape
    mu = da.mean(axis=0)
    cov0 = (da - mu).T @ (da - mu) / T
    cov1 = (da[1:] - mu).T @ (da[:-1] - mu) / T
    A = np.diag(np.diag(cov1 @ np.linalg.inv(np.diag(np.diag(cov0)))))
    fc = np.zeros((pred, n))
    fc[0] = mu + A @ (da[-1] - mu)
    for t in range(1, pred):
        fc[t] = mu + A @ (fc[t-1] - mu)
    return fc


def forecast_VAR_yw(da: np.ndarray, pred: int) -> np.ndarray:
    T, n = da.shape
    mu = da.mean(axis=0)
    cov0 = (da - mu).T @ (da - mu) / T
    cov1 = (da[1:] - mu).T @ (da[:-1] - mu) / T
    A = cov1 @ np.linalg.inv(cov0)
    fc = np.zeros((pred, n))
    fc[0] = mu + A @ (da[-1] - mu)
    for t in range(1, pred):
        fc[t] = mu + A @ (fc[t-1] - mu)
    return fc


def forecast_DNS_VAR_yw(da: np.ndarray, tau: np.ndarray, lamb_i: float, pred: int) -> np.ndarray:
    betas = DNS_OLS(da, tau, lamb_i)
    betas_fcst = forecast_VAR_yw(betas, pred)
    return betas_fcst

# ---------------------- Config ---------------------- #

FORECAST_HORIZONS = [1, 5, 21, 63, 252]
WINDOW_SIZE = 3 * 252  # trading days
TAU = np.array([0.25, 0.5, 1, 3, 5, 10])
LAMBDA = 0.496

DATA_PATH = Path("Y_df.csv")
OUT_DIR = Path("./model_outputs").resolve()
TMP_DIR = OUT_DIR / "chunks"
OUT_DIR.mkdir(parents=True, exist_ok=True)
TMP_DIR.mkdir(parents=True, exist_ok=True)

START_DATE = pd.to_datetime("2022-04-13")
END_DATE = pd.to_datetime("2025-03-05")

CHUNKS_PER_COMBO = 4  # 20 combos × 4 = 80 tasks
MAX_PROCS = min(80, cpu_count() - 4)  # leave head‑room

# ---------------------- Utilities ---------------------- #

def load_data() -> pd.DataFrame:
    return pd.read_csv(DATA_PATH, index_col=0, parse_dates=True).sort_index()


def rolling_indices(date_index: pd.DatetimeIndex) -> List[int]:
    return list(range(date_index.get_loc(START_DATE), date_index.get_loc(END_DATE) + 1))


def chunk_list(lst: List[int], n_chunks: int) -> List[List[int]]:
    k, m = divmod(len(lst), n_chunks)
    return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n_chunks)]

# ---------------------- Worker ---------------------- #

def forecast_worker(args: Tuple[str, int, List[int], np.ndarray, pd.DatetimeIndex]):
    model_name, h, rows_chunk, y_all, date_index = args
    model_func = MODELS[model_name]
    results: List[Dict] = []

    for i in rows_chunk:
        if i - WINDOW_SIZE < 0:
            continue
        y_hist = y_all[i - WINDOW_SIZE:i]
        y_true = y_all[i]

        if model_name == "DNS_VAR_yw":
            betas_fcst = forecast_DNS_VAR_yw(y_hist, TAU, LAMBDA, h)
            y_pred = DNS_formula(TAU, betas_fcst[-1], LAMBDA)
        else:
          if model_name == "RW":
              y_pred = model_func(y_hist, 1)[-1]  # always use last observation
          else:
              y_pred = model_func(y_hist, h)[-1]

        results.append({
            "eval_date": date_index[i].strftime("%Y-%m-%d"),
            "horizon": h,
            "true_yields": y_true.tolist(),
            "forecast_yields": y_pred.tolist(),
            "mae": mean_absolute_error(y_true, y_pred)
        })

    # Write chunk to temp file
    chunk_id = uuid.uuid4().hex
    tmp_file = TMP_DIR / f"{model_name.lower()}_h{h}_{chunk_id}.csv"
    pd.DataFrame(results).to_csv(tmp_file, index=False)
    return str(tmp_file)

# ---------------------- Merge Helper ---------------------- #

def merge_chunks(model_name: str, h: int):
    pattern = f"{model_name.lower()}_h{h}_*.csv"
    files = list(TMP_DIR.glob(pattern))
    if not files:
        return
    df_all = pd.concat([pd.read_csv(f) for f in files], ignore_index=True)
    out_file = OUT_DIR / f"{model_name.lower()}_h{h}_fixed_window_results.csv"
    df_all.to_csv(out_file, index=False)
    for f in files:
        f.unlink()  # remove temp
    print(f"✅ Merged {len(files)} chunks → {out_file} ({len(df_all)} rows)")

# ---------------------- Main ---------------------- #
if __name__ == "__main__":
    y_df = load_data()
    date_index = y_df.index
    y_all = y_df.values

    MODELS = {
        "RW": forecast_RW_fct,
        "AR_yw": forecast_AR_yw,
        "VAR_yw": forecast_VAR_yw,
        "DNS_VAR_yw": forecast_DNS_VAR_yw,
    }

    base_rows = rolling_indices(date_index)

    args_list = []
    for model_name in MODELS.keys():
        for h in FORECAST_HORIZONS:
            for rows_chunk in chunk_list(base_rows, CHUNKS_PER_COMBO):
                args_list.append((model_name, h, rows_chunk, y_all, date_index))
    
    # Run all workers in parallel
    print(f"Launching {len(args_list)} tasks across {MAX_PROCS} processes …")
    with Pool(processes=MAX_PROCS, maxtasksperchild=1) as pool:
        pool.map(forecast_worker, args_list)
    
    # Merge chunked output files into final results
    for model_name in MODELS.keys():
        for h in FORECAST_HORIZONS:
            merge_chunks(model_name, h)
    
    print("🏁 All model–horizon result files have been merged and saved.")



Launching 20 tasks across 80 processes …
✅ Merged 4 chunks → /model_outputs/rw_h1_fixed_window_results.csv (756 rows)
✅ Merged 4 chunks → /model_outputs/rw_h5_fixed_window_results.csv (756 rows)
✅ Merged 4 chunks → /model_outputs/rw_h21_fixed_window_results.csv (756 rows)
✅ Merged 4 chunks → /model_outputs/rw_h63_fixed_window_results.csv (756 rows)
✅ Merged 4 chunks → /model_outputs/rw_h252_fixed_window_results.csv (756 rows)
🏁 All model–horizon result files have been merged and saved.
