In [532]:
import pandas as pd
import numpy as np
from scipy.stats import norm
import matplotlib.pyplot as plt
import sys
from scipy.optimize import brentq, least_squares
from scipy.stats import norm
from scipy.interpolate import RegularGridInterpolator
import scipy.integrate as integrate
from scipy import interpolate
import warnings

# Silence every DeprecationWarning for the rest of the notebook.
warnings.filterwarnings("ignore", category=DeprecationWarning)
# Add the parent directory to the import path; presumably this is what lets the
# `analytical_option_formulae` import below resolve -- TODO confirm.
sys.path.append("..")

# handy it seems
# https://docs.sympy.org/latest/modules/solvers/solvers.html
from sympy.solvers import solve
from sympy import Symbol
from analytical_option_formulae.option_types.vanilla_option import VanillaOption


# read data
# SABR parameter tables, one CSV per parameter (alpha, beta, nu, rho).
df_sabr_alpha = pd.read_csv("../swaption_calibration/sabr_alpha.csv")
df_sabr_beta = pd.read_csv("../swaption_calibration/sabr_beta.csv")
df_sabr_nu = pd.read_csv("../swaption_calibration/sabr_nu.csv")
df_sabr_rho = pd.read_csv("../swaption_calibration/sabr_rho.csv")
# Outputs of the swap-curve bootstrap: df_1c is used further down for its
# forward swap rates (k_rate), df_combined for discount factors and LIBOR.
df_1c = pd.read_csv("../bootstrap_swap_curve/df_1c.csv")
df_combined = pd.read_csv("../bootstrap_swap_curve/df_combined.csv")

In [533]:
# adjust sabr param df
def adjust_sabr_params(my_sabr_df):
    """Index a SABR parameter table by expiry and give it float tenor columns.

    The frame is modified in place (callers in this notebook ignore the return
    value) and is also returned for convenience.  The function is idempotent:
    calling it again on an already-adjusted frame is a no-op, so the cell can
    be re-run without a kernel restart.

    Parameters
    ----------
    my_sabr_df : pandas.DataFrame
        Table with an ``"expiry"`` column (or an index already named
        ``"expiry"``) and one column per swap tenor whose label can be cast to
        float.

    Returns
    -------
    pandas.DataFrame
        The same object that was passed in.

    Raises
    ------
    KeyError
        If the frame has neither an ``"expiry"`` column nor an index named
        ``"expiry"``.
    """
    if "expiry" in my_sabr_df.columns:
        my_sabr_df.set_index("expiry", inplace=True)
    elif my_sabr_df.index.name != "expiry":
        raise KeyError("expected an 'expiry' column or an index named 'expiry'")

    # Column labels come out of read_csv as strings; the interpolators need numbers.
    my_sabr_df.columns = my_sabr_df.columns.astype(float)
    my_sabr_df.columns.name = "tenor"
    return my_sabr_df

In [534]:
# Re-index each SABR parameter table by expiry with float tenor columns.
# The frames are modified in place; the return values are not used.
adjust_sabr_params(df_sabr_alpha)
adjust_sabr_params(df_sabr_beta)
adjust_sabr_params(df_sabr_nu)
adjust_sabr_params(df_sabr_rho)

In [535]:
# Sanity check of the adjusted alpha table: expiry axis, tenor axis, and the
# values transposed so that each printed row corresponds to one tenor.
print(df_sabr_alpha.index)
print(df_sabr_alpha.columns)
print(df_sabr_alpha.values.T)

Float64Index([1.0, 5.0, 10.0], dtype='float64', name='expiry')
Float64Index([1.0, 2.0, 3.0, 5.0, 10.0], dtype='float64', name='tenor')
[[0.13907382 0.16661823 0.17824663]
 [0.1846499  0.1995335  0.19618447]
 [0.19685208 0.21030546 0.2077691 ]
 [0.17804234 0.19097227 0.19916746]
 [0.17114286 0.17705111 0.17634725]]


