In [9]:
import numpy as np
from math import *
import matplotlib.pyplot as plt
import scipy.stats as st
from scipy.stats import norm
import pandas as pd
from mpl_toolkits.mplot3d import Axes3D
from CFLib.euro_opt import FwEuroPut, FwEuroCall, impVolFromFwPut
from CFLib.ImplCall import impVolFromFwCall


def BS_trj(Obj, step, So, T, N, sigma):
    """Simulate log-normal price paths on a uniform time grid.

    Each step multiplies the previous value by ``exp(X)`` where
    ``X ~ Normal(-0.5 * sigma**2 * dt, sigma * sqrt(dt))`` and ``dt = T / step``,
    so the expected value of every multiplicative increment is 1.

    Parameters
    ----------
    Obj : object with a ``normal(loc, scale, size)`` method, or None
        Random generator used for the draws (e.g. ``np.random.RandomState``).
        If it is ``None`` or has no ``normal`` method, a
        ``np.random.RandomState(1234)`` is used instead, which reproduces the
        historical behaviour of this function (it used to ignore ``Obj``).
    step : int
        Number of time steps.
    So : float
        Initial value of every path.
    T : float
        Time horizon covered by the ``step`` steps.
    N : int
        Number of simulated paths.
    sigma : float
        Volatility parameter of the log-increments.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(step + 1, N)``; row 0 equals ``So`` and row ``n`` holds
        the simulated values after ``n`` steps.
    """
    # Previously the argument was unconditionally overwritten, so a generator
    # supplied by the caller had no effect. Honour it when it is usable.
    if Obj is None or not hasattr(Obj, "normal"):
        Obj = np.random.RandomState(1234)

    dt = T / step
    S = So * np.ones((step + 1, N))
    X = Obj.normal(-0.5 * sigma * sigma * dt, sigma * np.sqrt(dt), (step, N))
    for n in range(step):
        S[n + 1] = S[n] * np.exp(X[n])
    return S

def BS_trj_shifted_Y(step, So, T, N, sigma, shift):
    """Simulate the displaced process ``Y`` that starts at ``So + shift``.

    The generator is re-created with seed 1234 on every call, so two calls with
    the same ``step`` and ``N`` use the same normal draws.

    Parameters
    ----------
    step : int
        Number of time steps; ``dt = T / step``.
    So : float
        Initial value of the undisplaced process.
    T : float
        Time horizon.
    N : int
        Number of paths.
    sigma : float
        Volatility of the log-increments of ``Y``.
    shift : float
        Displacement added to ``So`` to obtain the starting value of ``Y``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(step + 1, N)`` whose first row is ``So + shift``.
    """
    rng = np.random.RandomState(1234)
    dt = T / step
    paths = np.ones((step + 1, N)) * (So + shift)
    log_mean = -0.5 * sigma * sigma * dt
    log_std = sigma * sqrt(dt)
    log_increments = rng.normal(log_mean, log_std, (step, N))
    # Roll the paths forward one row at a time: Y[k+1] = Y[k] * exp(X[k]).
    for k, increment_row in enumerate(log_increments):
        paths[k + 1] = paths[k] * np.exp(increment_row)
    return paths

def BS_trj_shifted_S(step, So, T, N, sigma, shift):
    """Return paths of ``S = Y - shift`` where ``Y`` comes from ``BS_trj_shifted_Y``.

    All arguments are forwarded unchanged to ``BS_trj_shifted_Y``; the result has
    the same shape as the array that function returns.
    """
    return BS_trj_shifted_Y(step, So, T, N, sigma, shift) - shift

