In [1]:
import os
from datetime import datetime
import numpy as np
import pandas as pd
from datetime import date as pdate
import matplotlib.pyplot as plt
from scipy import interpolate
from dateutil.relativedelta import relativedelta
import scipy.stats as stat
import scipy
import math
import statistics
from statistics import NormalDist

In [3]:

#####################################################
########### Helper function and class ###############
#####################################################

# Zero Rate Curve 
class ZeroRateCurve:
    def __init__(self, vdate, dates, rates):
        self.vdate = vdate    
        self.dates = dates
        self.rates = np.array(rates)
        self.times = np.array([(d-vdate).days/365 for d in dates])
        self.interp = interpolate.interp1d(self.times, self.rates, kind = 'linear', fill_value=(self.rates[0], self.rates[-1]), bounds_error=False)
        
    def getDF(self, d):
        t = (d-self.vdate).days/365
        return math.exp(-t*self.interp(t))

    def getShiftedCurves(self, shiftsize):
        curves =[]
        l = len(self.rates)
        for i in range(0, l):
            shiftedRates = self.rates.copy()
            shiftedRates[i] = shiftedRates[i] + shiftsize
            curves = curves + [ ZeroRateCurve(self.vdate, self.dates, shiftedRates.copy()) ]
        return curves
        

# Swap Pricing Functions
# This is a simplified implementation where we assume:
# (1) no business day adjustment, 
# (2) fixed leg freq = float leg freq
# (3) payment is right on the accrual end date
class SofrSwap:
    def __init__(self, notional, isPay, vdate, tenor, strike, freq):
        self.notional = notional    
        self.payIndicator = 1.0 if isPay else -1.0
        self.vdate = vdate    
        self.tenor = tenor
        self.freq = freq
        self.strike = strike
        self.dates = self._generateSwapDates()

    # d: trade date
    # t: tenor in number of years
    # f: no. of payment per years
    def _generateSwapDates(self):
        n = 12/self.freq
        m = self.freq * self.tenor
        dates = [ self.vdate + relativedelta(months=n*(i+1)) for i in range(0, m) ]
        return dates

    def getSchedules(self):
        s = []
        ds = self.vdate
        for d in self.dates:
            s = s + [ (d, (d-ds).days/365 ) ]
            ds = d
        return s

def price_swap(curve, swap):
    schedule = swap.getSchedules()
    fixedLegVal = 0.0
    fltLegVal = 0.0
    df_start = curve.getDF(swap.vdate)
    for (d, a) in schedule:
        df = curve.getDF(d)
        fwd = 1.0 / a * (df_start / df - 1.0)
        fixedLegVal = fixedLegVal + df * a * swap.strike 
        fltLegVal = fltLegVal + df * a * fwd 
        df_start = df
    return swap.notional * swap.payIndicator * (fltLegVal - fixedLegVal)

def price_swap_details(curve, swap):
    schedule = swap.getSchedules()
    fixedLegVal = 0.0
    fltLegVal = 0.0
    df_start = curve.getDF(swap.vdate)
    details = []
    for (d, a) in schedule:
        df = curve.getDF(d)
        fwd = 1.0 / a * (df_start / df - 1.0)
        details = details + [[df_start, df, a, fwd, swap.strike]]
        df_start = df
    return pd.DataFrame(details, columns=['df_start', 'df_end', 'accrual', 'forward_rate', 'strike'])

    
def pv01(curve, swap, shiftsize):    
    pv0 = price_swap(curve, swap)
    shiftedCurves = curve.getShiftedCurves(shiftsize)
    risks = []
    #print("pv0: " + str(pv0))
    for c in shiftedCurves:
        pv = price_swap(c, swap)
        risks = risks + [pv - pv0]
    return pd.DataFrame(risks, columns =['pv01'])

# full revaluation 1d pnl evaluation
def pnl_full(base_date, crv_dates, stock_notl, stock_returns, swap, base_swap_price, base_crv, rate_returns):
    base_rates = base_crv.rates
    scn_rates = [r+dr for r, dr in zip(base_rates, rate_returns)]
    scn_crv = ZeroRateCurve(base_date, crv_dates, scn_rates)
    scn_swap_price = price_swap(scn_crv, swap)
    swap_pnl = scn_swap_price - base_swap_price
    stock_pnl = stock_notl * np.sum(stock_returns)
    return (swap_pnl+stock_pnl)

# sensitivity based 1d pnl evaluation 
def pnl_sen(stock_notl, stock_returns, swap_pv01s, rate_returns):
    stock_pnl = stock_notl * np.sum(stock_returns)
    swap_pnl = np.inner(swap_pv01s, rate_returns)
    return (swap_pnl+stock_pnl)


