
# Final Experiments Notebook — Greeks Estimation via PINNs

This notebook is the single source of truth for the final report figures, tables, and metrics. It covers:
- Core experiment: seeded PINN training + in-distribution test metrics.
- OOD volatility evaluation.
- Baselines: finite differences and Monte Carlo.
- Ablations: supervised-only price regressor, adaptive sampling toggle, Sobolev λ sweep.
- Figures: training curves, price/Greek surfaces, PDE residual heatmap, Gamma smoothness.
- Reproducibility: seeds, configs, saved outputs.


## 0. Setup
- Uses the local `src/` package; run from the repo root or from `notebooks/` (the setup cell adds the repo root to `sys.path`).
- Toggle the run flags to avoid long jobs unless you want to re-train.


In [None]:

import os
import json
import time
from pathlib import Path
import numpy as np
import pandas as pd
import torch
import sys

# Plotting
import plotly.express as px
import plotly.graph_objects as go

# Add the repo root (the directory containing `src/`) to sys.path so that
# `import src` works whether the kernel was started in the repo root,
# in notebooks/, or in any deeper sub-directory.
_cwd = Path.cwd().resolve()
REPO_ROOT = next(
    (candidate for candidate in (_cwd, *_cwd.parents) if (candidate / 'src').is_dir()),
    _cwd.parent,  # nothing found: keep the old "one level up" fallback
)
# Guard against duplicate entries when this cell is re-run in the same kernel.
if str(REPO_ROOT) not in sys.path:
    sys.path.append(str(REPO_ROOT))

from src import DATA_DIR, RESULTS_DIR, FIGURES_DIR
from src.preprocessing import load_normalization_config, normalize_inputs
from src.models import PINNModel, load_pinn_checkpoint
from src.train import train
from src.test import evaluate_oos
from src.losses import compute_pde_residual
from src.baselines import finite_diff_greeks, mc_pathwise_greeks
from src.utils import bs_price

# Reproducibility: seed torch and NumPy's legacy global RNG with one value.
SEED = 7
torch.manual_seed(SEED)
np.random.seed(SEED)

# Device preference: CUDA first, then Apple MPS, otherwise CPU.
if torch.cuda.is_available():
    _device_name = 'cuda'
elif torch.backends.mps.is_available():
    _device_name = 'mps'
else:
    _device_name = 'cpu'
device = torch.device(_device_name)
device


## 1. Data and normalization
Inspect the synthetic data bounds used for all experiments.


In [None]:

# Normalization config loaded from DATA_DIR. Later cells read its bounds and
# contract parameters (S_min/S_max, t_min/t_max, sigma_min/sigma_max,
# x_min/x_max, K, T, r), so this cell must run before them.
meta = load_normalization_config(DATA_DIR)
meta


In [None]:

# Quick peek at the test split statistics
import numpy as np
import pandas as pd

test_path = DATA_DIR / 'synthetic_test.npy'
assert test_path.exists(), "Missing synthetic_test.npy"

# The file holds one row per sample; transposing lets the four columns be
# unpacked directly as (S, t, sigma, price).
S, t, sigma, price = np.load(test_path).T

# Count / mean / std / min / max plus the 5th, 50th and 95th percentiles.
_test_columns = {'S': S, 't': t, 'sigma': sigma, 'price': price}
summary = pd.DataFrame(_test_columns).describe(percentiles=[0.05, 0.5, 0.95])
summary


## 2. Training configuration
The default block trains the PINN with warmup, Sobolev regularization, and adaptive sampling. Set `RUN_TRAIN=True` to retrain; otherwise we load the existing checkpoint in `results/pinn_checkpoint.pt`.


In [None]:

RUN_TRAIN = False  # flip to True to retrain

# Keyword arguments forwarded verbatim to `train(**TRAIN_CONFIG)`.
TRAIN_CONFIG = {
    # Optimisation
    'epochs': 50,
    'lr': 5e-4,
    'batch_size': 4096,
    # Data and checkpoint locations
    'data_path': DATA_DIR / 'synthetic_train.npy',
    'val_path': DATA_DIR / 'synthetic_val.npy',
    'checkpoint_path': RESULTS_DIR / 'pinn_checkpoint.pt',
    'device': device,
    # Adaptive sampling
    'adaptive_sampling': True,
    'adaptive_every': 5,
    'adaptive_points': 10_000,
    'adaptive_radius': 0.1,
    'adaptive_eval_samples': 50_000,
    # Learning-rate warmup and gradient clipping
    'use_warmup': True,
    'warmup_steps': 500,
    'warmup_base_lr': 1e-5,
    'grad_clip': 1.0,
    # Loss weighting
    'lambda_reg': 0.01,
    'boundary_weight': 1.0,
    'boundary_warmup': 10,
    # Logging and plots
    'plot_losses': True,
    'plot_path': FIGURES_DIR / 'training_curves' / 'loss_curves.html',
    'log_path': RESULTS_DIR / 'training_history.json',
}

