In [1]:
import pandas as pd 
import process
import numpy as np 
# Jerome path : r'C:\Users\33640\OneDrive\Documents\GitHub\Portfolio_clustering_project\Data\DataBase.csv'
# Nail path : '/Users/khelifanail/Documents/GitHub/Portfolio_clustering_project/Data/DataBase.csv'

# Load the database and key every row by its ticker.
df = pd.read_csv(
    r'/Users/khelifanail/Documents/GitHub/Portfolio_clustering_project/Data/DataBase.csv'
).set_index('ticker')

# The column labels carry a one-character prefix in front of a YYYYMMDD date:
# drop the prefix and relabel each column as a DD/MM/YYYY string.
df.columns = pd.to_datetime(df.columns.str[1:], format='%Y%m%d').strftime('%d/%m/%Y')

# Replace the NaN entries with 0, then transpose so that every column is the
# vector of returns of one ticker.
df_cleaned = df.fillna(0).transpose()

In [3]:
from sklearn.model_selection import ShuffleSplit

######################### 1. We start by randomizing the auxiliary observation matrix X̃ from Equation (5) along the time axis #########################
def auxilary_matrix(days, beta, df_cleaned):
    """Build the exponentially weighted, time-shuffled auxiliary observation matrix.

    The first ``days`` rows of ``df_cleaned`` are scaled, row by row, by the square
    root of the weights::

        w_t = n * (1 - beta) * beta**(n - 1 - t) / (1 - beta**n),    t = 0, ..., n - 1

    where ``n`` is the number of rows actually selected, so the last selected row
    receives the largest weight. The weighted matrix is then transposed and its
    columns are shuffled with a fixed seed.

    Parameters
    ----------
    days : int
        Number of leading rows of ``df_cleaned`` to keep. When the frame holds fewer
        rows, all of them are used and the weights are computed for that count.
    beta : float
        Decay factor of the weights. ``beta == 1`` is handled as the limit of the
        formula above, i.e. every weight equals 1.
    df_cleaned : pandas.DataFrame
        Numeric frame with one row per observation date and one column per asset.

    Returns
    -------
    pandas.DataFrame
        Weighted observations with the columns of ``df_cleaned`` as rows and the
        selected (shuffled) row labels as columns.
    """
    ## 1. Extract the returns of the assets (columns) over the first `days` rows
    X = df_cleaned.iloc[0:days, :]
    n_obs = X.shape[0]  # may be smaller than `days` when the frame is short

    ## 2. Exponential weights that put more mass on the most recent rows
    if beta == 1:
        # The closed form is 0/0 for beta == 1; its limit is uniform weights.
        weights = np.ones(n_obs)
    else:
        weights = n_obs * (1 - beta) * beta ** np.arange(n_obs)[::-1] / (1 - beta ** n_obs)

    # Scaling each row by sqrt(w_t) equals multiplying by diag(sqrt(w)) on the left,
    # without materialising an n_obs x n_obs matrix.
    X_tilde = pd.DataFrame(
        X.to_numpy(dtype=float) * np.sqrt(weights)[:, np.newaxis],
        index=X.index,
        columns=X.columns,
    )

    ## 3. Transpose (observations become columns) and shuffle along the time axis
    Randomized_X = X_tilde.transpose().sample(frac=1, axis=1, random_state=42)

    return Randomized_X

######################### 2. We then split the (randomized) auxiliary observations into K non-overlapping folds of equal size #########################
def shuffle_split(array, K):
    """Draw ``K`` random train/test partitions of ``array``.

    Each partition comes from scikit-learn's ``ShuffleSplit`` with 20% of the
    elements in the test part and 80% in the training part, using a fixed seed.

    NOTE(review): ``ShuffleSplit`` draws every partition independently, so the test
    parts of different partitions may share elements; they are not the
    non-overlapping folds a K-fold scheme would give.

    Parameters
    ----------
    array : indexable sequence
        Elements to split (positional indexing ``array[i]`` is used).
    K : int
        Number of partitions to draw.

    Returns
    -------
    list of (list, list)
        One ``(train_fold, test_fold)`` pair per partition; be careful with this
        nested structure when unpacking.
    """
    splitter = ShuffleSplit(n_splits=K, test_size=0.2, random_state=42)

    # Map the positional indices produced by the splitter back to the elements.
    return [
        ([array[pos] for pos in train_positions], [array[pos] for pos in test_positions])
        for train_positions, test_positions in splitter.split(array)
    ]

