# Examining statistical tests

## Set up representative dataset

In [1]:
import sys
sys.path.append('..')


from pyllelic import pyllelic

config = pyllelic.configure(
    base_path="../assets/",
    prom_file="tert_genome.txt",
    prom_start=1293200,
    prom_end=1296000,
    chrom="5",
    offset=1293000,
)

files_set = pyllelic.make_list_of_bam_files(config)

data = pyllelic.pyllelic(config=config, files_set=files_set)

df = data.individual_data

Process BAM Files: 100%|██████████| 1/1 [00:00<00:00, 11.38it/s]
Process methylation: 100%|██████████| 1/1 [00:04<00:00,  4.46s/it]


In [2]:
import numpy as np
from scipy import stats

## Define test cases

In [3]:
tests = {  # Pulled from real dataset
    "allelic": df["1295771"].tolist()[0],
    "non-allelic": df["1293589"].tolist()[0],
    "question": df["1294420"].tolist()[0],
}

In [4]:
for val in tests.values():
    print(val)

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]


## Define test approaches

In [5]:
def diffs(data_list):
    """Find the difference between list mean and mode, 
    assuming larger is  more allelic"""
    
    means = float(np.mean(data_list))
    modes = stats.mode(data_list)[0][0]
    return abs(means-modes)

In [6]:
def ad_stats(data_list):
    """Calculate Anderson-Darling stat, assuming normal distribution."""
    stat, crits, _ = stats.anderson(data_list)
    is_sig = bool(stat > crits[4])  # type: ignore
    return (is_sig, stat, crits)

In [7]:
"""
    Module for computing The Hartigans' dip statistic
    The dip statistic measures unimodality of a sample from a random process.
    See: 
    Hartigan, J. A.; Hartigan, P. M. The Dip Test of Unimodality. The Annals 
    of Statistics 13 (1985), no. 1, 70--84. doi:10.1214/aos/1176346577. 
    http://projecteuclid.org/euclid.aos/1176346577.
"""

import numpy as np
import collections

def _gcm_(cdf, idxs):
    work_cdf = cdf
    work_idxs = idxs
    gcm = [work_cdf[0]]
    touchpoints = [0]
    while len(work_cdf) > 1:
        distances = work_idxs[1:] - work_idxs[0]
        slopes = (work_cdf[1:] - work_cdf[0]) / distances
        minslope = slopes.min()
        minslope_idx = np.where(slopes == minslope)[0][0] + 1
        gcm.extend(work_cdf[0] + distances[:minslope_idx] * minslope)
        touchpoints.append(touchpoints[-1] + minslope_idx)
        work_cdf = work_cdf[minslope_idx:]
        work_idxs = work_idxs[minslope_idx:]
    return np.array(np.array(gcm)),np.array(touchpoints)

def _lcm_(cdf, idxs):
    g,t = _gcm_(1-cdf[::-1], idxs.max() - idxs[::-1])
    return 1-g[::-1], len(cdf) - 1 - t[::-1]

def _touch_diffs_(part1, part2, touchpoints):
    diff = np.abs((part2[touchpoints] - part1[touchpoints]))
    return diff.max(), diff

def dip(histogram=None, idxs=None):
    """
        Compute the Hartigans' dip statistic either for a histogram of
        samples (with equidistant bins) or for a set of samples.
    """
    if idxs is None:
        idxs = np.arange(len(histogram))
    elif histogram is None:
        h = collections.Counter(idxs)
        idxs = np.msort(h.keys())
        histogram = np.array([h[i] for i in idxs])
    else:
        if len(histogram) != len(idxs):
            raise ValueError("Need exactly as many indices as histogram bins.")
        if len(idxs) != len(set(idxs)):
            raise ValueError("idxs must be unique if histogram is given.")
        if not np.array_equal(np.msort(idxs), idxs):
            idxs_s = np.argsort(idxs)
            idx = np.asarray(idxs)[idxs_s]
            histogram = np.asarray(histogram)[idxs_s]

    cdf = np.cumsum(histogram, dtype=float)
    cdf /= cdf[-1]

    work_idxs = idxs
    work_histogram = np.asarray(histogram, dtype=float) / np.sum(histogram)
    work_cdf = cdf

    D = 0
    left = [0]
    right = [1]

    while True:
        left_part, left_touchpoints   = _gcm_(work_cdf - work_histogram, work_idxs)
        right_part, right_touchpoints = _lcm_(work_cdf, work_idxs)

        d_left, left_diffs   = _touch_diffs_(left_part, right_part, left_touchpoints)
        d_right, right_diffs = _touch_diffs_(left_part, right_part, right_touchpoints)

        if d_right > d_left:
            xr = right_touchpoints[d_right == right_diffs][-1]
            xl = left_touchpoints[left_touchpoints <= xr][-1]
            d  = d_right
        else:
            xl = left_touchpoints[d_left == left_diffs][0]
            xr = right_touchpoints[right_touchpoints >= xl][0]
            d  = d_left

        left_diff  = np.abs(left_part[:xl+1] - work_cdf[:xl+1]).max()
        right_diff = np.abs(right_part[xr:]  - work_cdf[xr:] + work_histogram[xr:]).max()

        if d <= D or xr == 0 or xl == len(work_cdf):
            the_dip = max(np.abs(cdf[:len(left)] - left).max(), np.abs(cdf[-len(right)-1:-1] - right).max())
            return the_dip/2, (cdf, idxs, left, left_part, right, right_part)
        else:
            D = max(D, left_diff, right_diff)

        work_cdf = work_cdf[xl:xr+1]
        work_idxs = work_idxs[xl:xr+1]
        work_histogram = work_histogram[xl:xr+1]

        left[len(left):] = left_part[1:xl+1]
        right[:0] = right_part[xr:-1]