if RUN_TRAIN:
    # `train` returns the fitted model together with its per-epoch history.
    model, history = train(**TRAIN_CONFIG)
    print('Training complete; last epoch:', history[-1])


## 3. Load trained model
Uses the main checkpoint by default; adjust `MODEL_PATH` if you run new experiments.


In [None]:

MODEL_PATH = RESULTS_DIR / 'pinn_checkpoint.pt'
assert MODEL_PATH.exists(), f"Missing checkpoint at {MODEL_PATH}"

# Build the network on the target device, then restore its weights.
model = PINNModel()
model = model.to(device)
load_pinn_checkpoint(
    model,
    torch.load(MODEL_PATH, map_location=device),
    strict=False,
)
model.eval()  # switch to inference mode for every cell below
print('Loaded', MODEL_PATH)


## 4. In-distribution evaluation
Evaluate on the held-out test set with analytic targets, finite differences, and Monte Carlo baselines (sampled for speed). Adjust `sample_size` or `mc_paths` as needed.


In [None]:

# Evaluation settings for the in-distribution test split.
_id_eval_kwargs = {
    'data_path': DATA_DIR / 'synthetic_test.npy',
    'model_path': MODEL_PATH,
    'device': device,
    'sample_size': 5000,   # subset for speed; set None for full test set
    'mc_paths': 5000,      # reduce for quick runs; increase for lower variance
    'seed': SEED,
    'fig_dir': None,
}
metrics_id = evaluate_oos(**_id_eval_kwargs)

# Show the metrics as a one-column table: metric name -> value.
_metrics_id_table = pd.DataFrame(metrics_id, index=[0]).T
_metrics_id_table.rename(columns={0: 'value'})


## 5. Out-of-distribution volatility sweep
Generate σ ∈ [0.60, 0.65] samples and reuse the same evaluation harness. The synthetic OOD set is saved for reproducibility.


In [None]:

OOD_PATH = DATA_DIR / 'synthetic_ood_vol.npy'

# Generate the OOD set only once; later runs reuse the saved file.
if not OOD_PATH.exists():
    n_ood = 20000
    rng = np.random.default_rng(SEED)
    # Draw order (S, then t, then sigma) determines the samples, so keep it fixed.
    S_ood = rng.uniform(meta.S_min, meta.S_max, n_ood)
    t_ood = rng.uniform(meta.t_min, meta.t_max, n_ood)
    sigma_ood = rng.uniform(0.60, 0.65, n_ood)
    V_ood = bs_price(S_ood, meta.K, T=meta.T, t=t_ood, sigma=sigma_ood, r=meta.r)
    # Same column layout as the other splits: (S, t, sigma, price).
    ood_table = np.stack([S_ood, t_ood, sigma_ood, V_ood], axis=1)
    np.save(OOD_PATH, ood_table)
    print('Saved OOD set to', OOD_PATH)

# Same harness as the in-distribution run; this call uses 3000 Monte Carlo
# paths, whereas the in-distribution cell uses 5000.
_ood_eval_kwargs = {
    'data_path': OOD_PATH,
    'model_path': MODEL_PATH,
    'device': device,
    'sample_size': 5000,
    'mc_paths': 3000,
    'seed': SEED,
    'fig_dir': None,
}
metrics_ood = evaluate_oos(**_ood_eval_kwargs)

_metrics_ood_table = pd.DataFrame(metrics_ood, index=[0]).T
_metrics_ood_table.rename(columns={0: 'value'})


## 6. Surfaces and PDE residuals
Produce price/Greek surfaces on a grid and a PDE residual heatmap for the report. Figures are saved to `figures/final/`.


In [None]:

FIG_OUT = FIGURES_DIR / 'final'
FIG_OUT.mkdir(parents=True, exist_ok=True)

# Grid resolution per axis and the fixed time slice at which surfaces are drawn.
GRID_POINTS = 40
T_SLICE = 1.0

s_lin = torch.linspace(meta.S_min, meta.S_max, GRID_POINTS, device=device)
sigma_lin = torch.linspace(meta.sigma_min, meta.sigma_max, GRID_POINTS, device=device)
# indexing='ij': axis 0 of every grid follows S, axis 1 follows sigma.
Sg, Sigmag = torch.meshgrid(s_lin, sigma_lin, indexing='ij')
t_val = torch.full_like(Sg, T_SLICE)
features = normalize_inputs(Sg.flatten(), t_val.flatten(), Sigmag.flatten(), config=meta)
# Fresh leaf tensor so autograd can differentiate the price w.r.t. the inputs.
features = features.detach().clone().requires_grad_(True)

