In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.func import vmap, grad

import numpy as np
import scipy.stats as spstats
import matplotlib.pyplot as plt
import DGM 

# Deep Galerkin Method for the High-dimensional Black-Scholes Equation

An extension of **DGM: A deep learning algorithm for solving partial diﬀerential equations** 

<br>

Bilal Saleh Husain, 2024-25

<br>

### Equation, Simulation, & DGM Parameters

In [7]:
# Parameters for the problem

T = 1.0             # Maturity time
delta = 2 / 3       # Recovery rate
R = 0.02            # Risk-free interest rate
mu_bar = 0.02       # Drift coefficient
sigma_bar = 0.2     # Volatility coefficient
vh = 50             # Threshold for intensity function
vl = 70             # Threshold for intensity function
gamma_h = 0.2       # High intensity
gamma_l = 0.02      # Low intensity

S0 = 100
d=100               # Spatial dimension of problem 

In [8]:
# Neural network parameters

layer_width = 50         # Number of units per layer
n_layers = 3              # Number of LSTM layers
input_dim = 100           # Dimension of the underlying assets (x1, x2, ..., x100)
learning_rate = 0.001    # Learning rate for the optimizer


# Training parameters

sampling_stages = 15000     # Number of sampling stages
steps_per_sample = 1     # Number of optimization steps per sample
nSim_interior = 5000      # Number of interior samples
nSim_terminal = 5000      # Number of terminal samples


# Domain bounds

t_low = 0    # Lower bound for time
S_low = 50.0      # Lower bound for asset prices
S_high = 180.0   # Upper bound for asset prices 

### Terminal Condition

In [10]:
# Terminal condition g(x)
def terminal_condition(x):
    """
    Terminal condition g(x) = min{x1, x2, ..., x100}.
    """
    return torch.min(x, dim=1, keepdim=True).values

### Samplers

In [11]:
def sampler(nSim_interior, nSim_terminal, S0=100.0):
    """
    Generate training samples for the DGM network.
    Args:
        nSim_interior: Number of interior samples
        nSim_terminal: Number of terminal samples
        S0: Initial asset price (not used in uniform sampling)
    Returns:
        Tuple of torch tensors (t_interior, S_interior, t_terminal, S_terminal)
    """
    # Interior samples - time uniformly in [0,T], assets in [50,150] around thresholds
    t_interior = np.random.uniform(low=0, high=T, size=[nSim_interior, 1])
    S_interior = np.random.uniform(low=S_low, high=S_high, size=[nSim_interior, 100])
    
    # Terminal samples - fixed time T, assets in same range
    t_terminal = np.ones((nSim_terminal, 1)) * T
    S_terminal = np.random.uniform(low=S_low, high=S_high, size=[nSim_terminal, 100])
    
    # Convert to torch tensors
    t_interior = torch.tensor(t_interior, dtype=torch.float32, requires_grad=True)
    S_interior = torch.tensor(S_interior, dtype=torch.float32, requires_grad=True)
    t_terminal = torch.tensor(t_terminal, dtype=torch.float32)
    S_terminal = torch.tensor(S_terminal, dtype=torch.float32)
    
    return t_interior, S_interior, t_terminal, S_terminal

In [12]:
def sampler(nSim_interior, nSim_terminal, S0=100.0):
    # Interior samples - time uniformly in [0,T], assets lognormally distributed
    t_interior = np.random.uniform(low=0.0, high=T, size=[nSim_interior, 1])
    
    # Generate correlated Brownian motions (if needed)
    Z = np.random.normal(0, 1, size=[nSim_interior, 100])  # Standard Gaussian
    
    # Scale volatility by sqrt(t) for interior samples
    S_interior = S0 * np.exp(
        (mu_bar - 0.5 * sigma_bar**2) * t_interior + 
        sigma_bar * np.sqrt(t_interior) * Z
    )
    
    # Terminal samples - fixed time T
    t_terminal = np.ones((nSim_terminal, 1)) * T
    Z_terminal = np.random.normal(0, 1, size=[nSim_terminal, 100])
    S_terminal = S0 * np.exp(
        (mu_bar - 0.5 * sigma_bar**2) * T + 
        sigma_bar * np.sqrt(T) * Z_terminal
    )
    
    # Convert to torch tensors
    t_interior = torch.tensor(t_interior, dtype=torch.float32, requires_grad=True)
    S_interior = torch.tensor(S_interior, dtype=torch.float32, requires_grad=True)
    t_terminal = torch.tensor(t_terminal, dtype=torch.float32)
    S_terminal = torch.tensor(S_terminal, dtype=torch.float32)
    
    return t_interior, S_interior, t_terminal, S_terminal

### Loss Function

