### Import der Libraries für den RiVaPy-Test

In [1]:
import rivapy
import pandas as pd
import numpy as np

In [2]:
# Render NumPy floats with five decimals; the space flag reserves the sign column for non-negative values.
np.set_printoptions(formatter=dict(float='{: 0.5f}'.format))

### Benötigte Testdaten

In [3]:
# User inputs
# Transition probabilities entered in percent and rescaled to fractions (7 rows x 8 columns).
# Presumably rows are the current rating AAA..CCC and columns the next rating AAA..D,
# matching the RatingID mapping used further down — confirm against the data source.
# Kept as np.matrix because downstream helpers rely on matrix (always-2D) indexing.
TransMat = np.matrix([
    [90.81, 8.33, 0.68, 0.06, 0.08, 0.02, 0.01, 0.01],
    [0.70, 90.65, 7.79, 0.64, 0.06, 0.13, 0.02, 0.01],
    [0.09, 2.27, 91.05, 5.52, 0.74, 0.26, 0.01, 0.06],
    [0.02, 0.33, 5.95, 85.93, 5.30, 1.17, 1.12, 0.18],
    [0.03, 0.14, 0.67, 7.73, 80.53, 8.84, 1.00, 1.06],
    [0.01, 0.11, 0.24, 0.43, 6.48, 83.46, 4.07, 5.20],
    [0.21, 0, 0.22, 1.30, 2.38, 11.24, 64.86, 19.79],
]) / 100

In [4]:
# Both sheets live in the same workbook, so the location is stated once.
# NOTE(review): absolute, machine-specific path — adjust before running on another machine.
PORTFOLIO_WORKBOOK = "C:/Users/Anwender/Desktop/Datenmodell_Krediportfoliomodell.xlsx"

positions = pd.read_excel(PORTFOLIO_WORKBOOK, sheet_name="Positions")
issuer = pd.read_excel(PORTFOLIO_WORKBOOK, sheet_name="Issuer")

In [5]:
# Scalar inputs shared by the cells below.
n_issuer = issuer["IssuerID"].nunique(dropna=True)  # number of distinct issuer IDs
Nsim = 5000  # num sim for CVaR
r = 0  # risk free rate
t = 1  # time factor in the discount term exp(-(r + spread) * t) used by the valuation helpers

### Nutzung Credit Metrics Modell innerhalb RiVaPy

In [6]:
# Build the RiVaPy CreditMetrics model from the transition matrix, positions and issuer data.
# NOTE(review): all arguments are positional; 0.2, 10000 and 5 presumably are the correlation,
# the number of simulations and the confidence level in percent — confirm against the RiVaPy signature.
test = rivapy.credit.creditMetricsModel(0.2, 10000, TransMat, positions, issuer, r, t, 5)

In [7]:
# Display the portfolio VaR returned by the RiVaPy model object built in the previous cell.
test.get_portfolio_VaR()

127980.0

### Erste Überlegungen zu Korrelationen

In [None]:
from __future__ import division
import pandas as pd
import numpy as np
from scipy.stats import norm
import sys
import math
from scipy.linalg import sqrtm
from random import seed
from random import random
import plotly.express as px
from pandas_datareader import data as pdr
from datetime import date
import yfinance as yf
yf.pdr_override()
import rivapy

from numpy.linalg import cholesky

In [None]:
# Request quotes for ticker 'LHA' over the given date range and display only the 'Close' column.
# NOTE(review): this is a network call and the result is not stored in a variable.
pdr.get_data_yahoo('LHA','2007-04-02','2022-03-14')[['Close']]

In [None]:
# Read one sheet per series from the same workbook and give each "Close" column a series-specific name
# so the frames can be merged on "Date" without column clashes.
marketDataDAX = pd.read_excel(
    "C:/Users/Anwender/Desktop/^GDAXI.xlsx", sheet_name="DAX"
).rename(columns={"Close": "Close_Dax"})
marketDataBASF = pd.read_excel(
    "C:/Users/Anwender/Desktop/^GDAXI.xlsx", sheet_name="BASF"
).rename(columns={"Close": "Close_BASF"})
marketDataLHA = pd.read_excel(
    "C:/Users/Anwender/Desktop/^GDAXI.xlsx", sheet_name="Lufthansa"
).rename(columns={"Close": "Close_LHA"})
marketDataVW = pd.read_excel(
    "C:/Users/Anwender/Desktop/^GDAXI.xlsx", sheet_name="Volkswagen"
).rename(columns={"Close": "Close_VW"})

In [None]:
# Keep only DAX rows dated 2007-04-02 or later.
marketDataDAX = marketDataDAX.loc[marketDataDAX["Date"] >= '2007-04-02']

