# High-dimensional HSIC

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gamma, ortho_group
import pickle
from sklearn.metrics import pairwise_distances
from sklearn.metrics import pairwise_kernels

Generating non-stationary data according to [Pomann et al. 2016](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4812165/)

In [None]:
# generating distributions over time

# normal distribution
def gen_data(sample_size, sample_points, sd=np.sqrt(0.25), random_state=None):
    """    
    sample_size : number of function samples
    sample_points : observation points
    delta : the coefficient of X^3
    sd : the standard deviation of the observation noise
    """
    rng = np.random.RandomState(random_state)
    n_points = len(sample_points)
    X = rng.normal(0, np.sqrt(10), (sample_size,1)) * np.sqrt(2) * np.sin(2*np.pi*sample_points) + rng.normal(0, np.sqrt(5), (sample_size,1)) * np.sqrt(2) * np.cos(2*np.pi*sample_points)
    X += sample_points    # adding mean function
    X += rng.normal(0, sd, (sample_size,n_points))    # adding noise epsilon
    return X

# uniform
def gen_data_uniform(sample_size, sample_points, sd=np.sqrt(0.25), random_state=None):
    """    
    sample_size : number of function samples
    sample_points : observation points
    delta : the coefficient of X^3
    sd : the standard deviation of the observation noise
    """
    rng = np.random.RandomState(random_state)
    n_points = len(sample_points)
    X = rng.uniform(-10, 10, (sample_size,1)) * np.sqrt(2) * np.sin(2*np.pi*sample_points) + rng.uniform(-5, 5, (sample_size,1)) * np.sqrt(2) * np.cos(2*np.pi*sample_points)
    X += sample_points    # adding mean function
    X += rng.normal(0, sd, (sample_size,n_points))    # adding noise epsilon
    return X

# exponential
def gen_data_exp(sample_size, sample_points, sd=np.sqrt(0.25), random_state=None):
    """    
    sample_size : number of function samples
    sample_points : observation points
    delta : the coefficient of X^3
    sd : the standard deviation of the observation noise
    """
    rng = np.random.RandomState(random_state)
    n_points = len(sample_points)
    X = rng.exponential(1.5, (sample_size,1)) * np.sqrt(2) * np.sin(2*np.pi*sample_points) + rng.exponential(3, (sample_size,1)) * np.sqrt(2) * np.cos(2*np.pi*sample_points)
    X += sample_points    # adding mean function
    X += rng.normal(0, sd, (sample_size,n_points))    # adding noise epsilon
    return X

# student t
def gen_data_t(sample_size, sample_points, sd=np.sqrt(0.25), random_state=None):
    """    
    sample_size : number of function samples
    sample_points : observation points
    delta : the coefficient of X^3
    sd : the standard deviation of the observation noise
    """
    rng = np.random.RandomState(random_state)
    n_points = len(sample_points)
    X = rng.standard_t(3, (sample_size,1)) * np.sqrt(2) * np.sin(2*np.pi*sample_points) + rng.standard_t(5, (sample_size,1)) * np.sqrt(2) * np.cos(2*np.pi*sample_points)
    X += sample_points    # adding mean function
    X += rng.normal(0, sd, (sample_size,n_points))    # adding noise epsilon
    return X

# Generating high-dimensional distributions

