# Distance-Based tests

We can examine the distance-based effects used for PERMANOVA and the Mantel test. Because the observations here are not independent, permutation testing is required.

Due to the time requirements of permutation tests, these simulations were run on the Knight Lab's supercomputer. The functions and parameters used on that computer to produce the power files are reproduced here. However, we recommend that this notebook be used for review only, and not be run without a supercomputer.

In [1]:
import functools
import itertools
from multiprocessing import Pool
import os
import pickle

import numpy as np
import pandas as pd
import scipy
import skbio

from power import subsample_power

In [2]:
# Fixes the state of NumPy's global random number generator for this session.
# NOTE(review): worker processes started later through multiprocessing.Pool
# may inherit this same state (e.g. with the fork start method) -- confirm
# that the random draws made inside the workers are independent.
np.random.seed(25) 

In [3]:
# Number of worker processes given to multiprocessing.Pool in the cells below.
num_cpus = 10
# When True, a power file is recalculated and rewritten even if it already
# exists on disk; when False, existing power files are left alone.
overwrite = True

In [None]:
# Number of simulation files (simulation_0.p, simulation_1.p, ...) processed
# for each test. NOTE: the same value is assigned again in a later cell.
num_rounds = 100

In [4]:
# Root directory that holds the simulated data and receives the power files.
# Nothing below can run without it, so stop right away when it is missing.
sim_location = './simulations/'
if not os.path.exists(sim_location):
    raise ValueError(
        'The simulations do not exist. Go back and simulate some data!'
    )

In [5]:
# Number of simulation files iterated over for each test; the loops below
# read simulation_0.p through simulation_<num_rounds - 1>.p.
num_rounds = 100

In [6]:
# Empty dictionary. NOTE(review): nothing in the visible part of this
# notebook fills or reads it -- possibly a leftover; confirm before removing.
distributions = {}

We're going to vary the number of permutations each test draws and the significance level of the test.

In [7]:
# Numbers of permutations handed to each test (used as `permutations=depth`).
depths = [20, 100, 1000]
# Levels of the test (alpha) at which power is calculated.
alphas = [0.05, 0.01, 0.001]

# Power Calculation

## Permanova

In [17]:
def calculate_permanova_power(simulation, alpha, depth, power_fp,
                              num_iter=100, num_runs=5):
    """Calculates empirical PERMANOVA power for one simulation and pickles it.

    Parameters
    ----------
    simulation : dict
        Simulation record. Its 'samples' entry is unpacked as
        ``(dm, groups)``: `dm` is filtered by sample id with ``dm.filter``
        and `groups` is indexed with ``.loc`` and split on the labels 0
        and 1 (presumably a skbio DistanceMatrix and a pandas Series keyed
        by sample id -- confirm against the simulation notebook).
    alpha : float
        Critical value forwarded to `subsample_power`.
    depth : int
        Number of permutations used in every PERMANOVA call.
    power_fp : str
        Path of the pickle file the summary dictionary is written to.
    num_iter : int, optional
        Forwarded unchanged to `subsample_power` as `num_iter`.
    num_runs : int, optional
        Forwarded unchanged to `subsample_power` as `num_runs`.

    Returns
    -------
    None
        The result is only written to `power_fp`.
    """

    # Draws the groups and distance matrix and identifies the samples
    # belonging to each of the two groups
    dm, groups = simulation['samples']
    samples = [groups.loc[groups == i].index for i in [0, 1]]

    # Subsample sizes 5, 15, 25, ... stopping below (size of group 0) - 10,
    # because the draws are made without bootstrapping
    counts = np.arange(5, len(samples[0]) - 10, 10)

    # Defines the statistical test
    def test(ids):
        # Pools the ids drawn from both groups into one flat array
        obs = np.hstack(ids)
        res = skbio.stats.distance.permanova(
            distance_matrix=dm.filter(obs),
            grouping=groups.loc[obs],
            permutations=depth,
        )
        return res['p-value']

    # Calculates power
    power = subsample_power(test=test,
                            samples=samples,
                            counts=counts,
                            num_iter=num_iter,
                            num_runs=num_runs,
                            alpha=alpha,
                            bootstrap=False,
                            )

    # Generates the summary dictionary. `depth` and `alpha` are recorded so
    # the file describes itself, as the Mantel summary does. The
    # 'emperical_power' key is kept as is because it names stored data.
    power_summary = {'emperical_power': power,
                     'traditional_power': None,
                     'original_p': test(samples),
                     'num_obs': len(samples[0]),
                     'depth': depth,
                     'alpha': alpha,
                     'counts': counts,
                     }

    # Saves the file
    with open(power_fp, 'wb') as f_:
        pickle.dump(power_summary, f_)

In [23]:
# Directory holding the pickled permanova simulations
sim_dir = os.path.join(sim_location, 'data/permanova')
# Template for the power directories; filled in with (depth, alpha)
power_dir = os.path.join(sim_location, 'power/permanova/trials/iter_%i-alpha_%s')
for (depth, alpha) in itertools.product(depths, alphas):
    # Skips over alpha values that won't give power results: a test with
    # `depth` permutations cannot report a p-value below 1/(depth + 1)
    if (1/(depth + 1)) > alpha:
        continue
    # exist_ok avoids a separate check-then-create step
    os.makedirs(power_dir % (depth, alpha), exist_ok=True)

In [None]:
p = Pool(num_cpus)
# (power file path, AsyncResult) for every job handed to the pool
submitted = []

