In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
from glob import glob
from pathlib import Path

# Fix for potential path issues
import sys
import os
# ROOT is the parent of the current working directory; it is prepended to
# sys.path (only once) - presumably so the project packages imported below
# resolve when the notebook is launched from a sub-directory.
ROOT = os.path.split(os.path.abspath(''))[0]
if sys.path.count(ROOT) == 0:
    sys.path.insert(0, ROOT)

# Import our modular variance components
from dgps.static import simulate_variance_break
from estimators.forecasters import (
    forecast_variance_dist_arima_global,
    forecast_variance_dist_arima_rolling,
    forecast_garch_variance,
    variance_rmse_mae_bias,
    variance_interval_coverage,
    variance_log_score_normal,
)
from analyses.simulations import (
    mc_variance_breaks,
    mc_variance_breaks_grid
)

# Notebook-wide plot appearance: seaborn "whitegrid" theme first, then the
# matplotlib defaults for figure size and base font size on top of it.
sns.set_theme(style="whitegrid")
plt.rcParams.update({
    'figure.figsize': (12, 6),
    'font.size': 10,
})


# Variance Break Analysis Workflow

This notebook provides a comprehensive workflow for analyzing variance breaks using the Pesaran (2013) framework.

## Contents:
1. **Quick Run** - Execute a small test variance break experiment
2. **Single Variance Break Analysis** - Full Monte Carlo simulation with multiple forecasting methods
3. **Grid Analysis** - Optimal window selection for different break magnitudes
4. **Loss Surface Visualization** - Heatmaps showing RMSE, coverage, and log-score
5. **Results Loading & Inspection** - Load and display saved results
6. **Plotting Utilities** - Generate diagnostic plots from results
7. **Diagnostics** - Walk through a single simulated path and its forecasts
8. **Tests** - Run the test suite
9. **References and Notes** - Key papers, methodology, and next steps

In [None]:
## Section 1: Quick Run (Development)

# Smoke test: a deliberately tiny Monte Carlo run (10 replications of a
# 100-observation series) so the whole pipeline can be exercised quickly.
quick_scenario = {
    "name": "Single variance break (quick)",
    "variance_Tb": 50,
    "variance_sigma1": 1.0,
    "variance_sigma2": 2.0,
    "task": "variance",
}
df_point_quick, df_unc_quick = mc_variance_breaks(
    n_sim=10,
    T=100,
    phi=0.6,
    window=20,
    horizon=5,
    scenarios=[quick_scenario],
)

# Print both result tables as plain text, rounded to four decimals.
quick_reports = (
    ("=== QUICK RUN: POINT METRICS ===", df_point_quick),
    ("\n=== QUICK RUN: UNCERTAINTY METRICS ===", df_unc_quick),
)
for banner, frame in quick_reports:
    print(banner)
    print(frame.round(4).to_string(index=False))

## Section 2: Single Variance Break Simulation

Run a full Monte Carlo simulation for a single variance break scenario.

**Scenario Details:**
- Single break at midpoint: $T_b = T/2$
- Pre-break variance: $\sigma_1 = 1.0$
- Post-break variance: $\sigma_2 = 2.0$ (2x increase)
- AR(1) coefficient: $\phi = 0.6$
- Forecast horizon: 20 periods
- Rolling window size: 100

**Models Compared:**
1. **ARIMA Global** - Uses full sample to estimate parameters
2. **ARIMA Rolling** - Uses rolling window to adapt to break
3. **GARCH** - Estimates conditional heteroskedasticity

In [None]:
## Section 2a: Run Full Monte Carlo Simulation

# Full experiment: 200 replications of a T=400 series with the break at t=200,
# using a rolling window of 100 and a 20-step forecast horizon.
df_point, df_unc = mc_variance_breaks(
    n_sim=200,
    T=400,
    phi=0.6,
    window=100,
    horizon=20,
    scenarios=[
        {"name": "Single variance break", "variance_Tb": 200, "variance_sigma1": 1.0, "variance_sigma2": 2.0, "task": "variance"}
    ]
)

print("=== POINT METRICS: RMSE, MAE, BIAS ===")
print(df_point.round(4).to_string(index=False))

print("\n=== UNCERTAINTY METRICS: COVERAGE & LOG-SCORE ===")
print(df_unc.round(4).to_string(index=False))


def _best_model(metrics_df, metric, higher_is_better=False,
                models=('ARIMA Global', 'ARIMA Rolling', 'GARCH')):
    """Return ``(model, score)`` for the best model on ``metric``.

    Only the first row whose 'Metric' column equals ``metric`` is inspected
    (the same row the summary has always used). Model columns that are absent,
    non-numeric or NaN are ignored: comparing NaN with ``min``/``max`` gives an
    order-dependent answer, so such entries must not take part in the ranking.

    Parameters
    ----------
    metrics_df : pandas.DataFrame
        Table with a 'Metric' column and one column per model.
    metric : str
        Value of the 'Metric' column to rank on, e.g. 'RMSE'.
    higher_is_better : bool
        Pick the maximum instead of the minimum.
    models : sequence of str
        Candidate model columns.

    Returns
    -------
    tuple
        ``(model_name, score)``, or ``(None, nan)`` when no usable score exists.
    """
    rows = metrics_df[metrics_df['Metric'] == metric]
    if rows.empty:
        return None, np.nan
    # reindex() yields NaN for missing model columns; to_numeric() coerces the
    # object-dtype row so that idxmin/idxmax operate on real floats.
    scores = pd.to_numeric(rows.iloc[0].reindex(list(models)), errors='coerce').dropna()
    if scores.empty:
        return None, np.nan
    winner = scores.idxmax() if higher_is_better else scores.idxmin()
    return winner, float(scores[winner])


# Summary statistics
print("\n=== SUMMARY ===")
print("Best model (lowest RMSE):")
best_model, best_rmse = _best_model(df_point, 'RMSE')
if best_model is None:
    print("  n/a (no usable RMSE values)")
else:
    print(f"  {best_model}: {best_rmse:.4f}")

print("\nBest model (highest log-score):")
best_model_ls, best_log_score = _best_model(df_unc, 'LogScore', higher_is_better=True)
if best_model_ls is None:
    print("  n/a (no usable LogScore values)")
else:
    print(f"  {best_model_ls}: {best_log_score:.4f}")

## Section 3: Grid Analysis - Optimal Window Selection

Analyze how forecast performance varies with rolling window size and break magnitude.

**Key insight from Pesaran (2013):**
- Larger breaks require **smaller windows** to quickly adapt
- Smaller breaks can use **larger windows** for stability
- This creates a trade-off in window selection

In [None]:
## Section 3a: Run Grid Analysis

# The grid is declared once and reused by the insights loop below, so the
# simulation and the report can never iterate over different magnitudes.
GRID_WINDOW_SIZES = [20, 50, 100, 200]
GRID_BREAK_MAGNITUDES = [1.5, 2.0, 3.0, 5.0]

# NOTE(review): the largest window equals the full sample length T=200 -
# confirm how mc_variance_breaks_grid treats a window that long.
df_grid = mc_variance_breaks_grid(
    n_sim=50,
    T=200,
    phi=0.6,
    horizon=20,
    window_sizes=GRID_WINDOW_SIZES,
    break_magnitudes=GRID_BREAK_MAGNITUDES,
    seed=42
)

# One Window x BreakMagnitude table per metric, printed rounded to 4 decimals.
surface_reports = [
    ("=== LOSS SURFACE: RMSE (lower is better) ===", 'RMSE'),
    ("\n=== LOSS SURFACE: Coverage95 (closer to 0.95 is better) ===", 'Coverage95'),
    ("\n=== LOSS SURFACE: LogScore (higher is better) ===", 'LogScore'),
]
grid_pivots = {}
for banner, metric in surface_reports:
    grid_pivots[metric] = df_grid.pivot(index='Window', columns='BreakMagnitude', values=metric)
    print(banner)
    print(grid_pivots[metric].round(4).to_string())

# Keep the individual pivot names available for later cells.
rmse_pivot = grid_pivots['RMSE']
cov_pivot = grid_pivots['Coverage95']
ls_pivot = grid_pivots['LogScore']

# Insights: for every break magnitude report the window with the lowest RMSE.
print("\n=== INSIGHTS ===")
print("Window selection trade-off:")
for break_mag in GRID_BREAK_MAGNITUDES:
    # isclose() instead of == so a float round-trip cannot empty the subset.
    subset = df_grid[np.isclose(df_grid['BreakMagnitude'], break_mag)]
    if subset.empty or subset['RMSE'].isna().all():
        # idxmin() would raise here; say so explicitly instead.
        print(f"  Break magnitude {break_mag}x: no RMSE available")
        continue
    best_idx = subset['RMSE'].idxmin()
    best_window = subset.loc[best_idx, 'Window']
    print(f"  Break magnitude {break_mag}x: optimal window ≈ {best_window}")

## Section 4: Loss Surface Visualization

Create heatmaps showing the RMSE, coverage, and log-score loss surfaces.

In [None]:
## Section 4a: Plot Loss Surfaces

# Three side-by-side heatmaps (RMSE, 95% coverage, log-score) built from the
# grid results in df_grid; rows are window sizes, columns break magnitudes.
fig, axes = plt.subplots(1, 3, figsize=(16, 4))

# Per-panel settings. The RMSE panel uses the reversed colormap so that green
# always marks the better end; the coverage panel pins the colour range to
# [0.85, 0.95].
heatmap_panels = [
    {'metric': 'RMSE', 'fmt': '.4f', 'cmap': 'RdYlGn_r', 'cbar': 'RMSE',
     'title': 'RMSE (lower is better)', 'extra': {}},
    {'metric': 'Coverage95', 'fmt': '.3f', 'cmap': 'RdYlGn', 'cbar': 'Coverage',
     'title': 'Coverage 95% (target = 0.95)', 'extra': {'vmin': 0.85, 'vmax': 0.95}},
    {'metric': 'LogScore', 'fmt': '.4f', 'cmap': 'RdYlGn', 'cbar': 'Log-Score',
     'title': 'Log-Score (higher is better)', 'extra': {}},
]

for ax, panel in zip(axes, heatmap_panels):
    surface = df_grid.pivot(index='Window', columns='BreakMagnitude', values=panel['metric'])
    sns.heatmap(surface, annot=True, fmt=panel['fmt'], cmap=panel['cmap'], ax=ax,
                cbar_kws={'label': panel['cbar']}, **panel['extra'])
    ax.set_title(panel['title'], fontsize=12, fontweight='bold')
    ax.set_xlabel('Break Magnitude')
    ax.set_ylabel('Window Size')

plt.tight_layout()

# savefig() does not create missing directories, so make sure figures/ exists.
loss_surface_path = Path('figures') / 'variance_loss_surfaces.png'
loss_surface_path.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(loss_surface_path, dpi=150, bbox_inches='tight')
plt.show()

print("Saved figures/variance_loss_surfaces.png")

## Section 5: Load and Inspect Saved Results

Load previously saved results from the `results/` directory.

In [None]:
## Section 5a: Load Latest Results

# Reset both frames first: after a re-run they must reflect what is on disk
# now, never a table left over from an earlier execution of this cell.
df_results_point = None
df_results_unc = None

# "Latest" means last in lexicographic order of the file name.
# NOTE(review): this assumes the names sort chronologically (e.g. embed a
# timestamp) - confirm against the code that writes results/.
res_files = sorted(glob('results/variance_*_point.csv'))
if not res_files:
    print('No result files found in results/. Run mc_variance_breaks() to produce results.')
else:
    latest_point = res_files[-1]
    # Swap only the trailing suffix (guaranteed by the glob pattern);
    # str.replace would also rewrite an earlier '_point.csv' inside the name.
    latest_unc = latest_point[:-len('_point.csv')] + '_unc.csv'

    print(f'Loading latest results:\n  {latest_point}\n  {latest_unc}\n')

    df_results_point = pd.read_csv(latest_point)
    print("=== POINT METRICS ===")
    print(df_results_point.round(4).to_string(index=False))

    if Path(latest_unc).exists():
        df_results_unc = pd.read_csv(latest_unc)
        print("\n=== UNCERTAINTY METRICS ===")
        print(df_results_unc.round(4).to_string(index=False))
    else:
        # df_results_unc stays None so later cells can detect the gap.
        print(f'\nWarning: Uncertainty file not found: {latest_unc}')

## Section 6: Plotting Utilities

Generate diagnostic plots from results (RMSE comparison, coverage analysis, etc.)

In [None]:
## Section 6a: RMSE and Coverage Comparison Plots


def _scenario_values(metric_rows, model, scenarios):
    """Return one value per scenario for ``model`` (NaN where unavailable).

    ``metric_rows`` holds the rows of a single metric. For each scenario the
    first matching row is used; a missing model column or a scenario without
    a row yields NaN instead of raising.
    """
    if model not in metric_rows.columns:
        return np.full(len(scenarios), np.nan)
    first_per_scenario = (
        metric_rows.drop_duplicates(subset='Scenario').set_index('Scenario')[model]
    )
    return pd.to_numeric(first_per_scenario.reindex(scenarios), errors='coerce').to_numpy(dtype=float)


def _plot_metric_bars(ax, metrics_df, metric, models, scenarios, ylabel, title, target=None):
    """Draw grouped bars (one group per scenario, one bar per model) on ``ax``.

    Parameters
    ----------
    ax : matplotlib Axes to draw on.
    metrics_df : DataFrame with 'Scenario' and 'Metric' columns plus one column
        per model, or None when the table was not loaded.
    metric : value of the 'Metric' column to plot.
    models, scenarios : bar and group labels.
    ylabel, title : axis label and panel title.
    target : optional nominal level; when given a dashed reference line is
        drawn and the y-range is fixed to [0, 1.1] (coverage panels).
    """
    rows = None if metrics_df is None else metrics_df[metrics_df['Metric'] == metric]
    if rows is None or rows.empty:
        # Nothing to plot: keep the panel, but say why it is empty.
        ax.set_title(title, fontweight='bold')
        ax.text(0.5, 0.5, f'{metric} not available', ha='center', va='center',
                transform=ax.transAxes)
        return

    # 0.25 matches the historical look for up to three models; with more
    # models the bars shrink so a group never spills into the next one.
    width = min(0.25, 0.8 / max(len(models), 1))
    x = np.arange(len(scenarios))
    for i, model in enumerate(models):
        ax.bar(x + i * width, _scenario_values(rows, model, scenarios), width, label=model)

    if target is not None:
        ax.axhline(y=target, color='red', linestyle='--', linewidth=2,
                   label=f'Target ({target:.2f})')
        ax.set_ylim([0, 1.1])

    ax.set_ylabel(ylabel, fontweight='bold')
    ax.set_title(title, fontweight='bold')
    # Centre each tick under its group whatever the number of models.
    ax.set_xticks(x + width * (len(models) - 1) / 2)
    ax.set_xticklabels(scenarios, rotation=20, ha='right')
    ax.legend()
    ax.grid(axis='y', alpha=0.3)


# The result frames come from Section 5a. Look them up defensively so that a
# missing results file produces a clear message instead of a NameError.
point_results = globals().get('df_results_point')
unc_results = globals().get('df_results_unc')

if point_results is None:
    print('No point metrics loaded - produce results and run Section 5a before plotting.')
else:
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))

    scenarios = point_results.loc[point_results['Metric'] == 'RMSE', 'Scenario'].unique()
    models = [col for col in point_results.columns if col not in ['Scenario', 'Metric']]

    # (axes, table, metric, y-label, title, target level)
    comparison_panels = [
        (axes[0, 0], point_results, 'RMSE', 'RMSE', 'Root Mean Squared Error Comparison', None),
        (axes[0, 1], point_results, 'MAE', 'MAE', 'Mean Absolute Error Comparison', None),
        (axes[1, 0], unc_results, 'Coverage80', 'Coverage', '80% Interval Coverage', 0.80),
        (axes[1, 1], unc_results, 'Coverage95', 'Coverage', '95% Interval Coverage', 0.95),
    ]
    for ax, table, metric, ylabel, title, target in comparison_panels:
        _plot_metric_bars(ax, table, metric, models, scenarios, ylabel, title, target=target)

    plt.tight_layout()

    # savefig() does not create missing directories, so make sure figures/ exists.
    comparison_path = Path('figures') / 'variance_metrics_comparison.png'
    comparison_path.parent.mkdir(parents=True, exist_ok=True)
    fig.savefig(comparison_path, dpi=150, bbox_inches='tight')
    plt.show()

    print("Saved figures/variance_metrics_comparison.png")

## Section 7: Diagnostics - Single Simulation Walkthrough

Examine a single simulated path to understand the data-generating process.

In [None]:
## Section 7a: Visualize Single Path with Variance Break

# Settings for the walkthrough; titles and labels below are derived from them
# so the figure cannot drift out of sync with the simulation.
T = 400
variance_Tb = 200
sigma1, sigma2 = 1.0, 2.0
horizon = 20
rolling_window = 100

# Generate a single variance break path with seed for reproducibility
y_single = simulate_variance_break(T=T, variance_Tb=variance_Tb, phi=0.6,
                                   variance_sigma1=sigma1, variance_sigma2=sigma2, seed=42)

# Split into train/test: the last `horizon` observations are held out.
y_train = y_single[:-horizon]
y_test = y_single[-horizon:]

# Generate forecasts using all three methods; each returns (mean, variance).
m_global, v_global = forecast_variance_dist_arima_global(y_train, horizon=horizon)
m_rolling, v_rolling = forecast_variance_dist_arima_rolling(y_train, window=rolling_window, horizon=horizon)
try:
    m_garch, v_garch = forecast_garch_variance(y_train, horizon=horizon)
    has_garch = True
except Exception as exc:
    # GARCH is optional here, but report why it was skipped. Catching
    # Exception (not a bare except) lets KeyboardInterrupt through.
    print(f"GARCH forecast unavailable ({type(exc).__name__}: {exc}); continuing without it.")
    has_garch = False
    m_garch = np.full(horizon, np.nan)
    v_garch = np.full(horizon, np.nan)

# Plot the path
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Panel 1: Full series with variance break
time = np.arange(T)
axes[0].plot(time[:variance_Tb], y_single[:variance_Tb], 'o-', label=f'Pre-break (σ={sigma1})',
             linewidth=1, markersize=3, alpha=0.7)
axes[0].plot(time[variance_Tb:], y_single[variance_Tb:], 'o-', label=f'Post-break (σ={sigma2})',
             linewidth=1, markersize=3, alpha=0.7, color='orange')
axes[0].axvline(x=variance_Tb, color='red', linestyle='--', linewidth=2, label='Break point')
# Shade +/- 2 full-sample standard deviations over the post-break span.
full_sample_band = 2 * np.std(y_single)
axes[0].fill_between(time[variance_Tb:], -full_sample_band, full_sample_band, alpha=0.1, color='red')
axes[0].set_ylabel('$y_t$', fontweight='bold')
axes[0].set_title(f'Single Variance Break Path (T={T}, Tb={variance_Tb}, σ₁={sigma1}, σ₂={sigma2})',
                  fontweight='bold')
axes[0].legend(loc='upper left')
axes[0].grid(alpha=0.3)

# Panel 2: Forecast comparison
test_time = np.arange(T - horizon, T)
axes[1].plot(test_time, y_test, 'ko-', label='Actual', linewidth=2, markersize=6)
axes[1].plot(test_time, m_global, 's--', label='ARIMA Global', linewidth=1.5, markersize=5)
axes[1].plot(test_time, m_rolling, '^--', label='ARIMA Rolling', linewidth=1.5, markersize=5)
if has_garch:
    axes[1].plot(test_time, m_garch, 'D--', label='GARCH', linewidth=1.5, markersize=5)

# Add 95% intervals for rolling window (mean +/- 1.96 standard deviations)
rolling_half_width = 1.96 * np.sqrt(v_rolling)
axes[1].fill_between(test_time, m_rolling - rolling_half_width, m_rolling + rolling_half_width,
                     alpha=0.2, label='95% PI (Rolling)')

axes[1].set_ylabel('Forecast', fontweight='bold')
axes[1].set_xlabel('Time', fontweight='bold')
axes[1].set_title(f'Forecast Comparison on Test Set (h={horizon})', fontweight='bold')
axes[1].legend(loc='best')
axes[1].grid(alpha=0.3)

plt.tight_layout()

# savefig() does not create missing directories, so make sure figures/ exists.
single_path_figure = Path('figures') / 'variance_single_path.png'
single_path_figure.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(single_path_figure, dpi=150, bbox_inches='tight')
plt.show()

print("Saved figures/variance_single_path.png")

# Print forecast metrics for this single path. GARCH is scored only when its
# forecast exists; the NaN placeholders are never passed to the metric.
print("\n=== SINGLE PATH FORECAST METRICS ===")
forecast_means = {'ARIMA Global': m_global, 'ARIMA Rolling': m_rolling}
if has_garch:
    forecast_means['GARCH'] = m_garch

single_path_metrics = {}
for model_name, mean_path in forecast_means.items():
    rmse, mae, bias = variance_rmse_mae_bias(y_test, mean_path)
    single_path_metrics[model_name] = (rmse, mae, bias)
    # Label padded to 15 characters so the RMSE columns line up.
    print(f"{model_name + ':':<15}RMSE={rmse:.4f}, MAE={mae:.4f}, Bias={bias:.4f}")

# Coverage analysis
cov80_r = variance_interval_coverage(y_test, m_rolling, v_rolling, level=0.80)
cov95_r = variance_interval_coverage(y_test, m_rolling, v_rolling, level=0.95)
ls_r = variance_log_score_normal(y_test, m_rolling, v_rolling)

print("\n=== ROLLING WINDOW UNCERTAINTY ===")
print(f"80% Coverage: {cov80_r:.4f} (target: 0.80)")
print(f"95% Coverage: {cov95_r:.4f} (target: 0.95)")
print(f"Log-Score:    {ls_r:.4f}")


## Section 8: Tests

Run the test suite to validate the variance analysis modules.

In [None]:
## Section 8a: Run Test Suite

# Run pytest on variance-related tests
import subprocess

print("Running pytest on variance tests...\n")
# Launch pytest through the kernel's own interpreter so the tests run in the
# same environment as this notebook, rather than whichever `pytest` happens to
# be first on PATH (or none at all).
result = subprocess.run(
    [sys.executable, '-m', 'pytest', 'tests/test_variance_garch.py', 'tests/test_scenarios.py', '-v'],
    capture_output=True,
    text=True,
    check=False
)
# Echo the captured streams: output a child process writes straight to its
# own stdout/stderr is not guaranteed to show up in the notebook cell.
if result.stdout:
    print(result.stdout)
if result.stderr:
    print(result.stderr)
# Any non-zero exit code (failures, errors, nothing collected) counts as FAILED.
print(f"\nTest result: {'PASSED' if result.returncode == 0 else 'FAILED'}")

## Section 9: References and Notes

**Key Papers:**
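- Pesaran, M. H. (2013). "Forecasting Economic Time Series." MIT Press. (**TODO:** verify this citation — the window-selection results under structural breaks referred to above appear to correspond to Pesaran, M. H., Pick, A., & Pranovich, M. (2013). "Optimal forecasts in the presence of structural breaks." Journal of Econometrics, 177(2), 134–152.)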
- Diebold, F. X., & Mariano, R. S. (1995). "Comparing Predictive Accuracy." Journal of Business & Economic Statistics.

**Methodology:**
- **Data-Generating Process**: AR(1) with variance break at $T_b$
- **Forecasting Methods**:
  - ARIMA Global: Estimates parameters on full sample (ignores break)
  - ARIMA Rolling: Uses rolling window to adapt to break (robust but noisier)
  - GARCH: Models conditional heteroskedasticity explicitly

**Key Insights:**
1. Rolling windows are essential for robust forecasting after breaks
2. Window size must be adapted to break magnitude (window-break trade-off)
3. Larger breaks require smaller windows for quick adaptation
4. Smaller breaks allow larger windows for better stability

**Next Steps:**
- Compare with real financial data (S&P 500, forex, etc.)
- Extend to multiple structural breaks
- Adaptive window selection algorithms
- Integration with portfolio optimization

## Notes and next steps

- Consider adding a cell to save the raw per-simulation outputs (`--save-raw`) so analysts can compute additional diagnostics and plots.
- If you want multi-core speedups, add an `--n-jobs` flag to `pixi.py` and pass it into `mc_variance_breaks`.
- For publication-quality plots, create a dedicated notebook that reads `results/` and produces figures with a shared matplotlib style or seaborn.

**TODO:** add a `--save-raw` option to `pixi.py` and a dedicated analysis notebook that demonstrates common plots (RMSE comparison, coverage over time, QQ plots of residuals).