### Generating process for different dependencies according to 
- [Zhang et al. 2018](https://link.springer.com/article/10.1007/s11222-016-9721-7) for linear dependence
- [Pomann et al. 2016](https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4812165/) with shared common coefficients of basis functions
- [Gretton et al. 2008](https://papers.nips.cc/paper/3201-a-kernel-statistical-test-of-independence.pdf) for dependence through rotations

In [None]:
# data-generating functions
def linear(sample_size, dim=10):
    sample_points = np.linspace(0, 1, dim)
    X_lin = gen_data(sample_size, sample_points, random_state=None)
    Z_lin = np.transpose([np.random.normal(0, 1, sample_size)])
    Y_lin = X_lin[:,[0]] + Z_lin
    
    return X_lin, Y_lin


def shared_coeff(sample_size, dim=10, sd=np.sqrt(0.25), random_state=None):
    sample_points = np.linspace(0, 1, dim)
    rng = np.random.RandomState(random_state)
    n_points = len(sample_points)
    X_basis_1 = rng.normal(0, np.sqrt(10), (sample_size,1)) * np.sqrt(2) * np.sin(2*np.pi*sample_points)
    X_basis_2 = rng.normal(0, np.sqrt(5), (sample_size,1)) * np.sqrt(2) * np.cos(2*np.pi*sample_points)
    Y_basis_1 = rng.normal(0, np.sqrt(10), (sample_size,1)) * np.sqrt(2) * np.sin(2*np.pi*sample_points)
    #Y_basis_1 = X_basis_1     # common first basis function coefficients 
    #Y_basis_2 = rng.normal(0, np.sqrt(5), (sample_size,1)) * np.sqrt(2) * np.cos(2*np.pi*sample_points)
    Y_basis_2 = X_basis_2     # common second basis function coefficients 
    
    X = sample_points + X_basis_1 + X_basis_2 + np.random.normal(0, sd, (sample_size,n_points))
    Y = sample_points + Y_basis_1 + Y_basis_2 + np.random.normal(0, sd, (sample_size,n_points))
    
    return X, Y

def rotation(sample_size, dim=10, angle=0, dist='uniform', sd=np.sqrt(0.25), random_state=None):
    sample_points = np.linspace(0, 1, dim)
    rng = np.random.RandomState(random_state)
    n_points = len(sample_points)
    
    # step 1: define two independent distributions
    if dist=='uniform':
        X_mix = gen_data_uniform(sample_size, sample_points)
        Y_mix = gen_data_uniform(sample_size, sample_points)
    elif dist=='exp':
        X_mix = gen_data_exp(sample_size, sample_points)
        Y_mix = gen_data_exp(sample_size, sample_points)
    elif dist=='studentt':
        X_mix = gen_data_t(sample_size, sample_points)
        Y_mix = gen_data_t(sample_size, sample_points)
    
    # step 2: rotate
    theta = np.radians(angle)    # rotation angle
    X_rot = X_mix * np.cos(theta) - Y_mix * np.sin(theta)
    Y_rot = X_mix * np.sin(theta) + Y_mix * np.cos(theta)
    
    # step 3: add noise N(0, I_d)
    X_mean = np.zeros(X_rot.shape[1])
    X_cov = np.eye(X_rot.shape[1])
    X_rot += np.random.multivariate_normal(X_mean, X_cov, sample_size)

    Y_mean = np.zeros(Y_rot.shape[1])
    Y_cov = np.eye(Y_rot.shape[1])
    Y_rot += np.random.multivariate_normal(Y_mean, Y_cov, sample_size)
    
    # step 4: multiply by orthogonal matrix
    if X_rot.shape[1] == 1:
        X = X_rot
    else:
        X = X_rot @ ortho_group.rvs(X_rot.shape[1])

    if Y_rot.shape[1] == 1:
        Y = Y_rot
    else:
        Y = Y_rot @ ortho_group.rvs(Y_rot.shape[1])
        
    return X, Y

In [None]:
# generating distributions over time for dependent and independent variables
def gen_dependence(sample_size, dim, test='independence', angle=0, dist='uniform', random_state=None):
    
    # X and Y indepdendent
    if test=='independence':
        sample_points = np.linspace(0, 1, dim)
        t1d = gen_data(sample_size, sample_points, random_state=random_state)
        t2d = gen_data(sample_size, sample_points, random_state=random_state)  
        
    # Y = X1 + N(0,1)
    elif test=='dependence_linear':
        t1d, t2d = linear(sample_size, dim)
        
    # Pomann with shared basis function coefficients
    elif test=='dependence_coeff':
        t1d, t2d = shared_coeff(sample_size, dim)  
   
    elif test=='dependence_rotation':
        t1d, t2d = rotation(sample_size, dim, angle, dist)
    
    return t1d, t2d

### Median heuristic

In [None]:
# median heuristic for kernel width
def width(Z):
    dist_mat = pairwise_distances(Z, metric='euclidean')
    width_Z = np.median(dist_mat[dist_mat > 0])
    
    return width_Z

------------------------------

# HSIC

$\mathcal{H}_0: P_{XY} = P_X P_Y$ is a complex distribution and must be approximated. Here, we approximate it first by randomly permuting the order of $Y$ whilst the order of $X$ is kept fixed, and second by a Gamma distribution with parameters $\alpha$ (`al`) and $\beta$ (`bet`).

## HSIC with permutations

In [None]:
def HSIC_permutations(X, Y, alpha, width_X, width_Y, shuffle):    # set widths to -1 for median heuristics
    
    m = X.shape[0]
    
    # median heuristics for kernel width
    if width_X == -1:
        width_X = width(X)      
    if width_Y == -1:
        width_Y = width(Y)
    
    # compute Gram matrices
    K = pairwise_kernels(X, X, metric='rbf', gamma=0.5/(width_X**2))
    L = pairwise_kernels(Y, Y, metric='rbf', gamma=0.5/(width_Y**2))
    #KL = np.dot(K, L)
    
    # biased test statistic
    # centering matrix...
    H = np.eye(m) - (1/m) * (np.ones((m, m)))
    
    # ...to center K
    K_c = H @ K @ H
    
    # biased statistic
    #stat = 1/(m**2) * np.sum(np.multiply(K_c.T, L))
    
    # unbiased statistic
    np.fill_diagonal(K, 0)
    np.fill_diagonal(L, 0)
    KL = np.dot(K, L)
    stat = np.trace(KL)/(m*(m-3)) + np.sum(K)*np.sum(L)/(m*(m-3)*(m-1)*(m-2)) - 2*np.sum(KL)/(m*(m-3)*(m-2))
    
    # initiating HSIC
    HSIC_arr = np.zeros(shuffle)
    
    # create permutations by reshuffling L except the main diagonal
    for sh in range(shuffle):       
        index_perm = np.random.permutation(L.shape[0])
        L_perm = L[np.ix_(index_perm, index_perm)]
        # biased
        #HSIC_arr[sh] = 1/(m**2) * np.sum(np.multiply(K_c.T, L_perm))
        # unbiased
        HSIC_arr[sh] = np.trace(np.dot(K, L_perm))/(m*(m-3)) + np.sum(K)*np.sum(L_perm)/(m*(m-3)*(m-1)*(m-2)) - 2*np.sum(np.dot(K, L_perm))/(m*(m-3)*(m-2))
        
    HSIC_arr_sort = np.sort(HSIC_arr)
    
    # computing 1-alpha threshold
    threshold = HSIC_arr_sort[round((1-alpha)*shuffle)]
    
    """
    if stat > threshold:
        print('H0 rejected')
    else:
        print('H0 accepted')
    """
    
    return stat, threshold

## HSIC with Gamma distribution approximation

In [None]:
def HSIC_gamma(X, Y, alpha, width_X, width_Y):    # set widths to -1 for median heuristics
    
    m = X.shape[0]
    
    # median heuristics for kernel width
    if width_X == -1:
        width_X = width(X)      
    if width_Y == -1:
        width_Y = width(Y)
    
    # compute Gram matrices
    K = pairwise_kernels(X, X, metric='rbf', gamma=0.5/(width_X**2))
    L = pairwise_kernels(Y, Y, metric='rbf', gamma=0.5/(width_Y**2))
    #KL = np.dot(K, L)
    
    # biased test statistic
    # centering matrix...
    H = np.eye(m) - (1/m) * (np.ones((m, m)))
    
    # ...to center K and L
    K_c = H @ K @ H
    L_c = H @ L @ H
    
    # biased statistic
    #stat = 1/(m**2) * np.sum(np.multiply(K_c.T, L))
    
    # unbiased statistics
    np.fill_diagonal(K, 0)
    np.fill_diagonal(L, 0)
    KL = np.dot(K, L)
    stat = np.trace(KL)/(m*(m-3)) + np.sum(K)*np.sum(L)/(m*(m-1)*(m-2)*(m-3)) - 2.*np.sum(KL)/(m*(m-3)*(m-2))
     
    # fitting Gamma distribution to stat
    vHSIC = np.power(1/6 * KL, 2)
    vaHSIC = 1/(m*(m-1)) * (np.sum(vHSIC) - np.trace(vHSIC))
    varHSIC = 72*(m-4)*(m-5)/(m*(m-1)*(m-2)*(m-3)) * vaHSIC    # variance under H0
    
    bone = np.ones(m)
    
    mu_X = 1/(m*(m-1)) * bone @ (K @ bone)
    mu_Y = 1/(m*(m-1)) * bone @ (L @ bone)
    
    mHSIC = 1/m * (1 + mu_X * mu_Y - mu_X - mu_Y)    # mean under H0
    
    al = mHSIC**2 / varHSIC
    bet = varHSIC * m / mHSIC
    
    # computing 1-alpha threshold
    threshold = gamma.ppf(1-alpha, al, scale=bet)
    
    """
    if stat > threshold:
        print('H0 rejected')
    else:
        print('H0 accepted')
    """
    
    return stat, threshold

## Power estimation

We estimate the statistical power based on 200 replications for each setting. Our experiment settings compose of various dimensions, sample sizes, mean dependencies, and variance dependencies.

In [None]:
# dimensions
dims = [5, 10, 25, 50, 100]

# dependence through rotation
sample_sizes = [200]

# angle
angles = np.linspace(0, 45, 16)

dists = ['uniform', 'exp', 'studentt']

# linear dependence and shared coefficient
sample_sizes = np.linspace(6, 40, 18)

# possible tests
tests = ['dependence_rotation', 'dependence_coeff', 'dependence_linear']

In [None]:
HSIC_p_m = {}
HSIC_g_m = {}

for dim in dims:
    print('Dimensions:', dim)
    sample_points = np.linspace(0, 1, dim)
    
    for sample_size in sample_sizes:
        print('Sample size:', int(sample_size))
        for angle in angles:
            print('Angle:', angle)
            for test in tests:
                for dist in dists:
                    print('Dist:', dist)
        
                    HSIC_p_m_list = []
                    HSIC_g_m_list = []

                    # repeating 200 times
                    for i in range(200):

                        # defining dependencies
                        X, Y = gen_dependence(sample_size=int(sample_size), dim=dim, test=test, angle=angle, dist=dist, random_state=None)

                        # test level alpha = 0.05, 5000 permutations
                        HSIC_p_m_list.append(HSIC_permutations(X, Y, 0.05, -1, -1, 5000))

                        # test level alpha = 0.05
                        HSIC_g_m_list.append(HSIC_gamma(X, Y, 0.05, -1, -1))

                    HSIC_p_m[(dim, int(sample_size), angle, dist)] = HSIC_p_m_list
                    HSIC_g_m[(dim, int(sample_size), angle, dist)] = HSIC_g_m_list

In [None]:
# saving
dependence_p = open('dependencies_p.pkl', 'wb')
pickle.dump(HSIC_p_m, dependence_p)
dependence_p.close()

dependence_g = open('dependencies_g.pkl', 'wb')
pickle.dump(HSIC_g_m, dependence_g)
dependence_g.close()

## Maximising test power

In [None]:
def var_HSIC(X, Y, sigma_x, sigma_y):
    m = X.shape[0]
    K = pairwise_kernels(X, X, metric='rbf', gamma=0.5/(sigma_x**2))
    L = pairwise_kernels(Y, Y, metric='rbf', gamma=0.5/(sigma_y**2))
    np.fill_diagonal(K, 0)
    np.fill_diagonal(L, 0)
    KL = np.dot(K, L)
    
    # unbiased test statistics
    stat = np.trace(KL)/(m*(m-3)) + np.sum(K)*np.sum(L)/(m*(m-1)*(m-2)*(m-3)) - 2*np.sum(KL)/(m*(m-3)*(m-2))
    
    # h and R
    h = (m-2)**2 * np.multiply(K, L) @ np.ones((m, 1)) + (m-2) * (np.dot(np.trace(KL), np.ones((m, 1))) - 2 * np.dot(KL, np.ones((m, 1)))) - m * np.multiply(np.dot(K, np.ones((m,1))), np.dot(L, np.ones((m,1)))) + np.dot(np.sum(L), np.dot(K, np.ones((m,1)))) + np.dot(np.sum(K), np.dot(L, np.ones((m,1)))) - np.dot(np.sum(KL), np.ones((m,1)))
    
    R = 1 / (4*m * (m-1)**2 * (m-2)**2 * (m-3)**2) * np.dot(h.T, h)
    
    var_HSIC = 16/m * (R - stat**2)
    
    return var_HSIC

In [None]:
def maximise(HSIC, HSIC_var, threshold, m):
    ratio = HSIC / np.sqrt(HSIC_var) - threshold / (m*np.sqrt(HSIC_var))
    return ratio

## Power estimation

We estimate the statistical power based on 200 replications for each setting. Our experiment settings compose of various dimensions, sample sizes, mean dependencies, and variance dependencies.

In [None]:
# dimensions
dims = [100]

# sample sizes
sample_sizes = [200]

# angle
angles = np.linspace(0, 45, 16)

dists = ['uniform', 'exp', 'studentt']

# possible tests
tests = ['dependence_rotation']

### Power estimation for dependence

In [None]:
HSIC_p_m = {}
HSIC_g_m = {}

for dim in dims:
    print('Dimensions:', dim)
    sample_points = np.linspace(0, 1, dim)
    
    for sample_size in sample_sizes:
        print('Sample size:', int(sample_size))
        for angle in angles:
            print('Angle:', angle)
            for test in tests:
                for dist in dists:
                    print('Dist:', dist)
        
                    HSIC_p_m_list = []
                    HSIC_g_m_list = []

                    # repeating 200 times
                    for i in range(200):

                        # defining dependencies
                        X_train, Y_train = gen_dependence(sample_size=int(sample_size), dim=dim, test=test, angle=angle, dist=dist, random_state=None)
                        X_test, Y_test = gen_dependence(sample_size=int(sample_size), dim=dim, test=test, angle=angle, dist=dist, random_state=None)
                        
                        m = X_train.shape[0]
                        
                        if dist=='uniform':
                            sigmas_x = np.linspace(1, 40, 40)
                            sigmas_y = np.linspace(1, 40, 40)
                        elif dist=='exp':
                            sigmas_x = np.linspace(1, 20, 20)
                            sigmas_y = np.linspace(1, 20, 20)
                        elif dist=='studentt':
                            sigmas_x = np.linspace(1, 20, 20)
                            sigmas_y = np.linspace(1, 20, 20)
                        
                        
                        ratios = np.empty((len(sigmas_x), len(sigmas_y)))
                        
                        # grid search
                        for i, sigma_x in enumerate(sigmas_x):
                            for j, sigma_y in enumerate(sigmas_y):
                                HSIC, threshold = HSIC_permutations(X_train, Y_train, 0.05, sigma_x, sigma_y, 5000)
                                HSIC_var = var_HSIC(X_train, Y_train)
                                ratios[j,i] = maximise(HSIC, HSIC_var, threshold, m)

                        # sigma of maximum ratio
                        max_y, max_x = np.where(ratios==np.max(ratios))

                        sigma_max_x = sigmas_x[max_x][0]
                        sigma_max_y = sigmas_y[max_y][0]
                        
                        # test level alpha = 0.05, 5000 permutations
                        HSIC_p_m_list.append(HSIC_permutations(X_test, Y_test, 0.05, sigma_max_x, sigma_max_y, 5000))

                        # test level alpha = 0.05
                        HSIC_g_m_list.append(HSIC_gamma(X_test, Y_test, 0.05, sigma_max_x, sigma_max_y))

                    HSIC_p_m[(dim, int(sample_size), angle, dist)] = HSIC_p_m_list
                    HSIC_g_m[(dim, int(sample_size), angle, dist)] = HSIC_g_m_list

In [None]:
# saving
dependence_p = open('dependencies_p_max.pkl', 'wb')
pickle.dump(HSIC_p_m, dependence_p)
dependence_p.close()

dependence_g = open('dependencies_g_max.pkl', 'wb')
pickle.dump(HSIC_g_m, dependence_g)
dependence_g.close()