In [1]:
%env PURE_PYTHON True
%matplotlib inline

import matplotlib
import matplotlib.pyplot as plt

from datetime import datetime
from BTrees.OOBTree import OOBTree
import numpy as np
from collections import Counter
from scipy import stats
import pandas as pd
import pprint
import timeit

# Upper bound on the number of keys generated and inserted per batch
# by insert_to_index_random.
CHUNKS_SIZE = 10000
# Number of random characters drawn for every generated key (an optional
# prefix is prepended on top of these characters).
KEY_LENGTH = 8
# Characters the random part of each generated key is drawn from.
ALPHABET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'

env: PURE_PYTHON=True


In [6]:
# Debug log: add_to_debug_global appends one record to this list every time
# it is called (random_sampling calls it once per run).
_debug_random_sampling = []
# Child index that is followed at every level below the root when the
# root-to-leaf walking probabilities are computed.
DEFAULT_EXPLORING_STEP = 0

In [36]:
import os
os.environ["PURE_PYTHON"] = "True"

from BTrees.OOBTree import OOBTree as _OOBTree
from collections import Counter
import numpy as np


class OOBTreeExtLean(_OOBTree):
    """OOBTree that draws random samples of its keys via random root-to-leaf walks.

    A walk starts at the root and, at every tree node, picks a child with
    probability proportional to that child's ``size``.  At the root these
    probabilities are additionally multiplied by ``root_probs_coefs`` (see
    ``_first_walk_to_determine_root_coefs``).  Once a bucket is reached, one
    of its keys is picked uniformly.

    The implementation reads the node internals ``_data``, ``child``,
    ``size``, ``_keys``, ``max_leaf_size`` and ``_bucket_type``.
    NOTE(review): the notebook sets ``PURE_PYTHON`` before importing BTrees,
    presumably because these attributes are only exposed by the pure-Python
    implementation -- confirm.
    """

    # Class-level fallback for the per-instance cache created in __init__ and
    # reset by random_sampling(); maps a tree node to its fan-out distribution.
    _fanout_distribution_cache = {}
    # Root of the tree being sampled; set by _first_walk_to_determine_root_coefs.
    _root = None
    # Shared 'hit' / 'miss' counters of the fan-out cache (diagnostics only).
    _cache_hit_counter = Counter()

    def __init__(self):
        super(OOBTreeExtLean, self).__init__()
        self.walking_path_to_fanout_distribution = {}
        self.default_exploring_step = DEFAULT_EXPLORING_STEP
        # Per-instance cache, so that trees neither share nor pin each other's nodes.
        self._fanout_distribution_cache = {}

    def random_sampling(self, k):
        """Return up to ``k`` distinct keys sampled by random walks.

        ``k`` is clamped to ``len(self)``.  A walk whose path (compared by its
        string form) was already accepted is counted under
        ``'revisited_paths'`` and retried.  One debug record per call is
        appended through ``add_to_debug_global``.

        :param k: requested sample size.
        :return: list with the sampled keys; empty when the tree is empty or
            ``k`` is not positive.
        """
        # The cached distributions describe the tree as it was when they were
        # computed.  Drop them so that inserts/deletes performed since the
        # previous call cannot produce stale (or wrongly sized) distributions.
        self._fanout_distribution_cache = {}
        self.walking_path_to_fanout_distribution = {}
        all_accept_reject_measures = {
            'accept': [],
            'reject': [],
            'revisited_paths': Counter()
        }

        k = min(len(self), k)
        sampled_values = []
        all_walking_paths_set = set()
        all_walking_paths_stats = []

        # Nothing to sample from an empty tree (or for k <= 0); the root
        # coefficients cannot even be solved for a root without children.
        if k > 0:
            self._first_walk_to_determine_root_coefs()

        while len(sampled_values) < k:
            sampled_value, walking_path, walking_path_stats = \
                self._get_value_and_path_by_random_walk_from_node(node=self)

            if _this_value_was_sampled_already(walking_path, all_walking_paths_set):
                all_accept_reject_measures['revisited_paths'][str(walking_path)] += 1
                continue

            accept_reject_measures = {
                'path': walking_path,
                'value': sampled_value,
            }

            all_accept_reject_measures['accept'].append(accept_reject_measures)

            all_walking_paths_set.add(str(walking_path))
            all_walking_paths_stats.append(walking_path_stats)
            sampled_values.append(sampled_value)

        # add_to_debug_global looks the locals up by name ('k', 'self',
        # 'all_accept_reject_measures', 'all_walking_paths_stats'), so these
        # local names must not be renamed.
        add_to_debug_global(locals())

        return sampled_values

    def _calc_fanout_distribution_of_node(self, node):
        """Return the probability of stepping into each child of ``node``.

        Entry ``i`` is ``child_i.size / sum(child.size for all children)``.
        Results are cached per node; a copy is returned so that callers may
        modify it in place.
        """
        cache = self._fanout_distribution_cache
        if node in cache:
            self._cache_hit_counter['hit'] += 1
            return cache[node].copy()
        self._cache_hit_counter['miss'] += 1

        child_sizes = np.array([item.child.size for item in node._data])
        node_distribution = child_sizes / child_sizes.sum()

        cache[node] = node_distribution
        return node_distribution.copy()

    def _first_walk_to_determine_root_coefs(self):
        """Compute ``root_probs_coefs``, the per-child correction applied at the root.

        The coefficients solve the linear system built by
        ``_create_equations_for_equaling_all_walking_probs``.
        """
        self._root = self

        if self._data and isinstance(self._data[0].child, self._bucket_type):
            # The root's children are already buckets.  Picking a bucket
            # proportionally to its size and then a key uniformly inside it
            # gives every key the same probability, so no correction is
            # needed.  The general path below would fail here because it
            # reads ``_data`` on the root's children.
            self.root_probs_coefs = np.ones(len(self._data))
            return

        branch_coefs = self._determine_root_to_leaf_walking_probs()
        equations_matrix, equations_equal_matrix = \
            self._create_equations_for_equaling_all_walking_probs(branch_coefs)

        self.root_probs_coefs = np.linalg.solve(equations_matrix, equations_equal_matrix)

    def _create_equations_for_equaling_all_walking_probs(self, branch_coefs):
        """Build the ``A x = b`` system whose solution is ``root_probs_coefs``.

        With ``n = len(branch_coefs)`` the system is:

        * for ``i = 1 .. n-1``: ``branch_coefs[0] * x_0 - branch_coefs[i] * x_i = 0``
        * ``sum_i root_fanout[i] * x_i = 1``

        :return: tuple ``(A, b)`` with shapes ``(n, n)`` and ``(n,)``.
        """
        root_fanout_distribution = self._calc_fanout_distribution_of_node(node=self)

        num_children = len(branch_coefs)
        equations_matrix = np.zeros((num_children, num_children))
        equations_equal_matrix = np.zeros(num_children)

        for root_child_number in range(num_children):
            if root_child_number > 0:
                # Equalise branch ``root_child_number`` with branch 0.
                equations_matrix[root_child_number - 1][0] = branch_coefs[0]
                equations_matrix[root_child_number - 1][root_child_number] = \
                    -1 * branch_coefs[root_child_number]
            # Last row: the corrected root distribution must sum to 1.
            equations_matrix[-1][root_child_number] = root_fanout_distribution[root_child_number]

        equations_equal_matrix[-1] = 1
        return equations_matrix, equations_equal_matrix

    def _determine_root_to_leaf_walking_probs(self):
        """Return one walking probability per root child.

        For each root child the probability is the root's fan-out share of
        that child, multiplied by the fan-out share of the
        ``DEFAULT_EXPLORING_STEP`` child at every level below it, down to the
        node whose children are buckets, and finally by
        ``1 / number of children`` of that node.

        :return: ``numpy`` array ordered like the root's children.
        """
        assert self._root is not None, 'must keep _root aside before working with this method'
        root_fanout_distribution = self._calc_fanout_distribution_of_node(self._root)

        branch_coefs = []
        for root_child_number in range(len(self._root._data)):
            walking_prob = root_fanout_distribution[root_child_number]
            current_node = self._root._data[root_child_number].child

            while not isinstance(current_node._data[DEFAULT_EXPLORING_STEP].child, self._bucket_type):
                node_fanout_distribution = self._calc_fanout_distribution_of_node(current_node)
                # Use the probability of the child that is actually followed.
                walking_prob *= node_fanout_distribution[DEFAULT_EXPLORING_STEP]
                current_node = current_node._data[DEFAULT_EXPLORING_STEP].child

            walking_prob *= 1 / len(current_node._data)
            branch_coefs.append(walking_prob)

        return np.array(branch_coefs)

    def _get_value_and_path_by_random_walk_from_node(self, node):
        """Perform one random walk from ``node`` down to a key.

        :return: tuple ``(key, walking_path, walking_path_stats)``.
            ``walking_path`` holds one
            ``(step, node_size, step_prob, prob_along_path)`` tuple per level;
            ``walking_path_stats`` holds one dict per level with the same
            step/probability information.
        """
        walking_path = []
        current_node = node
        prob_along_path = 1
        walking_path_stats = []
        while not isinstance(current_node, self._bucket_type):
            next_random_step, chosen_random_step_prob = self._random_next_move_respect_fanout_prob(
                current_node, walking_path)

            prob_along_path *= chosen_random_step_prob
            walking_path.append((next_random_step, current_node.size, chosen_random_step_prob, prob_along_path))
            current_node = current_node._data[next_random_step].child
            walking_path_stats.append({
                'next_random_step': next_random_step,
                'chosen_random_step_prob':
                    chosen_random_step_prob, 'prob_along_path': prob_along_path})

        # Inside the bucket every stored key is equally likely to be picked.
        next_random_step = np.random.randint(low=0, high=current_node.size)
        # The recorded probability uses the bucket capacity, not its current size.
        chosen_random_step_prob = 1/current_node.max_leaf_size  # todo: size
        prob_along_path *= chosen_random_step_prob
        walking_path.append((next_random_step, current_node.size, chosen_random_step_prob, prob_along_path))
        walking_path_stats.append({
            'next_random_step': next_random_step,
            'chosen_random_step_prob':
                chosen_random_step_prob, 'prob_along_path': prob_along_path,
            'entire_walking_path': walking_path})

        leaf = current_node._keys
        return leaf[next_random_step], walking_path, walking_path_stats

    def _is_root_node(self, node):
        """Return True when ``node`` is the root recorded in ``self._root``."""
        # Identity checks: the truth value / equality of a tree is not what is
        # being asked here.
        assert self._root is not None
        return node is self._root

    def _random_next_move_respect_fanout_prob(self, current_node, walking_path):
        """Pick the next child of ``current_node`` to step into.

        Children are drawn according to the node's fan-out distribution; at
        the root the distribution is first multiplied by ``root_probs_coefs``
        and repaired for floating-point drift.

        :param walking_path: path walked so far (currently unused).
        :return: tuple ``(child_index, probability_of_that_child)``.
        """
        node_distribution = self._calc_fanout_distribution_of_node(current_node)
        if self._is_root_node(current_node):
            node_distribution *= self.root_probs_coefs
            node_distribution = _fix_distribution_mistake_due_to_floating_calc_errors_if_needed(node_distribution)

        next_random_step = np.random.choice(current_node.size, p=node_distribution)

        chosen_random_step_prob = node_distribution[next_random_step]
        return next_random_step, chosen_random_step_prob

    def join(self, right_tree):
        """Placeholder: not implemented, does nothing and returns None."""
        pass


