In [1]:
import numpy as np
import scipy.stats as sps
import pandas as pd
import plotly.graph_objs as go

## Доп функции

In [2]:
def get_qq_plot(p_values, title):
    """Рисует распределение p-value"""
    p_values = np.array(p_values)
    probs = []
    x = [0.01 * i for i in range(101)]
    for i in range(101):
        alpha_step = 0.01 * i
        probs.append(p_values[p_values < alpha_step].shape[0] / p_values.shape[0])
    fig = go.Figure([go.Scatter(x=x, y=probs, mode="markers", name="p_value"),
                 go.Scatter(x=x, y=x, mode="lines", name="uniform")])
    fig.update_layout(height=600, width=600, title=title) 
    return fig

In [3]:
def get_power(p_values, alpha=0.05):
    """Оценка мощности критерия, при условии, что значения p_value взяты при наличии 
    различий в сравниваемых выборках 
    """
    p_values = np.array(p_values)
    return p_values[p_values < alpha].shape[0] / p_values.shape[0] * 100

1. Реализовать формулу подсчета длительности теста, сравнить ее с онлайн калькуляторами (например https://mindbox.ru/tools/ab-test-calculator/ ). При сравнении оценить мощность критерия при указанном изменении и рассчитанном количестве наблюдений в выборке. 

In [5]:
def calculate_sample_size(mean_value, alpha, effect, power):
    p1 = mean_value
    p2 = p1 + effect

    z_beta = sps.norm.ppf(power)
    z_alpha = sps.norm.ppf(1 - alpha / 2)

    # Вычисление размера выборки
    sample_size = ((z_beta + z_alpha) ** 2 * (p1 * (1 - p1) + p2 * (1 - p2))) / (
        p1 - p2
    ) ** 2

    return sample_size

In [6]:
def calculate_power(sample_size, mean_value, effect, alpha):
    p_values_ttest = []
    p_values_mw = []

    n_exp = 10000
    params_1 = {"size": int(sample_size), "p": mean_value}
    params_2 = {"size": int(sample_size), "p": mean_value + effect}

    for _ in range(n_exp):
        x_a = sps.bernoulli.rvs(**params_1)
        x_b = sps.bernoulli.rvs(**params_2)
        p_values_ttest.append(sps.ttest_ind(x_a, x_b, equal_var=False).pvalue)
        p_values_mw.append(sps.mannwhitneyu(x_a, x_b).pvalue)
    p_values_ttest = np.array(p_values_ttest)
    ttest_power = (
        p_values_ttest[p_values_ttest < alpha].shape[0] / p_values_ttest.shape[0] * 100
    )
    p_values_mw = np.array(p_values_mw)
    mw_power = p_values_mw[p_values_mw < alpha].shape[0] / p_values_mw.shape[0] * 100

    return ttest_power, mw_power

In [7]:
p = 0.2
alpha = 0.05
effect = 0.03
power = 0.8

sample_size = calculate_sample_size(
    mean_value=p, alpha=alpha, effect=effect, power=power
)
print(f"Необходимый размер выборки для каждой группы: {sample_size}")

ttest_power, mw_power = calculate_power(
    sample_size, mean_value=p, effect=effect, alpha=alpha
)
print(f"Рассчитанная мощность с использованием t-теста: {ttest_power}%")
print(f"Рассчитанная мощность с использованием теста Манна-Уитни: {mw_power}%")

Необходимый размер выборки для каждой группы: 2939.841509387865
Рассчитанная мощность с использованием t-теста: 80.17%
Рассчитанная мощность с использованием теста Манна-Уитни: 80.17%


In [9]:
p = 0.5
alpha = 0.05
effect = 0.1
power = 0.7

sample_size = calculate_sample_size(
    mean_value=p, alpha=alpha, effect=effect, power=power
)
print(f"Необходимый размер выборки для каждой группы: {sample_size}")

ttest_power, mw_power = calculate_power(
    sample_size, mean_value=p, effect=effect, alpha=alpha
)
print(f"Рассчитанная мощность с использованием t-теста: {ttest_power}%")
print(f"Рассчитанная мощность с использованием теста Манна-Уитни: {mw_power}%")

Необходимый размер выборки для каждой группы: 302.43128080415227
Рассчитанная мощность с использованием t-теста: 70.17%
Рассчитанная мощность с использованием теста Манна-Уитни: 69.89%


2. Реализовать метод линеаризации. Проверить для него корректность и мощность. Мощность должна быть больше, чем просто на обычных значениях конверсии пользователей.

In [10]:
def linearization(p=0.05, effect=0):
    n_exp = 1000
    p_values = []
    p_values_lin = []

    for _ in range(n_exp):
        records = []
        for _ in range(100):
            n_views = int(sps.expon.rvs(loc=100, scale=100))
            clicks = sps.bernoulli.rvs(p=p, size=n_views)
            records.append([n_views, np.sum(clicks), np.sum(clicks) / n_views, "A"])

        for _ in range(100):
            n_views = int(sps.expon.rvs(loc=100, scale=100))
            clicks = sps.bernoulli.rvs(p=p + effect, size=n_views)
            records.append([n_views, np.sum(clicks), np.sum(clicks) / n_views, "B"])

        df_data = pd.DataFrame(records, columns=["views", "clicks", "cr", "group"])
        cr_A = df_data[df_data["group"] == "A"]["clicks"].sum() / df_data[df_data["group"] == "A"]["views"].sum()
        df_data["cr_lin"] = df_data["clicks"] - cr_A * df_data["views"]

        x_a = df_data[df_data["group"] == "A"]["cr"]
        x_b = df_data[df_data["group"] == "B"]["cr"]
        p_value = sps.ttest_ind(x_a, x_b).pvalue
        p_values.append(p_value)

        x_a_lin = df_data[df_data["group"] == "A"]["cr_lin"]
        x_b_lin = df_data[df_data["group"] == "B"]["cr_lin"]
        p_value_lin = sps.ttest_ind(x_a_lin, x_b_lin).pvalue
        p_values_lin.append(p_value_lin)

    return p_values, p_values_lin

In [11]:
p_values, p_values_lin = linearization()

In [12]:
get_qq_plot(p_values, title='Без линеаризации')

In [13]:
get_qq_plot(p_values_lin, title='С Линеаризацией')

In [14]:
p_values, p_values_lin = linearization(p=0.05, effect=0.005)

print(f'Мощность без линеаризации: {get_power(p_values)}')
print(f'Мощность с линеаризацией: {get_power(p_values_lin)}')

Мощность без линеаризации: 52.800000000000004
Мощность с линеаризацией: 59.599999999999994


3.Реализовать метод CUPED. Проверить для него корректность и мощность. Данные на этапе до A/B тесте необходимо сгенерировать один раз, далее синтетически генерировать только часть, связанную с проведением A/B-теста.

In [15]:
def cuped(dist, pre_exp, params_1, params_2):
    n_exp = 1000
    p_values = np.empty(n_exp)
    p_values_cuped = np.empty(n_exp)

    for i in range(n_exp):
        df_A = pd.DataFrame({
            'user': [f"A_{x:5}" for x in range(len(pre_exp))],
            'pre_exp': pre_exp
        })
        df_B = pd.DataFrame({
            'user': [f"B_{x:5}" for x in range(len(pre_exp))],
            'pre_exp': pre_exp
        })

        df_A["payments"] = dist(**params_1) * df_A["pre_exp"]
        df_B["payments"] = dist(**params_2) * df_B["pre_exp"]

        p_values[i] = sps.ttest_ind(df_A["payments"], df_B["payments"]).pvalue

        x_a = df_A["pre_exp"]
        x_b = df_B["pre_exp"]
        y_a = df_A["payments"]
        y_b = df_B["payments"]
        theta = np.cov(x_a, y_a)[0, 1] / np.var(x_a)

        df_A["payments_cuped"] = df_A["payments"] - theta * df_A["pre_exp"]
        df_B["payments_cuped"] = df_B["payments"] - theta * df_B["pre_exp"]

        p_values_cuped[i] = sps.ttest_ind(df_A["payments_cuped"], df_B["payments_cuped"]).pvalue

    return p_values, p_values_cuped


size = 10000
pre_exp = sps.norm.rvs(loc=100, scale=20, size=size)

params_1 = dict(loc=100, scale=100, size=size)
params_2 = dict(loc=100, scale=100, size=size)
p_values, p_values_cuped = cuped(sps.expon.rvs, pre_exp, params_1, params_2)
get_qq_plot(p_values_cuped, title='СUPED')

In [16]:
effect = 0.005
params_1 = dict(loc=1, scale=0.1, size=size)
params_2 = dict(loc=1 + effect, scale=0.1, size=size)
p_values, p_values_cuped = cuped(sps.expon.rvs, pre_exp, params_1, params_2)
power_ordinal_test = np.mean(p_values < alpha) * 100
power_cuped = np.mean(p_values_cuped < alpha) * 100

print(f"Мощность обычного теста: {power_ordinal_test:.2f}")
print(f"Мощность с применением метода CUPED: {power_cuped:.2f}")

Мощность обычного теста: 12.10
Мощность с применением метода CUPED: 93.80
