In [60]:
import sys
sys.path.append('../')

import numpy as np
import pandas as pd
from datetime import date
from dateutil.relativedelta import relativedelta
from enum import IntEnum
from scipy.interpolate import interp1d
from scipy.optimize import minimize

In [69]:
def generate_dates(start_date, maturity_months, tenor_months=12):
    """
    generate_swap_dates: computes a set of dates given starting date and length in months.
                         The tenor is by construction 12 months.

    Params:
    -------
    start_date: datetime.date
        The start date of the set of dates.
    n_months: int
        Number of months that define the length of the list of dates.
    tenor_months: int
        Set the tenor of the list of dates, by default it is 12 months.
    """
    dates = []
    for d in range(0, maturity_months, tenor_months):
        dates.append(start_date + relativedelta(months=d))
    dates.append(start_date + relativedelta(months=maturity_months))
    return dates

SwapType = IntEnum("SwapType", {"Receiver":-1, "Payer":1})

class OvernightIndexSwap:
    """
    A class to represent O/N swaps

    Attributes:
    -----------
    notional: float
        notional of the swap
    start_date: datetime.date
         start date of the contract
    maturity: str
        maturity of the swap.
    fixed_rate: float
        rate of the fixed leg of the swap
    type: SwapType
        type of the swap. default is SwapType.Payer
    """
    def __init__(self, nominal, start_date, maturity, fixed_rate, type=SwapType.Payer):
        self.nominal = nominal
        self.fixed_rate = fixed_rate
        self.payment_dates = generate_dates(start_date, maturity)
        self.type = type
      
    def npv_floating(self, dc):
        """
        Compute the floating leg NPV
    
        Params:
        -------
        dc: DiscountCurve
            discount curve to be used in the calculation
        """
        return self.nominal * (dc.df(self.payment_dates[0]) - dc.df(self.payment_dates[-1]))
  
    def npv_fixed(self, dc):
        """
        Computes the fixed leg NPV
    
        Params:
        -------
        dc: DiscountCurve
            discount curve to be used in the calculation
        """
        val = 0
        for i in range(1, len(self.payment_dates)):
            val += dc.df(self.payment_dates[i]) * \
                    (self.payment_dates[i] - self.payment_dates[i-1]).days/360 
        return self.nominal*self.fixed_rate*val
  
    def npv(self, dc):
        """
        Computes the contract NPV seen from the point of view of the 
        receiver of the floating leg.
    
        Params:
        -------
        dc: DiscountCurve
            discount curve to be used in the calculation
        """
        return self.npv_floating(dc) - self.npv_fixed(dc)*self.type

    def fair_value_strike(self, dc):
        """
        Computes the fair value strike

        Params:
        -------
        dc: DiscountCurve
            siscount curve object used for npv calculation.
        """
        den = self.npv_fixed_leg(dc)/self.fixed_rate
        num = self.npv_floating_leg(dc)
        return num/den

class DiscountCurve:
    """
    A class to represent discount curves

    Attributes:
    -----------
    obs_date: datetime.date
        observation date.
    pillar_dates: list(datetime.date)
        pillars dates of the discount curve
    discount_factors: list(float)
        actual discount factors
    """
    def __init__(self, obs_date, pillar_dates, discount_factors):
        discount_factors = np.array(discount_factors)
        self.obs_date = obs_date
        if obs_date not in pillar_dates:
            pillar_dates = [obs_date] + pillar_dates
            discount_factors = np.insert(discount_factors, 0, 1)
        self.pillars = [p.toordinal() for p in pillar_dates] 
        self.log_discount_factors = np.log(discount_factors)
        self.interpolator = interp1d(self.pillars, self.log_discount_factors)
        
    def df(self, adate):
        """
        Gets interpolated discount factor at `adate`

        Params:
        -------
        adate: datetime.date
            actual date at which we would like the interpolated discount factor
        """
        d = adate.toordinal()
        if d < self.pillars[0] or d > self.pillars[-1]:
            print (f"Cannot extrapolate discount factors (date: {adate}).")
            return None
        return np.exp(self.interpolator(d))
    
    def annualized_yield(self, d):
        """
        Computes the annualized yield at a given date

        Params:
        -------
        d: datetime.date
            actual date at which calculate the yield
        """
        return -np.log(self.df(d))/((d-self.obs_date).days/365) 

In [31]:
mq = pd.read_excel('../data/20230929/markets/ois_test.xlsx')

In [52]:
obs_date = date.today()
maturity_months = 18
ois = OvernightIndexSwap(1e6, obs_date, maturity_months,
                        mq.loc[mq['months']==maturity_months, 'quotes']*0.01)
ois.fixed_rate

12    0.024475
Name: quotes, dtype: float64

In [53]:
pillar_dates = []
swaps = []
start_date = obs_date
for i in range(len(mq)):
    swap = OvernightIndexSwap(1e6, obs_date, mq.loc[i, 'months'],
                                mq.loc[i, 'quotes']*0.01)
    
    # OvernightIndexSwap(1e5, start_date,
    #                             f"{mq.loc[i, 'months']}M",
    #                             0.01 * mq.loc[i, 'quotes'])
    swaps.append(swap)
    pillar_dates.append(swap.payment_dates[-1])
                                           
pillar_dates = sorted(pillar_dates)

In [54]:
def objective_function(dfs, obs_date, pillars, swaps):
    curve = DiscountCurve(obs_date, pillars, dfs)
    sum_sq = 0.0
    for swap in swaps:
        sum_sq += swap.npv(curve)**2
    return sum_sq

In [55]:
x0 = [1.0 for i in range(len(swaps))]
bounds = [(0.01, 10.0) for i in range(len(swaps))]

In [61]:
result = minimize(objective_function, x0, bounds=bounds,
args=(obs_date, pillar_dates, swaps))
print (result)

  message: CONVERGENCE: REL_REDUCTION_OF_F_<=_FACTR*EPSMCH
  success: True
   status: 0
      fun: 0.0006027305758561416
        x: [ 9.994e-01  9.983e-01 ...  4.428e-01  3.901e-01]
      nit: 11
      jac: [ 1.145e+01  1.766e+01 ... -1.789e+01  8.189e+00]
     nfev: 465
     njev: 15
 hess_inv: <30x30 LbfgsInvHessProduct with dtype=float64>


In [67]:
print ("Initial objective function value ", objective_function(x0, obs_date,
pillar_dates, swaps))
print ("Final objective function value ", objective_function(result.x, obs_date,
pillar_dates, swaps))

Initial objective function value  3792979138168.1074
Final objective function value  0.0006027305758561416


In [66]:
result.jac

array([ 11.44622938,  17.65937571,  20.00923964,  18.21342727,
        13.45806948,   7.82555167,   3.41746089,   1.51570217,
         2.21362486,   4.7723774 ,   8.08736046, -10.95634646,
        -9.03316437,  18.64626798,   8.80588666,   2.90305088,
        -0.83171869,  -3.19791507,  -2.65334357,  -2.26025763,
        -0.60603123,  -1.7133146 ,  -2.86188593, -33.81190336,
         7.05848015, -35.31583683, -25.07174475,   1.7874019 ,
       -17.88887386,   8.1892812 ])

In [78]:
dfs = result.x
curve = DiscountCurve(obs_date, pillar_dates, dfs)
d = obs_date + relativedelta(years=1)
print (f"40y df: {curve.df(d):.3f}")
print (f"40y rate: {curve.annualized_yield(d):.6f}")

40y df: 0.978
40y rate: 0.022617
