In [3]:
import numpy as np
import dask
print('dask', dask.__version__)
import dask.array as da
import h5py
print('h5py', h5py.__version__)
import sys
sys.path.insert(0, '../..')
import allel
print('scikit-allel', allel.__version__)

dask 0.7.5
h5py 2.5.0
scikit-allel 0.21.0.dev0


In [2]:
import bcolz
print('bcolz', bcolz.__version__)

bcolz 0.12.1


In [17]:
bcolz.blosc_version()

('1.4.1', '$Date:: 2014-07-08 #$')

In [3]:
bcolz.detect_number_of_cores()

4

Let's use some real data...

In [4]:
# Open the Ag1000G phase 1 AR3 callset read-only (mode='r'); the bare name below displays its repr.
# NOTE(review): this is an absolute, machine-specific path, so the notebook will not run elsewhere without editing it.
callset = h5py.File('/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.h5', mode='r')
callset

<HDF5 file "ag1000g.phase1.ar3.pass.h5" (mode r)>

In [5]:
# Wrap the HDF5 dataset stored at '3L/calldata/genotype' in scikit-allel's chunked genotype array.
# NOTE(review): presumably this only wraps the dataset and does not load it into memory; confirm.
genotype = allel.model.chunked.GenotypeChunkedArray(callset['3L/calldata/genotype'])
genotype

Unnamed: 0,0,1,2,3,4,...,760,761,762,763,764
0,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
1,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
2,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
3,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
4,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0


Copy the first 2 million rows into a bcolz carray to use for benchmarking.

In [6]:
# Benchmark subset: copy with stop=2000000 (the first 2 million rows); `g` is the input to the timing cells below.
g = genotype.copy(stop=2000000)
g

Unnamed: 0,0,1,2,3,4,...,760,761,762,763,764
0,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
1,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
2,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
3,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
4,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0


Out of interest, what chunk size did bcolz choose?

In [5]:
# Elements per chunk: chunklen multiplied by the sizes of axes 1 and 2.
# NOTE(review): chunklen is presumably the number of rows (axis 0) per bcolz chunk, and the
# product is an element count; it equals the size in bytes only for a 1-byte dtype. Confirm.
g.data.chunklen * g.shape[1] * g.shape[2]

2096100

How long does it take to decompress the data?

In [6]:
def toarray(x):
    """Convert ``x`` with ``np.asarray`` and throw the result away.

    The converted array is intentionally not returned: the function exists so
    that the conversion on its own can be timed. The call always evaluates to
    ``None``.
    """
    np.asarray(x)
    return None

In [9]:
# Time toarray(g) once for each blosc thread-count setting.
# %time performs a single run, so each figure is one measurement, not an average.
# NOTE(review): the final iteration calls blosc_set_nthreads(8); presumably that setting
# stays in effect for the cells that follow. Confirm.
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()  # blank line between runs

--- blosc threads: 1 ---
CPU times: user 1.83 s, sys: 152 ms, total: 1.98 s
Wall time: 1.97 s

--- blosc threads: 2 ---
CPU times: user 1.97 s, sys: 272 ms, total: 2.24 s
Wall time: 1.44 s

--- blosc threads: 4 ---
CPU times: user 2.37 s, sys: 252 ms, total: 2.62 s
Wall time: 1.41 s

--- blosc threads: 8 ---
CPU times: user 2.32 s, sys: 292 ms, total: 2.61 s
Wall time: 1.47 s



How long does it take to compute the maximum value?

In [7]:
def time_max(x):
    """Report how long ``max()`` takes on ``x`` once it is a NumPy array.

    ``x`` is first converted with ``np.asarray``; only the ``max()`` call is
    wrapped in ``%time``, so the conversion is excluded from the reported time.
    Uses the IPython ``%time`` line magic, so it only works inside IPython.
    Nothing is returned.
    """
    x = np.asarray(x)
    %time x.max()

In [11]:
# time_max converts g to a NumPy array before its %time line, so only max() is measured here.
time_max(g)

CPU times: user 2.98 s, sys: 0 ns, total: 2.98 s
Wall time: 2.97 s


Check that scikit-allel's chunked implementation of max() behaves as expected. The implementation is not threaded, so the time should equal the time to decompress the data plus the time to compute the max.

In [12]:
# Time the chunked g.max() once for each blosc thread-count setting (single %time run each).
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time g.max()
    print()  # blank line between runs

--- blosc threads: 1 ---
CPU times: user 5 s, sys: 0 ns, total: 5 s
Wall time: 4.98 s

--- blosc threads: 2 ---
CPU times: user 5.97 s, sys: 84 ms, total: 6.06 s
Wall time: 4.98 s

--- blosc threads: 4 ---
CPU times: user 6.36 s, sys: 76 ms, total: 6.43 s
Wall time: 4.92 s

