In [497]:
import numba as nb

@nb.njit
def b_numba(k, a):
    """Numba version of b(k, a)"""
    return ((k**(a+1) - (k-1)**(a+1)) / (a+1)) ** (1/a)

@nb.njit
def g_numba(x, a):
    """Numba version of g(x, a)"""
    return x**a

In [499]:
@nb.njit(fastmath=True, parallel=True)
def cumsum_numba(arr):
    """Numba-compatible cumulative sum along axis 1."""
    res = np.empty_like(arr)
    for i in nb.prange(arr.shape[0]):  # Loop over paths
        res[i, 0] = arr[i, 0]
        for j in range(1, arr.shape[1]):
            res[i, j] = res[i, j - 1] + arr[i, j]
    return res

In [None]:
   def Y(self, dW):
        """
        Constructs Volterra process from appropriately
        correlated 2d Brownian increments.
        """
        Y1 = np.zeros((self.N, 1 + self.s)) # Exact integrals
        Y2 = np.zeros((self.N, 1 + self.s)) # Riemann sums

        # Construct Y1 through exact integral
        for i in np.arange(1, 1 + self.s, 1):
            Y1[:,i] = dW[:,i-1,1] # Assumes kappa = 1

        # Construct arrays for convolution
        G = np.zeros(1 + self.s) # Gamma
        for k in np.arange(2, 1 + self.s, 1):
            G[k] = g(b(k, self.a)/self.n, self.a)

        X = dW[:,:,0] # Xi

        # Initialise convolution result, GX
        GX = np.zeros((self.N, len(X[0,:]) + len(G) - 1))

        # Compute convolution, FFT not used for small n
        # Possible to compute for all paths in C-layer?
        for i in range(self.N):
            GX[i,:] = np.convolve(G, X[i,:])

        # Extract appropriate part of convolution
        Y2 = GX[:,:1 + self.s]

        # Finally contruct and return full process
        Y = np.sqrt(2 * self.a + 1) * (Y1 + Y2)
        return Y

In [450]:
@nb.njit(parallel=True)
def Y_numba(dW, N, s, a, n):
    """Numba version of Y(dW)"""
    Y1 = np.zeros((N, 1 + s))  # Exact integrals
    Y2 = np.zeros((N, 1 + s))  # Riemann sums

    # Construct Y1 through exact integral
    for i in range(1, 1 + s,1):
        Y1[:, i] = dW[:, i-1, 1]  # Assumes kappa = 1

    # Construct arrays for convolution
    G = np.zeros(1 + s)  # Gamma
    for k in range(2, 1 + s,1):
        G[k] = g_numba(b_numba(k, a) / n, a)  # Uses previously tested functions

    X = dW[:, :, 0]  # Xi

    # Initialise convolution result
    GX = np.zeros((N, len(X[0, :]) + len(G) - 1))

    # Compute convolution
    for i in nb.prange(N):
        GX[i, :] = np.convolve(G, X[i, :])

    # Extract appropriate part of convolution
    Y2 = GX[:, :1 + s]

    # Finally construct and return full process
    Y = np.sqrt(2 * a + 1) * (Y1 + Y2)
    return Y

In [452]:
@nb.njit(fastmath=True, parallel=True)
def dB_numba(dW1, dW2, rho):
    """Numba-optimized correlated Brownian motion process."""
    return rho * dW1[:, :, 0] + np.sqrt(1 - rho**2) * dW2

In [454]:
@nb.njit(fastmath=True, parallel=True)
def V_numba(Y, v0, eta, H, tau, s):
    t_grid = np.linspace(0, tau, s+1).reshape(1, -1)
    exponent = eta * Y - 0.5 * eta**2 * t_grid**(2 * H)
    V = v0 * np.exp(exponent)  # ✅ Correct, using `np.exp`
    return V

In [456]:
@nb.njit(fastmath=True, parallel=True)
def S_numba(V, dB, S0, dt):
    """Numba-optimized price process S."""
    increments = np.sqrt(V[:, :-1]) * dB - 0.5 * V[:, :-1] * dt
    integral = cumsum_numba(increments)  # Use custom cumsum function
    S = np.ones_like(V) * S0  # Ensure initial S0 is set properly
    S[:, 1:] = S0 * np.exp(integral)
    return S

In [458]:
import numpy as np 
# Define parameters
N, s, a, n = 1000, 100, -0.4, 100
tau = 1.0
H = a+.5
eta = 1.
v0 = 1.
S0 = 1.0
dt = 1.0 / n

