In [20]:
import numpy as np
np.random.seed(42)

In [37]:
def loop_bootstrap(a: np.ndarray, bs_iters: int = 1_000, agg="mean", **kwargs) -> np.ndarray:
    """Return a sampling with replacement with given number of iterations

    Arguments:
    a:          Dataset to sample from
    bs_iters:   Number of sampl ing iteration
    agg:        Aggregation method to use, must be in ["mean", "median", "quantile"]
    Returns:
    np.ndarray with samples of size(bs_iters, len(a))
    """
    if agg not in ["mean", "median", "quantile"]:
        raise ValueError("agg should be in ['mean', 'median', 'quantile']")
    res = []
    for _ in range(bs_iters):
        if agg == "mean":
            res.append(np.mean(np.random.choice(a, len(a), replace=True)))
        elif agg == "median":
            res.append(np.median(np.random.choice(a, len(a), replace=True)))
        elif agg == "quantile":
            res.append(np.quantile(np.random.choice(a, len(a), replace=True)), q=kwargs["q"])
    return np.array(res)

def poisson_bootstrap(a:np.ndarray, bs_iters:int = 1_000):
    res = []
    for _ in range(bs_iters):
        poisson_weights = np.random.poisson(1, size=len(a))
        weights_sum = np.sum(poisson_weights)
        a_weighted = a * poisson_weights
        weighted_mean = np.sum(a_weighted) / weights_sum
        res.append(weighted_mean)
    return np.array(res)

def poisson_bootstrap_preweighted(a:np.ndarray, weights:np.ndarray):
    weights_sum = np.sum(weights, axis=1)
    a_weighted = a * weights
    weighted_mean = np.sum(a_weighted, axis=1) / weights_sum
    return weighted_mean

def ci(diffs: np.ndarray, alpha: float = 0.05, **kwargs):
    # -> tuple[tuple[float, float], bool]:
    """Return a sampling with replacement with given number of iterations

    Arguments:
        diffs:  Dataset to compute percentiles upon
        alpha:  Confidence interval alpha
    Returns:
        tuple[CI left, CI right]
    """
    ci_l = np.percentile(diffs, (alpha / 2) * 100)
    ci_r = np.percentile(diffs, (1 - alpha / 2) * 100)
    return (ci_l, ci_r)

In [52]:
DATASET_SIZE = 500_000

In [53]:
dataset = np.random.normal(10, 2, DATASET_SIZE)

In [54]:
vanilla_bs = loop_bootstrap(dataset)
print('Vanilla bootstrap', ci(vanilla_bs))

Vanilla bootstrap (9.992997150340356, 10.004097500424876)


In [55]:
poisson_bs = poisson_bootstrap(dataset)
print('Poisson bootstrap', ci(poisson_bs))

Poisson bootstrap (9.992943040141007, 10.004210535657265)


In [56]:
pre_weights = np.random.poisson(1, size=(1_000, len(dataset)))

In [57]:
poisson_bs_preweighted = poisson_bootstrap_preweighted(dataset, pre_weights)
print('Poisson bootstrap', ci(poisson_bs))

Poisson bootstrap (9.992943040141007, 10.004210535657265)


Примерная имплементация в Spark
1. Единожды рассчитываем и сохраняем табличку с полями |bootstrap_iteration|item_idx|poisson_weight|. Максимальный item_idx можно задать, например, в 1М.
2. Когда приходит время, собираем табличку с полями |item|metric_to_assess|
3. В табличке из п.2 собираем поле item_hash как hash(concatenate(item, test_name, metric_name))
4. В табличке из п.2 собираем поле item_idx как индекс строки после сортировки по полю item_hash
5. Делаем Join двух табличек по полю item_idx
6. Домножаем metric_to_assess на poisson_weight. Получаем weighted_metric
7. Делаем GroupBy сджойненной таблички по полю bootstrap_iteration, считаем сумму weighted_metric и сумму poisson_weight
8. Считаем поле final_metric как sum(weighted_metric) / sum(poisson_weight)