<h3>Running Tests </h3>
What I want to do is to test three different ways on analyzing A/B tests:
<ol><li> Bayesian analysis with a structurally appropriate prior and good hyperparameters </li>
    <li> Bayesian analysis with a structurally appropriate prior and poor hyperparameters</li>
    <li>T-Tests</li>
</ol>
I am going to make multiple data files that use a hurdle model, with non-zero spending using a distibution like the above. The data will have a variable TestControl that will be T for tes and C for control. Then I give the T cases a 3% increase in revenue. This should be real, but hard to detect.<br>
<br>
These things take forever to run, and sometimes crash in the middle, so I am seperating the process of generating the tests vs. analyzing the tests.


In [1]:
# Importing Things #
import pandas as pd
import numpy as np
import math as m
from scipy import stats
from scipy.stats import gennorm
from scipy.stats import gamma
import seaborn as sns
import matplotlib.pyplot as plt
import pymc as pm
import pickle
import os



In [2]:
gGaussMean = 3
gGaussSpread = 2.5
gaussianBeta = 3
alpha=0.2945910793963176
scale=0.0051161465104606904
numTests=66
lift=1.03
resultFile = r"C:\Users\efree\BayesTests_v0.pkl"

In [3]:
def generate_dataset(N=48000, R=0.60, M=gGaussMean, S=gGaussSpread, B=gaussianBeta,lift=1.00):
    first_col = np.random.choice(['T', 'C'], size=N)
    second_col = np.zeros(N)
    non_zero_count = int((1 - R) * N)
    non_zero_indices = np.random.choice(N, size=non_zero_count, replace=False)
    random_values = gennorm.rvs(beta=B, loc=M, scale=S, size=non_zero_count)
    second_col[non_zero_indices] = np.exp(random_values)
    df = pd.DataFrame({
        'TestControl': first_col,
        'Revenue': second_col
    })
    df['Revenue']=df.apply(lambda x: x['Revenue']*lift if x['TestControl']=='T' else x['Revenue'],axis=1)

    return df

In [4]:
def hurdle_gamma_test_good(dataset, samples=3000, tune=1000):
    T_data = dataset[dataset['TestControl'] == 'T']['Revenue'].values
    C_data = dataset[dataset['TestControl'] == 'C']['Revenue'].values

    with pm.Model() as model:
        # Hurdle components (probability of non-zero revenue)
        theta_T = pm.Beta('theta_T', alpha=1, beta=1)
        theta_C = pm.Beta('theta_C', alpha=1, beta=1)

        # Gamma distribution parameters
        alpha_T = pm.HalfNormal('alpha_T', sigma=1)
        beta_T = pm.HalfNormal('beta_T', sigma=0.5)
        alpha_C = pm.HalfNormal('alpha_C', sigma=1)
        beta_C = pm.HalfNormal('beta_C', sigma=0.5)

        # Likelihoods for T group
        is_nonzero_T = pm.Bernoulli('is_nonzero_T', p=theta_T, observed=(T_data > 0).astype(int))
        revenue_nonzero_T = pm.Gamma('revenue_nonzero_T', alpha=alpha_T, beta=beta_T, observed=T_data[T_data > 0])

        # Likelihoods for C group
        is_nonzero_C = pm.Bernoulli('is_nonzero_C', p=theta_C, observed=(C_data > 0).astype(int))
        revenue_nonzero_C = pm.Gamma('revenue_nonzero_C', alpha=alpha_C, beta=beta_C, observed=C_data[C_data > 0])

        # Posterior predictive samples for comparison
        revenue_T = pm.Deterministic('revenue_T', theta_T * alpha_T / beta_T)
        revenue_C = pm.Deterministic('revenue_C', theta_C * alpha_C / beta_C)

        trace = pm.sample(samples, tune=tune, return_inferencedata=True, chains=2, target_accept=0.9)

    # Compute the posterior probability that T > C
    T_better = (trace.posterior['revenue_T'] > trace.posterior['revenue_C']).values.mean()

    percent_chance_T_better = T_better

    return percent_chance_T_better