In [13]:
def loss(model, t_interior, S_interior, t_terminal, S_terminal):

    # PDE Loss Calculation
    t_interior.requires_grad_(True)
    S_interior.requires_grad_(True)
    
    V = model(t_interior, S_interior)
    
    # Compute first derivatives
    V_t = torch.autograd.grad(V.sum(), t_interior, create_graph=True)[0]
    V_x = torch.autograd.grad(V.sum(), S_interior, create_graph=True)[0]

    
    # ---- torch.func Hessian diagonal ----
    def get_second_derivative(S_batch):
        """Helper function for vmap"""
        S_batch = S_batch.unsqueeze(0)  # Add batch dim
        t_batch = t_interior[0:1]  # Reuse time from first sample
        
        def first_deriv(S):
            V = model(t_batch, S)
            return torch.autograd.grad(V.sum(), S, create_graph=True)[0]
        
        # Compute diagonal Hessian elements
        hessian_diag = grad(lambda S: first_deriv(S).sum())(S_batch)[0]
        return hessian_diag.squeeze(0)
    
    # Vectorize over batch dimension
    V_xx = vmap(get_second_derivative)(S_interior)
    # ---- end torch.func ----

    # Vectorized Q(V) calculation
    Q_V = torch.where(V < vh, gamma_h,
                     torch.where(V >= vl, gamma_l,
                               ((gamma_h - gamma_l)/(vh - vl)) * (V - vh) + gamma_h))
    
    # PDE residual
    pde_residual = (
        V_t
        + mu_bar * torch.sum(S_interior * V_x, dim=1, keepdim=True)
        + 0.5 * sigma_bar**2 * torch.sum(S_interior**2 * V_xx, dim=1, keepdim=True)
        - (1 - delta) * Q_V * V
        - R * V
    )

    L_pde = torch.mean(pde_residual**2)
    
    # Terminal Condition Loss
    target_payoff = torch.min(S_terminal, dim=1, keepdim=True)[0]  # min payoff
    fitted_payoff = model(t_terminal, S_terminal)
    L_terminal = torch.mean((fitted_payoff - target_payoff)**2)
    
    return L_pde, L_terminal

### Model Training

In [None]:
model = DGM.DGMNet(layer_width, n_layers, input_dim=input_dim)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=5000, gamma=0.5) 
iteration_counter = 0

t_0 = torch.tensor([[0.0]], dtype=torch.float32)         # t = 0 (shape: [1, 1])
x_100 = torch.ones((1, d), dtype=torch.float32) * 100    # x = (100, ..., 100) (shape: [1, d])

for i in range(sampling_stages):
    # Sample time-space pairs
    t_interior, S_interior, t_terminal, S_terminal = sampler(nSim_interior, nSim_terminal)

    iteration_counter += 1
    
    # Perform the training steps
    for _ in range(steps_per_sample):
        optimizer.zero_grad()
        
        # Compute the loss
        L1, L3 = loss(model, t_interior, S_interior, t_terminal, S_terminal)
        total_loss = L1 + L3
        
        # Backpropagate
        total_loss.backward()
        optimizer.step()
        
    scheduler.step()
    if iteration_counter % 1000 == 0:
        current_lr = optimizer.param_groups[0]['lr']
        print(f"↳ LR reduced to {current_lr:.1e} at iteration {iteration_counter}", '\n')
        
    
    print(f"Iteration {i}, Loss: {total_loss.item()}, L1: {L1.item()}, L3: {L3.item()}")
    print("Current prediction: ", model(t_0, x_100).item(), '\n')

Iteration 0, Loss: 3595.395263671875, L1: 15.67109489440918, L3: 3579.72412109375
Current prediction:  8.875784873962402 

Iteration 1, Loss: 3012.50634765625, L1: 15.328348159790039, L3: 2997.177978515625
Current prediction:  12.202801704406738 

Iteration 2, Loss: 2551.241455078125, L1: 18.737104415893555, L3: 2532.50439453125
Current prediction:  17.002155303955078 

Iteration 3, Loss: 2179.649169921875, L1: 19.707265853881836, L3: 2159.94189453125
Current prediction:  19.35318946838379 

Iteration 4, Loss: 1878.2037353515625, L1: 19.64272117614746, L3: 1858.56103515625
Current prediction:  22.084819793701172 

Iteration 5, Loss: 1650.39599609375, L1: 18.334640502929688, L3: 1632.0614013671875
Current prediction:  26.287307739257812 

Iteration 6, Loss: 1472.5941162109375, L1: 26.435155868530273, L3: 1446.158935546875
Current prediction:  28.83115577697754 

Iteration 7, Loss: 1316.9522705078125, L1: 19.610191345214844, L3: 1297.342041015625
Current prediction:  29.485971450805664 


### Results

In [11]:
t_0 = torch.tensor([[0.0]], dtype=torch.float32)         # t = 0 (shape: [1, 1])
x_100 = torch.ones((1, d), dtype=torch.float32) * 100    # x = (100, ..., 100) (shape: [1, d])

model_value = model(t_0, x_100)
print(f"Model value at t=0, x=(100, ..., 100): {model_value.item()}")

Model value at t=0, x=(100, ..., 100): 66.86580657958984
