In [5]:
# from https://github.com/pymc-devs/pymc

%matplotlib inline
import matplotlib.pyplot as plt
plt.rcParams['lines.linewidth'] = 2
plt.rcParams['lines.color'] = 'r'
plt.rcParams['font.size'] = 15
plt.rcParams['axes.labelsize'] = 20
plt.rcParams['axes.titlesize'] = 10
plt.rcParams['xtick.labelsize'] = 12
plt.rcParams['ytick.labelsize'] = 12
plt.rcParams['legend.fontsize'] = 10
plt.rcParams["axes.edgecolor"] = "black"
plt.rcParams['axes.titlesize'] = 20
plt.rcParams['figure.figsize'] = 8, 8
from IPython.display import Image
color = 'lightsteelblue'

In [43]:
import pymc3 as pm
import random
import numpy as np
from scipy.stats import beta, binom, bernoulli, poisson
import pandas as pd
import math 
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import gridspec
from collections import Counter
import theano.tensor as tt
import scipy.stats as stats
from scipy.integrate import quad
import itertools
import random

# Chapter 13: Goals, Power, and Sample Size

## Goals

There are many goals that can be achieved during an experiment. Some common ones are below:

1. Reject a null value - show the ROPE excludes the HDI
2. Affirm a predicted parameter value - show that the ROPE around the predicted value includes the HDI
3. Achieve precision in the estimate of a parameter - show that the posterior HDI has a width less than a specified maximum
4. Average length criterion - the average HDI width across a lot of simulated data should be less than some pre-detertimened value L. 
5. Average coverage criterion - given a specific HDI width, the average mass of the HDI must exceed some pre-determined value (usually 95%)
6. Entropy - using entropy as a measure of precision of the posterior distribribution

The main difficulty with these goals is getting a representative random sample from a population.


## Power

The power is defined as the probability of achieving a goal given the experiment and sampling type. In NHST, the goal is to always reject the null hypothesis, only sampling method is to stop at a certain number of samples, and the hypothesis is a specific value of a parameter. 

There are three main ways to increase the power of an experiment. 

1. Reduce random influencement on the outcome - only possible in some conductable experiments
2. Amplify the magnitude of the effect - only possible in some conductable experiments
3. Increase sample size - only possible in some experiments and observations

## Sample Size

Large sample sizes tend to be good because the likilihood and thus the posterior function tends to get narrower and narrower. The only downside is the computational power required. Sometimes, a large sample size is not necessary. For example, if a very specific and accurate data generating process is known, then a small sample size is just as accurate as a large sample. But, if no prior is known, large samples are generally better.

Since sample size increases power, a minimum sample size would be good to know prior to running an experiment. 

## Computing Power and Sample Size

### When the goal is to exclude a null value

The goal is to show that the ROPE excludes the HDI. We use a typical coin example; 2000 flips with 65% heads. 

1. Make a hypothetical distribution to represent parameters for simulated data. The easiest and most intuitive away is to use prior data or realistic hypothical prior data. In our example, the data-generating hypothetical method is a beta distribution with mode $\omega = 0.65$ and concentration $\kappa = 2000$.  

2. Randomly draw the parameter value from the hypothetical method and then simulate data.

3. Using an appropriate prior (such as uniform), check if the ROPE excludes the null. In this case, this null value is $\theta = 0.5$. 

4. Repeat steps 2 and 3 many times. The number of times the HDI is excluded is tallied. The fraction of succeses is the power.

In [59]:
omega = 0.65
kappa = 2000
a = omega * (kappa - 2) + 1
b = (1 - omega) * (kappa - 2) + 1
rope = [0.48,0.52]

priors = [0.6,0.65,0.7,0.75,0.8,0.85]
powers = [0.7,0.8,0.9]

N = 2000

def coin_flip(pior,N):
    
    return sum()

def sample(a,b,N,z):
    with pm.Model() as power_model:

        theta = pm.Beta('theta', a,b)
        y = pm.Binomial('y', n=N, p=theta, observed=z)
        trace = pm.sample(1000, tune=100, burn=200, thin=10, cores = 1,
                          progressbar = False, nuts_kwargs={'target_accept': 0.90})
        
        return trace
    
prior_power = list(itertools.product(priors,powers))
results = {x:0 for x in prior_power}

