In [None]:
import pandas as pd
import numpy as np
import warnings
import matplotlib.pyplot as plt
import requests, time
import seaborn as sns

from datetime import datetime, timedelta
from io import StringIO
from scipy import stats
from tqdm import tqdm
from statsmodels.stats.weightstats import ztest as ztest
from statsmodels.sandbox.stats.multicomp import multipletests
from statsmodels.stats.power import TTestIndPower
from typing import List

from ambrosia.designer import Designer, design, load_from_config

In [None]:
import warnings
warnings.filterwarnings('ignore')

# Simulation of a standard experiment with two groups

In [None]:
pvalues_list = list()
alpha = 0.05
mu = 5
std = 1
group_size = 200
np.random.seed(42)

for _ in tqdm(range(1000)):
    p_values = list()
    control = np.random.normal(mu, std, group_size)
    treatment = np.random.normal(mu, std, group_size)
    
    pvalue = stats.ttest_ind(control, treatment)[1] # here the probability of a type I error = 0.05
    
    if pvalue < 0.05:
        pvalues_list.append(1)
    else:
        pvalues_list.append(0)

print(f"Ошибка I рода: {np.mean(pvalues_list)}")

The Type I error is ~0.05 as expected

In [None]:
pvalues_list = list()
alpha = 0.05
mu = 5
std = 1
group_size = 200
np.random.seed(42)

for _ in tqdm(range(1000)):
    p_values = list()
    control = np.random.normal(mu, std, group_size)
    treatment1 = np.random.normal(mu, std, group_size)
    treatment2 = np.random.normal(mu, std, group_size)
    
    pvalue_a = stats.ttest_ind(control, treatment1)[1] # here the probability of a type I error = 0.05
    pvalue_b = stats.ttest_ind(control, treatment2)[1] # here the probability of a type I error = 0.05
    pvalue_c = stats.ttest_ind(treatment1, treatment2)[1] # and here the probability of a type I error = 0.05
    
    # in total, the probability of being wrong in at least one of the cases = 1 - P(we will never make a mistake) = 1 - (0.95 ** 3) ~= 0.14
    
    if pvalue_a < 0.05 or pvalue_b < 0.05 or pvalue_c < 0.05:
        pvalues_list.append(1)
    else:
        pvalues_list.append(0)

print(f"Групповая вероятность ошибки I рода: {np.mean(pvalues_list)}")

Type I error is much greater than 0.05

## Change in FWER depending on the number of comparisons

In [None]:
n_list = np.arange(1, 100)

result = {}

for n in n_list:
    result[n] = 1 - (1 - 0.05) ** n

In [None]:
plt.plot(list(result.keys()), list(result.values()))
plt.title("FWER")
plt.xlabel("Number of hypotheses")
plt.ylabel("FWER")
plt.grid()
plt.show()

### Functions for correction methods that will be useful later

In [None]:
def method_without_correct(p_values: List[float], alpha: float = 0.05):
    """The function returns the comparison result without corrections"""
    
    res = (np.array(p_values) <= alpha).astype(int)
    return res

In [None]:
def method_bonferroni(p_values: List[float], alpha: float = 0.05):
    """The function returns the comparison result with Bonferroni correction"""
    
    return (multipletests(p_values, alpha=alpha, method="bonferroni")[0]).astype(int)


In [None]:
def method_holm(p_values: List[float], alpha: float = 0.05):
    """The function returns the comparison result with Holm correction"""
    
    return (multipletests(p_values, alpha=alpha, method="holm")[0]).astype(int)

In [None]:
def method_benjamini_hochberg(p_values: List[float], alpha: float = 0.05):
    """The function returns the comparison result with Benjamini-Hochberg correction"""
    
    return (multipletests(p_values, alpha=alpha, method="fdr_bh")[0]).astype(int)

### Functions for demonstrating graphs with the behavior of the methods in question