In [536]:
# setup sabr interpolators
def make_sabr_interpolator(sabr_df):
    """Build a bilinear (expiry, tenor) interpolator for one SABR parameter table.

    This stands in for ``scipy.interpolate.interp2d(..., kind="linear")``,
    which is deprecated and no longer available in recent SciPy releases.  It
    keeps the two behaviours the rest of the notebook relies on:

    * the returned callable is used as ``interp(expiry, tenor)[0]``, so it
      returns a 1-D array even for scalar inputs;
    * queries outside the calibrated grid are clamped to the nearest edge
      (the first CMS fixings lie before the shortest calibrated expiry).

    Parameters
    ----------
    sabr_df : pandas.DataFrame
        Parameter table indexed by expiry with float tenor columns, as
        produced by ``adjust_sabr_params``.  Both axes are assumed to be
        strictly increasing, as ``RegularGridInterpolator`` expects.

    Returns
    -------
    callable
        ``f(expiry, tenor) -> numpy.ndarray``.  Unlike ``interp2d``, array
        inputs are evaluated point-wise rather than on a mesh; only scalar
        queries are made in this notebook.
    """
    expiries = sabr_df.index.to_numpy(dtype=float)
    tenors = sabr_df.columns.to_numpy(dtype=float)
    surface = RegularGridInterpolator(
        (expiries, tenors), sabr_df.to_numpy(dtype=float), method="linear"
    )

    def interpolate_param(expiry, tenor):
        # Clamp to the grid so out-of-range queries take the edge value.
        expiry_pts = np.clip(
            np.atleast_1d(expiry).astype(float), expiries.min(), expiries.max()
        )
        tenor_pts = np.clip(
            np.atleast_1d(tenor).astype(float), tenors.min(), tenors.max()
        )
        expiry_pts, tenor_pts = np.broadcast_arrays(expiry_pts, tenor_pts)
        return surface(np.column_stack([expiry_pts, tenor_pts]))

    return interpolate_param


# One interpolator per SABR parameter (this cell used to be duplicated verbatim).
sabr_alpha_interp = make_sabr_interpolator(df_sabr_alpha)
sabr_beta_interp = make_sabr_interpolator(df_sabr_beta)
sabr_rho_interp = make_sabr_interpolator(df_sabr_rho)
sabr_nu_interp = make_sabr_interpolator(df_sabr_nu)

In [538]:
# expand df_combined
# Quarterly grid from t=0 to t=30 years.
combined_tenors = np.arange(0, 30.25, 0.25)
df_combined_new = pd.DataFrame({"tenor": combined_tenors}).merge(
    df_combined, how="left", on="tenor"
)
# Anchor the curve at t=0: unit discount factors and zero rates.
df_combined_new.loc[0, "ois_df"] = 1
df_combined_new.loc[0, "irs_df"] = 1
df_combined_new.loc[0, "fw_libor"] = 0
df_combined_new.loc[0, "irs_rate"] = 0
# Fill the grid points missing from df_combined by linear interpolation.
df_combined_new = df_combined_new.interpolate()

In [539]:
# Inspect the expanded quarterly curve.
df_combined_new

Unnamed: 0,tenor,ois_df,irs_rate,fw_libor,irs_df
0,0.00,1.000000,0.000000,0.000000,1.000000
1,0.25,0.999376,0.012500,0.012500,0.993827
2,0.50,0.998752,0.025000,0.025000,0.987654
3,0.75,0.997880,0.026500,0.028003,0.980116
4,1.00,0.997009,0.028000,0.031005,0.972577
...,...,...,...,...,...
116,29.00,0.852674,0.049500,0.076225,0.242118
117,29.25,0.851357,0.049625,0.077735,0.237504
118,29.50,0.850040,0.049750,0.079246,0.232890
119,29.75,0.848723,0.049875,0.080880,0.228276


Using the SABR model calibrated in the previous question, value the
following constant maturity swap (CMS) products:

* PV of a leg receiving CMS10y semi-annually over the next 5 years
* PV of a leg receiving CMS2y quarterly over the next 10 years

Set up the option-pricing models, based on Prof Tee's code.

