In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from numpy import linalg as LA
from scipy.stats import t
from scipy.stats import norm
import yfinance as yf
import seaborn as sns
import datetime as dt
from pandas_datareader import data as pdr

<font size="5">**Covariance estimation techniques**</font>

In [None]:
def ewm (x, exp_w, lamda):
    """Exponentially weighted covariance matrix of the columns of ``x``.

    Row ``i`` (0-based) receives the raw weight ``(1 - lamda) * lamda**i``;
    the weights are then normalised to sum to one. Row 0 therefore carries
    the largest weight -- NOTE(review): this presumes the rows are ordered
    most-recent-first; confirm against the caller's data.

    Each column is centred on its plain (unweighted) mean before the
    weighted cross-products are taken.

    Parameters
    ----------
    x : pandas.DataFrame or 2-D array-like
        Observations in rows, variables in columns. It is not modified.
    exp_w : list
        Output argument: the normalised weights are appended to this list.
    lamda : float
        Decay factor of the weights.

    Returns
    -------
    numpy.ndarray
        The ``(n, n)`` weighted covariance matrix, ``n`` being the number
        of columns of ``x``.

    Raises
    ------
    ValueError
        If ``x`` contains no rows.
    """
    values = np.asarray(x, dtype=float)
    n_obs = values.shape[0]
    if n_obs == 0:
        raise ValueError("ewm needs at least one observation.")

    raw = (1 - lamda) * lamda ** np.arange(n_obs)
    weights = raw / raw.sum()
    exp_w.extend(weights.tolist())

    # Work on a centred copy so the caller's data is left untouched.
    centered = values - values.mean(axis=0)

    # cov[i, j] = sum_k w_k * centered[k, i] * centered[k, j]
    return (centered * weights[:, None]).T @ centered

In [None]:
def getCumm_var(x, lamb):
    """Cumulative share of variance explained by the leading eigenvalues.

    Builds the exponentially weighted covariance matrix of ``x`` with decay
    ``lamb`` (via ``ewm``), takes its eigenvalues and returns, for
    k = 1..n, the sum of the k largest eigenvalues divided by the sum of
    all eigenvalues.
    """
    weights = []
    eigenvalues, _ = LA.eigh(ewm(x, weights, lamb))

    # eigh returns eigenvalues in ascending order; total them in that order,
    # then walk them from largest to smallest.
    total = sum(eigenvalues)
    largest_first = eigenvalues[::-1]

    return list(np.cumsum(largest_first) / total)

<font size="5">**Non PSD fixes for correlation matrices**</font>

In [None]:
def is_psd(x, tol=1e-8):
    """Return True when every eigenvalue of ``x`` is at least ``-tol``.

    Parameters
    ----------
    x : array-like
        Square matrix to test.
    tol : float, optional
        Amount by which an eigenvalue may fall below zero and still count
        as non-negative (absorbs floating-point round-off). Defaults to
        ``1e-8``.
    """
    return np.all(LA.eigvals(x) >= -tol)

In [None]:
def nearPSD(A, epsilon=0):
    """Repair a non-PSD correlation or covariance matrix by eigenvalue clipping.

    The matrix is converted to a correlation matrix when its diagonal is
    not all ones, its eigenvalues are floored at ``epsilon``, the result is
    rescaled so the diagonal is exactly one again, and finally the original
    standard deviations are restored if the input was a covariance matrix.

    Parameters
    ----------
    A : array-like, shape (n, n)
        Symmetric correlation or covariance matrix. It is not modified.
    epsilon : float, optional
        Lower bound applied to the eigenvalues (default 0).

    Returns
    -------
    numpy.matrix
        The repaired matrix, on the same scale as ``A``. The ``numpy.matrix``
        type is kept for backward compatibility with existing callers; wrap
        the result in ``np.asarray`` to get a plain array.

    Raises
    ------
    ValueError
        If a diagonal entry of ``A`` is not strictly positive.
    """
    out = np.array(A, dtype=float)  # independent ndarray copy of the input
    diag = np.diag(out)
    if np.any(diag <= 0):
        raise ValueError("nearPSD needs strictly positive diagonal entries.")

    # Covariance input: rescale to a correlation matrix and remember the
    # standard deviations so the variances can be restored afterwards.
    sd = None
    if not np.allclose(diag, 1.0):
        sd = np.sqrt(diag)
        out = out / np.outer(sd, sd)

    eigval, eigvec = LA.eigh(out)
    eigval = np.maximum(eigval, epsilon)

    # Row scaling that brings the diagonal of B @ B.T back to exactly one.
    scale = 1.0 / ((eigvec * eigvec) @ eigval)
    B = np.sqrt(scale)[:, None] * eigvec * np.sqrt(eigval)[None, :]
    out = B @ B.T

    if sd is not None:
        out = out * np.outer(sd, sd)

    return np.asmatrix(out)

