# Modern Probabilistic Forecasting Methods

This notebook demonstrates end-to-end examples of important probabilistic forecasting methods using Python.

## 0) Common setup: data + utilities

All examples use the same synthetic dataset: one daily series with covariates (trend + seasonality + promo + price).

The DGP is deliberately realistic and **hard**: heteroscedastic noise, heavy tails (Student-t), occasional outliers, a structural break, holiday effects, and covariate interactions. This ensures different methods show genuine strengths and weaknesses.

In [None]:
import numpy as np
import pandas as pd

rng = np.random.default_rng(7)

# --- Realistic synthetic daily data ---
# This DGP is deliberately "hard" so different methods show real strengths/weaknesses:
#   - heteroscedastic noise  → constant-variance models (SARIMAX) will miscalibrate
#   - heavy tails (Student-t) → Gaussian assumptions underestimate tail risk
#   - occasional outliers     → tests robustness
#   - structural break        → favours models that weight recent data
#   - covariate interactions  → tree-based / deep models should outperform linear

n = 900
ds = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)

# ── Covariates ──
promo = (rng.random(n) < 0.12).astype(int)
price = 10 + 0.02*t + rng.normal(0, 0.4, n) - 0.8*promo

# ── Deterministic components ──
weekly  = 1.2 * np.sin(2*np.pi*t/7)
yearly  = 2.0 * np.sin(2*np.pi*t/365.25)
trend   = 0.01 * t

# ── Structural break: level shift at ~day 500 (mid-2023) ──
level_shift = 2.5 * (t >= 500).astype(float)

# ── Holiday-like effects (Christmas/NY and Easter-ish windows) ──
holidays = np.zeros(n)
for i, d in enumerate(ds):
    if (d.month == 12 and d.day >= 23) or (d.month == 1 and d.day <= 2):
        holidays[i] = rng.uniform(3, 6)       # strong Christmas / New Year lift
    elif d.month == 4 and 10 <= d.day <= 17:
        holidays[i] = rng.uniform(1, 3)        # Easter-ish bump

# ── Covariate effects with interaction ──
promo_effect = 3.0*promo + 0.5*promo*(price - price.mean())   # promo x price interaction
price_effect = -0.6 * price

# ── Heteroscedastic noise: scale varies with local level ──
base_level  = 20 + trend + yearly
noise_scale = 0.8 + 0.06 * np.abs(base_level - base_level.mean())   # ranges ~0.8 – 2.0
noise = rng.standard_t(df=5, size=n) * noise_scale                  # heavy tails (df=5)

# ── Outliers: ~1.5% of days get a large shock ──
outlier_mask = rng.random(n) < 0.015
outliers = outlier_mask * rng.choice([-1, 1], size=n) * rng.uniform(4, 8, size=n)

# ── Assemble target ──
y = (20 + trend + weekly + yearly + level_shift
     + holidays + promo_effect + price_effect
     + noise + outliers)

df = pd.DataFrame({"ds": ds, "y": y, "promo": promo, "price": price})
df = df.set_index("ds")

print(f"Series length: {len(df)}, range: [{df['y'].min():.1f}, {df['y'].max():.1f}]")
print(f"Outlier days: {outlier_mask.sum()}, structural break at day 500")

# --- train / test split ---
h = 60  # forecast horizon
train = df.iloc[:-h].copy()
test  = df.iloc[-h:].copy()

In [None]:
def make_lag_features(data: pd.DataFrame, lags=(1,2,7,14,28), rolls=(7,14,28)) -> pd.DataFrame:
    out = data.copy()
    for L in lags:
        out[f"lag_{L}"] = out["y"].shift(L)
    for R in rolls:
        out[f"roll_mean_{R}"] = out["y"].shift(1).rolling(R).mean()
        out[f"roll_std_{R}"] = out["y"].shift(1).rolling(R).std()
    # calendar features (often helps GBDT)
    idx = out.index
    out["dow"] = idx.dayofweek
    out["month"] = idx.month
    return out