In [None]:
# Left-join the three single-stock close series onto the DAX dates, one merge per series.
mergedData = (
    marketDataDAX[['Date', 'Close_Dax']]
    .merge(marketDataBASF[['Date', 'Close_BASF']], on='Date', how='left')
    .merge(marketDataLHA[['Date', 'Close_LHA']], on='Date', how='left')
    .merge(marketDataVW[['Date', 'Close_VW']], on='Date', how='left')
)

In [None]:
# Remove the date column so only price columns remain, then compute period-over-period percentage changes.
# errors='ignore' keeps this cell re-runnable: on a second execution 'Date' is already gone and a plain
# drop would raise a KeyError.
mergedData = mergedData.drop(columns=['Date'], errors='ignore')
returns = mergedData.pct_change()
returns

In [None]:
# Pairwise Pearson correlation between the return columns (pandas' default method, stated explicitly).
correlation_mat = returns.corr(method='pearson')
correlation_mat

In [None]:
# Flatten the square correlation matrix into a Series indexed by pairs of column labels.
corr_pairs = correlation_mat.unstack()

# Look up one pair by its label tuple.
print(corr_pairs[('Close_Dax', 'Close_BASF')])

### Funktionen außerhalb RiVaPy

In [11]:
# Translate rating letters into integer IDs (AAA -> 0, ..., D -> 7), attach the ID to each issuer,
# then carry rating and ID over to every position of that issuer.
rating_map = pd.DataFrame({
    'Rating': ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"],
    'RatingID': list(range(8)),
})
issuer_adj = pd.merge(issuer, rating_map, on="Rating", how="left")
positions_adj = pd.merge(
    positions,
    issuer_adj[["IssuerID", "Rating", "RatingID"]],
    on="IssuerID",
    how="left",
)

In [5]:
def mergePositionsIssuer(position_data, issuer_data):
    """Attach each issuer's rating and numeric rating ID to the position rows.

    Parameters
    ----------
    position_data : pandas.DataFrame
        Positions; must contain an "IssuerID" column.
    issuer_data : pandas.DataFrame
        Issuers; must contain "IssuerID" and "Rating" columns.

    Returns
    -------
    pandas.DataFrame
        ``position_data`` left-joined with "Rating" and "RatingID" on "IssuerID".
        Ratings that are not in the AAA..D map (and positions without a matching
        issuer) end up with a missing RatingID.
    """
    rating_map = pd.DataFrame({'Rating': ["AAA", "AA", "A", "BBB", "BB", "B", "CCC", "D"], 'RatingID': [0, 1, 2, 3, 4, 5, 6, 7]})
    issuer_adj = issuer_data.merge(rating_map, on = "Rating", how = "left")
    # Merge the frame passed in by the caller; the previous version silently used the
    # notebook-global `positions` and ignored this argument.
    positions_adj = position_data.merge(issuer_adj[["IssuerID","Rating","RatingID"]], on = "IssuerID", how = "left")

    return positions_adj





In [None]:
def get_correlation_matrix (rho, n):
    """Return the n x n matrix with ones on the diagonal and ``rho`` everywhere else."""
    identity = np.eye(n)
    # Scale the off-diagonal pattern by rho, then put the unit diagonal back.
    off_diagonal = rho * (np.ones((n, n)) - identity)
    return off_diagonal + identity

def get_cutoffs_rating(transition_matrix):
    """Convert transition probabilities into standard-normal cut-off values.

    The matrix is transposed and flipped so that the cumulative sum along axis 0 starts
    at the last column of ``transition_matrix`` and moves toward the first. The cumulative
    probabilities are kept inside [1e-12, 1 - 1e-12] so the normal quantile stays finite.

    Returns an array with one row per column of ``transition_matrix`` and one column per
    row of ``transition_matrix``.
    """
    eps = 1 / 1e12
    cumulative = np.cumsum(np.flipud(transition_matrix.T), axis=0)
    cumulative = np.clip(cumulative, 0 + eps, 1 - eps)

    # Invert the standard normal CDF to obtain the cut-offs.
    return norm.ppf(cumulative, loc=0, scale=1)

def get_cholesky_decomposition(rho, n):
    """Return the Cholesky factor of the n x n matrix built by ``get_correlation_matrix(rho, n)``.

    The factor can be used to turn independent standard normals into jointly normal
    draws with that matrix as covariance.
    """
    return cholesky(get_correlation_matrix(rho, n))

def get_cut_ratings(transition_matrix, index_rating):
    """Select the cut-off column(s) for ``index_rating`` and return them transposed as an ``np.matrix``.

    ``index_rating`` indexes the columns of the array returned by ``get_cutoffs_rating``.
    """
    all_cutoffs = get_cutoffs_rating(transition_matrix)
    selected = all_cutoffs[:, index_rating]
    return np.matrix(selected).T

