## 01. Imports and Configuration

What: Import libraries; set seeds, paths, fixed splits, and run constants.
Why: Deterministic, reproducible environment.
Method choices: Fixed splits (Train 2021-02-03→2022-12-30; Val 2023-01-03→2023-05-31; Test 2023-06-01→2023-12-28, n=146); America/New_York 16:00 cut-off; Target = Close.shift(-1); strict Train-only selection for classical models; no scaling of levels.

In [20]:
import os, sys, json, time, hashlib, platform, warnings, itertools, shutil, random
from pathlib import Path
from typing import List, Tuple, Dict, Any, Optional
import numpy as np
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox, het_arch
from statsmodels.tsa.stattools import kpss
from scipy.stats import jarque_bera, norm
from statsmodels.tools.sm_exceptions import ConvergenceWarning, ValueWarning

# Limit to known-benign warnings
warnings.filterwarnings("ignore", category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=ValueWarning)
warnings.filterwarnings("ignore", message=".*test statistic is outside of the range of p-values.*")

# Config
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

TICKERS = ["AAPL", "AMZN", "MSFT", "TSLA", "AMD"]
DATA_DIR = Path(".")
OUT_ROOT = Path("outputs")
MODEL_ID = "ARIMAX"
MODEL_DIR = OUT_ROOT / "arimax"

TRAIN_START = "2021-02-03"
TRAIN_END   = "2022-12-30"
VAL_START   = "2023-01-03"
VAL_END     = "2023-05-31"
TEST_START  = "2023-06-01"
TEST_END    = "2023-12-28"
N_TEST_EXP  = 146

EPS_DIR  = 0.0010   # epsilon for sign rule and DA_epsilon
ALPHA_95 = 0.05
ALPHA_80 = 0.20
TRADING_DAYS = 252  # annualisation for Sharpe

P_GRID = [0, 1, 2]
D_GRID = [0, 1]
Q_GRID = [0, 1, 2]
TRENDS = ["n", "c"]

NUM_EPS = 1e-12
USE_ROBUST_SIGMA_FOR_INTERVALS = False
ARCH_LM_SIG_THRESHOLD = 0.01

# Preflight checks enabled for final runs.
SKIP_PREFLIGHT = False

# Option B: inline next-day Close used only to define Target for 2023-12-28.
# Values are the official 2023-12-29 closes per ticker.
TRUTH_NEXT_CLOSE: Dict[str, Dict[str, float]] = {
    "AAPL": {"2023-12-29": 190.91},  # If using unadjusted Yahoo "Close", use 192.53 instead.
    "AMZN": {"2023-12-29": 151.94},
    "MSFT": {"2023-12-29": 371.21},
    "TSLA": {"2023-12-29": 248.48},
    "AMD":  {"2023-12-29": 147.41},
}

# Data Loading
**What:** Read level series and exogenous features.  
**Why:** Establish leakage-safe inputs aligned to calendar.  
**Method choices:** Target created after price load; exog excludes Close/Target; dates sorted and de-duplicated.

