In [35]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import grad
from torch.utils.data import DataLoader, TensorDataset
from torch.autograd import grad
import torch.optim as optim

import numpy as np
import torch
from scipy.stats import norm


In [36]:
from IPython.core.display import HTML
HTML("""
<style>
body { font-family: "Helvetica Neue", sans-serif; font-size: 15px; }
h1, h2, h3 { color: #34495e; }
p { line-height: 1.6; }
</style>
""")

In [37]:
class OPNN(nn.Module):

    # x = [S/K, T]
    def __init__(self, input_dim = 2, hidden_dim = 50, num_hidden_layers = 2):

        super(OPNN, self).__init__()

        # input layers
        layers = [
            
            nn.Linear(input_dim, hidden_dim),
            nn.Tanh()
            
        ]

        # hidden layers 
        for _ in range(num_hidden_layers - 1):

            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.Tanh())

        # output layer
        layers.append(nn.Linear(hidden_dim, 1))

        self.model = nn.Sequential(*layers) # unbounded/continuous

    def forward(self, x):

        # https://stats.stackexchange.com/questions/501437/why-are-the-softmax-softplus-and-softsign-functions-all-prefixed-with-the-word

        # adapted from relu but smoother and differentiable everywhere
        # ln(1 + e^x)

        # https://docs.pytorch.org/docs/stable/generated/torch.nn.Softplus.html
        return F.softplus(self.model(x))

note: tanh is smoother than ReLU and works better for PDE tasks (that's why no GELU)
final layer doesn't have activation, allows output range to be unconstrained
requires_grad for auto grad on the PDE residuals

S: Spot Price
K: Strike Price
T: Time to Maturity

Deeper Networks will overfit/struggle with PDE loss

In [38]:
# default 3, 3, 2

'''
model = OPNN(input_dim=2, hidden_dim=50, num_hidden_layers=2)

# Sample batch: S = 100, K = 100, T = 0.5
# Output.shape = [1, 1]
sample_S = torch.tensor([[100.0]], requires_grad=True)
sample_K = torch.tensor([[100.0]], requires_grad=True)
sample_T = torch.tensor([[0.5]], requires_grad=True)


nn_input = torch.cat([sample_S / sample_K, sample_T], dim=1)

output_normalized = model(nn_input)
output_C = output_normalized * sample_K

print("Model output (normalized C/K):", output_normalized)
print("Model output (un-normalized C):", output_C)
'''

'\nmodel = OPNN(input_dim=2, hidden_dim=50, num_hidden_layers=2)\n\n# Sample batch: S = 100, K = 100, T = 0.5\n# Output.shape = [1, 1]\nsample_S = torch.tensor([[100.0]], requires_grad=True)\nsample_K = torch.tensor([[100.0]], requires_grad=True)\nsample_T = torch.tensor([[0.5]], requires_grad=True)\n\n\nnn_input = torch.cat([sample_S / sample_K, sample_T], dim=1)\n\noutput_normalized = model(nn_input)\noutput_C = output_normalized * sample_K\n\nprint("Model output (normalized C/K):", output_normalized)\nprint("Model output (un-normalized C):", output_C)\n'

In [39]:
'''
output = model(normalized_input)

# https://docs.pytorch.org/docs/stable/generated/torch.ones_like.html

# calc first derivatives
grad_output = torch.ones_like(output)
dC_dinput = grad(output, sample_input, create_graph=True)[0]
print(dC_dinput)

# calc second derivatives
dC_dS = dC_dinput[:, 0]
d2C_dS2 = grad(dC_dS, sample_input, grad_outputs=torch.ones_like(dC_dS), create_graph=True)[0][:, 0]
print(d2C_dS2)
'''

'\noutput = model(normalized_input)\n\n# https://docs.pytorch.org/docs/stable/generated/torch.ones_like.html\n\n# calc first derivatives\ngrad_output = torch.ones_like(output)\ndC_dinput = grad(output, sample_input, create_graph=True)[0]\nprint(dC_dinput)\n\n# calc second derivatives\ndC_dS = dC_dinput[:, 0]\nd2C_dS2 = grad(dC_dS, sample_input, grad_outputs=torch.ones_like(dC_dS), create_graph=True)[0][:, 0]\nprint(d2C_dS2)\n'

In [40]:
# weight test
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params}")

Total trainable parameters: 2751


In [41]:
# checking weights

for name, param in model.named_parameters():
    if param.requires_grad:
        print(f"{name}: mean={param.data.mean():.4f}, std={param.data.std():.4f}")

model.0.weight: mean=-0.0123, std=0.4252
model.0.bias: mean=-0.0293, std=0.4250
model.2.weight: mean=0.0020, std=0.0806
model.2.bias: mean=0.0235, std=0.0727
model.4.weight: mean=-0.0088, std=0.0753
model.4.bias: mean=0.1073, std=nan


  print(f"{name}: mean={param.data.mean():.4f}, std={param.data.std():.4f}")