# Generate random dW samples
np.random.seed(42)
dW_samples = np.random.randn(N, s, 2)

# Compute Y, V, and S using Numba
Y_numba_result = Y_numba(dW_samples, N, s, a, n)
V_numba_result = V_numba(Y_numba_result, v0, eta, H, tau, s)
dB_samples = np.random.randn(N, s)  # Generate correlated Brownian motion
S_numba_result = S_numba(V_numba_result, dB_samples, S0, dt)

# Print first 5 values of S
print("First 5 simulated S values (Numba):", S_numba_result[0, :5])

First 5 simulated S values (Numba): [1.00000000e+00 4.74381049e+00 4.35047103e+00 2.01758918e-01
 3.01714442e-03]


In [460]:
dB_python_result = rB.dB(dW_samples, dW2_samples, rho)

In [462]:
dB_numba_result = dB_numba(dW_samples, dW2_samples, rho)

In [464]:
V_numba_result = V_numba(Y_numba_result, v0, eta, H, tau, s)


In [466]:
print("\n🔍 V_numba / V_python first 10 values:", (V_numba_result / V_python)[0, :10])


🔍 V_numba / V_python first 10 values: [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]


In [414]:
@nb.njit(fastmath=True, parallel=True)
def rBergomi_pricer_numba(H, eta, rho, v0, tau, K, S0, MC_samples=40000, n=365):
    """Numba-optimized European Call price under rBergomi dynamics."""

    # Derived parameters
    a = H - 0.5
    dt = tau / n
    s = n  # Number of time steps

    # Generate Brownian motions
    dW1 = np.random.randn(MC_samples, s)
    dW2 = np.random.randn(MC_samples, s)

    # Construct correlated Brownian motion dB
    dB = rho * dW1 + np.sqrt(1 - rho**2) * dW2

    # Ensure correct reshaping
    dW1_reshaped = dW1.reshape(MC_samples, s, 1)  
    dB_reshaped = dB.reshape(MC_samples, s, 1)  

    # Compute fractional volatility process Y
    Y = Y_numba(dW1_reshaped, MC_samples, s, a, n)

    # Compute variance process V
    V = V_numba(Y, v0, eta, H, tau, s)  # ✅ This now returns an array

    # ✅ Ensure `V` has the correct shape before passing to `S_numba`
    V_trimmed = V[:, :s]  # Now this slicing works

    # Compute stock price process S
    S = S_numba(V_trimmed, dB_reshaped, S0, dt)

    # Extract terminal stock prices
    ST = S[:, -1]

    # Compute European Call price
    payoffs = np.maximum(ST - K, 0)
    price = np.mean(payoffs)

    return price

In [416]:
@nb.njit(fastmath=True, parallel=True)
def rBergomi_pricer_numba(H, eta, rho, v0, tau, K, S0, MC_samples=40000, n=365):
    """Numba-optimized European Call price under rBergomi dynamics."""

    # Derived parameters
    a = H - 0.5
    dt = tau / n
    s = n  # Number of time steps

    # Generate Brownian motions
    dW1 = np.random.randn(MC_samples, s)
    dW2 = np.random.randn(MC_samples, s)

    # Construct correlated Brownian motion dB
    dB = rho * dW1 + np.sqrt(1 - rho**2) * dW2

    # Ensure correct reshaping
    dW1_reshaped = dW1.reshape(MC_samples, s, 1)  
    dB_reshaped = np.ascontiguousarray(dB[:, :s]).copy()  # ✅ Now fully contiguous

    # Compute fractional volatility process Y
    Y = Y_numba(dW1_reshaped, MC_samples, s, a, n)

    # Compute variance process V
    V = V_numba(Y, v0, eta, H, tau, s)

    # ✅ Ensure `V` has correct shape before passing to `S_numba`
    V_trimmed = V[:, :s]  # Now this slicing works

    # ✅ Debugging prints to verify shape match
    print("✅ V_trimmed shape:", V_trimmed.shape)
    print("✅ dB_reshaped shape:", dB_reshaped.shape)

    # Compute stock price process S
    S = S_numba(V_trimmed, dB_reshaped, S0, dt)

    # Extract terminal stock prices
    ST = S[:, -1]

    # Compute European Call price
    payoffs = np.maximum(ST - K, 0)
    price = np.mean(payoffs)

    return price

In [419]:
import data as da


