In [1]:
# import modules and functions

import numpy as np, pandas as pd, feather as ft, os
from utils.testing import SeqIndTester, compute_hyperparam

In [2]:
# sequential kernelized independence test for our examples

def seq_kern_independence(X: np.ndarray,
                          Y: np.ndarray,
                          lmbd_type: str,
                          n0: int):
    """
    Runs the sequential kernelized independence test and returns the test martingale.
    
    Parameters
    ----------
    X: array_like
        the X observations
    Y: array_like
        the Y observations
    lmbd_type: either 'ONS' or 'aGRAPA'
        method for choosing betting fraction
    n0: integer
        initial sample size for choosing kernel bandwidth with median heuristic,
        with default values 1 for both X and Y if n0 == 1
        
    Details
    -------
    Parameters are fixed to their default values in SeqIndTester, apart from the following.
    
    truncation_level:
        0.5 for lmbd_type = 'ONS', 0.9 for lmbd_type = 'aGRAPA'
    kernel_param_x:
        output from median heuristic on first n0 samples
    kernel_param_y:
        output from median heuristic on first n0 samples

    The test martingale for the first n0 observations is set to 1.compute_hyperparam(x).
    
    In the function SeqIndTester we deleted
        line 193 self.wf.weighted_wf = self.weighted_wf)
    which causes an error when running the function.
    
    Returns
    -------
    test_martingale:np.ndarray
        the test martingale
                        
    
    """
    # container for test martingale
    n = X.size
    test_martingale = np.ones(n)
    
    # specificatinos of the tester
    tester = SeqIndTester()
    tester.lmbd_type = lmbd_type
    if lmbd_type == 'ONS':
        tester.truncation_level = 0.5
    else:
        tester.truncation_level = 0.9
    if n0 == 1:
        kernel_param_x = 1
        kernel_param_y = 1
    else:
        kernel_param_x = compute_hyperparam(X[:n0])
        kernel_param_y = compute_hyperparam(Y[:n0])
    
    # run the test
    n_pairs = (n - n0)//2
    for cur_pair in range(n0, n0 + 2*n_pairs, 2):
        tester.process_pair(X[cur_pair:(cur_pair + 2)], Y[cur_pair:(cur_pair + 2)],
                                X[:cur_pair], Y[:cur_pair])
        test_martingale[cur_pair] = tester.wealth
        test_martingale[cur_pair + 1] = tester.wealth
    
    # return test martingale
    return test_martingale
        
    
    

In [3]:
# import the data from feather file

task_id = os.getenv('SLURM_ARRAY_TASK_ID')
in_path = 'simulation_examples_' + task_id + '.feather'
df = pd.read_feather(in_path)

In [4]:
# run the test and export the result

n0 = 20
lmbd_type = 'ONS'

sims = df['sim'].unique()
nsims = sims.size
out = []

for s in sims:
    data = df.loc[df['sim'] == s, ['x', 'y']].to_numpy()
    X = data[:,0]
    Y = data[:,1]
    results = pd.DataFrame(seq_kern_independence(X, Y, 'aGRAPA', n0,),
                           columns = ['martingale'])
    results['sim'] = s
    out.append(results)

out = pd.concat(out)
out.reset_index(inplace = True, drop = True)
out_path = 'simulations_' + lmbd_type + '_' + str(n0) + '_' + task_id + '.feather'
ft.write_dataframe(out, out_path)