In [1]:
from mpmath import mp, log, sqrt, matrix, mpc, mpf
import functools
import matplotlib.pyplot as plt
from mpmath import linspace, quadsubdiv

mp.dps = 30  # Set high precision

def compute_C_and_D_star_functions(**params):
    """
    Create functions for computing C*, C0*, and D* using mpmath for high-precision complex arithmetic
    """
    # Extract parameters
    kappa = mp.mpf(params['kappa'])
    alpha_1 = mp.mpf(params['alpha_1'])
    beta_1 = mp.mpf(params['beta_1'])
    gamma_1_star = mp.mpf(params['gamma_1_star'])
    omega_1 = mp.mpf(params['omega_1'])
    alpha_2 = mp.mpf(params['alpha_2'])
    beta_2 = mp.mpf(params['beta_2'])
    gamma_2_star = mp.mpf(params['gamma_2_star'])
    omega_2 = mp.mpf(params['omega_2'])
    sigma = mp.mpf(params['sigma'])
    rho = mp.mpf(params['rho'])
    r = mp.mpf(params['r'])

    
    def canonicalize_params(p):
        return tuple((k, str(v)) for k, v in sorted(p.items()))


    cache_key = hash(canonicalize_params(params))


    def compute_A_1(u, v):
        u = unwrap_mpc(u)
        w1 = v[0] * alpha_1
        w2 = v[1] * alpha_2
        denom = 1 - 2 * w2 * (1 - rho**2)
        a = w1 + (w2 * rho**2) / denom
        b = u - 2 * gamma_1_star * w1 - (2 * rho * gamma_2_star * w2) / denom
        c = gamma_1_star**2 * w1 + (gamma_2_star**2 * w2) / denom
        return kappa * (c + b**2 / (2 * (1 - 2 * a)) - 0.5 * u) + v[0] * beta_1

    def compute_A_2(u, v):
        u = unwrap_mpc(u)
        w1 = v[0] * alpha_1
        w2 = v[1] * alpha_2
        denom = 1 - 2 * w2 * (1 - rho**2)
        a = w1 + (w2 * rho**2) / denom
        b = u - 2 * gamma_1_star * w1 - (2 * rho * gamma_2_star * w2) / denom
        c = gamma_1_star**2 * w1 + (gamma_2_star**2 * w2) / denom
        return (1 - kappa) * (c + b**2 / (2 * (1 - 2 * a)) - 0.5 * u) + v[1] * beta_2

    def compute_B(u, v):
        u = unwrap_mpc(u)
        w1 = v[0] * alpha_1
        w2 = v[1] * alpha_2
        denom = 1 - 2 * w2 * (1 - rho**2)
        a = w1 + (w2 * rho**2) / denom
        term1 = -0.5 * log(1 - 2 * w2 * (1 - rho**2))
        term2 = -0.5 * log(1 - 2 * a)
        return term1 + term2 + u * r + v[0] * omega_1 + v[1] * omega_2

    @functools.lru_cache(None)
    def compute_C_star(u, v, t, T, key):
        return (compute_C1(u, v, t, T, key), compute_C2(u, v, t, T, key))

    @functools.lru_cache(None)
    def compute_C1(u, v, t, T, key):
        if T - t == 1:
            return compute_A_1(u, (mp.mpf(0), mp.mpf(0)))
        next_v = compute_C_star(u, v, t + 1, T, key)
        return compute_A_1(u, next_v)

    @functools.lru_cache(None)
    def compute_C2(u, v, t, T, key):
        if T - t == 1:
            return compute_A_2(u, (mp.mpf(0), mp.mpf(0)))
        next_v = compute_C_star(u, v, t + 1, T, key)
        return compute_A_2(u, next_v)

    @functools.lru_cache(None)
    def compute_D_star(u, v, t, T, key):
        if T - t == 1:
            return compute_B(u, (mp.mpf(0), mp.mpf(0)))
        next_v = compute_C_star(u, v, t + 1, T, key)
        next_D = compute_D_star(u, v, t + 1, T, key)
        return compute_B(u, next_v) + next_D

    def compute_C0_star(u, v, t, T, key):
        return mp.mpf(0)

    def wrap_mpc(u):
        if isinstance(u, mpc):
            return (u.real, u.imag)
        elif isinstance(u, tuple) and len(u) == 2:
            return u
        else:
            print(u)
            print(type(u))
            raise ValueError("Expected a complex number or a tuple of two numbers")
        
    def unwrap_mpc(u):
        if isinstance(u, tuple) and len(u) == 2:
            return mpc(u[0], u[1])
        else:
            raise ValueError("Expected a tuple of two numbers")

    def wrap_vec(v):
        if isinstance(v, matrix):
            # Flatten a column matrix (2x1) into a tuple
            if v.cols == 1:
                return tuple(mp.mpc(v[i, 0]) for i in range(v.rows))
            else:
                raise ValueError("Expected a column matrix of shape (2, 1)")
        else:
            raise ValueError("Expected a column matrix of shape (2, 1)")

    def wrapped_compute_C_star(u, v, t, T):
        return matrix(compute_C_star(wrap_mpc(u), wrap_vec(v), t, T, cache_key))

    def wrapped_compute_C0_star(u, v, t, T):
        return compute_C0_star(wrap_mpc(u), wrap_vec(v), t, T, cache_key)

    def wrapped_compute_D_star(u, v, t, T):
        return compute_D_star(wrap_mpc(u), wrap_vec(v), t, T, cache_key)

    return wrapped_compute_D_star, wrapped_compute_C_star, wrapped_compute_C0_star

