[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/QuantLet/EMQA/blob/main/EMQA_rolling_recalibration/EMQA_rolling_recalibration.ipynb)

# EMQA_rolling_recalibration

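Compares a static Random Forest (trained once) with a rolling-recalibration Random Forest that is retrained every 30 observations (roughly monthly) on an expanding window. The 95% bands are the 2.5th–97.5th percentiles of the individual trees' predictions.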

**Output:** `ml_rolling_recalibration.pdf`

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

# Global matplotlib defaults for every figure in this notebook: transparent
# backgrounds, no grid, no top/right spines, 11pt font, 12x6 inch canvas.
plt.rcParams.update({
    # Transparent canvas, both on screen and when written to disk.
    **dict.fromkeys(
        ('figure.facecolor', 'axes.facecolor', 'savefig.facecolor'), 'none'
    ),
    'savefig.transparent': True,
    # Minimal axes decoration.
    **dict.fromkeys(
        ('axes.grid', 'axes.spines.top', 'axes.spines.right'), False
    ),
    'font.size': 11,
    'figure.figsize': (12, 6),
})

# Named palette shared by all plots (hex RGB).
COLORS = dict(
    blue='#1A3A6E',
    red='#CD0000',
    green='#2E7D32',
    orange='#E67E22',
    purple='#8E44AD',
    gray='#808080',
    cyan='#00BCD4',
    amber='#B5853F',
)

def save_fig(fig, name):
    """Write ``fig`` to the path ``name`` and report the path on stdout.

    The figure is saved with a tight bounding box, a transparent
    background and 300 dpi. ``fig`` only needs to expose ``savefig``.
    """
    export_options = {'bbox_inches': 'tight', 'transparent': True, 'dpi': 300}
    fig.savefig(name, **export_options)
    print(f"Saved: {name}")


In [None]:
# Price CSV stored next to this notebook in the QuantLet/EMQA repository.
# The 'date' column is parsed as datetimes and becomes the index.
url = (
    'https://raw.githubusercontent.com/QuantLet/EMQA/main/'
    'EMQA_rolling_recalibration/ro_de_prices_full.csv'
)
ro = pd.read_csv(url, index_col='date', parse_dates=['date'])
print(f'Loaded {len(ro)} observations')


In [None]:
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.metrics import mean_absolute_error, r2_score

# Build the modelling frame: the target is the RO price itself, and every
# feature is derived from strictly earlier rows (or from the index).
data = ro[['ro_price', 'de_price']].dropna().copy()
data['target'] = data['ro_price']

ro_price, de_price = data['ro_price'], data['de_price']

# Lagged features
for lag in (1, 2, 7, 14, 30):
    data[f'ro_lag_{lag}'] = ro_price.shift(lag)
for lag in (1, 7):
    data[f'de_lag_{lag}'] = de_price.shift(lag)

# Rolling stats, computed on the series shifted by one row so that the
# window never includes the row being predicted.
ro_previous = ro_price.shift(1)
for w in (7, 14, 30):
    window = ro_previous.rolling(w)
    data[f'ro_ma_{w}'] = window.mean()
    data[f'ro_std_{w}'] = window.std()

# Temporal features taken from the datetime index
weekday = data.index.dayofweek
data['dow'] = weekday
data['month'] = data.index.month
data['weekend'] = (weekday >= 5).astype(int)

# Drop the warm-up rows whose lags / rolling windows are incomplete.
data = data.dropna()
non_feature_cols = ('target', 'ro_price', 'de_price')
feature_cols = [col for col in data.columns if col not in non_feature_cols]

print(f"Dataset: {len(data)} rows, {len(feature_cols)} features")
print(f"Features: {feature_cols}")

In [None]:
# --- Static model: trained once on first 70% ---
split = int(len(data) * 0.7)
# Evaluate on the last 120 rows, or on everything after the training cut-off
# when fewer than 120 rows lie beyond it.
test_start = max(split, len(data) - 120)

X_all = data[feature_cols]
y_all = data['target']

X_train_static, y_train_static = X_all.iloc[:split], y_all.iloc[:split]
X_test_period, y_test_period = X_all.iloc[test_start:], y_all.iloc[test_start:]


def _tree_percentile_band(forest, X):
    """Return the 2.5th and 97.5th percentiles, per row of X, of the
    predictions made by the individual trees of a fitted forest."""
    per_tree = np.array([est.predict(X.values) for est in forest.estimators_])
    return (np.percentile(per_tree, 2.5, axis=0),
            np.percentile(per_tree, 97.5, axis=0))


# Both models use the same forest configuration so only the training
# schedule differs between them.
forest_kwargs = dict(n_estimators=150, max_depth=10, random_state=42, n_jobs=-1)

# Static RF
rf_static = RandomForestRegressor(**forest_kwargs)
rf_static.fit(X_train_static, y_train_static)
static_pred = pd.Series(rf_static.predict(X_test_period), index=y_test_period.index)

# Bootstrap CI from static RF trees
static_lower, static_upper = _tree_percentile_band(rf_static, X_test_period)
static_ci_lower = pd.Series(static_lower, index=y_test_period.index)
static_ci_upper = pd.Series(static_upper, index=y_test_period.index)

# --- Rolling model: retrained every 30 days with expanding window ---
rolling_pred, rolling_ci_lower, rolling_ci_upper = (
    pd.Series(dtype=float, index=y_test_period.index) for _ in range(3)
)
recalib_dates = []

test_indices = list(range(test_start, len(data)))
chunk_size = 30
n_test = len(test_indices)

for lo in range(0, n_test, chunk_size):
    hi = min(lo + chunk_size, n_test)

    # Train on every row strictly before the first row of this chunk.
    cutoff = test_indices[lo]
    rf_roll = RandomForestRegressor(**forest_kwargs)
    rf_roll.fit(X_all.iloc[:cutoff], y_all.iloc[:cutoff])

    # Predict the chunk with the freshly fitted forest; lo:hi are positions
    # inside the test period, test_indices[lo:hi] positions inside `data`.
    X_chunk = X_all.iloc[test_indices[lo:hi]]
    rolling_pred.iloc[lo:hi] = rf_roll.predict(X_chunk)

    # Bootstrap CI from individual tree predictions
    chunk_lower, chunk_upper = _tree_percentile_band(rf_roll, X_chunk)
    rolling_ci_lower.iloc[lo:hi] = chunk_lower
    rolling_ci_upper.iloc[lo:hi] = chunk_upper

    recalib_dates.append(data.index[cutoff])

print(f"Recalibration dates: {len(recalib_dates)}")
for d in recalib_dates:
    print(f"  {d.strftime('%Y-%m-%d')}")

In [None]:
# --- Metrics ---
# Score both models on exactly the rows for which a rolling forecast exists.
mask = rolling_pred.notna()
actual_eval = y_test_period[mask]
static_eval = static_pred[mask]
rolling_eval = rolling_pred[mask]

mae_static = mean_absolute_error(actual_eval, static_eval)
mae_rolling = mean_absolute_error(actual_eval, rolling_eval)
r2_static = r2_score(actual_eval, static_eval)
r2_rolling = r2_score(actual_eval, rolling_eval)

# Direction accuracy
def direction_accuracy(actual, predicted):
    """Return the share of steps where both series move in the same direction.

    Each series is differenced against its own previous value; a step
    counts as "up" when that difference is strictly positive (a zero
    change therefore counts as "not up"). Only index labels present in
    both differenced series are compared.
    """
    actual_up = actual.diff().dropna().gt(0)
    predicted_up = predicted.diff().dropna().gt(0)
    shared = actual_up.index.intersection(predicted_up.index)
    agreement = actual_up.loc[shared] == predicted_up.loc[shared]
    return agreement.mean()

# Direction accuracy of each model over the evaluated rows.
da_static = direction_accuracy(y_test_period[mask], static_pred[mask])
da_rolling = direction_accuracy(y_test_period[mask], rolling_pred[mask])

# CI coverage: percentage of actual values that fall inside each band.
observed = y_test_period[mask].values
inside_static = ((static_ci_lower[mask].values <= observed) &
                 (observed <= static_ci_upper[mask].values))
inside_rolling = ((rolling_ci_lower[mask].values <= observed) &
                  (observed <= rolling_ci_upper[mask].values))
cov_static = np.mean(inside_static) * 100
cov_rolling = np.mean(inside_rolling) * 100

print(f"Static  - MAE: {mae_static:.2f}, R2: {r2_static:.4f}, Dir Acc: {da_static:.3f}, 95% CI Coverage: {cov_static:.1f}%")
print(f"Rolling - MAE: {mae_rolling:.2f}, R2: {r2_rolling:.4f}, Dir Acc: {da_rolling:.3f}, 95% CI Coverage: {cov_rolling:.1f}%")

In [None]:
# --- 2-panel chart ---
# Left panel: actual prices with both models' forecasts and their
# tree-percentile bands. Right panel: grouped bars comparing the metrics.
fig, axes = plt.subplots(1, 2, figsize=(16, 6), gridspec_kw={'width_ratios': [2, 1]})

# (A) Time series with CIs
ax = axes[0]
ax.plot(y_test_period.index, y_test_period.values, color=COLORS['blue'], lw=1.5, label='Actual')
ax.plot(static_pred.index, static_pred.values, color=COLORS['orange'], lw=1.2, ls='--',
        alpha=0.8, label='Static Model')
ax.fill_between(static_pred.index, static_ci_lower.values, static_ci_upper.values,
                color=COLORS['orange'], alpha=0.10, label='Static 95% CI')
ax.plot(rolling_pred.index, rolling_pred.values, color=COLORS['green'], lw=1.5, ls='-',
        alpha=0.9, label='Rolling Model')
ax.fill_between(rolling_pred.index, rolling_ci_lower.values, rolling_ci_upper.values,
                color=COLORS['green'], alpha=0.12, label='Rolling 95% CI')

# Mark every retraining date exactly once. Only the first line carries the
# legend label; the others are hidden from the legend. Drawing the first line
# a second time just to label it would stack two semi-transparent lines (a
# visibly darker first marker) and would fail on an empty recalib_dates list.
for i, rd in enumerate(recalib_dates):
    ax.axvline(rd, color=COLORS['red'], ls=':', lw=0.8, alpha=0.6,
               label='Recalibration' if i == 0 else '_nolegend_')

ax.set_title('(A) Actual vs Static vs Rolling Predictions', fontsize=13, fontweight='bold')
ax.set_xlabel('Date')
ax.set_ylabel('Price (EUR/MWh)')
ax.tick_params(axis='x', rotation=30)
ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), frameon=False, ncol=3)

# (B) Bar chart: MAE, R2, Direction Accuracy
# NOTE(review): the three metrics share one y-axis although MAE is in price
# units while R2 and direction accuracy are unitless - confirm this is the
# intended presentation.
ax2 = axes[1]
metrics = ['MAE', 'R$^2$', 'Dir. Acc.']
static_vals = [mae_static, r2_static, da_static]
rolling_vals = [mae_rolling, r2_rolling, da_rolling]

x = np.arange(len(metrics))
width = 0.32

bars1 = ax2.bar(x - width/2, static_vals, width, color=COLORS['orange'], alpha=0.8, label='Static')
bars2 = ax2.bar(x + width/2, rolling_vals, width, color=COLORS['green'], alpha=0.8, label='Rolling')

# Print each value at the free end of its bar: above a non-negative bar,
# below a negative one (R2 can be negative), so the text never sits inside
# the bar.
for bars, vals in ((bars1, static_vals), (bars2, rolling_vals)):
    for bar, val in zip(bars, vals):
        is_negative = val < 0
        ax2.text(bar.get_x() + bar.get_width()/2,
                 bar.get_height() + (-0.01 if is_negative else 0.01),
                 f'{val:.3f}', ha='center',
                 va='top' if is_negative else 'bottom',
                 fontsize=9, fontweight='bold')

ax2.set_xticks(x)
ax2.set_xticklabels(metrics, fontsize=11)
ax2.set_title('(B) Metric Comparison', fontsize=13, fontweight='bold')
ax2.legend(loc='upper center', bbox_to_anchor=(0.5, -0.10), frameon=False, ncol=2)

fig.suptitle('Static vs Rolling Recalibration', fontsize=15, fontweight='bold', y=1.02)
fig.tight_layout()
save_fig(fig, 'ml_rolling_recalibration.pdf')
plt.show()