In [540]:
class AbstractBlack76Model:
    """
    Base class holding the inputs and the d1/d2 terms of the Black-76 model
    ...
    Parameters
    ----------
    F : float
        The forward price of the underlying asset
    K : float
        The strike price of the options
    discount_factor : float
        The "numeraire" discount factor of the model (i.e. PVBP, compounded discount factor)
    sigma : float
        Volatility
    T : float
        Maturity period (years)
    """

    def __init__(
        self,
        F: float,
        K: float,
        discount_factor: float,
        sigma: float,
        T: float,
    ):
        self.F, self.K = F, K
        self.sigma, self.T = sigma, T
        self.discount_factor = discount_factor

        # d2 is derived from d1, so d1 has to be stored first.
        self.d1 = self._calculate_d1()
        self.d2 = self._calculate_d2()

    def _calculate_d1(self) -> float:
        """Return d1 = (ln(F/K) + sigma^2 T / 2) / (sigma sqrt(T))."""
        log_moneyness = np.log(self.F / self.K)
        half_variance = self.sigma**2 / 2 * self.T
        total_vol = self.sigma * np.sqrt(self.T)
        return (log_moneyness + half_variance) / total_vol

    def _calculate_d2(self) -> float:
        """Return d2 = d1 - sigma sqrt(T)."""
        total_vol = self.sigma * np.sqrt(self.T)
        return self.d1 - total_vol


class VanillaBlack76Model(AbstractBlack76Model):
    """Black-76 prices of vanilla calls and puts, scaled by ``discount_factor``."""

    def calculate_call_price(self) -> float:
        """Return discount_factor * (F N(d1) - K N(d2))."""
        undiscounted = self.F * norm.cdf(self.d1) - self.K * norm.cdf(self.d2)
        return self.discount_factor * undiscounted

    def calculate_put_price(self) -> float:
        """Return discount_factor * (K N(-d2) - F N(-d1))."""
        undiscounted = -self.F * norm.cdf(-self.d1) + self.K * norm.cdf(-self.d2)
        return self.discount_factor * undiscounted


class AbstractDisplacedDiffusionModel:
    """
    Displaced diffusion: Black-76 applied to a shifted forward, strike and volatility
    ...
    Parameters
    ----------
    F : float
        The forward price of the underlying asset
    K : float
        The strike price of the options
    discount_factor : float
        The "numeraire" discount factor of the model (i.e. PVBP, compounded discount factor)
    sigma : float
        Volatility
    T : float
        Maturity period (years)
    beta : float
        Displaced diffusion model parameter (0,1], but lecture notes say [0,1]
        https://ink.library.smu.edu.sg/cgi/viewcontent.cgi?article=6976&context=lkcsb_research
    """

    def __init__(
        self,
        F: float,
        K: float,
        discount_factor: float,
        sigma: float,
        T: float,
        beta: float,
    ):
        self.F, self.K = F, K
        self.sigma, self.T = sigma, T
        self.beta = beta
        self.discount_factor = discount_factor

        # Black-76 inputs after the displacement: F/beta, K + (1-beta)/beta * F, sigma*beta.
        self.adjusted_F = self.F / self.beta
        self.adjusted_K = self.K + ((1 - self.beta) / self.beta) * self.F
        self.adjusted_sigma = self.sigma * self.beta

        # d2 is derived from d1, so d1 has to be stored first.
        self.d1 = self._calculate_d1()
        self.d2 = self._calculate_d2()

    def _calculate_d1(self) -> float:
        """Return the Black-76 d1 evaluated on the adjusted inputs."""
        log_moneyness = np.log(self.adjusted_F / self.adjusted_K)
        half_variance = 0.5 * self.adjusted_sigma**2 * self.T
        total_vol = self.adjusted_sigma * np.sqrt(self.T)
        return (log_moneyness + half_variance) / total_vol

    def _calculate_d2(self) -> float:
        """Return d2 = d1 - adjusted_sigma sqrt(T)."""
        total_vol = self.adjusted_sigma * np.sqrt(self.T)
        return self.d1 - total_vol


class VanillaDisplacedDiffusionModel(AbstractDisplacedDiffusionModel):
    """Displaced-diffusion prices of vanilla calls and puts, scaled by ``discount_factor``."""

    def calculate_call_price(self) -> float:
        """Return discount_factor * (F' N(d1) - K' N(d2)) on the adjusted inputs."""
        forward_leg = self.adjusted_F * norm.cdf(self.d1)
        strike_leg = self.adjusted_K * norm.cdf(self.d2)
        return self.discount_factor * (forward_leg - strike_leg)

    def calculate_put_price(self) -> float:
        """Return discount_factor * (K' N(-d2) - F' N(-d1)) on the adjusted inputs."""
        strike_leg = self.adjusted_K * norm.cdf(-self.d2)
        forward_leg = self.adjusted_F * norm.cdf(-self.d1)
        return self.discount_factor * (strike_leg - forward_leg)


