In [2]:
###library###
import pandas as pd
import numpy as np
import statsmodels.api as sm #for fit-t
from scipy.stats import norm #for VaR and ES with normal
from scipy.stats import t #for VaR and ES with T
from scipy.optimize import minimize #for fit-t and VaR T
from scipy.integrate import quad #for ES simulation

In [3]:
###test 1###
def calculate_matrix(df, skipRow=True, method='cov'):
    df = pd.DataFrame(df) if not isinstance(df, pd.DataFrame) else df
    func = np.cov if method == 'cov' else np.corrcoef
    if skipRow:
        df = df.dropna(axis=0, how='any')
    
    if df.isnull().any().any() and not skipRow:
        n = df.shape[1]
        matrix = np.empty((n, n))
        matrix.fill(np.nan)
        for i in range(n):
            for j in range(i + 1):
                valid_rows = df.iloc[:, [i, j]].dropna().index
                if len(valid_rows) > 0:
                    matrix_ij = func(df.iloc[valid_rows, [i, j]], rowvar=False)[0, 1]
                    matrix[i, j] = matrix_ij
                    matrix[j, i] = matrix_ij
    else:
        matrix = func(df.T)
    
    return matrix

# Load the data from CSV files
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test1.csv')

In [4]:
#testout_1.1 compare
expected_cov1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.1.csv')
# Convert expected_cov to a numpy array if it's not already, assuming it's the expected covariance matrix
if isinstance(expected_cov1, pd.DataFrame):
    expected_cov1 = expected_cov1.values
# Calculate covariance matrix skipping rows with missing values
result1 = calculate_matrix(df, skipRow=True, method='cov')
# Compare `res1` with `expected_cov`
comparison_result1 = np.isclose(expected_cov1, result1, atol=1e-5)
print("Comparison result between calculated covariance and expected output1.1:\n", comparison_result1)

Comparison result between calculated covariance and expected output1.1:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [5]:
#testout_1.2 compare
expected_corr1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.2.csv').values
if isinstance(expected_corr1, pd.DataFrame):
    expected_corr1 = expected_corr1.values
result2 = calculate_matrix(df, skipRow=True, method='corr')
comparison_result2 = np.isclose(expected_corr1, result2, atol=1e-5)
print("Comparison result between calculated correlation and expected output1.2:\n", comparison_result2)

Comparison result between calculated correlation and expected output1.2:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [6]:
#testout_1.3 compare
expected_cov2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.3.csv').values
if isinstance(expected_cov2, pd.DataFrame):
    expected_corr2 = expected_cov2.values
result3 = calculate_matrix(df, skipRow=False, method='cov')
comparison_result3 = np.isclose(expected_cov2, result3, atol=1e-5)
print("Comparison result between calculated covariance and expected output1.3:\n", comparison_result3)

Comparison result between calculated covariance and expected output1.3:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [7]:
#testout_1.4 compare
expected_corr2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.4.csv').values
if isinstance(expected_corr2, pd.DataFrame):
    expected_corr2 = expected_corr2.values
result4 = calculate_matrix(df, skipRow=False, method='corr')
comparison_result4 = np.isclose(expected_corr2, result4, atol=1e-5)
print("Comparison result between calculated correlation and expected output1.4:\n", comparison_result4)

Comparison result between calculated correlation and expected output1.4:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [8]:
###test 2###
def expW(m, lam):
    w = np.empty(m)
    for i in range(m):
        w[i] = (1 - lam) * lam ** (m - i - 1)  # Adjusted index to 0-based
    # Normalize weights to sum to 1
    w /= np.sum(w)
    return w

def ewMatrix(x, lam, matrix_type):
    m, n = x.shape
    w = expW(m, lam)
    # Remove the mean from each column
    xm = np.mean(x, axis=0)
    x -= xm
    # Reshape w from (m,) to (m, 1) to allow broadcasting
    w = w.reshape(-1, 1)
    # Calculate exponentially weighted covariance
    cov_matrix = np.dot((w * x).T, x)
    
    if matrix_type == 'cov':
        return cov_matrix
    elif matrix_type == 'corr':
        # Convert covariance to correlation
        std_dev = np.sqrt(np.diag(cov_matrix))
        corr_matrix = cov_matrix / np.outer(std_dev, std_dev)
        return corr_matrix
    else:
        raise ValueError("matrix_type must be either 'cov' or 'corr'")