def pinball_loss(y_true, y_pred_q, q: float) -> float:
    # y_pred_q is quantile prediction at q
    e = y_true - y_pred_q
    return float(np.mean(np.maximum(q*e, (q-1)*e)))

In [None]:
import matplotlib.pyplot as plt

def plot_forecast_with_interval(
    idx, y_true, p50, p10=None, p90=None, title="Forecast with interval"
):
    plt.figure(figsize=(12, 4))
    plt.plot(idx, y_true, label="y_true")
    plt.plot(idx, p50, label="p50 / mean")
    if p10 is not None and p90 is not None:
        plt.fill_between(idx, p10, p90, alpha=0.2, label="P10–P90")
    plt.title(title)
    plt.legend()
    plt.tight_layout()
    plt.show()

def empirical_coverage(y_true, lo, hi) -> float:
    y_true = np.asarray(y_true)
    lo = np.asarray(lo)
    hi = np.asarray(hi)
    return float(np.mean((y_true >= lo) & (y_true <= hi)))

def mean_interval_width(lo, hi) -> float:
    return float(np.mean(np.asarray(hi) - np.asarray(lo)))

def wis_approx_10_50_90(y_true, p10, p50, p90) -> float:
    """
    Approximate Weighted Interval Score using only {0.1,0.5,0.9}.
    This is not the full multi-alpha WIS, but is a useful single-number proxy.

    For central 80% interval: alpha=0.2 (since 10-90)
    Interval score = (U-L) + (2/alpha)*(L-y)*1(y<L) + (2/alpha)*(y-U)*1(y>U)
    """
    y = np.asarray(y_true)
    L = np.asarray(p10)
    U = np.asarray(p90)
    alpha = 0.2
    width = U - L
    under = (L - y) * (y < L)
    over  = (y - U) * (y > U)
    score_interval = width + (2/alpha) * (under + over)

    # Add absolute error of median (often included in WIS variants)
    score_median = np.abs(y - np.asarray(p50))

    return float(np.mean(score_median + 0.5 * score_interval))

def evaluate_probabilistic_forecast(y_true, p10, p50, p90, label="model"):
    y_true = np.asarray(y_true)
    p10 = np.asarray(p10); p50 = np.asarray(p50); p90 = np.asarray(p90)

    metrics = {
        "model": label,
        "pinball_0.1": pinball_loss(y_true, p10, 0.1),
        "pinball_0.5": pinball_loss(y_true, p50, 0.5),
        "pinball_0.9": pinball_loss(y_true, p90, 0.9),
        "coverage_80(P10-P90)": empirical_coverage(y_true, p10, p90),
        "mean_width(P10-P90)": mean_interval_width(p10, p90),
        "WIS_proxy(10/50/90)": wis_approx_10_50_90(y_true, p10, p50, p90),
    }
    return metrics

def print_metrics(metrics: dict):
    # pretty print
    keys = [k for k in metrics.keys() if k != "model"]
    print(f"\n== {metrics['model']} ==")
    for k in keys:
        v = metrics[k]
        if "coverage" in k:
            print(f"{k:>22}: {v:.3f}")
        else:
            print(f"{k:>22}: {v:.4f}")

In [None]:
def rolling_backtest_indices(n_total, horizon, n_folds=5, min_train=300):
    """
    Returns folds of (train_end, test_start, test_end) indices for rolling-origin evaluation.
    """
    assert min_train + horizon < n_total
    last_start = n_total - horizon
    fold_starts = np.linspace(min_train, last_start, n_folds, dtype=int)
    folds = []
    for train_end in fold_starts:
        test_start = train_end
        test_end = train_end + horizon
        if test_end <= n_total:
            folds.append((train_end, test_start, test_end))
    return folds

def aggregate_backtest_metrics(metrics_list):
    dfm = pd.DataFrame(metrics_list).drop(columns=["model"], errors="ignore")
    return dfm.mean().to_dict()

## 1) Classical probabilistic: SARIMAX (ARIMAX) in statsmodels

This is a solid baseline when you want interpretability and a well-defined stochastic model. It supports exogenous covariates and yields prediction intervals via the state-space machinery.