In [None]:
#Higham2002
def _getAplus(A):
    eigval, eigvec = LA.eigh(A)
    Q = np.matrix(eigvec)
    xdiag = np.matrix(np.diag(np.maximum(eigval, 0)))
    return eigvec @ xdiag @ eigvec.T

def _getPs(A, W=None):
    W05 = np.matrix(W**.5)
    iW = W05.I
    return  iW @ _getAplus(W05 @ A @ W05) @ iW

def _getPu(A, W=None):
    Aret = A.copy()
    for i in range(0, A.shape[0]):
        Aret[i,i] = 1
    return Aret

def _wgtNorm(A, W = None):
    W05 = np.sqrt(W)
    W05 = W05 @ A @ W05
    return (W05 * W05).sum()

def hig_nearPSD(pc, W = None, epsilon = 1e-9, maxIter = 100, tol = 1e-9):
    n = pc.shape[0]
    if W == None:
        W = np.identity(n)
        
    deltaS = np.zeros((n,n))
    
    Yk = pc.copy()
    norml = 9999999
    i = 1
    
    while i <= maxIter:
        Rk = Yk - deltaS
        Xk = _getPs(Rk, W)
        deltaS = Xk - Rk
        Yk = _getPu(Xk, W)
        norm = _wgtNorm(Yk - pc, W)
        
        w, v = LA.eigh(Yk)
        minEigVal = np.min(w)
        
        if ((norm - norml) < tol) and (minEigVal > -epsilon):
            break
            
        norml = norm
        i = i + 1
        
    if i < maxIter:
        print("Converged in %d iterations.\n" % i)
    else:
        print("Converged failed after %d iterations.\n" % (i-1))
        
    return Yk

In [None]:
def F_norm (cov, cov_psd):
    """Frobenius norm of the difference between ``cov`` and ``cov_psd``."""
    return LA.norm(cov - cov_psd, ord='fro')

<font size="5">**VaR calculation methods**</font>

In [1]:
# Tail level in percent: the VaR/CVaR functions below divide alpha by 100
# or pass it straight to np.percentile, so 5 means the 5% tail.
alpha = 5

Normal and T VaR

In [None]:
def calc_VaR(portofolioReturns, portfolioStd, distribution='normal', alpha=5, dof=6):
    """Parametric Value at Risk under a normal or Student-t assumption.

    Parameters
    ----------
    portofolioReturns : float
        Expected portfolio return, subtracted from the quantile term.
    portfolioStd : float
        Portfolio standard deviation.
    distribution : {'normal', 't-distribution'}
        Distribution used for the quantile.
    alpha : float
        Tail level in percent (5 means the 95% quantile is used).
    dof : float
        Degrees of freedom, only used for 't-distribution'.

    Returns
    -------
    float
        ``quantile * portfolioStd - portofolioReturns``.

    Raises
    ------
    TypeError
        If ``distribution`` is not one of the two supported names.
    """
    if distribution not in ('normal', 't-distribution'):
        raise TypeError("Expected distribution type 'normal'/'t-distribution'")

    level = 1-alpha/100
    if distribution == 'normal':
        quantile = norm.ppf(level)
    else:
        # sqrt((dof-2)/dof) rescales the Student-t quantile to unit variance.
        quantile = np.sqrt((dof-2)/dof) * t.ppf(level, dof)

    return quantile * portfolioStd - portofolioReturns

