In [71]:
import numpy as np
from math import *
import pandas as pd

# Plotting Graphs
import matplotlib.pyplot as plt

#Time Management
from time import strftime, time

# Random Variable Simulation
from scipy import stats

# Remove Warnings
import warnings
warnings.filterwarnings('ignore')

# Pretty Table
from prettytable import PrettyTable

# Import system
import sys

In [41]:
'''TEST 1: ABSORPTION
======================'''
def fNAbsorption(forwardCurve, sigma1, sigma2, eta, delta, timeSteps, expiry, numPaths):
    dt = expiry/timeSteps
    mu = [(sigma1**2 + sigma2**2) * np.sum(1/(1+forwardCurve[:i])) for i in range(len(forwardCurve))]
    paths =  np.zeros((timeSteps+1, numPaths), np.float64)
    paths[0] = F0
    for t in range(1, timeSteps+1):
        rand1 = np.random.standard_normal(numPaths)
        rand2 = np.random.standard_normal(numPaths)
        paths[t] = np.maximum((paths[t-1] 
                            + sigma1*np.power(paths[t-1] + delta, eta)*sqrt(dt)*rand1 
                            + sigma2*np.power(paths[t-1]+delta, eta)*sqrt(dt)*rand2),0)
    return(paths)

In [42]:
'''TEST 2: REFLECTION
======================'''
def fNReflection(F0, sigma1, sigma2, eta, delta, timeSteps, expiry, numPaths):
    dt = expiry/timeSteps
    paths =  np.zeros((timeSteps+1, numPaths), np.float64)
    paths[0] = F0 
    for t in range(1, timeSteps+1):
        rand1 = np.random.standard_normal(numPaths)
        rand2 = np.random.standard_normal(numPaths)
        paths[t] = np.absolute(paths[t-1] 
                            + sigma1*np.power(paths[t-1]+delta, eta)*sqrt(dt)*rand1 
                            + sigma2*np.power(paths[t-1]+delta, eta)*sqrt(dt)*rand2)
    return(paths)

In [43]:
'''TEST 3: HIGHAM AND MAO
=========================='''
def fNHighamMao(F0, sigma1, sigma2, eta, delta, timeSteps, expiry, numPaths):
    dt = expiry/timeSteps
    paths =  np.zeros((timeSteps+1, numPaths), np.float64)
    paths[0] = F0 
    for t in range(1, timeSteps+1):
        rand1 = np.random.standard_normal(numPaths)
        rand2 = np.random.standard_normal(numPaths)
        paths[t] = (paths[t-1] 
                            + sigma1*np.power(np.absolute(paths[t-1])+delta, eta)*sqrt(dt)*rand1 
                            + sigma2*np.power(np.absolute(paths[t-1])+delta, eta)*sqrt(dt)*rand2)
    return(paths)

In [44]:
'''TEST 4: PARTIAL TRUNCATION
=============================='''
def fNPartialTrunc(F0, sigma1, sigma2, eta, delta, timeSteps, expiry, numPaths):
    dt = expiry/timeSteps
    paths =  np.zeros((timeSteps+1, numPaths), np.float64)
    paths[0] = F0 
    for t in range(1, timeSteps+1):
        rand1 = np.random.standard_normal(numPaths)
        rand2 = np.random.standard_normal(numPaths)
        paths[t] = (paths[t-1] 
                            + sigma1*np.power(np.maximum(paths[t-1],0)+delta, eta)*sqrt(dt)*rand1 
                            + sigma2*np.power(np.maximum(paths[t-1],0)+delta, eta)*sqrt(dt)*rand2)
    return(paths)

In [45]:
'''TEST 5: LOG EULER
====================='''
def fNLogEuler(F0, sigma1, sigma2, eta, delta, timeSteps, expiry, numPaths):
    dt = expiry/timeSteps
    paths =  np.zeros((timeSteps+1, numPaths), np.float64)
    paths[0] = F0 
    for t in range(1, timeSteps+1):
        rand1 = np.random.standard_normal(numPaths)
        rand2 = np.random.standard_normal(numPaths)
        paths[t] = paths[t-1]*np.exp((np.power(paths[t-1]+delta, eta)/paths[t-1])*
                   (((-0.5 *(np.power(paths[t-1]+delta, eta)/paths[t-1])*pow(sigma1,2)*dt)+ 
                     sigma1*rand1*sqrt(dt))+
                    ((-0.5 *(np.power(paths[t-1]+delta, eta)/paths[t-1])*pow(sigma2,2)*dt)+ 
                     sigma2*rand2*sqrt(dt))))
        
    return(paths)