######################### 3. For each K fold configuration, we estimate the sample eigenvectors from the training set #########################
def eigen_sample(data, train_fold):
    """Return the sample eigenvectors of the covariance estimated on a training fold.

    Parameters
    ----------
    data : pandas.DataFrame
        Observation matrix; the fold is selected on the column axis and every row
        is treated as one variable of the covariance.
    train_fold : list
        Column labels of ``data`` that make up the training fold.

    Returns
    -------
    numpy.ndarray
        Real square matrix (one row/column per row of ``data``) whose column
        ``[:, i]`` is the i-th eigenvector, ordered by ascending eigenvalue.
    """
    train_data = data.loc[:, train_fold].to_numpy(dtype=float)

    # np.cov treats each row as a variable and removes its mean itself, so the
    # training data does not need to be centred beforehand.
    cov_matrix_train = np.atleast_2d(np.cov(train_data))

    # A covariance matrix is symmetric: eigh guarantees real eigenvectors and a
    # fixed (ascending-eigenvalue) order, whereas the general solver `eig` can
    # return complex, arbitrarily ordered results on rank-deficient input.
    _, eigenvectors_train = np.linalg.eigh(cov_matrix_train)

    return eigenvectors_train

def intra_fold_loss(data, test_fold, sample_eigenvector_i, beta):
    """Loss of one sample eigenvector on the observations of a test fold.

    Computes ``sum_t (u . x_t)**2 / len(test_fold)`` where ``u`` is
    ``sample_eigenvector_i`` and ``x_t`` runs over the columns of ``data`` listed
    in ``test_fold``.

    Parameters
    ----------
    data : pandas.DataFrame
        Observation matrix in the same layout ``eigen_sample`` reads: the fold is
        selected on the column axis, and ``u`` has one entry per row.
    test_fold : list
        Column labels of ``data`` that make up the test fold.
    sample_eigenvector_i : numpy.ndarray
        1-D eigenvector with one entry per row of ``data``.
    beta : float
        Unused; kept so existing calls keep working. The observations are used as
        given. NOTE(review): this assumes ``data`` is the matrix returned by
        ``auxilary_matrix``, which is already weighted and whose columns are
        shuffled, so weights rebuilt here from the column position would be
        applied a second time and in an order unrelated to time - confirm.

    Returns
    -------
    float
        The fold loss described above.

    Raises
    ------
    ValueError
        If ``test_fold`` is empty.
    """
    ## 1. get the fold cardinality
    fold_cardinality = len(test_fold)
    if fold_cardinality == 0:
        raise ValueError("test_fold must contain at least one observation")

    ## 2. observations of the test fold, one column per observation
    test_observations = data.loc[:, test_fold].to_numpy(dtype=float)

    ## 3. projection of every test observation on the eigenvector, shape (fold_cardinality,)
    projections = np.dot(sample_eigenvector_i, test_observations)

    return np.sum(projections ** 2) / fold_cardinality