class VanillaOption:
    """Factory returning a vanilla-option pricer for a chosen model.

    Note: this definition replaces the ``VanillaOption`` name imported at the
    top of the notebook.
    """

    def black_model(
        self, F: float, K: float, discount_factor: float, sigma: float, T: float
    ) -> VanillaBlack76Model:
        """Return a Black-76 pricer for the given forward, strike, numeraire, vol and maturity."""
        return VanillaBlack76Model(
            F=F, K=K, discount_factor=discount_factor, sigma=sigma, T=T
        )

    def displaced_diffusion_model(
        self,
        F: float,
        K: float,
        discount_factor: float,
        sigma: float,
        T: float,
        beta: float,
    ) -> AbstractDisplacedDiffusionModel:
        """Return a displaced-diffusion pricer; ``beta`` is the displacement parameter."""
        return VanillaDisplacedDiffusionModel(
            F=F, K=K, discount_factor=discount_factor, sigma=sigma, T=T, beta=beta
        )

In [541]:
# SABR
def SABR_model(
    F, K, T, alpha, beta, rho, nu, max_strike_multiple=2.0, max_strike=0.065
):
    """Return the SABR implied (Black) volatility for a single strike.

    The strike is capped before the formula is evaluated, so the volatility is
    flat beyond the cap.  This keeps the payer integral in the CMS replication
    (which runs to infinity) from being driven by an exploding wing.

    Parameters
    ----------
    F : float
        Forward rate.
    K : float
        Strike (scalar; the function is not vectorised).
    T : float
        Time to expiry in years.
    alpha, beta, rho, nu : float
        SABR parameters.
    max_strike_multiple : float or None, default 2.0
        Cap the strike at ``max_strike_multiple * F``; ``None`` disables it.
    max_strike : float or None, default 0.065
        Absolute cap on the strike; ``None`` disables it.

    Returns
    -------
    float
        SABR implied volatility at the (capped) strike.
    """
    # np.float64 keeps numpy semantics for a zero strike (inf/nan plus a warning
    # rather than ZeroDivisionError), as the original np.min-based cap did.
    strike = np.float64(K)
    if max_strike_multiple is not None:
        strike = np.minimum(strike, F * max_strike_multiple)
    if max_strike is not None:
        strike = np.minimum(strike, max_strike)

    # Time-correction term that does not depend on moneyness.
    numer3 = ((2 - 3 * rho * rho) / 24) * nu * nu

    # At-the-money-forward: the general formula degenerates to 0/0, use the limit.
    if abs(F - strike) < 1e-12:
        numer1 = (((1 - beta) ** 2) / 24) * alpha * alpha / (F ** (2 - 2 * beta))
        numer2 = 0.25 * rho * beta * nu * alpha / (F ** (1 - beta))
        return alpha * (1 + (numer1 + numer2 + numer3) * T) / (F ** (1 - beta))

    log_fk = np.log(F / strike)
    fk = F * strike
    z = (nu / alpha) * (fk ** (0.5 * (1 - beta))) * log_fk
    zhi = np.log((((1 - 2 * rho * z + z * z) ** 0.5) + z - rho) / (1 - rho))
    numer1 = (((1 - beta) ** 2) / 24) * ((alpha * alpha) / (fk ** (1 - beta)))
    numer2 = 0.25 * rho * beta * nu * alpha / (fk ** ((1 - beta) / 2))
    numer = alpha * (1 + (numer1 + numer2 + numer3) * T) * z
    denom1 = ((1 - beta) ** 2 / 24) * log_fk**2
    denom2 = (((1 - beta) ** 4) / 1920) * (log_fk**4)
    denom = (fk ** ((1 - beta) / 2)) * (1 + denom1 + denom2) * zhi
    return numer / denom