In [None]:
# '''TEST 7: MOMENT MATCHING
# ==========================='''
# def fNMotionMatching(F0, sigma1, sigma2, eta, delta, timeSteps, expiry, numPaths):
#     dt = expiry/timeSteps
#     paths =  np.zeros((timeSteps+1, numPaths), np.float64)
    
#     paths[0] = F0 
#     for t in range(1, timeSteps+1):
#         rand1 = np.random.standard_normal(numPaths)
#         rand1 = (rand1 - rand1.mean())/rand1.std()
#         rand2 = np.random.standard_normal(numPaths)
#         rand2 = (rand2 - rand2.mean())/rand2.std()  

In [49]:
loc = "Data\\01 - Calibration Data.xlsx"

# Obtain Swap Curve Data
swapData = pd.read_excel(loc,
                        sheet_name = "Swap Rates Data",
                        skiprows= [0, 1,3],
                       index_col = "Dates")

# Decimal Notation
swapData = pd.DataFrame.dropna(swapData, axis = 'rows')/100

In [51]:
'''SPOT CURVE CONSTRUCTION
==========================='''
# Curve on evaluation date: 17/02/2020
swapCurve = swapData.loc['2020-02-14']

swapMaturities = np.array(([i for i in range(1,31)] + # Years
                          [i*5 for i in range(7, 13)])) # 50 years

# Linear Interpolate swap curve
swapCurveInterp = np.interp(range(1, 61), swapMaturities, swapCurve)

# Bootstrap to obtain Zero Coupon Curve
zeroCoupon = [1]
for i in range(0, 60):
    zeroCoupon.append((1 - swapCurveInterp[i]*np.sum(zeroCoupon[:i]))/(1 + swapCurveInterp[i]))

forwardCurve = [zeroCoupon[i-1]/zeroCoupon[i]-1 for i in range(1, 61)]

In [52]:
'''CLOSED FORM CAP PRICER
=========================='''
def capletChiSquared(N, F0, K, expiry, sigma1, sigma2, eta, delta):
    # Define parameters
    v = expiry*(pow(sigma1, 2)+pow(sigma2, 2))
    a = pow(K+delta, 2*(1 - eta))/(pow(1 - eta, 2)* v)
    b =1/(1 - eta)
    c = pow(F0+delta, 2*(1 - eta))/(pow(1 - eta, 2)* v)
    
    price = N*zeroCoupon[expiry]*((F0+delta)*(1- stats.ncx2.cdf(a, b+2, c)) - (K+delta)*stats.ncx2.cdf(c, b, a))
    return(price)

'''MONTE CARLO PRICERS
======================'''
# Absorption
def monteCarloAbsorptionCap(N, F0, strike, expiry, sigma1, sigma2, eta, delta, timeSteps, numPaths):
    steps = expiry * timeSteps
    fT = fNAbsorption(F0,sigma1, sigma2, eta, delta, steps, expiry, numPaths)[steps]
    price = N * zeroCoupon[expiry] * np.nanmean(np.maximum(fT - strike, 0))
    return(price)

# Reflection
def monteCarloReflectionCap(N, F0, strike, expiry, sigma1, sigma2, eta, delta, timeSteps, numPaths):
    steps = expiry * timeSteps
    fT = fNReflection(F0,sigma1, sigma2, eta, delta, steps, expiry, numPaths)[steps]
    price = N * zeroCoupon[expiry] * np.nanmean(np.maximum(fT - strike, 0))
    return(price)

# Higham Mao
def monteCarloHighamMaoCap(N, F0, strike, expiry, sigma1, sigma2, eta, delta, timeSteps, numPaths):
    steps = expiry * timeSteps
    fT = fNHighamMao(F0,sigma1, sigma2, eta, delta, steps, expiry, numPaths)[steps]
    price = N * zeroCoupon[expiry] * np.nanmean(np.maximum(fT - strike, 0))
    return(price)