# Load the data
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test2.csv')
# Ensure the data is in a NumPy array for processing
data_matrix = df.values

In [9]:
#testout_2.1
expected_res1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_2.1.csv')
if isinstance(expected_res1, pd.DataFrame):
    expected_res1 = expected_res1.values
# Calculate the expoentially weighted covariance matrix
result1 = ewMatrix(data_matrix, lam=0.97, matrix_type='cov')
# Ensure the shapes of result1 and expected_res1 match before comparison
comparison_result1 = np.isclose(expected_res1, result1, atol=1e-5)
print("Comparison result between calculated exponentially weighted covariance and expected output:\n", comparison_result1)

Comparison result between calculated exponentially weighted covariance and expected output:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [10]:
#testout_2.2
expected_res2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_2.2.csv')
if isinstance(expected_res2, pd.DataFrame):
    expected_res2 = expected_res2.values
# Calculate the expoentially weighted covariance matrix
result2 = ewMatrix(data_matrix, lam=0.94, matrix_type='corr')
# Ensure the shapes of result2 and expected_res2 match before comparison
comparison_result2 = np.isclose(expected_res2, result2, atol=1e-5)
print("Comparison result between calculated exponentially weighted Correlation and expected output:\n", comparison_result2)

Comparison result between calculated exponentially weighted Correlation and expected output:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [11]:
# testout_2.3
expected_res3 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_2.3.csv')
cov = ewMatrix(data_matrix, 0.97, matrix_type='cov')
sd1 = np.sqrt(np.diag(cov))
cov = ewMatrix(data_matrix, 0.94, matrix_type='cov')
sd = 1 / np.sqrt(np.diag(cov))
result3 = np.diag(sd1) @ np.diag(sd) @ cov @ np.diag(sd) @ np.diag(sd1)
comparison_result3 = np.isclose(expected_res3, result3, atol=1e-5)
print("Comparison result is:\n", comparison_result3)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [12]:
###test 3###
def near_psd(a, epsilon=0.0):
    n = a.shape[0]
    out = a.copy()
    invSD = None  # Initialize invSD

    # Check if the matrix is a covariance matrix and convert it to a correlation matrix if needed
    if not np.allclose(np.diag(out), 1):
        invSD = np.diag(1.0 / np.sqrt(np.diag(out)))
        out = invSD @ out @ invSD

    # Perform SVD, update eigenvalues, and scale
    vals, vecs = np.linalg.eigh(out)
    vals = np.maximum(vals, epsilon)
    T = 1.0 / (vecs ** 2 @ vals[:, np.newaxis])
    T = np.diag(np.sqrt(T).flatten())
    l = np.diag(np.sqrt(vals))
    B = T @ vecs @ l
    out = B @ B.T

    # Add back the variance if needed
    if invSD is not None:
        invSD = np.diag(1.0 / np.diag(invSD))
        out = invSD @ out @ invSD

    return out

def frobenius(input):
    result = 0
    for i in range(len(input)):
        for j in range(len(input)):
            result += input[i][j]**2
    return result

# define a function calculating PSD via Higham's method
def higham_nearestPSD(input):
    weight = np.identity(len(input))
        
    norml = np.inf
    Yk = input.copy()
    Delta_S = np.zeros_like(Yk)
    
    invSD = None
    if np.count_nonzero(np.diag(Yk) == 1.0) != input.shape[0]:
        invSD = np.diag(1 / np.sqrt(np.diag(Yk)))
        Yk = invSD @ Yk @ invSD
    
    Y0 = Yk.copy()

    for i in range(1000):
        Rk = Yk - Delta_S
        Xk = np.sqrt(weight)@ Rk @np.sqrt(weight)
        vals, vecs = np.linalg.eigh(Xk)
        vals = np.where(vals > 0, vals, 0)
        Xk = np.sqrt(weight)@ vecs @ np.diagflat(vals) @ vecs.T @ np.sqrt(weight)
        Delta_S = Xk - Rk
        Yk = Xk.copy()
        np.fill_diagonal(Yk, 1)
        norm = frobenius(Yk-Y0)
        min_val = np.real(np.linalg.eigvals(Yk)).min()
        if abs(norm - norml) < 1e-8 and min_val > -1e-9:
            break
        else:
            norml = norm
    
    if invSD is not None:
        invSD = np.diag(1 / np.diag(invSD))
        Yk = invSD @ Yk @ invSD
    return Yk

