In [241]:
import tensorflow as tf
import numpy as np
import pandas as pd

from scipy.optimize import least_squares
from scipy.optimize import Bounds

np.set_printoptions(formatter={'float': '{: 0.3f}'.format})
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)

In [272]:
class BinomialPricing():

    def __init__(self, params = None):
        # Flags
        self.PUT = object()
        self.CALL = object()
        self.STOCK = object()
        self.FUTURE = object()
        self.OPTION = object()
        self.CHOOSER = object() # Chooser underlying
        self.EXERCISE = object()
        self.HAZARD = object()
        self.DEBUG = object()

        # Rates
        self.SHORT_RATE = object()
        self.ZCB = object()
        self.SWAP = object()

        # Initialisation
        self.lattices = {}
        
    def set_params(self, params = None):
        # Lattice Paramters (Input)
        self.S0 = params['S0'] if params != None else 100
        self.R0 = params['R0'] if params != None else  6 / 100
        self.T = params['T'] if params != None else  0.25
        self.sigma = params['sigma'] if params != None else 30 / 100
        self.n = params['n'] if params != None else 3 # Maturity of option
        self.n_future = params['n_future'] if params != None else 5 # Maturity of underlying
        self.r = params['r'] if params != None else 2 / 100
        self.c = params['c'] if params != None else 1 / 100
        self.use_BDT = params['use_BDT'] if params != None else False

        # Rates Parameters
        self.N = params['N'] if params != None else 1 # Notional / Real Principle
        self.rK = params['rK'] if params != None else 5 / 100 # Fixed rat

        # Option Parameters (Input)
        self.K = params['K'] if params != None else 0
        self.option_type = params['option_type'] if params != None else self.CALL
        self.compute_on = params['compute_on'] if params != None else self.SWAP
        self.is_american = params['is_american'] if params != None else False
        
        #Hazard
        self.use_hazard = params['use_hazard'] if params != None else False
        
        if use_hazard:
            self.hazard_a = params['hazard_a'] if params != None else 0
            self.hazard_b = params['hazard_b'] if params != None else 0
            self.recovery = params['recovery'] if params != None else 1
        
        # Initialisation Computation
        if self.use_BDT:
            self.fixed_rates = np.full((self.n_future + 1, 1), self.R0)
    
    def get_BDT_a_values(self):
        return self.fixed_rates
    
    def set_BDT_a_values(self, vals):
        self.fixed_rates = vals
    
    def override_computed_parameters(self):
        self.u = 1.1
        self.d = 0.9
        self.q = 0.5
    
    # Must be called if any numerical paramters are changed in the model
    def compute_option_parameters(self, calc_option=False):
        # Computed Values from BSM
        if calc_option:
            calibrated_T = self.T * (self.n/self.n_future)
            self.dt = calibrated_T / self.n
        else:
            self.dt = self.T / self.n_future
        self.u = np.exp(self.sigma * np.sqrt(self.dt))
        self.d = 1 / self.u
        self.q = (np.exp((self.r - self.c)*self.dt) - self.d) / (self.u - self.d)
        self.R = np.exp(self.r*self.dt) # Risk free compounded periodically
    
    def init_stock_lattice(self):
        self.compute_option_parameters()
        
        size = self.n_future + 1
        # U
        u_values = np.full((size,size),self.u)
        arr = [i for i in reversed(range(size))]
        lower_r =  np.tril(np.tile(arr, size).reshape(size, size).T)
        u_powers = np.flip(lower_r, axis=1)

        # D
        d_values = np.full((size,size),self.d)
        row, col = np.indices((size,size))
        unmasked = col - row
        mask = unmasked < 0
        unmasked[mask] = 0
        d_powers =  np.flip(unmasked.T, axis=1)

        lattice = np.power(u_values, u_powers) * np.power(d_values,d_powers) * self.S0
        lattice[(d_powers + u_powers) == 0] = 0
        lattice[self.n_future,0] = self.S0

        self.lattices[self.STOCK] = lattice

    def BDT(self, j):
        a = self.fixed_rates
        b = self.sigma
        
        return np.exp(j * b) * a
        
    def init_short_rate_lattice(self, recompute_params=True):
        if recompute_params:
            self.compute_option_parameters()
            self.override_computed_parameters()
        
        lattice = None
        
        if self.use_BDT:
            size = self.n_future + 1
            # State
            arr = [i for i in reversed(range(size))]
            lower_r =  np.tril(np.tile(arr, size).reshape(size, size).T)
            state_index = np.flip(lower_r, axis=1)
            
            # Mask
            row, col = np.indices((size,size))
            unmasked = col - row
            unmasked = np.flip(unmasked.T, axis=1)
            mask = unmasked < 0
            
            lattice = self.BDT(state_index)
            lattice[mask] = 0

            self.lattices[self.SHORT_RATE] = lattice
        else:
            size = self.n_future + 1
            # U
            u_values = np.full((size,size),self.u)
            arr = [i for i in reversed(range(size))]
            lower_r =  np.tril(np.tile(arr, size).reshape(size, size).T)
            u_powers = np.flip(lower_r, axis=1)

            # D
            d_values = np.full((size,size),self.d)
            row, col = np.indices((size,size))
            unmasked = col - row
            mask = unmasked < 0
            unmasked[mask] = 0
            d_powers =  np.flip(unmasked.T, axis=1)

            lattice = np.power(u_values, u_powers) * np.power(d_values,d_powers) * self.R0
            lattice[(d_powers + u_powers) == 0] = 0
            lattice[self.n_future,0] = self.R0

            self.lattices[self.SHORT_RATE] = lattice
        
        return lattice
    
    def hazard(self, t, s):
        b_s = np.full(t.shape, self.hazard_b)
        return np.power(b_s, s - (t/2)) * self.hazard_a
    
    def init_hazard_rate_lattice(self, recompute_params=True):
        if recompute_params:
            self.compute_option_parameters()
            self.override_computed_parameters()
        
        lattice = None

        size = self.n_future + 1
        
        if self.use_hazard:
            # State
            arr = [i for i in reversed(range(size))]
            lower_r =  np.tril(np.tile(arr, size).reshape(size, size).T)
            state_index = np.flip(lower_r, axis=1)

            # Time
            time = np.repeat([np.arange(0,size)], size, axis=0)

            # Mask
            row, col = np.indices((size,size))
            unmasked = col - row
            unmasked = np.flip(unmasked.T, axis=1)
            mask = unmasked < 0

            lattice = self.hazard(state_index)
            lattice[mask] = 0
        else:
            lattice = np.zeros((size, size))

        self.lattices[self.HAZARD] = lattice

        return lattice

    def init_futures_lattice(self):
        self.compute_option_parameters()
        
        size = self.n_future + 1
        lattice = np.zeros((size,size))
        stock_lattice = self.lattices[self.STOCK]
        lattice[:,-1] = stock_lattice[:,size-1]  # F_T = S_T

        last_row_idx = lattice.shape[0] - 1
        
        for i in reversed(range(1, self.n_future+1)):
            col = lattice[:,i]

            for j in range(i):
                down_s = col[last_row_idx-j]
                up_s = col[last_row_idx-(j+1)]
                lattice[last_row_idx-j,i-1] = self.q * up_s + (1-self.q) * down_s

        self.lattices[self.FUTURE] = lattice

    def init_options_lattice(self, use_short_rate=False, recompute_params=True):
        if recompute_params:
            self.compute_option_parameters(calc_option=True)
            self.override_computed_parameters()
        
        size = self.n + 1
        lattice = np.zeros((size,size))
        ex_early_lattice = np.zeros((size,size))
        debug = np.zeros((size,size))
        
        print(self.lattices.keys())
        
        stock_lattice = self.lattices[self.compute_on]
        stock_last_row_idx = stock_lattice.shape[0] - 1
        short_rates = self.lattices[self.SHORT_RATE]
        
        # Gets the payouts at expiration period n (+ 1 for zero index)
        if self.option_type is self.CALL:
            lattice[:,-1] = np.maximum(stock_lattice[stock_last_row_idx-size+1:,size-1] - self.K, 0)
        else:
            lattice[:,-1] = np.maximum(self.K - stock_lattice[stock_last_row_idx-size+1:,size-1], 0)  

        last_row_idx = lattice.shape[0] - 1
        short_rates_last_row_idx = short_rates.shape[0] - 1 
        
        for i in reversed(range(0, self.n+1)): 
            col = lattice[:,i]

            for j in range(i):
                down_s = col[last_row_idx-j] 
                up_s = col[last_row_idx-(j+1)] 

                rate = self.R
                
                if use_short_rate:
                    rate = 1+short_rates[short_rates_last_row_idx-j,i-1]
                
                if self.is_american:
                    if self.option_type is self.CALL:
                        ex = stock_lattice[stock_last_row_idx-j,i-1] - self.K 
                    else:
                        ex = self.K - stock_lattice[stock_last_row_idx-j,i-1]

                    early_exercise = np.maximum(ex, (1/rate) * (self.q * up_s + (1-self.q) * down_s))

                    if ex == early_exercise:
                        ex_early_lattice[last_row_idx-j,i-1] = 1
                    
                    lattice[last_row_idx-j,i-1] = early_exercise 
                else:
                    lattice[last_row_idx-j,i-1] = (1/rate) * (self.q * up_s + (1-self.q) * down_s)

        self.lattices[self.OPTION] = lattice
        self.lattices[self.EXERCISE] = ex_early_lattice

        return lattice

    def init_chooser_options_lattice(self):
        # Custom implementation
        size = self.n + 1
        lattice = np.zeros((size,size))
  
        stock_lattice = self.lattices[self.compute_on]
        stock_last_row_idx = stock_lattice.shape[0]

        lattice[:,-1] = stock_lattice[stock_last_row_idx-size:,size-1]

        last_row_idx = lattice.shape[0] - 1
        
        for i in reversed(range(0, self.n+1)):
            col = lattice[:,i]

            for j in range(i):
                down_s = col[last_row_idx-j]
                up_s = col[last_row_idx-(j+1)]

                lattice[last_row_idx-j,i-1] = (1/self.R) * (self.q * up_s + (1-self.q) * down_s)

        self.lattices[self.OPTION] = lattice

        return lattice

    @staticmethod
    def print_np_array(arr):
        print(pd.DataFrame(arr))
    
    def print_lattice(self,lat):
        self.print_np_array(self.lattices[lat])
        
    def get_lattice_df(self,lat):
        return pd.DataFrame(self.lattices[lat])

    @staticmethod
    def resize_lattice(lattice, final_period):
        return lattice[lattice.shape[0]-final_period-1:,:final_period+1]

    def compute_forward_ZCB(self, forward_maturity):
        zcb = self.init_ZCB_lattice() 
        unit_zcb = self.init_ZCB_lattice(override_N = 1, override_maturity=forward_maturity)

        # Value of ZCB recieved at forward_maturity or at maturity of the bond is the same
        # as no coupons are paid at any point
        G0 = zcb[zcb.shape[0]-1, 0] / unit_zcb[unit_zcb.shape[0]-1,0]

        return (G0, zcb, unit_zcb)

    def compute_future_ZCB(self, future_maturity):
        zcb = self.init_ZCB_lattice()
        clipped_zcb = self.resize_lattice(zcb,future_maturity)
        no_discount_zcb = self.init_ZCB_lattice(override_N = clipped_zcb[:, future_maturity],
                                                override_maturity = future_maturity,
                                                use_discounting=False)

        # Value of ZCB recieved at forward_maturity or at maturity of the bond is the same
        # as no coupons are paid at any point
        G0 = no_discount_zcb[no_discount_zcb.shape[0]-1, 0] 

        return (G0, no_discount_zcb)
    
    def compute_every_zcb(self, return_spot_rates = False):
        unit_zcb_prices = []
        unit_zcb_rates = []
        
        for i in range(1,bp.n_future+1):
            unit_zcb = bp.init_ZCB_lattice(override_N = 1, override_maturity=i)
            price = unit_zcb[unit_zcb.shape[0]-1,0]
            unit_zcb_prices.append(price)
            unit_zcb_rates.append(np.power((1 / price), 1/i) - 1)
            
        if return_spot_rates:
            return np.array(unit_zcb_rates)
        else:
            return unit_zcb_prices
    
    def init_ZCB_lattice(self, override_maturity=0, override_N=0, use_discounting=True, recompute_params=True):
        if recompute_params:
            self.compute_option_parameters()
            self.override_computed_parameters()
        
        if override_maturity != 0:
            size = override_maturity + 1
        else:
            size = self.n_future + 1
            
        lattice = np.zeros((size,size))

        if type(override_N) == np.ndarray:
            lattice[:,-1] = override_N
        elif override_N != 0:
            lattice[:,-1] = np.full((lattice.shape[0],), override_N)
        else:
            lattice[:,-1] = np.full((lattice.shape[0],), self.N)  # Principle Returned

        short_rates = self.lattices[self.SHORT_RATE]

        last_row_idx = lattice.shape[0] - 1
        short_rates_last_row_idx = short_rates.shape[0] - 1 
        
        for i in reversed(range(1, size)):
            col = lattice[:,i]

            for j in range(i):
                down_s = col[last_row_idx-j]
                up_s = col[last_row_idx-(j+1)]

                if use_discounting:
                    lattice[last_row_idx-j,i-1] = 1/(1+short_rates[short_rates_last_row_idx-j,i-1])*(self.q * up_s + (1-self.q) * down_s)
                else:
                    lattice[last_row_idx-j,i-1] = (self.q * up_s + (1-self.q) * down_s)
                
        self.lattices[self.ZCB] = lattice

        return lattice
    
    def calibrate_model(self, spot_rates):
        self.init_short_rate_lattice()
        
        def residuals(theta):
            self.set_BDT_a_values(theta)
            self.init_short_rate_lattice(recompute_params=False)
            y_hat = self.compute_every_zcb(return_spot_rates=True)
            y = spot_rates

            return y - y_hat
        
        res = least_squares(residuals, self.get_BDT_a_values().reshape((-1,))) # Scipy 
        self.set_BDT_a_values(res['x'])
        
        bp.init_short_rate_lattice(recompute_params=False)
        return bp.compute_every_zcb(return_spot_rates=True)

    def init_swap_lattice(self, forward_starting=0, recompute_params=True):
        if recompute_params:
            self.compute_option_parameters()
            self.override_computed_parameters()
        
        size = self.n_future + 1
            
        lattice = np.zeros((size,size))
        short_rates = self.lattices[self.SHORT_RATE]
        
        lattice[:,-1] = (short_rates[:, lattice.shape[1]-1] - self.rK) / (1 + short_rates[:, lattice.shape[1]-1])

        last_row_idx = lattice.shape[0] - 1
        short_rates_last_row_idx = short_rates.shape[0] - 1 
        
        for i in reversed(range(1, size)):
            col = lattice[:,i] 

            for j in range(i):
                down_s = col[last_row_idx-j]
                up_s = col[last_row_idx-(j+1)]
                floating = short_rates[short_rates_last_row_idx-j,i-1]
                
                if i-1 < forward_starting:
                    lattice[last_row_idx-j,i-1] = 1/(1+floating)*(self.q * up_s + (1-self.q) * down_s)
                else:
                    lattice[last_row_idx-j,i-1] = 1/(1+floating)*((floating - self.rK) + self.q * up_s + (1-self.q) * down_s)

        lattice *= self.N 
        self.lattices[self.SWAP] = lattice

        return lattice

