## Transformer-Based Option Pricing Implementation

This notebook implements transformer architectures for option pricing based on the Informer model and comparative analysis from recent literature.

In [1]:
# Cell 1: Imports and Setup
import os
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, DistributedSampler
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Multi-GPU Training
def setup_distributed():
    if 'RANK' in os.environ and 'WORLD_SIZE' in os.environ:
        rank = int(os.environ['RANK'])
        world_size = int(os.environ['WORLD_SIZE'])
        local_rank = int(os.environ['LOCAL_RANK'])

        dist.init_process_group(backend='nccl')
        torch.cuda.set_device(local_rank)

        return rank, world_size, local_rank
    else:
        return 0, 1, 0

def cleanup_distributed():
    if dist.is_initialized():
        dist.destroy_process_group()

rank, world_size, local_rank = setup_distributed()
is_main_process = (rank == 0)

if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
    num_gpus = torch.cuda.device_count()
    if is_main_process:
        print(f"Number of GPUs available: {num_gpus}")
        for i in range(num_gpus):
            print(f"  GPU {i}: {torch.cuda.get_device_name(i)}")
        print(f"World size: {world_size}")
        print(f"Local rank: {local_rank}")
else:
    if is_main_process:
        print("No GPU available, using CPU")

USE_MULTI_GPU = world_size > 1

# Device configuration
device = torch.device(f'cuda:{local_rank}' if torch.cuda.is_available() else 'cpu')
if is_main_process:
    print(f"Using device: {device}")
    print(f"Distributed training: {USE_MULTI_GPU}")

# Display settings
pd.set_option('display.max_columns', None)
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

Number of GPUs available: 8
  GPU 0: NVIDIA L4
  GPU 1: NVIDIA L4
  GPU 2: NVIDIA L4
  GPU 3: NVIDIA L4
  GPU 4: NVIDIA L4
  GPU 5: NVIDIA L4
  GPU 6: NVIDIA L4
  GPU 7: NVIDIA L4
World size: 1
Local rank: 0
Using device: cuda:0
Distributed training: False


In [None]:
# Cell 2: Configuration and Hyperparameters
# Reference: Bańka & Chudziak (2025), "Applying Informer for Option Pricing", Section 4.2, p. 1274
# Reference: Pimentel et al. (2025), "Option pricing with LSTM", Section 4.2.3, Table 2, p. [page with Table 2]

CONFIG = {
    # Data parameters - following preprocessing methodology from Preprocessing.html
    'train_months': 8,
    'val_months': 1,
    'test_month': 1,
    'test_year': 2023,

    # Sequence parameters
    'seq_len': 20,           # Input sequence length (past 20 time steps)
    'label_len': 10,         # Decoder start token length
    'pred_len': 1,           # Predict 1 step ahead
    
    # Model architecture - Informer configuration (Bańka & Chudziak, p. 1274)
    'd_model': 36,           # Embedding dimension
    'n_heads': 3,            # Number of attention heads
    'n_encoder_layers': 1,   # Number of encoder layers
    'n_decoder_layers': 2,   # Number of decoder layers
    'd_ff': 8,               # Feedforward dimension
    'dropout': 0.06,         # Dropout rate
    'label_len': 5,          # Label length for decoder
    'pred_len': 1,           # Prediction length (single day prediction)
    
    # Training parameters
    'batch_size': 64,        # As per Bańka & Chudziak, p. 1274
    'learning_rate': 0.0001, # Adam optimizer initial LR
    'weight_decay': 0.0001,  # L2 regularization
    'epochs': 100,           # Maximum epochs
    'early_stopping_patience': 20,  # Early stopping patience
    
    # Features - following both papers' feature selection
    # Bańka & Chudziak, Section 3.1, p. 1272: "strike price, implied volatility, time to maturity, underlying asset price"
    # Pimentel et al., Section 4.2.1: "strike price, underlying price, time to maturity, risk-free rate, volatility"
    'feature_columns': ['STRIKE', 'UNDERLYING_LAST', 'MTM', 'RFR', 'VOL_GG'],
    'target_column': 'C_MID',  # Call option price as target
    
    # Hyperparameter search ranges (for later optimization)
    'hp_search': {
        'd_model': [16, 32, 64],
        'n_heads': [2, 3, 4],
        'n_encoder_layers': [1, 2],
        'n_decoder_layers': [1, 2, 3],
        'd_ff': [8, 16, 32],
        'dropout': [0.05, 0.06, 0.1],
        'learning_rate': [0.0001, 0.0005, 0.001],
        'batch_size': [32, 64, 128]
    }
}

print("Configuration loaded:")
for key, value in CONFIG.items():
    if key != 'hp_search':
        print(f"  {key}: {value}")


In [None]:
# Cell 3: Data Loading and Preparation Functions
# Reference: Preprocessing.html, create_rolling_window_split_flexible function

def load_preprocessed_data(filepath='./data.csv'):
    """
    Load preprocessed options data.
    Reference: Preprocessing.html - data preprocessing steps
    """
    print(f"Loading data from {filepath}...")
    df = pd.read_csv(filepath, parse_dates=['QUOTE_DATE', 'EXPIRE_DATE'])
        
    print(f"Loaded {len(df):,} call option records")
    print(f"Date range: {df['QUOTE_DATE'].min()} to {df['QUOTE_DATE'].max()}")
    
    return df


def create_rolling_window_split(df, test_month, test_year=2023, 
                                        feature_columns=['STRIKE', 'UNDERLYING_LAST', 'MTM', 'RFR', 'VOLATILITY'], 
                                        target_column = 'C_MID', train_months=8, val_months=1):
    df[target_column] = df[['C_BID', 'C_ASK']].apply(pd.to_numeric, errors='coerce').mean(axis=1)

    
    test_start = pd.Timestamp(year=test_year, month=test_month, day=1)
    test_end = test_start + pd.offsets.MonthEnd(0)

    val_start = test_start - pd.offsets.MonthBegin(val_months)
    val_end = test_start - pd.Timedelta(days=1)

    train_start = val_start - pd.offsets.MonthBegin(train_months)  
    train_end = val_start - pd.Timedelta(days=1)

    train_df = df.loc[(df['QUOTE_DATE'] >= train_start) & (df['QUOTE_DATE'] <= train_end), ['QUOTE_DATE'] + feature_columns + [target_column]].set_index('QUOTE_DATE').copy()
    val_df = df.loc[(df['QUOTE_DATE'] >= val_start) & (df['QUOTE_DATE'] <= val_end), ['QUOTE_DATE'] + feature_columns + [target_column]].set_index('QUOTE_DATE').copy()
    test_df = df.loc[(df['QUOTE_DATE'] >= test_start) & (df['QUOTE_DATE'] <= test_end), ['QUOTE_DATE'] + feature_columns + [target_column]].set_index('QUOTE_DATE').copy()

    scaler = MinMaxScaler()
    scaler.fit(train_df)

    train_scaled = pd.DataFrame(scaler.transform(train_df), columns=train_df.columns, index=train_df.index)
    val_scaled = pd.DataFrame(scaler.transform(val_df), columns=val_df.columns, index=val_df.index)
    test_scaled = pd.DataFrame(scaler.transform(test_df), columns=test_df.columns, index=test_df.index)
    
    return train_df, val_df, test_df, scaler

# Load data
df = load_preprocessed_data()

In [None]:
# Cell 4: PyTorch Dataset Class

class OptionPricingDataset(Dataset):
    """
    PyTorch Dataset for option pricing.
    
    Reference: Bańka & Chudziak (2025), Section 3.2, p. 1272-1273
    "The input data is structured as a time-series sequence"
    """
    def __init__(self, data_df, target_col, seq_len, label_len, pred_len, feature_cols):
        """
        Args:
            features: numpy array of shape (n_samples, n_features)
            targets: numpy array of shape (n_samples, 1)
        """
        self.data = data_df[feature_cols].values.astype(np.float32)
        self.target = data_df[target_col].values.astype(np.float32)

        self.seq_len = seq_len
        self.label_len = label_len
        self.pred_len = pred_len
        self.feature_cols = feature_cols

    
    def __len__(self):
        # Number of valid sequences we can create
        return len(self.data) - self.seq_len - self.label_len - self.pred_len + 1
    
    def __getitem__(self, idx):
        """
        Returns:
            x_enc: encoder input (seq_len, n_features)
            x_dec: decoder input (label_len + pred_len, n_features)
            y: target values (pred_len,)
        """
        # Encoder input: historical sequence
        s_begin = idx
        s_end = s_begin + self.seq_len
        x_enc = self.data[s_begin:s_end]
        
        # Decoder input: label + prediction windows
        r_begin = s_end - self.label_len
        r_end = r_begin + self.label_len + self.pred_len
        x_dec = self.data[r_begin:r_end]
        
        # Target: future values to predict
        y = self.target[s_end:s_end + self.pred_len]
        
        return (
            torch.FloatTensor(x_enc),
            torch.FloatTensor(x_dec),
            torch.FloatTensor(y)
        )