In [2]:
kappa = 3.78e-2
lam = 0
alpha_1 = 1.71e-8
beta_1 = 9.83e-1
gamma_1_star = 9.91e2
omega_1 = 5.95e-14
alpha_2 = 1.59e-6
beta_2 = 4.08e-6
gamma_2_star = 7.85e2
omega_2 = 3.47e-12
sigma = 1.04e-5
rho = 1.00

r = 0.001

In [3]:
params = {
        "kappa" : kappa,
        "alpha_1" : alpha_1,
        "beta_1" : beta_1,
        "gamma_1_star" : gamma_1_star,
        "lam" : lam,
        "omega_1" : omega_1,
        "alpha_2" : alpha_2,
        "beta_2" : beta_2, 
        "gamma_2_star" : gamma_2_star,
        "omega_2" : omega_2,
        "sigma" : sigma,
        "rho" : rho,
        "r": r
    }

# Create D*, C0*, and C* functions
D_star, C_star, C0_star= compute_C_and_D_star_functions(**params)

In [4]:
# Define option parameters
option_params = {
    'K': 100  # Strike price
}

# Current state
t = 0
T = 20
y_t =  log(100)# Log of price at time t
asset_unconditional_variance = 1e-4
delta = 1
h_R = mpf(asset_unconditional_variance)
h_RV = mpf(asset_unconditional_variance)
# Initialize state vector with unconditional values
l_star_t = [h_R, h_RV]
print(l_star_t)

R = 10  # Real part of the contour for European call option (R > 1)


[mpf('0.00010000000000000000479217360238593'), mpf('0.00010000000000000000479217360238593')]


In [5]:
from mp_HedgeAndPrice import *

In [None]:
def integrand(y):
        z = mpc(R, y)
        log_psi1 = log_psi_1(t, z, y_t, l_star_t, A_star, B0_star, B_star, T)
        log_psi2 = log_psi_2(t, z, y_t, l_star_t, r, A_star, B0_star, B_star, T)
        psi1 = exp(log_psi1)
        psi2 = exp(log_psi2)
        psi_diff = psi2 - psi1
        f_val = f_check(z, **option_params)
        res = exp((z - 1) * y_t) * psi_diff * f_val * 1j
        return res * exp(-(y/1000)**2)  # damping factor

