In [129]:
from random import choice, choices
from string import ascii_lowercase, digits
import numpy as np
from numpy.random import default_rng

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline

from statsmodels.stats.proportion import multinomial_proportions_confint, proportion_confint

from scipy.stats import binom
from multiprocessing import Pool
from functools import partial

import time


In [156]:
def sisson_glaz(sample_freqs, sample_probs, word_list):
    sis = pd.DataFrame(multinomial_proportions_confint(sample_freqs, method='sison-glaz'), 
                   columns=['ci_lower','ci_upper'])
    sis['exp_method'] = 'sison-glaz'
    sis['word'] = word_list
    sis['param_probs'] = sample_probs
    sis['sample_probs'] = sample_probs
    return sis[['word', 'sample_probs', 'param_probs', 'exp_method', 'ci_lower', 'ci_upper']]


def gt_orlitsky(sample, word_list):
    counts = pd.Series(sample).value_counts().reindex(word_list , fill_value=0)
    counts_of_counts = counts.value_counts()
    p = {}
    for c in (counts_of_counts.index):
        c_o_c_1 = counts_of_counts[c+1] if c+1 in counts_of_counts else 0
        if c > c_o_c_1:
            p[c] = (c/counts.sum())
        else:
            p[c] = (c_o_c_1+1) * (c+1) / (counts.sum() * counts_of_counts[c])
    new_p = counts.map(p)
    return list(new_p / new_p.sum())


In [157]:
def get_zipf_dist_probs(zipf_param, exp_alphabet_size):
    ranks = np.arange(1, exp_alphabet_size+1)
    weights = ranks ** (-zipf_param)
    exp_probs = (weights / sum(weights))
    return exp_probs


def get_uniform_dist_probs(size):
    probs = np.array([1]*size) / size
    return probs


def get_step_dist_probs(exp_alphabet_size):
    probs = [1/(2*exp_alphabet_size)]*(int(exp_alphabet_size/2)) + [3/(2*exp_alphabet_size)]*(int(exp_alphabet_size/2))
    return probs


def get_word_list(size):
    chars = ascii_lowercase + digits
    word_list = [''.join(choice(chars) for _ in range(3)) for _ in range(size*2)]
    word_list = list(set(word_list))[:size]
    word_list.sort()
    return word_list


In [158]:
def combined_ci_framwork(boot_list, ci_method, param_func_probs, sample_probs, sample_size, corrected_confidence,
                         num_of_bootstrap_smaples):
    
    if ci_method == 'good_bootstrap':
        max_probs_list = []
        for arr in boot_list: 
            if np.any(arr == 0):
                arr_prob = arr == 0
                max_prob = np.array(param_func_probs)[arr_prob].max()
                max_probs_list.append(max_prob)
        ci_bound_for_zero = np.quantile(max_probs_list,corrected_confidence, interpolation='lower')
   
    elif ci_method == 'rule_of_three':
        ci_bound_for_zero = -np.log(1-corrected_confidence)/sample_size
   
    ci_lower_list = [0]*len(sample_probs)
    ci_upper_list = [0]*len(sample_probs)
    
    for i in range(len(sample_probs)):
        if sample_probs[i] == 0:
            ci_lower_list[i] = 0
            ci_upper_list[i] = ci_bound_for_zero
        else:
            ci_lower_list[i], ci_upper_list[i] = proportion_confint(sample_probs[i]*sample_size, sample_size, 
                                                                    (1-corrected_confidence), method="beta")
    return ci_lower_list, ci_upper_list
      

In [159]:
def get_ci(boot_list, ci_method, probs, sample_size, corrected_confidence, num_of_bootstrap_smaples, sample_probs):
    boot_pct_ci = pd.DataFrame()
    boot_pct_ci['param_probs'] = probs
    
    if ci_method == 'percentile':
        boot_pct_ci['ci_lower'] = np.quantile(boot_list,(1-corrected_confidence)/2, axis=0)
        boot_pct_ci['ci_upper'] = np.quantile(boot_list,(1+corrected_confidence)/2, axis=0)

    elif ((ci_method == 'good_bootstrap') | (ci_method == 'rule_of_three')):
        boot_pct_ci['ci_lower'], boot_pct_ci['ci_upper'] = combined_ci_framwork(boot_list, ci_method, probs, 
                                                                  sample_probs, sample_size, 
                                                                  corrected_confidence, num_of_bootstrap_smaples)
    return boot_pct_ci