def create_dataloaders(data_splits, batch_size=64):
    """Create train, validation, and test dataloaders."""
    train_df = data_splits[0]
    val_df = data_splits[1]
    test_df = data_splits[2]
    print(train_df)
    
    train_dataset = OptionPricingDataset(
        train_df.shape, 
        np.random.randn(train_df.shape[0], 1) * 0.01
    )
    val_dataset = OptionPricingDataset(
        val_df.shape,
        np.random.randn(val_df.shape[0], 1) * 0.01
    )
    test_dataset = OptionPricingDataset(
        test_df.shape,
        np.random.randn(test_df.shape[0], 1) * 0.01
    )
    
    train_loader = DataLoader(
        train_dataset, 
        batch_size=batch_size, 
        shuffle=True,
        num_workers=0
    )
    val_loader = DataLoader(
        val_dataset, 
        batch_size=batch_size, 
        shuffle=False,
        num_workers=0
    )
    test_loader = DataLoader(
        test_dataset, 
        batch_size=batch_size, 
        shuffle=False,
        num_workers=0
    )
    
    return train_loader, val_loader, test_loader

In [None]:
# Cell 5: Positional Encoding
# Reference: Bańka & Chudziak (2025), Figure 2, p. 1272 (Informer architecture)
# Reference: Vaswani et al. (2017), "Attention is All You Need" - base transformer architecture

class PositionalEncoding(nn.Module):
    """
    Positional encoding for transformer models.
    
    Reference: Standard transformer architecture from Vaswani et al. (2017)
    Used in financial time-series as noted in Bańka & Chudziak (2025), Section 3.2
    """
    def __init__(self, d_model, max_len=5000, dropout=0.1):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)
        
        # Create positional encoding matrix
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * 
                            (-np.log(10000.0) / d_model))
        
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)
        
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        """
        Args:
            x: Tensor of shape (batch_size, seq_len, d_model)
        """
        x = x + self.pe[:, :x.size(1), :]
        return self.dropout(x)

In [None]:
# Cell 6: Simplified Transformer Model (Baseline)
# Reference: Ruiru et al. (2024), "LSTM versus Transformers", Section 2.2, p. 544
# Reference: Bańka & Chudziak (2025), Section 3, p. 1271-1273

class SimpleTransformerModel(nn.Module):
    """
    Simplified Transformer model for option pricing.
    
    This serves as a baseline before implementing the full Informer architecture.
    
    Reference: Ruiru et al. (2024), Section 2.2, Equation 4, p. 544
    "Attention(Q,K,V) = softmax(QK^T/√dk)V"
    
    Reference: Bańka & Chudziak (2025), Figure 1, p. 1271
    Shows conceptual overview of encoder-decoder architecture
    """
    def __init__(self, n_features, d_model=32, n_heads=4, n_layers=2, 
                 d_ff=128, dropout=0.1):
        super(SimpleTransformerModel, self).__init__()
        
        self.n_features = n_features
        self.d_model = d_model
        
        # Input embedding
        self.input_projection = nn.Linear(n_features, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, dropout=dropout)
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            activation='relu',
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer, 
            num_layers=n_layers
        )
        
        # Output projection
        self.output_projection = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, 1)
        )
    
    def forward(self, x):
        """
        Args:
            x: Input tensor of shape (batch_size, n_features)
        Returns:
            predictions: Tensor of shape (batch_size, 1)
        """
        # Reshape to add sequence dimension
        x = x.unsqueeze(1)  # (batch_size, 1, n_features)
        
        # Project to d_model dimension
        x = self.input_projection(x)  # (batch_size, 1, d_model)
        
        # Add positional encoding
        x = self.pos_encoder(x)
        
        # Apply transformer encoder
        x = self.transformer_encoder(x)  # (batch_size, 1, d_model)
        
        # Take the output and project to prediction
        x = x.squeeze(1)  # (batch_size, d_model)
        predictions = self.output_projection(x)  # (batch_size, 1)
        
        return predictions



class SimpleTransformerModel(nn.Module):
    """
    Simplified Transformer model for option pricing with sequence input.
    """
    def __init__(self, n_features, d_model=32, n_heads=4, n_layers=2, 
                 d_ff=128, dropout=0.1):
        super(SimpleTransformerModel, self).__init__()
        
        self.n_features = n_features
        self.d_model = d_model
        
        # Input embedding
        self.input_projection = nn.Linear(n_features, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, dropout=dropout)
        
        # Transformer encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            activation='relu',
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer, 
            num_layers=n_layers
        )
        
        # Output projection
        self.output_projection = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, 1)
        )
    
    def forward(self, x_enc):
        """
        Args:
            x_enc: Input tensor of shape (batch_size, seq_len, n_features)
        Returns:
            predictions: Tensor of shape (batch_size, 1)
        """
        # Project to d_model dimension
        x = self.input_projection(x_enc)  # (batch_size, seq_len, d_model)
        
        # Add positional encoding
        x = self.pos_encoder(x)
        
        # Apply transformer encoder
        x = self.transformer_encoder(x)  # (batch_size, seq_len, d_model)
        
        # Use the last timestep for prediction (or could use mean pooling)
        x = x[:, -1, :]  # (batch_size, d_model)
        
        # Project to prediction
        predictions = self.output_projection(x)  # (batch_size, 1)
        
        return predictions


    def train_epoch_simple(model, train_loader, criterion, optimizer, device):
        """Training epoch for Simple Transformer"""
        model.train()
        total_loss = 0
        
        for x_enc, x_dec, y in train_loader:
            x_enc = x_enc.to(device)
            y = y.to(device)
            
            optimizer.zero_grad()
            output = model(x_enc)
            
            output = output.squeeze()
            y = y.squeeze()
            
            loss = criterion(output, y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            
            total_loss += loss.item()
        
        return total_loss / len(train_loader)
    
    
    def validate_epoch_simple(model, val_loader, criterion, device):
        """Validation epoch for Simple Transformer"""
        model.eval()
        total_loss = 0
        
        with torch.no_grad():
            for x_enc, x_dec, y in val_loader:
                x_enc = x_enc.to(device)
                y = y.to(device)
                
                output = model(x_enc)
                
                output = output.squeeze()
                y = y.squeeze()
                
                loss = criterion(output, y)
                total_loss += loss.item()
        
        return total_loss / len(val_loader)

In [None]:
# Cell 7: Informer-Inspired Model with ProbSparse Attention
# Reference: Bańka & Chudziak (2025), Section 3.2.1, p. 1272-1273
# Reference: Zhou et al. (2021), "Informer" original paper

class ProbSparseSelfAttention(nn.Module):
    """
    Simplified ProbSparse Self-Attention mechanism.
    
    Reference: Bańka & Chudziak (2025), Section 3.2.1, p. 1272
    "ProbSparse Self-Attention Mechanism... selects a subset of queries based on 
    the Kullback-Leibler divergence (KLD)"
    
    Reference: Figure 3, p. 1273 - Illustration of ProbSparse Attention mechanism
    
    Note: This is a simplified implementation focusing on the key concepts
    rather than full complexity of the original Informer.
    """
    def __init__(self, d_model, n_heads, dropout=0.1, factor=3):
        super(ProbSparseSelfAttention, self).__init__()
        # assert d_model % n_heads == 0
        
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.factor = factor
        
        self.query_projection = nn.Linear(d_model, d_model)
        self.key_projection = nn.Linear(d_model, d_model)
        self.value_projection = nn.Linear(d_model, d_model)
        self.out_projection = nn.Linear(d_model, d_model)
        
        self.dropout = nn.Dropout(dropout)
        self.scale = np.sqrt(self.d_k)
    
    def forward(self, queries, keys, values, attn_mask=None):
        """
        Args:
            queries: (batch_size, seq_len, d_model)
            keys: (batch_size, seq_len, d_model)
            values: (batch_size, seq_len, d_model)
        """
        batch_size = queries.size(0)
        seq_len = queries.size(1)
        
        # Project and reshape for multi-head attention
        Q = self.query_projection(queries).view(batch_size, seq_len, 
                                                 self.n_heads, self.d_k).transpose(1, 2)
        K = self.key_projection(keys).view(batch_size, seq_len, 
                                           self.n_heads, self.d_k).transpose(1, 2)
        V = self.value_projection(values).view(batch_size, seq_len, 
                                               self.n_heads, self.d_k).transpose(1, 2)
        
        # Simplified: Use standard attention instead of full ProbSparse implementation
        # Full implementation would select top-U queries based on sparsity measure
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / self.scale
        
        if attn_mask is not None:
            attention_scores = attention_scores.masked_fill(attn_mask == 0, -1e9)
        
        attention_weights = torch.softmax(attention_scores, dim=-1)
        attention_weights = self.dropout(attention_weights)
        
        context = torch.matmul(attention_weights, V)
        
        # Reshape and project output
        context = context.transpose(1, 2).contiguous().view(
            batch_size, seq_len, self.d_model
        )
        output = self.out_projection(context)
        
        return output, attention_weights


class InformerEncoderLayer(nn.Module):
    """
    Informer Encoder Layer with ProbSparse attention and distilling.
    
    Reference: Bańka & Chudziak (2025), Section 3.2.1, p. 1272-1273
    """
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1, factor=3):
        super(InformerEncoderLayer, self).__init__()
        
        self.attention = ProbSparseSelfAttention(d_model, n_heads, dropout, factor)
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model)
        )
        
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
    
    def forward(self, x, attn_mask=None):
        # Self-attention with residual connection
        attn_output, _ = self.attention(x, x, x, attn_mask)
        x = self.norm1(x + self.dropout(attn_output))
        
        # Feed-forward with residual connection
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        
        return x