def _fix_distribution_mistake_due_to_floating_calc_errors_if_needed(node_distribution):
    # todo: as it happens only at the root, can fix the coefs
    if sum(node_distribution) == 1:
        return node_distribution

    calc_error = 1 - sum(node_distribution)
    assert calc_error <= 0.0_000_001, 'calculation faults cant be higher than this'
    random_child = np.random.randint(len(node_distribution))
    node_distribution[random_child] += calc_error
    return node_distribution


def add_to_debug_global(all_vars):
    """Append a debug record built from ``all_vars`` to ``_debug_random_sampling``.

    :param all_vars: mapping that must contain the keys ``'k'``, ``'self'``,
        ``'all_accept_reject_measures'`` and ``'all_walking_paths_stats'``.
    """
    global _debug_random_sampling
    record = {
        'params': {
            'k': all_vars['k'],
        },
        'tree_size': len(all_vars['self']),
        'all_accept_reject_measures': all_vars['all_accept_reject_measures'],
        'all_walking_paths_stats': all_vars['all_walking_paths_stats']
    }
    _debug_random_sampling.append(record)


def _this_value_was_sampled_already(walking_path, all_walking_paths_set):
    return str(walking_path) in all_walking_paths_set


In [37]:
def generate_btree_index_x_values_with_dist(num_of_values, disired_prefix_to_percent_dist, my_index=None):
    """Fill an index with random keys following a prefix distribution.

    For every ``prefix -> share`` entry, ``int(num_of_values * share)`` random
    keys carrying that prefix are inserted through ``insert_to_index_random``.

    :param num_of_values: total number of keys the shares refer to.
    :param disired_prefix_to_percent_dist: mapping of key prefix to the
        fraction of ``num_of_values`` that should get that prefix.
    :param my_index: index to fill; a new ``OOBTreeExtLean`` is created when
        omitted.
    :return: the filled index.
    """
    if my_index is None:
        # OOBTreeExtLean is the tree class this notebook defines; the name
        # previously used here is not defined anywhere in it.
        my_index = OOBTreeExtLean()

    for prefix, amount_percent in disired_prefix_to_percent_dist.items():
        amount = int(num_of_values * amount_percent)
        my_index = insert_to_index_random(my_index, amount, prefix)

    return my_index


