In [None]:
import numpy as np
import matplotlib.pyplot as plt
import scipy.integrate as integrate

from scipy.special import gamma
from tabulate import tabulate
from time import time

### Values for reference

In [None]:
european_call_ref = 0.056832
variance_swap_price_ref = 0.028295
variance_call_price_ref = 0.013517

### Kernel function

In [None]:
def K_t(t):
    if t == 0:
        return 0
    
    return t**(H-0.5)/gamma(H+0.5) #check

In [None]:
M = 10**3
n = 5

l = 0.3
nu = 0.3
theta = 0.02
rho = -0.7
H = 0.1

K = 1

V_0 = 0.02
S_0 = 1
Y_0 = np.log(S_0)

In [None]:
n_check_array = [4,10,20,40,80,160,320]

In [None]:
t_0 = 0
T = 1

In [None]:
def european_call_options(x):
    return np.maximum(x-K,0)

def asian_options(x):
    A_T = T*np.mean(x)
    
    return np.maximum(A_T-K,0)

def lookback_options(x):
    M_T = np.max(x)
    
    return np.maximum(M_T-K,0)

def variance_swap_price(x):
    return x

def variance_call_price(x):
    return np.maximum(x-V_0,0)

In [None]:
#t_delta = T/n
#t = [k*t_delta for k in range(n)]
#
#K_t_delta = np.zeros((n,n))
#for k in range(n):
#    for i in range(k):
#        K_t_delta[k,i] = K_t(t[k]-t[i])

# Risky asset price under the risk-neutral probability & Volatility
_Scheme (6)_

In [None]:
def S_V_process(n):

    t_delta = T/n
    t = [k*t_delta for k in range(n)]

    K_t_delta = np.zeros((n,n))
    for k in range(n):
        for i in range(k):
            K_t_delta[k,i] = K_t(t[k]-t[i])

    G1 = np.random.randn(n)
    G2 = np.random.randn(n)

    W = np.zeros(n)
    W_orth = np.zeros(n)

    for i in range(n-1):
        W[i+1] = W[i] + np.sqrt(t[i+1] - t[i])*G1[i]
        W_orth[i+1] = W[i] + np.sqrt(t[i+1] - t[i])*G2[i]
    
    Y_n = np.zeros(n)
    V_n = np.zeros(n)

    S_n = np.zeros(n)

    S_n[0] = np.exp(Y_0)
    
    Y_sum = 0
    V_sum = 0
    
    for k in range(1,n):
        Y_sum = Y_sum + (-0.5*V_n[k-1] + (t[k] - t[k-1]) + rho*np.sqrt(np.maximum(V_n[k-1],0))*(W[k] - W[k-1]) //
                      + np.sqrt(1 - rho**2)*np.sqrt(np.maximum(V_n[k-1],0))*(W_orth[k] - W_orth[k-1]))
        Y_n[k] = Y_0 + Y_sum

        for i in range(k):
            V_sum = V_sum + K_t(t[k]-t[i])*(theta - l*np.maximum(V_n[i],0))*(t[i+1] - t[i]) 
            + K_t(t[k]-t[i])*nu*np.sqrt(np.maximum(V_n[i],0))*(W[i+1] - W[i])
        
        #V_sum = sum(K_t_delta[k,i]*(theta - l*np.maximum(V_n[i],0))*(t[i+1] - t[i]) //
        #              + K_t_delta[k,i]*nu*np.sqrt(np.maximum(V_n[i],0))*(W[i+1] - W[i]) for i in range(k))
        
        V_n[k] = V_0 + V_sum

        S_n[k] = np.exp(Y_n[k])
   
    return (V_n,S_n)


### Test

In [None]:
function_8_scheme = european_call_options

In [None]:
S_n_M = np.zeros(M)

for m in range(M):
    S_n_m = S_V_process(n)[1]
    S_n_M[m] = S_n_m[-1]


U_M = np.mean(function_8_scheme(S_n_M))
var_M = np.var(function_8_scheme(S_n_M))

error = 2*var_M/np.sqrt(M)
confidence_interval = [U_M - 2*error, U_M + 2*error]

print("E =", U_M)
print("Var =", var_M)
print(confidence_interval)
#print(S_n_M)

In [None]:
U_M_n = np.zeros(n)

