# V8 Rolling Walk-Forward Backtest for MSFT 1-Minute GRU Model

This notebook implements a **strictly causal** rolling walk-forward backtest system that:

1. Makes predictions at each minute using only past data (no lookahead bias)
2. Generates 15-step-ahead forecasts using autoregressive sampling
3. Visualizes predictions as a "fan chart" / "prediction cone"
4. Computes directional accuracy and path divergence metrics

**Key Design Principles:**

- **Strict Causality**: At time t, the model only sees data from t-window to t-1
- **Autoregressive Generation**: Uses `generate_realistic()` with fresh random noise each step
- **Rolling Evaluation**: Simulates a real-time deployment scenario

## Section 1: Imports and Configuration

Import all required libraries and define the configuration constants for the rolling backtest.

In [None]:
import importlib.util
import subprocess
import sys

# Importable module name -> pip distribution name to install when it is absent.
required = {
    'alpaca': 'alpaca-py',
    'numpy': 'numpy',
    'pandas': 'pandas',
    'matplotlib': 'matplotlib',
    'pandas_market_calendars': 'pandas-market-calendars',
    'tqdm': 'tqdm',
}

# importlib.util.find_spec() returns None when a top-level module cannot be located.
missing = [required[mod] for mod in required if importlib.util.find_spec(mod) is None]

if not missing:
    print('All required third-party packages are already installed.')
else:
    print('Installing missing packages:', missing)
    # Invoke pip through the running interpreter so packages land in the active environment.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install'] + missing)

In [None]:
from __future__ import annotationsimport copyimport osimport randomimport timefrom dataclasses import dataclass, fieldfrom datetime import datetime, timedelta, timezonefrom typing import List, Dict, Tuple, Optional, Callablefrom collections import dequeimport numpy as npimport pandas as pdimport pandas_market_calendars as mcalimport torchimport torch.nn as nnfrom alpaca.data.enums import DataFeedfrom alpaca.data.historical import StockHistoricalDataClientfrom alpaca.data.requests import StockBarsRequestfrom alpaca.data.timeframe import TimeFramefrom IPython.display import display, clear_outputfrom matplotlib import pyplot as pltfrom matplotlib.patches import Patch, Rectanglefrom matplotlib.lines import Line2Dfrom torch.utils.data import DataLoader, Datasetfrom tqdm import tqdm# Set matplotlib style for dark themeplt.style.use('dark_background')

In [None]:
# ==============================================================================
# ROLLING BACKTEST CONFIGURATION
# ==============================================================================

# --- Model configuration (MUST match the v7 training configuration) -----------
SYMBOL = 'MSFT'
HORIZON = 15        # Number of 1-minute bars predicted per forecast
LOOKBACK = 160      # Number of past bars fed to the model as context
HIDDEN_SIZE = 256
NUM_LAYERS = 2
DROPOUT = 0.2

# --- Column layouts -----------------------------------------------------------
OHLC_COLS = ['Open', 'High', 'Low', 'Close']
TARGET_COLS = ['rOpen', 'rHigh', 'rLow', 'rClose']
# The four return targets come first, followed by the auxiliary features.
BASE_FEATURE_COLS = [
    *TARGET_COLS,
    'logVolChange',
    'logTradeCountChange',
    'vwapDelta',
    'rangeFrac',
    'orderFlowProxy',
    'tickPressure',
]

# --- Rolling backtest configuration -------------------------------------------
ROLLING_START_TIME = "09:30"  # Market open
ROLLING_END_TIME = "16:00"    # Market close
MAX_PREDICTION_AGE = 50       # Number of most recent predictions drawn on the fan chart
FAN_OPACITY_DECAY = 0.02      # Opacity lost per step of prediction age
TEMPERATURE_DEFAULT = 1.5     # Default sampling temperature for generation
MIN_PREDICTED_VOL = 1e-4      # Floor on sampled volatility so paths never go flat

# --- Visualization configuration ----------------------------------------------
FIGURE_SIZE = (20, 10)
DPI = 100

# --- Device configuration -----------------------------------------------------
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f'Using device: {DEVICE}')
if DEVICE.type == 'cuda':
    print(f'GPU: {torch.cuda.get_device_name(0)}')

# One print call; the emitted text is identical to printing each line separately.
print(
    "\nRolling Backtest Configuration:\n"
    f"  Symbol: {SYMBOL}\n"
    f"  Lookback window: {LOOKBACK} bars\n"
    f"  Prediction horizon: {HORIZON} bars\n"
    f"  Trading hours: {ROLLING_START_TIME} - {ROLLING_END_TIME}\n"
    f"  Max prediction age in fan: {MAX_PREDICTION_AGE}"
)

### V7 Model Architecture (Seq2SeqAttnGRU)

Recreate the exact model architecture from v7 so that the trained weights can be loaded.