class InformerModel(nn.Module):
    """
    Informer-inspired model for option pricing.
    
    Reference: Bańka & Chudziak (2025), Section 3, p. 1270-1274
    Full architecture description including encoder-decoder structure.
    
    Reference: Figure 2, p. 1272 - Complete Informer model overview
    """
    def __init__(self, n_features, d_model=32, n_heads=3, 
                 n_encoder_layers=1, n_decoder_layers=2,
                 d_ff=8, dropout=0.06, factor=3):
        super(InformerModel, self).__init__()
        
        self.d_model = d_model
        
        # Input embedding - Reference: Section 3.2.1, "Embedding Layer", p. 1272
        self.input_embedding = nn.Linear(n_features, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, dropout=dropout)
        
        # Encoder layers
        self.encoder_layers = nn.ModuleList([
            InformerEncoderLayer(d_model, n_heads, d_ff, dropout, factor)
            for _ in range(n_encoder_layers)
        ])
        
        # Decoder setup (simplified for single-step prediction)
        self.decoder_input = nn.Parameter(torch.randn(1, 1, d_model))
        
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            batch_first=True
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, n_decoder_layers)
        
        # Output projection - Reference: Section 3.2.2, "Decoder Output", p. 1273
        self.output_projection = nn.Linear(d_model, 1)
    
    def forward(self, x_enc, x_dec):
        """
        Forward pass with encoder and decoder inputs.
        
        Args:
            x_enc: Encoder input (batch_size, seq_len, n_features)
            x_dec: Decoder input (batch_size, label_len + pred_len, n_features)
            
        Returns:
            predictions: (batch_size, 1) - predicted option price
        """
        # Embed encoder input
        enc_out = self.input_embedding(x_enc)  # (batch_size, seq_len, d_model)
        enc_out = self.pos_encoder(enc_out)
        
        # Pass through encoder layers (ProbSparse attention)
        for encoder_layer in self.encoder_layers:
            enc_out = encoder_layer(enc_out)
        
        # Embed decoder input
        # dec_out = self.output_projection(x_dec)  # (batch_size, label_len+pred_len, d_model)
        dec_out = self.input_embedding(x_dec)
        dec_out = self.pos_encoder(dec_out)
        
        # Pass through decoder (uses encoder output as memory)
        dec_out = self.decoder(dec_out, enc_out)
        
        # Output projection - take the last timestep for prediction
        predictions = self.output_projection(dec_out[:, -1, :])  # (batch_size, 1)
        
        return predictions


In [None]:
# Cell 8: Training Functions
# Reference: Pimentel et al. (2025), Section 4.2.1, p. [training section]
# Reference: Bańka & Chudziak (2025), Section 4.2, p. 1274

def train_epoch_informer(model, train_loader, criterion, optimizer, device):
    """Training epoch for Informer model"""
    model.train()
    total_loss = 0

    num_batches = 0
    update_freq = 1000
    pbar = tqdm(train_loader, desc='Training', mininterval=10)
        
    # for x_enc, x_dec, y in pbar:
    for batch_idx, (x_enc, x_dec, y) in enumerate(pbar):
        x_enc = x_enc.to(device)
        x_dec = x_dec.to(device)
        y = y.to(device)

        # Check for NaN in inputs
        if torch.isnan(x_enc).any() or torch.isnan(x_dec).any() or torch.isnan(y).any():
            print(f"Warning: NaN detected in input at batch {batch_idx}")
            continue
        
        # Check for Inf in inputs
        if torch.isinf(x_enc).any() or torch.isinf(x_dec).any() or torch.isinf(y).any():
            print(f"Warning: Inf detected in input at batch {batch_idx}")
            continue
        
        optimizer.zero_grad()

        try:
            output = model(x_enc, x_dec)

            # Check for NaN in output
            if torch.isnan(output).any():
                print(f"Warning: NaN in model output at batch {batch_idx}")
                # Skip this batch
                continue
        
            # Squeeze dimensions to match
            output = output.squeeze()
            y = y.squeeze()
            
            loss = criterion(output, y)

            # Check if loss is NaN or Inf
            if torch.isnan(loss) or torch.isinf(loss):
                print(f"\nWarning: Invalid loss at batch {batch_idx}: {loss.item()}")
                print(f"  Output range: [{output.min().item():.6f}, {output.max().item():.6f}]")
                print(f"  Target range: [{y.min().item():.6f}, {y.max().item():.6f}]")
                continue
            
            loss.backward()
        
            # Check for NaN in gradients before clipping
            has_nan_grad = False
            for name, param in model.named_parameters():
                if param.grad is not None:
                    if torch.isnan(param.grad).any() or torch.isinf(param.grad).any():
                        print(f"\nWarning: NaN/Inf gradient in {name} at batch {batch_idx}")
                        has_nan_grad = True
                        break
            
            if has_nan_grad:
                optimizer.zero_grad()
                continue
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
            optimizer.step()

            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1
            
        except RuntimeError as e:
            print(f"\nRuntime error at batch {batch_idx}: {str(e)}")
            optimizer.zero_grad()
            continue

        # Update every update_freq batches or on last batch
        if (batch_idx + 1) % update_freq == 0 or (batch_idx + 1) == len(train_loader):
            if num_batches > 0:
                # avg_running = running_loss / min(update_freq, num_batches % update_freq or update_freq)
                avg_total = total_loss / num_batches
                
                pbar.set_postfix({
                    'batch': f'{batch_idx + 1}/{len(train_loader)}',
                    'loss': f'{batch_loss:.6f}',
                    # f'avg_{update_freq}': f'{avg_running:.6f}',
                    'total_avg': f'{avg_total:.6f}'
                })
            # running_loss = 0
    
    if num_batches == 0:
        print("\nWarning: No valid batches processed!")
        return float('nan')

    return total_loss / len(train_loader)


def validate_epoch_informer(model, val_loader, criterion, device):
    """Validation epoch for Informer model"""
    model.eval()
    total_loss = 0

    pbar = tqdm(val_loader, desc='Validation', mininterval=10)
    
    with torch.no_grad():
        for x_enc, x_dec, y in pbar:
            x_enc = x_enc.to(device)
            x_dec = x_dec.to(device)
            y = y.to(device)
            
            output = model(x_enc, x_dec)
            output = output.squeeze()
            y = y.squeeze()
            
            loss = criterion(output, y)
            total_loss += loss.item()
                
            pbar.set_postfix({'loss': f'{loss.item():.6f}'})
            
    return total_loss / len(val_loader)