In [5]:
def hurdle_gamma_test_bad(dataset, samples=3000, tune=1000):
    T_data = dataset[dataset['TestControl'] == 'T']['Revenue'].values
    C_data = dataset[dataset['TestControl'] == 'C']['Revenue'].values

    with pm.Model() as model:
        # Hurdle components (probability of non-zero revenue)
        theta_T = pm.Uniform('theta_T')
        theta_C = pm.Uniform('theta_C')

        # Gamma distribution parameters
        alpha_T = pm.HalfNormal('alpha_T', sigma=20)
        beta_T = pm.HalfNormal('beta_T', sigma=20)
        alpha_C = pm.HalfNormal('alpha_C', sigma=20)
        beta_C = pm.HalfNormal('beta_C', sigma=20)

        # Likelihoods for T group
        is_nonzero_T = pm.Bernoulli('is_nonzero_T', p=theta_T, observed=(T_data > 0).astype(int))
        revenue_nonzero_T = pm.Gamma('revenue_nonzero_T', alpha=alpha_T, beta=beta_T, observed=T_data[T_data > 0])

        # Likelihoods for C group
        is_nonzero_C = pm.Bernoulli('is_nonzero_C', p=theta_C, observed=(C_data > 0).astype(int))
        revenue_nonzero_C = pm.Gamma('revenue_nonzero_C', alpha=alpha_C, beta=beta_C, observed=C_data[C_data > 0])

        # Posterior predictive samples for comparison
        revenue_T = pm.Deterministic('revenue_T', theta_T * alpha_T / beta_T)
        revenue_C = pm.Deterministic('revenue_C', theta_C * alpha_C / beta_C)

        trace = pm.sample(samples, tune=tune, return_inferencedata=True, chains=2, target_accept=0.9)

    # Compute the posterior probability that T > C
    T_better = (trace.posterior['revenue_T'] > trace.posterior['revenue_C']).values.mean()

    percent_chance_T_better = T_better

    return percent_chance_T_better

In [6]:
%%capture

for i in range(numTests):
    data1 = generate_dataset(lift=lift)
    t_mean = data1[data1['TestControl']=='T']['Revenue'].mean()
    c_mean = data1[data1['TestControl']=='C']['Revenue'].mean()
    t_statistic, p_value = stats.ttest_ind(data1[data1['TestControl']=='T']['Revenue'].values, data1[data1['TestControl']=='C']['Revenue'].values)
    try:
        tBayes_bad = hurdle_gamma_test_bad(data1)
    except:
        tBayes_bad = 0.50
    try:
        tBayes_good= hurdle_gamma_test_good(data1)
    except:
        tBayes_good=0.50
    # Like I said, sometimes the notebook crashes or gets interrupted. This is why I am saving the results to a file each time #
    # This makes re-starting after crash easier.                                                                               #
    if os.path.exists(resultFile):
        with open(resultFile, 'rb') as fileName:
          resultList = pickle.load(fileName)
        resultList.append([tBayes_good, tBayes_bad, t_mean, c_mean, p_value])
        with open(resultFile, 'wb') as fileName:
            pickle.dump(resultList,fileName)
    else:
        resultList=[[tBayes_good, tBayes_bad,  t_mean, c_mean, p_value]]
        with open(resultFile, 'wb') as fileName:
            pickle.dump(resultList,fileName)       


Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [theta_T, theta_C, alpha_T, beta_T, alpha_C, beta_C]
Sampling 2 chains for 1_000 tune and 3_000 draw iterations (2_000 + 6_000 draws total) took 194 seconds.
We recommend running at least 4 chains for robust computation of convergence diagnostics
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [theta_T, theta_C, alpha_T, beta_T, alpha_C, beta_C]
Sampling 2 chains for 1_000 tune and 3_000 draw iterations (2_000 + 6_000 draws total) took 196 seconds.
We recommend running at least 4 chains for robust computation of convergence diagnostics
Initializing NUTS using jitter+adapt_diag...
Multiprocess sampling (2 chains in 2 jobs)
NUTS: [theta_T, theta_C, alpha_T, beta_T, alpha_C, beta_C]
Sampling 2 chains for 1_000 tune and 3_000 draw iterations (2_000 + 6_000 draws total) took 198 seconds.
We recommend running at least 4 chains for robust computation of c

In [7]:
print("All Done!")

All Done!