def insert_to_index_random(my_index, amount, prefix=''):
    """Insert ``amount`` randomly generated keys into ``my_index``.

    Every key is ``prefix`` followed by ``KEY_LENGTH`` characters drawn from
    ``ALPHABET``; the stored value is the random part without the prefix.
    Keys are generated and inserted in batches of at most ``CHUNKS_SIZE``.
    Random keys may collide, in which case fewer than ``amount`` distinct
    keys end up in the index.

    :param my_index: mapping-like index exposing ``update``.
    :param amount: number of keys to generate; nothing is inserted when it is
        not positive.
    :param prefix: string prepended to every generated key.
    :return: ``my_index``.
    """
    amount_in_iteration = min(CHUNKS_SIZE, amount)
    print('generating %s values, chunk of %s, with prefix=\'%s\'' %(amount, amount_in_iteration, prefix))

    if amount <= 0:
        # A zero chunk size would be an invalid ``range`` step.
        return my_index

    # The alphabet does not change between batches; build it once.
    np_alphabet = np.array(list(ALPHABET))

    proceed = 0
    for chunk_start in range(0, amount, amount_in_iteration):
        # The last batch may be shorter, so that exactly ``amount`` keys are
        # generated even when ``amount`` is not a multiple of the chunk size.
        chunk_size = min(amount_in_iteration, amount - chunk_start)
        np_codes = np.random.choice(np_alphabet, [chunk_size, KEY_LENGTH])
        my_index.update({
            prefix + code: code
            for code in map(''.join, np_codes)
        })

        proceed += chunk_size
        if (proceed % 150000) == 0:
            print('done generating %s values' % (proceed))
    return my_index