In [13]:
#test 3.1
cov_matrix_1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.3.csv').values
cov_matrix_2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_3.1.csv').values
# Calculate the near PSD covariance matrices
near_psd_cov_1 = near_psd(cov_matrix_1, epsilon=0.0)
near_psd_cov_2 = near_psd(cov_matrix_2, epsilon=0.0)
# Compare the results under atol=1e-5
comparison_result_3_1 = np.isclose(near_psd_cov_1, near_psd_cov_2, atol=1e-5)
print("Comparison result is:\n", comparison_result_3_1)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [14]:
#test 3.2
# Load the correlation matrices from the provided files
corr_matrix_1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.4.csv').values
corr_matrix_2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_3.2.csv').values
# Calculate the near PSD correlation matrices
near_psd_corr_1 = near_psd(corr_matrix_1, epsilon=0.0)
near_psd_corr_2 = near_psd(corr_matrix_2, epsilon=0.0)
# Compare the results under atol=1e-5
comparison_result_3_2 = np.isclose(near_psd_corr_1, near_psd_corr_2, atol=1e-5)
print("Comparison result is:\n", comparison_result_3_2)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [15]:
#test 3.3
# Load the correlation matrices from the provided files
cov_matrix_1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.3.csv').values
cov_matrix_2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_3.3.csv').values
# Calculate the near PSD correlation matrices
highamnear_psd_cov1 = higham_nearestPSD(cov_matrix_1)
highamnear_psd_cov2 = higham_nearestPSD(cov_matrix_2)
# Compare the results under atol=1e-5
comparison_result_3_3 = np.isclose(highamnear_psd_cov1, highamnear_psd_cov2, atol=1e-5)
print("Comparison result is:\n", comparison_result_3_3)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [16]:
#test 3.4
# Load the correlation matrices from the provided files
corr_matrix_1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_1.4.csv').values
corr_matrix_2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_3.4.csv').values
# Calculate the near PSD correlation matrices
highamnear_psd_corr1 = higham_nearestPSD(corr_matrix_1)
highamnear_psd_corr2 = higham_nearestPSD(corr_matrix_2)
# Compare the results under atol=1e-5
comparison_result_3_4 = np.isclose(highamnear_psd_corr1, highamnear_psd_corr2, atol=1e-5)
print("Comparison result is:\n", comparison_result_3_4)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [17]:
###test 4###
def chol_psd(a):
    if not isinstance(a, pd.DataFrame):
        a = pd.DataFrame(a)

    m, n = a.shape
    root = np.zeros((m, n))

    for j in range(m):
        s = 0.0
        if j >= 0:
            s = np.dot(root[j, :j], root[j, :j])
        # Diagonal element
        temp = a.iloc[j, j] - s
        if -1e-8 <= temp <= 0:
            temp = 0.0
        root[j, j] = np.sqrt(max(temp, 0))

        if root[j, j] == 0.0:
            root[j, (j+1):n] = 0.0
        else:
            ir = 1.0 / root[j, j]
            for i in range(j + 1, m):
                s = np.dot(root[i, :j], root[j, :j])
                root[i, j] = (a.iloc[i, j] - s) * ir
    return root

In [18]:
#test 4.1
# Load the correlation matrices from the provided files
matrix_1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_3.1.csv').values
matrix_2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_4.1.csv').values
# Calculate the near PSD correlation matrices
chol_psd1 = chol_psd(matrix_1)
comparison_result_4 = np.isclose(chol_psd1, matrix_2, atol=1e-5)
print("Comparison result is:\n", comparison_result_4)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [19]:
###test 5###
def simulateNormal(N, df, mean=None, seed=1234, fixMethod=near_psd):
    # Error Checking
    m,n = df.shape
    if n != m:
        raise ValueError(f"Covariance Matrix is not square ({n},{m})")
    # Initialize the output
    out = np.zeros((N, n))
    # Set mean
    if mean is None:
        mean = np.zeros(n)
    else:
        if len(mean) != n:
            raise ValueError(f"Mean ({len(mean)}) is not the size of cov ({n},{n})")
    # Set the seed to make sure the value is the same each time.
    np.random.seed(seed)
    
    eigenvalues, eigenvectors = np.linalg.eig(df)
    # If the covariance is not PS, try to fix it
    if min(eigenvalues) < 0:
        df = fixMethod (df)
    # Take the root (cholesky factorization)
    l = chol_psd(df)
    # Generate random standard normals
    rand_normals = np.random.normal(0.0, 1.0, size=(N, n))
    # Apply the Cholesky root and plus the mean to the random normals
    out = np.dot(rand_normals, l.T) + mean
    
    return out.T

