In [1]:
import pandas as pd
import numpy as np

In [2]:
def check_matrix_correctness(input_matrix: pd.DataFrame, test_matrix:pd.DataFrame, error_epsilon = 1e-12):
    """Check that two frames agree position by position.

    Labels are ignored: entries are compared by integer position via ``.iloc``.

    Parameters
    ----------
    input_matrix : pd.DataFrame
        Computed values.
    test_matrix : pd.DataFrame
        Reference values; must have the same shape as ``input_matrix``.
    error_epsilon : float
        Strict upper bound on the absolute difference allowed per entry.

    Returns
    -------
    bool
        ``True`` when every entry passes.

    Raises
    ------
    AssertionError
        If the shapes differ, or at the first entry (row-major order) whose
        absolute difference is not below ``error_epsilon``.
    """
    assert input_matrix.shape == test_matrix.shape, "Input matrix and test matrix have different shapes"

    row_count = len(input_matrix)
    col_count = len(input_matrix.columns)
    for row in range(row_count):
        for col in range(col_count):
            gap = abs(input_matrix.iloc[row, col] - test_matrix.iloc[row, col])
            assert gap < error_epsilon, f"row {row} and column {col} of the data do not match"

    return True

## Test 1.1 Covariance Missing data, skip missing rows

In [24]:
# Load the Test 1 input in this cell so it runs on a fresh kernel (no other cell
# defines `test1_data`). NOTE(review): the path is assumed from the test2/test6
# naming used elsewhere in this notebook — confirm the file name.
test1_data = pd.read_csv("testfiles/data/test1.csv")

# Covariance over complete rows only: rows containing any NaN are dropped first.
test1_cov = test1_data.dropna().cov()

In [25]:
test1_cov

Unnamed: 0,x1,x2,x3,x4,x5
x1,2.148513,-1.38947,-0.516466,-0.129327,-1.056814
x2,-1.38947,1.035342,0.339993,0.193888,0.626876
x3,-0.516466,0.339993,0.942388,0.947887,0.051788
x4,-0.129327,0.193888,0.947887,1.113436,-0.204731
x5,-1.056814,0.626876,0.051788,-0.204731,0.592027


In [185]:
# Expected output for test 1.1, compared entry by entry with the computed covariance.
test_1_1_vals = pd.read_csv("testfiles/data/testout_1.1.csv")
check_matrix_correctness(
    input_matrix=test1_cov,
    test_matrix=test_1_1_vals,
    error_epsilon=1e-12,
)

True

## Test 1.2 Correlation Missing data, skip missing rows

In [34]:
# Correlation over complete rows only: rows containing any NaN are dropped first.
test1_corr = test1_data.dropna().corr()

In [186]:
# Expected output for test 1.2, compared entry by entry with the computed correlation.
test_1_2_vals = pd.read_csv("testfiles/data/testout_1.2.csv")
check_matrix_correctness(
    input_matrix=test1_corr,
    test_matrix=test_1_2_vals,
    error_epsilon=1e-12,
)

True

## Test 1.3 Covariance Missing Data, Pairwise

In [37]:
# No dropna() here: the frame keeps its missing values and pandas' cov()
# excludes NA values pair by pair when computing each entry.
test1_cov_missing = test1_data.cov()

In [187]:
# Expected output for test 1.3, compared entry by entry with the pairwise covariance.
test_1_3_vals = pd.read_csv("testfiles/data/testout_1.3.csv")
check_matrix_correctness(
    input_matrix=test1_cov_missing,
    test_matrix=test_1_3_vals,
    error_epsilon=1e-12,
)

True

## Test 1.4 Correlation Missing Data, Pairwise

In [14]:
# No dropna() here: the frame keeps its missing values and pandas' corr()
# excludes NA values pair by pair when computing each entry.
test1_corr_missing = test1_data.corr()

In [15]:
# Expected output for test 1.4; the comparison uses the checker's default tolerance.
test_1_4_vals = pd.read_csv("testfiles/data/testout_1.4.csv")
check_matrix_correctness(
    input_matrix=test1_corr_missing,
    test_matrix=test_1_4_vals,
)

NameError: name 'check_matrix_correctness' is not defined

