In [None]:
# required packages
import numpy as np
import math
import pandas as pd
import numpy.matlib as mt
from sklearn.cluster import KMeans
from scipy.linalg import eigh
from scipy.stats import gaussian_kde
from scipy.linalg import eigh

In [None]:
def blockbuster(Y, N_clust, k = None):

    # compute the sample covariance matrix 
    S = np.cov(Y, rowvar=False)

    # compute the largest eigenvalues
    eigvals, eigvecs = eigh(S, subset_by_index=[S.shape[0] - N_clust, S.shape[0] - 1])

    # apply Eucledian norm
    X = eigvecs / np.linalg.norm(eigvecs, axis=1, keepdims=True)

    # apply Kmeans algorithm for N = N_clust
    kmeans = KMeans(n_clusters=N_clust, random_state=0).fit(X)
    
    # return the partitions
    return kmeans.labels_

In [None]:
def adaptive_blockbuster(R, bandwidth="silverman", kernel='gaussian', k = None):

    # compute the sample covariance matrix
    S = np.cov(R, rowvar=False)
    
    # spectral decomposition of the sample covariance matrix
    eigvals, _ = eigh(S)
    
    # apply eucledian norm of sorted eigenvalues
    Ve = np.sort(eigvals)[::-1]
    Y = Ve / np.linalg.norm(Ve)
    
    # apply KDE
    kde = gaussian_kde(Y, bw_method=bandwidth)
    y_grid = np.linspace(min(Y), max(Y), 1000)
    kde_values = kde(y_grid)
    
    # find number of local minima
    local_minima = np.diff(np.sign(np.diff(kde_values))).nonzero()[0] + 1
    M = len(local_minima)
    
    # run blockbuster algorithm with N_clust = K = M + 1
    K = M + 1
    return blockbuster(R, K)

In [None]:
def GeneralisedCVC(Y, K_clust=None, N_clust=None, bandwidth='silverman', kernel='gaussian', k=None):
"""
Contains code of https://github.com/pald22/covShrinkage:
Ledoit, O. and Wolf, M. (2004a). 
Honey, I shrunk the sample covariance matrix. Journal of Portfolio Management, 30(4):110–119.
"""
    # de-mean returns if required
    N, p = Y.shape        
    if k is None or math.isnan(k): 
        mean = Y.mean(axis=0)
        Y = Y.sub(mean, axis=1)                               
        k = 1
    n = N - k  
    
    # choose how to determine the clusters
    if K_clust is not None:
        K_clust = np.array(K_clust)  # predetermined clusters
    else:
        if N_clust is None:
            K_clust = adaptive_blockbuster(Y, bandwidth=bandwidth, kernel=kernel)  # adaptive Blockbuster
        else:
            K_clust = blockbuster(Y, N_clust)  # Blockbuster
    
    # compute sample covariance marix
    sample = pd.DataFrame(np.matmul(Y.T.to_numpy(), Y.to_numpy())) / n 
    
    # create target matrix
    F_GCVC = np.zeros((p, p))
    
    # cluster information
    unique_clusters = np.unique(K_clust)
    K = len(unique_clusters)
    
    # diagonal clusters
    for l in unique_clusters:
        cluster_l_indices = np.where(K_clust == l)[0]
        Nl = len(cluster_l_indices)
        
        cluster_l_sample = sample.iloc[cluster_l_indices, cluster_l_indices]
        meanvar_l = np.mean(np.diag(cluster_l_sample))
        meancov_l = (np.sum(cluster_l_sample.to_numpy()) - np.sum(np.diag(cluster_l_sample))) / (Nl * (Nl - 1))
        
        F_GCVC[np.ix_(cluster_l_indices, cluster_l_indices)] = meanvar_l * np.eye(Nl) + meancov_l * (1 - np.eye(Nl))
        
        # off-diagonal clusters
        for m in unique_clusters:
            if l != m:
                cluster_m_indices = np.where(K_clust == m)[0]
                Nm = len(cluster_m_indices)
                
                cov_lm = sample.iloc[cluster_l_indices, cluster_m_indices]
                nu_lm = np.mean(cov_lm.to_numpy())
                F_GCVC[np.ix_(cluster_l_indices, cluster_m_indices)] = nu_lm * np.ones((Nl, Nm))
                
    # estimate the parameter pi
    Y2 = pd.DataFrame(np.multiply(Y.to_numpy(),Y.to_numpy()))
    sample2= pd.DataFrame(np.matmul(Y2.T.to_numpy(),Y2.to_numpy()))/n    
    piMat=pd.DataFrame(sample2.to_numpy()-np.multiply(sample.to_numpy(),sample.to_numpy()))
    pihat = sum(piMat.sum())   
    
    # estimate the parameter gamma 
    gammahat = np.linalg.norm(sample.to_numpy()-F_GCVC,ord = 'fro')**2
    
    # estimta rho
    rho_diag = (sample2.sum().sum()-np.trace(sample.to_numpy())**2)/p;
    
    # off-diagonal part of the parameter that we call rho 
    sum1=Y.sum(axis=1)
    sum2=Y2.sum(axis=1)
    temp = (np.multiply(sum1.to_numpy(),sum1.to_numpy())-sum2)
    rho_off1 = np.sum(np.multiply(temp,temp))/(p*n)
    rho_off2 = (sample.sum().sum()-np.trace(sample.to_numpy()))**2/p
    rho_off = (rho_off1-rho_off2)/(p-1)
    
    # compute shrinkage intensity
    rhohat = rho_diag + rho_off
    
    # compute shrinkage intensity
    kappahat=(pihat-rhohat)/gammahat
    shrinkage=max(0,min(1,kappahat/n))
    
    # compute shrinkage estimator
    sigmahat=shrinkage*F_GCVC+(1-shrinkage)*sample
    
    # return the estimated covariance matrix, the shrinkage intensity, and the amount of clusters
    return sigmahat, shrinkage, K