def MartingaleProperty(shift, T, step, N, sigma, So):
    """Estimate ``E[S(t_i)]`` and its Monte Carlo error along the time grid.

    Paths are obtained from ``BS_trj_shifted_S``. Only the first ``step`` grid
    points (indices ``0 .. step - 1``) are reported: index 0 is set to ``So``
    with zero error, the others hold the cross-sectional mean and
    ``1.6 * std / sqrt(N)``.

    Returns
    -------
    tuple of numpy.ndarray
        ``(means, errors)``, each of length ``step``.
    """
    paths = BS_trj_shifted_S(step, So, T, N, sigma, shift)
    means = np.ones(step) * So
    half_widths = np.zeros(step)
    for idx in range(1, step):
        cross_section = paths[idx, :]
        means[idx] = np.mean(cross_section)
        # 1.6 is the error multiplier used for every Monte Carlo estimate in this file.
        half_widths[idx] = 1.6 * np.std(cross_section) / sqrt(N)
    return means, half_widths

def MartingaleProperty_for_all_T(sigma, T, step, N, So, shift):
    """Plot the estimated ``E[S(t)]`` with error bars, one figure per horizon in ``T``.

    For each horizon the number of grid points is ``int(horizon * step)``, so
    ``step`` acts as the number of steps per unit of time. A horizontal line at
    ``So`` is drawn as the reference level.
    """
    for horizon in T:
        n_points = int(horizon * step)
        time_grid = np.linspace(0, horizon, n_points)
        means, half_widths = MartingaleProperty(shift, horizon, n_points, N, sigma, So)
        plt.plot([0, horizon], [So, So], color='navy')
        plt.errorbar(time_grid, means, yerr=half_widths, marker="x", color="teal")
        plt.xlabel("Time")
        plt.ylabel("E[S(t)]")
        plt.show()

def MC_price_displaced(T, step, N, So, K, type: str, sigma, shift):
    """Monte Carlo price of a European option under the displaced-diffusion model.

    The payoff is evaluated on the last row of the paths returned by
    ``BS_trj_shifted_S`` and averaged; no discounting is applied.

    Parameters
    ----------
    T : float
        Maturity.
    step : int
        Number of time steps used for the simulation.
    N : int
        Number of simulated paths.
    So : float
        Initial value of the underlying.
    K : float
        Strike.
    type : str
        ``"call"`` or ``"put"``.
    sigma : float
        Volatility of the displaced process.
    shift : float
        Displacement.

    Returns
    -------
    float
        Sample mean of the payoff.

    Raises
    ------
    ValueError
        If ``type`` is neither ``"call"`` nor ``"put"``.
    """
    # Validate before simulating so a typo does not cost a full Monte Carlo run.
    if type not in ("call", "put"):
        raise ValueError("Type must be put or call")

    terminal = BS_trj_shifted_S(step, So, T, N, sigma, shift)[-1, :]
    if type == "call":
        payoff = np.maximum(terminal - K, 0)
    else:
        payoff = np.maximum(K - terminal, 0)
    return np.mean(payoff)

def Black_price(So, K, T, r, sigma, type):
    """Closed-form Black-Scholes price of a European call or put.

    Parameters
    ----------
    So : float
        Spot price.
    K : float
        Strike.
    T : float
        Time to maturity.
    r : float
        Continuously compounded rate used in ``exp(-r * T)``.
    sigma : float
        Volatility.
    type : str
        ``'call'`` or ``'put'``.

    Returns
    -------
    float
        The option price.

    Raises
    ------
    Exception
        If ``type`` is neither ``'call'`` nor ``'put'`` (raised after ``d1`` and
        ``d2`` have been computed).
    """
    log_moneyness_term = log(So / K) + (r + 0.5 * sigma ** 2) * T
    total_vol = sigma * sqrt(T)
    d_plus = log_moneyness_term / total_vol
    d_minus = d_plus - total_vol

    if type == 'call':
        spot_leg = So * norm.cdf(d_plus)
        strike_leg = K * exp(-r * T) * norm.cdf(d_minus)
        return spot_leg - strike_leg
    if type == 'put':
        strike_leg = K * exp(-r * T) * norm.cdf(-d_minus)
        spot_leg = So * norm.cdf(-d_plus)
        return strike_leg - spot_leg
    raise Exception("Type must be put or call")