Monte Carlo VaR

In [None]:
def _mcVaR(returns, alpha):
    # first check wheather returns is series
    if isinstance(returns, pd.Series):
        return np.percentile(returns, alpha)
    else:
        raise TypeError("You need to input a series.")

def mc_VaR (returns, mc_sim, time, alpha):
    meanReturns = returns.mean()
    covMatrix = returns.cov()
    
    covMatrix = near_psd(np.array(covMatrix))
    #covMatrix = covMatrix
    
    weights = np.random.random(len(meanReturns))
    weights /= np.sum(weights) # normalize the weight
    

    meanMatrix = np.full(shape = (time, len(weights)), fill_value = meanReturns)
    meanMatrix = meanMatrix.T

    portfolio_sim = np.full(shape = (time, mc_sim), fill_value = 0)

    # MC loop

    # we can set a initial value of portofolio to see the absolute value
    # initial_money = 10000
    for m in range(mc_sim):
        '''
        Here we assume that daily return are distributed by a multvariate normal distribution
        Using Cholesky Decomposition, we get L, which is lower triangle.
        return = mean + L * Z, where Z is normal distribution (0,1)
        '''
        Z = np.random.normal(size = (time, len(weights)))
        L = np.linalg.cholesky(covMatrix)
        dailyReturns = meanMatrix + np.inner(L, Z)

        portfolio_sim[:,m] = np.cumprod(np.inner(weights, dailyReturns.T) + 1) # FV = PV * (1 + r)^t

    lastDay_return = portfolio_sim[-1,:]
    _mcVaR(lastDay_return, alpha = alpha)

Historical VaR

In [None]:
# Import data function
def get_data (stocks, start, end):
    """Download adjusted close prices and derive simple-return statistics.

    Parameters
    ----------
    stocks : list of str
        Ticker symbols passed to ``yf.download``.
    start, end : datetime-like
        Download window.

    Returns
    -------
    tuple
        ``(returns, meanReturns, covMatrix)``: the percentage-change
        returns with incomplete rows dropped, the column means and the
        covariance matrix.
    """
    prices = yf.download(stocks, start, end)['Adj Close']
    pct = prices.pct_change()

    # NOTE(review): the mean and covariance are computed before the NaN rows
    # are dropped, so they may be based on more rows than the returned frame.
    column_means = pct.mean()
    covariance = pct.cov()

    return pct.dropna(), column_means, covariance

In [None]:
# Choose the tickers and the date window, then load the return series
# together with their mean vector and covariance matrix.
stockList = ['TSLA', 'NIO', 'BYD', 'XPEV', 'LI', 'GM']
endDate = dt.datetime.now()  # evaluated at run time, so the window changes on every re-run
startDate = dt.datetime(2021,1,1)

returns, meanReturns, covMatrix = get_data(stockList, startDate, endDate)
print(meanReturns)

In [None]:
# Draw one random weight per stock and normalise them to sum to one.
weights = np.random.random(len(meanReturns))
weights /= np.sum(weights) # normalize the weight

# Select the stock columns explicitly: once 'portfolio' has been added, a
# plain returns.dot(weights) would fail on re-running this cell because the
# frame has one more column than there are weights.
returns['portfolio'] = returns[meanReturns.index].dot(weights)

In [None]:
def histVaR (returns, alpha):
    """Historical VaR: the ``alpha``-th percentile (0-100 scale) of the returns.

    A Series yields a single number; a DataFrame yields one value per
    column. Any other input type raises ``TypeError``.
    """
    # A DataFrame is handled column by column through a recursive aggregate.
    if isinstance(returns, pd.DataFrame):
        return returns.aggregate(histVaR, alpha = alpha)

    if isinstance(returns, pd.Series):
        return np.percentile(returns, alpha)

    raise TypeError("You need to input series or dataframe.")

<font size="5">**ES calculation**</font>

Normal and T CVaR