In [None]:
def compute_correlation(x:pd.DataFrame, drop_missing = False):
    """Return the correlation matrix of the columns of ``x``.

    Parameters
    ----------
    x : pd.DataFrame
        Input data, one variable per column.
    drop_missing : bool
        When true, rows containing any missing value are dropped before the
        correlation is computed; otherwise ``x.corr()`` is called on the frame
        as given.

    Returns
    -------
    pd.DataFrame
        The result of ``DataFrame.corr()``.
    """
    frame = x.dropna() if drop_missing else x
    return frame.corr()

def compute_covariance(x:pd.DataFrame, drop_missing = False):
    """Return the covariance matrix of the columns of ``x``.

    Parameters
    ----------
    x : pd.DataFrame
        Input data, one variable per column.
    drop_missing : bool
        When true, rows containing any missing value are dropped before the
        covariance is computed; otherwise ``x.cov()`` is called on the frame
        as given.

    Returns
    -------
    pd.DataFrame
        The result of ``DataFrame.cov()``.
    """
    frame = x.dropna() if drop_missing else x
    return frame.cov()

## Test 2.1 EW Covariance, lambda=0.97

In [None]:
# Input data used by the Test 2.x cells that follow.
test2_data = pd.read_csv("testfiles/data/test2.csv")

In [None]:
# Decay factor for the exponential weights; pandas takes alpha = 1 - lambda.
lambda_ = 0.97
ewm_cov = test2_data.ewm(alpha=1 - lambda_).cov(bias=True)

# ewm().cov() is indexed by (row label, column name); keep the block that
# belongs to the largest first-level label.
last_ewm_cov_matrix = ewm_cov.loc[
    ewm_cov.index.get_level_values(0).max()
]

In [None]:
# Expected output for test 2.1, compared entry by entry with the EW covariance.
test_2_1_vals = pd.read_csv("testfiles/data/testout_2.1.csv")
check_matrix_correctness(
    input_matrix=last_ewm_cov_matrix,
    test_matrix=test_2_1_vals,
    error_epsilon=1e-12,
)

True

## Test 2.2 EW Correlation, lambda=0.94

In [87]:
# Decay factor for the exponential weights; pandas takes alpha = 1 - lambda.
lambda_ = 0.94
ewm_corr = test2_data.ewm(alpha=1 - lambda_).corr()

# ewm().corr() is indexed by (row label, column name); keep the block that
# belongs to the largest first-level label.
last_ewm_corr_matrix = ewm_corr.loc[
    ewm_corr.index.get_level_values(0).max()
]

In [190]:
# Expected output for test 2.2, compared entry by entry with the EW correlation.
test_2_2_vals = pd.read_csv("testfiles/data/testout_2.2.csv")
check_matrix_correctness(
    input_matrix=last_ewm_corr_matrix,
    test_matrix=test_2_2_vals,
    error_epsilon=1e-12,
)

True

## Test 2.3 Covariance with EW Variance (lambda=0.97), EW Correlation (lambda=0.94)

In [117]:
# Step 1: EW variances with lambda = 0.97; take the last row and turn it into
# the matrix of pairwise standard-deviation products.
lambda_ = 0.97
ewm_var = test2_data.ewm(alpha=1 - lambda_).var(bias=True)
std_devs = np.sqrt(ewm_var.iloc[-1])
std_dev_products_matrix = np.outer(std_devs, std_devs)

# Step 2: EW correlation with lambda = 0.94; keep the block for the largest
# first-level index label.
lambda_ = 0.94
ewm_corr = test2_data.ewm(alpha=1 - lambda_).corr()
last_ewm_corr_matrix = ewm_corr.loc[
    ewm_corr.index.get_level_values(0).max()
]

# Step 3: covariance = correlation scaled element-wise by the std-dev products.
covariance_matrix = last_ewm_corr_matrix * std_dev_products_matrix

In [191]:
# Expected output for test 2.3, compared entry by entry with the combined covariance.
test_2_3_vals = pd.read_csv("testfiles/data/testout_2.3.csv")
check_matrix_correctness(
    input_matrix=covariance_matrix,
    test_matrix=test_2_3_vals,
    error_epsilon=1e-12,
)

True

## Test 3.1 near_psd covariance