for i in range(n):
    S_n_M = np.zeros(M)

    for m in range(M):
        S_n_m = S_V_process(n)[1]
        S_n_M[m] = S_n_m[-1]
        
    U_M = np.mean(function_8_scheme(S_n_M))
    U_M_n[i] = U_M

integers1toN = np.arange(1,n+1)
cum_sum_n = np.cumsum(U_M_n)/integers1toN

plt.plot(integers1toN, cum_sum_n, color="b")

In [None]:
integers1toM = np.arange(1,M+1)
cum_sum = np.cumsum(function_8_scheme(S_n_M))/integers1toM

plt.plot(integers1toM, cum_sum, color="b")

# Integrated variance formulation (integrated-rough Heston model)
_Scheme (7)_

In [None]:
def S_X_process(n):
    
    t_delta = T/n
    t = [k*t_delta for k in range(n)]
    
    Y_n = np.zeros(n)
    X_n = np.zeros(n)
    X_n_max = np.zeros(n)
    
    S_n = np.zeros(n)

    M_n = np.zeros(n)
    M_n_orth = np.zeros(n)

    S_n[0] = np.exp(Y_0)
        
    Z = np.random.normal(0,1,n)
    Z_orth = np.random.normal(0,1,n)
    
    for k in range(n):
        #print(k)
        X_n_max[k] = np.max(X_n)

        for i in range(1,k):
            #print(i)
            M_n[k] = M_n[k] + np.sqrt(X_n_max[i] - X_n_max[i-1])*Z[i]
            M_n_orth[k] = M_n_orth[k] + np.sqrt(X_n_max[i] - X_n_max[i-1])*Z_orth[i]

        Y_n[k] = Y_0 - 0.5*X_n_max[k] + rho*M_n[k] + np.sqrt(1 - rho**2)*M_n_orth[k]

        X_sum = 0
        for i in range(k):
            #print(K_integral, K_t(t[k] - t[i])*theta*t[i]*(t[i+1]-t[i]))
            X_sum = X_sum + (-l*X_n_max[i] + nu*M_n[i])*(t[i+1]-t[i])
            #X_sum = X_sum + K_t(t[k] - t[i])*(theta*t[i] - l*X_n_max[i] + nu*M_n[i])*(t[i+1]-t[i])

        K_integral = integrate.quad(lambda s: theta*K_t(t[k]-s)*s,0,t[k])[0]
        X_sum = X_sum + K_integral
        
        X_n[k] = V_0*t[k] + X_sum
        
        S_n[k] = np.exp(Y_n[k])
    
    return (S_n, X_n) #S_n is not correct

### Test

In [None]:
function_9_scheme = variance_swap_price

In [None]:
X_n_M = np.zeros(M)

for m in range(M):
    X_n_m = S_X_process(n)[1]
    X_n_M[m] = X_n_m[-1]
    #print()

U_M = np.mean(function_9_scheme(X_n_M))
var_M = np.var(function_9_scheme(X_n_M))

error = 2*var_M/np.sqrt(M)
confidence_interval = [U_M - 2*error, U_M + 2*error]

print("E =", U_M)
print("Var =", var_M)
print(confidence_interval)

In [None]:
integers1toM = np.arange(1,M+1)
cum_sum = np.cumsum(function_9_scheme(X_n_M))/integers1toM

#plt.plot(integers1toM, cum_sum, color="b")

## Main algorithm

In [None]:
def main(process,f):
    U_M_n = np.zeros(n)
    errors = np.zeros(n)

    U_M_table = []
    errors_table = []
    time_table = []

    for i in range(n):
        t_0 = time()

        X_n_M = np.zeros(M)

        for m in range(M):
            X_n_m = process(i+1)[1]
            X_n_M[m] = X_n_m[-1]

        U_M = np.mean(f(X_n_M))
        U_M_n[i] = U_M

        var_M = np.var(f(X_n_M))

        error = 2*var_M/np.sqrt(M)
        errors[i] = error

        confidence_interval = [U_M - 2*error, U_M + 2*error]

        t_1 = time()

        if i in n_check_array:
            U_M_table.append(U_M)
            errors_table.append(error)
            time_table.append(t_1 - t_0)

    integers1toN = np.arange(1,n+1)
    cum_sum_n = np.cumsum(U_M_n)/integers1toN

    return {"U_M_table": U_M_table, 
            "errors_table": errors_table, 
            "time_table": time_table,
            "cum_sum_n": cum_sum_n,
            "errors": errors}