In [None]:
def draw_plot(methods: List[str], num_of_tests: List[int], metric: str, result: dict, title: str) -> None:
    """Visualization of a plot of errors depending on the number of comparisons"""
    
    sns.set(style="darkgrid")
    fig, ax = plt.subplots(1, 1, figsize=(8, 5), dpi=200)
    
    method_markup = dict()
    
    for method in methods:
        if method == "without_correct":
            method_markup[method] = ["-v"]
        elif method == "bonferroni":
            method_markup[method] = ["-o"]
        elif method == "holm":
            method_markup[method] = ["-o"]
        else:
            method_markup[method] = ["->"]
    
    if metric in ("fwer", "fdr"):
        ax.hlines(alpha, 0, max(num_of_tests), linestyles='--', label=f"alpha={alpha}", color="red")
    else:
        ax.hlines(0.2, 0, max(num_of_tests), linestyles='--', label="beta=0.2", color='r')
    
    for method, value in result.items():
        ax.plot(value.keys(), value.values(), method_markup[method][0], label=method, alpha=0.7)
        ax.legend(fontsize=12)
        ax.set_title(title)
        ax.set_xlabel("Experiments, num")
        ax.set_ylabel(title)
    
    return


In [None]:
def get_errors(result: dict, metric: str, methods: list, num_of_tests: list, no_effect_type: int) -> dict:
    """Function for calculating errors"""
    
    errors_dict = {
        method_name: {
            num: []
            for num in num_of_tests
        }
        for method_name in methods
    }

    for method, params in result.items():
        if metric == "fdr":
            for cnt, val in params.items():
                if no_effect_type == -1:
                    cnt_groups_no_effects = 1
                elif no_effect_type == 0:
                    cnt_groups_no_effects = cnt // 2
                else:
                    cnt_groups_no_effects = cnt - 1
                fp = np.array(val["first"]) * (cnt - cnt_groups_no_effects)
                tp = (1 - np.array(val["second"])) * cnt
                fp_tp = fp + tp
                fp_tp[fp_tp < 1] = 1
                errors_dict[method][cnt] = np.mean(fp / fp_tp)
        elif metric == "fwer":
            for cnt, val in params.items():
                errors_dict[method][cnt] = np.mean(np.array(val["first"]) > 0)
        elif metric == "error_type_II":
            for cnt, val in params.items():
                errors_dict[method][cnt] = np.mean(val["second"])
        else:
            for cnt, val in params.items():
                errors_dict[method][cnt] = np.mean(np.array(val["second"]) > 0)

    return errors_dict


In [None]:
def construct_groups(num: int, cnt_groups_no_effects: int, metric: str, 
                     mu: float, std: float, group_size: str, effect: float) -> list:
    """Function for forming groups"""
    
    groups_list = list()
    
    for _ in range(num):
        if metric == "fwer":
            groups_list.append((np.random.normal(mu, std, group_size), np.random.normal(mu, std, group_size)))
        elif metric in ("fwer II", "error_type_II"):
            groups_list.append((np.random.normal(mu, std, group_size),
                    np.random.normal(mu * (1 + effect), std, group_size)
                )
            )
        else:
            for _ in range(cnt_groups_no_effects):
                groups_list.append((np.random.normal(mu, std, group_size), np.random.normal(mu, std, group_size)))

            for _ in range(num - cnt_groups_no_effects):
                groups_list.append((np.random.normal(mu, std, group_size),
                        np.random.normal(mu * (1 + effect), std, group_size)
                    )
                )
    return groups_list

