Randomization inference is probably the way we want to go in order to handle very skewed distributions of continuous metrics.  We need to be able to supply the following functionality:

1. Generic randomization inference for any scalar test statistic

2. A selection of pre-built test statistics along with documentation about what they mean and when you might want to use them

3. A method for doing hypothetical assignment, or supplying your own function (we may want to scale this up at some point to have some pre-built functions). We'll focus on binary assignment for now (i.e. only two variants), and we'll work to generalize this later.

4. Power analysis. This is relatively easy, but we need a way to do run time estimations as well

In [3]:
import os
import sys
import numpy as np
import pandas as pd
import scipy.stats as stats
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
import scipy 

from matplotlib import style
from tqdm.auto import tqdm

In [40]:
# Additional imports 
from functools import partial

In [4]:
# Set pandas preferences
pd.options.display.max_columns=500
pd.options.display.max_colwidth=500
pd.options.display.max_rows=500

In [5]:
# Set plot style
style.use("fivethirtyeight")

In [6]:
# Initialize tqdm for pandas
tqdm.pandas()

In [7]:
# Example dataset: from https://www.franciscoyira.com/post/randomization-inference-causal-mixtape/

In [8]:
name_list = ['Andy', 'Ben', 'Chad', 'Daniel', 'Edith', 'Frank', 'George', 'Hank']
d_list = [1,1,1,1,0,0,0,0]
y_list = [10,5,16,3,5,7,8,10]
y0_list = [np.nan, np.nan, np.nan, np.nan, 5, 7, 8, 10]
y1_list = [10,5,16,3,np.nan,np.nan,np.nan,np.nan]
id_unit_list = [1,2,3,4,5,6,7,8]

In [9]:
df = pd.DataFrame()
df['name'] = name_list
df['d'] = d_list
df['y'] = y_list
df['y0'] = y0_list
df['y1'] = y1_list
df['id_unit'] = id_unit_list

In [10]:
df

Unnamed: 0,name,d,y,y0,y1,id_unit
0,Andy,1,10,,10.0,1
1,Ben,1,5,,5.0,2
2,Chad,1,16,,16.0,3
3,Daniel,1,3,,3.0,4
4,Edith,0,5,5.0,,5
5,Frank,0,7,7.0,,6
6,George,0,8,8.0,,7
7,Hank,0,10,10.0,,8


In [26]:
t = "d"
val = 1

In [30]:
df.query("d==1")

Unnamed: 0,name,d,y,y0,y1,id_unit
0,Andy,1,10,,10.0,1
1,Ben,1,5,,5.0,2
2,Chad,1,16,,16.0,3
3,Daniel,1,3,,3.0,4


In [31]:
df.query("{0}==@val".format(t))

Unnamed: 0,name,d,y,y0,y1,id_unit
0,Andy,1,10,,10.0,1
1,Ben,1,5,,5.0,2
2,Chad,1,16,,16.0,3
3,Daniel,1,3,,3.0,4


In [32]:
def ri_test_statistic_difference_in_means(df, outcome_col, treatment_col, treatment_name, control_name):
    
    sdo = df.query("{0}==@treatment_name".format(treatment_col))[outcome_col].mean() - df.query("{0}==@control_name".format(treatment_col)).mean()
    
    return sdo

In [34]:
def ri_test_statistic_difference_in_ks(df, outcome_col, treatment_col, treatment_name, control_name, alternative='two-sided'):
    
    ks_ = stats.ks_2samp(df.query("{0}==@treatment_name".format(treatment_col))[outcome_col].values - df.query("{0}==@control_name".format(treatment_col)).values, alternative=alternative)
    
    return ks_

In [39]:
def ri_test_statistic_difference_in_quantiles(df, outcome_col, treatment_col, treatment_name, control_name, quantile=0.5):
    
    q_diff = df.query("{0}==@treatment_name".format(treatment_col))[outcome_col].quantile(q=quantile) - df.query("{0}==@control_name".format(treatment_col)).quantile(q=quantile)
    
    return q_diff

The steps here for analyzing experimental data are:

1. Choose a sharp null

2. Pick a test statistic

3. Select randomization procedure 
    
    a. Run the permutations

4. Get the p-values by comparing to the true statistic with the simulated distribution

5. Calculate confidence intervals

For power analysis:

1. Choose a sharp null

2. Pick a test statistic

3. Select the desired power and confidence level

4. compute mme

We'll need to run sample size estimation as well, so probably we'll need to sample the historical data and run calculations from there. Then tie the sample sizes back to run times

In [22]:
def test_function(a, b):
    return a+b

In [49]:
test_2 = partial(test_function, **None)

TypeError: functools.partial() argument after ** must be a mapping, not NoneType

In [48]:
test_2(3)

5

In [16]:
callable("im a string")

False

In [14]:
callable(test_function)

True

In [23]:
test_dict = {'func1': test_function}

In [25]:
test_dict['func1'](1, 2)

3