### Define Ground Truth

In [265]:
SPOT_RATES = np.array([3.0, 3.1, 3.2, 3.3, 3.4, 3.5, 3.55, 3.6, 3.65, 3.7])/100

bp = BinomialPricing()

PARAMS =  {
    'S0': 100,
    'R0': SPOT_RATES[0],
    'T': 0.25,
    'sigma': 0.1,
    'n': 3,
    'n_future': 10,
    'r': 2 / 100,
    'c': 1 / 100,
    'use_BDT': True,
    'N': 1000000,
    'rK': 3.9 / 100,
    'K': 0,
    'option_type': bp.CALL,
    'compute_on': bp.SWAP,
    'is_american': False,
    'use_hazard': False,
}

bp.set_params(PARAMS)
bp.calibrate_model(SPOT_RATES)
bp.init_swap_lattice(recompute_params=False)
bp.init_options_lattice(recompute_params=False)
bp.get_lattice_df(bp.OPTION)

dict_keys([<object object at 0xb2e174ca0>, <object object at 0xb2e174e20>, <object object at 0xb2e174c10>])


Unnamed: 0,0,1,2,3
0,0.0,0.0,0.0,49720.604707
1,0.0,0.0,35396.845601,21108.492192
2,0.0,22961.424364,10548.970292,0.0
3,14109.579909,5271.848563,0.0,0.0