def calculate_SABR_vol_err(x, strikes, vols, F, T, beta):
    """Return the sum of squared differences between ``vols`` and SABR vols.

    ``x`` holds the candidate parameters as ``(alpha, rho, nu)``; ``beta`` is
    passed separately.  ``strikes[i]`` is the strike paired with the i-th
    element of ``vols``.
    """
    total = 0.0
    for idx, market_vol in enumerate(vols):
        model_vol = SABR_model(F, strikes[idx], T, x[0], beta, x[1], x[2])
        residual = market_vol - model_vol
        total += residual**2
    return total

In [542]:
# setup vanilla option
# Shared pricer factory; v_payer and v_receiver below call its black_model().
vanilla_option = VanillaOption()

In [543]:
def IRR_0(K, m, N):
    """IRR(K) = (1/K) * (1 - (1 + K/m)^(-N*m)).

    ``K`` is the rate, ``m`` the number of payments per year and ``N`` the
    length in years.  ``K`` must be non-zero.
    """
    growth = (1 + K / m) ** (N * m)
    return 1 / K * (1.0 - 1 / growth)


def IRR_1(K, m, N):
    """First derivative IRR'(K) of :func:`IRR_0` with respect to ``K``."""
    annuity_part = -1 / K * IRR_0(K, m, N)
    decay_part = 1 / (K * m) * N * m / (1 + K / m) ** (N * m + 1)
    return annuity_part + decay_part


def IRR_2(K, m, N):
    """Second derivative IRR''(K) of :func:`IRR_0` with respect to ``K``."""
    slope_part = -2 / K * IRR_1(K, m, N)
    curvature_part = (
        1 / (K * m * m) * (N * m) * (N * m + 1) / (1 + K / m) ** (N * m + 2)
    )
    return slope_part - curvature_part

In [544]:
def g_0(K):
    """Payoff function g(K) = K, used by the h_* functions."""
    return K


def g_1(K):
    """First derivative g'(K); constant 1.0 because g is the identity."""
    return 1.0


def g_2(K):
    """Second derivative g''(K); constant 0.0 because g is linear."""
    return 0.0

In [545]:
def h_0(K, m, N):
    """h(K) = g(K) / IRR(K)."""
    return g_0(K) / IRR_0(K, m, N)


def h_1(K, m, N):
    """First derivative h'(K) = (IRR g' - g IRR') / IRR^2."""
    irr = IRR_0(K, m, N)
    irr_d1 = IRR_1(K, m, N)
    numerator = irr * g_1(K) - g_0(K) * irr_d1
    return numerator / irr**2


def h_2(K, m, N):
    """Second derivative h''(K) = (IRR g'' - IRR'' g - 2 IRR' g') / IRR^2 + 2 IRR'^2 g / IRR^3."""
    irr = IRR_0(K, m, N)
    irr_d1 = IRR_1(K, m, N)
    irr_d2 = IRR_2(K, m, N)
    payoff, payoff_d1, payoff_d2 = g_0(K), g_1(K), g_2(K)

    quotient_part = (
        irr * payoff_d2 - irr_d2 * payoff - 2.0 * irr_d1 * payoff_d1
    ) / irr**2
    correction_part = 2.0 * irr_d1**2 * payoff / irr**3
    return quotient_part + correction_part

In [546]:
def v_payer(discount_factor, F, K, sigma, expiry, payment_freq, tenor):
    """Payer swaption value: discount_factor * IRR(F) * Black-76 call(F, K, sigma, expiry)."""
    # The IRR annuity evaluated at the forward is passed to Black-76 as its numeraire.
    irr_annuity = IRR_0(F, payment_freq, tenor)
    pricer = vanilla_option.black_model(F, K, irr_annuity, sigma, expiry)
    return discount_factor * pricer.calculate_call_price()



def v_receiver(discount_factor, F, K, sigma, expiry, payment_freq, tenor):
    """Receiver swaption value: discount_factor * IRR(F) * Black-76 put(F, K, sigma, expiry)."""
    # The IRR annuity evaluated at the forward is passed to Black-76 as its numeraire.
    irr_annuity = IRR_0(F, payment_freq, tenor)
    pricer = vanilla_option.black_model(F, K, irr_annuity, sigma, expiry)
    return discount_factor * pricer.calculate_put_price()

