In [11]:
import mmh3
import re
from collections import defaultdict, Counter

import numpy as np
from rpy2 import robjects
from scipy.stats import chisquare

robjects.r("version")
from rpy2.robjects.packages import importr
import rpy2.robjects.numpy2ri as np2r

In [12]:
tolerance = importr('tolerance')
np2r.activate()

In [13]:
def chisquare_test(sample, power=1.16, beta=0.0, skip=0):
    unique, counts = np.unique(sample, return_counts=True)
    counts[::-1].sort()

    frequencies = np.array([1.0 / np.power(i + beta, power) for i in range(1, len(counts) + 1)])
    zipf_counts = np.ceil(len(sample) * frequencies / frequencies.sum())

    # print(Counter(sample).most_common(10))
    # print(counts[:10])
    # print(zipf_counts[:10])

    chi = chisquare(counts[skip:], zipf_counts[skip:])
    # print(chi[0])
    return chi[1]

In [14]:
def hash_distributed_test(test, sample, paral_size=10):
    distributed_sample = defaultdict(list)
    for el in sample:
        distributed_sample[mmh3.hash(str('qwerty' + el)) % paral_size].append(el)

    print('############################')
    print('PARTITIONING ', paral_size)
    for ds in distributed_sample:
        print('PARTITION ', ds)
        data = distributed_sample[ds]
        data = get_sample(data)
        # power, beta = estimate_params(data)
        # print(power, beta)
        # p_value = test(data, power, beta)

        power, beta = estimate_params(data)
        print(power, beta)
        p_value = test(data, power, beta)

        print('P VALUE', p_value)
        print('#######')

In [15]:
def estimate_params(sample):
    unique, counts = np.unique(sample, return_counts=True)
    res = tolerance.zm_ll(x=sample, N=len(unique), dist="Zipf-Man")
    split = str(res).split('b')[3].strip().split(' ')
    s = float(split[0])
    beta = float(split[1])
    return s, beta

In [16]:
def get_sample(words):
    word_counts = Counter(words)
    sorted_word_counts = sorted(word_counts.items(), key=lambda kv: -kv[1])
    sorted_word_counts = dict([(sorted_word_counts[i][0], i + 1) for i in range(len(sorted_word_counts))])
    return np.array([sorted_word_counts[word] for word in words])

## Test war and peace

In [17]:
file = open('war_and_peace.txt', 'r')
text = file.read().lower()
words = re.sub('\W', ' ', text).split()[:500000]

tests = {'chi': chisquare_test}
paral_sizes = range(1, 5)
for name in tests:
    for ps in paral_sizes:
        hash_distributed_test(tests[name], words, paral_size=ps)

############################
PARTITIONING  1
PARTITION  0
1.17345 3.606634
P VALUE 0.9999999999997025
#######
############################
PARTITIONING  2
PARTITION  0
1.1727379 0.7840889
P VALUE 1.0
#######
PARTITION  1
1.248518 5.510748
P VALUE 1.0
#######
############################
PARTITIONING  3
PARTITION  1
1.116659 0.0
P VALUE 0.0
#######
PARTITION  0
1.265478 3.930909
P VALUE 1.0
#######
PARTITION  2
1.276557 2.419009
P VALUE 1.0
#######
############################
PARTITIONING  4
PARTITION  2
1.261104 0.7889334
P VALUE 1.324280386066556e-32
#######
PARTITION  1
1.245314 2.138257
P VALUE 1.0
#######
PARTITION  0
1.14273 0.0
P VALUE 0.0
#######
PARTITION  3
1.257746 3.059312
P VALUE 1.0
#######


## Test lognormal 

In [23]:
sample = np.random.lognormal(mean=0.3, sigma=2, size=100000)
sample = np.array(sample, dtype=np.int)
s, alpha = estimate_params(sample)
print(s, alpha)
print(chisquare_test(sample, s, alpha))

1.7165558 0.2584549
8.651000404238344e-28