In [5]:
#####################################################
############## Run VaR Calculations #################
#####################################################

file_path = "D:/OneDrive/main/workspace/smu-teaching/SMUTeaching/2025/group-assignment/"
offset = 693594

df_sofr = pd.read_csv(file_path + "hist_sofr_crvs.csv").set_index('Date')
df_sofr.index = pd.to_datetime(df_sofr.index).date
df_sofr = df_sofr.sort_index()
tenors = df_sofr.columns.tolist()
sofr_interps = []
date_sofr = [ d.toordinal()-693594  for d in df_sofr.index.tolist()]
for t in tenors:
  hist_rates = df_sofr[t].tolist()
  sofr_interps = sofr_interps + [ scipy.interpolate.interp1d(date_sofr, hist_rates)  ]

df_appl = pd.read_csv(file_path + "hist_appl.csv").set_index('Date')
df_appl.index = pd.to_datetime(df_appl.index).date
df_appl = df_appl.sort_index()

df_msft = pd.read_csv(file_path + "hist_msft.csv").set_index('Date')
df_msft.index = pd.to_datetime(df_msft.index).date
df_msft = df_msft.sort_index()

df_ford = pd.read_csv(file_path + "hist_ford.csv").set_index('Date')
df_ford.index = pd.to_datetime(df_ford.index).date
df_ford = df_ford.sort_index()

df_bac = pd.read_csv(file_path + "hist_bac.csv").set_index('Date')
df_bac.index = pd.to_datetime(df_bac.index).date
df_bac = df_bac.sort_index()

d1 = [ d.toordinal()-693594  for d in df_appl.index.tolist()]
p1 = df_appl['AdjClose'].tolist()
interp1 =  scipy.interpolate.interp1d(d1, p1)

d2 = [ d.toordinal()-693594  for d in df_msft.index.tolist()]
p2 = df_msft['AdjClose'].tolist()
interp2 =  scipy.interpolate.interp1d(d2, p2)

d3 = [ d.toordinal()-693594  for d in df_ford.index.tolist()]
p3 = df_ford['AdjClose'].tolist()
interp3 =  scipy.interpolate.interp1d(d3, p3)

d4 = [ d.toordinal()-693594  for d in df_bac.index.tolist()]
p4 = df_bac['AdjClose'].tolist()
interp4 =  scipy.interpolate.interp1d(d4, p4)

dlist = sorted(list(set(d1) | set(d2) | set(d3) | set(d4) | set(date_sofr)))

numchg = len(dlist)-1
appl = [ interp1(dlist[i+1]).flat[0] / interp1(dlist[i]).flat[0] - 1.0  for i in range(numchg) ]
msft = [ interp2(dlist[i+1]).flat[0] / interp2(dlist[i]).flat[0] - 1.0  for i in range(numchg) ]
ford = [ interp3(dlist[i+1]).flat[0] / interp3(dlist[i]).flat[0] - 1.0  for i in range(numchg) ]
bac = [ interp4(dlist[i+1]).flat[0] / interp4(dlist[i]).flat[0] - 1.0  for i in range(numchg) ]

sofr_chg = []
for interp in sofr_interps:
    chg = []
    for i in range(numchg):
        chg = chg + [ (interp(dlist[i+1]).flat[0] - interp(dlist[i]).flat[0]) ]
    sofr_chg = sofr_chg + [chg]

df_sofr_chg = pd.DataFrame(data =  np.transpose(np.array(sofr_chg)), index = dlist[1:len(dlist)], columns = tenors) 

stock_chg_dict = { 'appl' : appl, 'msft' : msft, 'ford' : ford, 'bac' : bac,}
df_stock_chg = pd.DataFrame(stock_chg_dict, index = dlist[1:len(dlist)],  columns=['appl', 'msft', 'ford', 'bac'])

df_all = pd.concat([df_stock_chg, df_sofr_chg], axis=1)
cols = df_all.columns.tolist()
corr_mat = df_all.corr()
cov_mat = df_all.cov()
factor_mean = df_all.mean().to_list()
factor_std = df_all.std().to_list()

# set up base curve, i.e. the zero rate curve using the zero rates as of 2023-10-30
baseDate = pdate(2023,10,30)
crv_tenor_d = [1]
crv_tenor_m = [1,2,3,6,9]
crv_tenor_y = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,25,30,35,40]
crv_dates = [baseDate + relativedelta(days=i) for i in crv_tenor_d]
crv_dates = crv_dates + [ baseDate + relativedelta(months=i) for i in crv_tenor_m]
crv_dates = crv_dates + [ baseDate + relativedelta(years=i) for i in crv_tenor_y]
base_rates = df_sofr.loc[baseDate]
baseCurve = ZeroRateCurve(baseDate, crv_dates, base_rates)