In [547]:
def integ_receiver_func(K, discount_factor, F, sabr_sigma, expiry, payment_freq, tenor):
    """Integrand h''(K) * V_receiver(K) for the receiver-side integral."""
    weight = h_2(K, payment_freq, tenor)
    option_value = v_receiver(
        discount_factor, F, K, sabr_sigma, expiry, payment_freq, tenor
    )
    return weight * option_value


def integ_payer_func(K, discount_factor, F, sabr_sigma, expiry, payment_freq, tenor):
    """Integrand h''(K) * V_payer(K) for the payer-side integral."""
    weight = h_2(K, payment_freq, tenor)
    option_value = v_payer(
        discount_factor, F, K, sabr_sigma, expiry, payment_freq, tenor
    )
    return weight * option_value

In [548]:
def get_pvbp_pvfloat_krate(my_dataframe, payment_freq, curve_df=None):
    """Add PVBP, floating-leg PV and forward swap rate columns to ``my_dataframe``.

    For every row (a forward swap starting at ``expiry`` and running for
    ``tenor`` years) the payment dates are ``expiry + k / payment_freq`` for
    ``k = 1 .. tenor * payment_freq``.  Three columns are written in place:

    * ``pv_fix``   : accrual * sum of ``ois_df`` over the payment dates
    * ``pv_float`` : accrual * sum of ``ois_df * fw_libor`` over the payment dates
    * ``s_nn0``    : ``pv_float / pv_fix``

    Payment dates are matched to curve rows with a small absolute tolerance
    rather than exact float equality, so accruals that are not exactly
    representable in binary (e.g. 0.1) still find their rows.

    Parameters
    ----------
    my_dataframe : pandas.DataFrame
        Must contain ``"expiry"`` and ``"tenor"`` columns; modified in place.
    payment_freq : int or float
        Payments per year.
    curve_df : pandas.DataFrame, optional
        Curve with ``"tenor"``, ``"ois_df"`` and ``"fw_libor"`` columns.
        Defaults to the notebook-level ``df_combined_new``.

    Returns
    -------
    pandas.DataFrame
        ``my_dataframe`` itself (existing callers ignore the return value).

    Raises
    ------
    ValueError
        If the curve does not hold exactly one row per payment date.
    """
    if curve_df is None:
        curve_df = df_combined_new  # notebook-level curve, as in the original code

    accrual = 1 / payment_freq
    curve_tenors = curve_df["tenor"].to_numpy(dtype=float)

    for label in my_dataframe.index:
        expiry = my_dataframe.loc[label, "expiry"]
        tenor = my_dataframe.loc[label, "tenor"]

        # Payment count; the small offset absorbs rounding in tenor * payment_freq.
        n_payments = int(np.ceil(tenor * payment_freq - 1e-9))
        pay_dates = expiry + accrual * np.arange(1, n_payments + 1)

        on_schedule = np.isclose(
            curve_tenors[:, None], pay_dates[None, :], rtol=0.0, atol=1e-8
        ).any(axis=1)
        schedule = curve_df[on_schedule]
        if len(schedule) != n_payments:
            raise ValueError(
                f"curve covers {len(schedule)} of {n_payments} payment dates "
                f"for expiry={expiry}, tenor={tenor}"
            )

        my_dataframe.loc[label, "pv_fix"] = accrual * schedule["ois_df"].sum()
        my_dataframe.loc[label, "pv_float"] = (
            accrual * (schedule["ois_df"] * schedule["fw_libor"]).sum()
        )
        my_dataframe.loc[label, "s_nn0"] = (
            my_dataframe.loc[label, "pv_float"] / my_dataframe.loc[label, "pv_fix"]
        )

    return my_dataframe

In [549]:
def _discount_factor_at(curve, time_point, column="ois_df", atol=1e-8):
    """Return ``curve[column]`` on the first row whose ``tenor`` matches ``time_point``."""
    on_date = np.isclose(curve["tenor"], time_point, rtol=0.0, atol=atol)
    matches = curve.loc[on_date, column]
    if matches.empty:
        raise ValueError(f"no curve row with tenor {time_point}")
    return matches.values[0]


