# Test power analysis for independence tests

In [None]:
import sys
sys.path.append('..')
import numpy as np
from tqdm.notebook import tqdm
import pickle
import os
import pandas as pd

from synthetic_data import generate_data
from independence import opt_lambda, test_power
from plots import plot_samples, plot_power, plot_type_I_errors, type_I_boxplot

from warnings import filterwarnings
filterwarnings('ignore')

### Test hyperparameters
We start by specifying some hyperparameters for all of our tests.

In [None]:
# Monte Carlo settings: trials per configuration and permutations per test
n_trials, n_perms = 200, 1000

# n_obs:   passed to generate_data; presumably the number of (randomly placed)
#          observation points per functional sample -- confirm in synthetic_data
# n_preds: size of the regular grid `pred_points` on which the data are discretised
n_obs, n_preds = 100, 100

# discretised period: regular grid on [0, upper_limit]
upper_limit = 1
pred_points = np.linspace(0, upper_limit, num=n_preds)

# periods T that the experiments loop over (1 appears as a commented-out alternative)
periods = [0.1]

# number of Fourier basis functions, and standard deviation of the normal
# distribution the basis coefficients are sampled from
n_basis, sd = 3, 1

# statistical significance level
alpha = 0.05

In [None]:
# create folders to save results: one sub-folder of 'results' per test type.
# os.makedirs creates the parent 'results' folder when needed, and
# exist_ok=True makes the cell safe to re-run (no separate existence check,
# hence no window in which the folder can appear between check and creation).
for result_dir in ('marginal', 'joint', 'conditional'):
    os.makedirs(os.path.join('results', result_dir), exist_ok=True)

## Marginal independence test

In [None]:
# sample sizes for which the test power is estimated
n_samples = [20, 40, 60]

# dependence strengths a: the higher a is, the easier the historical
# dependence is to detect
a_list = [0, 0.2, 0.4, 0.6, 0.8, 1]

# independence test used by the following cells (also selects the results folder)
test = 'marginal'

We iterate over the periods, the sample sizes and various values of $a$, using the kernel `K_ID` throughout.

In [None]:
# Run the marginal independence test for every combination of period,
# sample size and dependence strength a, and store the results as
#     type_II_errors[period][n_sample] -> list with one entry per value in a_list
# NOTE: despite its name, `type_II_errors` holds the values returned by
# test_power; the name is kept because the following cells refer to it.
type_II_errors = {}

for p in periods:
    print('Period T:', p)
    type_II_errors[p] = {}
    for n_sample in tqdm(n_samples):
        # normalise once so the dict key and the generate_data argument agree
        n_sample = int(n_sample)
        print('Sample size:', n_sample)
        powers = []
        type_II_errors[p][n_sample] = powers
        for a in a_list:
            print('a:', a)
            # generate synthetic data
            X, Y = generate_data(
                dep=test, n_samples=n_sample, n_trials=n_trials, n_obs=n_obs,
                n_preds=n_preds, period=p, a=a, upper_limit=upper_limit,
                n_basis=n_basis, sd=sd,
            )

            # conduct n_trials tests, each with n_perms permutations
            power = test_power(X=X, Y=Y, n_trials=n_trials, n_perms=n_perms, alpha=alpha, K='K_ID', test=test)
            powers.append(power)
            print('Test power:', power)
            print('----------')
        print('----------')

# the context manager closes the file even if pickling fails
with open('results/{}/test_power_hist_{}.pkl'.format(test, test), 'wb') as power_hist:
    pickle.dump(type_II_errors, power_hist)

In [None]:
# plot the first n samples of X on the grid pred_points
# (X is whatever was assigned last; when the cells are run in order this is
# the data generated in the final iteration of the loop above)
n = 10
plot_samples(X[:n], pred_points, upper_limit)

In [None]:
# reload the saved results for the current test; the context manager closes
# the file handle, which pickle.load(open(...)) left open
with open('results/{}/test_power_hist_{}.pkl'.format(test, test), 'rb') as power_hist:
    type_II_errors = pickle.load(power_hist)

