In [1]:
# Colab-only setup: mount Google Drive so files stored there are reachable
# under /content/drive. Skip this cell when not running on Google Colab.
from google.colab import drive
drive.mount('/content/drive')

# Make the notebook folder the working directory; later cells open their
# inputs and write their results through relative paths.
%cd /content/drive/MyDrive/Colab Notebooks

Mounted at /content/drive
/content/drive/MyDrive/Colab Notebooks


In [2]:
import numpy as np
from scipy.integrate import quad
from scipy.special import iv  # Modified Bessel function of the first kind
import warnings
warnings.filterwarnings("error", category=RuntimeWarning)

# Define C_j(φ, T - t)
def C_j(phi, T_t, b_j, d_j, q_j, kappa, theta, sigma, rho, r):
    """Return the coefficient C_j(phi, T - t) of the Heston characteristic exponent.

    Evaluates, with tau = T_t,

        C_j = r*i*phi*tau
              + (kappa*theta/sigma**2) * [ (b_j - rho*sigma*i*phi + d_j)*tau
                                           - 2*ln((1 - q_j*exp(d_j*tau)) / (1 - q_j)) ]

    The logarithm is computed in the algebraically equivalent form

        d_j*tau + ln(1 - exp(-d_j*tau)/q_j) - ln(1 - 1/q_j)

    which only needs exp(-d_j*tau) and therefore does not overflow when the
    real part of d_j*tau is large and positive.

    Args:
        phi: Integration variable of the Fourier inversion.
        T_t: Time to maturity tau = T - t.
        b_j, d_j, q_j: Auxiliary quantities computed by ``f_j``.
        kappa, theta, sigma, rho: Heston model parameters.
        r: Risk-free rate.

    Returns:
        The complex value of C_j.

    Raises:
        RuntimeWarning: Re-raised (after printing it) when NumPy warnings are
            promoted to errors and the logarithm evaluation triggers one.
    """
    I = 1j  # sqrt(-1)
    scale = kappa * theta / sigma**2
    term1 = r * I * phi * T_t
    term2 = scale * ((b_j - rho * sigma * I * phi + d_j) * T_t)
    try:
        # The two logarithms must have different arguments; if they coincide
        # they cancel and C_j silently loses its -2*ln(...) contribution.
        log_ratio = (np.log(1 - np.exp(-d_j * T_t) / q_j)
                     + (d_j * T_t)
                     - np.log(1 - 1 / q_j))
    except RuntimeWarning as e:
        print(f"Caught a RuntimeWarning: {e}")
        # Without a value for the log term there is nothing sensible to
        # return, so surface the numerical problem to the caller.
        raise
    term3 = -2 * scale * log_ratio
    return term1 + term2 + term3

# Define D_j(φ, T - t)
def D_j(phi, T_t, b_j, d_j, q_j, sigma, rho):
    """Return the coefficient D_j(phi, T - t) of the Heston characteristic exponent.

    Evaluates, with tau = T_t,

        D_j = (b_j - rho*sigma*i*phi + d_j) / sigma**2
              * (1 - exp(d_j*tau)) / (1 - q_j*exp(d_j*tau))

    The fraction is computed without relying on exceptions or on the global
    warnings filter: when the real part of d_j*tau is positive it is rewritten
    as (exp(-d_j*tau) - 1) / (exp(-d_j*tau) - q_j), so the exponential can only
    underflow. For very large d_j*tau this tends to 1/q_j; for tau = 0 the
    result is 0.

    Args:
        phi: Integration variable of the Fourier inversion.
        T_t: Time to maturity tau = T - t.
        b_j, d_j, q_j: Auxiliary quantities computed by ``f_j``.
        sigma, rho: Heston volatility-of-variance and correlation.

    Returns:
        The complex value of D_j.
    """
    I = 1j  # sqrt(-1)
    prefactor = (b_j - rho * sigma * I * phi + d_j) / sigma**2
    exponent = d_j * T_t
    if np.real(exponent) > 0:
        # Multiply numerator and denominator by exp(-d_j*tau) to avoid overflow.
        decay = np.exp(-exponent)
        ratio = (decay - 1) / (decay - q_j)
    else:
        growth = np.exp(exponent)
        ratio = (1 - growth) / (1 - q_j * growth)
    return prefactor * ratio


