<a href="https://colab.research.google.com/github/Pezzekazil/templates/blob/master/simulation_effect_power_on_point_estimate.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import scipy.stats
import pandas as pd
import altair as alt

In [2]:
synth_data = np.random.gamma(0.3,8,10000)

In [17]:
class Simulation():
  def __init__(self, data):
    self.data = data
    self.simulated_output = None
    self.simulation_results = []

  def _summary(self, data=None) -> dict:
    """
    Helper function that, given a dataset, calculates common sample statistics
    """
    data = data if data else self.data
    mean = data.mean()
    sd = data.std()
    ss = data.shape[0]
    return {"mean": mean, "sd": sd, "ss_one_arm": ss, "se": sd / np.sqrt(ss)}

  def _get_effect_size(self, summary, power=0.80, confidence=0.95) -> dict:
    """
    Assumes
    1. same variance
    2. shift to the right applied uniformely to all observations
    3. Two-tail test
    4. Only two variants
    """
    z_crit_alpha = scipy.stats.norm.ppf(1 - (1-confidence)/2)
    z_crit_beta = scipy.stats.norm.ppf(1 - (1-power))
    effect_size = (
        (np.sqrt(2) * summary["sd"] * (z_crit_alpha + z_crit_beta)) /
        (np.sqrt(summary["ss_one_arm"])))
    return {
        **summary,
        "power": power,
        "confidence": confidence,
        "z_critical_alpha": z_crit_alpha,
        "z_critical_beta": z_crit_beta,
        "effect_size_abs": effect_size,
        "MDE": effect_size / summary["mean"]}

  def _resample(self, power=0.80, confidence=0.95, resample_control=False) -> dict:
    h0_data = np.random.choice(self.data, size = self.data.shape[0], replace=True) if resample_control else self.data
    es = self._get_effect_size(self._summary(), power, confidence)
    data_shifted = self.data + es["effect_size_abs"]
    ha_data = np.random.choice(data_shifted, size = data_shifted.shape[0], replace=True)
    t_stat, p_value = scipy.stats.ttest_ind(h0_data, ha_data)
    return {
        "power":power,
        "confidence":confidence,
        "ha_mean":ha_data.mean(),
        "h0_mean": h0_data.mean(),
        "delta": (ha_data.mean() - es["mean"]) / es["mean"],
        "is_grt_than_MDE": ((ha_data.mean() - es["mean"]) / es["mean"]) > es["MDE"],
        "is_stat_sig": p_value < (1-confidence),
        "true_lift": es["MDE"],
        "param_effect_size": es["effect_size_abs"],
        "param_data_mean": es["mean"],
        "n": 1
        }

  def simulate(self, power=0.8, confidence=0.95, cycles=1000, resample_control=False) -> pd.DataFrame:
    out = []
    for i in range(0,cycles):
      out.append(self._resample(power, confidence, resample_control))
    self.simulation = pd.DataFrame(out)
    return self.simulation

  def aggregate_results(self):
    all = self.simulation.agg({"n": "count", 'delta': 'mean'})
    stat_sig = self.simulation[self.simulation.is_stat_sig].agg({"n": "count", 'delta': 'mean'})
    self.simulation_results.append({
        "power": self.simulation.power.min(),
        "confidence": self.simulation.confidence.min(),
        "number_tests_simulated": all.n,
        "number_tests_SS": stat_sig.n,
        "true_lift": self.simulation.true_lift.min(),
        "lift_observed_all": all.delta,
        "lift_observed_SS": stat_sig.delta,
        "estimated_bias_pp": stat_sig.delta - self.simulation.true_lift.min()})
    return self.simulation_results[-1]

  def health_check_simulation(self):
    return self.simulation.groupby(["is_stat_sig", "is_grt_than_MDE"])["n"].sum()

  def e2e_simulation(self, resample_control=False):
    for power in range(1, 10, 1):
      self.simulate(power=power/10.0, resample_control=resample_control)
      self.aggregate_results()
    index = [i["power"] for i in self.simulation_results]
    data = {
        "percent_SS": [i["number_tests_SS"] / i["number_tests_simulated"] for i in self.simulation_results],
        "true_lift": [i["true_lift"] for i in self.simulation_results],
        "lift_observed_all": [i["lift_observed_all"] for i in self.simulation_results],
        "lift_observed_SS": [i["lift_observed_SS"] for i in self.simulation_results],
        "estimated_bias_pp": [i["estimated_bias_pp"] for i in self.simulation_results]}
    self.final_results = pd.DataFrame(data, pd.Index(index, name='power'))

In [18]:
sim1 = Simulation(synth_data)
sim1.e2e_simulation()

In [19]:
sim1.final_results.reset_index()

Unnamed: 0,power,percent_SS,true_lift,lift_observed_all,lift_observed_SS,estimated_bias_pp
0,0.1,0.029,0.017274,0.017505,0.057015,0.039741
1,0.2,0.116,0.028475,0.028401,0.0591,0.030624
2,0.3,0.221,0.036552,0.036208,0.060257,0.023705
3,0.4,0.353,0.043454,0.04356,0.06258,0.019126
4,0.5,0.503,0.049905,0.049812,0.064033,0.014128
5,0.6,0.651,0.056355,0.057504,0.067885,0.011529
6,0.7,0.786,0.063257,0.063573,0.070316,0.007059
7,0.8,0.891,0.071334,0.070761,0.074434,0.0031
8,0.9,0.966,0.082536,0.082636,0.084073,0.001538


In [None]:
base = alt.Chart(sim1.final_results.reset_index()).encode(
    x=alt.X("power:N", axis=alt.Axis(format=".1p", labelAngle=0)),
)
percent_SS = base.encode(
    y=alt.Y("percent_SS:Q", axis=alt.Axis(format=".1p"))
).mark_bar().properties(
    width=800,
    height=300
)
estimated_bias_pp = base.encode(
    y=alt.Y("estimated_bias_pp:Q", axis=alt.Axis(format=".2p"))
).mark_bar().properties(
    width=800,
    height=300
)
percent_SS & estimated_bias_pp