In [None]:
# pip install statsmodels
import statsmodels.api as sm

# Exogenous regressors (must be aligned)
exog_cols = ["promo", "price"]

# A reasonable starting SARIMAX spec (you'd tune this)
model = sm.tsa.SARIMAX(
    train["y"],
    exog=train[exog_cols],
    order=(1,1,1),
    seasonal_order=(1,0,1,7),
    enforce_stationarity=False,
    enforce_invertibility=False,
)

res = model.fit(disp=False)

# Forecast with exog for future horizon
fc = res.get_forecast(steps=h, exog=test[exog_cols])
mean = fc.predicted_mean
ci = fc.conf_int(alpha=0.10)  # 90% interval

out = pd.DataFrame({
    "y_true": test["y"],
    "y_mean": mean,
    "y_lo_90": ci.iloc[:, 0],
    "y_hi_90": ci.iloc[:, 1],
}, index=test.index)

out.head()

### Notes you'd care about in production

- SARIMAX intervals can be miscalibrated if the noise model is wrong or residuals aren't close to assumptions; calibration checks are mandatory.
- Great when seasonality is stable and the business wants "a model you can explain".

In [None]:
# --- Convert SARIMAX CI to p10/p50/p90 (approx) ---
# NOTE: statsmodels gives symmetric interval based on Gaussian assumptions;
# conf_int(alpha=0.10) corresponds to central 90% (P05-P95). We asked alpha=0.10 earlier,
# so those bounds are 5% and 95% not 10/90. We'll recompute alpha=0.20 for 10/90.
ci_80 = res.get_forecast(steps=h, exog=test[exog_cols]).conf_int(alpha=0.20)

p50 = mean.values
p10 = ci_80.iloc[:, 0].values
p90 = ci_80.iloc[:, 1].values

# --- Plot ---
plot_forecast_with_interval(
    idx=test.index,
    y_true=test["y"].values,
    p50=p50, p10=p10, p90=p90,
    title="SARIMAX forecast (P10–P90 from state-space CI)"
)

# --- Evaluate on the holdout horizon ---
m_sarimax = evaluate_probabilistic_forecast(test["y"].values, p10, p50, p90, label="SARIMAX")
print_metrics(m_sarimax)

In [None]:
# --- Rolling-origin backtest (optional) ---
metrics_bt = []
folds = rolling_backtest_indices(n_total=len(df), horizon=h, n_folds=4, min_train=400)

for train_end, test_start, test_end in folds:
    tr = df.iloc[:train_end]
    te = df.iloc[test_start:test_end]

    mod = sm.tsa.SARIMAX(
        tr["y"], exog=tr[exog_cols],
        order=(1,1,1), seasonal_order=(1,0,1,7),
        enforce_stationarity=False, enforce_invertibility=False,
    )
    r = mod.fit(disp=False)
    fc = r.get_forecast(steps=h, exog=te[exog_cols])
    p50 = fc.predicted_mean.values
    ci_80 = fc.conf_int(alpha=0.20)
    p10 = ci_80.iloc[:, 0].values
    p90 = ci_80.iloc[:, 1].values

    metrics_bt.append(evaluate_probabilistic_forecast(te["y"].values, p10, p50, p90, label="SARIMAX"))

bt_avg = aggregate_backtest_metrics(metrics_bt)
print("\nSARIMAX backtest avg metrics:", bt_avg)

## 2) Industry workhorse: LightGBM quantile regression (multiple quantiles)

Train one model per quantile (simple, very common), using lag/rolling features + covariates.

In [None]:
# pip install lightgbm
import lightgbm as lgb

feat_df = make_lag_features(df)
feat_df = feat_df.dropna()

train_f = feat_df.iloc[:-h]
test_f  = feat_df.iloc[-h:]

features = [c for c in train_f.columns if c != "y"]

X_train, y_train = train_f[features], train_f["y"]
X_test, y_test   = test_f[features], test_f["y"]