In [188]:
def run_parametric_bootstrap(sample_words_list, sample_probs, word_index, sample_size, num_of_bootstrap_smaples, 
                             param_dist_func, ci_method='percentile',confidence=.95, bonferroni_correction=True):
    
    bonf_corrected_alpha = (1-confidence)/len(word_index)
    corrected_confidence = 1-(bonf_corrected_alpha) if bonferroni_correction else confidence
    probs = param_dist_func(sample_words_list, word_index)
    
    boot_arr = default_rng().multinomial(sample_size, probs, size=num_of_bootstrap_smaples)
    boot_list = list(boot_arr / boot_arr.sum(axis=1, keepdims=True))

    boot_pct_ci = get_ci(boot_list, ci_method, probs, sample_size, corrected_confidence,
                        num_of_bootstrap_smaples, sample_probs)
    
    boot_pct_ci['exp_method'] = ci_method 

    boot_pct_ci['word'] = word_index
    boot_pct_ci['sample_probs'] = sample_probs

    return boot_pct_ci[['word', 'sample_probs', 'param_probs', 'exp_method', 'ci_lower', 'ci_upper']].reset_index(drop=True)


In [189]:
def run_experiment(sample_size, probs, word_list, dictionary, num_of_bootstrap_smaples, iteration=None):
    if iteration % 10 == 0: 
        print('iteration {}'.format(iteration))
    
    samp_exp = default_rng().multinomial(sample_size, probs, size=1).flatten()

    our_sample_exp = pd.DataFrame(np.transpose(samp_exp), index=word_list, columns=['samp'])
    our_sample_exp['probs'] = our_sample_exp['samp'] / our_sample_exp['samp'].sum()
    our_sample_raw_exp = list(np.repeat(word_list, our_sample_exp.samp))
    
    sis = sisson_glaz(our_sample_exp.samp, our_sample_exp['probs'].to_list(), word_list)
    
    good_bootstrap = run_parametric_bootstrap(our_sample_raw_exp,our_sample_exp['probs'].to_list(), word_list, 
                                              sample_size, num_of_bootstrap_smaples, 
                                              gt_orlitsky, ci_method='good_bootstrap')
    rot = run_parametric_bootstrap(our_sample_raw_exp,our_sample_exp['probs'].to_list(), word_list, 
                                   sample_size, num_of_bootstrap_smaples, 
                                   gt_orlitsky, ci_method='rule_of_three')
    
    res =pd.concat([sis, good_bootstrap, rot], ignore_index=True)
    res.loc[res['ci_lower'] < 0, 'ci_lower'] = 0
    res.loc[res['ci_upper'] > 1, 'ci_upper'] = 1
    res['ci_length'] = res['ci_upper'] - res['ci_lower']
    res['log_ci_length'] = np.log(res['ci_length'])
    res['exp_num'] = iteration 
    res['true_prob'] = res['word'].map(dictionary)
    res['log_true_prob'] = np.log(res['true_prob'])
    res['in_ci'] = (res['true_prob'] >= res['ci_lower']) & \
                       (res['true_prob'] <= res['ci_upper'])
    
    return res

In [190]:
exp_alphabet_size = 100

exp_probs = get_zipf_dist_probs(1.01, exp_alphabet_size)
exp_sample_size = 50

chars = ascii_lowercase + digits
exp_word_list = [''.join(choice(chars) for _ in range(3)) for _ in range(exp_alphabet_size*2)]
exp_word_list = list(set(exp_word_list))[:exp_alphabet_size]
exp_word_list.sort()
exp_dictionary = dict(zip(exp_word_list, exp_probs))

exp_bootstrap_smaples = 5000

In [191]:

experiment_runs = 100
pool = Pool(10)
func = partial(run_experiment, exp_sample_size, exp_probs, exp_word_list, exp_dictionary, exp_bootstrap_smaples)

start_time = time.time()
pooled_results = pool.starmap(func, zip(range(experiment_runs)))
pool.close()
pool.join()
print("\n *********** process run time {} minutes ***********".format((time.time() - start_time)/60))


iteration 0
iteration 10
iteration 20
iteration 30
iteration 40
iteration 50
iteration 60
iteration 70
iteration 80
iteration 90

 *********** process run time 0.37437983751297 minutes ***********
