In [1]:
from pylib.async_normals import ConcurrentNormGenerator
from pylib.immutable_base_model import ImmutableBaseModel
import numba.cuda
from pydantic import Field, confloat, field_validator, ConfigDict
from math import sqrt
import cupy as cp
from typing import Optional

In [2]:
class SimulationParams(ImmutableBaseModel):
    """Sizing and seeding parameters for a batched Monte Carlo run.

    Every field is validated as a strictly positive integer (``gt=0``).
    ``total_paths`` and ``total_blocks`` are derived values exposed as
    read-only properties, because they are consumed as plain attributes
    (``params.total_paths``) rather than called.
    """

    timesteps: int = Field(..., gt=0, description="Number of timesteps in the simulation")
    network_size: int = Field(..., gt=0, description="Size of the network")
    batches_per_mc_run: int = Field(..., gt=0, description="Number of batches per Monte Carlo run")
    threads_per_block: int = Field(..., gt=0, description="Number of threads per cuda block")
    mc_seed: int = Field(..., gt=0, description="Seed for Monte Carlo simulations")
    buffer_size: int = Field(..., gt=0, description="Size of the buffer used in simulations")

    @property
    def total_paths(self) -> int:
        """Total number of simulated paths: ``network_size * batches_per_mc_run``."""
        return self.network_size * self.batches_per_mc_run

    @property
    def total_blocks(self) -> int:
        """Number of CUDA blocks needed to give every path a thread.

        Ceiling division of ``total_paths`` by ``threads_per_block``, so a
        final partially filled block is still counted.
        """
        return (self.total_paths + self.threads_per_block - 1) // self.threads_per_block

In [3]:
@numba.cuda.jit
def SimulateBlackScholes(input_output, timesteps, sqrt_dt, X0, v):
    """CUDA kernel that turns a matrix of random draws into price paths, in place.

    Each thread owns one column of ``input_output``. For every row it reads
    the stored draw, scales it by ``sqrt_dt``, applies the driftless update
    ``X += v * X * dW``, takes the absolute value of the result, and then
    overwrites the draw with the new level.

    Args:
        input_output: 2-D array indexed as ``[timestep, path]``; holds the
            random draws on entry and the simulated levels on exit.
        timesteps: Number of rows to step through.
        sqrt_dt: Factor applied to each draw before the update.
        X0: Starting level for every path.
        v: Multiplier on the stochastic increment (volatility).
    """
    path = numba.cuda.grid(1)
    # Threads launched beyond the last column have nothing to simulate.
    if path >= input_output.shape[1]:
        return

    level = X0
    for step in range(timesteps):
        shock = input_output[step, path] * sqrt_dt
        level += v * level * shock
        # Reflect at zero so the stored level is never negative.
        level = abs(level)
        input_output[step, path] = level