In [376]:
# Input for test 3.1: the same file that test 1.3 (pairwise covariance) is checked against.
data_3_1_vals = pd.read_csv("testfiles/data/testout_1.3.csv")

def near_psd(A: pd.DataFrame, epsilon = 0.0):
    """Adjust a square matrix by clipping eigenvalues and rescaling.

    The input is treated as a correlation matrix when every diagonal entry is
    close to 1 (``np.isclose``). Otherwise it is first normalised by the square
    roots of its diagonal and the same scaling is undone before returning.

    Parameters
    ----------
    A : pd.DataFrame or array-like
        Square matrix; converted with ``np.asarray``.
    epsilon : float
        Lower bound applied to the eigenvalues.

    Returns
    -------
    np.ndarray
        The adjusted matrix (a plain array, not a DataFrame).
    """
    matrix = np.asarray(A)
    dim = matrix.shape[0]
    result = matrix.copy()

    variances = np.diag(result)
    looks_like_correlation = np.sum(np.isclose(variances, 1.0)) == dim

    # Only set when the input had to be normalised to unit diagonal.
    inv_sd = None
    if not looks_like_correlation:
        inv_sd = np.diag(1.0 / np.sqrt(variances))
        result = inv_sd @ result @ inv_sd

    # Symmetric eigendecomposition, with eigenvalues floored at epsilon.
    eig_vals, eig_vecs = np.linalg.eigh(result)
    eig_vals = np.maximum(eig_vals, epsilon)

    # Per-row scaling computed from the squared eigenvectors and the floored eigenvalues.
    row_scale = np.diag(np.sqrt(1.0 / ((eig_vecs ** 2) @ eig_vals)))
    root_vals = np.diag(np.sqrt(eig_vals))

    factor = row_scale @ eig_vecs @ root_vals
    result = factor @ factor.T

    if inv_sd is None:
        return result

    # Undo the normalisation applied above.
    sd = np.diag(1.0 / np.diag(inv_sd))
    return sd @ result @ sd

In [377]:
# near_psd returns a NumPy array; wrap it in a DataFrame because
# check_matrix_correctness reads entries through .iloc.
near_psd_covariance = pd.DataFrame(near_psd(data_3_1_vals, epsilon=0))

In [336]:
# Expected output for test 3.1, compared with the near_psd result at a 1e-8 tolerance.
test_3_1_vals = pd.read_csv("testfiles/data/testout_3.1.csv")
check_matrix_correctness(
    input_matrix=near_psd_covariance,
    test_matrix=test_3_1_vals,
    error_epsilon=1e-8,
)

True

## Test 3.2 near_psd correlation

In [340]:
# Input for test 3.2: the same file that test 1.4 (pairwise correlation) is checked against.
data_3_2_vals = pd.read_csv("testfiles/data/testout_1.4.csv")

# near_psd returns a NumPy array; wrap it so the checker can use .iloc.
near_psd_correlation = pd.DataFrame(near_psd(data_3_2_vals, epsilon=0))

test_3_2_vals = pd.read_csv("testfiles/data/testout_3.2.csv")
check_matrix_correctness(
    input_matrix=near_psd_correlation,
    test_matrix=test_3_2_vals,
    error_epsilon=1e-8,
)

True

## Test 3.3 Higham Covariance

