## FinTech 545 - Project Week 05

## Renjie Wang

## Problem 1

Covariance estimation techniques

In [1]:
import numpy as np
import pandas as pd

'''
Function to calculate the covariance matrix for the dataframe that does not have the entire data.
When skipRow is true, use all the rows which have values. When it's false, use pairwise.
func can be np.cov (covariance) and np.corrcoef (correlation)
'''
def missing_cov_corr(df, skipRow=True, func=np.cov):
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)

    m, n = df.shape
    missing_rows = df.isnull().any(axis=1).sum()

    # If there is no missing rows, simply calculate the covariance matrix.
    if not missing_rows:
        cov = func(df.T)
        print(cov)
    else:
        # skipRow is True, apply the method on the rows that have all the data.
        if skipRow:
            # Drop the rows that is not of whole data
            df = df.dropna(axis=0, how='any')
            cov = func(df.T)
        # skipRow is True, apply the pairwise method.
        else:
            out = np.empty((n, n))
            for i in range(n):
                for j in range(i + 1):
                    # Select only rows without missing values in either column i or j
                    valid_rows = df.iloc[:, [i, j]].dropna().index

                    if not valid_rows.empty:
                        cov_ij = func(df.iloc[valid_rows, [i, j]], rowvar=False)[0, 1]
                        out[i, j] = cov_ij
                        out[j, i] = cov_ij
                        cov = out
    return cov

test1

In [2]:
# testout_1.1

df = pd.read_csv('test1.csv')
out1 = pd.read_csv('testout_1.1.csv')