def compute_cms_rate(payment_duration, payment_freq, tenor, df_combined_new):
    """Compute convexity-adjusted CMS rates by static replication.

    One row is built per fixing date ``1/payment_freq, 2/payment_freq, ...``
    up to ``payment_duration``.  For each row the CMS rate is

        g(F) + (1/D) * [ int_0^F h''(K) V_rec(K) dK + int_F^inf h''(K) V_pay(K) dK ]

    where ``F`` is the forward swap rate (``s_nn0``), ``D`` the OIS discount
    factor to the fixing date and the swaption values use SABR volatilities
    interpolated from the calibrated parameter grids.

    Parameters
    ----------
    payment_duration : float
        Length of the CMS leg in years.
    payment_freq : int
        CMS payments per year.  It is also used as the payment frequency of
        the underlying swap in the forward-rate and IRR calculations.
        NOTE(review): confirm this is intended when the underlying swap pays
        at a different frequency from the CMS leg.
    tenor : float
        Tenor of the underlying swap in years.
    df_combined_new : pandas.DataFrame
        Curve with ``"tenor"`` and ``"ois_df"`` columns, used for the discount
        factor at each fixing date.  ``get_pvbp_pvfloat_krate`` is called
        without a curve argument and so uses the notebook-level curve.

    Returns
    -------
    pandas.DataFrame
        Columns ``expiry, tenor, pv_fix, pv_float, s_nn0, alpha, beta, nu,
        rho, s_nnT``; ``s_nnT`` is the CMS rate.

    Raises
    ------
    ValueError
        If a fixing date has no matching row in ``df_combined_new``.
    """
    # 1. forward swap data (pvbp, pv_float, s_nn0) for every fixing date
    accrual = 1 / payment_freq
    temp_df = pd.DataFrame()
    temp_df["expiry"] = np.arange(accrual, payment_duration + accrual, accrual)
    temp_df["tenor"] = tenor
    temp_df["expiry"] = temp_df["expiry"].astype(float)
    temp_df["tenor"] = temp_df["tenor"].astype(float)
    get_pvbp_pvfloat_krate(temp_df, payment_freq)

    # 2. static replication, one fixing date at a time
    for row_label in temp_df.index:
        swap_tenor = temp_df.loc[row_label, "tenor"]
        expiry = temp_df.loc[row_label, "expiry"]

        alpha = sabr_alpha_interp(expiry, swap_tenor)[0]
        beta = sabr_beta_interp(expiry, swap_tenor)[0]
        nu = sabr_nu_interp(expiry, swap_tenor)[0]
        rho = sabr_rho_interp(expiry, swap_tenor)[0]
        temp_df.loc[row_label, "alpha"] = alpha
        temp_df.loc[row_label, "beta"] = beta
        temp_df.loc[row_label, "nu"] = nu
        temp_df.loc[row_label, "rho"] = rho

        disc_factor = _discount_factor_at(df_combined_new, expiry)
        forward = temp_df.loc[row_label, "s_nn0"]

        # Receiver swaptions cover strikes below the forward ...
        receiver_integral, _ = integrate.quad(
            lambda strike: integ_receiver_func(
                strike,
                disc_factor,
                forward,
                SABR_model(forward, strike, expiry, alpha, beta, rho, nu),
                expiry,
                payment_freq,
                swap_tenor,
            ),
            0,
            forward,
        )
        # ... payer swaptions cover strikes above it.
        payer_integral, _ = integrate.quad(
            lambda strike: integ_payer_func(
                strike,
                disc_factor,
                forward,
                SABR_model(forward, strike, expiry, alpha, beta, rho, nu),
                expiry,
                payment_freq,
                swap_tenor,
            ),
            forward,
            float("inf"),
        )

        # The integrands carry the discount factor; divide it back out.
        cms_rate = (
            g_0(forward)
            + receiver_integral / disc_factor
            + payer_integral / disc_factor
        )
        temp_df.loc[row_label, "s_nnT"] = cms_rate

    return temp_df

In [550]:
def compute_pv_cms(payment_duration, payment_freq, tenor, df_combined_new):
    """Return the PV of a CMS leg.

    The CMS rate for each payment date comes from ``compute_cms_rate``; each
    cash flow is ``ois_df(expiry) * (1 / payment_freq) * s_nnT`` and the PV is
    their sum.
    """
    cms_schedule = compute_cms_rate(
        payment_duration, payment_freq, tenor, df_combined_new
    )
    discounted_flows = []
    for pay_term, cms_rate in zip(cms_schedule["expiry"], cms_schedule["s_nnT"]):
        on_pay_date = df_combined_new["tenor"] == pay_term
        disc_factor = df_combined_new[on_pay_date]["ois_df"].values[0]
        discounted_flows.append(disc_factor * (1 / payment_freq) * cms_rate)
    return np.sum(discounted_flows)