In [None]:
def calc_CVaR(portofolioReturns, portfolioStd, distribution='normal', alpha=5, dof=6):
    """Parametric Conditional VaR (expected shortfall), normal or Student-t.

    Parameters
    ----------
    portofolioReturns : float
        Expected portfolio return, subtracted from the tail term.
    portfolioStd : float
        Portfolio standard deviation.
    distribution : {'normal', 't-distribution'}
        Distribution used for the tail expectation.
    alpha : float
        Tail level in percent (5 means the worst 5% of outcomes).
    dof : float
        Degrees of freedom, only used for 't-distribution'; must exceed 2
        so that the variance exists.

    Returns
    -------
    float
        Expected loss in the ``alpha`` tail.

    Raises
    ------
    TypeError
        If ``distribution`` is not one of the two supported names.
    ValueError
        If ``distribution`` is 't-distribution' and ``dof <= 2``.
    """
    tail = alpha/100
    if distribution == 'normal':
        CVaR = tail**-1 * norm.pdf(norm.ppf(tail))*portfolioStd - portofolioReturns
    elif distribution == 't-distribution':
        nu = dof
        if nu <= 2:
            raise ValueError("dof must be greater than 2 for 't-distribution'")
        xanu = t.ppf(tail, nu)
        # Expected shortfall of a standard Student-t is
        # pdf(x) / tail * (nu + x**2) / (nu - 1); sqrt((nu-2)/nu) rescales it
        # to unit variance so that portfolioStd is a true standard deviation,
        # the same convention calc_VaR uses for the quantile.
        unit_es = np.sqrt((nu-2)/nu) * t.pdf(xanu, nu) / tail * (nu + xanu**2) / (nu-1)
        CVaR = unit_es * portfolioStd - portofolioReturns
    else:
        raise TypeError("Expected distribution type 'normal'/'t-distribution'")
    return CVaR

Monte Carlo CVaR

In [None]:
def _mcCVaR(returns, alpha):
    # first check wheather returns is series
    if isinstance(returns, pd.Series):
        belowVaR = returns <= mcVaR(returns, alpha = alpha)
        return returns[belowVaR].mean()
    else:
        raise TypeError("You need to input a series.")
        
        
def mc_CVaR (returns, mc_sim, time, alpha):
    meanReturns = returns.mean()
    covMatrix = returns.cov()
    
    covMatrix = near_psd(np.array(covMatrix))
    #covMatrix = covMatrix
    
    weights = np.random.random(len(meanReturns))
    weights /= np.sum(weights) # normalize the weight
    

    meanMatrix = np.full(shape = (time, len(weights)), fill_value = meanReturns)
    meanMatrix = meanMatrix.T

    portfolio_sim = np.full(shape = (time, mc_sim), fill_value = 0)

    # MC loop

    # we can set a initial value of portofolio to see the absolute value
    # initial_money = 10000
    for m in range(mc_sim):
        '''
        Here we assume that daily return are distributed by a multvariate normal distribution
        Using Cholesky Decomposition, we get L, which is lower triangle.
        return = mean + L * Z, where Z is normal distribution (0,1)
        '''
        Z = np.random.normal(size = (time, len(weights)))
        L = np.linalg.cholesky(covMatrix)
        dailyReturns = meanMatrix + np.inner(L, Z)

        portfolio_sim[:,m] = np.cumprod(np.inner(weights, dailyReturns.T) + 1) # FV = PV * (1 + r)^t

    lastDay_return = portfolio_sim[-1,:]
    _mcCVaR(lastDay_return, alpha = alpha)

Historical CVaR

In [None]:
# CVaR function
def histCVaR (returns, alpha):
    """Historical CVaR: mean of the returns at or below the historical VaR.

    A Series yields a single number; a DataFrame yields one value per
    column. Any other input type raises ``TypeError``.
    """
    # A DataFrame is handled column by column through a recursive aggregate.
    if isinstance(returns, pd.DataFrame):
        return returns.aggregate(histCVaR, alpha = alpha)

    if isinstance(returns, pd.Series):
        cutoff = histVaR(returns, alpha = alpha)
        in_tail = returns <= cutoff
        return returns[in_tail].mean()

    raise TypeError("You need to input series or dataframe.")