In [None]:
from tda import PD, PWGK, PL, PSSK

import tda
import numpy as np
import os
import random
import json
import math

from itertools import combinations

In [None]:
# compute MMDu
def n_mmd(mat_gram, unbias=True):
    n_total = mat_gram.shape[0]
    n = int(n_total / 2)
    mat_xx = mat_gram[0:n, 0:n]
    mat_yy = mat_gram[n:n_total, n:n_total]
    mat_xy = mat_gram[0:n, n:n_total]
    sum_xx = sum(sum(mat_xx))
    sum_yy = sum(sum(mat_yy))
    sum_xy = sum(sum(mat_xy))
    if unbias:
        sum_xx -= sum(np.diag(mat_xx))
        sum_yy -= sum(np.diag(mat_yy))
        sum_xy -= sum(np.diag(mat_xy))
        return (sum_xx + sum_yy - 2 * sum_xy) / (n - 1)
    else:
        return (sum_xx + sum_yy - 2 * sum_xy) / n


def hist_wchi(mat_gram, num_hist=int(1e+4)):
    n = len(mat_gram)

    # centered Gram matrix
    mat_center = np.empty((n, n))
    vec_gram = sum(mat_gram)
    val_total = sum(vec_gram)
    for i in range(n):
        for j in range(i + 1):
            mat_center[i, j] = (mat_gram[i, j]
                                - ((vec_gram[i] + vec_gram[j]) / n)
                                + (val_total / (n ** 2)))
            mat_center[j, i] = mat_center[i, j]

    # estimated eigenvalues
    vec_nu = np.sort(np.linalg.eigh(mat_center)[0])[::-1][0: - 1]
    vec_lambda = vec_nu / (n - 1)
    sum_lambda = sum(vec_lambda)

    # histogram of the null distribution (weighted chi square)
    vec_hist = np.empty(num_hist)
    for i in range(num_hist):
        vec_z = np.random.normal(0, np.sqrt(2), n - 1) ** 2
        vec_hist[i] = np.inner(vec_lambda, vec_z) - 2 * sum_lambda

    return np.sort(vec_hist)[::-1]


def extract_submat(mat_gram, num_m=None):
    n_total = mat_gram.shape[0]
    n = int(n_total / 2)
    if num_m is None:
        num_m = n - 1
    d = int(2 * num_m)
    mat = np.empty((d, d))
    idx_x = random.sample(range(0, n), num_m)
    idx_y = random.sample(range(n, n_total), num_m)
    idx_xy = idx_x + idx_y
    for i, a in enumerate(idx_xy):
        for j, b in enumerate(idx_xy):
            mat[i, j] = mat_gram[a, b]
    return mat


def two_sample_test(mat_gram, alpha=0.05, num_m=None, num_test=500):
    vec_wchi = hist_wchi(mat_gram)                    # null distribution of psi-hat
    vec_p_value = np.empty(num_test)
    for temp_test in range(num_test):                 # for l=1,...,N
        mat_reduced = extract_submat(mat_gram, num_m)  # resample m samples
        value_mmd = n_mmd(mat_reduced)                 # compute mMMDu
        vec_temp = np.where(vec_wchi > value_mmd)[0]   # how many psi-hat's are greater than mMMDu?
        vec_p_value[temp_test] = len(vec_temp) / len(vec_wchi)
    return vec_p_value, len(np.where(vec_p_value < alpha)[0]) / num_test


In [None]:
# import PD - json, each pds are saved as unnamed array

with open("./beetlepd.json") as f:
    pd = json.load(f)

In [None]:
pd1list=[]
for ii in range(len(pd)):
    pd1dat = np.array(pd[ii])
    pd1mat = np.transpose( np.resize(pd1dat, (3,int(len(pd1dat)/3)) ) )
    pd1mat1 = pd1mat[pd1mat[:,0]==1,1:]
    pd1mat1fin = pd1mat1.astype(np.float)
    pd1list.append(pd1mat1fin)

In [None]:
# functions for PWGK
func_kernel = tda.function_kernel("Gaussian", sigma=3 )
func_weight = tda.function_weight("none")

# between two

In [None]:
nrep=100
nset=20
ntotalset=4000
per=np.zeros(nrep)
for rr in range(nrep):
    # import dimension one PDs
    combpdlist = pd1list[rr*nset:(rr+1)*nset]
    combpdlist.extend(pd1list[(rr*nset+ntotalset):((rr+1)*nset+ntotalset)])
    # compute gram matrix
    pwgk = PWGK(combpdlist, func_kernel, func_weight, sigma=3,approx=True)
    mat_gaussian_pwgk = pwgk.gram_matrix()
    # define gram matrix
    name_rkhs = ["Linear", "Gaussian"][1]
    mat_gram_pwgk = tda.matrix_gram(mat_gaussian_pwgk, name_rkhs)[0]
    num_reject = two_sample_test(mat_gram_pwgk, alpha=0.05, num_m=20, num_test=500)[1]
    per[rr] = num_reject
1-np.sum(per<0.05)/nrep

# between stable

In [None]:
nrep=100
nset=20
ntotalset=2000
per=np.zeros(nrep)
for rr in range(nrep):
    # import dimension one PDs
    combpdlist = pd1list[rr*nset:(rr+1)*nset]
    combpdlist.extend(pd1list[(rr*nset+ntotalset):((rr+1)*nset+ntotalset)])
    # compute gram matrix
    pwgk = PWGK(combpdlist, func_kernel, func_weight, sigma=3,approx=True)
    mat_gaussian_pwgk = pwgk.gram_matrix()
    # define gram matrix
    name_rkhs = ["Linear", "Gaussian"][1]
    mat_gram_pwgk = tda.matrix_gram(mat_gaussian_pwgk, name_rkhs)[0]
    num_reject = two_sample_test(mat_gram_pwgk, alpha=0.05, num_m=20, num_test=500)[1]
    per[rr] = num_reject
1-np.sum(per<0.05)/nrep 

# between aperiodic

In [None]:
nrep=100
nset=20
ntotalset=2000
per=np.zeros(nrep)
for rr in range(nrep):
    # import dimension one PDs
    combpdlist = pd1list[(rr*nset+4000):((rr+1)*nset+4000)]
    combpdlist.extend(pd1list[(rr*nset+ntotalset+4000):((rr+1)*nset+ntotalset+4000)])
    # compute gram matrix
    pwgk = PWGK(combpdlist, func_kernel, func_weight, sigma=3,approx=True)
    mat_gaussian_pwgk = pwgk.gram_matrix()
    # define gram matrix
    name_rkhs = ["Linear", "Gaussian"][1]
    mat_gram_pwgk = tda.matrix_gram(mat_gaussian_pwgk, name_rkhs)[0]
    num_reject = two_sample_test(mat_gram_pwgk, alpha=0.05, num_m=20, num_test=500)[1]
    per[rr] = num_reject
1-np.sum(per<0.05)/nrep 