# Partial Truncation
def monteCarloPartialTruncCap(N, F0, strike, expiry, sigma1, sigma2, eta, delta, timeSteps, numPaths):
    steps = expiry * timeSteps
    fT = fNPartialTrunc(F0,sigma1, sigma2, eta, delta, steps, expiry, numPaths)[steps]
    price = N * zeroCoupon[expiry] * np.nanmean(np.maximum(fT - strike, 0))
    return(price)

# Log Euler
def monteCarlologEulerCap(N, F0, strike, expiry, sigma1, sigma2, eta, delta, timeSteps, numPaths):
    steps = expiry * timeSteps
    fT = fNLogEuler(F0,sigma1, sigma2, eta, delta, steps, expiry, numPaths)[steps]
    price = N * zeroCoupon[expiry] * np.nanmean(np.maximum(fT - strike, 0))
    return(price)

In [86]:
'Define the parameters'
notional = 1e+5
shift = 0.01
vol1 = 0.01
vol2 = 0.01
power = 0.75
strike = 0.001

# Simulation Parameters
expiry = 50
timeSteps = 252
numPaths = 100000

In [None]:
''' TESTING THE MARTINGALE PROPERTY
===================================='''
startTime =  time()
curve = forwardCurve[1:50]
tenors = list(range(1,50))
steps = expiry * timeSteps

# absorption = [np.nanmean(fNAbsorption(rate,vol1, vol2, power, shift, 
#                                       252*exp, exp, numPaths)[252*exp])for rate, exp in zip(curve, tenors)]
# reflection = [np.nanmean(fNReflection(rate,vol1, vol2, power, shift, 252*exp, 
#                         exp, numPaths)[252*exp])for rate,exp in zip(curve, tenors)]
highamMao = [fNHighamMao(rate,vol1, vol2, power, shift, 252*exp, 
                        exp, numPaths)[252*exp]for rate ,exp in zip(curve, tenors)]
# partialTrunc = [np.nanmean(fNPartialTrunc(rate,vol1, vol2, power, shift, 252*exp, 
#                         exp, numPaths)[252*exp])for rate ,exp in zip(curve, tenors)]
# logEuler = [np.nanmean(fNLogEuler(rate,vol1, vol2, power, shift, 252*exp, 
#                         exp, numPaths)[252*exp])for rate ,exp in zip(curve, tenors)]
endTime =  time()

print('Runs Completed. Time elapsed: '+ str(round((endTime - startTime)/60)) + 'minutes')

In [85]:
print('Memory used: ' + str(sys.getsizeof(highamMao)))

Memory used: 528


In [58]:
'''RESULTS
==========='''
header = ['Forward Expiry','Forward Curve', 'Absorption', 'Reflection', 'Higham Mao', 
          'Partial Truncation', 'Log Euler']

resultsTable = PrettyTable()
resultsTable.add_column(header[0], [str(i)+'Y' for i in tenors])
resultsTable.add_column(header[1], np.round(curve, 5))
resultsTable.add_column(header[2], np.round(absorption, 5))
resultsTable.add_column(header[3], np.round(reflection, 5))
resultsTable.add_column(header[4], np.round(highamMao, 5))
resultsTable.add_column(header[5], np.round(partialTrunc, 5))
resultsTable.add_column(header[6], np.round(logEuler, 5))
print(resultsTable)

+----------------+---------------+------------+------------+------------+--------------------+-----------+
| Forward Expiry | Forward Curve | Absorption | Reflection | Higham Mao | Partial Truncation | Log Euler |
+----------------+---------------+------------+------------+------------+--------------------+-----------+
|       1Y       |    -0.00378   |  0.00034   |  0.00378   |  -0.00378  |      -0.00378      |  -0.00378 |
|       2Y       |    -0.00323   |  0.00049   |  0.00323   |  -0.00324  |      -0.00323      |  -0.00323 |
|       3Y       |    -0.00245   |   0.0006   |  0.00245   |  -0.00245  |      -0.00245      |  -0.00245 |
|       4Y       |    -0.0016    |  0.00071   |  0.00164   |  -0.0016   |      -0.0016       |  -0.00161 |
|       5Y       |    -0.00048   |   0.0008   |   0.0009   |  -0.00049  |      -0.00048      |  -0.00046 |
|       6Y       |    0.00063    |  0.00101   |  0.00104   |  0.00063   |      0.00063       |  0.00066  |
|       7Y       |    0.00201    |  0