#Multivariate PCA Simulation
def simulatePCA(N, df, mean=None, seed=1234, pctExp=1):
    # Error Checking
    m, n = df.shape
    if n != m:
        raise ValueError(f"Covariance Matrix is not square ({n},{m})")
    # Initialize the output
    out = np.zeros((N, n))
    # Set mean
    if mean is None:
        mean = np.zeros(n)
    else:
        if len(mean) != n:
            raise ValueError(f"Mean ({len(mean)}) is not the size of cov ({n},{n})")
    eigenvalues, eigenvectors = np.linalg.eig(df)
    # Get the indices that would sort eigenvalues in descending order
    indices = np.argsort(eigenvalues)[::-1]
    # Sort eigenvalues
    eigenvalues = eigenvalues[indices]
    # Sort eigenvectors according to the same order
    eigenvectors = eigenvectors[:, indices]
    tv = np.sum(eigenvalues)
    posv = np.where(eigenvalues >= 1e-8)[0]
    if pctExp < 1:
        nval = 0
        pct = 0.0
        # How many factors needed
        for i in posv:
            pct = pct + eigenvalues[i] / tv
            nval = 1 + nval
            if pct >= pctExp:
                break
     # If nval is less than the number of positive eigenvalues, truncate posv
    if nval < len(posv):
        posv = posv[:nval]
    # Filter eigenvalues based on posv
    eigenvalues = eigenvalues[posv]
    eigenvectors = eigenvectors[:, posv]
    
    B = eigenvectors @ np.diag(np.sqrt(eigenvalues))
    
    np.random.seed(seed)
    rand_normals = np.random.normal(0.0, 1.0, size=(N, len(posv)))
    out = np.dot(rand_normals, B.T) + mean
    
    return out

In [66]:
#test 5.1
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test5_1.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_5.1.csv')
sim = simulateNormal(100000, df)
result = np.cov(sim)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [67]:
#test 5.2
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test5_2.csv')
expected_res_5_2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_5.2.csv')
sim = simulateNormal(100000, df)
result_5_2 = np.cov(sim)
comparison_result_5_2 = np.isclose(expected_res_5_2, result_5_2, atol=1e-3)
print("Comparison result is:\n", comparison_result_5_2)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [68]:
# testout_5.3
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test5_3.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_5.3.csv')
sim = simulateNormal(100000, df, fixMethod=near_psd)
result = np.cov(sim)
comparison_result_5_3 = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result_5_3)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [69]:
# testout_5.4
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test5_3.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_5.4.csv')
sim = simulateNormal(100000, df, fixMethod=higham_nearestPSD)
result = np.cov(sim)
comparison_result_5_4 = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result_5_4)

Comparison result is:
 [[ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]
 [ True  True  True  True  True]]


In [None]:
# testout_5.5
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test5_2.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout_5.5.csv')
sim = simulatePCA(100000, df, pctExp=0.99)
result = np.cov(sim)
comparison_result_5_5 = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result_5_5)

In [3]:
###test 6###
def return_calculate(prices, method="DISCRETE", date_column="Date"):
    if date_column not in prices.columns:
        raise ValueError(f"dateColumn: {date_column} not in DataFrame: {prices.columns.tolist()}")
    
    # Extract the columns that are not the date column
    vars = [col for col in prices.columns if col != date_column]
    prices[vars] = prices[vars].apply(pd.to_numeric, errors='coerce')
    
    p = prices[vars].values
    n, m = p.shape
    
    p2 = np.empty((n-1, m))
    p2 = p[1:] / p[:-1]

    # Calculate the returns
    if method.upper() == "DISCRETE":
        # Discrete returns: (Price_t+1 / Price_t) - 1
        p2 -= 1
    elif method.upper() == "LOG":
        # Log returns: log(Price_t+1 / Price_t)
        p2 = np.log(p2)
    else:
        raise ValueError(f"method: {method} must be in (\"LOG\", \"DISCRETE\")")

    # Create the output DataFrame
    dates = prices[date_column].iloc[1:].reset_index(drop=True)
    out = pd.DataFrame(data=p2, columns=vars)
    out.insert(0, date_column, dates)
    
    return out