swap = SofrSwap(100000000, True, baseDate, 10, 0.042, 1)
stock_notl = 1000000

#base_price = price_swap(baseCurve, swap)
#print(base_price)
#print(price_swap_details(baseCurve, swap))
#print(swap_risks)


# parametric VaR
rate_shift = 0.0001
swap_risks = pv01(baseCurve, swap, rate_shift)
w_stock = [stock_notl, stock_notl, stock_notl, stock_notl ]
w_sofr = [ risk/rate_shift   for risk in swap_risks['pv01'].tolist()]
w_parametric = np.array( w_stock + w_sofr)

port_m =  np.inner(w_parametric, factor_mean)
port_v = np.inner(np.dot(w_parametric, cov_mat), w_parametric)
param_var = np.abs(stat.norm.ppf(0.05, loc=port_m, scale=np.sqrt(port_v)))
print("")
print("===================================================== PARAMETRIC VAR =======================================================")
print(f"mean: {port_m:,.0f}")
print(f"variance: {port_v:,.0f}")
print(f"Parametric VaR [1d, 95%]: {param_var:,.0f}")
print("============================================================================================================================")

# Monte Carlo VaR
n_mc = 100000
dim = corr_mat.shape[0]
factor_loadings = np.linalg.cholesky(corr_mat)
np.random.seed(1000000)
uniforms = np.random.uniform(size=(n_mc, dim))
snorms = [ [NormalDist().inv_cdf(u) for u in r]  for r in uniforms]

snorms_correlated = np.dot(snorms, factor_loadings.transpose())
return1d_sample = [ np.add(factor_mean, np.multiply(factor_std, z).tolist()).tolist()  for z in snorms_correlated]

mc_pnl_full=[]
mc_pnl_sen =[]
base_swap_price = price_swap(baseCurve, swap)

for i in range(0,n_mc):
    mc_pnl_full = mc_pnl_full + [pnl_full(baseDate, crv_dates, stock_notl, return1d_sample[i][0:4], swap, base_swap_price, baseCurve, return1d_sample[i][4:])]
    mc_pnl_sen = mc_pnl_sen + [pnl_sen(stock_notl, return1d_sample[i][0:4], w_sofr, return1d_sample[i][4:])]

mc_var_full = np.abs(np.percentile(mc_pnl_full, 5))
mc_var_full_std = np.std(mc_pnl_full)
mc_var_sen = np.abs(np.percentile(mc_pnl_sen, 5))
mc_var_sen_std = np.std(mc_pnl_sen)


print("")
print("===================================================== Monte Carlo VAR ======================================================")
print(f"Monte-Carlo-Full-Revaluation VaR [1d, 95%]: {mc_var_full:,.0f}")
print(f"Monte-Carlo-Sensitivity-Based VaR [1d, 95%]: {mc_var_sen:,.0f})")
print("============================================================================================================================")


# Historical VaR
hist_returns = df_all.to_numpy().tolist()
hist_pnl_full=[]
hist_pnl_sen =[]
for i in range(0, len(hist_returns)):
    hist_pnl_full = hist_pnl_full + [pnl_full(baseDate, crv_dates, stock_notl, hist_returns[i][0:4], swap, base_swap_price, baseCurve, hist_returns[i][4:])]
    hist_pnl_sen = hist_pnl_sen + [pnl_sen(stock_notl, hist_returns[i][0:4], w_sofr, hist_returns[i][4:])]

hist_var_full = np.abs(np.percentile(hist_pnl_full, 5))
hist_var_full_std = np.std(hist_pnl_full)
hist_var_sen = np.abs(np.percentile(hist_pnl_sen, 5))
hist_var_sen_std = np.std(hist_pnl_sen)

print("")
print("===================================================== Historical VAR =======================================================")
print(f"Historical-Full-Revaluation VaR [1d, 95%]: {hist_var_full:,.0f}")
print(f"Historical-Sensitivity-Based VaR [1d, 95%]: {hist_var_sen:,.0f}")
print("============================================================================================================================")




mean: 21,305
variance: 332,910,331,242
Parametric VaR [1d, 95%]: 927,749

Monte-Carlo-Full-Revaluation VaR [1d, 95%]: 935,547
Monte-Carlo-Sensitivity-Based VaR [1d, 95%]: 930,265)

Historical-Full-Revaluation VaR [1d, 95%]: 985,468
Historical-Sensitivity-Based VaR [1d, 95%]: 979,371


In [210]:
a=[1,2,3,4,5]