In [406]:
def higham_covariance(A: pd.DataFrame, tolerance = 1e-8, max_iterations= 100_000):
    """Repair a covariance or correlation matrix by alternating projections.

    The input is treated as a correlation matrix when every diagonal entry is
    close to 1 (``np.isclose``). Otherwise it is normalised by the square roots
    of its diagonal first, and that scaling is undone before returning.

    Each iteration subtracts the previous correction, projects by zeroing
    negative eigenvalues, records the new correction, then resets the diagonal
    to 1. Iteration stops early once the iterate passes the validity check
    below, or after ``max_iterations`` passes.

    Parameters
    ----------
    A : pd.DataFrame or array-like
        Square matrix; converted with ``np.asarray``.
    tolerance : float
        Used as ``atol`` in the symmetry check and as the allowed negative
        slack on the smallest eigenvalue.
    max_iterations : int
        Upper bound on the number of iterations.

    Returns
    -------
    np.ndarray
        The adjusted matrix only (no iteration count).
    """

    def set_unit_diagonal(M):
        # Works in place on M and hands the same array back.
        np.fill_diagonal(M, 1)
        return M

    def clip_negative_eigenvalues(M):
        vals, vecs = np.linalg.eigh(M)
        clipped = np.maximum(np.diag(vals), 0)
        return vecs @ clipped @ vecs.T

    def is_valid_correlation(M, tol):
        # Unit diagonal (default allclose tolerances).
        if not np.allclose(np.diag(M), 1):
            return False

        # Symmetry.
        if not np.allclose(M, M.T, atol=tol):
            return False

        # Smallest eigenvalue must not fall below -tol.
        return np.all(np.linalg.eigvalsh(M) >= -tol)

    matrix = np.asarray(A)
    dim = matrix.shape[0]
    start = matrix.copy()

    variances = np.diag(start)

    # Only set when the input had to be normalised to unit diagonal.
    inv_sd = None
    if np.sum(np.isclose(variances, 1.0)) != dim:
        inv_sd = np.diag(1.0 / np.sqrt(variances))
        start = inv_sd @ start @ inv_sd

    correction = np.zeros_like(start)
    Y = start.copy()

    for _ in range(max_iterations):
        R = Y - correction
        X = clip_negative_eigenvalues(R)
        correction = X - R
        Y = set_unit_diagonal(X)

        if is_valid_correlation(Y, tolerance):
            break

    # Undo the normalisation applied above.
    if inv_sd is not None:
        sd = np.diag(1.0 / np.diag(inv_sd))
        Y = sd @ Y @ sd

    return Y

In [417]:
# Input for test 3.3: the same file that test 1.3 (pairwise covariance) is checked against.
data_3_3_vals = pd.read_csv("testfiles/data/testout_1.3.csv")

# higham_covariance returns only the adjusted matrix, so bind a single name.
# Unpacking it into two names would try to split the matrix by rows instead.
adjusted_cov = higham_covariance(data_3_3_vals, tolerance=0.0, max_iterations=1000)

test_3_3_vals = pd.read_csv("testfiles/data/testout_3.3.csv")
check_matrix_correctness(pd.DataFrame(adjusted_cov), test_3_3_vals, error_epsilon=1e-9)

True

## Test 3.4 Higham correlation

In [419]:
# Input for test 3.4: the same file that test 1.4 (pairwise correlation) is checked against.
data_3_4_vals = pd.read_csv("testfiles/data/testout_1.4.csv")

# higham_covariance returns only the adjusted matrix, so bind a single name.
# Unpacking it into two names would try to split the matrix by rows instead.
adjusted_corr = higham_covariance(data_3_4_vals, tolerance=0.0, max_iterations=1000)

In [422]:
# Expected output for test 3.4; the Higham result is wrapped in a DataFrame for the checker.
test_3_4_vals = pd.read_csv("testfiles/data/testout_3.4.csv")
check_matrix_correctness(
    input_matrix=pd.DataFrame(adjusted_corr),
    test_matrix=test_3_4_vals,
    error_epsilon=1e-12,
)

True

## Test 4.1 Cholesky PSD

In [20]:
from numpy.linalg import cholesky

In [21]:
# Input for test 4.1: the same file that test 3.1 (near_psd covariance) is checked against.
data_4_1 = pd.read_csv("testfiles/data/testout_3.1.csv")

#### Computing the Cholesky factor with the NumPy package

In [23]:
# Build the factor in this cell before displaying it, so the cell does not
# depend on a later cell having been run first.
chol = pd.DataFrame(cholesky(data_4_1))
chol

Unnamed: 0,0,1,2,3,4
0,1.083506,0.0,0.0,0.0,0.0
1,-0.57036,0.996437,0.0,0.0,0.0
2,-0.262628,-0.133175,0.911807,0.0,0.0
3,-0.06013,0.412871,0.431384,0.73116,0.0
4,-0.63524,-0.223938,0.054179,-0.256892,1.193941e-08


In [22]:
# Factor with numpy.linalg.cholesky and wrap the result so the checker can use .iloc.
chol = pd.DataFrame(cholesky(data_4_1))