In [None]:
# plot the test power over the strength of dependence a
# (presumably one curve per sample size and period -- confirm in plots.plot_power)
plot_power(type_II_errors, n_samples, a_list, n_trials, test, periods)

## Joint independence test

In [None]:
# independence test used by the following cells (also selects the results folder)
test = 'joint'

# sample sizes for which the test power is estimated
n_samples = [20, 40, 60]

# dependence strengths a: the higher a is, the easier the historical
# dependence is to detect
a_list = [0, 0.2, 0.4, 0.6, 0.8, 1]

# number of variables in the network
n_vars = 4

# regular grid on [0, upper_limit] with n_preds points
upper_limit = 1
pred_points = np.linspace(0, upper_limit, num=n_preds)

In [None]:
# Run the joint independence test for every combination of period,
# sample size and dependence strength a, and store the results as
#     type_II_errors[period][n_sample] -> list with one entry per value in a_list
# NOTE: despite its name, `type_II_errors` holds the values returned by
# test_power; the name is kept because the following cells refer to it.
type_II_errors = {}

for p in periods:
    print('Period T:', p)
    type_II_errors[p] = {}
    for n_sample in tqdm(n_samples):
        # normalise once so the dict key and the generate_data argument agree
        n_sample = int(n_sample)
        print('Sample size:', n_sample)
        powers = []
        type_II_errors[p][n_sample] = powers
        for a in a_list:
            print('a:', a)
            # generate synthetic data: the network edges and the data of its n_vars variables
            edges_dict, X_dict = generate_data(
                dep=test, n_samples=n_sample, n_trials=n_trials, n_obs=n_obs,
                n_preds=n_preds, period=p, n_vars=n_vars, a=a,
                upper_limit=upper_limit, n_basis=n_basis, sd=sd,
            )

            # conduct n_trials tests, each with n_perms permutations
            power = test_power(X=X_dict, edges_dict=edges_dict, n_trials=n_trials, n_perms=n_perms, alpha=alpha, K='K_ID', test=test)
            powers.append(power)
            print('Test power:', power)
            print('----------')
        print('----------')

# the context manager closes the file even if pickling fails
with open('results/{}/test_power_hist_{}.pkl'.format(test, test), 'wb') as power_hist:
    pickle.dump(type_II_errors, power_hist)

In [None]:
# print entry 0 of edges_dict and plot the first n samples of X_dict[0][3]
# (presumably trial 0 and the variable with index 3 -- confirm against
# generate_data; both objects come from the last iteration of the loop above)
n = 10
print(edges_dict[0])
plot_samples(X_dict[0][3][:n], pred_points, upper_limit)

In [None]:
# reload the saved results for the current test; the context manager closes
# the file handle, which pickle.load(open(...)) left open
with open('results/{}/test_power_hist_{}.pkl'.format(test, test), 'rb') as power_hist:
    type_II_errors = pickle.load(power_hist)

In [None]:
# plot the test power over the strength of dependence a
# (presumably one curve per sample size and period -- confirm in plots.plot_power)
plot_power(type_II_errors, n_samples, a_list, n_trials, test, periods)

## Conditional independence test

In [None]:
# independence test used by the following cells (also selects the results folder)
test = 'conditional'

# sample sizes (100 and 300 appear as commented-out alternatives)
n_samples = [200]

# numbers of conditional variables to loop over
n_vars = [2, 4, 8, 16]

# dependence strengths a': the higher a' is, the easier the historical
# dependence between X and Y is to detect
a_prime_list = [0, 4, 8, 10]

# range of possible values for lambda, and the settings handed to opt_lambda
lambs = [1e-5, 1e-4, 1e-3]
n_pretests, n_steps = 100, 50