In [None]:
def get_plot_metrics(methods: list, num_of_tests: list, metric: str, 
                     title: str, group_size: int = 100, effect: float = 0.0,
    mu: float = 5.0, std: float = 1, n_iter: int = 300, alpha: float = 0.05, no_effect_type: int = 0) -> None:
    """"""
    if metric not in ("fwer", "fwer II", "fdr", "error_type_II"):
        return "Incorrect Method Name"
    
    result = {
        method: {num: {"first": [], "second": []} for num in num_of_tests} for method in methods
    }
    
    
    for num in tqdm(num_of_tests):
        cnt_groups_no_effects = 0
        if metric == "fdr":
            if no_effect_type == -1:
                cnt_groups_no_effects = 1
            elif no_effect_type == 0:
                cnt_groups_no_effects = num // 2
            else:
                cnt_groups_no_effects = num - 1
        
        for _ in range(n_iter):
            groups_list = construct_groups(num, cnt_groups_no_effects, metric, mu, std, group_size, effect)

            p_values = list()
    
            for control, treatment in groups_list:
                p_values.append(stats.ttest_ind(control, treatment)[1])

            for method in methods:
                if method == "without_correct":
                    func = method_without_correct
                elif method == "bonferroni":
                    func = method_bonferroni
                elif method == "holm":
                    func = method_holm
                else:
                    func = method_benjamini_hochberg
                
                if metric in ("fwer", "fwer II", "error_type_II"):
                    result[method][num]["first"].append(np.mean(func(p_values)))
                    result[method][num]["second"].append(
                        1 - np.mean(func(p_values))
                    )
                else:
                    result[method][num]["first"].append(np.mean(func(p_values)[:cnt_groups_no_effects]))
                    result[method][num]["second"].append(
                        1 - np.mean(func(p_values)[cnt_groups_no_effects:])
                    )

    errors = get_errors(result, metric, methods, num_of_tests, no_effect_type)
    
    if metric == "fwer":
        draw_plot(methods, num_of_tests, metric, errors, title)
    elif metric == "fwer II":
        draw_plot(methods, num_of_tests, metric, errors, title)
    else:
        draw_plot(methods, num_of_tests, metric, errors, title)

    return 


List of methods that will be passed to the function for drawing a graph

In [None]:
methods = [
    "without_correct",
    "bonferroni",
    "holm",
    "benjamini-hochberg"
]

Drawing a graph showing the behavior of the methods in question depending on the specified parameters

## FWER

### Bonferroni

#### Checking FWER control using the Bonferroni method

In [None]:
methods = [
    "without_correct",
    "bonferroni",
#     "holm",
#     "benjamini-hochberg"
]

In [None]:
power_analysis = TTestIndPower()

effect_size = power_analysis.solve_power(
    power=0.8, 
    alpha=0.05,
    nobs1=100,
    alternative="two-sided"
)

print(f"Effect size (MDE) to add to test group: {round(effect_size, 3)}")

In [None]:
num_of_tests = [1, 2, 3, 5, 10, 30, 50]

In [None]:
get_plot_metrics(
    methods, # list of methods for comparing
    num_of_tests, # list of number of comparings
    "fwer", # type error for drawing
    "FWER", # title and ylabel for plot
    group_size=100,
    effect=0.0,
    mu=5.0,
    std=1,
    n_iter=500,
    alpha=0.05
)

#### Checking FWER II control using the Bonferroni method

In [None]:
get_plot_metrics(
    methods,
    num_of_tests,
    "fwer II",
    "FWER II",
    group_size=500, # Added a larger sample size to make the graph more readable
    effect=0.0398, # mde
    mu=5,
    std=1,
    n_iter=1000,
    alpha=0.05
)

## Holm

### Checking FWER control using the Holm method

In [None]:
methods = [
    "without_correct",
    "bonferroni",
    "holm",
#     "benjamini-hochberg"
]

In [None]:
get_plot_metrics(
    methods,
    num_of_tests,
    "fwer",
    "FWER",
    group_size=100,
    effect=0.0,
    mu=5,
    std=1,
    n_iter=500,
    alpha=0.05,
)

### Checking FWER II control using the Holm method

In [None]:
get_plot_metrics(
    methods,
    num_of_tests,
    "fwer II",
    "FWER II",
    group_size=500,
    effect=0.0398,
    mu=5,
    std=1,
    n_iter=500,
    alpha=0.05
)

## FDR

In [None]:
methods = [
    "without_correct",
    "bonferroni",
    "holm",
    "benjamini-hochberg"
]

### Checking FDR control using the B-H method

Half of the hypotheses have an effect, half have no effect

