In [None]:
import numpy as np
from scipy import stats
import math
from collections import defaultdict, Counter
from tqdm import tqdm
import pandas as pd
import os  
import time

In [None]:
MILLION = 1000000
GROUPS = 20
BLOCKSIZE = MILLION

In [None]:
from os import walk
mypath = '../data/'
filenames = next(walk(mypath), (None, None, []))[2] 

pi_cf_digits = []
for filename in tqdm(filenames):
    pi_digits = np.load(filename)
    pi_cf_digits.extend(pi_digits['arr_0'])
pi_cf_digits = pi_cf_digits[:math.floor((len(pi_cf_digits)/BLOCKSIZE))* BLOCKSIZE]
pi_cf_digits = [int(x) for x in pi_cf_digits]
len(pi_cf_digits)

# Chi-square Test V1

In [None]:
def ChiSquare(data, blocksize=100, number_of_groups=5):
    p_val_arr = []
    q_res_arr = []
    number_of_blocks = len(data)/blocksize
    
    groups = list(np.array_split(data, number_of_blocks))
    
    def P(num_of_groups):
        res = []
        for k in range(1, num_of_groups+1):
            res.append(math.log(1+1/(k*(k+2)))/math.log(2))
        res.append(1-sum(res))
        return res

    def Y(number_of_groups, pi_digits):
        Y = [0] * (number_of_groups+1)
        for digit in pi_digits:
            if digit > number_of_groups:
                Y[-1] += 1
            else:
                Y[digit-1] += 1
        return Y

    def chisquare_test(y_array, blocksize, p_array, number_of_groups):
        q_val = 0
        for i in range(number_of_groups):
            q_val += ((y_array[i] - blocksize*p_array[i])**2)/(blocksize*p_array[i])
        return q_val
    p_array = P(number_of_groups)
    assert(len(p_array) == (number_of_groups + 1))
    for pi_cf_digits in tqdm(groups):
        pi_digits = list(pi_cf_digits)

        y_array = Y(number_of_groups, pi_digits)
        assert(len(y_array) == (number_of_groups + 1))
        
        q_res = chisquare_test(y_array, blocksize, p_array, number_of_groups+1)
        p_val = 1 - stats.chi2.cdf(q_res , number_of_groups)
        
        p_val_arr.append(p_val)
        q_res_arr.append(q_res)
        
    assert(len(p_val_arr) == (number_of_blocks))
    assert(len(q_res_arr) == (number_of_blocks))

    result = defaultdict(list)
    for p_val, q_val in zip(p_val_arr, q_res_arr):
        result['p-value'].append(p_val)
        result['chi-square value'].append(q_val)
    df = pd.DataFrame(data=result)
    filename = 'chi-square-results-' + str(len(data)) +'-pi-digits-' + \
                str(int(number_of_blocks)) + '-blocks-' + str(number_of_groups) \
                + '-groups'
    df.to_csv(filename + '.csv') 
    return p_val_arr, q_res_arr

In [None]:
# start_time = time.time()
# p_val_arrV1, q_res_arrV1 = ChiSquare(pi_cf_digits, BLOCKSIZE, GROUPS)
# print("Version 1 took ", time.time() - start_time, " seconds")

# Chi-square Test V2

In [None]:
import time
from joblib import Parallel, delayed

In [None]:
def ChiSquare(data, blocksize=100, number_of_groups=5):
    p_val_arr = []
    q_res_arr = []
    number_of_blocks = len(data)/blocksize
    
    groups = list(np.array_split(data, number_of_blocks))
    
    def P(num_of_groups):
        res = []
        for k in range(1, num_of_groups+1):
            res.append(math.log(1+1/(k*(k+2)))/math.log(2))
        res.append(1-sum(res))
        return res

    def Y(number_of_groups, pi_digits):
        Y = [0] * (number_of_groups+1)
        for digit in pi_digits:
            if digit > number_of_groups:
                Y[-1] += 1
            else:
                Y[digit-1] += 1
        return Y

    def chisquare_test(y_array, blocksize, p_array, number_of_groups):
        q_val = 0
        for i in range(number_of_groups):
            q_val += ((y_array[i] - blocksize*p_array[i])**2)/(blocksize*p_array[i])
        return q_val
    
    def parallelized_testing(pi_cf_digits):
        pi_digits = list(pi_cf_digits)

        y_array = Y(number_of_groups, pi_digits)
        assert(len(y_array) == (number_of_groups + 1))

        p_array = P(number_of_groups)
        assert(len(p_array) == (number_of_groups + 1))

        q_res = chisquare_test(y_array, blocksize, p_array, number_of_groups+1)

        p_val = 1 - stats.chi2.cdf(q_res , number_of_groups)
        return q_res, p_val
    
    q_p_arr_val = Parallel(n_jobs=12)(delayed(parallelized_testing)(pi_cf_digits) for pi_cf_digits in groups)
    q_res_arr = [q_p[0] for q_p in q_p_arr_val]
    p_val_arr = [q_p[1] for q_p in q_p_arr_val]
    
    assert(len(p_val_arr) == (number_of_blocks))
    assert(len(q_res_arr) == (number_of_blocks))

    result = defaultdict(list)
    for p_val, q_val in zip(p_val_arr, q_res_arr):
        result['p-value'].append(p_val)
        result['chi-square value'].append(q_val)
    df = pd.DataFrame(data=result)
    filename = 'chi-square-results-' + str(len(data)) +'-pi-digits-' + \
                str(int(number_of_blocks)) + '-blocks-' + str(number_of_groups) \
                + '-groups'
    df.to_csv(filename + '.csv') 
    return p_val_arr, q_res_arr

In [None]:
start_time = time.time()
p_val_arrV2, q_res_arrV2 = ChiSquare(pi_cf_digits, BLOCKSIZE, GROUPS)
print("Version 2 took ", time.time() - start_time, " seconds")

### Confirm $V_1$ and $V_2$ are same

In [None]:
(p_val_arrV1 == p_val_arrV2) and (q_res_arrV1 == q_res_arrV2)

In [None]:
len(q_res_arrV2)

# Kolmogorov-Smirnov Test

In [None]:
stats.kstest(q_res_arrV2, stats.chi2(GROUPS).cdf)

In [None]:
stats.kstest(p_val_arrV2, 'uniform')

# Anderson-Darling

In [None]:
#stats.anderson(p_val_arrV1, 'norm')

# Cramer-van-Mises

In [None]:
#stats.cramervonmises([p for p in p_val_arrV2], 'uniform')

In [None]:
#stats.cramervonmises(q_res_arrV2, stats.chi2(10).cdf)