def plot_dip(histogram=None, idxs=None):
    from matplotlib import pyplot as plt

    d,(cdf,idxs,left,left_part,right,right_part) = dip(histogram,idxs)


    plt.plot(idxs[:len(left)], left, color='red')
    plt.plot(idxs[len(left)-1:len(left)+len(left_part) - 1], left_part, color='gray')
    plt.plot(idxs[-len(right):], right, color='blue')
    plt.plot(idxs[len(cdf) - len(right) + 1 - len(right_part):len(cdf) - len(right) + 1], right_part, color='gray')

    the_dip = max(np.abs(cdf[:len(left)] - left).max(), np.abs(cdf[-len(right)-1:-1] - right).max())
    l_dip_idxs = np.abs(cdf[:len(left)] - left) == the_dip
    r_dip_idxs = np.abs(cdf[-len(right)-1:-1] - right) == the_dip
    print(the_dip/2, d)

    plt.vlines(x=idxs[:len(left)][l_dip_idxs], ymin=cdf[:len(left)][l_dip_idxs], ymax = cdf[:len(left)][l_dip_idxs] - the_dip)
    plt.vlines(x=idxs[-len(right):][r_dip_idxs], ymin=cdf[-len(right)-1:-1][r_dip_idxs], ymax = cdf[-len(right)-1:][r_dip_idxs] + the_dip)

    plt.plot(np.repeat(idxs,2)[1:], np.repeat(cdf,2)[:-1], color='black')
    plt.scatter(idxs, cdf)

    plt.show()


def crit_points(random_function, quantiles, sample_size, n_samples):
    """
        Compute the quantiles for the dip statistic for n_samples 
        samples of size sample_size from the random process given by 
        random_function.
        Parameters:
        random_function : a paramter-free function which returns random values.
        quantiles : a sequence of values between 0 and 1
        sample_size : the size of the samples to draw from random_function
        n_samples : the number of samples for which to compute dips
        Returns: a list such that the i'th value is the greatest dip observed
        such that the fraction of dips less than or equal to that value is less
        than the i'th value from quantiles.
    """
    data = [[random_function() for _ in range(sample_size)] for __ in range(n_samples)]
    dips = np.array([dip(idxs=samples)[0] for samples in data])
    
    return np.percentile(dips, [p * 100 for p in quantiles])

In [8]:
def hartigan_dip(data_list):
   """Calculate Hartigan dip test statistic"""
   return dip(data_list)[0]

In [12]:
def gaussian_mixture(data_list):
    train_data = np.array([[100,  0,  0,  0, 100],
       [100,  0,  0,  0, 100]])
    from sklearn.mixture import GaussianMixture
    GMM = GaussianMixture(n_components=2, random_state=0).fit(train_data)
    data_hist = np.histogram(data_list, bins=[0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
    data_array = np.array([data_hist[0], data_hist[0]])
   #  print(data_array)
    
    return round(GMM.score(data_array),3)

In [13]:
methods = {  # Save our methods in a dictionary
    "diffs": diffs, 
    "ad_stats": ad_stats,
    "dip_test": hartigan_dip,
    "gaussian_mixture": gaussian_mixture,
}

## Test all the methods

In [14]:
for name, method in methods.items():
    print("="*20)
    print(f"testing {name}")
    print("-"*20)
    for k,v in tests.items():
        print(f"expected: {k}, result: {method(v)}")

testing diffs
--------------------
expected: allelic, result: 0.5
expected: non-allelic, result: 0.0
expected: question, result: 0.30434782608695654
testing ad_stats
--------------------
expected: allelic, result: (True, 8.099645009495575, array([0.536, 0.61 , 0.732, 0.854, 1.016]))
expected: non-allelic, result: (False, nan, array([0.543, 0.618, 0.742, 0.865, 1.029]))
expected: question, result: (True, 4.8279084069810345, array([0.511, 0.582, 0.699, 0.815, 0.969]))
testing dip_test
--------------------
expected: allelic, result: 0.02173913043478265
expected: non-allelic, result: 0.07142857142857145
expected: question, result: 0.07142857142857145
testing gaussian_mixture
--------------------
expected: allelic, result: -529000004.49
expected: non-allelic, result: -24500004.49
expected: question, result: -152500004.49