def price_matrix_Black(So, K, T, r, sigma, type):
    """Build the maturity-by-strike matrix of classic Black-Scholes prices.

    Parameters
    ----------
    So : float
        Spot price.
    K : sequence of float
        Strikes (columns of the result).
    T : sequence of float
        Maturities (rows of the result).
    r, sigma, type
        Forwarded unchanged to ``Black_price``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(T), len(K))`` with ``result[i, j]`` equal to
        ``Black_price(So, K[j], T[i], r, sigma, type)``.
    """
    result = np.empty((len(T), len(K)))
    for i in range(len(T)):
        for j in range(len(K)):
            # The pricer defined in this file is Black_price; the previous call
            # targeted BS_price, which is not defined here.
            result[i, j] = Black_price(So, K[j], T[i], r, sigma, type)
    return result

def analytic_price_shifted(T, So, K, type: str, sigma, shift, r):
    """Analytic displaced-diffusion price.

    Delegates to ``Black_price`` with both the spot and the strike increased by
    ``shift``; every other argument is passed through unchanged.
    """
    return Black_price(So + shift, K + shift, T, r, sigma, type)

def price_matrix_analytic(T, So, K, type: str, sigma, shift, r):
    """Maturity-by-strike matrix of analytic displaced-diffusion prices.

    Entry ``[i, j]`` is ``analytic_price_shifted(T[i], So, K[j], type, sigma, shift, r)``.
    The result has shape ``(len(T), len(K))``.
    """
    rows = [
        [analytic_price_shifted(maturity, So, strike, type, sigma, shift, r) for strike in K]
        for maturity in T
    ]
    # The reshape keeps the (len(T), len(K)) shape even when T or K is empty.
    return np.array(rows, dtype=float).reshape(len(T), len(K))


def price_matrix_displaced(sigma, shift, T, step, N, So, K, type):
    """Maturity-by-strike matrix of Monte Carlo displaced-diffusion prices.

    For every maturity ``t`` in ``T`` and strike ``k`` in ``K`` this calls
    ``MC_price_displaced`` with ``int(t * step)`` time steps, so ``step`` acts as
    the number of steps per unit of time.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(len(T), len(K))`` holding the Monte Carlo prices only
        (no error estimates).
    """
    flat_prices = []
    for maturity in T:
        for strike in K:
            flat_prices.append(
                MC_price_displaced(maturity, int(maturity * step), N, So, strike, type, sigma, shift)
            )
    return np.array(flat_prices).reshape((len(T), len(K)))

def _mc_price_and_error(terminal, N, K, type):
    """Return ``(mean payoff, error half-width)`` for one strike from terminal prices."""
    if type == "call":
        payoff = np.maximum(terminal - K, 0)
    else:
        payoff = np.maximum(K - terminal, 0)
    # 1.6 is the same multiplier applied to every Monte Carlo standard error in
    # this file (presumably an approximation of a normal quantile - confirm).
    return np.mean(payoff), 1.6 * np.std(payoff) / np.sqrt(N)


def _inside_mc_band(reference_price, T, step, N, So, K, type, sigma, shift):
    """Flag, per (maturity, strike), whether a reference price lies strictly inside
    the Monte Carlo band ``mean +/- error``.

    ``reference_price`` is a callable ``(maturity, strike) -> price``. Paths are
    simulated once per maturity with ``int(maturity * step)`` steps (at least 1)
    and reused for every strike.
    """
    if type not in ("call", "put"):
        raise ValueError("Type must be put or call")

    inside = np.zeros((len(T), len(K)), dtype=bool)
    for i, maturity in enumerate(T):
        # Guard against zero steps, which would divide by zero in the simulator.
        n_steps = max(1, int(maturity * step))
        terminal = BS_trj_shifted_S(n_steps, So, maturity, N, sigma, shift)[-1, :]
        for j, strike in enumerate(K):
            price, err = _mc_price_and_error(terminal, N, strike, type)
            inside[i, j] = price - err < reference_price(maturity, strike) < price + err
    return pd.DataFrame(inside, index=T, columns=K)


