In [1]:
import numpy as np
import math
import pandas as pd
from scipy import optimize
import matplotlib.pyplot as plt
from financepy.models.black import *
from financepy.utils.global_types import *
import Fourier as fourier

import warnings
warnings.filterwarnings('ignore')

from enum import Enum
class Calib_type(Enum):
    PRICE = 1
    IVOL = 2

%load_ext autoreload
%autoreload 2

####################################################################
#  FINANCEPY BETA Version 0.370 - This build: 28 Oct 2024 at 20:26 #
#     This software is distributed FREE AND WITHOUT ANY WARRANTY   #
#  Report bugs as issues at https://github.com/domokane/FinancePy  #
####################################################################



In [2]:
def blackImpliedVol(forward,   # Forward rate F
                    K,         # Strike Rate K
                    T,         # Time to Expiry (years)
                    DF,        # df RFR to expiry date
                    callOrPut, # FinOptionTypes.EUROPEAN_CALL or FinOptionTypes.EUROPEAN_PUT
                    price
                    ):    
    min_ivol, max_ivol = 0.001, 1.0
    f = lambda ivol: Black(ivol).value(forward, K, T, DF, callOrPut) - price    
    return optimize.bisect(f, min_ivol, max_ivol)    

def blackVega(forward, K, T, DF, ivol, callOrPut):
    return Black(ivol).vega(forward, K, T, DF, callOrPut)

### Dataset 1

In [3]:
S0 = 328.29
#Ts = [0.1753424, 0.4246575, 0.9232876]
#rs = [0.000553778, 0.000659467, 0.000850338]

Ts = [0.9232876]
rs = [0.000850338]
Ks = [275, 300, 325, 350, 375]
callOrPut = OptionTypes.EUROPEAN_CALL
call_option_prices = {'Strike': Ks,
                       # 'T = ' + str(Ts[0]): [56.9, 36.3, 19.6, 9.45, 4.3],
                       # 'T = ' + str(Ts[1]): [63.2, 44.9, 30.55, 20.05, 12.5],
                        'T = ' + str(Ts[0]): [77.55, 61.45, 48.9, 38.45, 29.5],
                       } 
df_call_option_prices = pd.DataFrame(call_option_prices)
df_call_option_prices = df_call_option_prices.set_index('Strike')

### Find the implied vol from call option prices, ease for checking calibration error
df_mkt_ivols = pd.DataFrame()
df_mkt_ivols['Strike'] = Ks

for r, T in zip(rs, Ts):    
    DF = np.exp(-r*T)
    forward = S0 * np.exp(r*T)
    ivols = [blackImpliedVol(forward, K, T, DF, callOrPut, df_call_option_prices['T = ' + str(T)].loc[K]) for K in Ks]     
    df_mkt_ivols['T = ' + str(T)] = np.array(ivols)    

df_mkt_ivols = df_mkt_ivols.set_index('Strike')

print(df_mkt_ivols)

        T = 0.9232876
Strike               
275          0.400809
300          0.382711
325          0.378430
350          0.374690
375          0.368094


### Heston calibration objective function

In [4]:
#input: 
#   var_0 = params[0]
#   mean_rev_speed = params[1]
#   mean_rev_level = params[2]
#   vol_of_var = params[3]
#   corr = params[4]    
#   calib_type
#output: list of residuals
def heston_obj_func(S0, Ks, Ts, rs, mkt_call_data, params, calib_type, vega_data):  
    vol_0 = np.sqrt(params[0])    
    residuals = []
    for r, T in zip(rs, Ts):    
        DF = np.exp(-r*T)
        forward = S0 * np.exp(r*T)
        mdl_prices = [DF * fourier.carr_madan_heston_call_option(forward, vol_0, params[1], params[2], params[3], params[4], T, K) for K in Ks]      
        mkt_prices = list(mkt_call_data['T = ' + str(T)])
        res = [(mdl/mkt-1.0)**2 for mdl, mkt, vega in zip(mdl_prices, mkt_prices, vegas)]                
        residuals.extend(res)     
        
    return residuals

def unpack_heston_calib_params(heston_calib_res):
    var_0 = heston_calib_res.x[0]    
    mean_rev_speed = heston_calib_res.x[1]
    mean_rev_level = heston_calib_res.x[2]
    vol_of_var = heston_calib_res.x[3]
    corr = heston_calib_res.x[4]   
    return var_0, mean_rev_speed, mean_rev_level, vol_of_var, corr

#initial values
var_0, mean_rev_speed, mean_rev_level, vol_of_var, corr = 0.1, 1, 0.5, 0.8, -0.5   
init_param_heston = np.array([var_0, mean_rev_speed, mean_rev_level, vol_of_var, corr])

### Perform Heston Calibration 

In [5]:
# for all maturities
obj_function = lambda p: heston_obj_func(S0, Ks, Ts, rs, df_call_option_prices, p, Calib_type.PRICE, df_vegas_w)
heston_calib_res_price = optimize.least_squares(obj_function, init_param_heston, method='lm', xtol=1E-8)
print(heston_calib_res_price.x)