def train_epoch_simple(model, train_loader, criterion, optimizer, device):
    """Training epoch for Simple Transformer"""
    model.train()
    total_loss = 0

    num_batches = 0
    update_freq = 1000
    pbar = tqdm(train_loader, desc='Training', mininterval=10)
    
    # for x, y in pbar:
    for batch_idx, (x, y) in enumerate(pbar):
        x = x.to(device)
        y = y.to(device)

        # Check for NaN in inputs
        if torch.isnan(x_enc).any() or torch.isnan(x_dec).any() or torch.isnan(y).any():
            print(f"Warning: NaN detected in input at batch {batch_idx}")
            continue
        
        # Check for Inf in inputs
        if torch.isinf(x_enc).any() or torch.isinf(x_dec).any() or torch.isinf(y).any():
            print(f"Warning: Inf detected in input at batch {batch_idx}")
            continue
        
        optimizer.zero_grad()

        try:
            output = model(x)
        
            # Check for NaN in output
            if torch.isnan(output).any():
                print(f"Warning: NaN in model output at batch {batch_idx}")
                # Skip this batch
                continue
        
            # Squeeze dimensions to match
            output = output.squeeze()
            y = y.squeeze()
            
            loss = criterion(output, y)
            
            loss.backward()

            # Check for NaN in gradients before clipping
            has_nan_grad = False
            for name, param in model.named_parameters():
                if param.grad is not None:
                    if torch.isnan(param.grad).any() or torch.isinf(param.grad).any():
                        print(f"\nWarning: NaN/Inf gradient in {name} at batch {batch_idx}")
                        has_nan_grad = True
                        break
            
            if has_nan_grad:
                optimizer.zero_grad()
                continue
                
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            
            optimizer.step()
        
            batch_loss = loss.item()
            total_loss += batch_loss
            num_batches += 1

        except RuntimeError as e:
            print(f"\nRuntime error at batch {batch_idx}: {str(e)}")
            optimizer.zero_grad()
            continue

        # Update every update_freq batches or on last batch
        if (batch_idx + 1) % update_freq == 0 or (batch_idx + 1) == len(train_loader):
            if num_batches > 0:
                # avg_running = running_loss / min(update_freq, num_batches % update_freq or update_freq)
                avg_total = total_loss / num_batches
                
                pbar.set_postfix({
                    'batch': f'{batch_idx + 1}/{len(train_loader)}',
                    'loss': f'{batch_loss:.6f}',
                    # f'avg_{update_freq}': f'{avg_running:.6f}',
                    'total_avg': f'{avg_total:.6f}'
                })
            # running_loss = 0
    
    if num_batches == 0:
        print("\nWarning: No valid batches processed!")
        return float('nan')
    
    return total_loss / len(train_loader)


def validate_epoch_simple(model, val_loader, criterion, device):
    """Validation epoch for Simple Transformer"""
    model.eval()
    total_loss = 0
    
    num_batches = 0
    update_freq = 1000
    pbar = tqdm(val_loader, desc='Validation', mininterval=10)

    with torch.no_grad():
        for x, y in pbar:
            x = x.to(device)
            y = y.to(device)
            
            output = model(x)
            
            output = output.squeeze()
            y = y.squeeze()
            
            loss = criterion(output, y)
            total_loss += loss.item()
            pbar.set_postfix({'loss': f'{loss.item():.6f}'})
    
    return total_loss / len(val_loader)


def train_model(model, train_loader, val_loader, config, device, model_type='informer'):
    """
    Full training loop with early stopping.
    
    Reference: Bańka & Chudziak (2025), Section 4.2, p. 1274
    "Training proceeds over 300 epochs, with early stopping applied based on 
    validation loss, using a patience of 30 epochs"
    
    Reference: Pimentel et al. (2025), Section 4.2.1, p. [training details]
    "We utilize early stopping once validation loss has not improved for 20 
    consecutive epochs"
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(
        model.parameters(), 
        lr=config['learning_rate'],
        weight_decay=config['weight_decay']
    )

    scheduler = optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, 
        mode='min', 
        factor=0.5,      # Reduce LR by half
        patience=10,      # After 10 epochs without improvement
        verbose=True,
        min_lr=1e-6
    )

    # Select appropriate training functions
    if model_type == 'informer':
        train_fn = train_epoch_informer
        val_fn = validate_epoch_informer
    else:
        train_fn = train_epoch_simple
        val_fn = validate_epoch_simple
    
    best_val_loss = float('inf')
    patience_counter = 0
    train_losses = []
    val_losses = []
    
    print(f"\nTraining on {device}...")
    print(f"Model parameters: {sum(p.numel() for p in model.parameters()):,}")
    print(f"Training batches per epoch: {len(train_loader)}")
    print(f"Validation batches per epoch: {len(val_loader)}")
    print("-" * 70)

    epoch_pbar = tqdm(range(config['epochs']), desc='Overall Progress')
    
    # for epoch in range(config['epochs']):
    for epoch in epoch_pbar:
        train_loss = train_fn(model, train_loader, criterion, optimizer, device)
        val_loss = val_fn(model, val_loader, criterion, device)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        scheduler.step(val_loss)
        
        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Save best model
            best_model_state = model.state_dict().copy()
            improvement = "Y"
        else:
            patience_counter += 1
            improvement = ""

        epoch_pbar.set_postfix({
            'train_loss': f'{train_loss:.6f}',
            'val_loss': f'{val_loss:.6f}',
            'best': f'{best_val_loss:.6f}',
            'patience': f'{patience_counter}/{config["early_stopping_patience"]}',
            'lr': f'{optimizer.param_groups[0]['lr']:.2e}',
            'improved': improvement
        })
        
        if (epoch + 1) % 10 == 0:
            tqdm.write(f"\n{'='*70}")
            tqdm.write(f"Epoch {epoch+1}/{config['epochs']} Summary:")
            tqdm.write(f"  Train Loss: {train_loss:.6f}")
            tqdm.write(f"  Val Loss:   {val_loss:.6f}")
            tqdm.write(f"  Best Val:   {best_val_loss:.6f}")
            tqdm.write(f"  Patience:   {patience_counter}/{config['early_stopping_patience']}")
            if improvement:
                tqdm.write(f"  Status:     Y Improvement!")
            tqdm.write(f"{'='*70}\n")

        if patience_counter >= config['early_stopping_patience']:
            tqdm.write(f"\n{'='*70}")
            tqdm.write(f"Early stopping triggered at epoch {epoch+1}")
            tqdm.write(f"Best validation loss: {best_val_loss:.6f}")
            tqdm.write(f"{'='*70}\n")
            break
    
    # Load best model
    model.load_state_dict(best_model_state)
    
    return {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'best_val_loss': best_val_loss,
        'epochs_trained': epoch + 1
    }

In [None]:
# Cell 9: Evaluation Functions
# Reference: Pimentel et al. (2025), Table 4, p. [results tables]
# Reference: Bańka & Chudziak (2025), Section 4.3, p. 1274

def evaluate_model_informer(model, test_loader, scaler, device, feature_cols, target_col):
    """
    Evaluate Informer model on test set.
    """
    model.eval()
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for x_enc, x_dec, y in test_loader:
            x_enc = x_enc.to(device)
            x_dec = x_dec.to(device)
            predictions = model(x_enc, x_dec)
            
            all_predictions.append(predictions.cpu().numpy())
            all_targets.append(y.numpy())
    
    # Concatenate all batches
    predictions_scaled = np.concatenate(all_predictions, axis=0).reshape(-1, 1)
    targets_scaled = np.concatenate(all_targets, axis=0).reshape(-1, 1)
    
    # Create dummy data for inverse transform (scaler expects all features)
    n_features = len(feature_cols)
    dummy_features = np.zeros((len(predictions_scaled), n_features))
    target_idx = feature_cols.index(target_col) if target_col in feature_cols else -1
    
    # If target is in features, use proper inverse transform
    if target_idx >= 0:
        dummy_features[:, target_idx] = predictions_scaled.flatten()
        predictions = scaler.inverse_transform(dummy_features)[:, target_idx].reshape(-1, 1)
        
        dummy_features[:, target_idx] = targets_scaled.flatten()
        targets = scaler.inverse_transform(dummy_features)[:, target_idx].reshape(-1, 1)
    else:
        # Target not in feature scaling, just use scaled values
        predictions = predictions_scaled
        targets = targets_scaled
    
    # Calculate metrics
    mse = mean_squared_error(targets, predictions)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(targets, predictions)
    
    theil_u1 = np.sqrt(mse) / np.sqrt(np.mean(targets**2))
    
    results = {
        'predictions': predictions,
        'targets': targets,
        'rmse': rmse,
        'mae': mae,
        'mse': mse,
        'theil_u1': theil_u1
    }
    
    return results


def evaluate_model_simple(model, test_loader, scaler, device, feature_cols, target_col):
    """
    Evaluate Simple Transformer model on test set.
    """
    model.eval()
    all_predictions = []
    all_targets = []
    
    with torch.no_grad():
        for x_enc, x_dec, y in test_loader:
            x_enc = x_enc.to(device)
            predictions = model(x_enc)
            
            all_predictions.append(predictions.cpu().numpy())
            all_targets.append(y.numpy())
    
    # Concatenate all batches
    predictions_scaled = np.concatenate(all_predictions, axis=0).reshape(-1, 1)
    targets_scaled = np.concatenate(all_targets, axis=0).reshape(-1, 1)
    
    # Same inverse transform logic as above
    n_features = len(feature_cols)
    dummy_features = np.zeros((len(predictions_scaled), n_features))
    target_idx = feature_cols.index(target_col) if target_col in feature_cols else -1
    
    if target_idx >= 0:
        dummy_features[:, target_idx] = predictions_scaled.flatten()
        predictions = scaler.inverse_transform(dummy_features)[:, target_idx].reshape(-1, 1)
        
        dummy_features[:, target_idx] = targets_scaled.flatten()
        targets = scaler.inverse_transform(dummy_features)[:, target_idx].reshape(-1, 1)
    else:
        predictions = predictions_scaled
        targets = targets_scaled
    
    # Calculate metrics
    mse = mean_squared_error(targets, predictions)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(targets, predictions)
    theil_u1 = np.sqrt(mse) / np.sqrt(np.mean(targets**2))
    
    results = {
        'predictions': predictions,
        'targets': targets,
        'rmse': rmse,
        'mae': mae,
        'mse': mse,
        'theil_u1': theil_u1
    }
    
    return results


def calculate_detailed_metrics(predictions, targets):
    """
    Calculate additional performance metrics.
    
    Reference: Pimentel et al. (2025), Table 5, p. [bias/variance table]
    "bias, variance and covariance proportion"
    """
    # Bias proportion
    mean_error = np.mean(predictions - targets)
    mse = np.mean((predictions - targets)**2)
    bias_prop = (mean_error**2) / mse if mse > 0 else 0
    
    # Variance proportion
    std_pred = np.std(predictions)
    std_target = np.std(targets)
    var_prop = ((std_pred - std_target)**2) / mse if mse > 0 else 0
    
    # Covariance proportion
    corr = np.corrcoef(predictions.flatten(), targets.flatten())[0, 1]
    cov_prop = 2 * (1 - corr) * std_pred * std_target / mse if mse > 0 else 0
    
    return {
        'bias_proportion': bias_prop,
        'variance_proportion': var_prop,
        'covariance_proportion': cov_prop
    }

In [None]:
# Cell 10: Visualization Functions

def plot_training_history(history, title='Training History'):
    """
    Plot training and validation loss over epochs.
    
    Reference: Similar to Figure 5 in Pimentel et al. (2025), p. [hyperparameter search figure]
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    
    epochs = range(1, len(history['train_losses']) + 1)
    ax.plot(epochs, history['train_losses'], label='Training Loss', linewidth=2)
    ax.plot(epochs, history['val_losses'], label='Validation Loss', linewidth=2)
    
    ax.set_xlabel('Epoch', fontsize=12)
    ax.set_ylabel('Loss (MSE)', fontsize=12)
    ax.set_title(title, fontsize=14, fontweight='bold')
    ax.legend(fontsize=11)
    ax.grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()