# Calculate covariance matrix skipping rows
res1 = missing_cov_corr(df, skipRow=True, func=np.cov)
close_elements = np.isclose(out1, res1, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 1.1 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [3]:
# testout_1.2

out2 = pd.read_csv('testout_1.2.csv')

# Calculate correlation matrix skipping rows
res2 = missing_cov_corr(df, skipRow=True, func=np.corrcoef)
close_elements = np.isclose(out2, res2, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 1.2 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [4]:
# testout_1.3

out3 = pd.read_csv('testout_1.3.csv')

# Calculate covariance matrix pairwise
res3 = missing_cov_corr(df, skipRow=False, func=np.cov)
close_elements = np.isclose(out3, res3, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 1.3 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [5]:
# testout_1.4

out4 = pd.read_csv('testout_1.4.csv')

# Calculate covariance matrix pairwise
res4 = missing_cov_corr(df, skipRow=False, func=np.corrcoef)
close_elements = np.isclose(out4, res4, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 1.4 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [6]:
'''
Function that calculates the EW covariance and correlation.
Func take the parameter of 'cov' and 'corr'
'''
def ew_cov_corr(df, lmbd, func='cov'):
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)
        
    if func not in ['cov', 'corr']:
        raise ValueError(f'The func parameter must be "cov" or "corr", got {func} instead.')
    
    # Center the data - to calculate the covariance matrix.
    df -= df.mean(axis=0)
        
    m, n = df.shape
    wts = np.empty(m)
    
    # Setting weights for prior observation
    for i in range(m):
        wts[i] = (1 - lmbd) * lmbd ** (m - i - 1)
        
    # Normalizing the weights
    wts /= np.sum(wts)
    wts = wts.reshape(-1, 1)
    if func == 'cov':   
        res = (wts * df).T @ df
        
    elif func == 'corr':
        res = (wts * df).T @ df
        # Calculate the standard deviations (square root of variances along the diagonal)
        std_devs = np.sqrt(np.diag(res))

        # Convert the covariance matrix to a correlation matrix
        res /= np.outer(std_devs, std_devs)
        
    return res

In [7]:
# testout_2.1
df = pd.read_csv('test2.csv')

out1 = pd.read_csv('testout_2.1.csv')
res1 = ew_cov_corr(df, 0.97, func='cov')
close_elements = np.isclose(out1, res1, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 2.1 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [8]:
# testout_2.2
out2 = pd.read_csv('testout_2.2.csv')
res2 = ew_cov_corr(df, 0.94, func='corr')
close_elements = np.isclose(out2, res2, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 2.2 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [9]:
# testout_2.3
out3 = pd.read_csv('testout_2.3.csv')

cov = ew_cov_corr(df, 0.97, func='cov')
sd1 = np.sqrt(np.diag(cov))
cov  = ew_cov_corr(df, 0.94, func='cov')
sd = 1 / np.sqrt(np.diag(cov))
res3 = np.diag(sd1) @ np.diag(sd) @ cov @ np.diag(sd) @ np.diag(sd1)

close_elements = np.isclose(out3, res3, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 2.3 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


Non-PSD fixes for correlation matrices

In [10]:
'''
Near-PSD covariance and correlation.
'''
def near_psd(df, epsilon=0.0):
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)

    invSD = None

    # If the matrix is the covariance matrix, convert it to correlation matrix first.
    if not np.allclose(np.diag(df), 1.0, rtol=1e-03):
        invSD = np.diag(1.0 / np.sqrt(np.diag(df)))
        df = invSD @ df @ invSD
        # # Calculate the standard deviations (square root of variances along the diagonal)
        # std_devs = np.sqrt(np.diag(df))
        # # Convert the covariance matrix to a correlation matrix
        # df /= np.outer(std_devs, std_devs)
    
    # Calculate eigenvalues and eigenvectors
    eigenvalues, eigenvectors = np.linalg.eig(df)
    
    # Update the negative eigenvalues to 0.
    eigenvalues = np.maximum(eigenvalues, epsilon)
    
    # Construct the diagonal scaling matrix
    S = 1 / (eigenvectors * eigenvectors @ eigenvalues)
    S = np.diag(np.sqrt(S))
    #T = np.diag(1.0 / np.sqrt(np.sum(eigenvectors**2 * eigenvalues, axis=0)))
    l = np.diag(np.sqrt(eigenvalues))
    B = S @ eigenvectors @ l
    df = B @ B.T
    
    # Add back the variance
    if invSD is not None:
        invSD = np.diag(1 / np.diag(invSD))
        df = invSD @ df @ invSD

    return df

In [11]:
# testout_3.1
df = pd.read_csv('testout_1.3.csv')

out1 = pd.read_csv('testout_3.1.csv')
res1 = near_psd(df)
close_elements = np.isclose(out1, res1, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 3.1 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [12]:
# testout_3.2
df = pd.read_csv('testout_1.4.csv')

out2 = pd.read_csv('testout_3.2.csv')
res2 = near_psd(df)
close_elements = np.isclose(out2, res2, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 3.2 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [13]:
'''
Higham-PSD covariance and correlation.
'''
def higham_psd(df, W=None, epsilon=1e-9, maxIter=100, tol=1e-9):
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)
    
    m, n = df.shape
    
    # Generate the identity matrix.
    if W is None:
        W = np.eye(m)
        
    deltaS = 0    
    invSD = None
    
    Yk = np.copy(df)

    # If the matrix is the covariance matrix, convert it to correlation matrix first.
    if not np.allclose(np.diag(Yk), 1.0, rtol=1e-03):
        invSD = np.diag(1.0 / np.sqrt(np.diag(Yk)))
        Yk = invSD @ Yk @ invSD
        
    Yo = np.copy(Yk)
    norml = np.finfo(np.float64).max
    i = 1
    
    while i <= maxIter:
        Rk = Yk - deltaS
        
        # Ps update
        Xk = getPS(Rk, W)
        deltaS = Xk - Rk
        # Pu update
        Yk = getPu(Xk)
        # Get Norm
        norm = wgtNorm(Yk-Yo, W)
        #Smallest Eigenvalue
        minEigVal = np.min(np.real(np.linalg.eigvals(Yk)))
        
        if norm - norml < tol and minEigVal > -epsilon:
            break
        
        norml = norm
        i += 1

    if i < maxIter:
        print("Converged in {} iterations.".format(i))
    else:
        print("Convergence failed after {} iterations".format(i-1))
    
    # Add back the variance
    if invSD is not None:
        invSD = np.diag(1 / np.diag(invSD))
        Yk = invSD @ Yk @ invSD
        
    return Yk

'''
Helper functions
'''
def getAplus(A):
    eigenvalues, eigenvectors = np.linalg.eig(A)
    eigenvalues = np.diag(np.maximum(eigenvalues, 0))
    return eigenvectors @ eigenvalues @ eigenvectors.T

def getPS(A, W):
    W05 = np.sqrt(W)
    iW = np.linalg.inv(W05)
    return (iW @ getAplus(W05 @ A @ W05) @ iW)

def getPu(A):
    A = np.copy(A)  # Work on a copy to avoid modifying the original matrix
    np.fill_diagonal(A, 1)
    return A

def wgtNorm(A, W):
    W05 = np.sqrt(W)
    W05 = W05 @ A @ W05
    return np.sum(W05 * W05)

In [14]:
# testout_3.3
df = pd.read_csv('testout_1.3.csv')

out3 = pd.read_csv('testout_3.3.csv')
res3 = higham_psd(df)
close_elements = np.isclose(out3, res3, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 3.3 is passed.
print(close_elements)

Converged in 20 iterations.
[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [15]:
# testout_3.4
df = pd.read_csv('testout_1.4.csv')

out4 = pd.read_csv('testout_3.4.csv')
res4 = higham_psd(df)
close_elements = np.isclose(out4, res4, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 3.4 is passed.
print(close_elements)

Converged in 1 iterations.
[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [16]:
'''
Cholesky Factorization
'''
def chol_psd(df):
    if not isinstance(df, pd.DataFrame):
        df = pd.DataFrame(df)
    
    m, n = df.shape
    root = np.zeros((m, n))  # Initialize the root matrix within the function
    
    for j in range(m):
        s = 0.0
        # If we are not on the first column, calculate the dot product of the preceeding row values.
        if j >= 0:
            s =  np.dot(root[j, :j], root[j, :j])
            
            # Diagonal element
            temp = df.iloc[j, j] - s
            if -1e-8 <= temp <= 0:
                temp = 0.0
            root[j, j] = np.sqrt(max(temp, 0))
            
            if root[j, j] == 0.0:
                root[j, (j+1):n] = 0.0
            else:
                ir = 1.0 / root[j, j]
                for i in range(j+1, m):
                    s = np.dot(root[i, :j], root[j, :j])
                    root[i, j] = (df.iloc[i, j] - s) * ir
                    
    return root

In [17]:
# testout_4.1
df = pd.read_csv('testout_3.1.csv')

out = pd.read_csv('testout_4.1.csv')
res = chol_psd(df)
close_elements = np.isclose(out, res, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 4.1 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


Simulation Methods

In [18]:
'''
Multivariate Normal Simulation
'''
def simulateNormal(N, df, mean=None, seed=1234, fixMethod=near_psd):
    # Error Checking
    m, n = df.shape
    if n != m:
        raise ValueError(f"Covariance Matrix is not square ({n},{m})")
    
    # Initialize the output
    out = np.zeros((N, n))
    
    # Set mean
    if mean is None:
        mean = np.zeros(n)
    else:
        if len(mean) != n:
            raise ValueError(f"Mean ({len(mean)}) is not the size of cov ({n},{n})")
    
    # Set the seed to make sure the value is the same each time.
    np.random.seed(seed)
    

    eigenvalues, eigenvectors = np.linalg.eig(df)
    # If the covariance is not PSD, try to fix it
    if min(eigenvalues) < 0:
        df = fixMethod(df)
        
    # Take the root (cholesky factorization)
    l = chol_psd(df)
    
    # Generate random standard normals
    rand_normals = np.random.normal(0.0, 1.0, size=(N, n))
    
    # Apply the Cholesky root and plus the mean to the random normals
    out = np.dot(rand_normals, l.T) + mean
    
    return out.T

In [19]:
# testout_5.1
df = pd.read_csv('test5_1.csv')

out = pd.read_csv('testout_5.1.csv')
sim = simulateNormal(100000, df)
res = np.cov(sim)

# atol is the absolute tolerance parameter - set to a lower tolerance since the simulation is random.
close_elements = np.isclose(out, res, atol=1e-3)


# Test 5.1 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [20]:
# testout_5.2
df = pd.read_csv('test5_2.csv')

out = pd.read_csv('testout_5.2.csv')
sim = simulateNormal(100000, df)
res = np.cov(sim)

# atol is the absolute tolerance parameter - set to a lower tolerance since the simulation is random.
close_elements = np.isclose(out, res, atol=1e-3)


# Test 5.2 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [21]:
# testout_5.3
df = pd.read_csv('test5_3.csv')

out = pd.read_csv('testout_5.3.csv')
sim = simulateNormal(100000, df, fixMethod=near_psd)
res = np.cov(sim)

# atol is the absolute tolerance parameter - set to a lower tolerance since the simulation is random.
close_elements = np.isclose(out, res, atol=1e-3)


# Test 5.3 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [22]:
# testout_5.4
df = pd.read_csv('test5_3.csv')

out = pd.read_csv('testout_5.4.csv')
sim = simulateNormal(100000, df, fixMethod=higham_psd)
res = np.cov(sim)

# atol is the absolute tolerance parameter - set to a lower tolerance since the simulation is random.
close_elements = np.isclose(out, res, atol=1e-3)


# Test 5.4 is passed.
print(close_elements)

Converged in 16 iterations.
[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [23]:
'''
Multivariate PCA Simulation
'''
def simulatePCA(N, df, mean=None, seed=1234, pctExp=1):
    # Error Checking
    m, n = df.shape
    if n != m:
        raise ValueError(f"Covariance Matrix is not square ({n},{m})")
    
    # Initialize the output
    out = np.zeros((N, n))
    
    # Set mean
    if mean is None:
        mean = np.zeros(n)
    else:
        if len(mean) != n:
            raise ValueError(f"Mean ({len(mean)}) is not the size of cov ({n},{n})")
    
    eigenvalues, eigenvectors = np.linalg.eig(df)
    
    # Get the indices that would sort eigenvalues in descending order
    indices = np.argsort(eigenvalues)[::-1]
    # Sort eigenvalues
    eigenvalues = eigenvalues[indices]
    # Sort eigenvectors according to the same order
    eigenvectors = eigenvectors[:, indices]
    
    tv = np.sum(eigenvalues)
    posv = np.where(eigenvalues >= 1e-8)[0]
    if pctExp <= 1:
        nval = 0
        pct = 0.0
        # How many factors needed
        for i in posv:
            pct += eigenvalues[i] / tv
            nval += 1
            if pct >= pctExp:
                break
    
     # If nval is less than the number of positive eigenvalues, truncate posv
    if nval < len(posv):
        posv = posv[:nval]
        
    # Filter eigenvalues based on posv
    eigenvalues = eigenvalues[posv]
    eigenvectors = eigenvectors[:, posv]
    
    B = eigenvectors @ np.diag(np.sqrt(eigenvalues))
    
    np.random.seed(seed)
    rand_normals = np.random.normal(0.0, 1.0, size=(N, len(posv)))
    out = np.dot(rand_normals, B.T) + mean
    
    return out.T

In [24]:
# testout_5.5
df = pd.read_csv('test5_2.csv')

out = pd.read_csv('testout_5.5.csv')
sim = simulatePCA(100000, df, pctExp=0.99)
res = np.cov(sim)

# atol is the absolute tolerance parameter - set to a lower tolerance since the simulation is random.
close_elements = np.isclose(out, res, atol=1e-3)


# Test 5.5 is passed.
print(close_elements)

[[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [25]:
'''
Calculate Return
'''

# Implement the function to calculate the return
def return_calculate(prices, method='ARS', dateColumn='Date'):
    # Exclude the date column from the calculations
    tickers = [col for col in prices.columns if col != dateColumn]
    df = prices[tickers] # The dataframe is now with no date column.
    
    # Calculate the return using Classical Brownian Motion.
    if method == 'CBM':
        df = df.diff().dropna()
    
    # Calculate the return using Arithmetic Return System.
    elif method == 'ARS':
        df = (df - df.shift(1)) / df.shift(1)
        df = df.dropna()
        
    # Calculate the return using Geometric Brownian Motion.
    elif method == 'GBM':
        df = np.log(df).diff().dropna()
        
    else:
        raise ValueError(f"method: {method} must be in (\"CBM\",\"ARS\",\"GBM\")")
    
    return df

In [26]:
# testout_6.1
df = pd.read_csv('test6.csv')

out = pd.read_csv('test6_1.csv').drop('Date', axis=1)
res = return_calculate(df, dateColumn='Date')

close_elements = np.allclose(out, res, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 6.1 is passed.
print(close_elements)

True


In [27]:
# testout_6.2
df = pd.read_csv('test6.csv')

out = pd.read_csv('test6_2.csv').drop('Date', axis=1)
res = return_calculate(df, method='GBM', dateColumn='Date')

close_elements = np.allclose(out, res, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 6.2 is passed.
print(close_elements)

True


In [28]:
from scipy.stats import norm

'''
Fit the Data with Normal Distribution
'''
def fit_normal(data):
    # Fit the normal distribution to the data
    mu, std = norm.fit(data)
    return mu, std

In [29]:
# testout_7.1
df = pd.read_csv('test7_1.csv')

out = pd.read_csv('testout7_1.csv')
res = fit_normal(df)

close_elements = np.isclose(out, res, atol=1e-3)  # atol is the absolute tolerance parameter

# Test 7.1 is passed.
print(close_elements)

[[ True  True]]


In [30]:
from scipy.stats import t

'''
Fit the Data with t Distribution
'''
def fit_general_t(data):
    # Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu

In [31]:
# testout_7.2
df = pd.read_csv('test7_2.csv')

out = pd.read_csv('testout7_2.csv')
res = fit_general_t(df)

close_elements = np.isclose(out, res, atol=1e-3)  # atol is the absolute tolerance parameter

# Test 7.2 is passed.
print(close_elements)

[[ True  True  True]]


In [32]:
from scipy.optimize import minimize
import statsmodels.api as sm

'''
Fit the Data with t Distribution - regression
'''
def fit_regression_t(df):
    Y = df.iloc[:, -1]
    X = df.iloc[:, :-1]
    betas = MLE_t(X, Y)
    X = sm.add_constant(X)
    
    # Get the residuals.
    e = Y - np.dot(X, betas)

    params = t.fit(e)
    out = {"mu": [params[1]], 
           "sigma": [params[2]], 
           "nu": [params[0]]}
    for i in range(len(betas)):
        out["B" + str(i)] = betas[i]
    out = pd.DataFrame(out)
    out.rename(columns={'B0': 'Alpha'}, inplace=True)
    return out

# The objective negative log-likelihood function (need to be minimized).
def MLE_t(X, Y):
    X = sm.add_constant(X)
    def ll_t(params):
        nu, sigma = params[:2]
        beta_MLE_t = params[2:]
        epsilon = Y - np.dot(X, beta_MLE_t)
        # Calculate the log-likelihood
        log_likelihood = np.sum(t.logpdf(epsilon, df=nu, loc=mu, scale=sigma))
        return -log_likelihood
    
    beta = np.zeros(X.shape[1])
    nu, mu, sigma = 1, 0, np.std(Y - np.dot(X, beta))
    params = np.append([nu, sigma], beta)
    bnds = ((1e-9, None), (1e-9, None), (None, None), (None, None), (None, None), (None, None))
    
    # Minimize the log-likelihood to get the beta
    res = minimize(ll_t, params, bounds=bnds, options={'disp': True})
    beta_MLE = res.x[2:]
    return beta_MLE

In [33]:
# testout_7.3
df = pd.read_csv('test7_3.csv')

out = pd.read_csv('testout7_3.csv')
res = fit_regression_t(df)

close_elements = np.isclose(out, res, atol=1e-3)  # atol is the absolute tolerance parameter

# Test 7.3 is passed.
print(close_elements)

[[ True  True  True  True  True  True  True]]


VaR calculation methods (all discussed) 

In [34]:
from scipy.stats import norm

'''
Fit the Data with Normal Distribution
'''
def fit_normal(data):
    # Fit the normal distribution to the data
    mu, std = norm.fit(data)
    return mu, std


'''
VaR for Normal Distribution
'''

def var_normal(data, alpha=0.05):
    # Fit the data with normal distribution.
    mu, std = fit_normal(data)
    
    # Calculate the VaR
    VaR = -norm.ppf(alpha, mu, std)
    
    # Calculate the relative difference from the mean expected.
    VaR_diff = VaR + mu
    
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})

In [35]:
# testout_8.1
df = pd.read_csv('test7_1.csv')

out = pd.read_csv('testout8_1.csv')

# Calculate the VaR at 5% quantile.
res = var_normal(df, 0.05)

close_elements = np.isclose(out, res, atol=1e-3)  # atol is the absolute tolerance parameter

# Test 8.1 is passed.
print(close_elements)

[[ True  True]]


In [36]:
from scipy.stats import t

'''
Fit the Data with t Distribution
'''
def fit_general_t(data):
    # Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu


'''
VaR for t Distribution
''' 
def var_t(data, alpha=0.05):
    # Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    
    # Calculate the VaR
    VaR = -t.ppf(alpha, nu, mu, sigma)

    # Calculate the relative difference from the mean expected.
    VaR_diff = VaR + mu
    
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})


In [37]:
# testout_8.2
df = pd.read_csv('test7_2.csv')

out = pd.read_csv('testout8_2.csv')
# Calculate the VaR at 5% quantile.
res = var_t(df, 0.05)

close_elements = np.isclose(out, res, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 8.2 is passed.
print(close_elements)

[[ True  True]]


In [38]:
from scipy.stats import t

'''
Fit the Data with t Distribution
'''
def fit_general_t(data):
    # Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu

'''
VaR for t Distribution simulation
''' 
def var_simulation(data, alpha=0.05, size=10000):
    # Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    
    # Generate given size random numbers from a t-distribution
    random_numbers = t.rvs(df=nu, loc=mu, scale=sigma, size=size)
    
    return var_t(random_numbers, alpha)

In [39]:
# testout_8.3 - It depends - 10000 might be not enough
df = pd.read_csv('test7_2.csv')

out = pd.read_csv('testout8_3.csv')
# Calculate the VaR at 5% quantile.
res = var_simulation(df, 0.05, 10000)

close_elements = np.isclose(out, res, atol=1e-2)  # atol is the absolute tolerance parameter

# Test 8.3 is passed.
print(close_elements)

[[ True  True]]


ES calculation

In [40]:
from scipy.stats import norm
from scipy.integrate import quad

'''
Fit the Data with Normal Distribution
'''
def fit_normal(data):
    # Fit the normal distribution to the data
    mu, std = norm.fit(data)
    return mu, std

'''
VaR for Normal Distribution
'''

def var_normal(data, alpha=0.05):
    # Fit the data with normal distribution.
    mu, std = fit_normal(data)
    
    # Calculate the VaR
    VaR = -norm.ppf(alpha, mu, std)
    
    # Calculate the relative difference from the mean expected.
    VaR_diff = VaR + mu
    
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})

'''
ES for Normal Distribution
'''

def es_normal(data, alpha=0.05):
    # Fit the data with normal distribution.
    mu, std = fit_normal(data)
    
    # Calculate the VaR
    res = var_normal(data, alpha)
    VaR = res.iloc[0, 0]
    
    # Define the integrand function: x times the PDF of the distribution
    def integrand(x, mu, std):
        return x * norm.pdf(x, loc=mu, scale=std)
    
    # Calculate the ES
    ES, _ = quad(lambda x: integrand(x, mu, std), -np.inf, -VaR)
    ES /= -alpha
    
    # Calculate the relative difference from the mean expected.
    ES_diff = ES + mu
    
    return pd.DataFrame({"ES Absolute": [ES], 
                         "ES Diff from Mean": [ES_diff]})

In [41]:
# testout_8.4
df = pd.read_csv('test7_1.csv')

out = pd.read_csv('testout8_4.csv')
# Calculate the VaR at 5% quantile.
res = es_normal(df, 0.05)

close_elements = np.isclose(out, res, atol=1e-3)  # atol is the absolute tolerance parameter

# Test 8.4 is passed.
print(close_elements)

[[ True  True]]


In [42]:
from scipy.stats import t
from scipy.integrate import quad

'''
Fit the Data with t Distribution
'''
def fit_general_t(data):
    # Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu

'''
VaR for t Distribution
''' 
def var_t(data, alpha=0.05):
    # Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    
    # Calculate the VaR
    VaR = -t.ppf(alpha, nu, mu, sigma)

    # Calculate the relative difference from the mean expected.
    VaR_diff = VaR + mu
    
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})

'''
ES for t Distribution
'''

def es_t(data, alpha=0.05):
    # Fit the data with normal distribution.
    mu, sigma, nu = fit_general_t(data)
    
    # Calculate the VaR
    res = var_t(data, alpha)
    VaR = res.iloc[0, 0]
    
    # Define the integrand function: x times the PDF of the distribution
    def integrand(x, mu, sigma, nu):
        return x * t.pdf(x, df=nu, loc=mu, scale=sigma)
    
    # Calculate the ES
    ES, _ = quad(lambda x: integrand(x, mu, sigma, nu), -np.inf, -VaR)
    ES /= -alpha
    
    # Calculate the relative difference from the mean expected.
    ES_diff = ES + mu
    
    return pd.DataFrame({"ES Absolute": [ES], 
                         "ES Diff from Mean": [ES_diff]})

In [43]:
# testout_8.5
df = pd.read_csv('test7_2.csv')

out = pd.read_csv('testout8_5.csv')
# Calculate the VaR at 5% quantile.
res = es_t(df, 0.05)

close_elements = np.isclose(out, res, atol=1e-5)  # atol is the absolute tolerance parameter

# Test 8.5 is passed.
print(close_elements)

[[ True  True]]


In [44]:
from scipy.stats import t

'''
Fit the Data with t Distribution
'''
def fit_general_t(data):
    # Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu

'''
VaR for t Distribution
''' 
def var_t(data, alpha=0.05):
    # Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    
    # Calculate the VaR
    VaR = -t.ppf(alpha, nu, mu, sigma)

    # Calculate the relative difference from the mean expected.
    VaR_diff = VaR + mu
    
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})

'''
VaR for t Distribution simulation
''' 

def es_simulation(data, alpha=0.05, size=10000):
    # Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    
    # Generate given size random numbers from a t-distribution
    random_numbers = t.rvs(df=nu, loc=mu, scale=sigma, size=size)
    
    return es_t(random_numbers, alpha)

In [45]:
# testout_8.6 - It depends - 10000 might be not enough
df = pd.read_csv('test7_2.csv')

out = pd.read_csv('testout8_6.csv')
# Calculate the VaR at 5% quantile.
res = es_simulation(df, 0.05)

close_elements = np.isclose(out, res, atol=1e-2)  # atol is the absolute tolerance parameter

# Test 8.6 is passed.
print(close_elements)

[[ True  True]]


In [46]:
from scipy.stats import norm, t

'''
VaR/ES on 2 levels from simulated values - Copula
'''

def simulateCopula(portfolio, returns):
    portfolio['CurrentValue'] = portfolio['Holding'] * portfolio['Starting Price']
    models = {}
    uniform = pd.DataFrame()
    standard_normal = pd.DataFrame()
    
    for stock in returns.columns:
        # If the distribution for the model is normal, fit the data with normal distribution.
        if portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'Normal':
            models[stock] = norm.fit(returns[stock])
            mu, sigma = norm.fit(returns[stock])
            
            # Transform the observation vector into a uniform vector using CDF.
            uniform[stock] = norm.cdf(returns[stock], loc=mu, scale=sigma)
            
            # Transform the uniform vector into a Standard Normal vector usig the normal quantile function.
            standard_normal[stock] = norm.ppf(uniform[stock])
            
        # If the distribution for the model is t, fit the data with normal t.
        elif portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'T':
            models[stock] = t.fit(returns[stock])
            nu, mu, sigma = t.fit(returns[stock])
            
            # Transform the observation vector into a uniform vector using CDF.
            uniform[stock] = t.cdf(returns[stock], df=nu, loc=mu, scale=sigma)
            
            # Transform the uniform vector into a Standard Normal vector usig the normal quantile function.
            standard_normal[stock] = norm.ppf(uniform[stock])
        
    # Calculate Spearman's correlation matrix
    spearman_corr_matrix = standard_normal.corr(method='spearman')
    
    nSim = 100000
    
    # Use the PCA to simulate the multivariate normal.
    np.random.seed(1234)
    simulations = simulatePCA(nSim, spearman_corr_matrix)
    simulations = pd.DataFrame(simulations.T, columns=[stock for stock in returns.columns])
    
    # Transform the simulations into uniform variables using standard normal CDF.
    uni = norm.cdf(simulations)
    uni = pd.DataFrame(uni, columns=[stock for stock in returns.columns])
    
    simulatedReturns = pd.DataFrame()
    # Transform the uniform variables into the desired data using quantile.
    for stock in returns.columns:
        # If the distribution for the model is normal, use the quantile of the normal distribution.
        if portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'Normal':
            mu, sigma = models[stock]
            simulatedReturns[stock] = norm.ppf(uni[stock], loc=mu, scale=sigma)
            
        # If the distribution for the model is t, use the quantile of the t distribution.
        elif portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'T':
            nu, mu, sigma = models[stock]
            simulatedReturns[stock] = t.ppf(uni[stock], df=nu, loc=mu, scale=sigma)
    
    simulatedValue = pd.DataFrame()
    pnl = pd.DataFrame()
    # Calculate the daily prices for each stock
    for stock in returns.columns:
        currentValue = portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0]
        simulatedValue[stock] = currentValue * (1 + simulatedReturns[stock])
        pnl[stock] = simulatedValue[stock] - currentValue
        
    risk = pd.DataFrame(columns = ["Stock", "VaR95", "ES95", "VaR95_Pct", "ES95_Pct"])
    w = pd.DataFrame()

    for stock in pnl.columns:
        i = risk.shape[0]
        risk.loc[i, "Stock"] = stock
        risk.loc[i, "VaR95"] = -np.percentile(pnl[stock], 5)
        risk.loc[i, "VaR95_Pct"] = risk.loc[i, "VaR95"] / portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0]
        risk.loc[i, "ES95"] = -pnl[stock][pnl[stock] <= -risk.loc[i, "VaR95"]].mean()
        risk.loc[i, "ES95_Pct"] = risk.loc[i, "ES95"] / portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0]
        
        # Determine the weights for the two stock
        w.at['Weight', stock] = portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0] / portfolio['CurrentValue'].sum()
        
    # Calculate the total pnl.
    pnl['Total'] = 0
    for stock in returns.columns:
        pnl['Total'] += pnl[stock]
    
    i = risk.shape[0]
    risk.loc[i, "Stock"] = 'Total'
    risk.loc[i, "VaR95"] = -np.percentile(pnl['Total'], 5)
    risk.loc[i, "VaR95_Pct"] = risk.loc[i, "VaR95"] / portfolio['CurrentValue'].sum()
    risk.loc[i, "ES95"] = -pnl['Total'][pnl['Total'] <= -risk.loc[i, "VaR95"]].mean()
    risk.loc[i, "ES95_Pct"] = risk.loc[i, "ES95"] / portfolio['CurrentValue'].sum()


        
#     w = w.loc['Weight']
#     weightedPnl = w * pnl
#     weightedPnl['Total'] = 0
    
#     # Calculate the total PnL for one day.
#     for stock in returns.columns:
#         weightedPnl['Total'] += weightedPnl[stock]

#     # Calculate the portfolio's mean and standard deviation
#     portfolioMean = weightedPnl['Total'].mean()
#     portfolioStd = weightedPnl['Total'].std()
    
#     # Determine the risk measure for the whole portfolio
#     i = risk.shape[0]
#     risk.loc[i, "Stock"] = 'Total'
#     risk.loc[i, "VaR95"] = -norm.ppf(0.05, portfolioMean, portfolioStd)
#     risk.loc[i, "VaR95_Pct"] = -norm.ppf(0.05, portfolioMean, portfolioStd)risk.loc[i, "VaR95"] / portfolio['CurrentValue'].sum()
#     risk.loc[i, "ES95"] = -weightedPnl['Total'][weightedPnl['Total'] <= -risk.loc[i, "VaR95"]].mean()
#     risk.loc[i, "ES95_Pct"] = risk.loc[i, "ES95"] / portfolio['CurrentValue'].sum()
        
    return risk

In [47]:
# testout_9.1
df1 = pd.read_csv('test9_1_portfolio.csv')
df2 = pd.read_csv('test9_1_returns.csv')

out = pd.read_csv('testout9_1.csv')
# Calculate the VaR at 5% quantile.
res = simulateCopula(df1, df2)

# Test 9.1 is passed - within appropriate range.
print(f'{out}\n')
print(res)

   Stock       VaR95        ES95  VaR95_Pct  ES95_Pct
0      A   94.460376  118.289371   0.047230  0.059145
1      B  107.880427  151.218174   0.035960  0.050406
2  Total  152.565684  199.704532   0.030513  0.039941

   Stock       VaR95        ES95 VaR95_Pct  ES95_Pct
0      A    93.98826  117.199357  0.046994    0.0586
1      B  108.812034  152.088099  0.036271  0.050696
2  Total  152.863228  200.235847  0.030573  0.040047
