# Deep Hedging Models Test

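This notebook trains four deep hedging models (two feedforward variants, an RNN, and an LSTM) on synthetic price paths generated by parametric models and a block bootstrap, and then evaluates them on both the synthetic data and real data.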


In [1]:
import sys
import numpy as np
import pandas as pd
import torch
from pathlib import Path
import matplotlib.pyplot as plt

# Make the parent of the current working directory importable so that the
# `src.*` imports resolve.
# NOTE(review): assumes the kernel is started one level below the project
# root (e.g. from a notebooks/ folder) — confirm.
project_root = Path().resolve().parent
# Guard the append so that re-running this cell does not add duplicate entries.
if str(project_root) not in sys.path:
    sys.path.append(str(project_root))

from src.models.parametric.gbm import GeometricBrownianMotion
from src.models.parametric.ou_process import OUProcess
from src.models.parametric.merton_jump_diffusion import MertonJumpDiffusion
from src.models.parametric.garch11 import GARCH11
from src.models.parametric.de_jump_diffusion import DoubleExponentialJumpDiffusion
from src.models.non_parametric.block_bootstrap import BlockBootstrap

from src.deep_hedgers.feedforward_layers import FeedforwardDeepHedger
from src.deep_hedgers.feedforward_time import FeedforwardTimeDeepHedger
from src.deep_hedgers.rnn_hedger import RNNDeepHedger
from src.deep_hedgers.lstm_hedger import LSTMDeepHedger

from src.utils.preprocessing_utils import preprocess_data
from src.utils.configs_utils import get_dataset_cfgs


In [2]:
# Fetch both dataset configurations in a single call
non_parametric_dataset_cfgs, parametric_dataset_cfgs = get_dataset_cfgs()

# Train/valid/test splits of the real data in the layout used by the parametric models
train_data_para, valid_data_para, test_data_para = preprocess_data(
    parametric_dataset_cfgs
)

# The same splits in the layout used by the non-parametric model (block bootstrap)
(
    train_data_non_para,
    valid_data_non_para,
    test_data_non_para,
) = preprocess_data(non_parametric_dataset_cfgs)

# Report the train/valid shapes of both layouts, one per line
print(
    f"Parametric train data shape: {train_data_para.shape}",
    f"Parametric valid data shape: {valid_data_para.shape}",
    f"Non-parametric train data shape: {train_data_non_para.shape}",
    f"Non-parametric valid data shape: {valid_data_non_para.shape}",
    sep="\n",
)


Preprocessing data for AAPL
Preprocessing data for AAPL
Parametric train data shape: torch.Size([9044, 4])
Parametric valid data shape: torch.Size([1131, 4])
Non-parametric train data shape: (9035, 13, 4)
Non-parametric valid data shape: (1129, 13, 4)


In [3]:
import pandas as pd

def get_initial_prices_from_original_data(original_data_path, log_returns_windows, window_size, channel_idx=0):
    """
    Look up the starting price of every log-return window in the original price file.

    Window ``i`` is taken to start at row ``i`` of the CSV (windows built with
    step 1), so its initial price is ``prices[i]``. Start rows past the last
    full window are clamped to ``len(prices) - window_size - 1``; when the file
    is shorter than one window, row 0 is used for every window.

    NOTE(review): rows are always counted from the top of the file, so windows
    that do not start at row 0 of the CSV (e.g. a later split) get the prices
    of the first rows — confirm this is intended by the callers.

    Args:
        original_data_path: Path to the original CSV file with price data; it
            must contain the columns 'Open', 'High', 'Low' and 'Close'.
        log_returns_windows: Log returns windows of shape (R, L, N); only the
            number of windows R is read from it.
        window_size: Size of each window (L).
        channel_idx: Index into ['Open', 'High', 'Low', 'Close'] of the channel
            to extract (0 for Open).

    Returns:
        np.ndarray of shape (R,) and dtype float64 holding one initial price
        per window.
    """
    REQUIRED_COLUMNS = ['Open', 'High', 'Low', 'Close']
    df = pd.read_csv(original_data_path)
    prices_channel = df[REQUIRED_COLUMNS].values[:, channel_idx]  # (T,)

    num_windows = log_returns_windows.shape[0]

    # Last row from which a full window (L returns => L + 1 prices) still fits;
    # floored at 0 so that a too-short file falls back to the first row.
    last_valid_start = max(len(prices_channel) - window_size - 1, 0)

    # Row index per window: i while it is valid, otherwise the clamped value.
    start_rows = np.minimum(np.arange(num_windows), last_valid_start)

    return prices_channel[start_rows].astype(np.float64)

def log_returns_to_prices(log_returns, original_data_path=None, initial_prices=None, channel_idx=0):
    """
    Convert log returns to price paths by compounding them from an initial price.

    The initial price is resolved in this order: ``initial_prices`` if given,
    otherwise prices read from ``original_data_path`` if given, otherwise 1.0.
    The initial price itself is not part of the returned paths.

    Args:
        log_returns: Array or tensor of log returns with shape (R, L, N) or
            (L, N). Tensors are detached and moved to CPU before conversion.
        original_data_path: Path to the original CSV file (optional). For 2D
            input the first row's ``channel_idx`` price is used for every
            channel; for 3D input one price per window is looked up with
            ``get_initial_prices_from_original_data`` and used for every channel.
        initial_prices: Pre-computed initial prices (optional).
            For 2D input: a scalar or an array of shape (N,); any other array
            is indexed with ``channel_idx`` (1.0 if that index is out of range).
            For 3D input: shape (R,) gives one price per sample shared by all
            channels, shape (R, N) gives one price per sample and channel,
            anything else is broadcast against (R, N) (e.g. a scalar).
        channel_idx: Channel index to use when reading prices (0 for Open).

    Returns:
        Prices with the same shape as ``log_returns``: a float64 numpy array,
        or a float32 CPU tensor when ``log_returns`` was a tensor.

    Raises:
        ValueError: If ``log_returns`` is neither 2D nor 3D.
    """
    return_tensor = isinstance(log_returns, torch.Tensor)
    if return_tensor:
        # detach() first: .numpy() raises on tensors that require grad.
        log_returns_np = log_returns.detach().cpu().numpy()
    else:
        log_returns_np = np.asarray(log_returns)

    if log_returns_np.ndim == 2:
        # Single time series (L, N); time runs along axis 0.
        time_axis = 0
        _, N = log_returns_np.shape
        start = np.empty((1, N))

        if initial_prices is not None:
            initial_prices = np.asarray(initial_prices)
            if initial_prices.ndim == 0 or initial_prices.shape == (N,):
                start[0] = initial_prices
            else:
                start[0] = initial_prices[channel_idx] if channel_idx < len(initial_prices) else 1.0
        elif original_data_path is not None:
            # First row of the original data provides the initial price.
            df = pd.read_csv(original_data_path)
            REQUIRED_COLUMNS = ['Open', 'High', 'Low', 'Close']
            original_prices = df[REQUIRED_COLUMNS].values
            start[0] = original_prices[0, channel_idx] if len(original_prices) > 0 else 1.0
        else:
            start[0] = 1.0

    elif log_returns_np.ndim == 3:
        # Multiple time series (R, L, N); time runs along axis 1.
        time_axis = 1
        R, L, N = log_returns_np.shape
        start = np.empty((R, 1, N))

        if initial_prices is not None:
            initial_prices = np.asarray(initial_prices)
            if initial_prices.shape == (R,):
                # One initial price per sample, shared by all channels.
                start[:, 0] = initial_prices[:, np.newaxis]
            else:
                # (R, N) is assigned as is; a scalar (or anything else that
                # broadcasts against (R, N)) is broadcast.
                start[:, 0] = initial_prices
        elif original_data_path is not None:
            # One initial price per window, shared by all channels.
            initial_prices_array = get_initial_prices_from_original_data(
                original_data_path, log_returns_np, L, channel_idx
            )
            start[:, 0] = np.asarray(initial_prices_array)[:, np.newaxis]
        else:
            start[:, 0] = 1.0
    else:
        raise ValueError(f"Expected 2D or 3D array, got {log_returns_np.ndim}D")

    # Prepend the initial price to the per-step growth factors and compound
    # along time: cumprod yields p0, p0*e^{r1}, p0*e^{r1}*e^{r2}, ...
    growth = np.exp(log_returns_np)
    levels = np.cumprod(np.concatenate([start, growth], axis=time_axis), axis=time_axis)

    # Drop the initial price and return a contiguous array rather than a
    # strided view into the longer buffer.
    without_start = levels[1:] if time_axis == 0 else levels[:, 1:]
    prices = np.ascontiguousarray(without_start)

    if return_tensor:
        return torch.from_numpy(prices).float()
    return prices


In [4]:
# Fit every generator on the training data, then sample log-return paths from each
length_para, num_channels = train_data_para.shape
generation_length = train_data_non_para.shape[1]  # window length L of the non-parametric data
num_samples = 500
seed = 42

print(f"Generating {num_samples} samples of length {generation_length}...")

# Parametric generators, all constructed with the same arguments
model_kwargs = dict(length=length_para, num_channels=num_channels)
parametric_models = {
    "GBM": GeometricBrownianMotion(**model_kwargs),
    "OU Process": OUProcess(**model_kwargs),
    "MJD": MertonJumpDiffusion(**model_kwargs),
    "GARCH11": GARCH11(**model_kwargs),
    "DEJD": DoubleExponentialJumpDiffusion(**model_kwargs),
}

# Non-parametric generator; its block size equals the generated sequence length
block_bootstrap = BlockBootstrap(block_size=generation_length)

# --- Fitting ---
print("\nFitting parametric models...")
for name, model in parametric_models.items():
    print(f"Fitting {name}...")
    model.fit(train_data_para)

print("\nFitting Block Bootstrap...")
# The block bootstrap is fitted on a tensor; convert only when the data is not one already
bootstrap_train = (
    train_data_para
    if isinstance(train_data_para, torch.Tensor)
    else torch.from_numpy(train_data_para).float()
)
block_bootstrap.fit(bootstrap_train)

# --- Sampling ---
synthetic_data = {}
generation_kwargs = {'num_samples': num_samples, 'seq_length': generation_length, 'seed': seed}

print("\nGenerating synthetic data...")
for name, model in parametric_models.items():
    print(f"Generating {name}...")
    syn_data = model.generate(**generation_kwargs)
    # Store everything as tensors, whatever the generator returned
    synthetic_data[name] = (
        syn_data if isinstance(syn_data, torch.Tensor) else torch.from_numpy(syn_data).float()
    )

print("Generating Block Bootstrap...")
syn_data_bb = block_bootstrap.generate(**generation_kwargs)
if not isinstance(syn_data_bb, torch.Tensor):
    syn_data_bb = torch.from_numpy(syn_data_bb).float()
synthetic_data["BlockBootstrap"] = syn_data_bb

print("\nSynthetic data generation complete!")
for name, data in synthetic_data.items():
    print(f"{name}: {data.shape}")


Generating 500 samples of length 13...

Fitting parametric models...
Fitting GBM...
Fitting OU Process...
Fitting MJD...
Fitting GARCH11...
Fitting DEJD...

Fitting Block Bootstrap...

Generating synthetic data...
Generating GBM...
Generating OU Process...
Generating MJD...
Generating GARCH11...
Generating DEJD...
Generating Block Bootstrap...


  log_returns = torch.tensor(log_returns, device=self.device)



Synthetic data generation complete!
GBM: torch.Size([500, 13, 4])
OU Process: torch.Size([500, 13, 4])
MJD: torch.Size([500, 13, 4])
GARCH11: torch.Size([500, 13, 4])
DEJD: torch.Size([500, 13, 4])
BlockBootstrap: torch.Size([500, 13, 4])


In [5]:
# Turn log returns into price paths for hedging.
# Real windows take their starting prices from the original data file; every
# synthetic window starts at the mean starting price of the real training windows.
print("Converting log returns to prices...")

# Location of the original price file, read from the dataset configuration
original_data_path = non_parametric_dataset_cfgs.get('original_data_path')

# Real prices are built from the validation windows, shape (R, L, N)
real_log_returns = (
    valid_data_non_para.cpu().numpy()
    if isinstance(valid_data_non_para, torch.Tensor)
    else valid_data_non_para
)
real_prices = log_returns_to_prices(
    real_log_returns,
    original_data_path=original_data_path,
    channel_idx=0,
)

# Mean starting price over the real training windows
real_train_log_returns = (
    train_data_non_para.cpu().numpy()
    if isinstance(train_data_non_para, torch.Tensor)
    else train_data_non_para
)
real_initial_prices = get_initial_prices_from_original_data(
    original_data_path,
    real_train_log_returns,
    real_train_log_returns.shape[1],
    channel_idx=0,
)
mean_initial_price = np.mean(real_initial_prices)
print(f"Mean initial price from real data: {mean_initial_price:.4f}")

# Synthetic prices: one entry per generator, all samples sharing the same starting price
synthetic_prices = {}
for name, syn_log_returns in synthetic_data.items():
    if isinstance(syn_log_returns, torch.Tensor):
        syn_log_returns = syn_log_returns.cpu().numpy()

    # One copy of the mean initial price per synthetic sample
    shared_start = np.full(syn_log_returns.shape[0], mean_initial_price)
    syn_prices = log_returns_to_prices(
        syn_log_returns,
        initial_prices=shared_start,
        channel_idx=0,
    )
    synthetic_prices[name] = syn_prices
    print(f"{name} prices shape: {syn_prices.shape}")

print(f"Real prices shape: {real_prices.shape}")


Converting log returns to prices...
Mean initial price from real data: 4.4411
GBM prices shape: (500, 13, 4)
OU Process prices shape: (500, 13, 4)
MJD prices shape: (500, 13, 4)
GARCH11 prices shape: (500, 13, 4)
DEJD prices shape: (500, 13, 4)
BlockBootstrap prices shape: (500, 13, 4)
Real prices shape: (1129, 13, 4)


In [6]:
# Initialize deep hedging models
seq_length = generation_length
hidden_size = 64
strike = mean_initial_price  # At-the-money call option (use mean initial price)

print(f"Using strike price: {strike:.4f} (mean initial price)")

# Seed torch's global RNG before the models are built so that a fresh
# Restart & Run All starts from the same random state every time.
# NOTE(review): presumably the hedgers draw their initial weights (and any
# training-time randomness) from torch's global RNG — confirm in src/deep_hedgers.
torch.manual_seed(seed)

# All four hedgers share the same constructor arguments
hedger_kwargs = dict(seq_length=seq_length, hidden_size=hidden_size, strike=strike)
hedgers = {
    'Feedforward_L-1': FeedforwardDeepHedger(**hedger_kwargs),
    'Feedforward_Time': FeedforwardTimeDeepHedger(**hedger_kwargs),
    'RNN': RNNDeepHedger(**hedger_kwargs),
    'LSTM': LSTMDeepHedger(**hedger_kwargs),
}

print("Deep hedging models initialized:")
for name in hedgers.keys():
    print(f"  - {name}")


Using strike price: 4.4411 (mean initial price)
Deep hedging models initialized:
  - Feedforward_L-1
  - Feedforward_Time
  - RNN
  - LSTM


In [7]:
# Train deep hedgers on synthetic data
num_epochs = 50
batch_size = 32
learning_rate = 0.001

results = {}

# Test on GBM synthetic data
test_model_name = 'GBM'
syn_prices_test = synthetic_prices[test_model_name]

print(f"\nTraining deep hedgers on {test_model_name} synthetic data...")
print(f"Data shape: {syn_prices_test.shape}")
print("Using open channel only (extracting first channel)")


def _open_channel_tensor(prices):
    """Return the open channel (index 0) of (R, L, N) prices as a contiguous float32 tensor of shape (R, L)."""
    # Slicing one channel yields a strided view; as_tensor accepts both numpy
    # arrays and tensors, and .contiguous() guarantees a dense memory layout.
    return torch.as_tensor(prices[:, :, 0], dtype=torch.float32).contiguous()


syn_prices_open = _open_channel_tensor(syn_prices_test)  # (R, L)
real_prices_open = _open_channel_tensor(real_prices)  # (R, L)

# NOTE(review): the saved run of this cell stopped while training the first
# hedger with a `.view()` RuntimeError (incompatible size/stride). The tensors
# built above are contiguous, so that failure presumably comes from tensor
# handling inside the hedger implementation — confirm in src/deep_hedgers.
for hedger_name, hedger in hedgers.items():
    print(f"\n{'='*60}")
    print(f"Training {hedger_name}...")
    print(f"{'='*60}")

    hedger.fit(
        syn_prices_open,
        num_epochs=num_epochs,
        batch_size=batch_size,
        learning_rate=learning_rate,
        verbose=True
    )

    # Evaluate on the synthetic data the hedger was trained on
    print(f"\nEvaluating {hedger_name} on synthetic data...")
    eval_results_syn = hedger.evaluate(syn_prices_open)

    # Evaluate on real data
    print(f"Evaluating {hedger_name} on real data...")
    eval_results_real = hedger.evaluate(real_prices_open)

    # Keep both evaluations for the summary
    results[hedger_name] = {
        'synthetic': eval_results_syn,
        'real': eval_results_real
    }

    print(f"\n{hedger_name} Results:")
    print(f"  Synthetic - MSE: {eval_results_syn['mse_X']:.6f}, Mean X: {eval_results_syn['mean_X']:.6f}")
    print(f"  Real - MSE: {eval_results_real['mse_X']:.6f}, Mean X: {eval_results_real['mean_X']:.6f}")



Training deep hedgers on GBM synthetic data...
Data shape: (500, 13, 4)
Using open channel only (extracting first channel)

Training Feedforward_L-1...


RuntimeError: view size is not compatible with input tensor's size and stride (at least one dimension spans across two contiguous subspaces). Use .reshape(...) instead.

In [None]:
# Print a per-hedger summary of the evaluation results
banner = "=" * 80
print("\n" + banner)
print("DEEP HEDGING RESULTS SUMMARY")
print(banner)

# (key in the results dict, heading to print) for each evaluated dataset
datasets = (("synthetic", "Synthetic Data"), ("real", "Real Data"))
# (key in the evaluation dict, label to print) for each reported statistic
statistics = (("mse_X", "MSE(X)"), ("mean_X", "Mean(X)"), ("std_X", "Std(X)"))

for hedger_name, result in results.items():
    print(f"\n{hedger_name}:")
    # The premium is reported from the synthetic-data evaluation only
    print(f"  Premium: {result['synthetic']['premium']:.6f}")
    for dataset_key, heading in datasets:
        print(f"  {heading}:")
        for stat_key, label in statistics:
            print(f"    {label}: {result[dataset_key][stat_key]:.6f}")