def difference_MC_BS(T, step, N, So, K, type: str, sigma, shift, r):
    """Check whether the classic Black-Scholes price falls inside the Monte Carlo
    displaced-diffusion band (intended to be used with ``shift = 0``).

    The previous implementation called ``price_matrix_displaced`` with its
    arguments in the wrong order and unpacked its single price matrix into two
    values, so it could not run. Mean and error are now computed here directly.

    Parameters
    ----------
    T, K : sequences of float
        Maturities (rows) and strikes (columns).
    step : int
        Time steps per unit of time.
    N : int
        Number of Monte Carlo paths.
    So, sigma, shift, r
        Model parameters; ``r`` is used only by the analytic price.
    type : str
        ``"call"`` or ``"put"``.

    Returns
    -------
    pandas.DataFrame
        Boolean frame indexed by ``T`` with columns ``K``.

    Raises
    ------
    ValueError
        If ``type`` is neither ``"call"`` nor ``"put"``.
    """
    return _inside_mc_band(
        lambda maturity, strike: Black_price(So, strike, maturity, r, sigma, type),
        T, step, N, So, K, type, sigma, shift,
    )


def difference_analytical_MC(T, step, N, So, K, type: str, sigma, shift, r):
    """Check whether the analytic displaced-diffusion price falls inside the Monte
    Carlo displaced-diffusion band.

    Same parameters, return value and exceptions as ``difference_MC_BS``; the
    reference price is ``analytic_price_shifted`` instead of ``Black_price``.
    """
    return _inside_mc_band(
        lambda maturity, strike: analytic_price_shifted(maturity, So, strike, type, sigma, shift, r),
        T, step, N, So, K, type, sigma, shift,
    )

def implied_volatility(market_price, T, K,type):
    """Implied volatility for a single option price.

    Dispatches to ``impVolFromFwCall`` (a function added inside CFLib) for calls
    and to ``impVolFromFwPut`` for puts, passing ``(market_price, T, K)``.
    NOTE(review): both helpers appear to expect forward-style prices - confirm
    against CFLib.

    Raises
    ------
    ValueError
        If ``type`` is neither ``"call"`` nor ``"put"``. Previously this case
        failed with an unbound local variable.
    """
    if type == "call":
        return impVolFromFwCall(market_price, T, K)
    if type == "put":
        return impVolFromFwPut(market_price, T, K)
    raise ValueError("Type must be put or call")

def implied_volatility_matrix(prices,T,K,type_option):
    """Implied volatilities for a maturity-by-strike matrix of option prices.

    Row ``i`` of ``prices`` is inverted in one call with maturity ``T[i]`` and
    the whole strike vector ``K``.

    Parameters
    ----------
    prices : array-like
        Prices with one row per maturity and one column per strike.
    T : sequence of float
        Maturities, one per row of ``prices``.
    K : sequence of float
        Strikes, one per column of ``prices``.
    type_option : str
        ``"put"`` or ``"call"``.

    Returns
    -------
    numpy.ndarray
        Float array with the same shape as ``prices``.

    Raises
    ------
    ValueError
        If ``type_option`` is neither ``"put"`` nor ``"call"``. Previously an
        uninitialised array was returned in that case.
    """
    if type_option == "put":
        solver = impVolFromFwPut
    elif type_option == "call":
        solver = impVolFromFwCall
    else:
        raise ValueError("Type must be put or call")

    T_array = np.array(T)
    K_array = np.array(K)
    prices = np.asarray(prices)
    # Allocate as float so integer-typed prices cannot truncate the volatilities.
    impl_vol_matrix = np.empty(prices.shape, dtype=float)
    for i, prices_row in enumerate(prices):
        impl_vol_matrix[i] = solver(prices_row, T_array[i], K_array)
    return impl_vol_matrix