In [266]:
bp = BinomialPricing()

PARAMS =  {
    'S0': 100,
    'R0': 5 / 100,
    'T': 0.25,
    'sigma': 0.1,
    'n': 3,
    'n_future': 10,
    'r': 2 / 100,
    'c': 1 / 100,
    'use_BDT': True,
    'N': 100,
    'rK': 3.9 / 100,
    'K': 0,
    'option_type': bp.CALL,
    'compute_on': bp.SWAP,
    'is_american': False,
    'recovery': 20 / 100,
    'use_hazard': True,
    'hazard_a': 0.01,
    'hazard_b': 1.01
}

bp.set_params(PARAMS)
bp.calibrate_model(SPOT_RATES)
bp.init_swap_lattice(recompute_params=False)
bp.init_options_lattice(recompute_params=False)
bp.get_lattice_df(bp.OPTION)

dict_keys([<object object at 0xb2e174e50>, <object object at 0xb2e1747c0>, <object object at 0xb2e174f90>])


Unnamed: 0,0,1,2,3
0,0.0,0.0,0.0,49720.604707
1,0.0,0.0,35396.845601,21108.492192
2,0.0,22961.424364,10548.970292,0.0
3,14109.579909,5271.848563,0.0,0.0


In [267]:
np.repeat([np.arange(0,10)], 10, axis=0)

array([[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
       [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]])

In [269]:
arr = [i for i in reversed(range(size))]
lower_r =  np.tril(np.tile(arr, size).reshape(size, size).T)
state_index = np.flip(lower_r, axis=1)

In [270]:
state_index

array([[0, 0, 0, 0, 0, 5],
       [0, 0, 0, 0, 4, 4],
       [0, 0, 0, 3, 3, 3],
       [0, 0, 2, 2, 2, 2],
       [0, 1, 1, 1, 1, 1],
       [0, 0, 0, 0, 0, 0]])