def fit_lgb_quantile(q: float):
    params = dict(
        objective="quantile",
        alpha=q,
        learning_rate=0.05,
        num_leaves=63,
        min_data_in_leaf=50,
        feature_fraction=0.9,
        bagging_fraction=0.9,
        bagging_freq=1,
        verbose=-1,
    )
    dtrain = lgb.Dataset(X_train, label=y_train)
    booster = lgb.train(params, dtrain, num_boost_round=2000, valid_sets=[dtrain], callbacks=[lgb.early_stopping(50, verbose=False)])
    return booster

qs = [0.1, 0.5, 0.9]
models = {q: fit_lgb_quantile(q) for q in qs}

preds = {q: models[q].predict(X_test) for q in qs}

lgb_out = pd.DataFrame({
    "y_true": y_test.values,
    "p10": preds[0.1],
    "p50": preds[0.5],
    "p90": preds[0.9],
}, index=test_f.index)

# quick metrics
print("Pinball(0.1):", pinball_loss(y_test.values, lgb_out["p10"].values, 0.1))
print("Pinball(0.5):", pinball_loss(y_test.values, lgb_out["p50"].values, 0.5))
print("Pinball(0.9):", pinball_loss(y_test.values, lgb_out["p90"].values, 0.9))

lgb_out.head()

# ── Plot + Evaluate ──
p10 = lgb_out["p10"].values
p50 = lgb_out["p50"].values
p90 = lgb_out["p90"].values

plot_forecast_with_interval(
    idx=lgb_out.index,
    y_true=lgb_out["y_true"].values,
    p50=p50, p10=p10, p90=p90,
    title="LightGBM quantile forecast (P10-P90)"
)

m_lgbm = evaluate_probabilistic_forecast(lgb_out["y_true"].values, p10, p50, p90, label="LightGBM-Quantiles")
print_metrics(m_lgbm)

# ── Rolling-origin backtest ──
metrics_bt = []
folds = rolling_backtest_indices(n_total=len(feat_df), horizon=h, n_folds=5, min_train=400)

for train_end, test_start, test_end in folds:
    tr = feat_df.iloc[:train_end].dropna()
    te = feat_df.iloc[test_start:test_end].dropna()

    Xtr, ytr = tr[features], tr["y"]
    Xte, yte = te[features], te["y"]

    def fit_q(q):
        params = dict(objective="quantile", alpha=q, learning_rate=0.05, num_leaves=63,
                      min_data_in_leaf=50, feature_fraction=0.9, bagging_fraction=0.9,
                      bagging_freq=1, verbose=-1)
        dtr = lgb.Dataset(Xtr, label=ytr)
        return lgb.train(params, dtr, num_boost_round=1500, valid_sets=[dtr],
                         callbacks=[lgb.early_stopping(50, verbose=False)])

    m10, m50, m90 = fit_q(0.1), fit_q(0.5), fit_q(0.9)
    p10 = m10.predict(Xte); p50 = m50.predict(Xte); p90 = m90.predict(Xte)

    metrics_bt.append(evaluate_probabilistic_forecast(yte.values, p10, p50, p90, label="LGBM"))

bt_avg = aggregate_backtest_metrics(metrics_bt)
print("\nLightGBM backtest avg metrics:", bt_avg)

### Production notes

- This is usually paired with: strong backtesting, covariate availability guarantees, leakage checks, and monitoring for feature drift.
- Quantiles can cross (P90 < P50) unless you enforce monotonicity post-hoc (e.g., isotonic fix) or use joint quantile approaches.

## 3) "Distributional boosting": NGBoost (predict a full distribution)

Instead of quantiles, you fit parameters of a distribution (e.g., Normal) via natural-gradient boosting; very practical when you want a proper probabilistic model with likelihood-based training.

In [None]:
# pip install ngboost
from ngboost import NGBRegressor
from ngboost.distns import Normal
from sklearn.ensemble import HistGradientBoostingRegressor

# Use same feature matrix as LightGBM example
base = HistGradientBoostingRegressor(max_depth=6, learning_rate=0.05)
ngb = NGBRegressor(Dist=Normal, Base=base, n_estimators=800, verbose=False, random_state=7)