def surface_volatility(market_price,T,K,type_option):
    """Draw the implied-volatility surface as a 3-D plot.

    The surface heights come from ``implied_volatility_matrix``; the x axis is
    built from ``K`` and the y axis from ``T``. The figure is shown with
    ``plt.show()`` and nothing is returned.

    Raises
    ------
    Exception
        If ``type_option`` is neither ``"put"`` nor ``"call"``.
    """
    strike_grid, maturity_grid = np.meshgrid(K, np.array(T))

    if type_option not in ("put", "call"):
        raise Exception("Type must be put or call")
    vol_surface = np.array(implied_volatility_matrix(market_price, T, K, type_option))

    figure = plt.figure()
    axes = figure.add_subplot(111, projection='3d')
    surface = axes.plot_surface(strike_grid, maturity_grid, vol_surface, cmap='viridis')
    figure.colorbar(surface, shrink=0.5, aspect=5, pad=0.1)
    plt.title(f"Surface Volatility - {type_option}")
    axes.set_xlabel('Moneyness')
    axes.set_ylabel('Time (years)')
    axes.set_zlabel('Implied Volatility')
    plt.show()

In [10]:
def _smile_percent(T, step, N, So, K, type, sigma, shift):
    """Implied volatilities (in percent) of Monte Carlo prices over the strike column ``K``."""
    optPrice = np.zeros_like(K)  # array of zeros with the same shape as K
    IV = np.zeros_like(K)
    for i in range(len(K)):
        # MC_price_displaced is the pricer defined in this notebook; the earlier
        # name MC_price_shifted is not defined in the visible cells.
        optPrice[i] = MC_price_displaced(T, step, N, So, K[i], type, sigma, shift)
        IV[i] = implied_volatility(optPrice[i], T, K[i], type)
    return IV * 100.0


def _start_smile_figure(number):
    """Open figure ``number`` with a grid and the axis labels shared by both plots."""
    plt.figure(number)
    plt.grid()
    plt.xlabel('strike, K')
    plt.ylabel('implied volatility')


def effect_of_variables(T, step, N, So, type, sigma, shift):
    """Plot how the implied-volatility smile reacts to ``sigma`` and to ``shift``.

    Figure 1 varies the volatility over ``[0.2, 0.3, 0.4, 0.5]`` with ``shift``
    fixed; figure 2 varies the shift over ``[0.55, 0.6, 0.7, 0.8]`` with
    ``sigma`` fixed. Strikes are 22 evenly spaced values in ``[0.8, 1.2]``.

    Parameters
    ----------
    T : float
        Maturity.
    step : int
        Number of time steps of each simulation.
    N : int
        Number of Monte Carlo paths.
    So : float
        Initial value of the underlying.
    type : str
        ``"call"`` or ``"put"``.
    sigma : float
        Volatility used in the shift figure.
    shift : float
        Displacement used in the sigma figure.
    """
    # Column vector so each K[i] keeps the one-element array shape that the
    # pricing and implied-volatility calls received before.
    K = np.linspace(0.8, 1.2, 22).reshape(-1, 1)

    # Effect of sigma
    _start_smile_figure(1)
    legend = []
    for sigmaTemp in [0.2, 0.3, 0.4, 0.5]:
        plt.plot(K, _smile_percent(T, step, N, So, K, type, sigmaTemp, shift))
        legend.append('sigma={0}'.format(sigmaTemp))
    plt.ylim([0.0, 100])
    plt.legend(legend)
    plt.show()

    # Effect of shift
    _start_smile_figure(2)
    legend = []
    for shiftTemp in [0.55, 0.6, 0.7, 0.8]:
        plt.plot(K, _smile_percent(T, step, N, So, K, type, sigma, shiftTemp))
        legend.append('shift={0}'.format(shiftTemp))
    plt.legend(legend)
    plt.show()