In [None]:
def plot_func(t, T, y_t, l_star_t, r, A_star, B0_star, B_star, f_check, option_params, range, R = 50):
    # y values
    y_vals = linspace(range[0], range[1], 1000)

    # Storage
    psi1_real, psi1_imag = [], []
    psi2_real, psi2_imag = [], []
    exp_real, exp_imag = [], []
    diff_real, diff_imag = [], []
    fval_real, fval_imag = [], []

    int_real, int_imag = [], []

    res_real, res_imag = [], []

    for y in y_vals:
        z = mpc(R, y)
        
        # Your functions
        log_psi_1_val = log_psi_1(t, z, y_t, l_star_t, A_star, B0_star, B_star, T)
        log_psi_2_val = log_psi_2(t, z, y_t, l_star_t, r, A_star, B0_star, B_star, T)
        
        psi_1 = exp(log_psi_1_val)
        psi_2 = exp(log_psi_2_val)
        exp_term = exp((z - 1) * y_t)
        psi_diff = psi_2 - psi_1
        f_val = f_check(z, **option_params)

        integrand = exp_term * psi_diff * f_val* 1j

        res = exp(z*y_t) * psi_1 * f_val
        res_real.append(res.real)
        res_imag.append(res.imag)

        
        # Store parts
        psi1_real.append(psi_1.real)
        psi1_imag.append(psi_1.imag)
        psi2_real.append(psi_2.real)
        psi2_imag.append(psi_2.imag)
        exp_real.append(exp_term.real)
        exp_imag.append(exp_term.imag)
        diff_real.append(psi_diff.real)
        diff_imag.append(psi_diff.imag)
        fval_real.append(f_val.real)
        fval_imag.append(f_val.imag)
        int_real.append(integrand.real)
        int_imag.append(integrand.imag)

    # Plot function
    def plot_complex_component(y_vals, real_vals, imag_vals, title):
        plt.figure(figsize=(10, 4))
        plt.plot(y_vals, real_vals, label='Real part')
        plt.plot(y_vals, imag_vals, label='Imag part', linestyle='--')
        plt.title(title)
        plt.xlabel('y')
        plt.ylabel('Value')
        plt.legend()
        plt.grid(True)
        plt.tight_layout()
        plt.show()

    # Generate plots
    plot_complex_component(y_vals, psi1_real, psi1_imag, 'ψ₁(z)')
    plot_complex_component(y_vals, psi2_real, psi2_imag, 'ψ₂(z)')
    plot_complex_component(y_vals, exp_real, exp_imag, 'exp((z-1)·yₜ)')
    plot_complex_component(y_vals, diff_real, diff_imag, 'ψ₂(z) - ψ₁(z)')
    plot_complex_component(y_vals, fval_real, fval_imag, 'f̂(z)')
    plot_complex_component(y_vals, int_real, int_imag, 'Integrand')
    plot_complex_component(y_vals, res_real, res_imag, 'price integrand')


In [None]:
plot_func(t, 50, y_t, l_star_t, 0.001, D_star, C0_star, C_star, f_check_call, {"K":100}, (-100, 100), R =500)

In [6]:
def risk_minimizing_hedge1(t, T, y_t, l_star_t, r, A_star, B0_star, B_star, f_check, option_params, R):
    def integrand(y):
        z = mpc(R, y)
        log_psi1 = log_psi_1(t, z, y_t, l_star_t, A_star, B0_star, B_star, T)
        log_psi2 = log_psi_2(t, z, y_t, l_star_t, r, A_star, B0_star, B_star, T)
        psi1 = exp(log_psi1)
        psi2 = exp(log_psi2)
        psi_diff = psi2 - psi1
        f_val = f_check(z, **option_params)
        res = exp((z - 1) * y_t) * psi_diff * f_val * 1j
        return res * exp(-(y/600)**2)  # damping factor
    
        
    res_tuple = 2 * quad(integrand, linspace(0, 1000, 100), maxdegree=10, method="tanh-sinh", error = True)
    integral_result = res_tuple[0]
    err = res_tuple[1]
    print(f"Integral result: {integral_result}, Error estimate: {err}")
    log_psi0 = log_psi_0(t, y_t, l_star_t, r, A_star, B0_star, B_star)
    psi0 = exp(log_psi0)-1
    result = (exp(-r * (T - t)) * integral_result / (2 * pi * psi0 * 1j))
    return result.real

def option_price1(t, T, y_t, l_star_t, r, A_star, B0_star, B_star, f_check, option_params, R):
    def integrand(y):
        z = mpc(R, y)
        res = (exp(z * y_t)
                * exp(log_psi_1(t, z, y_t, l_star_t, A_star, B0_star, B_star, T))
                * f_check(z, **option_params)) * 1j
        return res * exp(-(y/600)**2)  # damping factor

    res_tuple = 2 * quad(integrand, linspace(0, 1000, 100), maxdegree=10, method="tanh-sinh", error = True)
    integral_result = res_tuple[0]
    err = res_tuple[1]
    print(f"Integral result: {integral_result}, Error estimate: {err}")
    result = exp(-r * (T - t)) * integral_result / (2 * pi * 1j)
    return result.real

In [None]:
from mp_HedgeAndPrice import *

# Current state
t = 0
T = 20

current_price = 100
y_t =  log(current_price) # Log of price at time t