In [None]:
class RandomizationInference:
    
    _supported_test_statistics = {'difference_in_means': ri_test_statistic_difference_in_means, 
                                  'difference_in_percentiles': ri_test_statistic_difference_in_quantiles, 
                                  'difference_in_ks_statistic': ri_test_statistic_difference_in_ks}
    
    def __init__(self):
        # self.df = df
        # self.sharp_null = sharp_null
        # self.test_statistic = test_statistic
        # self.outcome_column_name = outcome_column_name
        # self.treatment_column_name = treatment_column_name
        # self.treatment_name = treatment_name
        # self.control_name = control_name
        # self.num_permutations = num_permutations
        
    @static_method
    def _sharp_null(df_, sharp_null_type, sharp_null_value, outcome_column_name):
        
        if sharp_null_type = 'additive':
            df_['outcome_sharp_null'] = df_[outcome_column_name] + sharp_null_value
        else:
            df_['outcome_sharp_null'] = df_[outcome_column_name] * df_[outcome_column_name] * sharp_null_value
        
        return df_
    
    
    @static_method
    def _select_test_statistic(test_statistic)
        
        if type(test_statstic['function']) == str:
            assert test_statistic['function'] in _supported_test_statistics, "test statistic {0} is not currently support. Please select from {1} for implement your own function".format(test_statstic['function'], _supported_test_statistics)
        else:
            assert callable(test_statistic['function']), "supplied custom test statistic {0} is neither a supported type nor a function".format(test_statstic['function'])
        
        if test_statistic['params'] is None:
            test_statistic['params'] = {}
        else:
            assert type(test_statistic['params']) == dict, "Please pass a dictionary of parameters to your test statistic function. The keys should be the parameter names and the values the parameter values."
        
        test_statistic_function = partial(test_statistic['function'], **test_statistic['params'])
        
        return test_statistic_function

    
    @static_method
    def _make_hypothetical_assignments(df_, treatment_assignment_probability):
        
        assert treatment_assignment_probability > 0 and treatment_assignment_probability < 1, "Treatment assignment probabilities must be great than 0 and less than 1. Received {0}".format(treatment_assignment_probability)
        
        # set the random seed
        np.random.seed()
        
        df_['ri_in_treatment'] = stats.binom.rvs(n=1, p=treatment_assignment_probability, size=df_.shape[0])
        
        return df_
    
    
    @static_method
    def _p_value(observed_statistic_value, df_sims, alternative):
        
        df_ = df_sims.copy()
        
        if not observed_statistic_value in df_['test_statistic'].values:
            add_row = pd.DataFrame("permutation": [-1], "test_statistic": observed_statistic_value)
            df_ = pd.concat([add_row, df_])
            observed_perm = -1
        else:
            observed_perm = df_.loc[(df_['test_statistic'] == observed_statistic_value)]['permutation'].values[0]
        
        if alternative == 'two-sided':
            df_['rank_column'] = np.abs(df_['test_statistic'])
        else:
            df_['rank_column'] = df_['test_statistic']
        
        df_['rank'] = df_['rank_column'].rank(method='max', ascending=False)
        
        p_value = df_.query("permutation==@observed_perm")['rank'].values[0] / df_.shape[0]
        
        return p_value
    
    
    def _run_randomization_inference(df, test_statistic_function, treatment_assignment_probability):
        
        df_ = self._make_hypothetical_assignments(df_=df_, treatment_assignment_probability=treatment_assignment_probability)
        
        stat_ = test_statistic_function(df=df_, outcome_col='outcome_sharp_null', treatment_col='ri_in_treatment', treatment_name=1, control_name=0)
        
        return stat_
    
    
    def experimental_analysis(self, df, sharp_null_type='additive', sharp_null_value=0, test_statistic={'function': 'difference_in_means', 'params': None}, treatment_assignment_probability=0.5, outcome_column_name='y', treatment_column_name='d', treatment_name=1, control_name=0, num_permutations=1000, alternative='two-sided', confidence=0.95):
        
        assert sharp_null_type in ['additive', 'multiplicative'], "only additive or multiplicative sharp nulls are supported. Received {0}".format(sharp_null_type)
        
        assert type(num_permutations) == int, "Only an interger number of permutations is possible. Received {0}".format(num_permutations)
        
        assert alternative in ['two-sided', 'less', 'greater'], "Only {0} alternatives are supported. Received {0}".format(alternative)
        
        # Copy the input DataFrame so that we don't modify the original data in_place
        df_ = df.copy()
        
        # Step 1: implement the selected sharp null
        df_ = self._sharp_null(df_=df_, sharp_null_type=sharp_null_type, sharp_null_value=sharp_null_value, outcome_column_name=outcome_column_name)
        
        # Step 2: pick a test statistic. We have a list of pre-built ones, otherwise a function must be supplied
        # This function must consume a DataFrame and return a single scalar value
        test_statistic_function = self._select_test_statistic(test_statistic=test_statistic)
        
        # Step 3: Select a randomization proceedure. 
        # Currently this only supports random assignment via coin flip. The probability of being in the treatment group is customizable
        # Also run the simulations
        print('Running {0} permutations'.format(num_permutations))
        sim_dict = {i: self._run_randomization_inference(df=df_, test_statistic_function=test_statistic_function, treatment_assignment_probability=treatment_assignment_probability) for i in tqdm(range(num_permutations))}
        
        df_sims = pd.DataFrame(sim_dict, orient='index')
        df_sims = df_sims.reset_index()
        df_sims.columns = ['permutation', 'test_statistic']
    
        # Step 4: calculate p-values
        # First, calculate the observed statistic difference
        observed_test_satistic = test_statistic_function(df=df_, outcome_col=outcome_column_name, treatment_col=treatment_column_name, treatment_name=treatment_name, control_name=control_name)
        
        p_value = _p_value(observed_statistic_value=observed_test_satistic, df_sims=df_sims, alternative=alternative)
        
        # Step 5: calculate confidence interval
        
        # final output
        
    
    def randomization_inference(self, df, sharp_null, test_statistic, outcome_column_name='y', treatment_column_name='d', treatment_name=1, control_name=0, num_permutations=1000):
        