In [None]:
'''TESTING ON CAP VALUES
========================'''
chiSquare = [capletChiSquared(notional, rate, strike, exp, vol1, vol2, power, shift) for rate,
             exp in zip(curve, tenors)]

absorptionPrice = [monteCarloAbsorptionCap(notional,rate, 
                            strike, exp, vol1, vol2, power, shift, timeSteps, numPaths )
                 for rate ,exp in zip(curve, tenors)]
reflectionPrice = [monteCarloReflectionCap(notional,rate, 
                            strike, exp, vol1, vol2, power, shift, timeSteps, numPaths )
                 for rate ,exp in zip(curve, tenors)]
highamMaoPrice = [monteCarloHighamMaoCap(notional,rate, 
                            strike, exp, vol1, vol2, power, shift, timeSteps, numPaths )
                 for rate ,exp in zip(curve, tenors)]
partialTruncPrice = [monteCarloPartialTruncCap(notional,rate, 
                            strike, exp, vol1, vol2, power, shift, timeSteps, numPaths )
                 for rate ,exp in zip(curve, tenors)]
logEulerPrice = [monteCarlologEulerCap(notional,rate, 
                            strike, exp, vol1, vol2, power, shift, timeSteps, numPaths )
                 for rate ,exp in zip(curve, tenors)]

In [15]:
'''RESULTS
==========='''
header2 = ['Cap Expiry','Chi Square', 'Absorption', 'Reflection', 'Higham Mao', 
          'Partial Truncation', 'Log Euler']

resultsTable2 = PrettyTable()
resultsTable2.add_column(header2[0], [str(i)+'Y' for i in tenors])
resultsTable2.add_column(header2[1], np.round(chiSquare, 2))
resultsTable2.add_column(header2[2], np.round(absorptionPrice, 2))
resultsTable2.add_column(header2[3], np.round(reflectionPrice, 2))
resultsTable2.add_column(header2[4], np.round(highamMaoPrice, 2))
resultsTable2.add_column(header2[5], np.round(partialTruncPrice, 2))
resultsTable2.add_column(header2[6], np.round(logEulerPrice, 2))
print(resultsTable2)

+------------+------------+------------+------------+------------+--------------------+-----------+
| Cap Expiry | Chi Square | Absorption | Reflection | Higham Mao | Partial Truncation | Log Euler |
+------------+------------+------------+------------+------------+--------------------+-----------+
|     1Y     |    0.0     |    0.41    |   279.08   |    0.0     |        0.0         |    0.0    |
|     2Y     |    -0.0    |    3.68    |   225.03   |    0.0     |        0.0         |    0.0    |
|     3Y     |    0.0     |    9.53    |   148.46   |    0.0     |        0.0         |    0.0    |
|     4Y     |    0.03    |   13.67    |   77.15    |    0.0     |        0.04        |    0.0    |
|     5Y     |    3.32    |   18.29    |   24.14    |    4.22    |        3.92        |    0.0    |
|     6Y     |   30.82    |   34.42    |   36.35    |   31.06    |       30.84        |   29.61   |
|     7Y     |   119.55   |   121.73   |   121.8    |   120.62   |       119.96       |   116.86  |


In [36]:
from IPython.core.display import HTML

s  = '<script type="text/Javascript">'
s += 'var win = window.open("", "Title", "toolbar=no, location=no, directories=no, status=no, menubar=no, scrollbars=yes, resizable=yes, width=780, height=200, top="+(screen.height-400)+", left="+(screen.width-840));'
s += 'win.document.body.innerHTML = \'' + resultsTable.get_html_string(title = 'Martingale Test').replace("\n",'\\') + '\';'
s += '</script>'

# Show in new Window
HTML(s)