ngb.fit(X_train, y_train)

dist = ngb.pred_dist(X_test)  # distribution object
p10 = dist.ppf(0.1)
p50 = dist.ppf(0.5)
p90 = dist.ppf(0.9)

ngb_out = pd.DataFrame({"y_true": y_test, "p10": p10, "p50": p50, "p90": p90}, index=test_f.index)
ngb_out.head()

# ── Plot + Evaluate ──
p10 = ngb_out["p10"].values
p50 = ngb_out["p50"].values
p90 = ngb_out["p90"].values

plot_forecast_with_interval(
    idx=ngb_out.index,
    y_true=ngb_out["y_true"].values,
    p50=p50, p10=p10, p90=p90,
    title="NGBoost (Normal) forecast (P10-P90)"
)

m_ngboost = evaluate_probabilistic_forecast(ngb_out["y_true"].values, p10, p50, p90, label="NGBoost-Normal")
print_metrics(m_ngboost)

# ── Rolling-origin backtest ──
metrics_bt = []
folds = rolling_backtest_indices(n_total=len(feat_df), horizon=h, n_folds=5, min_train=400)

for train_end, test_start, test_end in folds:
    tr = feat_df.iloc[:train_end].dropna()
    te = feat_df.iloc[test_start:test_end].dropna()

    Xtr, ytr = tr[features], tr["y"]
    Xte, yte = te[features], te["y"]

    base = HistGradientBoostingRegressor(max_depth=6, learning_rate=0.05)
    ngb = NGBRegressor(Dist=Normal, Base=base, n_estimators=600, verbose=False, random_state=7)
    ngb.fit(Xtr, ytr)

    dist = ngb.pred_dist(Xte)
    p10 = dist.ppf(0.1); p50 = dist.ppf(0.5); p90 = dist.ppf(0.9)

    metrics_bt.append(evaluate_probabilistic_forecast(yte.values, p10, p50, p90, label="NGBoost"))

bt_avg = aggregate_backtest_metrics(metrics_bt)
print("\nNGBoost backtest avg metrics:", bt_avg)

### Production notes

- You get a coherent distribution (no quantile crossing).
- But your distributional assumption (Normal, Student-t, etc.) matters; mis-specification can hurt calibration.

## 4) Calibration layer: Conformal prediction intervals (MAPIE) on top of any model

Conformal is a huge deal operationally because it gives distribution-free coverage guarantees (under conditions) and works with any regressor. MAPIE includes time-series examples.

Here's a practical pattern: fit a point model, then conformalize to get calibrated intervals.

In [None]:
# pip install mapie scikit-learn
from sklearn.model_selection import TimeSeriesSplit
from sklearn.ensemble import HistGradientBoostingRegressor
from mapie.regression import TimeSeriesRegressor  # renamed from MapieTimeSeriesRegressor in MAPIE >=1.0

base_model = HistGradientBoostingRegressor(max_depth=6, learning_rate=0.05, random_state=7)

# MAPIE time-series conformal with rolling splits
cv = TimeSeriesSplit(n_splits=5)

mapie = TimeSeriesRegressor(
    estimator=base_model,
    cv=cv,
    method="enbpi",     # EnbPI-style for time series
    agg_function="mean" # aggregation of ensemble predictions
)

mapie.fit(X_train, y_train)

# NOTE: In MAPIE >=1.0, predict() uses `confidence_level=` instead of `alpha=`.
# confidence_level=0.90 means 90% interval (same as old alpha=0.10).
y_pred, y_pis = mapie.predict(X_test, confidence_level=0.90)  # 90% interval
lo_90 = y_pis[:, 0, 0]
hi_90 = y_pis[:, 1, 0]

conf_out = pd.DataFrame({"y_true": y_test, "y_pred": y_pred, "lo_90": lo_90, "hi_90": hi_90}, index=test_f.index)

# empirical coverage
coverage = np.mean((conf_out["y_true"] >= conf_out["lo_90"]) & (conf_out["y_true"] <= conf_out["hi_90"]))
print("Empirical 90% coverage:", coverage)