price_grid = model(features).reshape(Sg.shape)

# Greeks via autograd. create_graph=True is needed here because the first
# derivative is differentiated again below.
price_flat = price_grid.flatten()
grad_feats = torch.autograd.grad(
    price_flat,
    features,
    grad_outputs=torch.ones_like(price_flat),
    create_graph=True,
)[0]

# Chain rule from the normalized feature back to S.
# NOTE(review): this assumes feature column 0 is a coordinate x that is mapped
# linearly from [x_min, x_max] onto an interval of width 2 and satisfies
# dx/dS = 1/S (e.g. a log-price) — confirm against `normalize_inputs`.
x_range = max(meta.x_max - meta.x_min, 1e-6)  # floor avoids division by zero
dx_norm_dx = 2.0 / x_range

S_flat = Sg.flatten()
dV_dx = grad_feats[:, 0] * dx_norm_dx
Delta = (dV_dx / S_flat).reshape(Sg.shape)

# Second derivative w.r.t. the normalized feature. Nothing is differentiated
# after this call, so no higher-order graph is requested.
second = torch.autograd.grad(
    grad_feats[:, 0],
    features,
    grad_outputs=torch.ones_like(grad_feats[:, 0]),
)[0][:, 0]
# d2V/dS2 = (d2V/dx2 - dV/dx) / S^2 under the dx/dS = 1/S assumption above.
Gamma = (second * (dx_norm_dx**2) * (1.0 / (S_flat**2)) + dV_dx * (-1.0 / (S_flat**2))).reshape(Sg.shape)

# PDE residual
pde = compute_pde_residual(
    model,
    Sg.flatten(),
    t_val.flatten(),
    Sigmag.flatten(),
    r=meta.r,
    config=meta,
).reshape(Sg.shape).detach().cpu().numpy()

# Convert to CPU for plotting
plots = {
    'price': price_grid.detach().cpu().numpy(),
    'delta': Delta.detach().cpu().numpy(),
    'gamma': Gamma.detach().cpu().numpy(),
    'pde_residual': pde,
}

# Axis values are identical for every figure, so convert them once.
s_axis = s_lin.cpu().numpy()
sigma_axis = sigma_lin.cpu().numpy()

for name, arr in plots.items():
    fig = px.imshow(
        arr.T,  # transpose so rows follow sigma (y) and columns follow S (x)
        x=s_axis,
        y=sigma_axis,
        origin='lower',
        aspect='auto',
        color_continuous_scale='Viridis',
        labels={'x': 'S', 'y': 'sigma', 'color': name},
        title=f'{name} @ t={T_SLICE}',
    )
    fig.write_html(FIG_OUT / f'{name}_heatmap.html')
    fig.show()


## 7. Gamma smoothness (total variation)
Compute the total variation of Gamma over the grid as the smoothness metric.


In [None]:

import numpy as np

def total_variation(arr: np.ndarray) -> float:
    """Return the anisotropic total variation of an array.

    This is the sum, over every axis, of the absolute differences between
    neighbouring elements along that axis. For a 2-D array this is the
    row-wise plus column-wise variation. No grid spacing is applied, so the
    value depends on the grid resolution.

    Args:
        arr: Array (or array-like) of any dimensionality.

    Returns:
        The total variation as a Python float; 0.0 for inputs with fewer
        than two elements along every axis.
    """
    values = np.asarray(arr)
    return float(
        sum(np.abs(np.diff(values, axis=axis)).sum() for axis in range(values.ndim))
    )

# Smoothness metric for the Gamma surface computed in the surfaces cell.
# It sums absolute differences between neighbouring grid cells, so the value
# depends on the grid resolution used there.
gamma_tv = total_variation(plots['gamma'])
print('Gamma total variation:', gamma_tv)


## 8. Runtime profiling
Measure inference latency (per-sample and batch) on the current device.


In [None]:

def _sync_device():
    """Block until work queued on `device` has finished.

    GPU back-ends run asynchronously, so without a sync the timer can stop
    before the kernels it is meant to measure have completed.
    """
    if device.type == 'cuda':
        torch.cuda.synchronize()
    elif device.type == 'mps' and hasattr(torch, 'mps'):
        torch.mps.synchronize()


def time_fn(fn, iters=50):
    """Return the mean wall-clock seconds per call of `fn` over `iters` calls."""
    _sync_device()
    start = time.perf_counter()  # monotonic, high-resolution timer
    for _ in range(iters):
        fn()
    _sync_device()
    return (time.perf_counter() - start) / iters