In [None]:
# Run the conditional independence test for every combination of period,
# sample size, number of conditional variables d and dependence strength a'.
# Results are stored as
#     type_II_errors[period][n_sample][d] -> list with one entry per value in a_prime_list
#     lamb_opts[period][n_sample][d]      -> the lambda used for each of those entries
# NOTE: despite its name, `type_II_errors` holds the values returned by
# test_power; the name is kept because the following cells refer to it.
type_II_errors = {}
lamb_opts = {}

for p in periods:
    print('Period T:', p)
    type_II_errors[p] = {}
    lamb_opts[p] = {}
    for n_sample in n_samples:
        # normalise once: used as dict key, generate_data argument and slice bound
        n_sample = int(n_sample)
        print('Sample size:', n_sample)
        type_II_errors[p][n_sample] = {}
        lamb_opts[p][n_sample] = {}
        for d in n_vars:
            print('Number of conditional variables:', d)
            powers = []
            used_lambs = []
            type_II_errors[p][n_sample][d] = powers
            lamb_opts[p][n_sample][d] = used_lambs
            for i_a, a_prime in enumerate(a_prime_list):
                print("a':", a_prime)
                # generate synthetic data
                X, Y, Z = generate_data(
                    dep=test, n_samples=n_sample, n_trials=n_trials, n_obs=n_obs,
                    n_preds=n_preds, period=p, n_vars=d, a=1, a_prime=a_prime,
                    upper_limit=upper_limit, n_basis=n_basis, sd=sd,
                )

                # lambda is searched only for the first a' of each (n_sample, d)
                # pair and then reused for the remaining values of a'
                # (to skip the search, assign a fixed value such as 1e-4 instead)
                if i_a == 0:
                    lamb_opt, rejects_opt = opt_lambda(X[:n_sample], Y[:n_sample], Z[:, :n_sample, :], lambs, n_pretests, n_perms, n_steps, alpha, K='K_ID')

                # conduct n_trials tests, each with n_perms permutations
                power = test_power(X=X, Y=Y, Z=Z, n_trials=n_trials, n_perms=n_perms, alpha=alpha, K='K_ID', test=test, lamb_opt=lamb_opt)
                powers.append(power)
                used_lambs.append(lamb_opt)
                print('Test power:', power)
                print('----------')
            print('----------')
    print('----------')

# the context managers close the files even if pickling fails
with open('results/{}/test_power_hist_{}_200_01.pkl'.format(test, test), 'wb') as power_hist:
    pickle.dump(type_II_errors, power_hist)
with open('results/{}/lambs_opt_{}_200_01.pkl'.format(test, test), 'wb') as lambs_opt_hist:
    pickle.dump(lamb_opts, lambs_opt_hist)

In [None]:
# reload saved results for the current test; the context manager closes the
# file handle, which pickle.load(open(...)) left open
# NOTE(review): the cell above saves to 'test_power_hist_{}_200_01.pkl', while
# this reads 'test_power_hist_{}.pkl' -- confirm that the unsuffixed file is
# the intended (e.g. separately merged) results file.
with open('results/{}/test_power_hist_{}.pkl'.format(test, test), 'rb') as power_hist:
    type_II_errors = pickle.load(power_hist)

In [None]:
# display the loaded results dict as the cell output
type_II_errors

In [None]:
# plot the test power over the strength of dependence a'
# NOTE(review): unlike the marginal and joint plot_power calls, `periods` is
# not passed here -- confirm that plot_power has a suitable default for it.
plot_power(type_II_errors, n_samples, a_prime_list, n_trials, test)

#### Make boxplots for the type-I error rates over various values for $\lambda$ and sample sizes.

In [None]:
# load the pickled DataFrame holding the data for the type-I error boxplots
# NOTE(review): this path starts with 'tests/', whereas every other cell uses
# 'results/...' relative to the working directory -- confirm the path matches
# the directory the notebook is run from.
df = pd.read_pickle('tests/results/conditional/type_I_boxplots_df2.pkl')

In [None]:
# draw the type-I error boxplots from the DataFrame loaded above
type_I_boxplot(df)