def plot_predictions_vs_actual(results, sample_size=1000, title='Predictions vs Actual'):
    """
    Plot predicted vs actual option prices.
    
    Reference: Similar to Figures 5-7 in Bańka & Chudziak (2025), p. 1276
    """
    predictions = results['predictions'].flatten()
    targets = results['targets'].flatten()
    
    # Sample for visualization if dataset is large
    if len(predictions) > sample_size:
        indices = np.random.choice(len(predictions), sample_size, replace=False)
        predictions_sample = predictions[indices]
        targets_sample = targets[indices]
    else:
        predictions_sample = predictions
        targets_sample = targets
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Scatter plot
    ax1.scatter(targets_sample, predictions_sample, alpha=0.5, s=20)
    
    # Perfect prediction line
    min_val = min(targets_sample.min(), predictions_sample.min())
    max_val = max(targets_sample.max(), predictions_sample.max())
    ax1.plot([min_val, max_val], [min_val, max_val], 'r--', linewidth=2, label='Perfect Prediction')
    
    ax1.set_xlabel('Actual Option Price ($)', fontsize=12)
    ax1.set_ylabel('Predicted Option Price ($)', fontsize=12)
    ax1.set_title('Predicted vs Actual Prices', fontsize=14, fontweight='bold')
    ax1.legend(fontsize=11)
    ax1.grid(True, alpha=0.3)
    
    # Residuals histogram
    residuals = predictions - targets
    ax2.hist(residuals, bins=50, edgecolor='black', alpha=0.7)
    ax2.axvline(x=0, color='r', linestyle='--', linewidth=2)
    ax2.set_xlabel('Prediction Error ($)', fontsize=12)
    ax2.set_ylabel('Frequency', fontsize=12)
    ax2.set_title('Distribution of Prediction Errors', fontsize=14, fontweight='bold')
    ax2.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.show()


def plot_error_by_moneyness(results, test_df, title='Error Analysis by Moneyness'):
    """
    Analyze prediction errors across different moneyness levels.
    
    Reference: Pimentel et al. (2025), Figure 10, p. [moneyness analysis figure]
    "RMSE of model predictions sorted by moneyness"
    """
    predictions = results['predictions'].flatten()
    targets = results['targets'].flatten()
    errors = np.abs(predictions - targets)
    
    # Calculate moneyness
    moneyness = test_df['UNDERLYING_LAST'].values / test_df['STRIKE'].values
    
    # Create moneyness bins
    # Reference: Pimentel et al. (2025), Table 1, p. [data description]
    # Moneyness categories: <0.97 (OTM), 0.97-1.03 (ATM), >1.03 (ITM)
    bins = [0, 0.97, 1.03, np.inf]
    labels = ['OTM (<0.97)', 'ATM (0.97-1.03)', 'ITM (>1.03)']
    moneyness_categories = pd.cut(moneyness, bins=bins, labels=labels)
    
    # Create DataFrame for analysis
    error_df = pd.DataFrame({
        'moneyness': moneyness,
        'category': moneyness_categories,
        'error': errors,
        'target': targets
    })
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Box plot by category
    error_df.boxplot(column='error', by='category', ax=ax1)
    ax1.set_xlabel('Moneyness Category', fontsize=12)
    ax1.set_ylabel('Absolute Error ($)', fontsize=12)
    ax1.set_title('Prediction Errors by Moneyness', fontsize=14, fontweight='bold')
    plt.sca(ax1)
    plt.xticks(rotation=0)
    
    # Average metrics by category
    metrics_by_category = error_df.groupby('category').agg({
        'error': ['mean', 'std', 'count']
    }).round(2)
    
    categories = metrics_by_category.index
    means = metrics_by_category['error']['mean']
    stds = metrics_by_category['error']['std']
    
    ax2.bar(range(len(categories)), means, yerr=stds, capsize=5, alpha=0.7)
    ax2.set_xticks(range(len(categories)))
    ax2.set_xticklabels(categories, rotation=0)
    ax2.set_xlabel('Moneyness Category', fontsize=12)
    ax2.set_ylabel('Mean Absolute Error ($)', fontsize=12)
    ax2.set_title('Average Error by Moneyness', fontsize=14, fontweight='bold')
    ax2.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.show()
    
    print("\nError Statistics by Moneyness:")
    print(metrics_by_category)


