<a href="https://colab.research.google.com/github/arepevanatalia/AB-Testing/blob/main/%D0%98%D1%82%D0%BE%D0%B3%D0%BE%D0%B2%D0%B0%D1%8F_7_4_M7_HW_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

<p style="align: center;"><img align=center src="https://mailfit.com/wp-content/uploads/2019/11/lego-5.png"  width=900></p>
<h1 style="text-align: center;"><b>«Домашняя работа» - Ускорение тестирования</b></h3>


## Импорт библиотек

In [None]:
import numpy as np
import pandas as pd

import scipy

import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()

## Импорт данных

In [None]:
data_path = "https://raw.githubusercontent.com/a-milenkin/Datasetes_for_Piplines/main/SkillBox/ab_speedup.csv"
data = pd.read_csv(data_path, index_col=0)
print("Размеры датасета", data.shape)
data.head(20)

Размеры датасета (50000, 2)


Unnamed: 0_level_0,group,metrica
strata,Unnamed: 1_level_1,Unnamed: 2_level_1
11,history,42.631346
6,history,14.844453
4,history,2.362768
37,history,79.494017
18,history,-22.627788
34,history,28.679378
9,history,20.875503
16,history,19.505496
43,history,-34.727193
15,history,26.712643


<div class="alert alert-info">
<b>Про Датасет</b>
    
Датасет представляет собой результаты некоторого А/B-Теста.

* `strata` - Некоторая информация по страту (группа), к которой относится пользователь.
* `group` - Группа эксперимента - тестовая, контрольная или прошлая история.
* `metrica` - Целивая метрика
    
</div>

<div class="alert alert-info">

<b>Задание:</b>    
    
Найти различие между группами, увеличив чувствительность тестов тремя способами поочередно:
* Бутстрап
* Стратификация
* CUPED
    
Вспомогательная статья Валерия Бабушкина про стратификацию и CUPED:

* habr.com/ru/company/yandex/blog/497804/

In [None]:
data = data.drop_duplicates()
print("Размер датасета после удаления дубликатов:", data.shape)

Размер датасета после удаления дубликатов: (50000, 2)


In [None]:
data = data.dropna(axis=1)
print("Размер датасета после удаления столбцов с пропусками:", data.shape)

Размер датасета после удаления столбцов с пропусками: (50000, 2)


In [None]:
from scipy import stats
from tqdm import tqdm

def calculate_stats(group):
    return group['metrica'].mean(), group['metrica'].std(), len(group)

def perform_t_test(control, test):
    mean_control, std_control, n_control = calculate_stats(control)
    mean_test, std_test, n_test = calculate_stats(test)

    if n_control == 0 or n_test == 0:
        return np.nan, np.nan

    t_statistic = (mean_test - mean_control) / np.sqrt((std_control**2 / n_control) + (std_test**2 / n_test))

    degrees_freedom = ((std_control**2 / n_control) + (std_test**2 / n_test))**2 / \
                      ((std_control**2 / n_control)**2 / (n_control - 1) + (std_test**2 / n_test)**2 / (n_test - 1))

    p_value = 2 * (1 - stats.t.cdf(abs(t_statistic), data=degrees_freedom))

    return t_statistic, p_value

control_group = data[data['group'] == 'control']
test_group = data[data['group'] == 'test']

t_statistic_base, p_value_base = perform_t_test(control_group, test_group)
print("Базовый A/B-тест:")
print(f"t-статистика: {t_statistic_base:.3f}")
print(f"p-значение: {p_value_base:.3f}")

print("\n--------------------------------------------------\n")

Базовый A/B-тест:
t-статистика: nan
p-значение: nan

--------------------------------------------------



In [None]:
print(f"Размер контрольной группы: {len(control_group)}")
print(f"Размер тестовой группы: {len(test_group)}")
print(f"Количество уникальных значений в контрольной группе: {control_group['metrica'].nunique()}")
print(f"Количество уникальных значений в тестовой группе: {test_group['metrica'].nunique()}")


t_statistic_base, p_value_base = perform_t_test(control_group, test_group)
print("Базовый A/B-тест:")
print(f"t-статистика: {t_statistic_base:.3f}")
print(f"p-значение: {p_value_base:.3f}")

print("\n--------------------------------------------------\n")

def bootstrap_test(control, test, n_iterations=1000, random_state=None):
    """
    Проводит бутстрап-тест для сравнения двух групп.  Улучшенная обработка пустых выборок.
    """
    if random_state is not None:
        np.random.seed(random_state)

    # 1. Смещаем данные: вычитаем среднее из каждой группы
    control_shifted = control['metrica'] - control['metrica'].mean()
    test_shifted = test['metrica'] - test['metrica'].mean()

    observed_difference = test['metrica'].mean() - control['metrica'].mean()

    differences = []
    valid_iterations = 0
    for i in tqdm(range(n_iterations), desc="Бутстрап"):
        # 2. Бутстрапируем *смещенные* данные
        boot_control = np.random.choice(control_shifted, size=len(control_shifted), replace=True)
        boot_test = np.random.choice(test_shifted, size=len(test_shifted), replace=True)

        # 3. Проверяем, что бутстрапированные выборки не пустые и не содержат только нули
        if len(boot_control) == 0 or len(boot_test) == 0 or np.all(boot_control == 0) or np.all(boot_test == 0):

            print(f"Итерация {i}: Пустая или нулевая бутстрап-выборка. Пропускаем.")
            continue

        # 4. Считаем разницу средних *бутстрапированных* выборок и *добавляем наблюдаемую разницу*
        diff = np.mean(boot_test) - np.mean(boot_control) + observed_difference
        differences.append(diff)
        valid_iterations += 1

    # 5. Считаем p-value, если были валидные итерации
    if valid_iterations > 0:
        p_value = np.mean(np.abs(differences) >= np.abs(observed_difference))
    else:
        p_value = np.nan

    return observed_difference, p_value