In [42]:
# symmetry/invariance check

# call prices should INCREASE with S, and convex


'''
S_values = torch.tensor([[80.0, 100.0, 0.5],
                         [90.0, 100.0, 0.5],
                         [100.0, 100.0, 0.5],
                         [110.0, 100.0, 0.5]], requires_grad=True)

S_values_norm = S_values / torch.tensor([100.0, 100.0, 1.0])
prices = model(S_values_norm)
print("Monotonicity test:", prices.squeeze())

'''

'\nS_values = torch.tensor([[80.0, 100.0, 0.5],\n                         [90.0, 100.0, 0.5],\n                         [100.0, 100.0, 0.5],\n                         [110.0, 100.0, 0.5]], requires_grad=True)\n\nS_values_norm = S_values / torch.tensor([100.0, 100.0, 1.0])\nprices = model(S_values_norm)\nprint("Monotonicity test:", prices.squeeze())\n\n'

The output DECREASES as S increases which is wrong

In [43]:
'''
vectorized black scholes for call options

small epsilon to avoid distribution by zero or log of zero for T if T can be close to 0
for now, lets assume T > 0 is based on uniform(0.001, 1.0)
'''
def black_scholes_call_price(S, K, T, r, sigma):
    
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    
    d2 = d1 - sigma * np.sqrt(T)
    
    return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

def generate_black_scholes_dataset(n_samples=10000, r=0.05, sigma=0.2, seed=42):
    np.random.seed(seed)

    # Sample inputs: S, K, T
    S = np.random.uniform(5, 200, size=n_samples) # Example range, adjust as needed
    K = np.random.uniform(5, 200, size=n_samples) # Example range, adjust as needed
    T = np.random.uniform(0.01, 1.0, size=n_samples) # Time to Maturity

    # Compute the true Black-Scholes call option price
    C = black_scholes_call_price(S, K, T, r, sigma)

    # Combine S, K, T into a single input tensor (raw_inputs)
    # This `raw_inputs` tensor will be passed to `total_loss`, `supervised_loss`, and `bs_pde_loss`
    raw_inputs = np.stack([S, K, T], axis=1) # Shape [n_samples, 3]

    # Convert to torch tensors
    y_tensor = torch.tensor(C.reshape(-1, 1), dtype=torch.float32) # Target prices (C)
    X_tensor = torch.tensor(raw_inputs, dtype=torch.float32) # Raw inputs [S, K, T]

    return X_tensor, y_tensor # X_tensor now contains [S, K, T]

In [44]:
X_train, y_train = generate_black_scholes_dataset(n_samples=10000)

print("Sample input (S, K, T):", X_train[:3])
print("Sample target prices (C):", y_train[:3])

Sample input (S, K, T): tensor([[ 78.0353,  77.8600,   0.7327],
        [190.3893,  69.9179,   0.1927],
        [147.7388,  39.3500,   0.3532]])
Sample target prices (C): tensor([[  6.8434],
        [121.1417],
        [109.0776]])


Looks like the simulator did well, within market range \

Sample input (S, K, T): \
tensor([[ 87.4540,  87.3641,  0.7327],  # near-the-money, moderate T \
        [145.0714,  83.2912,  0.1927],  # deep in-the-money \
        [123.1994,  67.6154,  0.3532]]) # deep in-the-money, mid-maturity \

Sample target prices (C): \
tensor([[ 7.6124],   # makes sense for near-the-money \
        [62.5787],  # very high because S >> K \ 
        [56.7675]]) # similar case: S >> K \

## implementing loss

In [45]:
# squared pde residual (physics informed)
'''
Computes the Black-Scholes PDE residual loss.
The PDE is: dC/dT + 0.5 * sigma^2 * S^2 * d2C/dS2 + r * S * dC/dS - r * C = 0 
inputs: tensor of shape [B, 3] = [S, K, T] (raw S, K, T values for PDE calculations)
'''
def bs_pde_loss(model, inputs, r=0.05, sigma=0.2):
    """
    Computes the Black-Scholes PDE residual loss.

    inputs: tensor of shape [B, 3] = [S, K, T]
    """
    # Ensure inputs require gradients for derivative computations
    # IMPORTANT: All subsequent slices for S, K, T must come from this tensor
    inputs_for_grad = inputs.clone().detach().requires_grad_(True)

    S = inputs_for_grad[:, 0:1] # Slice from the tensor that requires grad
    K = inputs_for_grad[:, 1:2] # Slice from the tensor that requires grad
    T = inputs_for_grad[:, 2:3] # Slice from the tensor that requires grad

    # ... rest of your code is fine from here onwards ...

    price_ratio = S / K
    nn_inputs = torch.cat([price_ratio, T], dim=1)

    C_normalized = model(nn_inputs)

    C = C_normalized * K

    # calc the dervis for pde
    dC_dT = grad(C, T, grad_outputs=torch.ones_like(C), create_graph=True)[0]
    dC_dS = grad(C, S, grad_outputs=torch.ones_like(C), create_graph=True)[0]
    d2C_dS2 = grad(dC_dS, S, grad_outputs=torch.ones_like(dC_dS), create_graph=True)[0]

    # total pde is: dC/dT + 0.5 * sigma^2 * S^2 * d2C/dS2 + r * S * dC/dS - r * C = 0
    pde_residual = dC_dT + 0.5 * sigma**2 * S**2 * d2C_dS2 + r * S * dC_dS - r * C

    loss = torch.mean(pde_residual**2)
    return loss