# Define the characteristic function f_j
def f_j(j, S_t, v_t, T_t, phi, kappa, theta, sigma, rho, r):
    """Evaluate the characteristic function f_j = exp(C_j + D_j*v_t + i*phi*ln(S_t)).

    Args:
        j: Selects the parameter set; must be 1 or 2.
        S_t: Spot price (its natural logarithm enters the exponent).
        v_t: Variance level multiplying D_j.
        T_t: Time to maturity passed on to ``C_j`` and ``D_j``.
        phi: Integration variable of the Fourier inversion.
        kappa, theta, sigma, rho: Heston model parameters.
        r: Risk-free rate.

    Returns:
        The complex value of f_j.

    Raises:
        ValueError: If ``j`` is neither 1 nor 2.
    """
    I = 1j  # sqrt(-1)

    if j == 1:
        b_j = kappa - sigma * rho
        u_j = 1/2
    elif j == 2:
        # NOTE(review): Heston (1993) uses b_2 = kappa (+ lambda); confirm that
        # kappa + sigma*rho is what the forward-start derivation requires.
        b_j = kappa + sigma * rho
        u_j = -1/2
    else:
        # Without this guard b_j/u_j would be unbound and fail further down.
        raise ValueError(f"j must be 1 or 2, got {j!r}")

    ln_S_t = np.log(S_t)
    drift = rho * sigma * I * phi
    d_j = np.sqrt((drift - b_j)**2 - sigma**2 * (2 * u_j * I * phi - phi**2))
    q_j = (b_j - drift + d_j) / (b_j - drift - d_j)

    exponent = (I * phi * ln_S_t
                + C_j(phi, T_t, b_j, d_j, q_j, kappa, theta, sigma, rho, r)
                + D_j(phi, T_t, b_j, d_j, q_j, sigma, rho) * v_t)
    return np.exp(exponent)

# Define the main function P_j
def P_j(t, S_t, v_t, j, T, kappa, theta, sigma, rho, r, K=1):
    """Compute P_j by Fourier inversion of the characteristic function f_j.

    Evaluates

        P_j = 1/2 + (1/pi) * integral_0^inf Re[ exp(-i*phi*ln K) * f_j / (i*phi) ] dphi

    with ``scipy.integrate.quad``.

    Args:
        t: Start time; the characteristic function uses tau = T - t.
        S_t: Spot price passed to ``f_j``.
        v_t: Variance level passed to ``f_j``.
        j: 1 or 2, selects the parameter set inside ``f_j``.
        T: Maturity.
        kappa, theta, sigma, rho: Heston model parameters.
        r: Risk-free rate.
        K: Strike entering through ln(K); defaults to 1.

    Returns:
        The value of P_j, limited to the interval [-1e4, 1e4].
    """
    I = 1j  # sqrt(-1)
    log_K = np.log(K)
    tau = T - t

    def integrand(phi):
        f_j_value = f_j(j, S_t, v_t, tau, phi, kappa, theta, sigma, rho, r)
        value = np.real(np.exp(-I * phi * log_K) * f_j_value / (I * phi))
        # Clamp symmetrically to [-1e6, 1e6]. A one-sided max(value, 1e6)
        # would lift every sample to at least 1e6 and make the integral diverge.
        return max(min(value, 1e6), -1e6)

    integral_value, _ = quad(integrand, 0, np.inf, limit=100000)
    # Combine the components of P_j
    P_value = 0.5 + (1 / np.pi) * integral_value
    return max(min(P_value, 1e4), -1e4)

