# Parallel Calibration Benchmark (rBergomi / Rough Heston)

This notebook benchmarks different parallel backends, worker counts, and batch sizes on the local machine
in order to recommend `mc['batch_size']` and `n_workers` for calibration.

- Backends: threads vs processes
- Workers: a few sensible counts around CPU cores
- Batch sizes: tuned per model (rBergomi vs Rough Heston)

It uses the optimized terminal-only simulators to mimic the calibration workload.

In [1]:
import os

# Pin the BLAS / OpenMP thread pools to a single thread so they do not oversubscribe
# the benchmark's own workers. These variables are generally read when the native
# numerical library is first loaded, so they are set BEFORE numpy is imported;
# setdefault keeps any value the user has already exported.
for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS", "NUMEXPR_NUM_THREADS"):
    os.environ.setdefault(var, "1")

import sys, math, json, time, hashlib, statistics as stats
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import numpy as np
from matplotlib import cm, colors, rcParams
rcParams['font.family'] = 'Times New Roman'
import matplotlib.pyplot as plt

# Put the parent of the current working directory on sys.path so that `src` resolves
# (assumes the notebook is launched from a direct sub-directory of the repo root).
repo_root = os.path.abspath(os.path.join(os.getcwd(), ".."))
if repo_root not in sys.path:
    sys.path.append(repo_root)

from src.rough import (
    rbergomi_terminal_parallel_pool,
    rough_heston_terminal_parallel_pool,
)

# Logical CPU count; falls back to 4 when the platform cannot report it.
cores = os.cpu_count() or 4
cores


12

In [2]:
def _sweep_parallel_configs(simulate, bs_cands, reps):
    """Time ``simulate(ex, bs)`` for every backend / worker-count / batch-size combination.

    Parameters
    ----------
    simulate : callable
        Invoked as ``simulate(ex, bs)`` with a live executor and a batch size.
        Its return value is discarded; only the wall-clock time matters.
    bs_cands : sequence of int
        Batch sizes to try.
    reps : int
        Repetitions per configuration; must be at least 1.

    Returns
    -------
    list of tuple
        ``(seconds_per_rep, backend, n_workers, batch_size)`` rows, sorted
        ascending so the fastest configuration comes first.

    Raises
    ------
    ValueError
        If ``reps`` is smaller than 1 (zero would divide by zero, negative
        values would yield meaningless negative timings).
    """
    if reps < 1:
        raise ValueError(f"reps must be >= 1, got {reps!r}")

    executors = (('thread', ThreadPoolExecutor), ('process', ProcessPoolExecutor))
    # A few worker counts around the machine's core count (duplicates collapse on small machines).
    worker_cands = sorted({min(4, cores), min(8, cores), max(1, cores // 2), cores})

    results = []
    for backend, Exec in executors:
        for nw in worker_cands:
            for bs in bs_cands:
                t0 = time.perf_counter()
                for _ in range(reps):
                    # A fresh pool per repetition: pool start-up and shutdown
                    # are deliberately inside the timed region.
                    with Exec(max_workers=int(nw)) as ex:
                        simulate(ex, bs)
                dt = time.perf_counter() - t0
                results.append((dt / reps, backend, int(nw), int(bs)))
    results.sort()
    return results


def bench_rbergomi(n_paths=20000, N=128, reps=1):
    """Benchmark the rBergomi terminal simulator across parallel configurations.

    Parameters
    ----------
    n_paths : int
        Number of Monte Carlo paths per run.
    N : int
        Value forwarded as ``N`` to the simulator.
    reps : int
        Repetitions per configuration (>= 1); the reported time is the mean.

    Returns
    -------
    list of tuple
        ``(seconds_per_rep, backend, n_workers, batch_size)`` sorted fastest first.

    Raises
    ------
    ValueError
        If ``reps`` is smaller than 1.
    """
    S0, r, q = 100.0, 0.01, 0.0
    T, H, eta, rho, xi0 = 0.5, 0.12, 1.5, -0.6, 0.04
    # The largest batch size is only worth trying when there are enough paths to fill it.
    bs_cands = [2048, 4096, 8192] + ([16384] if n_paths >= 16384 else [])

    def simulate(ex, bs):
        return rbergomi_terminal_parallel_pool(
            ex, S0=S0, T=T, N=N, n_paths=n_paths, H=H, eta=eta, rho=rho, xi0=xi0,
            r=r, q=q, base_seed=12345, fgn_method='davies-harte', batch_size=bs
        )

    return _sweep_parallel_configs(simulate, bs_cands, reps)


def bench_rough(n_paths=10000, N=96, reps=1):
    """Benchmark the Rough Heston terminal simulator across parallel configurations.

    Parameters
    ----------
    n_paths : int
        Number of Monte Carlo paths per run.
    N : int
        Value forwarded as ``N`` to the simulator.
    reps : int
        Repetitions per configuration (>= 1); the reported time is the mean.

    Returns
    -------
    list of tuple
        ``(seconds_per_rep, backend, n_workers, batch_size)`` sorted fastest first.

    Raises
    ------
    ValueError
        If ``reps`` is smaller than 1.
    """
    S0, r, q = 100.0, 0.01, 0.0
    T, v0, kappa, theta, eta, rho, H = 0.6, 0.04, 1.6, 0.04, 1.8, -0.7, 0.10
    bs_cands = [512, 1024, 2048, 4096]

    def simulate(ex, bs):
        return rough_heston_terminal_parallel_pool(
            ex, S0=S0, v0=v0, T=T, N=N, n_paths=n_paths, H=H, kappa=kappa, theta=theta, eta=eta, rho=rho,
            r=r, q=q, base_seed=23456, batch_size=bs
        )

    return _sweep_parallel_configs(simulate, bs_cands, reps)


In [3]:
# Quick sweeps on small workloads; raise n_paths / N for steadier timings.
rbergomi_results = bench_rbergomi(
    n_paths=20000,
    N=128,
    reps=1,
)
rough_results = bench_rough(
    n_paths=10000,
    N=96,
    reps=1,
)
# Show the five fastest configurations for each model.
(rbergomi_results[:5], rough_results[:5])


([(0.1881846000032965, 'thread', 6, 4096),
  (0.1971789999952307, 'thread', 12, 4096),
  (0.21670309999899473, 'thread', 8, 4096),
  (0.25177979999716626, 'thread', 6, 8192),
  (0.2544190999979037, 'thread', 4, 4096)],
 [(0.03448140000546118, 'thread', 8, 2048),
  (0.034725000004982576, 'thread', 4, 4096),
  (0.03523489999497542, 'thread', 4, 2048),
  (0.0353051999991294, 'thread', 8, 512),
  (0.03655150000122376, 'thread', 4, 1024)])

In [4]:
def summarize(name, results):
    """Print the fastest benchmark configurations and return the winning one.

    Parameters
    ----------
    name : str
        Label used in the printed report.
    results : iterable of tuple
        Rows of ``(seconds, backend, n_workers, batch_size)``. They do not
        need to be pre-sorted; the input is not modified.

    Returns
    -------
    dict
        ``{'backend': ..., 'n_workers': ..., 'batch_size': ...}`` for the
        fastest row.

    Raises
    ------
    ValueError
        If ``results`` is empty.
    """
    # Rank by time here so "best" and "Top 5" are right even for unsorted input.
    # The sort is stable, so an already-sorted list keeps its order.
    ranked = sorted(results, key=lambda row: row[0])
    if not ranked:
        raise ValueError(f'{name}: no benchmark results to summarize')
    best = ranked[0]
    print(f'{name} best: time={best[0]:.3f}s, backend={best[1]}, n_workers={best[2]}, batch_size={best[3]}')
    print('Top 5:')
    for row in ranked[:5]:
        print(f'  {row[0]:.3f}s  backend={row[1]:6s}  n_workers={row[2]:2d}  batch={row[3]:5d}')
    return dict(backend=best[1], n_workers=int(best[2]), batch_size=int(best[3]))

# Pick the fastest configuration per model; the Rough Heston config additionally
# carries a use_numba flag.
cfg_rbergomi = summarize('rBergomi', rbergomi_results)
cfg_rough = {**summarize('RoughHeston', rough_results), "use_numba": True}
(cfg_rbergomi, cfg_rough)


rBergomi best: time=0.188s, backend=thread, n_workers=6, batch_size=4096
Top 5:
  0.188s  backend=thread  n_workers= 6  batch= 4096
  0.197s  backend=thread  n_workers=12  batch= 4096
  0.217s  backend=thread  n_workers= 8  batch= 4096
  0.252s  backend=thread  n_workers= 6  batch= 8192
  0.254s  backend=thread  n_workers= 4  batch= 4096
RoughHeston best: time=0.034s, backend=thread, n_workers=8, batch_size=2048
Top 5:
  0.034s  backend=thread  n_workers= 8  batch= 2048
  0.035s  backend=thread  n_workers= 4  batch= 4096
  0.035s  backend=thread  n_workers= 4  batch= 2048
  0.035s  backend=thread  n_workers= 8  batch=  512
  0.037s  backend=thread  n_workers= 4  batch= 1024


({'backend': 'thread', 'n_workers': 6, 'batch_size': 4096},
 {'backend': 'thread', 'n_workers': 8, 'batch_size': 2048, 'use_numba': True})

## Suggested usage
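Use the best settings in your `mc` dict and pass the backend to calibration. The timings above were measured at the benchmark's own `N` and path counts; if your calibration uses different values (as in the example below), re-run the sweep with matching settings before relying on the recommendation: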

```python
best_rb = cfg_rbergomi
out_rb, _ = calibrate_rbergomi(
    smiles,
    mc=dict(N=192, paths=12000, fgn_method='davies-harte',
            batch_size=best_rb['batch_size'], n_workers=best_rb['n_workers']),
    parallel_backend=best_rb['backend'],
    terminal_only=True,
)

best_rh = cfg_rough
out_rh, _ = calibrate_rough_heston(
    smiles,
    mc=dict(N=192, paths=12000,
            batch_size=best_rh['batch_size'], n_workers=best_rh['n_workers']),
    parallel_backend=best_rh['backend'],
    terminal_only=True,
)
```