#Load data
prices = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test6.csv')
discrete_returns = return_calculate(prices, method="DISCRETE")
log_returns = return_calculate(prices, method="LOG")

In [222]:
#test6_1
expected_discrete_return = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test6_1.csv')
computed_discrete_result = discrete_returns.drop(columns = ['Date']).to_numpy()
expected_discrete_result = expected_discrete_return.drop(columns = ['Date']).to_numpy()
comparison_discrete = np.isclose(expected_discrete_result, computed_discrete_result, atol=1e-5)
all_true_discrete = np.all(comparison_discrete)
print("All values in comparison_discrete:", all_true_discrete)

All values in comparison_discrete: True


In [223]:
#test6_2
expected_log_return = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test6_2.csv')
computed_log_result = log_returns.drop(columns = ['Date']).to_numpy()
expected_log_result = expected_log_return.drop(columns = ['Date']).to_numpy()
comparison_log = np.isclose(expected_log_result, computed_log_result, atol=1e-5)
all_true_log = np.all(comparison_log)
print("All values in comparison_log:", all_true_log)

All values in comparison_log: True


In [4]:
###test 7###
def fit_normal(data):
    # Fit the normal distribution to the data
    mu, std = norm.fit(data)
    return mu, std

def fit_general_t(data):
    # Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu

#Fit t distribution
def fit_regression_t(df):
    Y = df.iloc[:, -1]
    X = df.iloc[:, :-1]
    betas = MLE_t(X, Y)
    X = sm.add_constant(X)
    
    # Get the residuals.
    e = Y - np.dot(X, betas)

    params = t.fit(e)
    out = {"mu": [params[1]], 
           "sigma": [params[2]], 
           "nu": [params[0]]}
    for i in range(len(betas)):
        out["B" + str(i)] = betas[i]
    out = pd.DataFrame(out)
    out.rename(columns={'B0': 'Alpha'}, inplace=True)
    return out

#The objective negative log-likelihood function (need to be minimized).
def MLE_t(X, Y):
    X = sm.add_constant(X)
    def ll_t(params):
        nu, sigma = params[:2]
        beta_MLE_t = params[2:]
        epsilon = Y - np.dot(X, beta_MLE_t)
        # Calculate the log-likelihood
        log_likelihood = np.sum(t.logpdf(epsilon, df=nu, loc=mu, scale=sigma))
        return -log_likelihood
    
    beta = np.zeros(X.shape[1])
    nu, mu, sigma = 1, 0, np.std(Y - np.dot(X, beta))
    params = np.append([nu, sigma], beta)
    bnds = ((0, None), (0, None), (None, None), (None, None), (None, None), (None, None))
    
    # Minimize the log-likelihood to get the beta
    res = minimize(ll_t, params, bounds=bnds, options={'disp': True})
    beta_MLE = res.x[2:]
    return beta_MLE

In [255]:
#testout7_1
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_1.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout7_1.csv')
result = fit_normal(df)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True]]


In [257]:
#testout7_2
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_2.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout7_2.csv')
result = fit_general_t(df)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True  True]]


In [9]:
#testout7_3
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_3.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout7_3.csv')
result = fit_Tregression(df)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

RUNNING THE L-BFGS-B CODE

           * * *

Machine precision = 2.220D-16
 N =            6     M =           10

At X0         0 variables are exactly at the bounds

At iterate    0    f=  1.21632D+00    |proj g|=  2.91239D+01

At iterate    1    f=  2.46768D-01    |proj g|=  3.49674D+01

At iterate    2    f= -8.25857D+00    |proj g|=  6.36315D+01

At iterate    3    f= -9.00656D+00    |proj g|=  2.01391D+01