conf_out.head()

# ── Evaluate (central 80%: P10-P90) ──
y_pred, y_pis = mapie.predict(X_test, confidence_level=0.80)
p10 = y_pis[:, 0, 0]
p90 = y_pis[:, 1, 0]
p50 = y_pred

# Store P10/P90 in conf_out for the rolling coverage comparison
conf_out["p10"] = p10
conf_out["p90"] = p90

plot_forecast_with_interval(
    idx=test_f.index,
    y_true=y_test.values,
    p50=p50, p10=p10, p90=p90,
    title="Conformal intervals (EnbPI) on top of point model (P10-P90 approx)"
)

m_conformal = evaluate_probabilistic_forecast(y_test.values, p10, p50, p90, label="Conformal-EnbPI")
print_metrics(m_conformal)

### Production notes

- This is one of the simplest ways to stop shipping overconfident intervals.
- You still need to choose an appropriate conformal method for your temporal dependence regime.

## 5) Deep probabilistic global model: DeepAR in GluonTS

DeepAR is a canonical probabilistic deep model for forecasting and is designed to train across many series. GluonTS focuses on probabilistic forecasting.

Below is a minimal example for one series (the real payoff is multiple series). For notebook learning, it's still useful.

In [None]:
# pip install gluonts torch
import numpy as np
import pandas as pd
from gluonts.dataset.common import ListDataset
from gluonts.torch.model.deepar import DeepAREstimator
from gluonts.torch.distributions import StudentTOutput
# NOTE: gluonts.torch.trainer.Trainer was removed in GluonTS 0.16+.
# Training params now go into trainer_kwargs; batch_size and lr are top-level.

# GluonTS expects: start timestamp + target array
training_data = ListDataset(
    [{"start": train.index[0], "target": train["y"].values}],
    freq="D",
)

# DeepAR probabilistic model
estimator = DeepAREstimator(
    freq="D",
    prediction_length=h,
    context_length=60,
    distr_output=StudentTOutput(),  # heavier tails often help
    batch_size=32,
    lr=1e-3,
    trainer_kwargs=dict(max_epochs=10),
)

predictor = estimator.train(training_data)

# Make forecast
test_data = ListDataset(
    [{"start": train.index[0], "target": df["y"].values}],
    freq="D",
)

from gluonts.evaluation.backtest import make_evaluation_predictions
forecast_it, ts_it = make_evaluation_predictions(dataset=test_data, predictor=predictor, num_samples=200)

forecast = next(forecast_it)

# quantiles
p10 = forecast.quantile(0.1)
p50 = forecast.quantile(0.5)
p90 = forecast.quantile(0.9)

deepar_out = pd.DataFrame(
    {"p10": p10, "p50": p50, "p90": p90, "y_true": test["y"].values},
    index=test.index
)
deepar_out.head()

# ── Plot + Evaluate ──
p10 = deepar_out["p10"].values
p50 = deepar_out["p50"].values
p90 = deepar_out["p90"].values

plot_forecast_with_interval(
    idx=deepar_out.index,
    y_true=deepar_out["y_true"].values,
    p50=p50, p10=p10, p90=p90,
    title="DeepAR forecast (P10-P90 from samples)"
)

m_deepar = evaluate_probabilistic_forecast(deepar_out["y_true"].values, p10, p50, p90, label="DeepAR")
print_metrics(m_deepar)

**Note:** Adding covariates in GluonTS is very doable (dynamic real features / static categorical features), but the exact dataset schema depends on whether your covariates are known-future vs observed-only; GluonTS docs walk through those patterns.

## 6) Deep SOTA for covariates: TFT (PyTorch Forecasting)

If you want one deep model that is purpose-built for:
- multi-horizon outputs
- known/unknown covariates
- categorical embeddings
- quantile forecasts

...TFT is the standard "practical SOTA" choice, and PyTorch Forecasting is the most common implementation used by practitioners.

Here is a minimal runnable example on the same dataset:

In [None]:
# pip install pytorch-forecasting pytorch-lightning torch
import pytorch_lightning as pl
import torch

from pytorch_forecasting import TimeSeriesDataSet, TemporalFusionTransformer, QuantileLoss
from pytorch_forecasting.data import GroupNormalizer

# Build a single-series "group"
tft_df = df.reset_index().rename(columns={"ds":"time"})
tft_df["series"] = "S1"
tft_df["time_idx"] = np.arange(len(tft_df))

max_encoder_length = 60
max_prediction_length = h

training_cutoff = tft_df["time_idx"].max() - max_prediction_length

training = TimeSeriesDataSet(
    tft_df[tft_df.time_idx <= training_cutoff],
    time_idx="time_idx",
    target="y",
    group_ids=["series"],
    max_encoder_length=max_encoder_length,
    max_prediction_length=max_prediction_length,
    # covariates:
    time_varying_known_reals=["time_idx", "promo", "price"],
    time_varying_unknown_reals=["y"],
    target_normalizer=GroupNormalizer(groups=["series"]),
)

validation = TimeSeriesDataSet.from_dataset(training, tft_df, predict=True, stop_randomization=True)

train_loader = training.to_dataloader(train=True, batch_size=64, num_workers=0)
val_loader = validation.to_dataloader(train=False, batch_size=64, num_workers=0)

tft = TemporalFusionTransformer.from_dataset(
    training,
    learning_rate=3e-3,
    hidden_size=16,
    attention_head_size=4,
    dropout=0.1,
    loss=QuantileLoss(quantiles=[0.1, 0.5, 0.9]),
)

trainer = pl.Trainer(
    max_epochs=10,
    accelerator="auto",
    enable_checkpointing=False,
    logger=False,
)

trainer.fit(tft, train_loader, val_loader)

raw_predictions, x = tft.predict(val_loader, mode="raw", return_x=True)
# raw_predictions["prediction"] shape: [batch, horizon, n_quantiles]
pred = raw_predictions["prediction"][0].detach().cpu().numpy()  # first batch element

p10, p50, p90 = pred[:, 0], pred[:, 1], pred[:, 2]
tft_out = pd.DataFrame({"p10": p10, "p50": p50, "p90": p90, "y_true": test["y"].values}, index=test.index)
tft_out.head()

# ── Plot + Evaluate ──
p10 = tft_out["p10"].values
p50 = tft_out["p50"].values
p90 = tft_out["p90"].values

plot_forecast_with_interval(
    idx=tft_out.index,
    y_true=tft_out["y_true"].values,
    p50=p50, p10=p10, p90=p90,
    title="TFT quantile forecast (P10-P90)"
)

m_tft = evaluate_probabilistic_forecast(tft_out["y_true"].values, p10, p50, p90, label="TFT")
print_metrics(m_tft)

### Production notes

- TFT is heavier than GBDT. It shines when you have: many series, lots of covariates, nonlinear interactions, regime changes.
- You need disciplined backtesting and careful covariate governance (especially known-future features).

## 7) Amazon Chronos-2 (zero-shot) with covariates

Chronos-2 is a pretrained foundation model for time series forecasting that works zero-shot (no training needed). It can leverage covariates for improved forecasts.

In [None]:
# pip install chronos-forecasting
import pandas as pd
from chronos import Chronos2Pipeline

# --- Build Chronos input frames (single series) ---
# Chronos expects columns: id, timestamp, target (+ optional covariates)
context_df = (
    train.reset_index()
         .rename(columns={"ds": "timestamp", "y": "target"})
         .assign(id="S1")[["id", "timestamp", "target", "promo", "price"]]
)

# Future covariates (known-in-advance): include timestamps + covariate columns, no target
future_df = (
    test.reset_index()
        .rename(columns={"ds": "timestamp"})
        .assign(id="S1")[["id", "timestamp", "promo", "price"]]
)

# --- Load pretrained Chronos-2 ---
# device_map="cpu" works everywhere (GPU: "cuda")
pipeline = Chronos2Pipeline.from_pretrained("amazon/chronos-2", device_map="cpu")