In [551]:
# CMS10y semi-annually over the next 5 years
# Leg parameters: 5-year leg, 2 payments per year, 10y underlying swap.
payment_duration, payment_freq, tenor = 5, 2, 10
pv = compute_pv_cms(payment_duration, payment_freq, tenor, df_combined_new)
print(
    f"pv for {payment_duration}y payment duration paying {tenor}y CMS with {payment_freq} payments/year is {pv}"
)

pv for 5y payment duration paying 10y CMS with 2 payments/year is 0.21002519891755195


In [552]:
# PV of a leg receiving CMS2y quarterly over the next 10 years
# Leg parameters: 10-year leg, 4 payments per year, 2y underlying swap.
payment_duration, payment_freq, tenor = 10, 4, 2
pv = compute_pv_cms(payment_duration, payment_freq, tenor, df_combined_new)
print(
    f"pv for {payment_duration}y payment duration paying {tenor}y CMS with {payment_freq} payments/year is {pv}"
)

pv for 10y payment duration paying 2y CMS with 4 payments/year is 0.38929641580760627


## Compare with forward swap rate
Assume semi-annual payment as well

In [553]:
# Fixing times (years) and underlying swap tenors (years) to compare.
cms_start_time = [1, 5, 10]
cms_duration = [1, 2, 3, 5, 10]
# Every [start, tenor] combination, start-major.
forward_swaps = [
    [start, length] for start in cms_start_time for length in cms_duration
]

In [554]:
# Empty placeholder; df_cms_rate is rebuilt from the collected lists further down.
df_cms_rate = pd.DataFrame()

In [555]:
pay_term_list, tenor_list, cms_rate_list = [], [], []
for k in forward_swaps:
    pay_term, tenor = k
    # Semi-annual schedule up to pay_term; only the last fixing's CMS rate is kept.
    temp_df = compute_cms_rate(pay_term, 2, tenor, df_combined_new)
    cms_rate = temp_df["s_nnT"].iloc[-1]
    pay_term_list.append(pay_term)
    tenor_list.append(tenor)
    cms_rate_list.append(cms_rate)

In [556]:
# CMS rates collected above, one row per (payment_term, cms_term) pair.
df_cms_rate = pd.DataFrame(
    {
        "payment_term": pay_term_list,
        "cms_term": tenor_list,
        "cms_rate": cms_rate_list,
    }
).astype(float)

# Attach the forward swap rate by key.  The previous element-wise comparison of
# df_1c against df_cms_rate was only correct when both frames happened to have
# the same length and row order.
df_cms_rate = df_cms_rate.merge(
    df_1c[["maturity", "duration", "k_rate"]].rename(
        columns={
            "maturity": "payment_term",
            "duration": "cms_term",
            "k_rate": "fw_swap_rate",
        }
    ),
    on=["payment_term", "cms_term"],
    how="left",  # keep every CMS row; unmatched pairs get NaN
    validate="many_to_one",  # fail loudly if df_1c repeats a (maturity, duration) pair
)

In [None]:
# Print a label, then display the CMS rate table built in the previous cell.
print("CMS term table")
df_cms_rate

CMS term table


In [558]:
# Display the forward swap table (its k_rate column is the forward swap rate used above).
df_1c

Unnamed: 0,maturity,duration,pv_fix_nok,pv_float,k_rate
0,1.0,1.0,0.9944,0.031828,0.032007
1,1.0,2.0,1.985294,0.066029,0.033259
2,1.0,3.0,2.972386,0.101093,0.034011
3,1.0,5.0,4.93407,0.173953,0.035255
4,1.0,10.0,9.747887,0.374591,0.038428
5,5.0,1.0,0.978517,0.03843,0.039274
6,5.0,2.0,1.952145,0.078232,0.040075
7,5.0,3.0,2.920444,0.117029,0.040072
8,5.0,5.0,4.840612,0.198916,0.041093
9,5.0,10.0,9.542492,0.416374,0.043634