--- blosc threads: 8 ---
CPU times: user 6.03 s, sys: 212 ms, total: 6.24 s
Wall time: 4.65 s



Now see how dask behaves.

In [13]:
# Wrap g as a dask-backed genotype array.
# NOTE(review): no chunking argument is passed, so the chunk shape is whatever from_array
# picks by default. Confirm what that is, since it affects the timings below.
gd = allel.GenotypeDaskArray.from_array(g)
gd

Unnamed: 0,0,1,2,3,4,...,760,761,762,763,764
0,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
1,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
2,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
3,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
4,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0


In [14]:
# Grid benchmark: for every blosc thread count n and every dask worker count m,
# time the dask-backed max() once.
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        # m is handed to compute() as num_workers (presumably the scheduler's pool size; confirm)
        %time gd.max().compute(num_workers=m)
        print()  # blank line between runs

--- blosc threads: 1 ; dask threads: 1 ---
CPU times: user 7.09 s, sys: 824 ms, total: 7.92 s
Wall time: 7.73 s

--- blosc threads: 1 ; dask threads: 2 ---
CPU times: user 6.57 s, sys: 1.57 s, total: 8.14 s
Wall time: 5.16 s

--- blosc threads: 1 ; dask threads: 4 ---
CPU times: user 7.46 s, sys: 1.66 s, total: 9.12 s
Wall time: 4.07 s

--- blosc threads: 1 ; dask threads: 8 ---
CPU times: user 7.52 s, sys: 1.1 s, total: 8.62 s
Wall time: 3.93 s

--- blosc threads: 2 ; dask threads: 1 ---
CPU times: user 7.76 s, sys: 1.74 s, total: 9.51 s
Wall time: 7.85 s

--- blosc threads: 2 ; dask threads: 2 ---
CPU times: user 7.15 s, sys: 1.76 s, total: 8.91 s
Wall time: 4.57 s

--- blosc threads: 2 ; dask threads: 4 ---
CPU times: user 7.56 s, sys: 1.57 s, total: 9.14 s
Wall time: 3.79 s

--- blosc threads: 2 ; dask threads: 8 ---
CPU times: user 7.62 s, sys: 1.69 s, total: 9.31 s
Wall time: 3.85 s

--- blosc threads: 4 ; dask threads: 1 ---
CPU times: user 7.55 s, sys: 2.06 s, total: 9.6 s
Wall

See especially the case with 4 blosc threads and 4 dask threads. Total wall time here is substantially less than the sum of the time required to decompress the data with the same number of blosc threads (about 1.4 s above) and the time required to compute the maximum (about 3.0 s above). So dask is able to do some work in parallel, even though bcolz does not release the GIL.

In [18]:
# Re-measure the 4 blosc threads / 4 dask workers case with %timeit:
# -n1 -r5 means one call per repeat, five repeats, and the best repeat is reported.
bcolz.blosc_set_nthreads(4)
%timeit -n1 -r5 gd.max().compute(num_workers=4)

1 loops, best of 5: 3.4 s per loop


## with nogil

Hack bcolz to reinstate nogil sections around blosc_decompress...

In [1]:
import bcolz
bcolz.__version__

'0.12.2.dev1+dirty'

In [2]:
bcolz.blosc_version()

('1.4.1', '$Date:: 2014-07-08 #$')

Check decompression time is unaffected.

In [9]:
# Time toarray(g) once for each blosc thread-count setting (single %time run each).
# NOTE(review): the execution counts restart in this section, so `g` and `toarray` must have
# been re-created in this session by cells that are not shown. Confirm before a Run All.
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time toarray(g)
    print()  # blank line between runs

--- blosc threads: 1 ---
CPU times: user 1.7 s, sys: 216 ms, total: 1.91 s
Wall time: 1.91 s

--- blosc threads: 2 ---
CPU times: user 2.13 s, sys: 220 ms, total: 2.35 s
Wall time: 1.54 s

--- blosc threads: 4 ---
CPU times: user 2.38 s, sys: 272 ms, total: 2.65 s
Wall time: 1.46 s

--- blosc threads: 8 ---
CPU times: user 2.27 s, sys: 332 ms, total: 2.6 s
Wall time: 1.48 s



Check scikit-allel's chunked implementation is unaffected.

In [10]:
# Time the chunked g.max() once for each blosc thread-count setting (single %time run each).
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    print('--- blosc threads:', n, '---')
    %time g.max()
    print()  # blank line between runs

--- blosc threads: 1 ---
CPU times: user 5.03 s, sys: 4 ms, total: 5.03 s
Wall time: 5.03 s

--- blosc threads: 2 ---
CPU times: user 6.18 s, sys: 52 ms, total: 6.23 s
Wall time: 5.11 s