print("Бутстрап:")
observed_difference_boot, p_value_boot = bootstrap_test(control_group, test_group, random_state=42)
print(f"Наблюдаемая разница средних: {observed_difference_boot:.3f}")
print(f"p-значение: {p_value_boot:.3f}")

print("\n--------------------------------------------------\n")


Размер контрольной группы: 0
Размер тестовой группы: 0
Количество уникальных значений в контрольной группе: 0
Количество уникальных значений в тестовой группе: 0
Базовый A/B-тест:
t-статистика: nan
p-значение: nan

--------------------------------------------------

Бутстрап:


Бутстрап: 100%|██████████| 1000/1000 [00:00<00:00, 15643.73it/s]

Итерация 0: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 1: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 2: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 3: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 4: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 5: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 6: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 7: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 8: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 9: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 10: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 11: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 12: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 13: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 14: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 15: Пустая или нулевая бутстрап-выборка. Пропускаем.
Итерация 16: Пуста




In [None]:
print("Стратификация:")

if 'strata' not in data.columns:
    print("Столбец 'strata' отсутствует в данных. Невозможно провести стратификацию.")
else:

    def stratified_t_test(data, strata_col='strata', group_col='group', metric_col='metrica'):
        """
        Проводит стратифицированный t-тест для сравнения двух групп.
        """
        stratified_results = []
        strata_values = data[strata_col].unique()
        overall_mean = data[metric_col].mean()

        for strata in strata_values:
            control_strata = data[(data[group_col] == 'control') & (data[strata_col] == strata)][metric_col]
            test_strata = data[(data[group_col] == 'test') & (data[strata_col] == strata)][metric_col]

            if len(control_strata) == 0 or len(test_strata) == 0:
                print(f"Предупреждение: Страта '{strata}' содержит недостаточно данных для обеих групп. Пропускается.")
                continue

            control_strata_adj = control_strata - control_strata.mean() + overall_mean
            test_strata_adj = test_strata - test_strata.mean() + overall_mean

            t_statistic, p_value = perform_t_test(pd.DataFrame({'metrica': control_strata_adj}), pd.DataFrame({'metrica': test_strata_adj}))
            stratified_results.append({'strata': strata, 't_statistic': t_statistic, 'p_value': p_value})

        p_values = [result['p_value'] for result in stratified_results if not np.isnan(result['p_value'])]
        if not p_values:
            print("Нет валидных p-value для объединения.")
            return np.nan, stratified_results

        chi2_statistic = -2 * np.sum(np.log(p_values))
        degrees_freedom = 2 * len(p_values)
        combined_p_value = 1 - stats.chi2.cdf(chi2_statistic, degrees_freedom)

        return combined_p_value, stratified_results


    combined_p_value_stratified, stratified_results = stratified_t_test(data)
    print(f"Объединенное p-значение (стратификация): {combined_p_value_stratified:.3f}")
    print("\nРезультаты по стратам:")
    for result in stratified_results:
        print(f"Страта: {result['strata']}, t-статистика: {result['t_statistic']:.3f}, p-значение: {result['p_value']:.3f}")


print("\n--------------------------------------------------\n")

Стратификация:
Столбец 'strata' отсутствует в данных. Невозможно провести стратификацию.

--------------------------------------------------



In [None]:
print("CUPED:")

if 'past_history' not in data['group'].unique():
    print("Ошибка: В данных отсутствует 'past_history' для применения CUPED.  Убедитесь, что она указана в столбце 'group'.")
else:
    # 1. Расчет Theta
    past_history_group = data[data['group'] == 'past_history']
    test_group_cuped = data[data['group'] == 'test']
    control_group_cuped = data[data['group'] == 'control']

    if len(test_group_cuped) == 0 or len(past_history_group) == 0 or len(control_group_cuped) == 0:
        print("Ошибка: Отсутствуют данные в группах 'test', 'control' или 'past_history'. CUPED не может быть применен.")
    else:
        cov = np.cov(control_group_cuped['metrica'], past_history_group['metrica'])[0][1]
        var = np.var(past_history_group['metrica'])
        theta = cov / var

        # 2. CUPED Adjustment
        data['metric_cuped'] = data['metrica']

        data.loc[data['group'].isin(['test', 'control']), 'metric_cuped'] = data.loc[data['group'].isin(['test', 'control']), 'metrica'] - theta * (data.loc[data['group'].isin(['test', 'control']), 'metrica'] - past_history_group['metrica'].mean())

        # 3. Проведение t-теста на скорректированных данных
        control_group_cuped = data[data['group'] == 'control']
        test_group_cuped = data[data['group'] == 'test']
        t_statistic_cuped, p_value_cuped = perform_t_test(control_group_cuped, test_group_cuped)

        print(f"Theta: {theta:.3f}")
        print(f"t-статистика (CUPED): {t_statistic_cuped:.3f}")
        print(f"p-значение (CUPED): {p_value_cuped:.3f}")

CUPED:
Ошибка: В данных отсутствует 'past_history' для применения CUPED.  Убедитесь, что она указана в столбце 'group'.