# --- Predict quantiles ---
pred_df = pipeline.predict_df(
    df=context_df,                    # context data with target + covariates
    future_df=future_df,              # enables covariate-informed forecasting
    prediction_length=h,
    quantile_levels=[0.1, 0.5, 0.9],
    id_column="id",
    timestamp_column="timestamp",
    target="target",
)

# pred_df typically contains: timestamp, predictions (central), and quantile columns "0.1","0.5","0.9"
pred_df = pred_df.sort_values("timestamp").set_index("timestamp")

p10 = pred_df["0.1"].values
p50 = pred_df["0.5"].values if "0.5" in pred_df.columns else pred_df["predictions"].values
p90 = pred_df["0.9"].values
y_true = test["y"].values

chronos_out = pd.DataFrame({"p10": p10, "p50": p50, "p90": p90, "y_true": y_true}, index=test.index)
chronos_out.head()

In [None]:
# --- Plot ---
plot_forecast_with_interval(
    idx=test.index,
    y_true=y_true,
    p50=p50, p10=p10, p90=p90,
    title="Chronos-2 (zero-shot) forecast (P10–P90)"
)

# --- Evaluate ---
m_chronos = evaluate_probabilistic_forecast(y_true, p10, p50, p90, label="Chronos-2")
print_metrics(m_chronos)

### Production notes

- Chronos-2 is zero-shot (no training needed), making it extremely fast to deploy for new series.
- Works well out-of-the-box but may underperform task-specific fine-tuned models on domains with strong patterns.
- Ideal for cold-start scenarios, exploratory analysis, or as a strong baseline before investing in custom models.

## Calibration Section: Compare All Methods

Once you've run all models, this section compares them side-by-side and provides calibration diagnostics.

**Note:** Only SARIMAX, LightGBM, and NGBoost include rolling-origin backtests above (refitting deep models per fold is expensive in a notebook). The comparison table below reflects single-holdout performance for all 7 methods.

In [None]:
all_metrics = [m_sarimax, m_lgbm, m_ngboost, m_conformal, m_deepar, m_tft, m_chronos]
metrics_df = pd.DataFrame(all_metrics).set_index("model").sort_values("WIS_proxy(10/50/90)")
metrics_df

### Rolling Coverage Curve

A rolling coverage curve shows how coverage varies over time, which is more informative than a single coverage number.

In [None]:
def rolling_coverage_curve(y_true, lo, hi, window=14):
    y = np.asarray(y_true)
    inside = ((y >= np.asarray(lo)) & (y <= np.asarray(hi))).astype(float)
    return pd.Series(inside).rolling(window).mean().values

# Collect P10/P90 for all methods
ci_80_sarimax = res.get_forecast(steps=h, exog=test[exog_cols]).conf_int(alpha=0.20)

coverage_data = {
    "SARIMAX":    (ci_80_sarimax.iloc[:, 0].values, ci_80_sarimax.iloc[:, 1].values),
    "LightGBM":   (lgb_out["p10"].values, lgb_out["p90"].values),
    "NGBoost":    (ngb_out["p10"].values, ngb_out["p90"].values),
    "Conformal":  (conf_out["p10"].values, conf_out["p90"].values),
    "DeepAR":     (deepar_out["p10"].values, deepar_out["p90"].values),
    "TFT":        (tft_out["p10"].values, tft_out["p90"].values),
    "Chronos-2":  (chronos_out["p10"].values, chronos_out["p90"].values),
}

plt.figure(figsize=(14, 5))
for label, (lo, hi) in coverage_data.items():
    cov_curve = rolling_coverage_curve(test["y"].values, lo, hi, window=14)
    plt.plot(test.index, cov_curve, label=label)

plt.axhline(0.8, linestyle="--", color="black", alpha=0.5, label="nominal 0.8")
plt.ylim(0, 1.05)
plt.ylabel("Rolling 14-day coverage")
plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left")
plt.title("Rolling interval coverage (P10-P90) - all methods")
plt.tight_layout()
plt.show()