In [4]:
class BlackScholes:
    """GPU Monte Carlo simulation and pricing of European calls and puts.

    Random draws are taken from a ``ConcurrentNormGenerator``, evolved into
    price paths by the ``SimulateBlackScholes`` CUDA kernel, and then rescaled
    so the cross-path mean at every timestep equals the forward. CuPy work and
    the Numba kernel are issued on two separate streams owned by the instance.
    """

    class Inputs(ImmutableBaseModel):
        """Market and contract inputs for one simulation / pricing run."""

        X0: confloat(gt=0) = Field(..., description="Initial stock price, must be greater than 0")
        K: confloat(gt=0) = Field(..., description="Strike price, must be greater than 0")
        # The enforced constraint is ``ge=0`` (T == 0 is accepted), so the
        # description states exactly that.
        T: confloat(ge=0) = Field(..., description="Time to maturity in years, must be non-negative")
        r: float = Field(..., description="Risk-free interest rate")
        d: float = Field(..., description="Dividend yield")
        v: confloat(gt=0) = Field(..., description="Volatility, must be greater than 0")

    class SimResults(ImmutableBaseModel):
        """Output of a simulation run; every field must be a CuPy array."""

        times: cp.ndarray = Field(..., description="vector of timesteps")
        sims: cp.ndarray = Field(..., description="array of simulated price paths")
        forwards: cp.ndarray = Field(..., description="vector of forwards")
        df: cp.ndarray = Field(..., description="vector of discount factors")

        @field_validator('times', 'sims', 'forwards', 'df', mode='before')
        @classmethod
        def check_cupy_array(cls, v):
            """Reject any value that is not a ``cp.ndarray``."""
            if not isinstance(v, cp.ndarray):
                raise ValueError(f'{v} is not a CuPy array')
            return v

        # cp.ndarray is not a type pydantic knows how to validate natively.
        model_config = ConfigDict(arbitrary_types_allowed=True)

    class PricingResults(ImmutableBaseModel):
        """Output of ``price``; every field must be a CuPy array."""

        call_price_intrinsic: cp.ndarray = Field(..., description="call zero vol price-- network learns additive adjustment")
        put_price_intrinsic: cp.ndarray = Field(..., description="put zero vol price-- network learns additive adjustment")
        underlying: cp.ndarray = Field(..., description="price of underlying")
        put_convexity: cp.ndarray = Field(..., description="put convexity adjustment on top of intrinsic")
        call_convexity: cp.ndarray = Field(..., description="call convexity adjustment on top of intrinsic")

        @field_validator('call_price_intrinsic', 'put_price_intrinsic', 'underlying', 'put_convexity', 'call_convexity', mode='before')
        @classmethod
        def check_cupy_array(cls, v):
            """Reject any value that is not a ``cp.ndarray``."""
            if not isinstance(v, cp.ndarray):
                raise ValueError(f'{v} is not a CuPy array')
            return v

        # cp.ndarray is not a type pydantic knows how to validate natively.
        model_config = ConfigDict(arbitrary_types_allowed=True)

    def __init__(self, sp: SimulationParams):
        """Set up the random-number generator and the two GPU streams.

        Args:
            sp: Simulation sizing/seeding parameters. Kept on the instance and
                used for every later simulation and pricing call.
        """
        self._sp = sp
        self._ng = ConcurrentNormGenerator(
            rows=self._sp.timesteps,
            cols=self._sp.total_paths,
            seed=self._sp.mc_seed,
            buffer_size=self._sp.buffer_size,
        )
        # create non-blocking streams for cupy and numba
        self._cp_stream = cp.cuda.Stream(non_blocking=True)
        self._numba_stream = numba.cuda.stream()

    def _simulate(self, inputs: Inputs) -> SimResults:
        """Simulate price paths together with the forward and discount curves.

        Args:
            inputs: Market/contract parameters; ``X0``, ``T``, ``r``, ``d`` and
                ``v`` are read here.

        Returns:
            ``SimResults`` holding the time grid, the rescaled paths, the
            forwards and the discount factors.

        Raises:
            AssertionError: If the generator's matrix is not shaped
                ``(timesteps, total_paths)``.
        """
        sims = self._ng.get_matrix()
        assert sims.shape == (self._sp.timesteps, self._sp.total_paths)
        dt = inputs.T / self._sp.timesteps
        sqrt_dt = sqrt(dt)

        # Wrap sims for Numba. The code below reads `sims` (not `sims_numba`)
        # after the kernel, so it relies on both sharing the same device
        # memory -- presumably a zero-copy view when sims is a CuPy array;
        # TODO confirm against ConcurrentNormGenerator.get_matrix.
        sims_numba = numba.cuda.to_device(sims, stream=self._numba_stream)

        # Launch the kernel in the stream
        SimulateBlackScholes[self._sp.total_blocks, self._sp.threads_per_block, self._numba_stream](
            sims_numba,
            self._sp.timesteps,
            sqrt_dt,
            inputs.X0,
            inputs.v,
        )

        # while that's happening, build forwards
        with self._cp_stream:
            times = cp.linspace(dt, inputs.T, num=self._sp.timesteps)
            forwards = inputs.X0 * cp.exp((inputs.r - inputs.d) * times)
            df = cp.exp(-inputs.r * times)

            # synchronize with kernel because next step requires sims
            self._numba_stream.synchronize()

            # The kernel applies no drift; rescale each timestep (row) so its
            # mean across paths equals the forward for that timestep.
            row_means = cp.mean(sims, axis=1, keepdims=True)
            factors = forwards[:, cp.newaxis] / row_means
            # Out-of-place multiply: the result is a new array.
            sims = sims * factors

        self._cp_stream.synchronize()
        return BlackScholes.SimResults(times=times, sims=sims, forwards=forwards, df=df)

    def price(self, inputs: Inputs, sr: Optional[SimResults] = None) -> PricingResults:
        """Price a European call and put from simulated terminal values.

        The intrinsic values are the discounted payoffs evaluated at the
        terminal forward. The convexity terms are the discounted payoffs
        evaluated at each simulated terminal price, minus the intrinsic value.

        Args:
            inputs: Market/contract parameters; only ``K`` is read directly
                here (the rest are used if a simulation has to be run).
            sr: Existing simulation results to reuse. When ``None`` a fresh
                simulation is run with ``inputs``.

        Returns:
            ``PricingResults`` with the intrinsic prices, the terminal
            underlying reshaped to ``(network_size, batches_per_mc_run)``, and
            the per-path convexity adjustments.
        """
        if sr is None:
            sr = self._simulate(inputs=inputs)

        with self._cp_stream:
            F = sr.forwards[-1]
            df = sr.df[-1]
            K = cp.array(inputs.K)
            put_intrinsic = df * cp.maximum(K - F, 0)
            call_intrinsic = df * cp.maximum(F - K, 0)
            # Use the parameters this instance was built with, not whatever
            # module-level `sp` happens to exist in the notebook namespace.
            underlying_terminal = sr.sims[-1, :].reshape(
                (self._sp.network_size, self._sp.batches_per_mc_run)
            )
            put_convexity = df * cp.maximum(K - underlying_terminal, 0) - put_intrinsic
            call_convexity = df * cp.maximum(underlying_terminal - K, 0) - call_intrinsic

        self._cp_stream.synchronize()
        return BlackScholes.PricingResults(
            call_price_intrinsic=call_intrinsic,
            put_price_intrinsic=put_intrinsic,
            underlying=underlying_terminal,
            put_convexity=put_convexity,
            call_convexity=call_convexity,
        )

In [5]:
# Run configuration: 2048 * 32 = 65,536 paths stepped through 1024 timesteps,
# launched with 256 threads per CUDA block.
sp = SimulationParams(
    # problem size
    timesteps=1024,
    network_size=2048,
    batches_per_mc_run=32,
    # GPU launch geometry
    threads_per_block=256,
    # random-number generation
    mc_seed=42,
    buffer_size=3,
)

In [6]:
# Example contract: at-the-money (K == X0) option with two years to maturity.
inputs = BlackScholes.Inputs(
    # contract terms
    K=100,
    T=2,
    # market state
    X0=100,
    r=0.02,
    d=0.01,
    v=0.2,
)

In [7]:
# Build the simulator once; it owns the random-number generator and GPU streams.
bs = BlackScholes(sp=sp)
# Run the simulation separately so the paths can be reused by the pricing step.
sr = bs._simulate(inputs=inputs)

In [8]:
# Price from the paths simulated above instead of triggering a new simulation.
pr = bs.price(inputs, sr)