At iterate    4    f= -9.74769D+00    |proj g|=  1.89823D+01

At iterate    5    f= -1.78442D+01    |proj g|=  2.00398D+02

At iterate    6    f= -8.53086D+01    |proj g|=  2.05029D+02

At iterate    7    f= -1.19412D+02    |proj g|=  1.47451D+02

At iterate    8    f= -1.26128D+02    |proj g|=  1.47882D+02

At iterate    9    f= -1.33571D+02    |proj g|=  1.20313D+02

At iterate   10    f= -1.35629D+02    |proj g|=  1.38175D+02

At iterate   11    f= -1.36856D+02    |proj g|=  7.11314D+01

At iterate   12    f= -1.36866D+02    |proj g|=  4.68715D+01

At iterate   13    f= -1.3

In [5]:
###test 8###
def fit_normal(data):
    #Fit the normal distribution to the data
    mu, std = norm.fit(data)
    return mu, std
def var_normal(data, alpha=0.05):
    #Fit the data with normal distribution.
    mu, std = fit_normal(data)
    VaR = -norm.ppf(alpha, mu, std)
    #Calculate the relative difference from the mean expected.
    VaR_diff = VaR + mu
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})
    
#ES for normal distribution
def es_normal(data, alpha=0.05):
    #Fit the data with normal distribution.
    mu, std = fit_normal(data)

    res = var_normal(data, alpha)
    VaR = res.iloc[0, 0]
    #Define the integrand function: x times the PDF of the distribution
    def integrand(x, mu, std):
        return x * norm.pdf(x, loc=mu, scale=std)
    
    ES, _ = quad(lambda x: integrand(x, mu, std), -np.inf, -VaR)
    ES /= -alpha
    #Calculate the relative difference from the mean expected.
    ES_diff = ES + mu
    return pd.DataFrame({"ES Absolute": [ES], 
                         "ES Diff from Mean": [ES_diff]})


def fit_general_t(data):
    #Fit the t distribution to the data
    nu, mu, sigma = t.fit(data)
    return mu, sigma, nu
def var_t(data, alpha=0.05):
    #Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    VaR = -t.ppf(alpha, nu, mu, sigma)
    #From the mean expected.
    VaR_diff = VaR + mu
    return pd.DataFrame({"VaR Absolute": [VaR], 
                         "VaR Diff from Mean": [VaR_diff]})
    
#VaR for t Distribution simulation 
def var_simulation(data, alpha=0.05, size=10000):
    #Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    #Generate given size random numbers from a t-distribution
    random_numbers = t.rvs(df=nu, loc=mu, scale=sigma, size=size)
    return var_t(random_numbers, alpha)

#ES for t Distribution
def es_t(data, alpha=0.05):
    #Fit the data with normal distribution.
    mu, sigma, nu = fit_general_t(data)
    
    res = var_t(data, alpha)
    VaR = res.iloc[0, 0]
    #Define the integrand function: x times the PDF of the distribution
    def integrand(x, mu, sigma, nu):
        return x * t.pdf(x, df=nu, loc=mu, scale=sigma)

    ES, _ = quad(lambda x: integrand(x, mu, sigma, nu), -np.inf, -VaR)
    ES /= -alpha
    ES_diff = ES + mu
    return pd.DataFrame({"ES Absolute": [ES], 
                         "ES Diff from Mean": [ES_diff]})

#ES for simulation
def es_simulation(data, alpha=0.05, size=10000):
    #Fit the data with t distribution.
    mu, sigma, nu = fit_general_t(data)
    random_numbers = t.rvs(df=nu, loc=mu, scale=sigma, size=size)
    return es_t(random_numbers, alpha)

In [29]:
#testout8_1
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_1.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout8_1.csv')
result = var_normal(df)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True]]


In [30]:
#testout8_2
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_2.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout8_2.csv')
result = var_t(df)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True]]


In [35]:
#testout8_3
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_2.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout8_3.csv')
result = var_simulation(df, 0.05, 10000)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)
#the result is not always 'True', depends on the simulation

Comparison result is:
 [[ True  True]]


In [36]:
#testout8_4
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_1.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout8_4.csv')
# Calculate the ES at 5% quantile.
result = es_normal(df, 0.05)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True]]