In [21]:
# Helpers
def file_sha256(fp: Path) -> str:
    h = hashlib.sha256()
    with open(fp, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def assert_splits_index(dates: pd.Series):
    d = pd.to_datetime(dates)
    tr = (d >= TRAIN_START) & (d <= TRAIN_END)
    va = (d >= VAL_START) & (d <= VAL_END)
    te = (d >= TEST_START) & (d <= TEST_END)
    te_dates = d[te]
    assert len(te_dates) == N_TEST_EXP, f"Test rows {len(te_dates)} != {N_TEST_EXP}"
    assert str(te_dates.iloc[0].date()) == TEST_START and str(te_dates.iloc[-1].date()) == TEST_END, "Test date bounds mismatch"
    return tr, va, te

def sanitize(v, default=0.0):
    try:
        fv = float(v)
        if not np.isfinite(fv):
            return float(default)
        return fv
    except Exception:
        return float(default)

def _require_finite(name: str, val: float):
    if not (isinstance(val, (int, float)) and np.isfinite(val)):
        raise RuntimeError(f"[Metrics] {name} is not finite: {val}")

# Strict I/O
def read_series_strict(ticker: str) -> pd.DataFrame:
    ip = DATA_DIR / f"{ticker}_input.csv"
    if not ip.exists():
        raise FileNotFoundError(f"Missing required file: {ip}")
    df = pd.read_csv(ip)
    need = {"date", "Close"}
    if not need.issubset(df.columns):
        raise ValueError(f"{ticker}_input.csv must contain {need}")
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values("date").reset_index(drop=True)
    df["Target"] = df["Close"].shift(-1)
    return df[["date", "Close", "Target"]]

def read_exog_strict(ticker: str) -> pd.DataFrame:
    ex = DATA_DIR / f"exog_{ticker}.csv"
    if not ex.exists():
        raise FileNotFoundError(f"Missing required file: {ex}")
    df = pd.read_csv(ex)
    if "date" not in df.columns:
        raise ValueError(f"exog_{ticker}.csv must contain 'date'")
    df["date"] = pd.to_datetime(df["date"])
    df = df.drop_duplicates("date").sort_values("date").reset_index(drop=True)
    drop_cols = [c for c in ["Close", "Target"] if c in df.columns]
    if drop_cols:
        df = df.drop(columns=drop_cols)
    num_cols = [c for c in df.columns if c != "date"]
    df[num_cols] = df[num_cols].apply(pd.to_numeric, errors="coerce")
    return df

def _log_missing_dates(series_df: pd.DataFrame, exog_df: pd.DataFrame, ticker: str) -> Optional[Dict[str, Any]]:
    s_dates = set(pd.to_datetime(series_df["date"]).dt.date.tolist())
    e_dates = set(pd.to_datetime(exog_df["date"]).dt.date.tolist())
    missing_in_exog = sorted([str(d) for d in s_dates - e_dates])
    missing_in_series = sorted([str(d) for d in e_dates - s_dates])
    info = {"ticker": ticker, "missing_in_exog": missing_in_exog, "missing_in_series": missing_in_series}
    tdir = MODEL_DIR / ticker
    tdir.mkdir(parents=True, exist_ok=True)
    (tdir / f"date_alignment_{ticker}.json").write_text(
        json.dumps({**info, "note": "perfect inner join" if not (missing_in_exog or missing_in_series) else "mismatch written"}, indent=2),
        encoding="utf-8"
    )
    return info

# Preprocessing
**What:** Enforce target availability and exog coverage; allow single inline truth row (Option B) to define last Test Target.  
**Why:** Guarantee all 146 Test rows are evaluable without editing source CSVs.  
**Method choices:** In-memory augmentation only; strict window checks; no future fill.

In [22]:
# Preconditions
def augment_series_for_missing_target(ser: pd.DataFrame, ticker: str) -> pd.DataFrame:
    """
    If Target for the last Test day (2023-12-28) is NaN, append the next day's Close (2023-12-29)
    in memory using TRUTH_NEXT_CLOSE, recompute Target, and return the augmented series.
    """
    s = ser.copy()
    s["date"] = pd.to_datetime(s["date"])
    mask_test = (s["date"] >= TEST_START) & (s["date"] <= TEST_END)
    blk = s.loc[mask_test, ["date", "Target"]].reset_index(drop=True)
    if blk.empty:
        raise RuntimeError(f"[Levels:{ticker}] No rows in the configured Test window {TEST_START}..{TEST_END}.")
    last_test = pd.to_datetime(TEST_END)
    need_next = last_test + pd.Timedelta(days=1)  # 2023-12-29 (Fri)
    if s.loc[s["date"] == pd.to_datetime(TEST_END), "Target"].isna().any():
        val = None
        if ticker in TRUTH_NEXT_CLOSE:
            v = TRUTH_NEXT_CLOSE[ticker].get(str(need_next.date()))
            if v is not None:
                val = float(v)
        if val is None or not np.isfinite(val):
            raise RuntimeError(f"[Levels:{ticker}] Missing Target on {TEST_END} and no inline truth override found. Provide {need_next.date()} Close.")
        base = s[["date","Close"]].copy()
        extra = pd.DataFrame({"date":[need_next], "Close":[val]})
        aug = pd.concat([base, extra], ignore_index=True).sort_values("date").reset_index(drop=True)
        aug["Target"] = aug["Close"].shift(-1)
        s = aug.merge(base, on=["date","Close"], how="inner")[["date","Close","Target"]]
        tdir = (MODEL_DIR / ticker); tdir.mkdir(parents=True, exist_ok=True)
        (tdir / f"target_augmentation_{ticker}.json").write_text(
            json.dumps({
                "ticker": ticker,
                "added_next_day_close_for": str(need_next.date()),
                "close_value": float(val),
                "note": "in-memory only; used solely to define Target for last Test day."
            }, indent=2),
            encoding="utf-8"
        )
    return s

def _assert_target_defined_for_test(series_df: pd.DataFrame, ticker: str):
    s = series_df.copy()
    s["date"] = pd.to_datetime(s["date"])
    mask_test = (s["date"] >= TEST_START) & (s["date"] <= TEST_END)
    blk = s.loc[mask_test, ["date", "Target"]].reset_index(drop=True)
    if blk.empty:
        raise RuntimeError(f"[Levels:{ticker}] No rows in the configured Test window {TEST_START}..{TEST_END}.")
    missing_dates = blk.loc[blk["Target"].isna(), "date"].dt.strftime("%Y-%m-%d").tolist()
    if missing_dates:
        hint = (
            f"[Levels:{ticker}] Missing y_true (Target=Close[t+1]) for Test date(s): {missing_dates[:5]}{'...' if len(missing_dates) > 5 else ''}. "
            f"Finalisation gate requires full evaluability across {TEST_START}->{TEST_END}."
        )
        raise RuntimeError(hint)

def _coverage_gaps(ser: pd.DataFrame, exg: pd.DataFrame, start: str, end: str) -> List[str]:
    s = pd.to_datetime(ser["date"]).dt.date
    e = pd.to_datetime(exg["date"]).dt.date
    mask = (s >= pd.to_datetime(start).date()) & (s <= pd.to_datetime(end).date())
    want = set(s[mask].tolist())
    have = set(e.tolist())
    return [str(d) for d in sorted(want - have)]

def _assert_exog_covers_window(ser: pd.DataFrame, exg: pd.DataFrame, start: str, end: str, label: str, ticker: str):
    missing = _coverage_gaps(ser, exg, start, end)
    if missing:
        sample = missing[:5]
        raise RuntimeError(f"[Exog:{ticker}] Missing exogenous rows on {label} date(s): {sample}{'...' if len(missing) > 5 else ''}.")

# Preflight
**What:** Early cross-ticker checks for target and exog coverage.  
**Why:** Fail fast before any model work.  
**Method choices:** Writes `preflight_report.json` on failure; requires full Test evaluability (146 rows).

In [23]:
# Preflight audit
def audit_target_coverage(tickers: List[str]) -> Dict[str, Any]:
    issues = {}
    for t in tickers:
        try:
            base = read_series_strict(t)
            df = augment_series_for_missing_target(base, t)
        except Exception as e:
            issues[t] = {"series_read_or_augment_error": str(e)}
            continue
        df["date"] = pd.to_datetime(df["date"])
        mask = (df["date"] >= TEST_START) & (df["date"] <= TEST_END)
        blk = df.loc[mask, ["date", "Target"]]
        missing = blk.loc[blk["Target"].isna(), "date"].dt.strftime("%Y-%m-%d").tolist()
        if missing:
            issues[t] = {"missing_test_dates_for_target": missing}
    return issues

def audit_exog_coverage(tickers: List[str]) -> Dict[str, Any]:
    issues = {}
    for t in tickers:
        try:
            ser_base = read_series_strict(t)
            ser = augment_series_for_missing_target(ser_base, t)
            exg = read_exog_strict(t)
        except Exception as e:
            issues[t] = {"io_error": str(e)}
            continue
        gaps = {
            "Train": _coverage_gaps(ser, exg, TRAIN_START, TRAIN_END),
            "Val":   _coverage_gaps(ser, exg, VAL_START, VAL_END),
            "Test":  _coverage_gaps(ser, exg, TEST_START, TEST_END),
        }
        gaps = {k: v for k, v in gaps.items() if v}
        if gaps:
            issues[t] = gaps
    return issues

def preflight_inputs(tickers: List[str]):
    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    issues = {
        "missing_next_day_close_for_Target": audit_target_coverage(tickers),
        "exog_coverage_gaps": audit_exog_coverage(tickers),
    }
    issues = {k: v for k, v in issues.items() if v}
    if issues:
        report = OUT_ROOT / "preflight_report.json"
        report.write_text(json.dumps(issues, indent=2), encoding="utf-8")
        raise RuntimeError(
            "[Preflight] Inputs invalid. See ARIMAX/preflight_report.json "
            "for tickers and dates requiring fixes (next-day Close for Target and exog coverage)."
        )

# Alignment and Merge
**What:** Inner-join series and exog; assert splits and scales.  
**Why:** Clean, consistent inputs to SARIMAX.  
**Method choices:** Levels remain unscaled; features are numeric; splits re-asserted post-merge.


In [24]:
def load_aligned_frames_from_series(ticker: str, ser: pd.DataFrame) -> Tuple[pd.Series, pd.Series, pd.DataFrame, pd.Series]:
    _assert_target_defined_for_test(ser, ticker)
    exg = read_exog_strict(ticker)
    _log_missing_dates(ser, exg, ticker)
    _assert_exog_covers_window(ser, exg, TRAIN_START, TRAIN_END, "Train", ticker)
    _assert_exog_covers_window(ser, exg, VAL_START, VAL_END, "Val", ticker)
    _assert_exog_covers_window(ser, exg, TEST_START, TEST_END, "Test", ticker)
    merged = ser.merge(exg, on="date", how="inner").reset_index(drop=True).sort_values("date")
    dates = merged["date"]
    assert_splits_index(dates)
    close_lvl = merged["Close"].astype(float)
    endog = merged["Target"].astype(float)
    exog = merged.drop(columns=["Close", "Target"])
    return endog, close_lvl, exog, dates

# Exogenous Spot-Check
**What:** Quick causality/zero-encoding probe for sentiment blocks.  
**Why:** Verify zero-preservation and sparsity behaviour.  
**Method choices:** Prefix-based grouping for Tw_/Rd_/Nw_SP500_*; per-month zero-rate summary.


In [25]:
def _prefix_groups(cols: List[str]) -> Dict[str, List[str]]:
    pref = {"Tw_": [], "Rd_": [], "Nw_SP500_": []}
    for c in cols:
        for p in pref.keys():
            if c.startswith(p):
                pref[p].append(c)
    return pref

def exog_causality_spotcheck(exog_df: pd.DataFrame, ticker: str) -> Dict[str, Any]:
    df = exog_df.copy()
    if "date" not in df.columns:
        return {"ticker": ticker, "note": "no-date", "ok": False}
    df["date"] = pd.to_datetime(df["date"])
    df["ym"] = df["date"].dt.to_period("M").astype(str)
    groups = _prefix_groups([c for c in df.columns if c not in ("date", "ym")])
    out = {"ticker": ticker, "zero_encoding_summary": {}, "sparse_month_probe": {}}
    for p, cols in groups.items():
        if not cols:
            continue
        block = df[cols].fillna(0.0).astype(float)
        zero_rate = float((block == 0.0).mean().mean()) if not block.empty else 1.0
        out["zero_encoding_summary"][p] = {"n_cols": len(cols), "mean_zero_rate": zero_rate}
        bym = block.copy()
        bym["ym"] = df["ym"]
        agg = bym.groupby("ym", group_keys=False).apply(
            lambda x: float((x.drop(columns=["ym"]) == 0.0).mean().mean())
        )
        if not agg.empty:
            sparse_month = agg.idxmax()
            out["sparse_month_probe"][p] = {"month": sparse_month, "mean_zero_rate": float(agg.loc[sparse_month])}
    out["ok"] = True
    return out

# Model Definition
**What:** Specify SARIMAX for grid selection and forecasting.  
**Why:** Choose (p,d,q,trend) by Train AIC and hold spec fixed on Val/Test.  
**Method choices:** Train-only grid; no exog scaling; trend ∈ {none, constant}.

In [26]:
# AIC selection (Train only)
def aic_grid_search(endog_tr: pd.Series, exog_tr: pd.DataFrame):
    best = {"aic": np.inf, "order": None, "trend": None}
    rows = []
    order_grid = list(itertools.product(P_GRID, D_GRID, Q_GRID))
    X = exog_tr.reset_index(drop=True)
    if "date" in X.columns:
        X = X.drop(columns=["date"])
    y = endog_tr.reset_index(drop=True)
    for p, d, q in order_grid:
        for tr in TRENDS:
            try:
                m = SARIMAX(endog=y, exog=X, order=(p, d, q), trend=tr,
                            enforce_stationarity=False, enforce_invertibility=False)
                r = m.fit(disp=False, method="lbfgs", maxiter=50)
                a = float(r.aic)
                rows.append({"p": p, "d": d, "q": q, "trend": tr, "AIC": a})
                if a < best["aic"]:
                    best = {"aic": a, "order": (p, d, q), "trend": tr}
            except Exception:
                rows.append({"p": p, "d": d, "q": q, "trend": tr, "AIC": np.nan})
    grid = pd.DataFrame(rows).sort_values("AIC", ascending=True).reset_index(drop=True)
    if best["order"] is None or not np.isfinite(best["aic"]):
        raise RuntimeError("[AIC] No SARIMAX spec converged on Train grid.")
    return best, grid, grid.head(5).copy()

# Training
**What:** Expanding-origin, one-step-ahead forecasts over Test.  
**Why:** Classical evaluation with re-estimation each step and warm-start.  
**Method choices:** Outlier guard on extreme jumps; spec fixed from Train AIC.

In [27]:
def _assert_level_unscaled(y_level: pd.Series, ticker: str):
    y_med = float(np.nanmedian(np.abs(y_level)))
    if not np.isfinite(y_med) or y_med <= 0:
        raise RuntimeError(f"[Scale] {ticker}: invalid median level for endog.")
    if y_level.max() > 100000 or y_level.min() < -100000:
        raise RuntimeError(f"[Scale] {ticker}: endog magnitude suggests unintended scaling.")

def _assert_exog_level_unscaled(X_level: pd.DataFrame, ticker: str):
    if hasattr(X_level, "inverse_transform"):
        raise RuntimeError(f"[Scale] {ticker}: exogenous features appear scaled.")
    Xn = X_level.drop(columns=["date"], errors="ignore").astype(float)
    if not Xn.empty:
        max_abs = float(np.nanmax(np.abs(Xn.to_numpy(dtype=float))))
        if max_abs > 1e6:
            raise RuntimeError(f"[Scale] {ticker}: exog magnitude suggests unintended scaling (max_abs={max_abs:.3g}).")

def _compute_delta_cap(close_lvl_all: pd.Series, dates: pd.Series) -> Dict[str, float]:
    d_all = pd.to_datetime(dates).reset_index(drop=True)
    train_mask = (d_all >= pd.to_datetime(TRAIN_START)) & (d_all <= pd.to_datetime(TRAIN_END))
    c = close_lvl_all.reset_index(drop=True).loc[train_mask].astype(float).to_numpy()
    if c.size < 3:
        return {"cap": 50.0, "med_abs_delta": 1.0, "q99_delta": 2.0}
    deltas = np.abs(np.diff(c))
    med_abs = float(np.nanmedian(deltas)) if deltas.size else 1.0
    q99 = float(np.nanquantile(deltas, 0.99)) if deltas.size else 2.0
    cap = max(20.0, 25.0 * med_abs, 5.0 * q99)
    return {"cap": float(cap), "med_abs_delta": float(med_abs), "q99_delta": float(q99)}

def forecast_path(endog_all: pd.Series,
                  close_lvl_all: pd.Series,
                  exog_all: pd.DataFrame,
                  dates: pd.Series,
                  order: tuple,
                  trend: str) -> Tuple[pd.DataFrame, List[str], List[str], Dict[str, float]]:
    _, _, te_mask = assert_splits_index(dates)
    te_dates = pd.to_datetime(dates[te_mask]).tolist()
    X_all = exog_all.reset_index(drop=True)
    if "date" in X_all.columns:
        X_all = X_all.drop(columns=["date"])
    y_all = endog_all.reset_index(drop=True)
    c_all = close_lvl_all.reset_index(drop=True)
    d_all = pd.to_datetime(dates).reset_index(drop=True)

    delta_stats = _compute_delta_cap(close_lvl_all, dates)
    cap_delta = float(delta_stats["cap"])

    rows = []
    last_params = None
    prediction_fallback_dates: List[str] = []
    outlier_guard_dates: List[str] = []

    for d in te_dates:
        hist_mask = d_all < d
        y_hist = y_all.loc[hist_mask]
        X_hist = X_all.loc[hist_mask]
        X_next = X_all.loc[d_all == d]
        if len(X_next) != 1:
            raise RuntimeError(f"[Align] {len(X_next)} rows of exog at {d} (need exactly 1).")

        m = SARIMAX(endog=y_hist, exog=X_hist, order=order, trend=trend,
                    enforce_stationarity=False, enforce_invertibility=False)
        fit_kwargs = dict(disp=False, method="lbfgs", maxiter=50)
        if last_params is not None:
            try:
                res = m.fit(start_params=last_params, **fit_kwargs)
            except Exception:
                try:
                    res = m.fit(start_params=last_params, disp=False, method="nm", maxiter=200)
                except Exception:
                    res = m.fit(**fit_kwargs)
        else:
            res = m.fit(**fit_kwargs)
        try:
            last_params = np.asarray(res.params, dtype=float)
        except Exception:
            last_params = None

        try:
            mean = float(res.get_forecast(steps=1, exog=X_next).predicted_mean.iloc[0])
        except Exception:
            mean = float(c_all.loc[d_all == d].iloc[0])
            prediction_fallback_dates.append(pd.to_datetime(d).strftime("%Y-%m-%d"))

        y_true_level = float(y_all.loc[d_all == d].iloc[0])
        y_prev_level = float(c_all.loc[d_all == d].iloc[0])

        if np.isfinite(mean) and abs(mean - y_prev_level) > cap_delta:
            mean = y_prev_level
            outlier_guard_dates.append(pd.to_datetime(d).strftime("%Y-%m-%d"))

        rows.append({
            "date": pd.to_datetime(d).strftime("%Y-%m-%d"),
            "y_prev": y_prev_level,
            "y_true": y_true_level,
            "y_hat": mean,
            "in_sample_flag": 0
        })

    pred_df = pd.DataFrame(rows)
    assert len(pred_df) == N_TEST_EXP, f"predictions must be {N_TEST_EXP} rows"
    return pred_df, prediction_fallback_dates, outlier_guard_dates, delta_stats

# Evaluation
**What:** Residual diagnostics and Test metrics.  
**Why:** Assess fit quality, calibration, and trading implications.  
**Method choices:** RMSE/MAE/U2/DAε (ε=0.0010); Sharpe & MaxDD at 0 and 10 bps; PICP/MPIW for 95% (and 80% if present).


In [28]:
# Diagnostics utilities
def _diagnostics_payload(resid: np.ndarray, param_count: Optional[int] = None) -> dict:
    kpss_warn = False
    if resid.size == 0 or np.all(~np.isfinite(resid)):
        out = {
            "ljung_box_stat_lag10": 0.0, "ljung_box_pvalue_lag10": 1.0,
            "ljung_box_stat_lag20": 0.0, "ljung_box_pvalue_lag20": 1.0,
            "arch_lm_stat": 0.0, "arch_lm_pvalue": 1.0, "arch_lm_nlags": 12,
            "jarque_bera_stat": 0.0, "jarque_bera_pvalue": 1.0,
            "kpss_stat": 0.0, "kpss_pvalue": 1.0, "kpss_warning": False
        }
    else:
        lbdf = acorr_ljungbox(resid, lags=[10, 20], return_df=True)
        lb10_stat = float(lbdf["lb_stat"].iloc[0]); lb10_p = float(lbdf["lb_pvalue"].iloc[0])
        lb20_stat = float(lbdf["lb_stat"].iloc[1]); lb20_p = float(lbdf["lb_pvalue"].iloc[1])
        arch_stat, arch_p, _, _ = het_arch(resid, nlags=12)
        jb_stat, jb_p = jarque_bera(resid)
        try:
            kpss_stat, kpss_p, _, _ = kpss(resid, regression='c', nlags='auto')
        except Exception:
            kpss_stat, kpss_p, kpss_warn = 0.0, 1.0, True
        out = {
            "ljung_box_stat_lag10": sanitize(lb10_stat, 0.0),
            "ljung_box_pvalue_lag10": sanitize(lb10_p, 1.0),
            "ljung_box_stat_lag20": sanitize(lb20_stat, 0.0),
            "ljung_box_pvalue_lag20": sanitize(lb20_p, 1.0),
            "arch_lm_stat": sanitize(float(arch_stat), 0.0),
            "arch_lm_pvalue": sanitize(float(arch_p), 1.0),
            "arch_lm_nlags": 12,
            "jarque_bera_stat": sanitize(float(jb_stat), 0.0),
            "jarque_bera_pvalue": sanitize(float(jb_p), 1.0),
            "kpss_stat": sanitize(float(kpss_stat), 0.0),
            "kpss_pvalue": sanitize(float(kpss_p), 1.0),
            "kpss_warning": bool(kpss_warn)
        }
    if param_count is not None:
        out["parameter_count"] = int(param_count)
    out.update({
        "ljungbox_stat_lag10": out["ljung_box_stat_lag10"],
        "ljungbox_pvalue_lag10": out["ljung_box_pvalue_lag10"],
        "ljungbox_stat_lag20": out["ljung_box_stat_lag20"],
        "ljungbox_pvalue_lag20": out["ljung_box_pvalue_lag20"],
    })
    return out

def residual_diagnostics_in_sample(endog_all: pd.Series, exog_all: pd.DataFrame,
                                   dates: pd.Series, order: tuple, trend: str) -> dict:
    try:
        d_all = pd.to_datetime(dates).reset_index(drop=True)
        train_mask = (d_all >= pd.to_datetime(TRAIN_START)) & (d_all <= pd.to_datetime(TRAIN_END))
        y = endog_all.reset_index(drop=True).loc[train_mask].reset_index(drop=True)
        X = exog_all.reset_index(drop=True).loc[train_mask].reset_index(drop=True)
        if "date" in X.columns:
            X = X.drop(columns=["date"])
        m = SARIMAX(endog=y, exog=X, order=order, trend=trend,
                    enforce_stationarity=False, enforce_invertibility=False)
        r = m.fit(disp=False, method="lbfgs", maxiter=80)
        resid = pd.Series(r.resid).dropna().to_numpy(dtype=float)
        param_count = len(getattr(r, "params", []))
        out = _diagnostics_payload(resid, param_count=param_count)
        out["parameter_count"] = param_count
        out["resid_sigma_train"] = float(np.std(resid, ddof=1)) if resid.size else np.nan
        out["resid_sigma_train_robust"] = float(1.4826 * np.nanmedian(np.abs(resid - np.nanmedian(resid)))) if resid.size else np.nan
        return out
    except Exception:
        return _diagnostics_payload(np.array([], dtype=float), param_count=0)

def residual_diagnostics_from_test(pred_df: pd.DataFrame) -> dict:
    try:
        resid = pred_df["residual"].astype(float).to_numpy()
        resid = resid[np.isfinite(resid)]
        out = _diagnostics_payload(resid)
        abs_mean_residual = float(np.mean(np.abs(resid))) if resid.size else 0.0
        resid_std = float(np.std(resid, ddof=1)) if resid.size > 1 else 0.0
        mean_residual = float(np.mean(resid)) if resid.size else 0.0
        out.update({
            "abs_mean_residual": abs_mean_residual,
            "residual_std": resid_std,
            "mean_residual": mean_residual,
            "test_ljung_box_pvalue_lag10": out["ljung_box_pvalue_lag10"],
            "test_ljung_box_pvalue_lag20": out["ljung_box_pvalue_lag20"],
            "test_arch_lm_pvalue": out["arch_lm_pvalue"],
            "test_jarque_bera_pvalue": out["jarque_bera_pvalue"],
            "test_kpss_pvalue": out["kpss_pvalue"],
            "test_arch_lm_nlags": out["arch_lm_nlags"]
        })
        return out
    except Exception:
        out = _diagnostics_payload(np.array([], dtype=float))
        out.update({
            "abs_mean_residual": 0.0,
            "residual_std": 0.0,
            "mean_residual": 0.0,
            "test_ljung_box_pvalue_lag10": out["ljung_box_pvalue_lag10"],
            "test_ljung_box_pvalue_lag20": out["ljung_box_pvalue_lag20"],
            "test_arch_lm_pvalue": out["arch_lm_pvalue"],
            "test_jarque_bera_pvalue": out["jarque_bera_pvalue"],
            "test_kpss_pvalue": out["kpss_pvalue"],
            "test_arch_lm_nlags": out["arch_lm_nlags"]
        })
        return out

# Metrics (Test-only)
**What:** Compute evaluation metrics on strict Test window.  
**Why:** Comparable, leakage-safe reporting.  
**Method choices:** U2 vs naïve last-close; DAε excludes small moves; turnover from sign changes.

In [29]:
def _slice_test_window(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["date"] = pd.to_datetime(df["date"])
    df = df.sort_values("date").reset_index(drop=True)
    if "in_sample_flag" in df.columns:
        df = df.loc[df["in_sample_flag"] == 0].copy()
    else:
        df = df.loc[(df["date"] >= TEST_START) & (df["date"] <= TEST_END)].copy()
    n = len(df)
    assert n == N_TEST_EXP, f"[TestWindow] Expected {N_TEST_EXP} rows, got {n}"
    dmin = str(df["date"].min().date())
    dmax = str(df["date"].max().date())
    assert dmin == TEST_START and dmax == TEST_END, f"[TestWindow] Date bounds {dmin}..{dmax} expected {TEST_START}..{TEST_END}"
    return df

def compute_metrics_from_csv(pred_csv_path: Path) -> Dict[str, Any]:
    dfp = pd.read_csv(pred_csv_path)
    dfp = _slice_test_window(dfp)

    y_true = dfp["y_true"].to_numpy(dtype=float)
    y_hat  = dfp["y_hat"].to_numpy(dtype=float)
    y_prev = dfp["y_prev"].to_numpy(dtype=float)

    lower95 = dfp["lower_95"].to_numpy(dtype=float)
    upper95 = dfp["upper_95"].to_numpy(dtype=float)
    lower80 = dfp.get("lower_80", pd.Series(np.nan, index= dfp.index)).to_numpy(dtype=float)
    upper80 = dfp.get("upper_80", pd.Series(np.nan, index= dfp.index)).to_numpy(dtype=float)

    req = {"y_true": y_true, "y_hat": y_hat, "y_prev": y_prev, "lower_95": lower95, "upper_95": upper95}
    for k, arr in req.items():
        if not np.isfinite(arr).all():
            bad_idx = np.where(~np.isfinite(arr))[0][:5]
            bad_dates = dfp["date"].iloc[bad_idx].astype(str).tolist()
            raise RuntimeError(f"[Metrics] Non-finite values in {k} at dates {bad_dates}... All 146 rows must be evaluable.")

    resid = y_true - y_hat
    rmse = float(np.sqrt(np.mean(resid**2)))
    mae  = float(np.mean(np.abs(resid)))

    y_true_shift = pd.Series(y_true).shift(1).to_numpy()
    valid = np.isfinite(y_true) & np.isfinite(y_true_shift)
    denom = float(np.sqrt(np.mean((y_true[valid] - y_true_shift[valid])**2)))
    _require_finite("U2 denominator", denom)
    u2 = float(rmse / max(denom, NUM_EPS))

    ret_true = (y_true / np.where(y_prev == 0.0, 1.0, y_prev)) - 1.0
    ret_hat  = (y_hat  / np.where(y_prev == 0.0, 1.0, y_prev)) - 1.0

    EPSILON = EPS_DIR
    mask = np.abs(ret_true) > EPSILON
    if np.count_nonzero(mask) < 10:
        raise RuntimeError("[Metrics] Too few |return|>epsilon days to compute DA_epsilon robustly.")
    da_eps = float(np.mean(np.sign(ret_hat[mask]) == np.sign(ret_true[mask])))

    within95 = (y_true >= lower95) & (y_true <= upper95)
    cov95 = float(np.mean(within95))
    has80 = np.isfinite(lower80).all() and np.isfinite(upper80).all()
    within80 = (y_true >= lower80) & (y_true <= upper80) if has80 else np.zeros_like(y_true, dtype=bool)
    cov80  = float(np.mean(within80)) if has80 else np.nan
    mpiw95 = float(np.nanmean(upper95 - lower95))
    mpiw80 = float(np.nanmean(upper80 - lower80)) if has80 else np.nan

    pos = np.where(ret_hat >= EPSILON,  1.0, np.where(ret_hat <= -EPSILON, -1.0, 0.0))
    prev_pos = np.r_[0.0, pos[:-1]]
    changes = (pos != prev_pos).astype(float)
    turnover = int(np.sum(changes))

    def pnl_with_cost(bps: float) -> np.ndarray:
        cost_rate = bps / 10000.0
        return pos * ret_true - changes * cost_rate

    pnl0  = pnl_with_cost(0.0)
    pnl10 = pnl_with_cost(10.0)

    def _annualised_sharpe_local(pnl: np.ndarray) -> float:
        m = float(np.mean(pnl))
        s = float(np.std(pnl, ddof=1))
        return 0.0 if not np.isfinite(s) or s <= 0 else float(np.sqrt(TRADING_DAYS) * m / s)

    def _maxdd(pnl: np.ndarray) -> float:
        if pnl.size == 0:
            return 0.0
        equity = np.cumprod(1.0 + pnl)
        peak = np.maximum.accumulate(equity)
        dd = 1.0 - equity / np.where(peak == 0.0, 1.0, peak)
        m = float(np.max(dd)) if dd.size else 0.0
        return m if np.isfinite(m) else 0.0

    sharpe0 = _annualised_sharpe_local(pnl0)
    sharpe10 = _annualised_sharpe_local(pnl10)
    dd0 = _maxdd(pnl0)
    dd10 = _maxdd(pnl10)

    for k, v in {
        "RMSE": rmse, "MAE": mae, "U2": u2, "DA_epsilon": da_eps,
        "Coverage_95": cov95, "Sharpe_0bps": sharpe0, "Sharpe_10bps": sharpe10,
        "MaxDD_0bps": dd0, "MaxDD_10bps": dd10, "MPIW_95": mpiw95
    }.items():
        _require_finite(k, v)

    return {
        "RMSE": rmse, "MAE": mae, "U2": u2, "DA_epsilon": da_eps, "epsilon": float(EPSILON),
        "Coverage": cov95, "Coverage_95": cov95, "Coverage_80": cov80,
        "PICP_95": cov95, "MPIW_95": mpiw95, "MPIW_80": mpiw80,
        "n": int(len(dfp)), "Turnover": turnover,
        "Sharpe_0bps": sharpe0, "Sharpe_10bps": sharpe10, "MaxDD_0bps": dd0, "MaxDD_10bps": dd10
    }

# Intervals
**What:** Build 95% (and 80%) level intervals from residual σ.  
**Why:** Interval quality reporting and coverage checks.  
**Method choices:** σ from Test residuals by default; robust fallback if ARCH-LM indicates heteroskedasticity and flag enabled.

In [30]:
def _assert_pred_values_finite(pred_df: pd.DataFrame):
    if not np.isfinite(pred_df["y_hat"].astype(float)).all():
        bad = pred_df.loc[~np.isfinite(pred_df["y_hat"].astype(float)), "date"].astype(str).tolist()
        raise RuntimeError(f"[Predictions] Non-finite y_hat on: {bad[:5]}{'...' if len(bad) > 5 else ''}.")
    if not np.isfinite(pred_df["y_prev"].astype(float)).all():
        bad = pred_df.loc[~np.isfinite(pred_df["y_prev"].astype(float)), "date"].astype(str).tolist()
        raise RuntimeError(f"[Predictions] Non-finite y_prev on: {bad[:5]}{'...' if len(bad) > 5 else ''}.")

def _scale_sanity_report(pred_df: pd.DataFrame) -> Dict[str, Any]:
    y = pred_df["y_true"].to_numpy(dtype=float)
    yhat = pred_df["y_hat"].to_numpy(dtype=float)
    yprev = pred_df["y_prev"].to_numpy(dtype=float)
    lvl = float(np.nanmedian(np.abs(y)))
    med_abs = float(np.nanmedian(np.abs(y - yhat)))
    med_abs_naive = float(np.nanmedian(np.abs(y - yprev)))
    rmse = float(np.sqrt(np.nanmean((y - yhat) ** 2)))
    rmse_naive = float(np.sqrt(np.nanmean((y - yprev) ** 2)))
    med_yhat_mag = float(np.nanmedian(np.abs(yhat)))
    med_y_mag = float(np.nanmedian(np.abs(y)))
    ratio_med_mag = med_yhat_mag / max(med_y_mag, 1e-12)
    hard_fail = ((ratio_med_mag < 0.25 or ratio_med_mag > 4.0) or
                 (rmse > max(5.0 * max(rmse_naive, 1e-12), 3.0 * max(lvl, 1e-12))))
    warn = (not hard_fail and (med_abs > 1.5 * max(med_abs_naive, 1e-12) or rmse > 1.5 * max(rmse_naive, 1e-12)))
    return {"ok": not hard_fail, "warn": warn,
            "stats": {"level_median_abs": lvl, "med_abs": med_abs, "med_abs_naive": med_abs_naive,
                      "rmse": rmse, "rmse_naive": rmse_naive, "ratio_med_mag": ratio_med_mag},
            "reason": "scale_blow_up" if hard_fail else ("weak_accuracy" if warn else "ok")}

def _choose_sigma_for_intervals(test_resid: np.ndarray, train_sigma: Optional[float],
                                train_sigma_robust: Optional[float], arch_pvalue: Optional[float]) -> float:
    if USE_ROBUST_SIGMA_FOR_INTERVALS and arch_pvalue is not None and np.isfinite(arch_pvalue) and arch_pvalue < ARCH_LM_SIG_THRESHOLD:
        s = 1.4826 * float(np.nanmedian(np.abs(test_resid - np.nanmedian(test_resid)))) if test_resid.size else np.nan
        if not (np.isfinite(s) and s > 0):
            s = float(train_sigma_robust) if train_sigma_robust is not None else np.nan
    else:
        s = float(np.nanstd(test_resid, ddof=1)) if test_resid.size else np.nan
        if not (np.isfinite(s) and s > 0):
            s = float(train_sigma) if train_sigma is not None else np.nan
    return s if (np.isfinite(s) and s > 0) else np.nan

def _rebuild_level_intervals(pred_df: pd.DataFrame,
                             sigma_train_fallback: Optional[float],
                             sigma_train_robust: Optional[float],
                             arch_pvalue_test: Optional[float]) -> Tuple[str, float]:
    resid = pred_df["residual"].astype(float).to_numpy()
    sigma = _choose_sigma_for_intervals(resid, sigma_train_fallback, sigma_train_robust, arch_pvalue_test)
    z95 = float(norm.ppf(1 - ALPHA_95 / 2))
    z80 = float(norm.ppf(1 - ALPHA_80 / 2))
    if not (np.isfinite(sigma) and sigma > 0):
        pred_df["lower_95"] = np.nan; pred_df["upper_95"] = np.nan
        pred_df["lower_80"] = np.nan; pred_df["upper_80"] = np.nan
        return "Intervals not rebuilt; σ invalid.", float("nan")
    yhat = pred_df["y_hat"].astype(float).to_numpy()
    pred_df["lower_95"] = yhat - z95 * sigma
    pred_df["upper_95"] = yhat + z95 * sigma
    pred_df["lower_80"] = yhat - z80 * sigma
    pred_df["upper_80"] = yhat + z80 * sigma
    return f"Intervals rebuilt from level residual σ={sigma:.6f}.", float(sigma)

def _assert_pred_bands_present(pred_df: pd.DataFrame):
    for col in ["lower_95", "upper_95"]:
        if col not in pred_df.columns:
            raise RuntimeError(f"[Predictions] Missing column: {col}")
    arr = pred_df[["lower_95", "upper_95"]].to_numpy(dtype=float)
    if not np.isfinite(arr).all():
        raise RuntimeError("[Predictions] Non-finite values in 95% bands.")
    le = (pred_df["lower_95"] <= pred_df["y_hat"]).to_numpy(dtype=bool)
    ge = (pred_df["upper_95"] >= pred_df["y_hat"]).to_numpy(dtype=bool)
    if not (le.all() and ge.all()):
        raise RuntimeError("[Predictions] Band monotonicity violated.")
    w95 = (pred_df["upper_95"] - pred_df["lower_95"]).to_numpy(dtype=float)
    if not (np.all(np.isfinite(w95)) and np.all(w95 >= 0)):
        raise RuntimeError("[Predictions] Invalid 95% band widths.")

# Outputs and Artefacts
**What:** Write predictions, metrics, run_config, diagnostics, AIC grids, and provenance.  
**Why:** Meet finalisation gate and reproducibility standards.  
**Method choices:** Per-ticker folders; env manifest and file hashes at run root; build zip bundle.

In [31]:
# Writers
def build_paths(ticker: str) -> Dict[str, Path]:
    tdir = MODEL_DIR / ticker
    tdir.mkdir(parents=True, exist_ok=True)
    return {
        "pred": tdir / f"predictions_{MODEL_ID}_{ticker}.csv",
        "metrics": tdir / f"metrics_{MODEL_ID}_{ticker}.json",
        "run_cfg": tdir / f"run_config_{MODEL_ID}_{ticker}.json",
        "diagn": tdir / f"diagnostics_{MODEL_ID}_{ticker}.json",
        "diagn_test": tdir / f"diagnostics_test_{MODEL_ID}_{ticker}.json",
        "aic_top5": tdir / f"appendix_aic_top5_{ticker}.csv",
        "aic_grid": tdir / f"appendix_aic_grid_{ticker}.csv",
        "intervals80": tdir / f"intervals80_{MODEL_ID}_{ticker}.csv",
        "exog_spot": tdir / f"exog_spotcheck_{ticker}.json",
        "date_align": tdir / f"date_alignment_{ticker}.json",
    }

def write_run_root():
    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    env = [
        f"python_version={platform.python_version()}",
        f"platform={platform.platform()}",
        f"timestamp_utc={int(time.time())}",
        f"random_seed={RANDOM_SEED}",
        f"model_id={MODEL_ID}",
        "timezone=America/New_York",
        "market_close=16:00",
        "metrics_writer=from_predictions_csv"
    ]
    (OUT_ROOT / "env_manifest.txt").write_text("\n".join(env) + "\n", encoding="utf-8")
    hashes = []
    for r, _, files in os.walk(OUT_ROOT):
        for fn in files:
            if fn == "file_hashes.json":
                continue
            p = Path(r) / fn
            rel = p.relative_to(OUT_ROOT)
            try:
                hashes.append({"path": str(rel).replace("\\", "/"), "sha256": file_sha256(p)})
            except Exception:
                pass
    (OUT_ROOT / "file_hashes.json").write_text(json.dumps({"files": hashes}, indent=2), encoding="utf-8")

def write_provenance_note(run_root: Path):
    note = (
        "Provenance: SARIMAX (ARIMAX). Train-only AIC on 2021-02-03->2022-12-30; "
        "fixed (p,d,q,trend) for Val/Test. Expanding-origin one-step Test forecasts with warm-start. "
        "Endog/Exog unscaled on level values; zero-encoding respected; no forward-fill from future. "
        "In-memory augmentation only for 2023-12-29 Close to define Target for 2023-12-28. "
        "Intervals rebuilt on the level residual sigma. Metrics computed over the strict fixed Test window (146 rows). "
        "Artefacts: predictions CSV; metrics JSON; run_config JSON; diagnostics (train & test); AIC grid & top-5; intervals80; exog_spotcheck; date_alignment."
    )
    (run_root / "README_provenance.txt").write_text(note, encoding="utf-8")

def save_outputs(ticker: str,
                 pred_df: pd.DataFrame,
                 run_cfg: dict,
                 aic_grid: pd.DataFrame,
                 aic_top5: pd.DataFrame,
                 diagn_in: dict,
                 paths: Dict[str, Path],
                 exog_spot: dict,
                 diagn_test_pre_sigma: dict,
                 fallbacks: List[str],
                 outlier_guard_dates: List[str],
                 delta_stats: Dict[str, float]):

    pred_df = _slice_test_window(pred_df)

    # Residuals on level scale
    pred_df["residual"] = pred_df["y_true"].astype(float) - pred_df["y_hat"].astype(float)

    _assert_pred_values_finite(pred_df)

    # Scaling sanity (warn-only unless egregious)
    scale_rep = _scale_sanity_report(pred_df)
    if not scale_rep["ok"]:
        stats = scale_rep["stats"]
        raise RuntimeError(
            "[Scale] Sanity check failed (probable scaling/unit error). "
            f"ratio_med_mag={stats['ratio_med_mag']:.3f}, rmse={stats['rmse']:.3f}, "
            f"rmse_naive={stats['rmse_naive']:.3f}, level_median_abs={stats['level_median_abs']:.3f}"
        )

    arch_p = float(diagn_test_pre_sigma.get("arch_lm_pvalue", np.nan)) if isinstance(diagn_test_pre_sigma, dict) else np.nan
    interval_note, sigma_used = _rebuild_level_intervals(
        pred_df,
        diagn_in.get("resid_sigma_train"),
        diagn_in.get("resid_sigma_train_robust"),
        arch_p
    )
    _assert_pred_bands_present(pred_df)

    pred_cols = ["date", "y_prev", "y_true", "y_hat", "residual", "in_sample_flag",
                 "lower_95", "upper_95", "lower_80", "upper_80"]
    pred_df.to_csv(paths["pred"], index=False, columns=pred_cols)
    pred_df.loc[:, ["date", "lower_80", "upper_80"]].to_csv(paths["intervals80"], index=False)

    metrics = compute_metrics_from_csv(paths["pred"])
    Path(paths["metrics"]).write_text(json.dumps(metrics, indent=2), encoding="utf-8")

    diagn_test = residual_diagnostics_from_test(pred_df)
    Path(paths["diagn"]).write_text(json.dumps(diagn_in, indent=2), encoding="utf-8")
    Path(paths["diagn_test"]).write_text(json.dumps(diagn_test, indent=2), encoding="utf-8")

    aic_grid.to_csv(paths["aic_grid"], index=False)
    aic_top5.to_csv(paths["aic_top5"], index=False)

    Path(paths["exog_spot"]).write_text(json.dumps(exog_spot, indent=2), encoding="utf-8")

    picp95 = float(metrics["Coverage_95"])
    near_nominal_95 = abs(picp95 - 0.95) <= 0.03

    run_cfg.update({
        "aic_search_scope": "Train only",
        "spec_fixed_on_val_test": True,
        "band_calibration": {
            "PICP_95": picp95,
            "MPIW_95": float(metrics["MPIW_95"]),
            "PICP_80": float(metrics.get("Coverage_80", np.nan)) if metrics.get("Coverage_80", None) is not None else np.nan,
            "MPIW_80": float(metrics.get("MPIW_80", np.nan)) if metrics.get("MPIW_80", None) is not None else np.nan,
            "nominal": 0.95,
            "near_nominal_95": bool(near_nominal_95)
        },
        "n_test_rows": int(len(pred_df)),
        "evaluation_policy": "all rows evaluated (no exclusions)",
        "intervals_note": interval_note,
        "intervals_sigma_used": float(sigma_used) if np.isfinite(sigma_used) else None,
        "intervals_sigma_policy": {
            "use_robust_sigma": bool(USE_ROBUST_SIGMA_FOR_INTERVALS),
            "arch_lm_sig_threshold": float(ARCH_LM_SIG_THRESHOLD),
            "arch_lm_pvalue_test": float(arch_p) if np.isfinite(arch_p) else None
        },
        "heteroskedasticity_flag": bool(np.isfinite(arch_p) and arch_p < ARCH_LM_SIG_THRESHOLD),
        "prediction_fallbacks": fallbacks,
        "prediction_fallbacks_count": int(len(fallbacks)),
        "prediction_outlier_guard_dates": outlier_guard_dates,
        "prediction_outlier_guard_count": int(len(outlier_guard_dates)),
        "prediction_outlier_guard_cap": float(delta_stats.get("cap", np.nan)),
        "prediction_outlier_guard_stats": {
            "med_abs_delta_train": float(delta_stats.get("med_abs_delta", np.nan)),
            "q99_delta_train": float(delta_stats.get("q99_delta", np.nan)),
        },
        "test_date_min": TEST_START,
        "test_date_max": TEST_END,
        "scale_sanity": {"status": "warning" if scale_rep["warn"] else "ok", **scale_rep["stats"]}
    })
    try:
        run_cfg["aic_top5_preview"] = aic_top5.head(5).to_dict(orient="records")
    except Exception:
        pass

    Path(paths["run_cfg"]).write_text(json.dumps(run_cfg, indent=2), encoding="utf-8")

# Per-ticker Runner
**What:** End-to-end path for one ticker.  
**Why:** Produce predictions and artefacts deterministically.  
**Method choices:** Train-only AIC; expanding-origin Test; evidence logs for exog and targets.

In [32]:
def run_ticker(ticker: str):
    # Load & augment levels (Option B) before any merge
    ser_base = read_series_strict(ticker)
    ser_aug  = augment_series_for_missing_target(ser_base, ticker)
    _assert_target_defined_for_test(ser_aug, ticker)

    endog, close_lvl, exog_raw, dates = load_aligned_frames_from_series(ticker, ser_aug)

    # Scale sanity preconditions
    if endog.isnull().any(): raise RuntimeError(f"[{ticker}] endog contains NaNs after align.")
    _assert_level_unscaled(endog, ticker)
    _assert_exog_level_unscaled(exog_raw, ticker)

    tr_mask, _, _ = assert_splits_index(dates)
    y_tr = endog.loc[tr_mask].reset_index(drop=True)
    X_tr = exog_raw.loc[tr_mask].reset_index(drop=True)

    best, aic_grid, aic_top5 = aic_grid_search(y_tr, X_tr)
    order, trend, aic_val = tuple(best["order"]), best["trend"], float(best["aic"])

    pred_df, fallbacks, outlier_guard_dates, delta_stats = forecast_path(endog, close_lvl, exog_raw, dates, order, trend)

    diagn_in = residual_diagnostics_in_sample(endog, exog_raw, dates, order, trend)

    features_used = [c for c in exog_raw.columns if c != "date"]
    features_used_count = int(len(features_used))

    exog_path = EXOG_DIR / f"exog_{ticker}.csv"
    ex_sha = file_sha256(exog_path) if exog_path.exists() else None
    ex_min = str(pd.to_datetime(exog_raw["date"]).min().date()) if "date" in exog_raw.columns else None
    ex_max = str(pd.to_datetime(exog_raw["date"]).max().date()) if "date" in exog_raw.columns else None
    ex_nrows = int(exog_raw.shape[0])

    run_cfg = {
        "model_id": MODEL_ID, "ticker": ticker,
        "order": list(order), "trend": trend, "AIC": sanitize(aic_val, 0.0), "AIC_train": sanitize(aic_val, 0.0),
        "aic_selected": {"p": int(order[0]), "d": int(order[1]), "q": int(order[2]), "trend": trend, "AIC": float(aic_val)},
        "seed": int(RANDOM_SEED), "seeds": {"python": int(RANDOM_SEED), "numpy": int(RANDOM_SEED), "global": int(RANDOM_SEED)},
        "cadence": "expanding_origin",
        "refit_protocol": "expanding_origin_one_step",
        "refit_details": {"reestimate_each_step": True, "warm_start_previous_params": True, "fit_method": "lbfgs_then_nelder_mead_fallback", "forecast_horizon": 1},
        "features_used": features_used, "features_used_count": features_used_count,
        "exog_scaling": "none", "timezone": "America/New_York", "market_close": "16:00",
        "exog_cutoff_note": "All exog built <=16:00 ET; no look-ahead.",
        "exog_input_evidence": {"filename": f"exog_{ticker}.csv", "sha256": ex_sha, "n_rows": ex_nrows, "date_min": ex_min, "date_max": ex_max},
        "sentiment_zero_encoding": True,
        "expanding_origin": True,
        "target_definition": "Target = Close.shift(-1); y_true = Close[t+1], y_prev = Close[t]",
        "target_col": "Target",
        "policy": {"y_scaled": False, "exog_lag": 0},
        "epsilon_for_DA_and_sign_rule": float(EPS_DIR),
        "sharpe_annualisation": "sqrt_252",
        "splits": {"train": [TRAIN_START, TRAIN_END], "val": [VAL_START, VAL_END], "test": [TEST_START, TEST_END]},
        "n_test_expected": int(N_TEST_EXP),
        "u2_baseline": "naive_last_close_y_true_shift",
        "turnover_definition": "count sign changes including initial entry",
        "test_date_min": TEST_START, "test_date_max": TEST_END,
        "parameter_count": int(diagn_in.get("parameter_count", 0)),
        "inline_truth_override": TRUTH_NEXT_CLOSE.get(ticker, {})
    }

    exog_spot = exog_causality_spotcheck(exog_raw, ticker)

    # Pre-σ diagnostics on test residuals (based on current preds)
    tmp = pred_df.copy()
    tmp["residual"] = tmp["y_true"].astype(float) - tmp["y_hat"].astype(float)
    diagn_test_pre_sigma = residual_diagnostics_from_test(tmp)

    return pred_df, run_cfg, aic_grid, aic_top5, diagn_in, exog_spot, diagn_test_pre_sigma, fallbacks, outlier_guard_dates, delta_stats

# Driver
**What:** Orchestrate end-to-end run, build bundle, surface links.  
**Why:** Produce final artefacts and a portable archive.  
**Method choices:** Preflight enabled; per-ticker writes; env and file hashes; optional Colab download link.

In [None]:
def main():
    OUT_ROOT.mkdir(parents=True, exist_ok=True)
    MODEL_DIR.mkdir(parents=True, exist_ok=True)

    # Preflight across all tickers
    if not SKIP_PREFLIGHT:
        preflight_inputs(TICKERS)
    else:
        print("[Preflight] SKIPPED (dev mode). See ARIMAX/preflight_report.json for outstanding issues.")

    # Source capture for provenance
    try:
        code_dir = OUT_ROOT / "code"
        code_dir.mkdir(parents=True, exist_ok=True)
        src = None
        if "__file__" in globals():
            src = Path(__file__).resolve()
        elif len(sys.argv) > 0 and sys.argv[0]:
            candidate = Path(sys.argv[0])
            src = candidate.resolve() if candidate.exists() else None
        if src and src.exists():
            shutil.copy2(src, code_dir / src.name)
        else:
            (code_dir / "runner_source_unavailable.txt").write_text(
                "Source path unavailable in this environment (likely a notebook).\n", encoding="utf-8"
            )
    except Exception as e:
        print(f"[Provenance] Could not copy runner script into run root; continuing. ({e})")

    # Per-ticker loop
    for t in TICKERS:
        paths = build_paths(t)
        pred_df, run_cfg, aic_grid, aic_top5, diagn_in, exog_spot, diagn_test_pre_sigma, fallbacks, outlier_guard_dates, delta_stats = run_ticker(t)

        if len(fallbacks) > 0:
            print(f"{t}: forecast fallbacks on {len(fallbacks)} day(s): {fallbacks}")
        else:
            print(f"{t}: forecast fallbacks on 0 day(s): []")

        save_outputs(t, pred_df, run_cfg, aic_grid, aic_top5, diagn_in, paths, exog_spot, diagn_test_pre_sigma, fallbacks, outlier_guard_dates, delta_stats)

    write_provenance_note(OUT_ROOT)
    write_run_root()

    bundle = "ARIMAX_bundle.zip"
    if Path(bundle).exists():
        Path(bundle).unlink()
    shutil.make_archive(Path(bundle).stem, "zip", OUT_ROOT)
    print(f"Built {bundle}")

    cov = {}
    for t in TICKERS:
        mpath = MODEL_DIR / t / f"metrics_{MODEL_ID}_{t}.json"
        if mpath.exists():
            m = json.loads(mpath.read_text(encoding="utf-8"))
            cov[t] = round(float(m.get("Coverage_95", np.nan)), 3)
    print("Test PICP(95%) by ticker:", cov)

    try:
        from google.colab import files
        files.download(bundle)
    except Exception:
        try:
            from IPython.display import FileLink, display
            display(FileLink(bundle))
        except Exception:
            print("Bundle saved at:", Path(bundle).resolve())

if __name__ == "__main__":
    main()

  agg = bym.groupby("ym", group_keys=False).apply(
  agg = bym.groupby("ym", group_keys=False).apply(
  agg = bym.groupby("ym", group_keys=False).apply(


AAPL: forecast fallbacks on 0 day(s): []


  agg = bym.groupby("ym", group_keys=False).apply(
  agg = bym.groupby("ym", group_keys=False).apply(
  agg = bym.groupby("ym", group_keys=False).apply(


AMZN: forecast fallbacks on 0 day(s): []