try:
    for i in range(num_rounds):
        # Loads the simulation
        simulation_fp = os.path.join(sim_dir, 'simulation_%i.p' % i)
        with open(simulation_fp, 'rb') as f_:
            simulation = pickle.load(f_)

        # Iterates through the alpha values and depths
        for (depth, alpha) in itertools.product(depths, alphas):
            #  Skips over alpha values that won't give power results
            if (1/(depth + 1)) > alpha:
                continue

            # Generates the power calculation if appropriate. The directory
            # template and the file name are joined first, then all three
            # placeholders are filled in at once.
            power_fp = os.path.join(power_dir, 'simulation_%i.p') % (depth, alpha, i)
            if (overwrite or (not os.path.exists(power_fp))):
                sim_kwargs = {'simulation': simulation,
                              'alpha': alpha,
                              'depth': depth,
                              'power_fp': power_fp
                              }
                submitted.append(
                    (power_fp,
                     p.apply_async(calculate_permanova_power,
                                   kwds=sim_kwargs))
                )
finally:
    # Always releases the workers, even if a simulation file fails to load
    p.close()
    p.join()

# An exception raised inside a worker is only re-raised when the result is
# fetched, so every result is fetched here to report failures instead of
# losing them silently.
for power_fp, result in submitted:
    try:
        result.get()
    except Exception as err:
        print('Power calculation failed for %s: %r' % (power_fp, err))

## Mantel Calculation

In [48]:
def calculate_mantel_power(simulation, power_fp, alpha, depth,
                           num_iter=1000, num_runs=3):
    """Calculates empirical Mantel test power for one simulation and pickles it.

    Parameters
    ----------
    simulation : dict
        Simulation record. Its 'samples' entry is unpacked as ``(x, y)``,
        two objects that expose ``ids`` and ``filter`` (presumably skbio
        DistanceMatrix objects over the same ids -- confirm against the
        simulation notebook).
    power_fp : str
        Path of the pickle file the summary dictionary is written to.
    alpha : float
        Critical value forwarded to `subsample_power`.
    depth : int
        Number of permutations used in every Mantel call.
    num_iter : int, optional
        Forwarded unchanged to `subsample_power` as `num_iter`.
    num_runs : int, optional
        Forwarded unchanged to `subsample_power` as `num_runs`.

    Returns
    -------
    None
        The result is only written to `power_fp`.
    """

    # Uses the simulation handed in by the caller. Re-reading the file via
    # the module-level `sim_dir` and loop index `i` would not pick up the
    # submitted job's simulation inside a pool worker.
    x, y = simulation['samples']
    samples = [np.array(x.ids)]

    # Subsample sizes 5, 15, 25, ... stopping below (number of ids) - 10
    counts = np.arange(5, len(samples[0]) - 10, 10)

    # Defines the statistical test
    def test(ids):
        # A single id set is drawn; both matrices are cut down to it
        obs = ids[0]
        res = skbio.stats.distance.mantel(
            x.filter(obs),
            y.filter(obs),
            permutations=depth
        )
        # Second element of the mantel result is the p-value
        return res[1]

    # Calculates power
    power = subsample_power(test=test,
                            samples=samples,
                            counts=counts,
                            num_iter=num_iter,
                            num_runs=num_runs,
                            alpha=alpha,
                            bootstrap=False,
                            draw_mode='matched'
                            )

    # Generates the summary dictionary ('emperical_power' is kept as is
    # because it names stored data)
    power_summary = {'emperical_power': power,
                     'traditional_power': None,
                     'original_p': test(samples),
                     'num_obs': len(samples[0]),
                     'depth': depth,
                     'alpha': alpha,
                     'counts': counts,
                     }

    # Saves the file
    with open(power_fp, 'wb') as f_:
        pickle.dump(power_summary, f_)

In [49]:
# Directory holding the pickled mantel simulations
sim_dir = os.path.join(sim_location, 'data/mantel')
# Template for the power directories; filled in with (depth, alpha)
power_dir = os.path.join(sim_location, 'power/mantel/trials/iter_%i-alpha_%s')
for (depth, alpha) in itertools.product(depths, alphas):
    # Skips over alpha values that won't give power results: a test with
    # `depth` permutations cannot report a p-value below 1/(depth + 1)
    if (1/(depth + 1)) > alpha:
        continue
    # exist_ok avoids a separate check-then-create step
    os.makedirs(power_dir % (depth, alpha), exist_ok=True)

In [None]:
p = Pool(num_cpus)
# (power file path, AsyncResult) for every job handed to the pool
submitted = []

try:
    for i in range(num_rounds):
        # Loads the simulation
        simulation_fp = os.path.join(sim_dir, 'simulation_%i.p' % i)
        with open(simulation_fp, 'rb') as f_:
            simulation = pickle.load(f_)

        # Iterates through the alpha values and depths
        for (depth, alpha) in itertools.product(depths, alphas):
            #  Skips over alpha values that won't give power results
            if (1/(depth + 1)) > alpha:
                continue

            # Generates the power calculation if appropriate. The directory
            # template and the file name are joined first, then all three
            # placeholders are filled in at once.
            power_fp = os.path.join(power_dir, 'simulation_%i.p') % (depth, alpha, i)
            if (overwrite or (not os.path.exists(power_fp))):
                sim_kwargs = {'simulation': simulation,
                              'alpha': alpha,
                              'depth': depth,
                              'power_fp': power_fp
                              }
                submitted.append(
                    (power_fp,
                     p.apply_async(calculate_mantel_power,
                                   kwds=sim_kwargs))
                )
finally:
    # Always releases the workers, even if a simulation file fails to load
    p.close()
    p.join()

# An exception raised inside a worker is only re-raised when the result is
# fetched, so every result is fetched here to report failures instead of
# losing them silently.
for power_fp, result in submitted:
    try:
        result.get()
    except Exception as err:
        print('Power calculation failed for %s: %r' % (power_fp, err))