In [79]:
def _calculate_prefix_ditribution(values):
    return {value: occurences/len(values) for value, occurences in Counter([key[:4] for key in values]).most_common(10)}

In [38]:
# Share of the generated keys that gets each key prefix; '' means the key
# consists of the random part only. The shares add up to 1.
prefix_to_percent = {
    'gggg': 0.25,
    'hhhh': 0.15,
    'mmmm': 0.10,
    'rrrr': 0.03,
    '': 0.47
}
print(datetime.utcnow())  # timestamp before the index is built
num_of_values = 4_000_000
# Fill a fresh OOBTreeExtLean with random keys following the shares above.
my_index_4m = generate_btree_index_x_values_with_dist(num_of_values, prefix_to_percent, OOBTreeExtLean())
print(datetime.utcnow())  # timestamp after the index is built

2021-01-30 13:00:07.252999
generating 1000000 values, chunk of 10000, with prefix='gggg'
done generating 150000 values
done generating 300000 values
done generating 450000 values
done generating 600000 values
done generating 750000 values
done generating 900000 values
generating 600000 values, chunk of 10000, with prefix='hhhh'
done generating 150000 values
done generating 300000 values
done generating 450000 values
done generating 600000 values
generating 400000 values, chunk of 10000, with prefix='mmmm'
done generating 150000 values
done generating 300000 values
generating 120000 values, chunk of 10000, with prefix='rrrr'
generating 1880000 values, chunk of 10000, with prefix=''
done generating 150000 values
done generating 300000 values
done generating 450000 values
done generating 600000 values
done generating 750000 values
done generating 900000 values
done generating 1050000 values
done generating 1200000 values
done generating 1350000 values
done generating 1500000 values
done g

In [71]:
# Draw 10,000 keys from the index with the random-walk sampler and print a
# timestamp before and after, so the elapsed time can be read off the output.
print(datetime.utcnow())
sampled = my_index_4m.random_sampling(k=10_000)
# NOTE(review): the original note here read "to 11 seconds" -- presumably an observed run time; confirm.
print(datetime.utcnow())

2021-02-03 17:32:41.589325
2021-02-03 17:32:45.564131


In [75]:
# Materialise every key stored in the index as a list.
all_tree_elements = list(my_index_4m.keys())

In [78]:
# Kolmogorov-Smirnov test of the sampled keys against all keys of the index.
# NOTE(review): with a sequence as second argument scipy should run the
# two-sample variant -- confirm against the scipy.stats.kstest documentation.
stats.kstest(sampled, all_tree_elements)

KstestResult(statistic=0.01210924999999996, pvalue=0.1064102220731521)

In [77]:
# Number of keys that were read back from the index.
len(all_tree_elements)

4000000