def plot_error_by_maturity(results, test_df, title='Error Analysis by Time to Maturity'):
    """
    Analyze prediction errors across different maturity periods.
    
    Reference: Pimentel et al. (2025), Figure 9, p. [maturity analysis figure]
    "RMSE of model predictions sorted by time to maturity"
    """
    predictions = results['predictions'].flatten()
    targets = results['targets'].flatten()
    errors = np.abs(predictions - targets)
    
    # Get maturity in days
    mtm_days = test_df['MTM'].values * 30.4375  # Convert months to days
    
    # Create maturity bins (in days)
    # Reference: Pimentel et al. (2025), Tables 8-9, maturity categories
    bins = [0, 90, 180, 270, 450, 630, 1085]
    labels = ['1-90d', '91-180d', '181-270d', '271-450d', '451-630d', '631-1085d']
    maturity_categories = pd.cut(mtm_days, bins=bins, labels=labels)
    
    error_df = pd.DataFrame({
        'maturity_days': mtm_days,
        'category': maturity_categories,
        'error': errors,
        'target': targets
    })
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Box plot by category
    error_df.boxplot(column='error', by='category', ax=ax1)
    ax1.set_xlabel('Time to Maturity', fontsize=12)
    ax1.set_ylabel('Absolute Error ($)', fontsize=12)
    ax1.set_title('Prediction Errors by Maturity', fontsize=14, fontweight='bold')
    plt.sca(ax1)
    plt.xticks(rotation=45)
    
    # Average metrics by category
    metrics_by_category = error_df.groupby('category').agg({
        'error': ['mean', 'std', 'count']
    }).round(2)
    
    categories = metrics_by_category.index
    means = metrics_by_category['error']['mean']
    stds = metrics_by_category['error']['std']
    
    x_pos = range(len(categories))
    ax2.bar(x_pos, means, yerr=stds, capsize=5, alpha=0.7)
    ax2.set_xticks(x_pos)
    ax2.set_xticklabels(categories, rotation=45, ha='right')
    ax2.set_xlabel('Time to Maturity', fontsize=12)
    ax2.set_ylabel('Mean Absolute Error ($)', fontsize=12)
    ax2.set_title('Average Error by Maturity', fontsize=14, fontweight='bold')
    ax2.grid(True, alpha=0.3, axis='y')
    
    plt.tight_layout()
    plt.show()
    
    print("\nError Statistics by Maturity:")
    print(metrics_by_category)

In [None]:
# Cell 11: Hyperparameter Search Scaffolding
# Reference: Pimentel et al. (2025), Section 4.2.3, p. [hyperparameter search]
# Reference: Bańka & Chudziak (2025), discussion of hyperparameter tuning


def random_search_hyperparameters(train_df, val_df, test_df, config, n_trials=10, model_type='informer'):
    """
    Perform random search for hyperparameter optimization.
    """
    print(f"\n{'='*60}")
    print(f"HYPERPARAMETER SEARCH: {n_trials} trials")
    print(f"Model type: {model_type}")
    print(f"{'='*60}\n")
    
    hp_ranges = config['hp_search']
    results = []
    best_val_loss = float('inf')
    best_config = None
    
    for trial in range(n_trials):
        # Randomly sample hyperparameters
        trial_config = config.copy()
        trial_config['d_model'] = np.random.choice(hp_ranges['d_model'])
        trial_config['n_heads'] = np.random.choice(hp_ranges['n_heads'])
        trial_config['n_encoder_layers'] = np.random.choice(hp_ranges['n_encoder_layers'])
        trial_config['n_decoder_layers'] = np.random.choice(hp_ranges['n_decoder_layers'])
        trial_config['d_ff'] = np.random.choice(hp_ranges['d_ff'])
        trial_config['dropout'] = np.random.choice(hp_ranges['dropout'])
        trial_config['learning_rate'] = np.random.choice(hp_ranges['learning_rate'])
        trial_config['batch_size'] = np.random.choice(hp_ranges['batch_size'])
        
        print(f"\nTrial {trial + 1}/{n_trials}")
        print(f"  d_model={trial_config['d_model']}, n_heads={trial_config['n_heads']}, "
              f"n_enc={trial_config['n_encoder_layers']}, n_dec={trial_config['n_decoder_layers']}")
        
        # Create datasets for this trial
        trial_train_dataset = OptionPricingDataset(
            data_df=train_df,
            target_col=trial_config['target_column'],
            seq_len=trial_config['seq_len'],
            label_len=trial_config['label_len'],
            pred_len=trial_config['pred_len'],
            feature_cols=trial_config['feature_columns']
        )
        
        trial_val_dataset = OptionPricingDataset(
            data_df=val_df,
            target_col=trial_config['target_column'],
            seq_len=trial_config['seq_len'],
            label_len=trial_config['label_len'],
            pred_len=trial_config['pred_len'],
            feature_cols=trial_config['feature_columns']
        )
        
        trial_train_loader = DataLoader(
            trial_train_dataset, 
            batch_size=trial_config['batch_size'], 
            shuffle=True
        )
        trial_val_loader = DataLoader(
            trial_val_dataset, 
            batch_size=trial_config['batch_size'], 
            shuffle=False
        )
        
        # Initialize model
        n_features = len(trial_config['feature_columns'])
        
        if model_type == 'simple':
            model = SimpleTransformerModel(
                n_features=n_features,
                d_model=trial_config['d_model'],
                n_heads=trial_config['n_heads'],
                n_layers=trial_config['n_encoder_layers'],
                d_ff=trial_config['d_ff'],
                dropout=trial_config['dropout']
            ).to(device)
        else:  # informer
            model = InformerModel(
                n_features=n_features,
                d_model=trial_config['d_model'],
                n_heads=trial_config['n_heads'],
                n_encoder_layers=trial_config['n_encoder_layers'],
                n_decoder_layers=trial_config['n_decoder_layers'],
                d_ff=trial_config['d_ff'],
                dropout=trial_config['dropout']
            ).to(device)
        
        # Train with reduced epochs for search
        search_config = trial_config.copy()
        search_config['epochs'] = 30
        search_config['early_stopping_patience'] = 5
        
        try:
            history = train_model(model, trial_train_loader, trial_val_loader, 
                                search_config, device, model_type=model_type)
            
            trial_result = {
                'trial': trial + 1,
                'config': trial_config,
                'val_loss': history['best_val_loss'],
                'epochs_trained': history['epochs_trained']
            }
            
            results.append(trial_result)
            
            print(f"  Result: Val Loss = {history['best_val_loss']:.6f}")
            
            if history['best_val_loss'] < best_val_loss:
                best_val_loss = history['best_val_loss']
                best_config = trial_config.copy()
                print(f"  *** New best configuration! ***")
                
        except Exception as e:
            print(f"  Trial failed: {str(e)}")
            continue
    
    print(f"\n{'='*60}")
    print(f"SEARCH COMPLETE")
    print(f"Best validation loss: {best_val_loss:.6f}")
    print(f"{'='*60}\n")
    
    return best_config, results