# Define the probability density function f(v|ν(t))
def f_v_given_vega(t, t_star, vega, kappa, theta, sigma, rho):
    """Evaluate the variance transition density used as integration weight.

    Computes

        (B/2) * exp(-(B*vega + Lambda)/2) * (B*vega/Lambda)**((R/2 - 1)/2)
              * I_{R/2 - 1}(sqrt(Lambda*B*vega))

    where I is the modified Bessel function of the first kind and
    B, R, Lambda are built from the model parameters as in the code below.

    NOTE(review): ``vega`` is used both as the evaluation point (B*vega) and
    inside Lambda; confirm whether Lambda should use the variance at time t.

    Args:
        t: Current time.
        t_star: Time at which the variance is evaluated.
        vega: Variance level at which the density is evaluated.
        kappa, theta, sigma, rho: Heston model parameters.

    Returns:
        The density value limited to [-1e6, 1e6], or 0 when the expression is
        not finite (for example 0/0 at vega == 0 or t_star == t, or an
        overflowing Bessel term).
    """
    # Evaluate with NumPy floating-point warnings suppressed so that the
    # outcome does not depend on the process-wide warnings filter; failures
    # are detected from the value itself instead of through exceptions.
    with np.errstate(all='ignore'):
        decay = np.exp(-(kappa - rho * sigma) * (t_star - t))
        B = (4 * kappa * (1 - decay)) / sigma**2
        R = 4 * kappa * theta / sigma**2
        Lambda = B * decay * vega
        order = R / 2 - 1

        term1 = B / 2
        # The exponent numerator is capped at 700 to keep exp() finite.
        term2 = np.exp(min(700, -(B * vega + Lambda)) / 2)
        term3 = (B * vega / Lambda) ** (order / 2)
        term4 = iv(order, np.sqrt(Lambda * B * vega))  # Modified Bessel function
        result = term1 * term2 * term3 * term4

    if not np.isfinite(result):
        # A failed evaluation contributes no weight rather than poisoning
        # the outer integral with NaN/inf.
        return 0
    return max(min(result, 1e6), -1e6)


# Define the integral of P̂_j
def P_hat_j(S_t, j, params):
    """Integrate P_j against the variance density over v in [0, inf).

    Args:
        S_t: Spot price forwarded to ``P_j``.
        j: 1 or 2, forwarded to ``P_j``.
        params: Nine values unpacked as
            (vega_t, kappa, theta, sigma, rho, t, t_star, T, r).
            The first entry is unpacked but not used here.

    Returns:
        The integral value limited to the interval [-5, 10].
    """
    _unused_vega_t, kappa, theta, sigma, rho, t, t_star, T, r = params

    def weighted_probability(v):
        # P_j is started at t_star with variance v, then weighted by the density of v.
        probability = P_j(t_star, S_t, v, j, T, kappa, theta, sigma, rho, r)
        density = f_v_given_vega(t, t_star, v, kappa, theta, sigma, rho)
        return probability * density

    total, _abs_err = quad(weighted_probability, 0, np.inf)
    capped = min(total, 10)
    return max(capped, -5)

# Define the Heston option price C_FWS
def C_FWS(S_t, params):
    """Return S_t * P_hat_1 - exp(-r * (t_star - t)) * P_hat_2.

    Args:
        S_t: Spot price.
        params: Nine values in the order
            (vega_t, kappa, theta, sigma, rho, t, t_star, T, r); the full
            sequence is forwarded to ``P_hat_j``.

    Returns:
        The price computed from the two integrated probabilities.
    """
    _, _, _, _, _, t, t_star, _, r = params
    prob_1, prob_2 = [P_hat_j(S_t, index, params) for index in (1, 2)]
    discount = np.exp(-r * (t_star - t))
    return S_t * prob_1 - discount * prob_2

In [4]:
pip install QuantLib

Collecting QuantLib
  Downloading QuantLib-1.36-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.1 kB)