In [None]:
get_plot_metrics(
    methods,
    num_of_tests,
    "fdr",
    "FDR",
    group_size=20,
    effect=0.0398,
    mu=5,
    std=1,
    n_iter=500,
    alpha=0.05,
    no_effect_type=0
)

One hypothesis from all without effect, the rest - with effect

In [None]:
get_plot_metrics(
    methods,
    num_of_tests,
    "fdr",
    "FDR",
    group_size=20,
    effect=0.0398,
    mu=5,
    std=1,
    n_iter=500,
    alpha=0.05,
    no_effect_type=-1
)

One of all hypotheses has an effect, the rest have no effect

In [None]:
get_plot_metrics(
    methods,
    num_of_tests,
    "fdr",
    "FDR",
    group_size=20,
    effect=0.0398,
    mu=5,
    std=1,
    n_iter=500,
    alpha=0.05,
    no_effect_type=1
)

# Simulation of experiment with example

In [None]:
def get_sample_size(mu: float, std: float, eff: float = 1.01, alpha: float = 0.05, beta: float = 0.2) -> int:
    """Function for calculating sample size for t-test"""
    
    t_a = abs(stats.norm.ppf(alpha / 2, loc=0, scale=1))
    t_b = stats.norm.ppf(1 - beta, loc=0, scale=1)
    

    mu_sqr = (mu - mu * eff) ** 2
        
    z_sqr = (t_a + t_b) ** 2
    disp = 2 * (std ** 2)

    sample_size = int(
        np.ceil(
            z_sqr * disp / mu_sqr
        )
    )
    return sample_size

Create a dataframe with a normal metric distribution

In [None]:
mu = 10
std = 1
size_group = 1000000

df = pd.DataFrame({"metric": np.random.normal(mu, std, size_group)})

In [None]:
designer = Designer(dataframe=df, metrics="metric")

Let's estimate the sample size required to detect the effect using the Ambrosia library

In [None]:
designer = Designer(dataframe=df, metrics="metric")

effects = [1.005, 1.05]  # MDE in percents
first_type_errors = [0.01, 0.05]
second_type_errors = [0.1, 0.2]

designer.set_first_errors(first_type_errors)
designer.set_second_errors(second_type_errors)

designer.run(to_design="size", method="theory", effects=effects)

Let's compare the value with our calculation of the minimum sample size

In [None]:
alpha = 0.05
beta = 0.2

mu = 10
std = 1
effect = 1.005

In [None]:
get_sample_size(mu, std, effect, alpha, beta)

How does sample size depend on type 2 error?

In [None]:
res = {}
nums = [0.01, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.45, 0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9]

for b in nums:
    res[b] = get_sample_size(10, 1, 1.005, 0.05, b)

In [None]:
plt.plot(res.keys(), res.values())

Function for calculating errors of the first and second types, taking into account sample size