test_4_1_vals = pd.read_csv("testfiles/data/testout_4.1.csv")
check_matrix_correctness(
    input_matrix=chol,
    test_matrix=test_4_1_vals,
    error_epsilon=1e-8,
)

NameError: name 'check_matrix_correctness' is not defined

#### Manual implementation of the Cholesky factorization

In [442]:
def cholesky_psd(A: np.ndarray, zero_tol = 1e-8):
    """Lower-triangular Cholesky-style root of a positive semidefinite matrix.

    Unlike a strict Cholesky factorization, a zero pivot is allowed: the rest
    of that column is left at zero and the loop moves on.

    Parameters
    ----------
    A : np.ndarray or array-like
        Square 2-D matrix. It is converted to a float array, so integer input
        is factored in floating point.
    zero_tol : float
        Pivots in ``[-zero_tol, 0]`` are treated as exactly zero to absorb
        floating-point error.

    Returns
    -------
    np.ndarray
        Float array ``root`` with zeros above the diagonal, built so that
        ``root @ root.T`` reproduces ``A`` for positive semidefinite input.

    Raises
    ------
    AssertionError
        If ``A`` is not 2-D or not square.
    numpy.linalg.LinAlgError
        If a pivot is below ``-zero_tol``.
    """
    A = np.asarray(A, dtype=float)
    assert len(A.shape) == 2, "Matrix array must have 2 dimensions"
    assert A.shape[0] == A.shape[1], "Matrix must be square"

    n = A.shape[0]
    # Allocate as float explicitly: zeros_like on an integer matrix would give
    # an integer result and silently truncate every square root stored in it.
    root = np.zeros(A.shape, dtype=float)

    for j in range(n):
        row_sum_of_squares = root[j, :j].dot(root[j, :j])
        diag = A[j,j] - row_sum_of_squares

        # correcting for floating point errors
        if diag <= 0 and diag >= -zero_tol:
            diag = 0.0
        if diag < 0:
            raise np.linalg.LinAlgError("Matrix not positive semidefinite")
        root[j,j] = np.sqrt(diag)

        # if diag is 0, the rest of column remains 0 (do nothing)
        # else update the rest of the column values
        if root[j,j] != 0.0:
            divisor = 1/root[j,j]
            for i in range(j+1, n):
                subvector_sum = root[i, :j].dot(root[j, :j])
                root[i,j] = (A[i,j] - subvector_sum) * divisor

    return root


# Convert the DataFrame to a NumPy array first: cholesky_psd indexes its input as A[i, j].
manual_chol = cholesky_psd(np.array(data_4_1))


In [445]:
# Expected output for test 4.1, compared with the manual factor at a 1e-8 tolerance.
test_4_1_vals = pd.read_csv("testfiles/data/testout_4.1.csv")
check_matrix_correctness(
    input_matrix=pd.DataFrame(manual_chol),
    test_matrix=test_4_1_vals,
    error_epsilon=1e-8,
)

True

## Test 6.1 Calculate Arithmetic Returns

In [144]:
# Input for the Test 6.x cells, indexed by the "Date" column.
test_6_data = (
    pd.read_csv("testfiles/data/test6.csv")
    .set_index("Date")
)

In [149]:
# Period-over-period percentage change; dropna() then removes rows with missing
# values (the first row has no previous value to compare against).
arithmetic_returns = test_6_data.pct_change().dropna()

In [203]:
# Expected output for test 6.1, indexed by "Date" like the input data.
test_6_1_vals = (
    pd.read_csv("testfiles/data/testout6_1.csv")
    .set_index("Date")
)
check_matrix_correctness(
    input_matrix=arithmetic_returns,
    test_matrix=test_6_1_vals,
    error_epsilon=1e-12,
)

True

## Test 6.2 Calculate Log Returns

In [167]:
# Log return: log(P_t / P_{t-1}); rows with missing values are dropped afterwards.
log_returns = np.log(test_6_data / test_6_data.shift(1)).dropna()

In [204]:
# Expected output for test 6.2, indexed by "Date" like the input data.
test_6_2_vals = (
    pd.read_csv("testfiles/data/testout6_2.csv")
    .set_index("Date")
)
check_matrix_correctness(
    input_matrix=log_returns,
    test_matrix=test_6_2_vals,
    error_epsilon=1e-12,
)

True