def plot_hyperparameter_search_results(results):
    """
    Visualize hyperparameter search results.
    
    Reference: Similar to Pimentel et al. (2025), Figures 5-6, p. [HP search figures]
    """
    if not results:
        print("No results to plot")
        return
    
    # Extract data
    val_losses = [r['val_loss'] for r in results]
    d_models = [r['config']['d_model'] for r in results]
    n_heads = [r['config']['n_heads'] for r in results]
    learning_rates = [r['config']['learning_rate'] for r in results]
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    
    # Validation losses over trials
    axes[0, 0].plot(range(1, len(val_losses) + 1), val_losses, marker='o')
    axes[0, 0].axhline(y=min(val_losses), color='r', linestyle='--', 
                       label=f'Best: {min(val_losses):.6f}')
    axes[0, 0].set_xlabel('Trial Number', fontsize=12)
    axes[0, 0].set_ylabel('Validation Loss', fontsize=12)
    axes[0, 0].set_title('Validation Loss Across Trials', fontsize=14, fontweight='bold')
    axes[0, 0].legend()
    axes[0, 0].grid(True, alpha=0.3)
    
    # d_model vs validation loss
    axes[0, 1].scatter(d_models, val_losses, alpha=0.6, s=100)
    axes[0, 1].set_xlabel('d_model', fontsize=12)
    axes[0, 1].set_ylabel('Validation Loss', fontsize=12)
    axes[0, 1].set_title('Model Dimension vs Performance', fontsize=14, fontweight='bold')
    axes[0, 1].grid(True, alpha=0.3)
    
    # n_heads vs validation loss
    axes[1, 0].scatter(n_heads, val_losses, alpha=0.6, s=100)
    axes[1, 0].set_xlabel('Number of Attention Heads', fontsize=12)
    axes[1, 0].set_ylabel('Validation Loss', fontsize=12)
    axes[1, 0].set_title('Attention Heads vs Performance', fontsize=14, fontweight='bold')
    axes[1, 0].grid(True, alpha=0.3)
    
    # Learning rate vs validation loss
    axes[1, 1].scatter(learning_rates, val_losses, alpha=0.6, s=100)
    axes[1, 1].set_xlabel('Learning Rate', fontsize=12)
    axes[1, 1].set_ylabel('Validation Loss', fontsize=12)
    axes[1, 1].set_title('Learning Rate vs Performance', fontsize=14, fontweight='bold')
    axes[1, 1].set_xscale('log')
    axes[1, 1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()

def evaluate_model(MODEL_TYPE, model, test_loader, feature_scaler, device, CONFIG):
    print(f"\n{'='*70}")
    print(f"EVALUATING {MODEL_TYPE.upper()} MODEL ON TEST SET")
    print(f"{'='*70}\n")
    
    # Evaluate on test set
    if MODEL_TYPE == 'informer':
        results = evaluate_model_informer(
            model, 
            test_loader, 
            feature_scaler,
            device,
            CONFIG['feature_columns'],
            CONFIG['target_column']
        )
    else:
        results = evaluate_model_simple(
            model, 
            test_loader, 
            feature_scaler,
            device,
            CONFIG['feature_columns'],
            CONFIG['target_column']
        )
    
    # # Evaluate on test set
    # results = evaluate_model(
    #     model, 
    #     test_loader, 
    #     data_splits['scaler_target'], 
    #     device
    # )
    
    # Calculate additional metrics
    detailed_metrics = calculate_detailed_metrics(
        results['predictions'], 
        results['targets']
    )
    
    # Print results
    print("Performance Metrics:")
    print(f"  RMSE: ${results['rmse']:.2f}")
    print(f"  MAE: ${results['mae']:.2f}")
    print(f"  MSE: {results['mse']:.2f}")
    print(f"  Theil U1: {results['theil_u1']:.6f}")
    
    print("\nDetailed Metrics:")
    print(f"  Bias Proportion: {detailed_metrics['bias_proportion']:.4f}")
    print(f"  Variance Proportion: {detailed_metrics['variance_proportion']:.4f}")
    print(f"  Covariance Proportion: {detailed_metrics['covariance_proportion']:.4f}")
    print(f"  Sum (should be ≈1.0): {sum(detailed_metrics.values()):.4f}")
    
    # Reference: Pimentel et al. (2025), Table 4, p. [results table]
    # Compare with benchmarks mentioned in paper
    print("\n" + "="*70)
    print("Reference Benchmarks from Literature:")
    print("  Black-Scholes RMSE: ~$37.82 (Pimentel et al., 2025, Table 4)")
    print("  Heston RMSE: ~$35.90 (Pimentel et al., 2025, Table 4)")
    print("  LSTM RMSE: ~$27.94 (Pimentel et al., 2025, Table 4)")
    print("  Informer RMSE: ~$2.71 (Bańka & Chudziak, 2025, Table 1)")
    print("="*70)

In [None]:
# Cell 12: Main Execution - Data Preparation
# This cell prepares data and can be run independently

print("="*70)
print("TRANSFORMER-BASED OPTION PRICING")
print("="*70)

# Create data splits
train_df, val_df, test_df, feature_scaler = create_rolling_window_split(
    df,
    test_month=CONFIG['test_month'],
    test_year=CONFIG['test_year'],
    feature_columns=CONFIG['feature_columns'],
    target_column=CONFIG['target_column'],
    train_months=CONFIG['train_months'],
    val_months=CONFIG['val_months']
)

# Create datasets with actual data
train_dataset = OptionPricingDataset(
    data_df=train_df,
    target_col=CONFIG['target_column'],
    seq_len=CONFIG['seq_len'],
    label_len=CONFIG['label_len'],
    pred_len=CONFIG['pred_len'],
    feature_cols=CONFIG['feature_columns']
)

val_dataset = OptionPricingDataset(
    data_df=val_df,
    target_col=CONFIG['target_column'],
    seq_len=CONFIG['seq_len'],
    label_len=CONFIG['label_len'],
    pred_len=CONFIG['pred_len'],
    feature_cols=CONFIG['feature_columns']
)

test_dataset = OptionPricingDataset(
    data_df=test_df,
    target_col=CONFIG['target_column'],
    seq_len=CONFIG['seq_len'],
    label_len=CONFIG['label_len'],
    pred_len=CONFIG['pred_len'],
    feature_cols=CONFIG['feature_columns']
)

# Create dataloaders
train_loader = DataLoader(train_dataset, batch_size=CONFIG['batch_size'], shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=CONFIG['batch_size'], shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=CONFIG['batch_size'], shuffle=False)

# Verify shapes
print(f"Train dataset size: {len(train_dataset)}")
print(f"Val dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")

# Check one batch
x_enc, x_dec, y = next(iter(train_loader))
print(f"\nBatch shapes:")
print(f"  x_enc (encoder input): {x_enc.shape}")  # (batch_size, seq_len, 5)
print(f"  x_dec (decoder input): {x_dec.shape}")  # (batch_size, label_len+pred_len, 5)
print(f"  y (target): {y.shape}")                 # (batch_size, pred_len)

In [None]:
# Cell 13a: Informer Model Selection and Training
# Choose which model to train: 'simple' or 'informer'
MODEL_TYPE = 'informer'
print(f"\n{'='*70}")
print(f"TRAINING {MODEL_TYPE.upper()} MODEL")
print(f"{'='*70}\n")

# Initialize model
n_features = len(CONFIG['feature_columns'])

if MODEL_TYPE == 'simple':
    # Reference: Ruiru et al. (2024), Section 3.2, p. 545
    model = SimpleTransformerModel(
        n_features=n_features,
        d_model=CONFIG['d_model'],
        n_heads=CONFIG['n_heads'],
        n_layers=CONFIG['n_encoder_layers'],
        d_ff=CONFIG['d_ff'],
        dropout=CONFIG['dropout']
    ).to(device)
    print("Initialized Simple Transformer (Baseline)")
    
else:  # informer
    # Reference: Bańka & Chudziak (2025), Section 3, p. 1270-1274
    model = InformerModel(
        n_features=n_features,
        d_model=CONFIG['d_model'],
        n_heads=CONFIG['n_heads'],
        n_encoder_layers=CONFIG['n_encoder_layers'],
        n_decoder_layers=CONFIG['n_decoder_layers'],
        d_ff=CONFIG['d_ff'],
        dropout=CONFIG['dropout']
    ).to(device)
    print("Initialized Informer Model")

print(f"\nModel Architecture:")
print(f"  Features: {n_features}")
print(f"  d_model: {CONFIG['d_model']}")
print(f"  Attention heads: {CONFIG['n_heads']}")
print(f"  Encoder layers: {CONFIG['n_encoder_layers']}")
if MODEL_TYPE == 'informer':
    print(f"  Decoder layers: {CONFIG['n_decoder_layers']}")
print(f"  Feed-forward dim: {CONFIG['d_ff']}")
print(f"  Dropout: {CONFIG['dropout']}")
print(f"  Total parameters: {sum(p.numel() for p in model.parameters()):,}")

# Train model
history = train_model(model, train_loader, val_loader, CONFIG, device, model_type=MODEL_TYPE)

# Plot training history
plot_training_history(history, title=f'{MODEL_TYPE.upper()} Training History')

In [None]:
# # Cell 13b: Simple Model Selection and Training
# # Choose which model to train: 'simple' or 'informer'

In [None]:
# Cell 14: Model Evaluation
# Reference: Bańka & Chudziak (2025), Section 4.4, p. 1275-1277
# Reference: Pimentel et al. (2025), Section 5, Results section

print(f"\n{'='*70}")
print(f"EVALUATING {MODEL_TYPE.upper()} MODEL ON TEST SET")
print(f"{'='*70}\n")

# Evaluate on test set
if MODEL_TYPE == 'informer':
    results = evaluate_model_informer(
        model, 
        test_loader, 
        feature_scaler,
        device,
        CONFIG['feature_columns'],
        CONFIG['target_column']
    )
else:
    results = evaluate_model_simple(
        model, 
        test_loader, 
        feature_scaler,
        device,
        CONFIG['feature_columns'],
        CONFIG['target_column']
    )

# # Evaluate on test set
# results = evaluate_model(
#     model, 
#     test_loader, 
#     data_splits['scaler_target'], 
#     device
# )

# Calculate additional metrics
detailed_metrics = calculate_detailed_metrics(
    results['predictions'], 
    results['targets']
)

# Print results
print("Performance Metrics:")
print(f"  RMSE: ${results['rmse']:.2f}")
print(f"  MAE: ${results['mae']:.2f}")
print(f"  MSE: {results['mse']:.2f}")
print(f"  Theil U1: {results['theil_u1']:.6f}")

print("\nDetailed Metrics:")
print(f"  Bias Proportion: {detailed_metrics['bias_proportion']:.4f}")
print(f"  Variance Proportion: {detailed_metrics['variance_proportion']:.4f}")
print(f"  Covariance Proportion: {detailed_metrics['covariance_proportion']:.4f}")
print(f"  Sum (should be ≈1.0): {sum(detailed_metrics.values()):.4f}")

# Reference: Pimentel et al. (2025), Table 4, p. [results table]
# Compare with benchmarks mentioned in paper
print("\n" + "="*70)
print("Reference Benchmarks from Literature:")
print("  Black-Scholes RMSE: ~$37.82 (Pimentel et al., 2025, Table 4)")
print("  Heston RMSE: ~$35.90 (Pimentel et al., 2025, Table 4)")
print("  LSTM RMSE: ~$27.94 (Pimentel et al., 2025, Table 4)")
print("  Informer RMSE: ~$2.71 (Bańka & Chudziak, 2025, Table 1)")
print("="*70)

In [None]:
# # Cell 15: Visualize Results

# print("\n" + "="*70)
# print("GENERATING VISUALIZATIONS")
# print("="*70 + "\n")

# # Plot predictions vs actual
# plot_predictions_vs_actual(
#     results, 
#     sample_size=2000,
#     title=f'{MODEL_TYPE.upper()} Model: Predictions vs Actual'
# )

# # Analyze by moneyness
# # Reference: Pimentel et al. (2025), Figure 10, p. [moneyness figure]
# plot_error_by_moneyness(
#     results, 
#     data_splits['test'][2],
#     title=f'{MODEL_TYPE.upper()} Error Analysis by Moneyness'
# )

# # Analyze by maturity
# # Reference: Pimentel et al. (2025), Figure 9, p. [maturity figure]
# plot_error_by_maturity(
#     results, 
#     data_splits['test'][2],
#     title=f'{MODEL_TYPE.upper()} Error Analysis by Time to Maturity'
# )


# Cell 15: Visualize Results
print("\n" + "="*70)
print("GENERATING VISUALIZATIONS")
print("="*70 + "\n")

# Plot predictions vs actual
plot_predictions_vs_actual(
    results, 
    sample_size=2000,
    title=f'{MODEL_TYPE.upper()} Model: Predictions vs Actual'
)

# Analyze by moneyness
plot_error_by_moneyness(
    results, 
    test_df,  # ✅ Use test_df from Cell 12
    title=f'{MODEL_TYPE.upper()} Error Analysis by Moneyness'
)

# Analyze by maturity
plot_error_by_maturity(
    results, 
    test_df,  # ✅ Use test_df from Cell 12
    title=f'{MODEL_TYPE.upper()} Error Analysis by Time to Maturity'
)

In [None]:
# Cell 16: Optional - Hyperparameter Search
# Set RUN_HP_SEARCH to True to perform hyperparameter optimization
# Warning: This can take significant time depending on n_trials

RUN_HP_SEARCH = False  # Set to True to run hyperparameter search
N_TRIALS = 10  # Number of random configurations to try

if RUN_HP_SEARCH:
    print("\n" + "="*70)
    print("STARTING HYPERPARAMETER SEARCH")
    print("="*70)
    
    best_config, search_results = random_search_hyperparameters(
        data_splits,
        CONFIG,
        n_trials=N_TRIALS,
        model_type=MODEL_TYPE
    )
    
    print("\nBest Configuration Found:")
    for key, value in best_config.items():
        if key not in ['hp_search', 'feature_columns']:
            print(f"  {key}: {value}")
    
    # Visualize search results
    plot_hyperparameter_search_results(search_results)
    
    # Save best config
    print("\nTo use best configuration, update CONFIG with these values")
    
else:
    print("\nHyperparameter search skipped (RUN_HP_SEARCH = False)")
    print("To run hyperparameter search, set RUN_HP_SEARCH = True")

In [None]:
# Cell 17: Model Comparison Table
# Reference: Pimentel et al. (2025), Tables 4-5, p. [results tables]
# Reference: Bańka & Chudziak (2025), Tables 1-3, p. 1275

def create_comparison_table(model_results_dict):
    """
    Create comparison table for multiple models.
    
    Args:
        model_results_dict: Dictionary with model names as keys and results as values
    """
    comparison_data = []
    
    for model_name, results in model_results_dict.items():
        detailed = calculate_detailed_metrics(
            results['predictions'],
            results['targets']
        )
        
        comparison_data.append({
            'Model': model_name,
            'RMSE': results['rmse'],
            'MAE': results['mae'],
            'Theil U1': results['theil_u1'],
            'Bias %': detailed['bias_proportion'] * 100,
            'Variance %': detailed['variance_proportion'] * 100,
            'Covariance %': detailed['covariance_proportion'] * 100
        })
    
    comparison_df = pd.DataFrame(comparison_data)
    comparison_df = comparison_df.round(4)
    
    return comparison_df


# Store current model results
model_results = {
    MODEL_TYPE.upper(): results
}

# Create and display comparison table
comparison_table = create_comparison_table(model_results)
print("\n" + "="*70)
print("MODEL COMPARISON TABLE")
print("="*70)
print(comparison_table.to_string(index=False))
print("\n" + "="*70)

# Note: To compare multiple models, train each model and add results to model_results dictionary
print("\nNote: Train multiple models and add to model_results dictionary for full comparison")

In [None]:
# Cell 18: Save Model and Results

import pickle
from datetime import datetime

SAVE_RESULTS = True  # Set to False to skip saving

if SAVE_RESULTS:
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    
    # Create results directory
    os.makedirs('results', exist_ok=True)
    
    # Save model
    model_path = f'./results/{MODEL_TYPE}_model_{timestamp}.pt'
    torch.save(model.state_dict(), model_path)
    print(f"Model saved to: {model_path}")
    
    # Save configuration
    config_path = f'./results/{MODEL_TYPE}_config_{timestamp}.pkl'
    with open(config_path, 'wb') as f:
        pickle.dump(CONFIG, f)
    print(f"Configuration saved to: {config_path}")
    
    # Save results
    results_path = f'./results/{MODEL_TYPE}_results_{timestamp}.pkl'
    with open(results_path, 'wb') as f:
        pickle.dump({
            'results': results,
            'history': history,
            'comparison_table': comparison_table
        }, f)
    print(f"Results saved to: {results_path}")
    
    print(f"\nAll outputs saved with timestamp: {timestamp}")
else:
    print("Saving skipped (SAVE_RESULTS = False)")

In [None]:
# Cell 19: Summary and Key Findings

print("\n" + "="*70)
print("EXPERIMENT SUMMARY")
print("="*70)

print(f"\nModel: {MODEL_TYPE.upper()}")
print(f"Test Period: {CONFIG['test_year']}-{CONFIG['test_month']:02d}")
print(f"Training Samples: {len(train_dataset):,}")
print(f"Test Samples: {len(test_dataset):,}")

print("\n" + "-"*70)
print("KEY FINDINGS:")
print("-"*70)

# Reference: Bańka & Chudziak (2025), Section 5, p. 1276
print(f"\n1. Prediction Accuracy:")
print(f"   - RMSE: \${results['rmse']:.2f}")
print(f"   - MAE: \${results['mae']:.2f}")
print(f"   - Performance relative to Black-Scholes baseline")

# Reference: Pimentel et al. (2025), Section 5.2, p. [moneyness analysis]
print(f"\n2. Performance by Moneyness:")
print(f"   - Model shows varying accuracy across OTM, ATM, and ITM options")
print(f"   - See moneyness analysis plots above")

# Reference: Pimentel et al. (2025), Section 5.2, p. [maturity analysis]
print(f"\n3. Performance by Time to Maturity:")
print(f"   - Model performance changes with option maturity")
print(f"   - See maturity analysis plots above")

# Reference: Bańka & Chudziak (2025), Section 5, p. 1276
print(f"\n4. Model Architecture:")
print(f"   - Total parameters: {sum(p.numel() for p in model.parameters()):,}")
print(f"   - Epochs trained: {history['epochs_trained']}")
print(f"   - Best validation loss: {history['best_val_loss']:.6f}")

print("\n" + "="*70)
print("REFERENCES")
print("="*70)
print("""
1. Bańka, F., & Chudziak, J. A. (2025). Applying Informer for Option Pricing: 
   A Transformer-Based Approach. ICAART 2025, pp. 1270-1277.

2. Pimentel, R., et al. (2025). Option pricing with deep learning: 
   a long short-term memory approach. Decisions in Economics and Finance.

3. Ruiru, D. K., Jouandeau, N., & Odhiambo, D. (2024). LSTM versus Transformers: 
   A Practical Comparison of Deep Learning Models for Trading Financial Instruments.
   IJCCI 2024, pp. 543-549.

4. Zhou, H., et al. (2021). Informer: Beyond Efficient Transformer for Long 
   Sequence Time-Series Forecasting. AAAI 2021.
""")

print("="*70)
print("EXPERIMENT COMPLETE")
print("="*70)