In [46]:
# sample input batch, untrained models should give non-zero pde residuals

inputs, _ = generate_black_scholes_dataset(n_samples=32)
pde_loss = bs_pde_loss(model, inputs)
print("pde loss:", pde_loss.item())


pde loss: 14.142163276672363


In [47]:
# supervised model loss, standard

# MSE between predicted and BS price

def supervised_loss(model, inputs, targets):

    S = inputs[:, 0:1]
    K = inputs[:, 1:2]
    T = inputs[:, 2:3]

    price_ratio = S / K
    nn_input_features = torch.cat([price_ratio, T], dim=1)

    predicted_C_normalized = model(nn_input_features)

    predicted_C = predicted_C_normalized * K

    return F.mse_loss(predicted_C, targets)




In [48]:
'''

combine the supervised loss and pde loss

next steps: dynamically weigh the lambdas to find optimal model

'''
def total_loss(model, inputs, targets, lambda_sup=1.0, lambda_pde=1.0, r=0.05, sigma=0.2):
    
    sup_loss = supervised_loss(model, inputs, targets)
    
    pde_loss = bs_pde_loss(model, inputs, r=r, sigma=sigma)
    
    return lambda_sup * sup_loss + lambda_pde * pde_loss

# Testing

In [49]:
# 1. Generate Training Data
# X_train will contain [S, K, T] and y_train will contain [C]
X_train, y_train = generate_black_scholes_dataset(n_samples=10000, r=0.05, sigma=0.2, seed=42)

# Instantiate the Model
model = OPNN(input_dim=2, hidden_dim=50, num_hidden_layers=2)

# Choose an Optimizer
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Prepare Data Loaders for Batching
batch_size = 64 # testing with various batch sizes
train_dataset = TensorDataset(X_train, y_train)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Define Training Parameters (Moved to before the loop)
num_epochs = 250 # this is what's defined in the paper but lets see 
r = 0.05 # risk free from data gen
sigma = 0.2 # same thing with the vol

# The paper said that there are weights but didn't specify the exact values.
# We are assuming equal weighting for now.
lambda_supervised = 1.0
lambda_pde = 1.0

print(f"Starting training for {num_epochs} epochs...")

for epoch in range(num_epochs):
    epoch_loss = 0.0

    for batch_idx, (inputs_batch, targets_batch) in enumerate(train_loader):
        optimizer.zero_grad()

        loss = total_loss(
            model,
            inputs_batch,
            targets_batch,
            lambda_sup=lambda_supervised,
            lambda_pde=lambda_pde,
            r=r,
            sigma=sigma
        )

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    avg_epoch_loss = epoch_loss / len(train_loader)
    if (epoch + 1) % 10 == 0 or epoch == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {avg_epoch_loss:.6f}")

print("Training finished.")

Starting training for 250 epochs...
Epoch [1/250], Loss: 1663.587832
Epoch [10/250], Loss: 45.044433
Epoch [20/250], Loss: 16.830009
Epoch [30/250], Loss: 11.038599
Epoch [40/250], Loss: 9.417383
Epoch [50/250], Loss: 8.448577
Epoch [60/250], Loss: 8.152265
Epoch [70/250], Loss: 7.695060
Epoch [80/250], Loss: 7.383376
Epoch [90/250], Loss: 7.402732
Epoch [100/250], Loss: 7.516372
Epoch [110/250], Loss: 7.286956
Epoch [120/250], Loss: 7.288943
Epoch [130/250], Loss: 7.500748
Epoch [140/250], Loss: 7.165808
Epoch [150/250], Loss: 7.106995
Epoch [160/250], Loss: 7.133197
Epoch [170/250], Loss: 6.967560
Epoch [180/250], Loss: 7.145759
Epoch [190/250], Loss: 7.257288
Epoch [200/250], Loss: 7.216045
Epoch [210/250], Loss: 7.106313
Epoch [220/250], Loss: 7.347358
Epoch [230/250], Loss: 6.968484
Epoch [240/250], Loss: 7.035211
Epoch [250/250], Loss: 7.010589
Training finished.