for K in [50, 75, 100, 125, 150]:
    option_params = {
        'K': K  # Strike price
    }

    print("K", K)
    print("TTM", T)
    print("Current Underlying Asset Price", current_price)
    moneyness = abs(log(K / current_price))
    print("Moneyness", moneyness)
    
    # Compute the risk-minimizing hedging position
    xi_t_plus_1 = risk_minimizing_hedge1(
        t, T, y_t, l_star_t, r,
        D_star, C0_star, C_star,
        f_check_call, option_params, 420
    )

    print(f"Risk-minimizing hedging position: {xi_t_plus_1}")

    # Compute the option price
    option_price_value = option_price1(
        t, T, y_t, l_star_t, r,
        D_star, C0_star, C_star,
        f_check_call, option_params, 20
    )

    print(f"Option price: {option_price_value}")


    bs_price = black_scholes_price(current_price, K, T/251, r, (asset_unconditional_variance * 251)**0.5, 'call')
    print("Black-Scholes Price:", bs_price)

K 50
TTM 20
Current Underlying Asset Price 100
Moneyness 0.693147180559945309417232121458


In [None]:
exp(log_psi_1(t, mpc(1, 0), y_t, l_star_t, D_star, C0_star, C_star, 20))

In [None]:
exp(0.001 * (20))

In [None]:
# Compute the risk-minimizing hedging position

xi_t_plus_1 = risk_minimizing_hedge1(
    t, 20, y_t, l_star_t, 0.001,
    D_star, C0_star, C_star,
    f_check_call, {"K":100}, 420
)
print(f"Risk-minimizing hedging position: {xi_t_plus_1}")

In [7]:

# Compute the option price
option_price_value = option_price1(
    t, 20, y_t, l_star_t, 0.001,
    D_star, C0_star, C_star,
    f_check_call, {"K":100}, R
)

print(f"Option price: {option_price_value}")

Integral result: (12.0224493514260160652659040684 + 9.58180210868474235733702747013j), Error estimate: 5.00000010000001001014820534e-32
Option price: 1.49479432097739497498092443155


In [8]:
import numpy as np
import pandas as pd

params = {
        "kappa" : kappa,
        "alpha_1" : alpha_1,
        "beta_1" : beta_1,
        "gamma_1_star" : gamma_1_star,
        "lam" : lam,
        "omega_1" : omega_1,
        "alpha_2" : alpha_2,
        "beta_2" : beta_2, 
        "gamma_2_star" : gamma_2_star,
        "omega_2" : omega_2,
        "sigma" : sigma,
        "rho" : rho,
        "r": r
    }

l_star_t = [1e-4, 1e-4]
# Create D*, C0*, and C* functions
D_star, C_star, C0_star= compute_C_and_D_star_functions(**params)

option_params = {"strike": 100}

def gen_data(D_star, C0_star, C_star, option_params, r):
    params["r"] = r
    D_star, C_star, C0_star= compute_C_and_D_star_functions(**params)
    

    # Create a grid
    moneyness_grid = linspace(0.8, 1.2, 20)  # 80% to 120% of strike
    TTM_grid = linspace(20, 200, 36)           # 5 days to 60 days

    MONEYNESS, TTM = np.meshgrid(moneyness_grid, TTM_grid)

    option_prices = np.zeros_like(MONEYNESS)
    hedge_ratios = np.zeros_like(MONEYNESS)

    # Generate the data
    for i in range(MONEYNESS.shape[0]):
        for j in range(MONEYNESS.shape[1]):
            spot_price = MONEYNESS[i, j] * option_params["strike"]
            y_t = np.log(spot_price)
            T = int(TTM[i, j])  # maturity in days
            t = 0
            
            try:
                price = option_price1(
                    t, T, y_t, l_star_t,
                    r, D_star, C0_star, C_star,
                    f_check_call, option_params, 20
                )
                hedge = risk_minimizing_hedge1(
                    t, T, y_t, l_star_t,
                    r, D_star, C0_star, C_star,
                    f_check_call, option_params, 420
                )
            except Exception as e:
                print(f"Error at i={i}, j={j}: {e}")
                price, hedge = np.nan, np.nan
            
            option_prices[i, j] = price
            hedge_ratios[i, j] = hedge

    # Save the data (optional)
    df = pd.DataFrame({
        "Moneyness": MONEYNESS.flatten(),
        "TTM": TTM.flatten(),
        "OptionPrice": option_prices.flatten(),
        "HedgeRatio": hedge_ratios.flatten()
    })
    df.to_csv(f"option_hedge_data_rrf={r}.csv", index=False)


In [9]:
for rate in [0.001, 0.01, 0.05]:
    gen_data(D_star, C0_star, C_star, option_params, rate)
    print(f"Data generated for r = {rate}")

TypeError: loop of ufunc does not support argument 0 of type mpf which has no callable log method