Downloading QuantLib-1.36-cp38-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (19.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m19.6/19.6 MB[0m [31m73.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: QuantLib
Successfully installed QuantLib-1.36


In [5]:
import os
import math
import numpy as np
import pandas as pd
import QuantLib as ql
from scipy.optimize import minimize
import warnings
warnings.filterwarnings('ignore')
import multiprocessing
import logging
logging.getLogger().setLevel(logging.DEBUG)
np.random.seed(42)
# Full-day US equity market closures for 2018-2023, used as the `holidays`
# argument of np.busday_count. Entries that fall on a weekend are harmless
# (weekends are excluded anyway); the observed weekday closure is what matters.
nasdaq_holidays = [
    '2018-01-01', '2018-01-15', '2018-02-19', '2018-03-30', '2018-05-28',
    '2018-07-04', '2018-09-03', '2018-11-22',
    '2018-12-05',  # National Day of Mourning (markets closed)
    '2018-12-25',
    '2019-01-01', '2019-01-21', '2019-02-18', '2019-04-19', '2019-05-27',
    '2019-07-04', '2019-09-02', '2019-11-28', '2019-12-25',
    '2020-01-01', '2020-01-20', '2020-02-17', '2020-04-10', '2020-05-25',
    '2020-07-03', '2020-09-07', '2020-11-26', '2020-12-25',
    '2021-01-01', '2021-01-18', '2021-02-15', '2021-04-02', '2021-05-31',
    '2021-07-05', '2021-09-06', '2021-11-25', '2021-12-24',
    '2022-01-01', '2022-01-17', '2022-02-21', '2022-04-15', '2022-05-30',
    '2022-06-20',  # Juneteenth (observed)
    '2022-07-04', '2022-09-05', '2022-11-24', '2022-12-26',
    '2023-01-01',
    '2023-01-02',  # New Year's Day (observed; 2023-01-01 was a Sunday)
    '2023-01-16', '2023-02-20', '2023-04-07', '2023-05-29',
    '2023-06-19',  # Juneteenth
    '2023-07-04', '2023-09-04', '2023-11-23', '2023-12-25'
]
nasdaq_holidays = np.array(nasdaq_holidays, dtype='datetime64[D]')


def IR_effect(init_date, start_date, end_date):
    """Return the interest rate for the period start_date -> end_date.

    The horizon tau is the number of business days between the two dates
    (excluding ``nasdaq_holidays``) divided by 252. The rate follows the
    Nelson-Siegel form

        beta0 + beta1 * L(tau) + beta2 * (L(tau) - exp(-tau/tau1)),
        L(tau) = (1 - exp(-tau/tau1)) / (tau/tau1)

    using the BETA0/BETA1/BETA2/TAU1 values of the ``IRParams`` row whose
    ``Date`` equals ``init_date``. The result is divided by 100.

    Returns 0.0 when the two dates coincide or the business-day horizon is zero.
    """
    if start_date == end_date:
        return 0.0

    tau = np.busday_count(start_date, end_date, holidays=nasdaq_holidays) / 252
    if tau == 0.0:
        return 0.0

    # Select the curve parameters quoted on init_date; .item() requires
    # exactly one matching row.
    on_init_date = IRParams.Date == init_date
    beta0 = IRParams.loc[on_init_date, 'BETA0'].item()
    beta1 = IRParams.loc[on_init_date, 'BETA1'].item()
    beta2 = IRParams.loc[on_init_date, 'BETA2'].item()
    tau1 = IRParams.loc[on_init_date, 'TAU1'].item()

    scaled_tau = tau / tau1
    decay = math.exp(-scaled_tau)
    slope_loading = (1 - decay) / scaled_tau
    rate = beta0 + beta1 * slope_loading + beta2 * (slope_loading - decay)
    return rate / 100


def date_asQuantLib(t):
    """Convert anything ``pd.to_datetime`` accepts into a ``ql.Date``."""
    stamp = pd.to_datetime(t)
    day, month, year = stamp.day, stamp.month, stamp.year
    return ql.Date(day, month, year)


def read_all_paths(directory):
    """List every file and sub-directory path found below ``directory``.

    Paths are collected in ``os.walk`` order; for each visited folder its
    files come first, followed by its immediate sub-directories.
    """
    collected = []
    for folder, subfolders, filenames in os.walk(directory):
        collected.extend(os.path.join(folder, entry) for entry in filenames)
        collected.extend(os.path.join(folder, entry) for entry in subfolders)
    return collected


# Define a helper to price options using the Heston model
def model_price(strike, expiry, model):
    """Price a European call under the given Heston model.

    Args:
        strike: Strike of the plain-vanilla call payoff.
        expiry: Expiry date handed to ``ql.EuropeanExercise``.
        model: Model object handed to ``ql.AnalyticHestonEngine``.

    Returns:
        The option's NPV as reported by QuantLib.
    """
    payoff = ql.PlainVanillaPayoff(ql.Option.Call, strike)
    exercise = ql.EuropeanExercise(expiry)
    option = ql.VanillaOption(payoff, exercise)
    # A constructor call either returns an engine or raises, so no None
    # check is needed before attaching it.
    option.setPricingEngine(ql.AnalyticHestonEngine(model))
    return option.NPV()


# Define the objective function (least squares + Tikhonov regularization)
def objective_function(params, market_data, alpha, prior_params):
    """Calibration loss: squared pricing errors plus a Tikhonov penalty.

    Args:
        params: Array (v0, kappa, theta, sigma, rho) being optimised.
        market_data: Dict with the keys 'strike_prices', 'market_prices',
            'expiry', 'risk_free_curve', 'dividend_yield' and 'spot_price'.
        alpha: Weight of the penalty term.
        prior_params: Reference parameters the penalty pulls towards.

    Returns:
        sum((model - market)**2) + alpha * sum((params - prior_params)**2).
    """
    v0, kappa, theta, sigma, rho = params

    quoted_strikes = market_data['strike_prices']
    quoted_prices = market_data['market_prices']
    quoted_expiries = market_data['expiry']
    curve = market_data['risk_free_curve']

    process = ql.HestonProcess(
        ql.YieldTermStructureHandle(curve),
        market_data['dividend_yield'],
        market_data['spot_price'],
        v0, kappa, theta, sigma, rho,
    )
    model = ql.HestonModel(process)

    # Squared difference between model and market price, one entry per quote.
    squared_errors = []
    for strike, market_price, expiry in zip(quoted_strikes, quoted_prices, quoted_expiries):
        squared_errors.append((model_price(strike, expiry, model) - market_price) ** 2)
    fit_term = np.sum(squared_errors)

    # Penalty for moving away from the prior guess.
    penalty = alpha * np.sum((params - prior_params) ** 2)

    return fit_term + penalty


# Calibrate the parameters on each t0
def calibrateHestonT0(df, t0, S0, r0, init_params):
    """Fit (v0, kappa, theta, sigma, rho) to the quotes in ``df`` as of ``t0``.

    Sets QuantLib's global evaluation date to ``t0``, builds flat dividend
    (0.0) and risk-free (``r0``) curves with the global ``day_count``, and
    minimises ``objective_function`` with bounded L-BFGS-B. ``init_params``
    serves both as the starting point and as the Tikhonov prior.

    Args:
        df: Frame with the columns 'K', 'ask', 'bid' and 't'.
        t0: Valuation date (assigned to QuantLib's evaluation date).
        S0: Spot price.
        r0: Flat risk-free rate.
        init_params: Initial/prior parameter array.

    Returns:
        The optimiser's solution vector.
    """
    ql.Settings.instance().evaluationDate = t0

    spot = ql.QuoteHandle(ql.SimpleQuote(S0))
    zero_dividend = ql.YieldTermStructureHandle(ql.FlatForward(t0, 0.0, day_count))
    flat_risk_free = ql.FlatForward(t0, ql.QuoteHandle(ql.SimpleQuote(r0)), day_count)

    strikes = df['K']
    mid_quotes = (df['ask'] + df['bid']) / 2  # Observed market option prices
    expiries = df['t']
    market_data = {
        'strike_prices': strikes,
        'market_prices': mid_quotes,
        'expiry': expiries,
        'spot_price': spot,
        'dividend_yield': zero_dividend,
        'risk_free_curve': flat_risk_free
    }

    penalty_weight = 0.01  # Regularization parameter
    # Box constraints, in the order v0, kappa, theta, sigma, rho.
    param_bounds = [
        (0.01, 0.2),
        (0.1, 10),
        (0.001, 0.2),
        (0.001, 10.0),
        (-0.9, 0.9),
    ]

    fit = minimize(
        objective_function,
        x0=init_params,
        args=(market_data, penalty_weight, init_params),
        method='L-BFGS-B',
        bounds=param_bounds,
    )
    return fit.x


def calc_Payoff(S1, S2):
    """Return max(S2 - S1, 0), element-wise for array inputs."""
    gain = S2 - S1
    return np.maximum(gain, 0)


def calc_GeekingHedge(S0, params, alpha):
    """Forward finite-difference estimate of dC_FWS/dS at ``S0``.

    The spot is bumped by the relative amount ``alpha`` and the price change
    is divided by the absolute bump ``alpha * S0``.
    """
    bumped_price = C_FWS(S0*(1+alpha), params)
    base_price = C_FWS(S0, params)
    bump_size = alpha * S0
    return (bumped_price - base_price) / bump_size

def Heston_main(item):
    """Calibrate, price and hedge one option pair, then store the relative gaps.

    For every distinct ``t0`` in the pair's data (processed from the latest to
    the earliest) the function calibrates the Heston parameters, prices the
    contract with ``C_FWS``, adds the hedge increments and records
    ``(price - realised payoff) / S0``.

    Reads the module-level names ``init_params`` and ``day_count`` (and, via
    ``IR_effect``, ``IRParams`` and ``nasdaq_holidays``), so those must exist
    before this function is called.

    Args:
        item: ``(pair, data)`` tuple; ``pair`` is a string whose part before
            the first underscore is the ticker, ``data`` is a DataFrame with
            the columns t0, T1, K1, C1_ask, C1_bid, T2, K2, C2_ask, C2_bid.

    Returns:
        A one-row DataFrame (row label ``pair``) of relative gaps, indexed in
        columns by the business-day count from each t0 to t1. The same frame
        is written to ``result_Heston/<pair>.csv``.
    """
    pair, data = item
    # NOTE(review): k is never read in this function.
    k = 1
    print(pair)
    ticker = pair.split('_')[0].lower()
    stock_df = pd.read_csv('data/adjusted_Stock_Daily/{}_stock_daily_adjusted.csv'.format(ticker), index_col=0)
    stock_df = stock_df.set_index('date')

    t0List = data['t0'].unique()
    t0List.sort()
    params_dict = {}
    gap_list = []
    # Walk the valuation dates from the latest to the earliest.
    for i in range(len(t0List)-1,-1,-1):
        t0 = t0List[i]
        df = data[data['t0'] == t0]

        # Stack the two option legs into one frame with common column names.
        # NOTE(review): df1/df2 are column selections of df; assigning into
        # them below may trigger pandas' SettingWithCopyWarning.
        df1 = df[['t0', 'T1', 'K1', 'C1_ask', 'C1_bid']]
        df2 = df[['t0', 'T2', 'K2', 'C2_ask', 'C2_bid']]
        df1.columns = ['t0', 't', 'K', 'ask', 'bid']
        # Only the first distinct expiry of each leg is used.
        t1 = df1['t'].unique()[0]
        df1['t'] = date_asQuantLib(t1)
        df2.columns = ['t0', 't', 'K', 'ask', 'bid']
        t2 = df2['t'].unique()[0]
        df2['t'] = date_asQuantLib(t2)
        df = pd.concat([df1, df2]).dropna()

        S0 = stock_df.loc[t0]['adjusted_price']
        r = IR_effect(t0, t0, t2)
        params = calibrateHestonT0(df, date_asQuantLib(t0), S0, r, init_params)
        params_dict[t0] = params.copy()
        # Realised payoff: max(S(t2) - S(t1), 0) on adjusted prices.
        payoff_Actual = calc_Payoff(stock_df.loc[t1, 'adjusted_price'], stock_df.loc[t2, 'adjusted_price'])

        T1 = day_count.yearFraction(date_asQuantLib(t0), date_asQuantLib(t1))
        T2 = day_count.yearFraction(date_asQuantLib(t0), date_asQuantLib(t2))
        # NOTE(review): C_FWS unpacks the tail as (t, t_star, T, r), but the
        # tail appended here is (r, 0, T1, T2) — confirm the intended order.
        params = np.concatenate((params, np.array([r,0,T1,T2])))
        price = C_FWS(S0, params)
        t_i = pd.to_datetime(t0)
        while t_i < pd.to_datetime(t2):
            ti_str = t_i.strftime('%Y-%m-%d')
            # First column of the first row on/after ti_str — presumably the
            # adjusted price; verify against the CSV's column order.
            S_i = stock_df.loc[ti_str:].iloc[0, 0]
            r = IR_effect(ti_str, ti_str, t2)
            # Most recent valuation date not later than the current day.
            t = max([date for date in t0List if date <= ti_str])
            params_hedge = params_dict[t]
            # NOTE(review): t comes from t0List while t_i is a pandas
            # Timestamp — confirm this equality can ever be true.
            if t == t_i:
                params_hedge = np.concatenate((params_hedge, np.array([r,0,T1,T2])))
            else:
                params_hedge = np.concatenate((params_hedge, np.array([r, day_count.yearFraction(date_asQuantLib(t_i), date_asQuantLib(t)),T1,T2])))
            delta_i = calc_GeekingHedge(S_i, params_hedge, alpha=0.001)
            t_i += pd.Timedelta(days=1)
            T1 = day_count.yearFraction(date_asQuantLib(t_i), date_asQuantLib(t1))
            T2 = day_count.yearFraction(date_asQuantLib(t_i), date_asQuantLib(t2))
            # NOTE(review): max() moves t_i to at least t2, so the loop
            # condition fails after this first pass — was min() intended?
            t_i = max(t_i, pd.to_datetime(t2))
            # NOTE(review): ti_str has not been refreshed, so this lookup
            # repeats the one that produced S_i and the increment is zero.
            price += delta_i * (stock_df.loc[ti_str:].iloc[0, 0] - S_i)
        gap_list.append((price - payoff_Actual) / S0)
    # Business days from each t0 to t1; t1 is the value left over from the
    # last loop iteration.
    # NOTE(review): t0List is ascending while gap_list was filled in
    # descending t0 order — confirm the labels line up with the values.
    timeToMaturity = np.busday_count(pd.to_datetime(t0List).values.astype('datetime64[D]'),
                                     pd.to_datetime([t1]).values.astype('datetime64[D]'),
                                     holidays=nasdaq_holidays)
    gap_list = pd.DataFrame(gap_list, index=timeToMaturity, columns=[pair]).T
    gap_list.to_csv('result_Heston/{}.csv'.format(pair))
    return gap_list

def collect_results(result):
    """Pool callback: keep non-empty results in the global ``results`` list.

    ``None`` or empty results are skipped with a logged warning.
    """
    if result is None or result.empty:
        logging.warning("Received an empty or None result, skipping.")
        return
    results.append(result)

In [None]:
if __name__ == "__main__":
    # Starting point of the calibration; also used as the Tikhonov prior.
    v0 = 0.04  # Initial variance
    kappa = 0.8  # Rate of mean reversion
    theta = 0.04  # Long-term variance
    sigma = 0.1  # Volatility of volatility
    rho = -0.2  # Correlation (can be negative)
    init_params = np.array([v0, kappa, theta, sigma, rho])

    # Yield-curve parameters per date; gaps are filled with the last known row.
    IRParams = pd.read_csv('interest_rates_parameters.csv', parse_dates=['Date'], dayfirst=True)
    IRParams = IRParams.ffill()

    calendar = ql.UnitedStates(ql.UnitedStates.NYSE)
    day_count = ql.Actual365Fixed()
    N = 1000  # Number of time steps
    M = 100000

    data_path = 'data/options_call_askbid/'
    tickerList = ['TSLA']
    paths = [file for file in os.listdir(data_path)
             if file.endswith('.csv') and file.split('_')[0] in tickerList]

    # To resume an earlier run, load its results into gap_df instead:
    # gap_df = pd.read_csv('result_hestonModel.csv',index_col=0)
    # gap_df.columns = np.array(pd.to_numeric(gap_df.columns).astype('int'))
    gap_df = pd.DataFrame()
    data_dict = {}
    for path in paths:
        # Skip editor/OS artefacts that may sit next to the data files.
        if 'checkpoint' in path or '.DS_Store' in path:
            continue
        data = pd.read_csv(data_path+path, index_col=0)
        pair = path.split('.')[0].split('/')[-1]
        # Only queue pairs that are not already present in gap_df.
        if pair not in gap_df.index:
            data_dict[pair] = data

    manager = multiprocessing.Manager()
    results = manager.list()

    def _report_failure(exc):
        # Without an error callback, exceptions raised inside a worker are
        # dropped silently by apply_async.
        logging.error("Heston_main failed in a worker: %r", exc)

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        for item in data_dict.items():
            pool.apply_async(Heston_main, args=(item,),
                             callback=collect_results,
                             error_callback=_report_failure)

        pool.close()
        pool.join()

    if len(results) == 0:
        # pd.concat raises on an empty sequence; report the situation instead.
        logging.warning("No results were produced; result_Heston.csv was not written.")
    else:
        combined_result = pd.concat(list(results))
        combined_result.to_csv('result_Heston.csv')
        logging.info("All tasks completed and results saved to result_Heston.csv.")

In [None]:
# Serial alternative to the worker pool: process every pair in the current
# process. Needs data_dict and the globals read by Heston_main
# (init_params, day_count, IRParams) from the main cell.
for item in data_dict.items():
    Heston_main(item)

In [None]:
# Spot check: price one contract with a hard-coded parameter set.
# The five values fill the first five slots that C_FWS unpacks
# (vega_t, kappa, theta, sigma, rho).
params = np.array([ 0.10288822,  0.79690756,  0.0293103 ,  1.15725764, -0.05981958])
# NOTE(review): the data table displayed further down lists Adj_S0 = 167.82
# for 2022-12-12 — confirm which price level is intended here.
S0 = 316.57999992370605
# Year fractions from 2022-12-12 to each of the two dates (global day_count).
T1 = day_count.yearFraction(date_asQuantLib('2022-12-12'), date_asQuantLib('2023-05-19'))
T2 = day_count.yearFraction(date_asQuantLib('2022-12-12'), date_asQuantLib('2023-06-16'))
r = IR_effect('2022-12-12', '2022-12-12', '2023-06-16')
# Tail appended in the order C_FWS unpacks it: (t, t_star, T, r).
params = np.concatenate((params, np.array([0,T1,T2,r])))
C_FWS(S0, params)

0.0

In [None]:
# Display `data` — presumably the frame left over from the last file read by
# the loading loop in the main cell; it is not (re)defined in this cell.
data

Unnamed: 0,t0,T1,K1,C1_ask,C1_bid,T2,K2,C2_ask,C2_bid,Adj_S0
0,2022-12-12,2023-05-19,165.0,31.500000,30.300000,2023-06-16,66.67,104.859808,104.059808,167.820007
1,2022-12-12,2023-05-19,170.0,29.150000,27.900000,2023-06-16,165.00,32.495029,31.995029,167.820007
2,2022-12-12,2023-05-19,175.0,26.650000,25.550000,2023-06-16,166.67,32.200000,31.800000,167.820007
3,2022-12-12,2023-05-19,180.0,24.750000,23.550000,2023-06-16,170.00,31.027243,30.677243,167.820007
4,2022-12-12,2023-05-19,200.0,17.550000,16.200000,2023-06-16,180.00,25.876051,25.676051,167.820007
...,...,...,...,...,...,...,...,...,...,...
211,2022-12-30,2023-05-19,180.0,6.650000,6.300000,2023-06-16,450.00,0.350000,0.270000,123.180000
212,2022-12-30,2023-05-19,185.0,5.950000,5.650000,2023-06-16,466.67,0.300000,0.240000,123.180000
213,2022-12-30,2023-05-19,190.0,5.300000,5.050000,2023-06-16,500.00,0.240000,0.210000,123.180000
214,2022-12-30,2023-05-19,195.0,4.746843,4.446843,2023-06-16,600.00,0.130000,0.090000,123.180000