NameError: name 'df_vegas_w' is not defined

In [None]:
[print(round(i, 4)) for i in heston_calib_res_price.x]

### Check the calibration result

In [None]:
var_0, mean_rev_speed, mean_rev_level, vol_of_var, corr = unpack_heston_calib_params(heston_calib_res_price)
vol_0 = np.sqrt(var_0)
K1s = np.linspace(250, 400, 21)
df_heston_calibrated = pd.DataFrame()
df_heston_calibrated['Strike'] = K1s

for r, T in zip(rs, Ts):    
    DF = np.exp(-r*T)
    forward = S0 * np.exp(r*T)
    call_values = [DF * fourier.carr_madan_heston_call_option(forward, vol_0, mean_rev_speed, mean_rev_level, vol_of_var, corr, T, K) for K in K1s]      
    ivols = [blackImpliedVol(forward, K, T, DF, callOrPut, call_value) for call_value, K in zip(call_values, K1s)]       
    df_heston_calibrated['T = ' + str(T)] = np.array(ivols)

df_heston_calibrated = df_heston_calibrated.set_index('Strike')

#---------------------------------------------------------------------
ax = df_heston_calibrated.plot(grid=True, figsize=(7, 5), color = ['blue','orange','green'])
df_mkt_ivols.plot(ax=ax, grid=True, figsize=(7, 5), style='o', color = ['blue','orange','green'])

plt.title("Heston Calibration Results")
plt.ylim(0.31, 0.42)
plt.xlabel("Strike")
plt.ylabel("Lognormal Implied Volatility")
#plt.savefig("L4_heston_calib.png")

plt.show()

### MJD calibration objective function

In [30]:
#input: 
#   vol = params[0]
#   a = params[1]
#   b = params[2]
#   lam = params[3]  
#output: list of residuals
def MJD_obj_func(S0, Ks, Ts, rs, mkt_call_data, params, calib_type, vega_data):     
    residuals = []
    for r, T in zip(rs, Ts):    
        DF = np.exp(-r*T)
        forward = S0 * np.exp(r*T)
        mdl_prices = [DF * fourier.carr_madan_MJD_call_option(forward, params[0], params[1], params[2], params[3], T, K) for K in Ks]      
        mkt_prices = list(mkt_call_data['T = ' + str(T)])
        vegas = list(vega_data['T = ' + str(T)])
        res = [vega*(mdl/mkt-1.0)**2 for mdl, mkt, vega in zip(mdl_prices, mkt_prices, vegas)]  
        residuals.extend(res)     
        
    return residuals

def unpack_MJD_calib_params(MJD_calib_res):
    vol = MJD_calib_res.x[0]    
    a = MJD_calib_res.x[1]
    b = MJD_calib_res.x[2]
    lam = MJD_calib_res.x[3]
    return vol, a, b, lam

#initial values
vol = 0.2; a = -0.5; b = 1.44; lam = 0.12
init_param_MJD = np.array([vol, a, b, lam])

#[ 0.26830666 -1.344381    1.4427396   0.11208649]

### Perform MJD calibration

In [None]:
#initial values 
obj_function = lambda p: MJD_obj_func(S0, Ks, Ts, rs, df_call_option_prices, p, Calib_type.PRICE, df_vegas_w)
MJD_calib_res_price = optimize.least_squares(obj_function, init_param_MJD, method='lm', xtol=1E-8 )
print(MJD_calib_res_price.x)

In [None]:
vol, a, b, lam = unpack_MJD_calib_params(MJD_calib_res_price)

K1s = np.linspace(250, 400, 21)
df_MJD_calibrated = pd.DataFrame()
df_MJD_calibrated['Strike'] = K1s

for r, T in zip(rs, Ts):    
    DF = np.exp(-r*T)
    forward = S0 * np.exp(r*T)
    call_values = [DF * fourier.carr_madan_MJD_call_option(forward, vol, a, b, lam, T, K) for K in K1s]      
    ivols = [blackImpliedVol(forward, K, T, DF, callOrPut, call_value) for call_value, K in zip(call_values, K1s)]       
    df_MJD_calibrated['T = ' + str(T)] = np.array(ivols)

df_MJD_calibrated = df_MJD_calibrated.set_index('Strike')

#---------------------------------------------------------------------
ax = df_MJD_calibrated.plot(grid=True, figsize=(7, 5), color = ['blue','orange','green'])
df_mkt_ivols.plot(ax=ax, grid=True, figsize=(7, 5), style='o', color = ['blue','orange','green'])

plt.ylim(0.31, 0.42)
plt.title("MJD Calibration Results")
plt.xlabel("Strike")
plt.ylabel("Lognormal Implied Volatility")
plt.savefig("L4_MJD_calib_vega_weighted.png")