In [421]:
@nb.njit(fastmath=True, parallel=True)
def rBergomi_pricer_numba(H, eta, rho, v0, tau, K, S0, MC_samples):
    """Numba-optimized rBergomi European call option pricer."""
    
    # ✅ Initialize model
    N, s = MC_samples, int(365 * tau)  # Fix s = n * tau
    a = H - 0.5
    dt = tau / s
    
    # ✅ Generate Brownian motions
    np.random.seed(42)  # Ensures same randomness every time
    dW1 = np.random.randn(N, s)
    dW2 = np.random.randn(N, s)
    
    # ✅ Compute correlated Brownian motion
    dB = rho * dW1 + np.sqrt(1 - rho**2) * dW2
    
    # ✅ Debugging prints before computing Y
    print("\n🔍 First 10 values of dW1 (Numba):", dW1[0, :10])
    print("🔍 Min/Max/Mean dW1 (Numba):", np.min(dW1), np.max(dW1), np.mean(dW1))
    
    print("\n🔍 First 10 values of dW2 (Numba):", dW2[0, :10])
    print("🔍 Min/Max/Mean dW2 (Numba):", np.min(dW2), np.max(dW2), np.mean(dW2))
    
    print("\n🔍 First 10 values of dB (Numba):", dB[0, :10])
    print("🔍 Min/Max/Mean dB (Numba):", np.min(dB), np.max(dB), np.mean(dB))

    # ✅ Compute Y (fractional volatility process)
    Y = Y_numba(dW1, N, s, a, n)
    
    # ✅ Compute variance process V
    V = V_numba(Y, v0, eta, H, tau, s)
    
    # ✅ Compute stock prices S
    S = S_numba(V, dB, S0, dt)
    ST = S[:, -1]  # Extract terminal prices

    # ✅ Debugging prints for ST
    print("\n🔍 First 10 values of ST (Numba):", ST[:10])
    print("🔍 Min/Max/Mean ST (Numba):", np.min(ST), np.max(ST), np.mean(ST))

    # ✅ Compute option price
    price = np.mean(np.maximum(ST - K, 0))

    # ✅ Debugging price before checks
    print("\n🔍 Price before numerical stability checks (Numba):", price)

    # ✅ Check for numerical stability
    if price <= 0 or price + K < S0:
        print("🚨 NumStabProblem: Price {} < intrinsic.".format(price))
        return np.nan  # Returning NaN for debugging
    
    return price

In [431]:
def rBergomi_pricer(H, eta, rho, v0, tau, K, S0, MC_samples=40000):
    """Python implementation of rBergomi Monte Carlo pricer."""
    try:
        rB = rBergomi(n=365, N=MC_samples, T=tau, a=H - 0.5)
        dW1, dW2 = rB.dW1(), rB.dW2()
        Y = rB.Y(dW1)
        dB = rB.dB(dW1, dW2, rho)
        xi = v0
        V = rB.V(Y, xi, eta)
        S = rB.S(V, dB)
        ST = S[:, -1]
        price = np.mean(np.maximum(ST - K, 0))
        
        # Debugging prints for first 10 values
        print("\n🔴 Python Debug:")
        print("🔍 First 10 values of V (Python):", V[0, :10])
        print("🔍 First 10 values of S (Python):", S[0, :10])
        print("🔍 First 10 terminal prices ST (Python):", ST[:10])
        print("🔍 Price before checking:", price)
        
    except Exception as e:
        print("Error in Python pricer:", e)
        return np.nan, np.nan  # Handle error case

    return price, np.nan  # Ignore implied volatility for now