--- blosc threads: 4 ---
CPU times: user 6.32 s, sys: 116 ms, total: 6.44 s
Wall time: 4.92 s

--- blosc threads: 8 ---
CPU times: user 6.44 s, sys: 164 ms, total: 6.6 s
Wall time: 5 s



Now see if dask does any better...

In [9]:
# Wrap g as a dask-backed genotype array (no chunking argument is passed, so from_array's defaults apply).
gd = allel.GenotypeDaskArray.from_array(g)
gd

Unnamed: 0,0,1,2,3,4,...,760,761,762,763,764
0,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
1,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
2,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
3,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0
4,0/0,0/0,0/0,0/0,0/0,...,0/0,0/0,0/0,0/0,0/0


In [12]:
# Grid benchmark: for every blosc thread count n and every dask worker count m,
# time the dask-backed max() once.
for n in 1, 2, 4, 8:
    bcolz.blosc_set_nthreads(n)
    for m in 1, 2, 4, 8:
        print('--- blosc threads:', n, '; dask threads:', m, '---')
        # m is handed to compute() as num_workers (presumably the scheduler's pool size; confirm)
        %time gd.max().compute(num_workers=m)
        print()  # blank line between runs

--- blosc threads: 1 ; dask threads: 1 ---
CPU times: user 7.38 s, sys: 924 ms, total: 8.3 s
Wall time: 8.13 s

--- blosc threads: 1 ; dask threads: 2 ---
CPU times: user 6.83 s, sys: 1.59 s, total: 8.42 s
Wall time: 4.41 s

--- blosc threads: 1 ; dask threads: 4 ---
CPU times: user 7.68 s, sys: 1.84 s, total: 9.52 s
Wall time: 3.21 s

--- blosc threads: 1 ; dask threads: 8 ---
CPU times: user 7.43 s, sys: 964 ms, total: 8.4 s
Wall time: 3.02 s

--- blosc threads: 2 ; dask threads: 1 ---
CPU times: user 7.71 s, sys: 1.84 s, total: 9.55 s
Wall time: 7.93 s

--- blosc threads: 2 ; dask threads: 2 ---
CPU times: user 7.18 s, sys: 1.66 s, total: 8.83 s
Wall time: 4.14 s

--- blosc threads: 2 ; dask threads: 4 ---
CPU times: user 7.56 s, sys: 1.58 s, total: 9.14 s
Wall time: 3.65 s

--- blosc threads: 2 ; dask threads: 8 ---
CPU times: user 7.74 s, sys: 1.72 s, total: 9.46 s
Wall time: 3.79 s

--- blosc threads: 4 ; dask threads: 1 ---
CPU times: user 7.6 s, sys: 628 ms, total: 8.23 s
Wall 

In [13]:
# 1 blosc thread, 4 dask workers. -r5 requests five repeats; no -n is given, so %timeit
# chooses the number of loops per repeat itself.
bcolz.blosc_set_nthreads(1)
%timeit -r5 gd.max().compute(num_workers=4)

1 loops, best of 5: 2.9 s per loop


In [14]:
# 1 blosc thread, 8 dask workers. -r5 requests five repeats; no -n is given, so %timeit
# chooses the number of loops per repeat itself.
bcolz.blosc_set_nthreads(1)
%timeit -r5 gd.max().compute(num_workers=8)

1 loops, best of 5: 3.23 s per loop


In [15]:
# 4 blosc threads, 4 dask workers. -r5 requests five repeats; no -n is given, so %timeit
# chooses the number of loops per repeat itself.
bcolz.blosc_set_nthreads(4)
%timeit -r5 gd.max().compute(num_workers=4)

1 loops, best of 5: 3.52 s per loop


## Try to reproduce segfaults...

In [10]:
# Use a single blosc thread for the reproduction attempts below.
# NOTE(review): the value echoed under this cell is the call's return value, presumably the
# previous thread count. Confirm against the bcolz docs.
bcolz.blosc_set_nthreads(1)

1

In [11]:
# Cast to 'f4' (float32), reduce with max(), and compute with compute()'s default arguments.
gd.astype('f4').max().compute()

3.0

In [12]:
# Mean over axis 1, then sum of the result, computed with num_workers=4.
gd.mean(axis=1).sum().compute(num_workers=4)

170767.96339869278

In [13]:
# Element-wise (gd + gd) * gd followed by std over axis 0, computed with compute()'s default arguments.
((gd + gd) * gd).std(axis=0).compute()

array([[ 0.50495372,  0.84406405],
       [ 0.48935613,  0.81991694],
       [ 0.48457914,  0.8209112 ],
       ..., 
       [ 0.50302364,  0.84650751],
       [ 0.49943217,  0.8405899 ],
       [ 0.5033905 ,  0.84399184]])