for prior, power in prior_power:
    
    print(prior,power)
    success = 0
    count = 1
    while success/count <= power:
        print(count, success/count)
        trace = sample(a,b,N,prior)
        hdi_low, hdi_high = pm.stats.hpd(trace['theta'])
        
        if not(rope_low <= hdi_low and rope_high >= hdi_high): 
            success += 1
        count += 1
    
    results[(prior,power)] = success
        

0.6 0.7
1 0.0


Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...


KeyboardInterrupt: 

In [62]:
prior = 0.85
power = 0.7


def coin_flip(prior,N):
    return sum(1 if random.uniform(0,1) <= prior else 0 for x in range(N))

def sample(a,b,N,z):
    with pm.Model() as power_model:

        theta = pm.Uniform('theta')
        y = pm.Binomial('y', n=N, p=theta, observed = z)
        trace = pm.sample(1000, tune=100, burn=200, thin=10, cores = 1,
                          progressbar = False, nuts_kwargs={'target_accept': 0.90})
        
        return trace
    
print(prior,power)
success = 0
count = 1

while success/count <= power:
    print(count, success/count)
    z = coin_flip(prior,N)
    trace = sample(a,b,N,z)
    hdi_low, hdi_high = pm.stats.hpd(trace['theta'])
    print(hdi_low,hdi_high)
    if not(rope_low <= hdi_low and rope_high >= hdi_high): 
        success += 1
    count += 1


Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...


0.85 0.7
1 0.0


Sequential sampling (2 chains in 1 job)
NUTS: [theta_interval__]
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...


0.835882213132 0.867271371609
2 0.5


Sequential sampling (2 chains in 1 job)
NUTS: [theta_interval__]
Auto-assigning NUTS sampler...
Initializing NUTS using jitter+adapt_diag...


0.830892742651 0.864067368667
3 0.6666666666666666


Sequential sampling (2 chains in 1 job)
NUTS: [theta_interval__]
The acceptance probability does not match the target. It is 0.964917483594, but should be close to 0.9. Try to increase the number of tuning steps.
The acceptance probability does not match the target. It is 0.962377977456, but should be close to 0.9. Try to increase the number of tuning steps.


0.831561836222 0.862345033387


In [61]:
sample_size = 4
notPowerfulEnough = True

while(notPowerfulEnough):
    zvec = np.array(range(0,sample_size+1))
    pzvec = beta(a,b,zvec)
    hdis = [hpd(x, 0.95) for x in pzvec]
    powerHDI = sum()
    powerHDI = sum( pzvec[ hdiMat[,1] > ROPE[2] | hdiMat[,2] < ROPE[1] ] )

3

In [82]:
def hpd(x, alpha):
    """Calculate highest posterior density (HPD) of array for given alpha. The HPD is the
    minimum width Bayesian credible interval (BCI).
    :Arguments:
      x : Numpy array
          An array containing MCMC samples
      alpha : float
          Desired probability of type I error
    """

    # Make a copy of trace
    x = x.copy()

    # For multivariate node
    if x.ndim > 1:

        # Transpose first, then sort
        tx = tr(x, list(range(x.ndim)[1:]) + [0])
        dims = shape(tx)

        # Container list for intervals
        intervals = np.resize(0.0, (2,) + dims[:-1])

        for index in make_indices(dims[:-1]):

            try:
                index = tuple(index)
            except TypeError:
                pass

            # Sort trace
            sx = np.sort(tx[index])

            # Append to list
            intervals[0][index], intervals[1][index] = calc_min_interval(sx, alpha)
        
        # Transpose back before returning
        return np/array(intervals)

    else:
        # Sort univariate node
        sx = np.sort(x)

        return np.array(calc_min_interval(sx, alpha))
    
def calc_min_interval(x, alpha):
    """Internal method to determine the minimum interval of
    a given width
    Assumes that x is sorted numpy array.
    """

    n = len(x)
    cred_mass = 1.0 - alpha

    interval_idx_inc = int(np.floor(cred_mass * n))
    n_intervals = n - interval_idx_inc
    interval_width = x[interval_idx_inc:] - x[:n_intervals]

    if len(interval_width) == 0:
        print_('Too few elements for interval calculation')
        return [None, None]

    min_idx = np.argmin(interval_width)
    hdi_min = x[min_idx]
    hdi_max = x[min_idx + interval_idx_inc]
    return [hdi_min, hdi_max]

array([ nan,   0.,   0.,   0.,   0.])

In [83]:
a

1299.7

In [79]:
beta.pdf(0.9,1,1)

1.0