# Estimate covariance matrix of financial time series (FTS)

In [1]:
import re
import sys
import warnings
import numpy as np # type: ignore
import pandas as pd # type: ignore

from scipy.linalg import eigh # type: ignore

sys.path.append('../modules')
import misc_functions as mf # type: ignore
import estimate_market_factors as emf # type: ignore
import get_financial_time_series as get_fts # type: ignore

# Silence every warning for the rest of the session.
# NOTE(review): this also hides deprecation and numerical warnings — confirm that is intended.
warnings.filterwarnings("ignore")
# Disable pandas' chained-assignment check (no warning or error on chained assignment).
pd.options.mode.chained_assignment = None
# Do not truncate the column list when a DataFrame is displayed.
pd.set_option('display.max_columns', None)

## Global variables

In [2]:
# Input folders (raw data, processed data, data dictionaries), relative to the notebook
input_path_raw, input_path_processed, input_path_data_dictionary = (
    "../input_files/raw_data",
    "../input_files/processed_data",
    "../input_files/data_dictionary",
)

# Folder that receives the generated results
output_path = "../output_files"

# Generation date (YYYY-MM-DD) used to build the names of the processed files
input_generation_date = "2024-06-24"

## Load data and filter duplicates by symbol and date

In [3]:
# Stock indexes mentioned in the original notes:
#   - Standard and Poor's 500 - S&P 500 (United States)
#   - Mexico Price and Quote Index (Mexico)
# The file read here is the one tagged "mxx" for the configured generation date.
# Only the first row of every (date, symbol) pair is kept and the index is renumbered.
generation_tag = input_generation_date.replace("-", "")
stock_index_file = "{}/df_stock_index_mxx_{}.pkl".format(input_path_processed, generation_tag)

df_stock_index = pd.read_pickle(stock_index_file).drop_duplicates(
    subset=["date", "symbol"],
    keep="first",
    ignore_index=True,
)

## Construct covariance matrix

In [5]:
# Estimate the covariance matrix from the "z_score_log_return" column of the stock-index data.
# NOTE(review): the estimator is defined in get_financial_time_series; presumably it returns a
# square, symmetric frame (the result is handed to scipy's eigh further down) — confirm there.
df_cov = get_fts.estimate_covariance_stock_index(df = df_stock_index, column_ = "z_score_log_return")

## Apply Bouchaud clipping filter

In [9]:
# Eigenvalues of the unfiltered covariance matrix, listed with how often each value occurs
raw_eigenvalues = eigh(df_cov)[0]
pd.DataFrame({"eig": raw_eigenvalues}).value_counts(dropna=False, sort=False).reset_index()

Unnamed: 0,eig,count
0,0.41663,1
1,0.449658,1
2,0.516379,1
3,0.564964,1
4,0.593939,1
5,0.618421,1
6,0.631009,1
7,0.663864,1
8,0.685119,1
9,0.709068,1


In [7]:
# Parameters used by the clipping filter call and the scratch tests in the following cells.
p_i = df_cov.shape[0] # Number of time series (shares in stock index)
q = 1/2 # Ratio p_i / n_i, as implied by the n_i formula below
m = 114 # NOTE(review): not referenced in the visible cells — meaning to be confirmed
step = 8 # NOTE(review): not referenced in the visible cells — meaning to be confirmed
n_i = int((1/q)*p_i) # Length of time series implied by q (n_i = p_i / q)
delta = 20 # NOTE(review): not referenced in the visible cells — meaning to be confirmed
k1_k0 = 8 # NOTE(review): presumably k1 - k0 for the Onatski test (its table has columns 1..8) — confirm

In [10]:
# Eigenvalues after clipping: take the second item returned by
# emf.clipping_covariance_matrix, diagonalise it and list each eigenvalue with its count.
clipped_covariance = emf.clipping_covariance_matrix(covariance_matrix=df_cov, n=n_i)[1]
clipped_eigenvalues = eigh(clipped_covariance)[0]
pd.DataFrame({"eig": clipped_eigenvalues}).value_counts(dropna=False, sort=False).reset_index()

Unnamed: 0,eig,count
0,-3.697385e-18,1
1,0.02695225,1
2,0.02695225,1
3,0.02695225,1
4,0.02695225,1
5,0.02695225,1
6,0.02695225,1
7,0.02695225,1
8,0.02695225,1
9,0.02695225,1


In [8]:
# Read the Tracy-Widom table from the data dictionary folder
tracy_widom_file = "{}/tracy_widom.csv".format(input_path_data_dictionary)
df_tracy_widom = pd.read_csv(tracy_widom_file, low_memory=False)

# Quick manual checks of three emf helpers with hand-picked inputs;
# each result is printed right after it is computed.
tw_probability = emf.estimate_tracy_widom_probability(df_tracy_widom=df_tracy_widom, z_score=5.01)
print(tw_probability)

wishart_result = emf.estimate_wishart_order_2(p=10, n=10, df_tracy_widom=df_tracy_widom, lambda_1=3)
print(wishart_result)