In [495]:
dB=dB_numba(dW1,dW2,rho)

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
[1m[1m[1mnon-precise type pyobject[0m
[0m[1mDuring: typing of argument at /var/folders/17/z52ptwgn5qx8rp5fr81jwqs00000gn/T/ipykernel_70825/1703965354.py (1)[0m
[1m
File "../../../../var/folders/17/z52ptwgn5qx8rp5fr81jwqs00000gn/T/ipykernel_70825/1703965354.py", line 1:[0m
[1m<source missing, REPL/exec in use?>[0m

[0m[1mDuring: Pass nopython_type_inference[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m


In [493]:
rho=-.7
dW1=rB.dW1
dW2=rB.dW2
dB=dB_numba(dW1,dW2,rho)
print("dW1 shape:", dB.shape)  # Should be (MC_samples, s)
Y = Y_numba(dB, MC_samples, 100, H - 0.5, 100)

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
[1m[1m[1mnon-precise type pyobject[0m
[0m[1mDuring: typing of argument at /var/folders/17/z52ptwgn5qx8rp5fr81jwqs00000gn/T/ipykernel_70825/1703965354.py (1)[0m
[1m
File "../../../../var/folders/17/z52ptwgn5qx8rp5fr81jwqs00000gn/T/ipykernel_70825/1703965354.py", line 1:[0m
[1m<source missing, REPL/exec in use?>[0m

[0m[1mDuring: Pass nopython_type_inference[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m 

This error may have been caused by the following argument(s):
- argument 0: [1mCannot determine Numba type of <class 'method'>[0m
- argument 1: [1mCannot determine Numba type of <class 'method'>[0m


In [427]:
def rBergomi_pricer_numba(H, eta, rho, v0, tau, K, S0, MC_samples=40000):
    """Computes European Call price under rBergomi dynamics using Monte Carlo."""
    
    # Initialize parameters
    s = int(365 * tau)  # Ensure s matches the discretization
    dW1 = np.random.randn(MC_samples, s)
    dW2 = np.random.randn(MC_samples, s)
    
    # Compute paths using Numba functions
    Y = Y_numba(dW1, MC_samples, s, H - 0.5, s)  # Compute volatility process
    V = V_numba(Y, v0, eta, H, tau, s)  # Compute variance process
    dB = dB_numba(dW1, dW2, rho)  # Compute correlated Brownian motion
    S = S_numba(V[:, :-1], dB, S0, tau / s)  # Compute price paths
    
    # Extract final prices and compute payoffs
    ST = S[:, -1]
    payoffs = np.maximum(ST - K, 0)
    price = np.mean(payoffs)
    
    return price

In [429]:
# Compute prices
price_python, _ = da.rBergomi_pricer(H, eta, rho, v0, tau, K, S0, MC_samples)  # ✅ Fix tuple issue
price_numba = rBergomi_pricer_numba(H, eta, rho, v0, tau, K, S0, MC_samples)

# Debugging extreme values
print("🔍 First 10 values of V (Numba):", V_trimmed[0, :10])
print("🔍 First 10 values of S (Numba):", S[0, :10])

# Print results
print("\n🔴 Python rBergomi Pricer:", price_python)
print("🟢 Numba rBergomi Pricer:", price_numba)

# Compare differences
abs_diff = np.abs(price_python - price_numba)
rel_diff = abs_diff / (price_python + 1e-12)  # Avoid division by zero

print("\n📉 Absolute Difference:", abs_diff)
print("📊 Relative Difference:", rel_diff)

TypingError: Failed in nopython mode pipeline (step: nopython frontend)
[1m[1m[1mNo implementation of function Function(<built-in function getitem>) found for signature:
 
 >>> getitem(array(float64, 2d, C), Tuple(slice<a:b>, int64, Literal[int](1)))
 
There are 22 candidate implementations:
[1m      - Of which 20 did not match due to:
      Overload of function 'getitem': File: <numerous>: Line N/A.
        With argument(s): '(array(float64, 2d, C), Tuple(slice<a:b>, int64, int64))':[0m
[1m       No match.[0m
[1m      - Of which 1 did not match due to:
      Overload in function 'GetItemBuffer.generic': File: numba/core/typing/arraydecl.py: Line 211.
        With argument(s): '(array(float64, 2d, C), Tuple(slice<a:b>, int64, int64))':[0m
[1m       Rejected as the implementation raised a specific error:
         NumbaTypeError: [1mcannot index array(float64, 2d, C) with 3 indices: Tuple(slice<a:b>, int64, int64)[0m[0m
  raised from /opt/anaconda3/lib/python3.12/site-packages/numba/core/typing/arraydecl.py:133
[1m      - Of which 1 did not match due to:
      Overload in function 'GetItemBuffer.generic': File: numba/core/typing/arraydecl.py: Line 211.
        With argument(s): '(array(float64, 2d, C), Tuple(slice<a:b>, int64, Literal[int](1)))':[0m
[1m       Rejected as the implementation raised a specific error:
         NumbaTypeError: [1mcannot index array(float64, 2d, C) with 3 indices: Tuple(slice<a:b>, int64, Literal[int](1))[0m[0m
  raised from /opt/anaconda3/lib/python3.12/site-packages/numba/core/typing/arraydecl.py:133
[0m
[0m[1mDuring: typing of intrinsic-call at /var/folders/17/z52ptwgn5qx8rp5fr81jwqs00000gn/T/ipykernel_70825/820969963.py (9)[0m
[1m
File "../../../../var/folders/17/z52ptwgn5qx8rp5fr81jwqs00000gn/T/ipykernel_70825/820969963.py", line 9:[0m
[1m<source missing, REPL/exec in use?>[0m

[0m[1mDuring: Pass nopython_type_inference[0m