In [37]:
#testout8_5
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_2.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout8_5.csv')
# Calculate the ES at 5% quantile.
result = es_t(df, 0.05)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)

Comparison result is:
 [[ True  True]]


In [39]:
#testout8_6
df = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test7_2.csv')
expected_res = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout8_6.csv')
result = es_simulation(df, 0.05, 10000)
comparison_result = np.isclose(expected_res, result, atol=1e-3)
print("Comparison result is:\n", comparison_result)
#the result is not always 'True', depends on the simulation

Comparison result is:
 [[ True  True]]


In [13]:
from scipy.stats import norm, t

def return_calculate(prices, method='ARS', dateColumn='Date'):
    # Exclude the date column from the calculations
    tickers = [col for col in prices.columns if col != dateColumn]
    prices = prices[tickers] # The dataframe is now with no date column
    # Classical Brownian Motion
    if method == 'CBM':
        prices = prices.diff().dropna()
    # Arithmetic Return System
    elif method == 'ARS':
        prices = (prices - prices.shift(1)) / prices.shift(1)
        prices = prices.dropna()
    # Geometric Brownian Motion
    elif method == 'GBM':
        prices = np.log(df).diff().dropna()
    else:
        raise ValueError(f"method: {method} must be in (\"CBM\",\"ARS\",\"GBM\")")
    
    return prices

def simulatePCA(N, prices, mean=None, seed=1234, pctExp=1):
    # Error Checking
    m, n = prices.shape
    if n != m:
        raise ValueError(f"Covariance Matrix is not square ({n},{m})")
    # Initialize output
    out = np.zeros((N, n))
    # Set mean
    if mean is None:
        mean = np.zeros(n)
    else:
        if len(mean) != n:
            raise ValueError(f"Mean ({len(mean)}) is not the size of cov ({n},{n})")
    
    eigenvalues, eigenvectors = np.linalg.eig(prices)
    
    # Get the indices that would sort eigenvalues in descending order
    indices = np.argsort(eigenvalues)[::-1]
    # Sort eigenvalues
    eigenvalues = eigenvalues[indices]
    # Sort eigenvectors according to the same order
    eigenvectors = eigenvectors[:, indices]
    
    tv = np.sum(eigenvalues)
    posv = np.where(eigenvalues >= 1e-8)[0]
    if pctExp <= 1:
        nval = 0
        pct = 0.0
        # How many factors needed
        for i in posv:
            pct += eigenvalues[i] / tv
            nval += 1
            if pct >= pctExp:
                break
    
     # If nval is less than the number of positive eigenvalues, truncate posv
    if nval < len(posv):
        posv = posv[:nval]
        
    # Filter eigenvalues based on posv
    eigenvalues = eigenvalues[posv]
    eigenvectors = eigenvectors[:, posv]
    
    B = eigenvectors @ np.diag(np.sqrt(eigenvalues))
    
    np.random.seed(seed)
    rand_normals = np.random.normal(0.0, 1.0, size=(N, len(posv)))
    out = np.dot(rand_normals, B.T) + mean
    
    return out.T