model.eval()
with torch.no_grad():
    # Single-point input (S=100, t=1, sigma=0.2).
    sample_feats = normalize_inputs(
        torch.tensor([100.0], device=device),
        torch.tensor([1.0], device=device),
        torch.tensor([0.2], device=device),
        config=meta,
    )
    # 1024 points sweeping each input linearly across its configured range.
    batch_feats = normalize_inputs(
        torch.tensor(np.linspace(meta.S_min, meta.S_max, 1024, dtype=np.float32), device=device),
        torch.tensor(np.linspace(meta.t_min, meta.t_max, 1024, dtype=np.float32), device=device),
        torch.tensor(np.linspace(meta.sigma_min, meta.sigma_max, 1024, dtype=np.float32), device=device),
        config=meta,
    )

    # Warm up both input shapes so one-off setup cost is not timed.
    model(sample_feats)
    model(batch_feats)

    per_sample_ms = time_fn(lambda: model(sample_feats)) * 1000
    batch_ms = time_fn(lambda: model(batch_feats)) * 1000

print(f'Latency: {per_sample_ms:.4f} ms/sample, {batch_ms:.4f} ms for batch=1024')


## 9. Ablations (ready-to-run toggles)
- **Supervised-only** (no PDE/BC): trains a plain regressor on price labels.
- **No adaptive sampling**: reuse the main PINN config with `adaptive_sampling=False`.
- **Sobolev λ sweep**: loop over {0.001, 0.01, 0.1}.

These are disabled by default to keep the notebook fast; flip the flags to generate additional curves/metrics for the paper.


In [None]:

# Ablation switches. All are off by default so that Restart & Run All stays
# fast; flip one to True to run that ablation (each one trains from scratch).
RUN_SUPERVISED_ONLY = False
RUN_NO_ADAPTIVE = False
RUN_LAMBDA_SWEEP = False

if RUN_SUPERVISED_ONLY:
    # Plain price regressor: same architecture, MSE on price labels only.
    from torch.utils.data import DataLoader, TensorDataset
    train_arr = np.load(DATA_DIR / 'synthetic_train.npy')
    # The first four columns are used as (S, t, sigma, price).
    tensors = [torch.tensor(train_arr[:, i], dtype=torch.float32) for i in range(4)]
    loader = DataLoader(TensorDataset(*tensors), batch_size=4096, shuffle=True)
    sup_model = PINNModel().to(device)
    opt = torch.optim.Adam(sup_model.parameters(), lr=1e-3)
    sup_history = []  # mean training loss per epoch
    for epoch in range(10):
        sup_model.train()
        losses = []
        # Batch-local names keep the notebook-level S / t / sigma arrays
        # from the data-peek cell intact.
        for S_b, t_b, sigma_b, V_b in loader:
            S_b = S_b.to(device)
            t_b = t_b.to(device)
            sigma_b = sigma_b.to(device)
            V_b = V_b.to(device)
            feats = normalize_inputs(S_b, t_b, sigma_b, config=meta)
            pred = sup_model(feats).squeeze(-1)
            loss = torch.mean((pred - V_b) ** 2)
            opt.zero_grad()
            loss.backward()
            opt.step()
            losses.append(loss.item())
        sup_history.append(float(np.mean(losses)))
        print(f'[Supervised] epoch {epoch+1}: loss={sup_history[-1]:.4f}')

if RUN_NO_ADAPTIVE:
    # Main PINN config with adaptive sampling off; separate output files so
    # the main checkpoint and history are not overwritten.
    cfg = TRAIN_CONFIG.copy()
    cfg.update(dict(adaptive_sampling=False, checkpoint_path=RESULTS_DIR / 'pinn_no_adaptive.pt', log_path=RESULTS_DIR / 'history_no_adaptive.json'))
    train(**cfg)

if RUN_LAMBDA_SWEEP:
    # One full training run per `lambda_reg` value, each with its own files.
    sweep_results = []
    for lam in [0.001, 0.01, 0.1]:
        cfg = TRAIN_CONFIG.copy()
        cfg.update(dict(lambda_reg=lam, checkpoint_path=RESULTS_DIR / f'pinn_lambda_{lam}.pt', log_path=RESULTS_DIR / f'history_lambda_{lam}.json'))
        _model, hist = train(**cfg)
        sweep_results.append({'lambda': lam, 'final_loss': hist[-1]['loss']})
    display(pd.DataFrame(sweep_results))


## 10. Export summary table for the report
Collect the key metrics (ID vs OOD) and write them to `results/final_summary.csv` for easy inclusion in the paper.


In [None]:

# One row per evaluation split, with the split label as the first column.
_summary_rows = []
for _split_name, _split_metrics in (
    ('in_distribution', metrics_id),
    ('ood_vol', metrics_ood),
):
    _summary_rows.append({'split': _split_name, **_split_metrics})
summary_df = pd.DataFrame(_summary_rows)

# Persist the table for the report.
summary_path = RESULTS_DIR / 'final_summary.csv'
summary_df.to_csv(summary_path, index=False)
summary_df
