In [None]:
#!wget -c ftp://ngs.sanger.ac.uk/production/ag1000g/phase1/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.3L.h5


In [None]:
from multiprocessing.pool import Pool
from math import ceil

import numpy as np

import h5py

import dask
import dask.array as da
import dask.multiprocessing
dask.__version__

In [None]:
# Open the 3L callset read-only. `positions` is an h5py dataset handle, so
# values are only read from disk when it is indexed.
h5_3L = h5py.File('ag1000g.phase1.ar3.pass.3L.h5', 'r')
positions = h5_3L['/3L/variants/POS']
num_samples = len(h5_3L['/3L/samples'])

In [None]:
# Split the chromosome into fixed-size windows and record, for each window w,
# the first and last (inclusive) row index of the variants whose position
# falls in [w * window_size, (w + 1) * window_size).
# Windows without any variant keep the sentinel (-1, -1).
window_size = 50000
pos_values = positions[:]  # one bulk HDF5 read instead of one read per element
# The index bookkeeping below is only valid for sorted positions.
assert np.all(np.diff(pos_values) >= 0), 'POS must be sorted'
last_position = pos_values[-1]
window_of_variant = pos_values // window_size
# The last variant lives in window last_position // window_size, so that index
# must exist; ceil(last_position / window_size) is one short whenever
# last_position is an exact multiple of window_size.
num_windows = int(last_position // window_size) + 1
window_ids = np.arange(num_windows)
starts = np.searchsorted(window_of_variant, window_ids, side='left')
# `side='right'` finds the first row past window w; minus one gives the last
# row inside it, so the final window ends at len(positions) - 1 (inclusive).
ends = np.searchsorted(window_of_variant, window_ids, side='right') - 1
limits = np.stack([starts, ends], axis=1)
limits[starts > ends] = -1  # empty windows
del pos_values, window_of_variant, window_ids, starts, ends

In [None]:
# Wrap the (num_windows, 2) table of (start, end) rows as a dask array with 60
# windows per chunk. The second axis stays whole because calc_statistics
# consumes each (start, end) row as its gufunc core dimension.
# np.asarray keeps this cell re-runnable: dask refuses to wrap an array that
# is already a dask array.
limits = da.from_array(np.asarray(limits), chunks=(60, 2))

In [None]:
# Show the concrete (start, end) rows of the first and last window; printing
# the dask slices directly would only show their lazy repr. np.asarray works
# whether `limits` is currently a numpy or a dask array.
print(np.asarray(limits[0]), np.asarray(limits[-1]))

In [None]:
# Sanity check: the window holding the last variant must be the final window.
last_window_index = positions[-1] // window_size
assert last_window_index == num_windows - 1, (last_window_index, num_windows)
print(last_window_index, num_windows)

In [None]:
# Release this process's file handle and drop the names bound to it;
# get_hdf5() reopens the file wherever the datasets are needed.
h5_3L.close()
del h5_3L
del positions

In [None]:
def get_hdf5():
    """Return the HDF5 objects needed by ``calc_statistics``.

    On the first call (detected by ``calldata_genotype`` not existing yet) the
    3L file is opened and the datasets are stored in globals. Later calls that
    see the same globals reuse them instead of reopening the file.

    Returns
    -------
    tuple
        ``(calldata_genotype, is_snp, alt_alleles, num_samples)``.
    """
    global calldata_genotype, alt_alleles, is_snp, num_samples

    try:
        calldata_genotype
    except NameError:
        import os
        print('Open', os.getpid())
        # Deliberately left open: the returned datasets need a live file.
        h5_3L = h5py.File('ag1000g.phase1.ar3.pass.3L.h5', 'r')
        calldata_genotype = h5_3L['/3L/calldata/genotype']
        alt_alleles = h5_3L['/3L/variants/ALT']
        is_snp = h5_3L['/3L/variants/is_snp']
        # Derive the sample count from the file itself so this function does
        # not depend on a `num_samples` global having been set elsewhere.
        num_samples = len(h5_3L['/3L/samples'])

    return calldata_genotype, is_snp, alt_alleles, num_samples
    
@da.as_gufunc(signature="(i)->()", output_dtypes=dict, vectorize=True)
def calc_statistics(v):
    """Summarise the variants of one genomic window.

    Parameters
    ----------
    v : array-like of two ints
        ``(start, end)`` row indices into the variant datasets, both
        inclusive. A negative ``start`` (the ``(-1, -1)`` sentinel) marks a
        window that contains no variants.

    Returns
    -------
    dict
        ``total``   -- number of variant rows in the window;
        ``missing`` -- SNPs with a negative genotype code (and none > 1);
        ``non_snp`` -- rows whose ``is_snp`` flag is false;
        ``non_bi``  -- SNPs with a genotype code > 1;
        ``min_maf`` / ``max_maf`` -- range of the minor-allele frequency over
        the remaining SNPs, or NaN when the window has none.
    """
    calldata_genotype, is_snp, _, num_samples = get_hdf5()
    start, end = int(v[0]), int(v[1])
    empty = {'total': 0, 'missing': 0, 'non_snp': 0, 'non_bi': 0,
             'min_maf': float('nan'), 'max_maf': float('nan')}
    if start < 0 or end < start:
        # Without this guard the (-1, -1) sentinel would visit pos = -1 and
        # silently summarise the chromosome's last variant.
        return empty

    # One read per dataset for the whole window instead of several HDF5
    # reads per variant.
    snp = np.asarray(is_snp[start:end + 1], dtype=bool)
    total = len(snp)  # slicing clips at the dataset end: count what was read
    if total == 0:
        return empty
    genotypes = np.asarray(calldata_genotype[start:end + 1])
    alleles = genotypes.reshape(total, -1)  # one row of allele codes per variant

    # Same precedence as a per-variant if/continue chain: non-SNP first,
    # then any code > 1, then any negative code.
    non_bi_mask = snp & (alleles.max(axis=1) > 1)
    missing_mask = snp & ~non_bi_mask & (alleles.min(axis=1) < 0)
    usable = snp & ~non_bi_mask & ~missing_mask

    # Only 0/1 codes remain, so the row sum counts the alleles coded 1.
    num_alt = alleles[usable].sum(axis=1)
    num_ref = num_samples * 2 - num_alt  # every allele is called in these rows
    mafs = np.minimum(num_ref, num_alt) / (2 * num_samples)

    return {'total': total, 'missing': int(missing_mask.sum()),
            'non_snp': int(total - snp.sum()), 'non_bi': int(non_bi_mask.sum()),
            'min_maf': float(mafs.min()) if mafs.size else float('nan'),
            'max_maf': float(mafs.max()) if mafs.size else float('nan')}

In [None]:
# Summarise every window with dask's multiprocessing scheduler. `stats` is
# reset to None first so a failed run does not leave an older result behind.
stats = None
stats = calc_statistics(limits).compute(scheduler='multiprocessing')

In [None]:
# Start row of window 0 (-1 when that window holds no variants).
limits[0][0].compute()

In [None]:
# Concrete (start, end) row of window 0; the bare dask slice would only
# display its lazy task-graph repr.
limits[0].compute()

In [None]:
#persist

In [None]:
# Object array of per-window summary dicts, one entry per row of `limits`.
stats

In [None]:

#dask.config.set(scheduler='synchronous')
#dask.config.set(scheduler='threads')