def get_credit_spreads(transition_matrix, LGD):
    """Credit spread implied by the transition matrix.

    The last column of ``transition_matrix`` is taken as the default probability and the
    spread is computed per row as ``-log(1 - LGD * PD) / 1``.
    """
    horizon = 1  # divisor kept from the original formula; presumably the horizon of the matrix — TODO confirm
    default_probability = transition_matrix[:, -1]
    survival_adjusted = 1 - LGD * default_probability
    return -np.log(survival_adjusted) / horizon

def get_expected_value (r, position_data,  transition_matrix, t):
    """Discount each position's exposure at the risk-free rate plus its rating's credit spread.

    Reads the "Exposure" and "RatingID" columns of ``position_data``; "RatingID" selects
    the row of the spread vector from ``get_credit_spreads`` (LGD fixed at 0.45).
    Returns a column of ``Exposure * exp(-(r + spread) * t)``, one row per position.
    """
    LGD = 0.45
    rating_ids = position_data["RatingID"]
    spreads = get_credit_spreads(transition_matrix, LGD)
    discount_factor = np.exp(-(r + spreads[rating_ids]) * t)
    notional = np.matrix(position_data["Exposure"]).T
    return np.multiply(notional, discount_factor)

def get_states (transition_matrix, position_data, r, t):
    """Value of every position in every possible end-of-period state.

    Column 0 holds the default case (``Exposure * 0.55``). The remaining columns hold
    ``Exposure * exp(-(r + spread) * t)`` for the spreads from ``get_credit_spreads``
    (LGD fixed at 0.45), ordered from the last transition-matrix row to the first so
    the layout lines up with the credit cut-offs.
    """
    LGD = 0.45
    recover = 0.55
    n_instruments = position_data["InstrumentID"].nunique()

    # One row of spreads per instrument.
    spreads = get_credit_spreads(transition_matrix, LGD)
    spread_grid = np.tile(spreads.T, [n_instruments, 1])

    notional = np.matrix(position_data["Exposure"]).T
    non_default_values = np.multiply(notional, np.exp(-(r + spread_grid) * t))
    default_values = np.multiply(notional, recover)

    # Default case goes last, then the column order is reversed so it ends up first.
    combined = np.concatenate((non_default_values, default_values), axis=1)
    return combined[:, ::-1]

def mc_calculation(rho, n_issuer, n_simulation, transition_matrix, position_data, r, t):
    """Simulate per-position value changes with one common factor and one idiosyncratic draw per position.

    For each scenario a single standard-normal draw is scaled by ``rho``; every position
    adds its own independent standard-normal draw scaled by ``sqrt(1 - rho**2)``. The
    combined value is compared against the cut-offs of the position's "RatingID" to pick
    a column of ``get_states``; the stored result is that state value minus the
    position's expected value from ``get_expected_value``.

    Parameters
    ----------
    rho : float
        Weight of the common factor.
        NOTE(review): with this weighting the pairwise correlation between two positions
        is rho**2, not rho — confirm that this is intended.
    n_issuer : int
        Not used by this function; kept so existing calls keep working.
    n_simulation : int
        Number of scenarios.
    transition_matrix, position_data, r, t
        Passed through to the cut-off, state and expected-value helpers.
        ``position_data`` is read by label ``0..n_positions-1`` for "RatingID".

    Returns
    -------
    numpy.ndarray
        Array of shape ``(n_simulation, n_positions)`` where ``n_positions`` is the
        number of distinct "InstrumentID" values.
    """
    cutOffs = get_cutoffs_rating(transition_matrix)
    states = get_states (transition_matrix, position_data, r, t)
    EV = get_expected_value (r, position_data,  transition_matrix, t)
    n_positions = position_data["InstrumentID"].nunique()
    Loss = np.zeros((n_simulation,n_positions))

    # Per-position data does not change between scenarios, so look it up once.
    position_cutoffs = [
        np.asarray(cutOffs[:, position_data.loc[j, "RatingID"]]).ravel()
        for j in range(n_positions)
    ]
    expected_values = [EV.item(j) for j in range(n_positions)]
    idio_weight = np.sqrt(1 - (rho**2))

    for i in range(n_simulation):
        # Common factor: one draw per scenario, shared by all positions.
        systematic = norm.ppf(np.random.rand()) * rho
        for j in range(n_positions):
            idiosyncratic = idio_weight * norm.ppf(np.random.rand())
            asset_return = systematic + idiosyncratic

            # State column = number of cut-offs that are not above the simulated value.
            cutoffs_j = position_cutoffs[j]
            n_above = int(np.count_nonzero(asset_return < cutoffs_j))
            state_idx = len(cutoffs_j) - n_above

            # Plain integer indices yield a scalar; the earlier array index produced a
            # (1, 1) matrix whose assignment into a single element is deprecated in NumPy >= 1.25.
            Loss[i, j] = states[j, state_idx] - expected_values[j]

    return(Loss)