In [None]:
class Seq2SeqAttnGRU(nn.Module):
    """
    Sequence-to-Sequence GRU with Attention for multi-step OHLC forecasting.
    This is the EXACT v7 architecture - DO NOT MODIFY.

    A multi-layer GRU encodes the input window; a single GRUCell decoder is
    then unrolled for ``horizon`` steps. At every step the decoder attends
    over all encoder outputs and two heads map ``[decoder state, context]``
    to a mean (``mu``) and a log standard deviation (``log_sigma``) for each
    of the ``output_size`` targets.

    The submodule attribute names (``encoder``, ``decoder_cell``,
    ``attn_proj``, ``mu_head``, ``log_sigma_head``) are the keys under which
    parameters are stored in the state dict, so they must not be renamed if
    previously trained weights are to be loaded.
    """
    
    def __init__(self, input_size, output_size, hidden_size, num_layers, dropout, horizon):
        """
        Build the encoder, decoder cell, attention projection and output heads.

        Args:
            input_size: Number of features per input timestep
            output_size: Number of predicted values per step; the first
                ``output_size`` input features are also used to seed the decoder
            hidden_size: Hidden width shared by encoder and decoder
            num_layers: Number of stacked encoder GRU layers
            dropout: Inter-layer encoder dropout (only applied when num_layers > 1)
            horizon: Number of decoder steps to unroll
        """
        super().__init__()
        self.horizon = horizon
        self.output_size = output_size
        self.hidden_size = hidden_size
        
        self.encoder = nn.GRU(
            input_size=input_size, hidden_size=hidden_size,
            num_layers=num_layers, batch_first=True,
            # nn.GRU dropout acts between layers, so it is disabled for a single layer
            dropout=dropout if num_layers > 1 else 0.0,
        )
        # Decoder input is the previous output concatenated with the attention context
        self.decoder_cell = nn.GRUCell(output_size + hidden_size, hidden_size)
        self.attn_proj = nn.Linear(hidden_size, hidden_size, bias=False)
        
        # Output mu and log_sigma for each target; both heads read [h_dec, context]
        self.mu_head = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size),
            nn.GELU(),
            nn.Linear(hidden_size, output_size),
        )
        self.log_sigma_head = nn.Sequential(
            nn.Linear(hidden_size * 2, hidden_size // 2),
            nn.GELU(),
            nn.Linear(hidden_size // 2, output_size),
        )
        
        # Final-layer initialisation: small-gain Xavier weights and zero bias for the
        # mu head; all-zero weights and bias for the log-sigma head, so log_sigma
        # starts at exactly 0 (sigma = 1) regardless of the input.
        nn.init.xavier_uniform_(self.mu_head[-1].weight, gain=0.1)
        nn.init.zeros_(self.mu_head[-1].bias)
        nn.init.zeros_(self.log_sigma_head[-1].weight)
        nn.init.zeros_(self.log_sigma_head[-1].bias)
        
    def _attend(self, h_dec, enc_out):
        """
        Compute attention weights and context vector.

        Args:
            h_dec: Current decoder hidden state [batch, hidden]
            enc_out: Encoder outputs [batch, time, hidden]

        Returns:
            Context vector [batch, hidden]: the softmax-weighted sum of the
            encoder outputs.
        """
        query = self.attn_proj(h_dec).unsqueeze(2)  # [batch, hidden, 1]
        scores = torch.bmm(enc_out, query).squeeze(2)  # dot-product score per encoder step
        weights = torch.softmax(scores, dim=1)  # normalise over the time axis
        context = torch.bmm(weights.unsqueeze(1), enc_out).squeeze(1)
        return context
    
    def forward(self, x, y_teacher=None, teacher_forcing_ratio=0.0, return_sigma=False):
        """
        Forward pass for training.

        Args:
            x: Input tensor [batch, lookback, features]
            y_teacher: Optional ground-truth targets, indexed as
                ``y_teacher[:, t, :]`` for decoder step t
            teacher_forcing_ratio: Per-step probability of feeding the ground
                truth back into the decoder; values >= 1.0 always teacher-force
            return_sigma: If True, also return the second sequence

        Returns:
            ``mu_out`` [batch, horizon, output_size], or the tuple
            ``(mu_out, sigma_out)`` when ``return_sigma`` is True. Note that
            ``sigma_out`` holds the raw ``log_sigma`` head outputs (log
            standard deviations), not exponentiated sigmas.
        """
        enc_out, h = self.encoder(x)
        h_dec = h[-1]  # final hidden state of the top encoder layer seeds the decoder
        dec_input = x[:, -1, :self.output_size]  # First output_size features of the last timestep
        
        mu_seq, sigma_seq = [], []
        for t in range(self.horizon):
            context = self._attend(h_dec, enc_out)
            cell_input = torch.cat([dec_input, context], dim=1)
            h_dec = self.decoder_cell(cell_input, h_dec)
            out_features = torch.cat([h_dec, context], dim=1)
            
            mu = self.mu_head(out_features)
            log_sigma = self.log_sigma_head(out_features)
            
            mu_seq.append(mu.unsqueeze(1))
            sigma_seq.append(log_sigma.unsqueeze(1))
            
            # Choose the next decoder input: ground truth, a noisy sample, or the mean
            if y_teacher is not None and teacher_forcing_ratio > 0.0:
                # One random draw per step decides for the whole batch
                if teacher_forcing_ratio >= 1.0 or torch.rand(1).item() < teacher_forcing_ratio:
                    dec_input = y_teacher[:, t, :]
                else:
                    # Sample around mu; sigma is detached so the noise scale gets no gradient
                    noise = torch.randn_like(mu) * torch.exp(log_sigma).detach()
                    dec_input = mu + noise
            else:
                # No teacher forcing: feed back the deterministic mean
                dec_input = mu
        
        mu_out = torch.cat(mu_seq, dim=1)
        sigma_out = torch.cat(sigma_seq, dim=1)
        
        if return_sigma:
            return mu_out, sigma_out
        return mu_out
    
    def generate_realistic(self, x, temperature=1.0, historical_vol=None):
        """
        Generate realistic price paths with controlled stochasticity.

        Each step samples from N(mu, sigma) and feeds the sample back into the
        decoder, so repeated calls on the same input produce different paths.

        Args:
            x: Input tensor [batch, lookback, features]
            temperature: Controls volatility (1.0 = learned vol, >1.0 = more wild)
            historical_vol: Optional override for first 5 steps. When given, it
                replaces sigma outright for those steps, so ``temperature`` has
                no effect on them.

        Returns:
            Generated return sequences [batch, horizon, output_size]

        Note:
            Calls ``self.eval()`` and does not restore the previous mode, and
            reads the module-level constant ``MIN_PREDICTED_VOL``.
            NOTE(review): callers that sample during training must switch back
            to ``train()`` themselves - confirm this is intended.
        """
        self.eval()
        with torch.no_grad():
            enc_out, h = self.encoder(x)
            h_dec = h[-1]
            dec_input = x[:, -1, :self.output_size]
            
            generated = []
            for t in range(self.horizon):
                context = self._attend(h_dec, enc_out)
                cell_input = torch.cat([dec_input, context], dim=1)
                h_dec = self.decoder_cell(cell_input, h_dec)
                out_features = torch.cat([h_dec, context], dim=1)
                
                mu = self.mu_head(out_features)
                log_sigma = self.log_sigma_head(out_features)
                
                # Scale sigma by temperature
                sigma = torch.exp(log_sigma) * temperature
                
                # Optional: override with historical volatility for first few steps
                if historical_vol is not None and t < 5:
                    sigma = torch.ones_like(sigma) * historical_vol
                
                # Ensure minimum volatility to prevent flatness
                sigma = torch.maximum(sigma, torch.tensor(MIN_PREDICTED_VOL, device=x.device))
                
                # Sample from distribution (FRESH noise each step - critical for diversity)
                noise = torch.randn_like(mu) * sigma
                sample = mu + noise
                
                generated.append(sample.unsqueeze(1))
                dec_input = sample  # Feed back the sample (autoregressive)
            
            return torch.cat(generated, dim=1)

### Helper Functions (from v7)

In [None]:
def enforce_candle_validity(ohlc: np.ndarray) -> np.ndarray:
    """
    Enforce OHLC validity: High >= max(Open, Close), Low <= min(Open, Close).

    The input is never modified; a float32 copy is always returned.

    Args:
        ohlc: Array-like of shape [N, 4] with columns [Open, High, Low, Close]

    Returns:
        New float32 array of shape [N, 4] with High/Low widened where needed
    """
    # np.asarray would alias an input that is already float32 and the column
    # writes below would then silently mutate the caller's array.
    out = np.array(ohlc, dtype=np.float32, copy=True)
    opens, closes = out[:, 0], out[:, 3]
    out[:, 1] = np.maximum(out[:, 1], np.maximum(opens, closes))
    out[:, 2] = np.minimum(out[:, 2], np.minimum(opens, closes))
    return out


def returns_to_prices_seq(return_ohlc: np.ndarray, last_close: float) -> np.ndarray:
    """
    Convert return OHLC sequences back to price sequences.

    Every bar's four log returns are taken relative to the previous bar's
    close, so prices are rebuilt sequentially: bar i is anchored on the
    (float32-rounded) close of bar i-1, starting from ``last_close``.

    Args:
        return_ohlc: Array of shape [horizon, 4] with log returns
        last_close: Last known close price

    Returns:
        Valid float32 price OHLC array of shape [horizon, 4]; an empty input
        yields an array of shape [0, 4].
    """
    rets = np.asarray(return_ohlc, dtype=np.float32)
    prices = np.empty((len(rets), 4), dtype=np.float32)
    prev_close = float(last_close)
    for i, bar_returns in enumerate(rets):
        # exp() is evaluated in float64 and rounded to float32 on assignment
        prices[i] = prev_close * np.exp(bar_returns.astype(np.float64))
        # Validity enforcement only touches High/Low, so the close can be
        # chained before the single vectorised fix-up below.
        prev_close = float(prices[i, 3])
    return enforce_candle_validity(prices)


def build_feature_frame(df: pd.DataFrame, skip_open_bars: int = 6) -> pd.DataFrame:
    """
    Build feature frame from price data.
    This function is adapted from v7 training pipeline.

    Previous-bar values are taken within each ``session_id`` group, so the
    first bar of a session falls back to its own Open / Volume / TradeCount.

    Args:
        df: Frame with columns session_id, Open, High, Low, Close, Volume,
            TradeCount, VWAP, is_imputed and bar_in_session
        skip_open_bars: Bars with ``bar_in_session`` below this value are
            flagged in ``row_open_skip`` (default 6, the value previously
            hard-coded here as SKIP_OPEN_BARS_TARGET)

    Returns:
        float32 frame indexed like ``df`` holding the ten model features
        (rOpen, rHigh, rLow, rClose, logVolChange, logTradeCountChange,
        vwapDelta, rangeFrac, orderFlowProxy, tickPressure) followed by the
        bookkeeping columns row_imputed, row_open_skip and prev_close.
    """
    eps = 1e-9  # guards divisions and logs against zero denominators
    by_session = df.groupby('session_id', sort=False)

    prev_close = by_session['Close'].shift(1).fillna(df['Open'])
    prev_vol = by_session['Volume'].shift(1).fillna(df['Volume'])
    prev_tc = by_session['TradeCount'].shift(1).fillna(df['TradeCount'])
    prev_imp = by_session['is_imputed'].shift(1).fillna(0).astype(bool)

    # A row counts as imputed when either it or the bar it is measured against is
    row_imputed = df['is_imputed'].astype(bool) | prev_imp
    row_open_skip = df['bar_in_session'].astype(int) < skip_open_bars

    out = pd.DataFrame(index=df.index, dtype=np.float32)

    # Log returns of each OHLC field relative to the previous close
    for feature, price_col in zip(('rOpen', 'rHigh', 'rLow', 'rClose'),
                                  ('Open', 'High', 'Low', 'Close')):
        out[feature] = np.log(df[price_col] / (prev_close + eps))

    out['logVolChange'] = np.log((df['Volume'] + 1.0) / (prev_vol + 1.0))
    out['logTradeCountChange'] = np.log((df['TradeCount'] + 1.0) / (prev_tc + 1.0))
    out['vwapDelta'] = np.log((df['VWAP'] + eps) / (df['Close'] + eps))
    out['rangeFrac'] = np.maximum(out['rHigh'] - out['rLow'], 0) / (np.abs(out['rClose']) + eps)

    # Candle body as a signed fraction of the bar range, weighted by log volume
    signed_body = (df['Close'] - df['Open']) / ((df['High'] - df['Low']) + eps)
    out['orderFlowProxy'] = signed_body * np.log1p(df['Volume'])
    out['tickPressure'] = np.sign(df['Close'] - df['Open']) * np.log1p(df['TradeCount'])

    out['row_imputed'] = row_imputed.astype(np.int8).to_numpy()
    out['row_open_skip'] = row_open_skip.astype(np.int8).to_numpy()
    out['prev_close'] = prev_close.astype(np.float32).to_numpy()
    return out.astype(np.float32)

## Section 2: Rolling Backtest Engine

Core rolling backtest implementation with strict causality guarantees.

In [None]:
@dataclass
class RollingPredictionLog:
    """
    Log entry for a single rolling prediction.

    This dataclass stores all information needed to evaluate a prediction
    made at a specific anchor time, including the context, prediction,
    and actual future for comparison.

    Raises:
        AssertionError: On construction, if either path does not have
            ``prediction_horizon`` rows, does not have exactly the OHLC_COLS
            columns, or does not start at ``anchor_time``. The checks raise
            explicitly (rather than using ``assert`` statements) so they stay
            active when Python runs with ``-O``.
    """
    anchor_time: pd.Timestamp          # Timestamp when prediction was made (time t)
    context_end_price: float           # Close price at t-1 (last known price)
    predicted_path: pd.DataFrame       # Predicted OHLC, one row per horizon step
    actual_path: pd.DataFrame          # Actual future OHLC, one row per horizon step
    prediction_horizon: int = 15       # Number of steps predicted
    temperature: float = 1.5           # Temperature used for generation

    # Computed metrics (populated by compute_metrics)
    step_mae: Optional[np.ndarray] = None   # Absolute Close error at each step [horizon]
    directional_hit: Optional[bool] = None  # Did the t+1 direction match?

    def __post_init__(self):
        """Validate the prediction log on creation."""
        if self.prediction_horizon < 1:
            # Without this, the index[0] lookups below would fail with IndexError
            raise AssertionError(
                f"prediction_horizon must be at least 1, got {self.prediction_horizon}"
            )
        if len(self.predicted_path) != self.prediction_horizon:
            raise AssertionError(
                f"Predicted path must have {self.prediction_horizon} rows, got {len(self.predicted_path)}"
            )
        if len(self.actual_path) != self.prediction_horizon:
            raise AssertionError(
                f"Actual path must have {self.prediction_horizon} rows, got {len(self.actual_path)}"
            )
        if list(self.predicted_path.columns) != OHLC_COLS:
            raise AssertionError(f"Predicted path columns must be {OHLC_COLS}")
        if list(self.actual_path.columns) != OHLC_COLS:
            raise AssertionError(f"Actual path columns must be {OHLC_COLS}")

        # Critical causality check: first prediction timestamp must equal anchor time
        if self.predicted_path.index[0] != self.anchor_time:
            raise AssertionError(
                f"First prediction timestamp {self.predicted_path.index[0]} must equal anchor time {self.anchor_time}"
            )

        # Critical causality check: actual path starts at anchor time
        if self.actual_path.index[0] != self.anchor_time:
            raise AssertionError(
                f"First actual timestamp {self.actual_path.index[0]} must equal anchor time {self.anchor_time}"
            )

    @staticmethod
    def _close_slope(path: pd.DataFrame) -> float:
        """Return the least-squares slope of ``path['Close']`` against the step number."""
        closes = path['Close'].values
        steps = np.arange(len(closes))
        return float(np.polyfit(steps, closes, 1)[0])

    def compute_metrics(self):
        """
        Compute step-by-step MAE and directional accuracy.

        Populates ``step_mae`` (absolute Close error per step) and
        ``directional_hit`` (whether the predicted and actual first-step Close
        moved in the same direction relative to ``context_end_price``; two
        unchanged closes count as a hit because their signs are both 0).

        Returns:
            self, to allow chaining
        """
        pred_close = self.predicted_path['Close'].values
        actual_close = self.actual_path['Close'].values
        self.step_mae = np.abs(pred_close - actual_close)

        pred_move = pred_close[0] - self.context_end_price
        actual_move = actual_close[0] - self.context_end_price
        # Cast to a plain bool: the comparison yields np.bool_, which breaks
        # identity checks such as `directional_hit is True`.
        self.directional_hit = bool(np.sign(pred_move) == np.sign(actual_move))

        return self

    def get_step_mae(self, step: int) -> float:
        """Get MAE at specific step (0-indexed), computing metrics first if needed."""
        if self.step_mae is None:
            self.compute_metrics()
        return float(self.step_mae[step])

    def get_path_slope(self) -> float:
        """Get slope of predicted close price path (for trend correlation)."""
        return self._close_slope(self.predicted_path)

    def get_actual_slope(self) -> float:
        """Get slope of actual close price path."""
        return self._close_slope(self.actual_path)

In [None]:
class RollingBacktester:
    """
    Rolling walk-forward backtest engine with strict causality.

    This class manages the rolling prediction loop:
    - At each minute t, uses data from [t-LOOKBACK, t-1] as context
    - Generates HORIZON-step ahead predictions for [t, t+HORIZON-1]
    - Stores predictions in RollingPredictionLog objects
    - Validates strict causality (no future data leakage)

    ``price_df`` and ``feature_df`` are indexed positionally in lockstep, so
    they must have the same row order.
    """

    def __init__(
        self,
        model: nn.Module,
        price_df: pd.DataFrame,
        feature_df: pd.DataFrame,
        lookback: int = LOOKBACK,
        horizon: int = HORIZON,
        device: torch.device = DEVICE,
        input_mean: Optional[np.ndarray] = None,
        input_std: Optional[np.ndarray] = None,
    ):
        """
        Initialize the rolling backtester.

        Args:
            model: Trained Seq2SeqAttnGRU model
            price_df: DataFrame with OHLCV data and session info
            feature_df: DataFrame with engineered features
            lookback: Number of historical bars to use as context
            horizon: Number of steps to predict ahead
            device: Torch device for inference
            input_mean: Mean for input normalization (from training)
            input_std: Std for input normalization (from training)

        Raises:
            ValueError: If only one of ``input_mean`` / ``input_std`` is given.
        """
        if (input_mean is None) != (input_std is None):
            # Mixing supplied and data-derived statistics would scale inputs
            # inconsistently (and previously crashed or was silently ignored).
            raise ValueError("input_mean and input_std must be provided together")

        self.model = model.to(device)
        self.model.eval()  # Always eval mode for inference
        self.price_df = price_df.copy()
        self.feature_df = feature_df.copy()
        self.lookback = lookback
        self.horizon = horizon
        self.device = device

        # Input normalization (should be fit on training data, then applied to all)
        if input_mean is None:
            import warnings

            # Statistics fitted on feature_df include bars that are later
            # predicted, which is a (mild) form of lookahead.
            warnings.warn(
                "input_mean/input_std not provided; normalization statistics are "
                "computed from feature_df itself, which includes the evaluated bars. "
                "Pass the training statistics for a strictly causal backtest.",
                RuntimeWarning,
                stacklevel=2,
            )
            self.input_mean = feature_df[BASE_FEATURE_COLS].mean().values
            std = feature_df[BASE_FEATURE_COLS].std().values
        else:
            self.input_mean = np.asarray(input_mean)
            std = np.asarray(input_std)
        # Near-constant features get unit scale instead of dividing by ~0
        self.input_std = np.where(std < 1e-8, 1.0, std)

        # Storage for prediction logs
        self.prediction_logs: List[RollingPredictionLog] = []

        # Pre-compute scaled features for efficiency
        self._precompute_scaled_features()

    def _precompute_scaled_features(self):
        """Pre-compute scaled features and the imputed-row mask for all timepoints."""
        features = self.feature_df[BASE_FEATURE_COLS].values
        self.scaled_features = (features - self.input_mean) / self.input_std

        # Boolean per-row flag; averaged per window to form the extra input column
        self.row_imputed = self.feature_df['row_imputed'].values.astype(bool)

    def _compute_historical_vol(self, end_idx: int) -> float:
        """
        Compute realized volatility from recent close prices.

        Args:
            end_idx: Index up to which to compute (exclusive)

        Returns:
            Historical volatility (std of log returns over at most the last
            ``lookback`` closes), or 0.001 when fewer than two returns exist
        """
        start_idx = max(0, end_idx - self.lookback)
        closes = self.price_df['Close'].iloc[start_idx:end_idx].values
        if len(closes) < 3:
            # Fewer than two log returns: fall back to a small default
            return 0.001
        log_returns = np.log(closes[1:] / closes[:-1])
        return float(np.std(log_returns))

    def _make_prediction(
        self,
        context_start: int,
        context_end: int,
        temperature: float = TEMPERATURE_DEFAULT
    ) -> Tuple[np.ndarray, float]:
        """
        Make a single prediction using autoregressive generation.

        Args:
            context_start: Start index of context window (inclusive)
            context_end: End index of context window (exclusive) - this is time t
            temperature: Sampling temperature

        Returns:
            Tuple of (predicted_returns [horizon, 4], historical_vol)

        Raises:
            AssertionError: If the window is not exactly ``lookback`` bars long.
        """
        # CRITICAL CAUSALITY: context_end is exclusive, so we use [context_start, context_end-1]
        # This means at time t, we only see data up to and including t-1
        if context_end - context_start != self.lookback:
            raise AssertionError(
                f"Context window size must be {self.lookback}, got {context_end - context_start}"
            )

        # Model input = scaled features plus one constant column holding the
        # fraction of imputed rows in this window
        x_raw = self.scaled_features[context_start:context_end]
        imp_frac = float(self.row_imputed[context_start:context_end].mean())
        imp_col = np.full((self.lookback, 1), imp_frac, dtype=np.float32)
        x_aug = np.concatenate([x_raw, imp_col], axis=1)

        # Realized volatility of the same causal window, passed to the generator
        hist_vol = self._compute_historical_vol(context_end)

        # [1, lookback, features]
        x_tensor = torch.from_numpy(x_aug).unsqueeze(0).float().to(self.device)

        # Generate prediction with FRESH random noise; no gradients needed for inference
        with torch.no_grad():
            pred_returns = self.model.generate_realistic(
                x_tensor,
                temperature=temperature,
                historical_vol=hist_vol
            )

        return pred_returns[0].cpu().numpy(), hist_vol  # [horizon, 4]

    def run_rolling_backtest(
        self,
        start_time: str = ROLLING_START_TIME,
        end_time: str = ROLLING_END_TIME,
        date: Optional[str] = None,
        temperature: float = TEMPERATURE_DEFAULT,
        verbose: bool = True,
    ) -> List[RollingPredictionLog]:
        """
        Run the complete rolling walk-forward backtest.

        Anchors are the bars of the selected date whose timestamps fall in the
        half-open window ``[start_time, end_time)``, interpreted in the
        timezone of ``price_df.index``, and that have ``lookback`` bars before
        them and ``horizon`` bars of actuals available.

        Args:
            start_time: Start time for backtest (HH:MM format), inclusive
            end_time: End time for backtest (HH:MM format), exclusive
            date: Specific date to run (YYYY-MM-DD), or None for first available
            temperature: Sampling temperature for generation
            verbose: Whether to show progress bar

        Returns:
            List of RollingPredictionLog objects (also stored on
            ``self.prediction_logs``, replacing any previous run)

        Raises:
            ValueError: If there is no data for the date, or no bar satisfies
                the time-window and lookback/horizon constraints.
        """
        index = self.price_df.index
        if len(index) == 0:
            raise ValueError("No data available: price_df is empty")

        # Select the requested calendar date (or the first one present)
        day_labels = index.strftime('%Y-%m-%d')
        target_date = date if date else day_labels[0]
        day_mask = np.asarray(day_labels == target_date)
        if not day_mask.any():
            raise ValueError(f"No data available for date {target_date}")

        # Restrict to trading hours. Timestamps are built in the index's own
        # timezone so the comparison is valid for both naive and tz-aware data.
        session_day = index[day_mask][0].date()
        start_dt = pd.Timestamp(f"{session_day} {start_time}", tz=index.tz)
        end_dt = pd.Timestamp(f"{session_day} {end_time}", tz=index.tz)
        hours_mask = np.asarray((index >= start_dt) & (index < end_dt))

        # Positional indices of candidate anchors within the full frame
        rolling_indices = np.flatnonzero(day_mask & hours_mask)

        valid_start = self.lookback  # Need at least lookback bars before first prediction
        valid_end = len(index) - self.horizon  # Need horizon bars after last prediction
        rolling_indices = rolling_indices[
            (rolling_indices >= valid_start) & (rolling_indices < valid_end)
        ]

        if len(rolling_indices) == 0:
            raise ValueError("No valid rolling indices found. Check data range and lookback/horizon.")

        self.prediction_logs = []

        # Hoisted out of the loop: column selection copies the frame each time
        closes = self.price_df['Close'].to_numpy()
        ohlc = self.price_df[OHLC_COLS]

        iterator = tqdm(rolling_indices, desc="Rolling Backtest") if verbose else rolling_indices

        for idx in iterator:
            idx = int(idx)
            # CRITICAL CAUSALITY: At time t (idx), we only see [t-lookback, t-1]
            context_start = idx - self.lookback
            context_end = idx  # Exclusive - this is time t

            anchor_time = index[idx]
            context_end_price = float(closes[context_end - 1])  # close of bar t-1

            pred_returns, hist_vol = self._make_prediction(
                context_start, context_end, temperature
            )

            # Convert returns to prices, anchored on the last known close
            pred_prices = returns_to_prices_seq(pred_returns, context_end_price)

            # Actual bars for [t, t+horizon-1]; used only for evaluation
            actual_future = ohlc.iloc[idx:idx + self.horizon]

            pred_df = pd.DataFrame(
                pred_prices,
                index=actual_future.index,
                columns=OHLC_COLS
            )

            log = RollingPredictionLog(
                anchor_time=anchor_time,
                context_end_price=context_end_price,
                predicted_path=pred_df,
                actual_path=actual_future.copy(),
                prediction_horizon=self.horizon,
                temperature=temperature,
            )
            log.compute_metrics()
            self.prediction_logs.append(log)

            # Update progress bar
            if verbose and hasattr(iterator, 'set_postfix'):
                iterator.set_postfix({
                    'anchor': anchor_time.strftime('%H:%M'),
                    'vol': f"{hist_vol:.4f}"
                })

        return self.prediction_logs

In [None]:
# ==============================================================================
# CAUSALITY VALIDATION FUNCTIONS
# ==============================================================================
def validate_no_lookahead_bias(
    backtester: RollingBacktester,
    sample_size: int = 10
) -> None:
    """
    Validate that no prediction uses future information.

    This function performs several checks on ``backtester.prediction_logs``:
    1. At least one prediction log exists
    2. Each prediction starts at its anchor time
    3. A random sample of predictions does not match the actual future
       almost exactly (relative Close error must exceed 1e-6)
    4. Consecutive anchors are exactly 60 seconds apart

    Failures raise explicitly instead of using ``assert`` statements, so the
    checks remain active when Python runs with ``-O``.

    Args:
        backtester: The RollingBacktester instance
        sample_size: Number of random samples to check

    Raises:
        AssertionError: If any check fails.
    """
    logs = backtester.prediction_logs

    print("\n=== CAUSALITY VALIDATION ===")

    # Check 1: there must be something to validate (an empty run would
    # otherwise pass every check below vacuously)
    if not logs:
        raise AssertionError("No prediction logs to validate - run the backtest first")
    print(f"✓ Prediction count validated: {len(logs)} logs")

    # Check 2: Each prediction starts at anchor time
    for i, log in enumerate(logs):
        if log.predicted_path.index[0] != log.anchor_time:
            raise AssertionError(
                f"Log {i}: First prediction timestamp {log.predicted_path.index[0]} must equal anchor time {log.anchor_time}"
            )
    print(f"✓ All {len(logs)} predictions start at correct anchor time")

    # Check 3: Sample predictions don't match actual future perfectly
    sample_indices = np.random.choice(len(logs), min(sample_size, len(logs)), replace=False)
    for idx in sample_indices:
        log = logs[idx]
        pred = log.predicted_path['Close'].values
        actual = log.actual_path['Close'].values

        mae = np.mean(np.abs(pred - actual))
        rel_error = mae / (np.mean(actual) + 1e-8)

        # A stochastic forecast that reproduces the future is a red flag
        if not rel_error > 1e-6:
            raise AssertionError(
                f"Log {idx}: Prediction suspiciously close to actual (rel_error={rel_error:.8f}). Possible data leakage!"
            )
    print(f"✓ Sampled {len(sample_indices)} predictions - no evidence of data leakage")

    # Check 4: Verify timestamps are sequential
    for i, (prev_log, curr_log) in enumerate(zip(logs, logs[1:]), start=1):
        prev_anchor = prev_log.anchor_time
        curr_anchor = curr_log.anchor_time
        time_diff = (curr_anchor - prev_anchor).total_seconds()
        if time_diff != 60:
            raise AssertionError(
                f"Log {i}: Non-sequential timestamps. {prev_anchor} -> {curr_anchor} = {time_diff}s"
            )
    print("✓ All timestamps are sequential (60s intervals)")

    print("\n✅ ALL CAUSALITY CHECKS PASSED - No lookahead bias detected!\n")


def validate_off_by_one(backtester: RollingBacktester) -> None:
    """
    Validate no off-by-one errors in context/prediction alignment.

    For up to the first five logs this checks that ``context_end_price``
    equals the Close of the bar immediately before the anchor in
    ``backtester.price_df``, and that the first predicted Open is within 1%
    of that price.

    Args:
        backtester: The RollingBacktester instance

    Raises:
        AssertionError: If there are no logs, an anchor has no preceding bar,
            or either alignment check fails.
    """
    logs = backtester.prediction_logs
    price_df = backtester.price_df

    print("\n=== OFF-BY-ONE VALIDATION ===")

    if not logs:
        raise AssertionError("No prediction logs to validate - run the backtest first")

    checked = logs[:5]
    for i, log in enumerate(checked):
        anchor = log.anchor_time
        anchor_idx = price_df.index.get_loc(anchor)
        if anchor_idx < 1:
            # iloc[-1] would silently wrap around to the last bar of the frame
            raise AssertionError(f"Log {i}: anchor {anchor} has no preceding bar in price_df")
        prev_close_actual = price_df['Close'].iloc[anchor_idx - 1]

        if not abs(log.context_end_price - prev_close_actual) < 1e-6:
            raise AssertionError(
                f"Log {i}: context_end_price {log.context_end_price} != actual prev close {prev_close_actual}"
            )

        # The first predicted Open is a one-step return off the context price,
        # so it should sit very close to it
        expected_first_open = log.context_end_price
        actual_first_open = log.predicted_path['Open'].iloc[0]
        open_diff_pct = abs(actual_first_open - expected_first_open) / expected_first_open
        if not open_diff_pct < 0.01:
            raise AssertionError(
                f"Log {i}: First Open {actual_first_open} too far from context price {expected_first_open}"
            )

    print(f"✓ First {len(checked)} predictions: context/prediction alignment correct")
    print("\n✅ OFF-BY-ONE CHECKS PASSED\n")

## Section 3: Visualization Functions

Fan chart and cone visualization for rolling predictions.

In [None]:
def draw_candlestick(
    ax,
    ohlc_df: pd.DataFrame,
    start_x: float = 0,
    width: float = 0.6,
    alpha: float = 1.0,
    up_color: str = '#00FF00',
    down_color: str = '#FF0000',
    wick_color: str = '#FFFFFF',
    use_line: bool = False,
    line_color: str = '#00FFFF',
    line_alpha: float = 0.7,
    linewidth: float = 1.0,
) -> None:
    """Draw candlesticks or line on matplotlib axis.

    Args:
        ax: Matplotlib axis to draw on.
        ohlc_df: Price frame; bar ``i`` is drawn at x = ``start_x + i``.
        start_x: X position of the first bar.
        width: Candle body width in x units.
        alpha: Opacity of wicks and bodies (candlestick mode).
        up_color: Body colour when Close >= Open.
        down_color: Body colour when Close < Open.
        wick_color: Colour of the Low-High wick.
        use_line: When True, plot only the Close series as a single line.
        line_color: Line colour when the last Close >= the first Close;
            otherwise a fixed orange (``#FF6600``) is used.
        line_alpha: Opacity of the line (line mode).
        linewidth: Width of the line (line mode).
    """
    if use_line:
        x_vals = np.arange(len(ohlc_df)) + start_x
        closes = ohlc_df['Close'].values
        trend_up = closes[-1] >= closes[0]
        color = line_color if trend_up else '#FF6600'
        ax.plot(
            x_vals, closes,
            color=color,
            alpha=line_alpha,
            linewidth=linewidth,
            zorder=5,
        )
        return

    # NOTE(review): the unpacking assumes OHLC_COLS is four columns ordered
    # Open, High, Low, Close - confirm against its definition.
    for i, (o, h, l, c) in enumerate(ohlc_df[OHLC_COLS].values):
        x = start_x + i
        body_color = up_color if c >= o else down_color

        ax.vlines(
            x, l, h,
            color=wick_color,
            linewidth=0.8,
            alpha=alpha,
            zorder=2,
        )

        # A tiny minimum height keeps flat (Open == Close) candles visible.
        rect = Rectangle(
            (x - width / 2, min(o, c)),
            width,
            max(abs(c - o), 1e-6),
            facecolor=body_color,
            edgecolor=body_color,
            linewidth=0.5,
            alpha=alpha,
            zorder=3,
        )
        ax.add_patch(rect)


def plot_prediction_fan(
    prediction_logs: List[RollingPredictionLog],
    price_df: pd.DataFrame,
    max_prediction_age: int = MAX_PREDICTION_AGE,
    fan_opacity_decay: float = FAN_OPACITY_DECAY,
    figsize: Tuple[int, int] = FIGURE_SIZE,
    title: Optional[str] = None,
    metrics: Optional[Dict] = None,
) -> plt.Figure:
    """
    Create a fan chart showing overlapping prediction paths.

    This visualization shows:
    - Actual historical candles (faint green/red) from ``2 * LOOKBACK`` bars
      before the first anchor up to the last anchor
    - Last N prediction paths with opacity decay
    - Vertical line indicating current time (the last anchor)
    - Actual candles for up to HORIZON bars from the last anchor onward
    - Color coding: up = cyan, down = orange

    Args:
        prediction_logs: Logs in chronological order; must be non-empty
            (the first and last entries are indexed directly).
        price_df: Price frame whose index contains every anchor time.
        max_prediction_age: Number of most recent predictions to draw.
        fan_opacity_decay: Opacity lost per step of prediction age.
        figsize: Figure size passed to matplotlib.
        title: Optional title; a default using SYMBOL is used when falsy.
        metrics: Optional dict supplying ``num_predictions``,
            ``directional_accuracy`` and ``mean_path_mae`` for the caption.

    Returns:
        The created matplotlib figure.
    """
    fig, ax = plt.subplots(figsize=figsize, facecolor='black')
    ax.set_facecolor('black')

    first_anchor = prediction_logs[0].anchor_time
    last_anchor = prediction_logs[-1].anchor_time

    first_idx = price_df.index.get_loc(first_anchor)
    last_anchor_idx = price_df.index.get_loc(last_anchor)
    context_start_idx = max(0, first_idx - LOOKBACK * 2)

    # History runs up to (not including) the last anchor. Stopping at
    # first_idx + HORIZON left the chart without actual candles between the
    # first anchor and "now" whenever the backtest spans more than HORIZON
    # bars, which is where the fan lines are drawn.
    context_prices = price_df[OHLC_COLS].iloc[context_start_idx:last_anchor_idx]

    draw_candlestick(
        ax, context_prices, start_x=0, width=0.6, alpha=0.2,
        up_color='#00AA00', down_color='#AA0000', wick_color='#666666'
    )

    recent_logs = prediction_logs[-max_prediction_age:]

    for i, log in enumerate(recent_logs):
        # Age 0 is the newest prediction; older paths fade, floored at 0.1.
        age = len(recent_logs) - 1 - i
        opacity = max(0.1, 0.9 - age * fan_opacity_decay)

        anchor_idx = price_df.index.get_loc(log.anchor_time)
        start_x = anchor_idx - context_start_idx

        closes = log.predicted_path['Close'].values
        trend_up = closes[-1] >= closes[0]

        if trend_up:
            line_color = '#00FFFF' if opacity > 0.5 else '#0088AA'
        else:
            line_color = '#FF6600' if opacity > 0.5 else '#AA4400'

        x_vals = np.arange(len(log.predicted_path)) + start_x
        ax.plot(
            x_vals, closes,
            color=line_color,
            alpha=opacity,
            linewidth=1.2 if opacity > 0.5 else 0.8,
            zorder=10 + i
        )

    now_x = last_anchor_idx - context_start_idx
    ax.axvline(
        now_x, color='white', linestyle='--', linewidth=1.5, alpha=0.8,
        zorder=100, label='Now'
    )

    # Actual bars covering the newest prediction's horizon, clipped to the
    # data that exists.
    future_start_idx = last_anchor_idx
    future_end_idx = min(len(price_df), future_start_idx + HORIZON)
    if future_end_idx > future_start_idx:
        future_prices = price_df[OHLC_COLS].iloc[future_start_idx:future_end_idx]
        draw_candlestick(
            ax, future_prices, start_x=now_x, width=0.6, alpha=0.5,
            up_color='#00FF00', down_color='#FF0000', wick_color='#FFFFFF'
        )

    ax.set_xlim(-5, now_x + HORIZON + 5)

    all_times = price_df.index[context_start_idx:future_end_idx]
    # Never request more ticks than there are bars (avoids duplicate ticks).
    n_ticks = min(12, len(all_times))
    tick_indices = np.linspace(0, len(all_times) - 1, n_ticks, dtype=int)
    ax.set_xticks(tick_indices)
    ax.set_xticklabels(
        [all_times[i].strftime('%H:%M') for i in tick_indices],
        rotation=45, ha='right', color='white', fontsize=9
    )

    ax.tick_params(axis='y', colors='white')
    ax.yaxis.set_major_formatter(plt.FuncFormatter(lambda x, p: f'${x:.2f}'))

    for spine in ax.spines.values():
        spine.set_color('#444444')

    ax.grid(color='#333333', linewidth=0.5, alpha=0.3)

    if title:
        ax.set_title(title, color='white', fontsize=14, pad=15)
    else:
        ax.set_title(
            f'{SYMBOL} Rolling Backtest - Prediction Fan Chart',
            color='white', fontsize=14, pad=15
        )

    if metrics:
        caption = (
            f"Rolling Backtest: {metrics.get('num_predictions', len(prediction_logs))} predictions | "
            f"Directional Accuracy: {metrics.get('directional_accuracy', 0):.1%} | "
            f"Mean Path MAE: ${metrics.get('mean_path_mae', 0):.2f}"
        )
        ax.text(0.5, -0.15, caption, transform=ax.transAxes, ha='center', color='#AAAAAA', fontsize=10)

    legend_elements = [
        Line2D([0], [0], color='#00FFFF', lw=2, label='Predicted Up'),
        Line2D([0], [0], color='#FF6600', lw=2, label='Predicted Down'),
        Line2D([0], [0], color='white', lw=1.5, linestyle='--', label='Now'),
        Patch(facecolor='#00AA00', edgecolor='none', alpha=0.5, label='Actual (hist)'),
    ]
    ax.legend(
        handles=legend_elements, facecolor='black', edgecolor='#444444',
        labelcolor='white', loc='upper left', fontsize=9
    )

    plt.tight_layout()
    return fig

In [None]:
def plot_prediction_cone(
    prediction_logs: List[RollingPredictionLog],
    price_df: pd.DataFrame,
    figsize: Tuple[int, int] = (16, 10),
    confidence_levels: Optional[List[float]] = None,
) -> plt.Figure:
    """Plot prediction cone showing confidence intervals over time.

    The top panel shows the mean signed Close error (predicted - actual) per
    horizon step, with bands of ``mean ± z * std`` for each confidence level.
    The bottom panel shows the mean absolute error per step.

    Args:
        prediction_logs: Logs providing ``predicted_path`` and ``actual_path``.
        price_df: Not used by this function; kept for a uniform plotting
            signature.
        figsize: Figure size passed to matplotlib.
        confidence_levels: Two-sided confidence levels, each strictly between
            0 and 1. Defaults to ``[0.5, 0.8, 0.95]``.

    Returns:
        The created matplotlib figure.

    Raises:
        ValueError: If a confidence level is not strictly between 0 and 1.
    """
    from statistics import NormalDist  # stdlib; only needed for non-tabulated levels

    # Fresh list per call instead of a shared mutable default argument.
    if confidence_levels is None:
        confidence_levels = [0.5, 0.8, 0.95]

    fig, axes = plt.subplots(2, 1, figsize=figsize, facecolor='black')

    # One row per prediction, one column per horizon step.
    errors_array = np.array([
        log.predicted_path['Close'].values - log.actual_path['Close'].values
        for log in prediction_logs
    ])

    # Plot 1: Error distribution by step
    ax1 = axes[0]
    ax1.set_facecolor('black')

    steps = np.arange(1, HORIZON + 1)
    means = np.mean(errors_array, axis=0)
    stds = np.std(errors_array, axis=0)

    colors = ['#00FFFF', '#0088FF', '#0044AA']
    alphas = [0.3, 0.2, 0.1]
    # Rounded z-scores for the default levels, kept so the default plot is
    # unchanged. Other levels are computed from the normal quantile function
    # rather than silently using z = 1.0 under a wrong label.
    known_z_scores = {0.5: 0.674, 0.8: 1.282, 0.95: 1.96}

    for i, conf in enumerate(confidence_levels):
        z_score = known_z_scores.get(conf)
        if z_score is None:
            if not 0.0 < conf < 1.0:
                raise ValueError(
                    f'confidence level must be strictly between 0 and 1, got {conf!r}'
                )
            z_score = NormalDist().inv_cdf(0.5 + conf / 2.0)

        upper = means + z_score * stds
        lower = means - z_score * stds

        # Cycle the palette so levels beyond the third are still drawn.
        ax1.fill_between(
            steps, lower, upper,
            color=colors[i % len(colors)],
            alpha=alphas[i % len(alphas)],
            label=f'{conf:.0%} confidence',
        )

    ax1.axhline(0, color='white', linestyle='-', linewidth=0.8, alpha=0.5)
    ax1.plot(steps, means, color='#FF6600', linewidth=2, label='Mean error')

    ax1.set_xlabel('Prediction Step', color='white')
    ax1.set_ylabel('Error ($)', color='white')
    ax1.set_title('Prediction Error Cone by Horizon', color='white', fontsize=12)
    ax1.tick_params(colors='white')
    ax1.legend(facecolor='black', labelcolor='white')
    ax1.grid(color='#333333', alpha=0.3)
    for spine in ax1.spines.values():
        spine.set_color('#444444')

    # Plot 2: MAE by step
    ax2 = axes[1]
    ax2.set_facecolor('black')

    maes = np.mean(np.abs(errors_array), axis=0)
    ax2.bar(steps, maes, color='#00AAFF', alpha=0.7, edgecolor='white', linewidth=0.5)
    ax2.plot(steps, maes, color='#FFFFFF', linewidth=2, marker='o', markersize=4)

    ax2.set_xlabel('Prediction Step', color='white')
    ax2.set_ylabel('Mean Absolute Error ($)', color='white')
    ax2.set_title('MAE by Prediction Step', color='white', fontsize=12)
    ax2.tick_params(colors='white')
    ax2.grid(color='#333333', alpha=0.3, axis='y')
    for spine in ax2.spines.values():
        spine.set_color('#444444')

    plt.tight_layout()
    return fig

## Section 4: Execution Block

Load model and data, run backtest, and display visualizations.

In [None]:
# ==============================================================================
# DATA LOADING FUNCTIONS
# ==============================================================================

class RequestPacer:
    """Rate limiter for API calls.

    Enforces a minimum spacing of ``60 / max_calls_per_minute`` seconds
    between successive ``wait()`` calls, measured with ``time.monotonic()``.
    """

    def __init__(self, max_calls_per_minute: int):
        if max_calls_per_minute <= 0:
            raise ValueError('max_calls_per_minute must be >0')
        self.min_interval = 60.0 / float(max_calls_per_minute)
        self.last_call_ts = 0.0

    def wait(self) -> None:
        """Sleep just long enough to respect the minimum call interval."""
        now = time.monotonic()
        elapsed = now - self.last_call_ts
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call_ts = time.monotonic()


def _require_alpaca_credentials() -> tuple[str, str]:
    """Return ``(api_key, secret_key)`` from the environment.

    Raises:
        RuntimeError: If ALPACA_API_KEY or ALPACA_SECRET_KEY is unset or empty.
    """
    api_key = os.getenv('ALPACA_API_KEY')
    secret_key = os.getenv('ALPACA_SECRET_KEY')
    if not api_key or not secret_key:
        raise RuntimeError('Missing ALPACA_API_KEY / ALPACA_SECRET_KEY.')
    return api_key, secret_key


def fetch_bars_alpaca(symbol: str, start_date: str, end_date: str) -> pd.DataFrame:
    """Fetch 1-minute bars from Alpaca for specified date range.

    Args:
        symbol: Ticker to request.
        start_date: First day, parsed as midnight America/New_York.
        end_date: Last day (inclusive); the request ends one day later.

    Returns:
        float32 frame indexed by timezone-naive New York time, with the
        OHLC_COLS columns plus Volume, TradeCount and VWAP.

    Raises:
        RuntimeError: If credentials are missing or no bars are returned.
    """
    api_key, secret_key = _require_alpaca_credentials()

    from alpaca.data.enums import DataFeed
    from alpaca.data.historical import StockHistoricalDataClient
    from alpaca.data.requests import StockBarsRequest
    from alpaca.data.timeframe import TimeFrame

    client = StockHistoricalDataClient(api_key=api_key, secret_key=secret_key)
    feed = DataFeed.IEX
    pacer = RequestPacer(120)

    start_ts = pd.Timestamp(start_date, tz='America/New_York')
    end_ts = pd.Timestamp(end_date, tz='America/New_York') + pd.Timedelta(days=1)

    pacer.wait()
    req = StockBarsRequest(
        symbol_or_symbols=[symbol],
        timeframe=TimeFrame.Minute,
        start=start_ts, end=end_ts, feed=feed
    )

    bars = client.get_stock_bars(req).df
    # Fail with a clear message instead of an opaque column error further
    # down when the range has no data (weekend, holiday, unknown symbol).
    if bars.empty:
        raise RuntimeError(
            f'No bars returned for {symbol} between {start_date} and {end_date}.'
        )

    df = bars.reset_index().rename(columns={
        'timestamp': 'Datetime', 'open': 'Open', 'high': 'High',
        'low': 'Low', 'close': 'Close', 'volume': 'Volume',
        'trade_count': 'TradeCount', 'vwap': 'VWAP',
    })

    # Backfill optional columns: counts default to 0, VWAP to the Close.
    for col in ['Volume', 'TradeCount', 'VWAP']:
        if col not in df.columns:
            df[col] = 0.0 if col != 'VWAP' else df['Close']

    df['Datetime'] = pd.to_datetime(df['Datetime'], utc=True)
    df = df.set_index('Datetime').sort_index()
    df = df.tz_convert('America/New_York').tz_localize(None)

    return df[OHLC_COLS + ['Volume', 'TradeCount', 'VWAP']].astype(np.float32)


def create_mock_data(symbol: str, date: str, n_bars: int = 400) -> pd.DataFrame:
    """Create mock 1-minute price data for testing without API.

    Closes follow a seeded geometric random walk starting near 400; each Open
    is the previous Close. ``symbol`` is accepted for signature parity with
    the real loader but is not used.

    NOTE: this reseeds NumPy's global RNG with 42, so the output is
    reproducible but later ``np.random`` draws in the session are affected.

    Args:
        symbol: Unused.
        date: Session date; bars start at 09:30 on this date.
        n_bars: Number of 1-minute bars to generate.

    Returns:
        float32 frame with OHLC, Volume, TradeCount, VWAP and the session
        bookkeeping columns (is_imputed, session_id, bar_in_session,
        session_len).
    """
    np.random.seed(42)

    start_ts = pd.Timestamp(f'{date} 09:30:00')
    time_index = pd.date_range(start=start_ts, periods=n_bars, freq='1min')

    initial_price = 400.0
    volatility = 0.001

    returns = np.random.normal(0, volatility, n_bars)
    closes = initial_price * np.exp(np.cumsum(returns))

    opens = np.roll(closes, 1)
    opens[0] = initial_price

    # Non-negative padding keeps High >= max(Open, Close) and
    # Low <= min(Open, Close).
    ranges = np.abs(np.random.normal(0.5, 0.3, n_bars))
    highs = np.maximum(opens, closes) + ranges
    lows = np.minimum(opens, closes) - ranges

    df = pd.DataFrame({
        'Open': opens, 'High': highs, 'Low': lows, 'Close': closes,
        'Volume': np.random.randint(1000, 10000, n_bars),
        'TradeCount': np.random.randint(100, 1000, n_bars),
        'VWAP': closes * (1 + np.random.normal(0, 0.0001, n_bars)),
    }, index=time_index)

    df['is_imputed'] = 0
    df['session_id'] = 0
    df['bar_in_session'] = np.arange(n_bars)
    df['session_len'] = n_bars

    return df.astype(np.float32)


# Load or create data
try:
    # "Today minus five days" can land on a weekend, which returns no bars
    # and silently falls back to mock data. Roll back to the preceding
    # business day. Exchange holidays are not covered by this and still fall
    # through to the mock-data branch.
    _candidate_day = pd.Timestamp(datetime.now() - timedelta(days=5)).normalize()
    backtest_date = pd.offsets.BDay().rollback(_candidate_day).strftime('%Y-%m-%d')
    print(f"Attempting to fetch {SYMBOL} data for {backtest_date}...")
    price_df = fetch_bars_alpaca(SYMBOL, backtest_date, backtest_date)
    print(f"Fetched {len(price_df)} bars from Alpaca")
except Exception as e:
    # Deliberate best-effort: any failure (credentials, network, empty
    # response) switches the notebook to synthetic data.
    print(f"Could not fetch real data: {e}")
    print("Using mock data for demonstration...")
    backtest_date = '2024-01-15'
    price_df = create_mock_data(SYMBOL, backtest_date, n_bars=400)
    print(f"Created {len(price_df)} mock bars")

print(f"\nData range: {price_df.index[0]} to {price_df.index[-1]}")
print(f"Total bars: {len(price_df)}")

In [None]:
# ==============================================================================
# MODEL INITIALIZATION
# ==============================================================================

def initialize_model_for_inference(
    input_size: int = len(BASE_FEATURE_COLS) + 1,
    output_size: int = len(TARGET_COLS),
    checkpoint_path: Optional[str] = None,
) -> nn.Module:
    """Build a Seq2SeqAttnGRU on DEVICE, optionally load weights, and switch to eval mode.

    Args:
        input_size: Model input width (defaults to the base feature count + 1).
        output_size: Model output width (defaults to the target column count).
        checkpoint_path: Path to a saved state dict. When it is None, empty,
            or does not exist on disk, the model keeps its random
            initialization.

    Returns:
        The model in evaluation mode.
    """
    network = Seq2SeqAttnGRU(
        input_size=input_size,
        output_size=output_size,
        hidden_size=HIDDEN_SIZE,
        num_layers=NUM_LAYERS,
        dropout=DROPOUT,
        horizon=HORIZON,
    )
    network = network.to(DEVICE)

    checkpoint_available = bool(checkpoint_path) and os.path.exists(checkpoint_path)

    if not checkpoint_available:
        # A path was given but nothing is there: warn before falling back.
        if checkpoint_path:
            print(f"Warning: Checkpoint not found at {checkpoint_path}")
        print("Using randomly initialized model (for testing only)")
    else:
        print(f"Loading checkpoint from {checkpoint_path}")
        # NOTE(review): torch.load unpickles the file; only load checkpoints
        # from a trusted source.
        weights = torch.load(checkpoint_path, map_location=DEVICE)
        network.load_state_dict(weights)
        print("Checkpoint loaded successfully")

    network.eval()
    return network


# Initialize model
model = initialize_model_for_inference(checkpoint_path=None)

# Count parameters
total_params = sum(param.numel() for param in model.parameters())
print(f"\nModel initialized")
print(f"Total parameters: {total_params:,}")
print(f"Device: {DEVICE}")

In [None]:
# ==============================================================================
# RUN ROLLING BACKTEST
# ==============================================================================

# Frames without session bookkeeping get single-session defaults: every bar
# belongs to session 0 and none is marked as imputed.
if 'session_id' not in price_df.columns:
    _session_defaults = {
        'session_id': 0,
        'bar_in_session': np.arange(len(price_df)),
        'session_len': len(price_df),
        'is_imputed': 0,
    }
    for _col, _val in _session_defaults.items():
        price_df[_col] = _val

# Derive the model's feature frame from the price data
feature_df = build_feature_frame(price_df)
print(f"Feature frame shape: {feature_df.shape}")

# Set up the backtester over the prepared frames
backtester = RollingBacktester(
    model=model,
    price_df=price_df,
    feature_df=feature_df,
    lookback=LOOKBACK,
    horizon=HORIZON,
    device=DEVICE,
)

# Walk forward through the configured time window
print("\nStarting rolling backtest...")
prediction_logs = backtester.run_rolling_backtest(
    start_time=ROLLING_START_TIME,
    end_time=ROLLING_END_TIME,
    temperature=TEMPERATURE_DEFAULT,
    verbose=True,
)

print(f"\nBacktest complete! Generated {len(prediction_logs)} predictions.")

In [None]:
# ==============================================================================
# VALIDATION ASSERTS
# ==============================================================================

print("\n" + "="*60)
print("RUNNING VALIDATION CHECKS")
print("="*60)

# Run causality validation
validate_no_lookahead_bias(backtester, sample_size=10)

# Run off-by-one validation
validate_off_by_one(backtester)

# Additional assertions.
# Comparing len(prediction_logs) with a value derived from itself could never
# fail. Require at least one prediction instead, so an empty run is reported
# here rather than passing every check vacuously.
assert len(prediction_logs) > 0, \
    "Rolling backtest produced no predictions; nothing was validated"

# Verify all logs have correct structure
for i, log in enumerate(prediction_logs):
    assert log.prediction_horizon == HORIZON, f"Log {i}: horizon mismatch"
    assert len(log.predicted_path) == HORIZON, f"Log {i}: predicted_path length mismatch"
    assert len(log.actual_path) == HORIZON, f"Log {i}: actual_path length mismatch"

print(f"\n✅ ALL VALIDATION ASSERTS PASSED")
print(f"   Total predictions: {len(prediction_logs)}")
print(f"   Prediction horizon: {HORIZON} steps")
print(f"   Lookback window: {LOOKBACK} steps")
print("="*60)

In [None]:
# ==============================================================================
# VISUALIZATION
# ==============================================================================

# Create fan chart
print("\nGenerating fan chart visualization...")

# Caption metric 1: share of correct direction calls, skipping logs whose
# directional_hit is None.
directional_hits = [
    log.directional_hit
    for log in prediction_logs
    if log.directional_hit is not None
]
if directional_hits:
    directional_accuracy = np.mean(directional_hits)
else:
    directional_accuracy = 0

# Caption metric 2.
# NOTE(review): this averages get_step_mae(0) only (the first step), although
# the caption labels it "Mean Path MAE" - confirm which is intended.
step_1_maes = [log.get_step_mae(0) for log in prediction_logs]
if step_1_maes:
    mean_path_mae = np.mean(step_1_maes)
else:
    mean_path_mae = 0

metrics = dict(
    num_predictions=len(prediction_logs),
    directional_accuracy=directional_accuracy,
    mean_path_mae=mean_path_mae,
)

# Plot fan chart
fig_fan = plot_prediction_fan(
    prediction_logs=prediction_logs,
    price_df=price_df,
    max_prediction_age=MAX_PREDICTION_AGE,
    fan_opacity_decay=FAN_OPACITY_DECAY,
    metrics=metrics,
)
plt.show()

print("\nFan chart displayed!")

In [None]:
# Render the per-step error cone and MAE panels for the completed backtest
print("\nGenerating prediction cone visualization...")

fig_cone = plot_prediction_cone(
    prediction_logs=prediction_logs,
    price_df=price_df,
)
plt.show()

print("\nPrediction cone displayed!")

## Section 5: Metrics Analysis

Calculate comprehensive metrics and perform error analysis.

In [None]:
# ==============================================================================
# METRICS CALCULATION
# ==============================================================================

def calculate_rolling_metrics(prediction_logs: List[RollingPredictionLog]) -> Dict:
    """Aggregate accuracy, error, trend, volatility and range statistics.

    Args:
        prediction_logs: Logs produced by the rolling backtest.

    Returns:
        Dict of metrics; empty when ``prediction_logs`` is empty. Keys:
        ``directional_accuracy``, ``directional_hits``, ``directional_total``,
        ``step_mae`` (for steps 1, 5, 10, 15), ``overall_mae``,
        ``overall_rmse``, ``trend_correlation``, ``avg_predicted_volatility``,
        ``avg_actual_volatility``, ``volatility_ratio``,
        ``avg_predicted_range`` and ``avg_actual_range``.
    """
    metrics = {}

    if len(prediction_logs) == 0:
        return metrics

    # 1. Directional accuracy - logs without a verdict (None) are excluded
    hits = [log.directional_hit for log in prediction_logs if log.directional_hit is not None]
    if hits:
        metrics['directional_accuracy'] = np.mean(hits)
        metrics['directional_hits'] = sum(hits)
        metrics['directional_total'] = len(hits)
    else:
        metrics['directional_accuracy'] = 0
        metrics['directional_hits'] = 0
        metrics['directional_total'] = 0

    # 2. Path divergence - MAE at selected steps; steps beyond HORIZON keep an
    #    empty bucket and are reported as 0
    buckets = {1: [], 5: [], 10: [], 15: []}
    reachable_steps = [step for step in buckets if step <= HORIZON]
    for log in prediction_logs:
        for step in reachable_steps:
            buckets[step].append(log.get_step_mae(step - 1))

    step_mae_summary = {}
    for step, values in buckets.items():
        step_mae_summary[step] = np.mean(values) if values else 0
    metrics['step_mae'] = step_mae_summary

    # 3. Overall error - pool every per-step error from every log
    pooled_errors = []
    for log in prediction_logs:
        if log.step_mae is not None:
            pooled_errors.extend(log.step_mae)

    if pooled_errors:
        metrics['overall_mae'] = np.mean(pooled_errors)
        metrics['overall_rmse'] = np.sqrt(np.mean(np.square(pooled_errors)))
    else:
        metrics['overall_mae'] = 0
        metrics['overall_rmse'] = 0

    # 4. Trend correlation between predicted and actual path slopes
    pred_slopes = [log.get_path_slope() for log in prediction_logs]
    actual_slopes = [log.get_actual_slope() for log in prediction_logs]
    if len(pred_slopes) > 1:
        metrics['trend_correlation'] = np.corrcoef(pred_slopes, actual_slopes)[0, 1]
    else:
        metrics['trend_correlation'] = 0

    # 5. Volatility match - std of Close log returns along each path
    def _log_return_std(closes):
        if len(closes) > 1:
            return np.std(np.log(closes[1:] / closes[:-1]))
        return 0

    pred_vols = []
    actual_vols = []
    for log in prediction_logs:
        pred_vols.append(_log_return_std(log.predicted_path['Close'].values))
        actual_vols.append(_log_return_std(log.actual_path['Close'].values))

    avg_pred_vol = np.mean(pred_vols) if pred_vols else 0
    avg_actual_vol = np.mean(actual_vols) if actual_vols else 0
    metrics['avg_predicted_volatility'] = avg_pred_vol
    metrics['avg_actual_volatility'] = avg_actual_vol
    # Epsilon avoids division by zero when the actual path is flat.
    metrics['volatility_ratio'] = avg_pred_vol / (avg_actual_vol + 1e-8)

    # 6. Range statistics - highest High minus lowest Low over each path
    pred_ranges = []
    actual_ranges = []
    for log in prediction_logs:
        pred_ranges.append(log.predicted_path['High'].max() - log.predicted_path['Low'].min())
        actual_ranges.append(log.actual_path['High'].max() - log.actual_path['Low'].min())

    metrics['avg_predicted_range'] = np.mean(pred_ranges) if pred_ranges else 0
    metrics['avg_actual_range'] = np.mean(actual_ranges) if actual_ranges else 0

    return metrics


# Calculate metrics
print("\n" + "="*60)
print("CALCULATING ROLLING BACKTEST METRICS")
print("="*60)

metrics = calculate_rolling_metrics(prediction_logs)

print(f"\n1. DIRECTIONAL ACCURACY (t+1 hit rate)")
print(f"   Accuracy: {metrics['directional_accuracy']:.2%}")
print(f"   Hits: {metrics['directional_hits']}/{metrics['directional_total']}")

print(f"\n2. PATH DIVERGENCE (MAE by step)")
for step, mae in metrics['step_mae'].items():
    print(f"   Step {step}: ${mae:.4f}")

print(f"\n3. OVERALL ERROR METRICS")
print(f"   MAE: ${metrics['overall_mae']:.4f}")
print(f"   RMSE: ${metrics['overall_rmse']:.4f}")

print(f"\n4. TREND CORRELATION")
print(f"   Correlation: {metrics['trend_correlation']:.4f}")

print(f"\n5. VOLATILITY ANALYSIS")
print(f"   Predicted vol: {metrics['avg_predicted_volatility']:.6f}")
print(f"   Actual vol: {metrics['avg_actual_volatility']:.6f}")
print(f"   Vol ratio: {metrics['volatility_ratio']:.2f}")

print(f"\n6. PRICE RANGE ANALYSIS")
print(f"   Predicted range: ${metrics['avg_predicted_range']:.4f}")
print(f"   Actual range: ${metrics['avg_actual_range']:.4f}")

print("\n" + "="*60)

In [None]:
# ==============================================================================
# ERROR ANALYSIS
# ==============================================================================

def analyze_errors_by_time_of_day(prediction_logs: List[RollingPredictionLog]) -> pd.DataFrame:
    """Analyze how errors vary by time of day.

    Groups logs by the hour of their anchor time and reports the mean, std
    and count of the first-step MAE, plus the mean directional hit rate.

    Args:
        prediction_logs: Logs produced by the rolling backtest.

    Returns:
        Frame indexed by hour, rounded to 4 decimals.
    """
    data = []
    for log in prediction_logs:
        hit = log.directional_hit
        data.append({
            'hour': log.anchor_time.hour,
            'mae': log.get_step_mae(0),
            # directional_hit can be None (other cells filter for it). Store
            # it as NaN in a float column so the mean skips it, instead of
            # averaging an object column whose handling varies by pandas
            # version.
            'hit': np.nan if hit is None else float(hit),
        })

    df = pd.DataFrame(data)
    summary = df.groupby('hour').agg({
        'mae': ['mean', 'std', 'count'],
        'hit': 'mean'
    }).round(4)

    return summary


def plot_error_distribution(prediction_logs: List[RollingPredictionLog]) -> plt.Figure:
    """Plot distribution of prediction errors.

    The left panel is a histogram of the first-step signed Close error
    (predicted - actual); the right panel plots the same errors against
    anchor time, shaded green above zero and red below.

    Args:
        prediction_logs: Logs produced by the rolling backtest.

    Returns:
        The created matplotlib figure.
    """
    errors = [
        log.predicted_path['Close'].iloc[0] - log.actual_path['Close'].iloc[0]
        for log in prediction_logs
    ]

    fig, axes = plt.subplots(1, 2, figsize=(14, 5), facecolor='black')

    # Histogram
    ax1 = axes[0]
    ax1.set_facecolor('black')
    ax1.hist(errors, bins=30, color='#00AAFF', alpha=0.7, edgecolor='white')
    ax1.axvline(0, color='red', linestyle='--', linewidth=2, label='Zero error')
    ax1.set_xlabel('Prediction Error ($)', color='white')
    ax1.set_ylabel('Frequency', color='white')
    ax1.set_title('Distribution of t+1 Prediction Errors', color='white')
    ax1.tick_params(colors='white')
    ax1.legend(facecolor='black', labelcolor='white')
    for spine in ax1.spines.values():
        spine.set_color('#444444')

    # Time series
    ax2 = axes[1]
    ax2.set_facecolor('black')
    times = [log.anchor_time for log in prediction_logs]
    ax2.plot(times, errors, color='#00FFFF', alpha=0.7, linewidth=0.8)
    ax2.axhline(0, color='red', linestyle='--', linewidth=1)
    ax2.fill_between(times, 0, errors, where=[e > 0 for e in errors], color='green', alpha=0.3)
    ax2.fill_between(times, 0, errors, where=[e < 0 for e in errors], color='red', alpha=0.3)
    ax2.set_xlabel('Time', color='white')
    ax2.set_ylabel('Prediction Error ($)', color='white')
    ax2.set_title('Prediction Errors Over Time', color='white')
    ax2.tick_params(colors='white')
    ax2.tick_params(axis='x', rotation=45)
    for spine in ax2.spines.values():
        spine.set_color('#444444')

    plt.tight_layout()
    return fig


# Time of day analysis
print("\nError Analysis by Time of Day:")
print("-"*50)
time_analysis = analyze_errors_by_time_of_day(prediction_logs)
print(time_analysis)

# Plot error distribution
print("\nGenerating error distribution plots...")
fig_errors = plot_error_distribution(prediction_logs)
plt.show()

In [None]:
# ==============================================================================
# SUMMARY TABLE
# ==============================================================================

def create_summary_table(metrics: Dict) -> pd.DataFrame:
    """Turn the metrics dict into a display table.

    Args:
        metrics: Must contain ``directional_accuracy``, ``step_mae`` (a
            mapping of step to MAE), ``overall_mae``, ``overall_rmse``,
            ``trend_correlation`` and ``volatility_ratio``.

    Returns:
        Frame with ``Metric``, ``Value`` (pre-formatted string) and
        ``Description`` columns, one row per reported metric.
    """
    def _row(label: str, value: str, description: str) -> Dict:
        return {'Metric': label, 'Value': value, 'Description': description}

    table = [
        _row(
            'Directional Accuracy (t+1)',
            f"{metrics['directional_accuracy']:.2%}",
            'Percentage of correct up/down predictions',
        )
    ]

    # One row per tracked horizon step, in the dict's own order
    table.extend(
        _row(
            f'MAE at Step {step}',
            f"${mae:.4f}",
            f'Mean absolute error at prediction step {step}',
        )
        for step, mae in metrics['step_mae'].items()
    )

    table.append(_row(
        'Overall MAE',
        f"${metrics['overall_mae']:.4f}",
        'Mean absolute error across all steps',
    ))
    table.append(_row(
        'Overall RMSE',
        f"${metrics['overall_rmse']:.4f}",
        'Root mean squared error across all steps',
    ))
    table.append(_row(
        'Trend Correlation',
        f"{metrics['trend_correlation']:.4f}",
        'Correlation between predicted and actual price slopes',
    ))
    table.append(_row(
        'Volatility Ratio',
        f"{metrics['volatility_ratio']:.2f}",
        'Ratio of predicted to actual volatility (>1 = overestimate)',
    ))

    return pd.DataFrame(table)


# Create and display summary
print("\n" + "="*80)
print("ROLLING BACKTEST SUMMARY")
print("="*80)

summary_df = create_summary_table(metrics)
print("\n")
display(summary_df)

print("\n" + "="*80)
print(f"Total Predictions: {len(prediction_logs)}")
print(f"Backtest Period: {prediction_logs[0].anchor_time} to {prediction_logs[-1].anchor_time}")
print(f"Model: Seq2SeqAttnGRU (v7 architecture)")
print(f"Horizon: {HORIZON} steps | Lookback: {LOOKBACK} steps")
print("="*80)

---

## Appendix: Rolling Backtest Architecture

### Causality Guarantee

The rolling backtest ensures strict causality through these mechanisms:

1. **Context Window**: At time `t`, the model receives data from `[t-LOOKBACK, t-1]`
   - `context_start = t - LOOKBACK`
   - `context_end = t` (exclusive)
   - Therefore, last available price is at index `t-1`
2. **Prediction Window**: The model predicts for `[t, t+HORIZON-1]`
   - First prediction timestamp equals anchor time `t`
   - Last prediction is at `t+HORIZON-1`
3. **Validation**: Each `RollingPredictionLog` validates:
   - `predicted_path.index[0] == anchor_time`
   - `actual_path.index[0] == anchor_time`

### Autoregressive Generation

The `generate_realistic()` method produces realistic price paths:

1. Uses learned distribution parameters (mu, log_sigma)
2. Samples with **fresh random noise** at each step
3. Feeds predictions back as next input (autoregressive)
4. Temperature controls volatility scaling
5. Historical volatility can override for initial steps

### Performance Optimization

- `torch.no_grad()` for all inference
- Pre-computed scaled features
- Pre-allocated numpy arrays
- Batched operations where possible
- `tqdm` progress bar for monitoring