In [None]:
def get_barplot_errors(df: pd.DataFrame, methods: dict, mu: float, std: float, effect: float, 
                       sample_size: int, n_groups: int, n_iter: int, type_error: str) -> None:
    """Function for drawing a graph with errors taking into account the sample size"""
    
    fwer = {
        method_name: []
        for method_name in methods
    }

    error_type_II = {
        method_name: []
        for method_name in methods
    }

    fwtr = {
        method_name: []
        for method_name in methods
    }
    
    values = df["metric"].values
    effect = effect
    groups = n_groups
    np.random.seed(64)

    groups_bucket = {
        num: []
        for num in range(1, groups + 1)
    }

    result = dict()

    for method in methods:
        result[method] = {"first": list(), "second": list()}
        
    for _ in tqdm(range(n_iter)):
        for num in groups_bucket.keys():
            if num == 1:
                group = np.random.normal(mu * effect, std, int(sample_size))
            else:
                group = np.random.choice(values, int(sample_size), True)
            groups_bucket[num] = group

        pvalues = list()

        for first_num in range(1, groups + 1):
            for second_num in range(first_num + 1, groups + 1):
                _, pvalue = stats.ttest_ind(groups_bucket[first_num], groups_bucket[second_num])
                pvalues.append(pvalue)

        for method, func in methods.items():
            result[method]["first"].append(np.mean(func(pvalues)[4:]))
            result[method]["second"].append(
                    1 - np.mean(func(pvalues)[:4])
                )
        
    for method, params in result.items():
        fwer[method] = np.mean(np.array(params["first"]) > 0)
        fwtr[method] = np.mean(np.array(params["second"]) > 0)
        error_type_II[method] = np.mean(params["second"])
        
    if type_error == "fwer":
        errors = fwer
        ylabel = "FWER"
        label = "alpha=0.05"
        xline = 0.05
    elif type_error == "fwer II":
        errors = fwtr
        ylabel = "FWER II"
        label = "beta=0.2"
        xline = 0.2
    else:
        errors = error_type_II
        ylabel = "Error Type II"
        label = "beta=0.2"
        xline = 0.2
    
    sns.set(style="darkgrid")
    fig, ax = plt.subplots(1, 1, figsize=(8, 5), dpi=200)
    plt.bar(range(len(errors)), list(errors.values()), align="center")
    plt.xticks(range(len(errors)), list(errors.keys()))
    ax.hlines(xline, -1, 4, linestyles='--', label=label, color='r')
    plt.ylabel(ylabel)
    plt.xlabel("Method")

    plt.show()
    
    return 


In [None]:
alpha = 0.05
beta = 0.2

mu = 10
std = 1
effect = 1.005

In [None]:
methods = {
    "without_correct": method_without_correct,
    "bonferroni": method_bonferroni,
    "holm": method_holm,
    "benjamini-hochberg": method_benjamini_hochberg,
}

In [None]:
get_barplot_errors(df=df, methods=methods, mu=mu, std=std, effect=effect, 
                       sample_size=6300, n_groups=5, n_iter=3000, type_error="fwer")

Holm and Bonferroni methods control the FWER at a given level

In [None]:
get_barplot_errors(df=df, methods=methods, mu=mu, std=std, effect=effect, 
                       sample_size=6300, n_groups=5, n_iter=3000, type_error="error_type_II")

But the average type II error is higher than the expected value of 0.2

Let's adjust the sample size by adjusting the alpha

In [None]:
groups = 5
comparisons = 10

correct_alpha = alpha / comparisons
sample_size_corrected = get_sample_size(mu, std, effect, correct_alpha, beta)

print(f"Adjusted sample size: {sample_size_corrected}")

In [None]:
get_barplot_errors(df=df, methods=methods, mu=mu, std=std, effect=effect, 
                       sample_size=10651, n_groups=5, n_iter=3000, type_error="error_type_II")

Now the average error of the second type corresponds to the level of 0.2

Let's try to adjust the sample size by averaging the power

In [None]:
groups = 5
comparisons = 10

n_values = list()

for i in range(1, comparisons):
    n_values.append(get_sample_size(mu, std, effect, alpha / i, beta))

print(f"Adjusted sample size: {int(np.mean(n_values))}")

In [None]:
get_barplot_errors(df=df, methods=methods, mu=mu, std=std, effect=effect, 
                       sample_size=8986, n_groups=5, n_iter=3000, type_error="error_type_II")

The average type II error is higher than the expected value of 0.2, but acceptable

Let's adjust the sample size by controlling the probability of making at least one type II error

In [None]:
groups = 5
comparisons = 10
n_values = list()

n = get_sample_size(mu, std, effect, alpha, beta)

for i in range(1, int(comparisons ** 2)):
    n_values.append(get_sample_size(mu, std, effect, alpha / i, beta))

print(f"Adjusted sample size: {int(np.mean(n_values))}")

In [None]:
get_barplot_errors(df=df, methods=methods, mu=mu, std=std, effect=effect, 
                       sample_size=13125, n_groups=5, n_iter=3000, type_error="fwer II")

For the Holm method, the value of the second type error is close to 0.2