In [1]:
#!wget -c ftp://ngs.sanger.ac.uk/production/ag1000g/phase1/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.3L.h5


In [1]:
from multiprocessing.pool import Pool
from math import ceil

import numpy as np

import h5py

import dask
import dask.array as da
import dask.multiprocessing
dask.__version__

  from ._conv import register_converters as _register_converters


'0.19.2'

In [2]:
h5_3L = h5py.File('ag1000g.phase1.ar3.pass.3L.h5', 'r')
samples = h5_3L['/3L/samples']
positions = h5_3L['/3L/variants/POS']
num_samples = len(samples)
del samples

In [3]:
window_size = 50000
last_position = positions[-1]
num_windows = ceil(last_position / window_size)
limits = np.full((num_windows, 2), -1)
curr_window = positions[0] // window_size
limits[curr_window][0] = 0
for index, position in enumerate(positions):
    my_window = position // window_size
    if index % 1000000 == 0:
        print(index, position)
    if my_window != curr_window:
        limits[my_window, 0] = index
        limits[curr_window, 1] = index - 1
        curr_window = my_window
limits[num_windows - 1, 1] = len(positions)

In [9]:
limits = da.from_array(limits, chunks=(60, 2))

In [10]:
limits[0], limits[-1]

(dask.array<getitem, shape=(2,), dtype=int64, chunksize=(2,)>,
 dask.array<getitem, shape=(2,), dtype=int64, chunksize=(2,)>)

In [11]:
positions[-1] // window_size, num_windows

(839, 840)

In [12]:
h5_3L.close()
del h5_3L, positions

In [15]:
def get_hdf5():
    global calldata_genotype, alt_alleles, is_snp, num_samples
    
    try:
        calldata_genotype
    except NameError:
        import os
        print('Open', os.getpid())
        h5_3L = h5py.File('ag1000g.phase1.ar3.pass.3L.h5', 'r')
        samples = h5_3L['/3L/samples']
        calldata_genotype = h5_3L['/3L/calldata/genotype']
        alt_alleles = h5_3L['/3L/variants/ALT']
        is_snp = h5_3L['/3L/variants/is_snp']
    
    return calldata_genotype, is_snp, alt_alleles, num_samples
    
@da.as_gufunc(signature="(i)->()", output_dtypes=dict, vectorize=True)
def calc_statistics(v):
    calldata_genotype, is_snp, alt_alleles, num_samples = get_hdf5()
    start, end = v[0], v[1]
    min_maf = 0.5
    max_maf = 0.0
    non_bi = 0
    missing = 0
    non_snp = 0
    print(v)
    for pos in range(start, end + 1):
        if not is_snp[pos]:
            non_snp += 1
            continue
        if np.max(calldata_genotype[pos]) > 1:  
               non_bi += 1
               continue
        if np.min(calldata_genotype[pos]) < 0:  
               missing += 1
               continue
        num_alt = np.sum(calldata_genotype[pos])  # Because they are coded as 1
        num_ref = num_samples * 2 - num_alt  # (because all are called)
        min_called = min(num_ref, num_alt)
        maf = min_called / (2 * num_samples)
        if maf < min_maf:
            min_maf = maf
        if maf > max_maf:
            max_maf = maf
    return {'total': end - start + 1, 'missing': missing,
            'non_snp': non_snp, 'non_bi': non_bi,
            'min_maf': min_maf, 'max_maf': max_maf}

In [16]:
#XXX num_tasks?
stats = None
with dask.config.set(scheduler='multiprocessing'):
    stats = calc_statistics(limits[:10]).compute()


Open 3379
[ 0 43]
[ 44 965]
[ 966 1912]
[1913 3420]
[3421 3436]
[3437 3803]
[3804 5038]
[5039 6608]
[6609 6801]
[6802 7056]


In [17]:
limits[0,0].compute()

0

In [18]:
limits[0]

dask.array<getitem, shape=(2,), dtype=int64, chunksize=(2,)>

In [19]:
#persist

In [20]:
stats

array([{'total': 44, 'missing': 3, 'non_snp': 0, 'non_bi': 4, 'min_maf': 0.00065359477124183, 'max_maf': 0.0196078431372549},
       {'total': 922, 'missing': 153, 'non_snp': 0, 'non_bi': 32, 'min_maf': 0.00065359477124183, 'max_maf': 0.4869281045751634},
       {'total': 947, 'missing': 178, 'non_snp': 0, 'non_bi': 37, 'min_maf': 0.00065359477124183, 'max_maf': 0.4516339869281046},
       {'total': 1508, 'missing': 77, 'non_snp': 0, 'non_bi': 49, 'min_maf': 0.00065359477124183, 'max_maf': 0.39477124183006534},
       {'total': 16, 'missing': 0, 'non_snp': 0, 'non_bi': 2, 'min_maf': 0.00065359477124183, 'max_maf': 0.3235294117647059},
       {'total': 367, 'missing': 8, 'non_snp': 0, 'non_bi': 10, 'min_maf': 0.0, 'max_maf': 0.4503267973856209},
       {'total': 1235, 'missing': 8, 'non_snp': 0, 'non_bi': 54, 'min_maf': 0.0, 'max_maf': 0.3954248366013072},
       {'total': 1570, 'missing': 32, 'non_snp': 0, 'non_bi': 68, 'min_maf': 0.00065359477124183, 'max_maf': 0.3934640522875817},
  

In [None]:

#dask.config.set(scheduler='synchronous')
#dask.config.set(scheduler='threads')