In [None]:
def scheme_8(f):
    return main(S_V_process, f)

In [None]:
def scheme_9(f):
    return main(S_X_process, f)

### Auxilary functions for displaying the results

In [None]:
def plot_estimations(scheme_8_data, scheme_9_data, title="E[f(S,X)]", ref_value=None):
    integers1toN = np.arange(1,n+1)
    
    fig, ax = plt.subplots()
    
    ax.plot(integers1toN, scheme_8_data['cum_sum_n'], label='Scheme (8)', color="blue")
    ax.fill_between(integers1toN, scheme_8_data['cum_sum_n'] - scheme_8_data['errors'], 
                    scheme_8_data['cum_sum_n'] + scheme_8_data['errors'], label='Confidence interval', 
                    color='blue', alpha=.1)
    
    ax.plot(integers1toN, scheme_9_data['cum_sum_n'], label='Scheme (9)', color="orange")
    ax.fill_between(integers1toN, scheme_9_data['cum_sum_n'] - scheme_9_data['errors'], 
                    scheme_9_data['cum_sum_n'] + scheme_9_data['errors'], label='Confidence interval', 
                    color='orange', alpha=.1)

    if ref_value is not None:
        ax.axhline(ref_value, label='reference value', color="green", linestyle='--')
    
    ax.legend(loc=1)
    
    plt.title(f"Estimation of {title}")
    plt.xlabel("Number of steps")
    plt.ylabel("Estimation")
    
    plt.show()

In [None]:
def show_numerical_results_table(scheme_8_results, scheme_9_results, ref_value):
    #TODO init constants
    table_size = np.size(scheme_8_results["U_M_table"])
    
    print("\tEstimation of E[X_T] with Scheme (8) (left) and Scheme (9) (right). The computation time is in seconds.")
    print()
    print("\tMean Value\t Stat.Error\t Comp.Time\t||\t Mean Value\t Stat.Error\t Comp.Time")
    print(f"Ref\t {ref_value}")
    for k in range(table_size):
        print(f"n={n_check_array[k]}\t",
        f"{scheme_8_results['U_M_table'][k]:f}\t",
        f"{scheme_8_results['errors_table'][k]:f}\t",
        f"{scheme_8_results['time_table'][k]:f}\t||\t",
        f"{scheme_9_results['U_M_table'][k]:f}\t",
        f"{scheme_9_results['errors_table'][k]:f}\t",
        f"{scheme_9_results['time_table'][k]:f}")


# Numerical results

## _European call_ maximum(S_T-K,0)

In [None]:
scheme_8_results = scheme_8(european_call_options)
scheme_9_results = scheme_9(european_call_options)

In [None]:
plot_estimations(scheme_8_results, scheme_9_results, "E[maximum(S_T-K,0)]", european_call_ref)
show_numerical_results_table(scheme_8_results, scheme_9_results, european_call_ref)

## _Asian option_ maximum(A_T-K,0)

In [None]:
scheme_8_results = scheme_8(asian_options)
scheme_9_results = scheme_9(asian_options)

In [None]:
plot_estimations(scheme_8_results, scheme_9_results, "E[maximum(A_T-K,0)]")
show_numerical_results_table(scheme_8_results, scheme_9_results, "-")

## _Lookback option_ maximum(M_T-K,0)

In [None]:
scheme_8_results = scheme_8(lookback_options)
scheme_9_results = scheme_9(lookback_options)

In [None]:
plot_estimations(scheme_8_results, scheme_9_results, "E[maximum(M_T-K,0)]")
show_numerical_results_table(scheme_8_results, scheme_9_results, "-")

## _Variance swap price_ E[X_t]

In [None]:
scheme_8_results = scheme_8(variance_swap_price)
scheme_9_results = scheme_9(variance_swap_price)

In [None]:
plot_estimations(scheme_8_results, scheme_9_results, "E[X_T]", variance_swap_price_ref)
show_numerical_results_table(scheme_8_results, scheme_9_results, variance_swap_price_ref)

## _Variance call price_ E[maximum(X_T-V_0,0)]

In [None]:
scheme_8_results = scheme_8(variance_call_price)
scheme_9_results = scheme_9(variance_call_price)

In [None]:
plot_estimations(scheme_8_results, scheme_9_results, "E[maximum(X_T-V_0,0)]", variance_swap_price_ref)
show_numerical_results_table(scheme_8_results, scheme_9_results, variance_call_price_ref)