def average_loss_i(data, splits, index, beta):
    """Average, over all folds, the test loss of the ``index``-th sample eigenvector.

    For every ``(train_fold, test_fold)`` pair the sample eigenvectors are estimated
    on the training fold with ``eigen_sample`` and the loss of eigenvector number
    ``index`` is evaluated on the test fold with ``intra_fold_loss``.

    Parameters
    ----------
    data : pandas.DataFrame
        Observation matrix passed unchanged to ``eigen_sample`` and ``intra_fold_loss``.
    splits : list of (list, list)
        ``(train_fold, test_fold)`` pairs.
    index : int
        Position of the eigenvector to evaluate.
    beta : float
        Forwarded to ``intra_fold_loss``.

    Returns
    -------
    The sum of the per-fold losses divided by ``len(splits)``.
    """
    total_loss = 0  # accumulates the loss over the folds

    for train_fold, test_fold in splits:

        # On each fold, estimate the sample eigenvectors from the training fold.
        eigenvectors = eigen_sample(data=data, train_fold=train_fold)

        # numpy's eigen-solvers store eigenvectors as COLUMNS of the returned
        # matrix, so eigenvector `index` is [:, index]; row [index] would instead
        # collect the index-th coordinate of every eigenvector.
        sample_eigenvector_i = eigenvectors[:, index]

        # On each fold, evaluate the loss on the matching test fold.
        total_loss = total_loss + intra_fold_loss(
            data=data,
            test_fold=test_fold,
            sample_eigenvector_i=sample_eigenvector_i,
            beta=beta,
        )

    # Average over the number of folds (the length of `splits`).
    return total_loss / len(splits)

def eigenvalue_estimator(data, splits, beta):
    """Cross-validated eigenvalue estimates, one per row of ``data``.

    For every ``(train_fold, test_fold)`` pair the sample eigenvectors are estimated
    once on the training fold, every test observation is projected on all of them,
    and the squared projections are averaged over the test fold. The per-fold
    results are then averaged over the folds::

        xi_i = mean over folds of  sum_t (u_i . x_t)**2 / len(test_fold)

    The eigendecomposition is done once per fold and shared by all indices rather
    than being recomputed for every (index, fold) pair.

    Parameters
    ----------
    data : pandas.DataFrame
        Observation matrix; folds are selected on the column axis and the number of
        estimates equals ``data.shape[0]``.
    splits : list of (list, list)
        ``(train_fold, test_fold)`` pairs of column labels.
    beta : float
        Unused; kept so existing calls keep working. NOTE(review): this assumes
        ``data`` is the already weighted matrix returned by ``auxilary_matrix`` -
        confirm.

    Returns
    -------
    numpy.ndarray
        1-D array ``xi`` of length ``data.shape[0]``.

    Raises
    ------
    ValueError
        If ``splits`` is empty.
    """
    if len(splits) == 0:
        raise ValueError("splits must contain at least one (train_fold, test_fold) pair")

    number_of_stocks = data.shape[0]
    xi = np.zeros(number_of_stocks)

    for train_fold, test_fold in splits:
        # Columns of `eigenvectors` are the sample eigenvectors of the training fold.
        eigenvectors = eigen_sample(data=data, train_fold=train_fold)

        # Row i of `projections` holds u_i . x_t for every test observation x_t.
        test_observations = data.loc[:, test_fold].to_numpy(dtype=float)
        projections = eigenvectors.T @ test_observations

        xi = xi + np.sum(projections ** 2, axis=1) / len(test_fold)

    return xi / len(splits)


In [92]:
# Weighted, shuffled auxiliary matrix built from the first 250 rows of df_cleaned.
X_tilde = auxilary_matrix(days=250, beta=0.99, df_cleaned=df_cleaned)

# The column labels of X_tilde are the observations that get split into folds.
array = X_tilde.columns
K = 10
splits = shuffle_split(array=array, K=K)

# Inspect the first (train, test) pair.
train, test = splits[0]
eigenvectors_train = eigen_sample(data=X_tilde, train_fold=train)

# Eigenvectors are the COLUMNS of the returned matrix, so the first eigenvector is
# [:, 0]; row [0] would be the first coordinate of every eigenvector.
x1 = eigenvectors_train[:, 0]
print(len(splits))

10


In [76]:
# Shape check of the training covariance: np.cov treats every row of its input as
# one variable, so the result is square with one row/column per row of X_tilde.
train_data = X_tilde.loc[:, train]
cov = np.cov(train_data.to_numpy())
cov.shape

(695, 695)