def simulate_copula(portfolio, returns):
    portfolio['CurrentValue'] = portfolio['Holding'] * portfolio['Starting Price']
    models = {}
    uniform = pd.DataFrame()
    standard_normal = pd.DataFrame()
    
    for stock in portfolio["Stock"]:
        # If the distribution for the model is normal, fit the data with normal distribution
        if portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'Normal':
            models[stock] = norm.fit(returns[stock])
            mu, sigma = norm.fit(returns[stock])
            # Transform the observation vector into a uniform vector using CDF
            uniform[stock] = norm.cdf(returns[stock], loc=mu, scale=sigma)
            # Transform the uniform vector into a Standard Normal vector usig the normal quantile function
            standard_normal[stock] = norm.ppf(uniform[stock])
            
        # If the distribution for the model is t, fit the data with normal t
        elif portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'T':
            models[stock] = t.fit(returns[stock])
            nu, mu, sigma = t.fit(returns[stock])
            # Transform the observation vector into a uniform vector using CDF
            uniform[stock] = t.cdf(returns[stock], df=nu, loc=mu, scale=sigma)
            # Transform the uniform vector into a Standard Normal vector usig the normal quantile function
            standard_normal[stock] = norm.ppf(uniform[stock])
        
    # Calculate Spearman's correlation matrix
    spearman_corr_matrix = standard_normal.corr(method='spearman')
    
    simulate_time = 10000
    
    # Use the PCA to simulate the multivariate normal
    simulations = simulatePCA(simulate_time, spearman_corr_matrix)
    simulations = pd.DataFrame(simulations.T, columns=[stock for stock in portfolio["Stock"]])
    # Transform the simulations into uniform variables using standard normal CDF
    uni = norm.cdf(simulations)
    uni = pd.DataFrame(uni, columns=[stock for stock in portfolio["Stock"]])
    simulatedReturns = pd.DataFrame()
    # Transform the uniform variables into the desired data using quantile
    for stock in portfolio["Stock"]:
        # If the distribution for the model is normal/t, use the quantile of the normal/t distribution
        if portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'Normal':
            mu, sigma = models[stock]
            simulatedReturns[stock] = norm.ppf(uni[stock], loc=mu, scale=sigma)
        elif portfolio.loc[portfolio['Stock'] == stock, 'Distribution'].iloc[0] == 'T':
            nu, mu, sigma = models[stock]
            simulatedReturns[stock] = t.ppf(uni[stock], df=nu, loc=mu, scale=sigma)
    
    simulatedValue = pd.DataFrame()
    pnl = pd.DataFrame()
    # Calculate the daily prices for each stock
    for stock in portfolio["Stock"]:
        currentValue = portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0]
        simulatedValue[stock] = currentValue * (1 + simulatedReturns[stock])
        pnl[stock] = simulatedValue[stock] - currentValue
        
    risk = pd.DataFrame(columns = ["Stock", "VaR95", "ES95", "VaR95_Pct", "ES95_Pct"])
    w = pd.DataFrame()

    for stock in pnl.columns:
        i = risk.shape[0]
        risk.loc[i, "Stock"] = stock
        risk.loc[i, "VaR95"] = -np.percentile(pnl[stock], 5)
        risk.loc[i, "VaR95_Pct"] = risk.loc[i, "VaR95"] / portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0]
        risk.loc[i, "ES95"] = -pnl[stock][pnl[stock] <= -risk.loc[i, "VaR95"]].mean()
        risk.loc[i, "ES95_Pct"] = risk.loc[i, "ES95"] / portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0]
        
        # Determine the weights for the two stock
        w.at['Weight', stock] = portfolio.loc[portfolio['Stock'] == stock, 'CurrentValue'].iloc[0] / portfolio['CurrentValue'].sum()
        
    # Calculate the total pnl
    pnl['Total'] = 0
    for stock in portfolio["Stock"]:
        pnl['Total'] += pnl[stock]
    
    i = risk.shape[0]
    risk.loc[i, "Stock"] = 'Total'
    risk.loc[i, "VaR95"] = -np.percentile(pnl['Total'], 5)
    risk.loc[i, "VaR95_Pct"] = risk.loc[i, "VaR95"] / portfolio['CurrentValue'].sum()
    risk.loc[i, "ES95"] = -pnl['Total'][pnl['Total'] <= -risk.loc[i, "VaR95"]].mean()
    risk.loc[i, "ES95_Pct"] = risk.loc[i, "ES95"] / portfolio['CurrentValue'].sum()

    return risk

In [15]:
# testout_9.1
df1 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test9_1_portfolio.csv')
df2 = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/test9_1_returns.csv')
expected_result = pd.read_csv('/Users/queenieliu/FinTech545_Spring2024/testfiles/data/testout9_1.csv')
result = simulate_copula(df1, df2)
print("The expecetd outcome is:\n", f'{expected_result}\n')
print("The calculated result is:\n", result)

The expecetd outcome is:
    Stock       VaR95        ES95  VaR95_Pct  ES95_Pct
0      A   94.460376  118.289371   0.047230  0.059145
1      B  107.880427  151.218174   0.035960  0.050406
2  Total  152.565684  199.704532   0.030513  0.039941

The calculated result is:
    Stock       VaR95        ES95 VaR95_Pct  ES95_Pct
0      A   93.131154  115.757315  0.046566  0.057879
1      B  108.605932  153.976595  0.036202  0.051326
2  Total   152.25331  202.826173  0.030451  0.040565