def get_Loss_distribution (rho, n_issuer, n_simulation, transition_matrix, position_data, r, t):
    """Run ``mc_calculation`` and add up the per-position results of each scenario.

    Returns a 1-D array with one portfolio total per simulated scenario.
    """
    per_position = mc_calculation(rho, n_issuer, n_simulation, transition_matrix, position_data, r, t)
    return per_position.sum(axis=1)

def get_portfolio_VaR(rho, n_issuer, n_simulation, transition_matrix, position_data, r, t, confidencelevel):
    """Value-at-risk: the negated ``confidencelevel``-th percentile of the simulated portfolio distribution.

    ``confidencelevel`` is handed to ``np.percentile`` and is therefore expressed in
    percent (0-100). Every call runs a fresh simulation via ``get_Loss_distribution``.
    """
    simulated = get_Loss_distribution(rho, n_issuer, n_simulation, transition_matrix, position_data, r, t)
    quantile = np.percentile(simulated, confidencelevel)
    return -quantile

def get_portfolio_ES(rho, n_issuer, n_simulation, transition_matrix, position_data, r, t, confidencelevel):
    """Expected shortfall: negated mean of the simulated portfolio outcomes at or below the VaR quantile.

    ``confidencelevel`` is a percentile in percent (0-100), as for ``get_portfolio_VaR``.
    The quantile is taken from the same simulated sample that is averaged. The earlier
    version obtained it from a second, independent simulation, so the threshold did not
    belong to the sample and the Monte Carlo cost was doubled.
    """
    loss_Distribution = get_Loss_distribution(rho, n_issuer, n_simulation, transition_matrix, position_data, r, t)
    var_quantile = np.percentile(loss_Distribution, confidencelevel)

    # "<=" keeps the quantile itself in the tail, so the tail cannot be empty (a strict "<"
    # yields NaN when the quantile coincides with the smallest simulated outcome).
    tail = loss_Distribution[loss_Distribution <= var_quantile]
    expectedShortfall = -1*np.mean(tail)

    return(expectedShortfall)

In [None]:
# Portfolio VaR at the 1st percentile from 5000 simulated scenarios.
var = get_portfolio_VaR(
    rho=0.2,
    n_issuer=n_issuer,
    n_simulation=5000,
    transition_matrix=TransMat,
    position_data=positions_adj,
    r=r,
    t=t,
    confidencelevel=1,
)
var

In [None]:
# Simulated portfolio distribution (5000 scenarios) kept for inspection in the next cells.
tmp = get_Loss_distribution(
    rho=0.2,
    n_issuer=n_issuer,
    n_simulation=5000,
    transition_matrix=TransMat,
    position_data=positions_adj,
    r=r,
    t=t,
)


In [None]:
# Sort the simulated values in place, ascending; this mutates `tmp` for all later cells.
tmp.sort()

In [None]:
# Display the simulated portfolio values held in `tmp`.
tmp

In [None]:
# Expected shortfall at the 1st percentile from 2000 simulated scenarios.
es = get_portfolio_ES(
    rho=0.2,
    n_issuer=n_issuer,
    n_simulation=2000,
    transition_matrix=TransMat,
    position_data=positions_adj,
    r=r,
    t=t,
    confidencelevel=1,
)
es

In [None]:
# Simulate the portfolio-level distribution (one total per scenario) and take its 1st percentile.
# mc_calculation returns a scenario x position matrix; taking the percentile of that matrix mixes
# individual position outcomes instead of portfolio totals, and the resulting frame would not be the
# single-column frame the following cell expects when it renames column 0. get_Loss_distribution
# sums over positions first.
lossDistribution = get_Loss_distribution(0.2, n_issuer, 20000, TransMat, positions_adj, r, t)
test_df = pd.DataFrame(lossDistribution)
Port_Var = -1*np.percentile(lossDistribution,1)

In [None]:
# Name the value column, then count how often each distinct value occurs (rarest first).
test_df = test_df.rename(columns={0: "Value"})
(
    test_df
    .groupby(by=["Value"])
    .size()
    .reset_index(name='Count')
    .sort_values(by="Count")
)

In [None]:
# Histogram of the simulated values in test_df.
px.histogram(data_frame=test_df)

In [None]:
# Leftover type check: shows the container class of TransMat.
type(TransMat)