sample_eigenvalues = np.array([179, 190, 0.26, 3.07, 2.6])
market_factors = emf.get_market_factors(df_tracy_widom, eigen_values=sample_eigenvalues, n=10, alpha=0.01)
print(market_factors)

0.0
0.999838414979975
3


In [78]:
# Read the Onatski table (one row per "level", one column per value 1..8) and display it
onatski_file = "{}/onatski.csv".format(input_path_data_dictionary)
df_onatski = pd.read_csv(onatski_file, low_memory=False)
df_onatski

Unnamed: 0,level,1,2,3,4,5,6,7,8
0,15,2.75,3.62,4.15,4.54,4.89,5.2,5.45,5.7
1,10,3.33,4.31,4.91,5.4,5.77,6.13,6.42,6.66
2,9,3.5,4.49,5.13,5.62,6.03,6.39,6.67,6.92
3,8,3.69,4.72,5.37,5.91,6.31,6.68,6.95,7.25
4,7,3.92,4.99,5.66,6.24,6.62,7.0,7.32,7.59
5,6,4.2,5.31,6.03,6.57,7.0,7.41,7.74,8.04
6,5,4.52,5.73,6.46,7.01,7.5,7.95,8.29,8.59
7,4,5.02,6.26,6.97,7.63,8.16,8.61,9.06,9.36
8,3,5.62,6.91,7.79,8.48,9.06,9.64,10.11,10.44
9,2,6.55,8.15,9.06,9.93,10.47,11.27,11.75,12.13


In [46]:
# Scratch checks on the Onatski table.
# NOTE(review): the frame built on the next line is neither assigned nor displayed.
pd.DataFrame([2,3], columns=[str(10)])
# Element-wise comparison of nine numbers with the complete row at index label 1.
# That row still holds its "level" entry, so the first comparison is against the level
# value rather than a tabulated critical value.
[2,3,4,5,6,7,8,9,10] < df_onatski.loc[1].astype(float).values

array([ True,  True,  True, False, False, False, False, False, False])

In [83]:
# Critical values tabulated for level 1, as a float array (the "level" column is dropped first)
is_level_1 = df_onatski["level"] == 1
df_onatski.loc[is_level_1].drop(columns=["level"]).astype(float).values[0]

array([ 8.74, 10.52, 11.67, 12.56, 13.42, 14.26, 14.88, 15.25])

In [88]:
def get_significal_test_onatski(df_onatski, r_statistics, level=1, verbose=True):
    """Count the leading components whose statistic exceeds its critical value.

    The statistics are first replaced by their running maximum, so entry ``i``
    holds the largest statistic seen among the first ``i + 1`` components. That
    vector is compared element-wise with the critical values stored in the row
    of ``df_onatski`` whose ``"level"`` equals ``level``.

    Args:
        df_onatski: Table with a ``"level"`` column; every other column holds
            one critical value per component.
        r_statistics: One-dimensional sequence of statistics, one per
            component. It must be broadcastable against the critical values.
            A NaN entry propagates through the running maximum.
        level: Value looked up in the ``"level"`` column.
        verbose: When true (the default), print the running maxima and the
            critical values that are being compared.

    Returns:
        The zero-based position of the first component whose running maximum
        does not exceed its critical value, or the number of compared
        components when every one of them exceeds it.

    Raises:
        IndexError: If ``level`` is not present in ``df_onatski``.
    """
    # Running maximum, kept as float so the printed vector looks the same for
    # integer and float inputs.
    r_statistics_new = np.maximum.accumulate(np.asarray(r_statistics, dtype=float))

    level_rows = df_onatski[df_onatski["level"] == level]
    if len(level_rows) == 0:
        # IndexError is what the bare ``.values[0]`` lookup raises for a missing
        # level; the type is kept and only the message is made explicit.
        raise IndexError(
            "level {} not found in df_onatski; available levels: {}".format(
                level, sorted(df_onatski["level"].tolist())
            )
        )
    z_scores = level_rows.drop(columns=["level"]).astype(float).values[0]
    logical = r_statistics_new > z_scores

    if verbose:
        print(r_statistics_new)
        print(z_scores)

    if logical.all():
        # Every component passed: report how many were compared instead of
        # assuming the table always has eight columns.
        return int(logical.size)

    # Position of the first failed comparison.
    return int(np.argmax(~logical))

# Example call with hand-picked statistics at level 1; the cell displays the returned count.
get_significal_test_onatski(df_onatski = df_onatski, r_statistics = np.array([12, 11, 14, 5, 6, 7, 8, 9]), level = 1)

[12. 12. 14. 14. 14. 14. 14. 14.]
[ 8.74 10.52 11.67 12.56 13.42 14.26 14.88 15.25]


5

## Save data to the input files to avoid reprocessing

In [14]:
#df_market.to_pickle("{}/df_sp500_{}.pkl".format(input_path_processed, re.sub("-", "", input_generation_date)))