*Notes on reducing I/O bottlenecks in parallel computations with Dask and Zarr.*

When I'm analysing data I tend to keep one eye on the system monitor at the top of my screen. The height of the green bar tells me how much RAM I'm using and the height of the blue bar tells me how much CPU...

<p><image src='/assets/Selection_058.png' alt='status bar'/></p>

I'll kick off a computation then look up to see what's happening. Most of the time the blue bar chugs away at 1/8 height, which means that 1 of the 8 logical cores on my computer is fully utilised and the others are idle. I spend a fair amount of time waiting for computations to finish and it started to bug me that 7/8 CPU capacity was going spare. Many of the computations I run are simple and could be parallelized, so I started looking into ways of adapting my analysis code to make better use of multiple CPUs.

## Dask

One solution I really like is [Dask](http://dask.pydata.org). Using [``dask.array``](http://dask.pydata.org/en/latest/array.html) it's simple to parallelize a wide range of operations over numerical arrays, using either multiple threads or multiple processes. To evaluate Dask I wrote an alternative [Dask-backed implementation](http://scikit-allel.readthedocs.io/en/latest/model/dask.html) of some of the basic genetic data transformations I use every day. I then ran these on some [data from the Ag1000G project](https://www.malariagen.net/data/ag1000g-phase1-ar3) using Dask's multi-threaded scheduler, hoping to see my idle CPU fire up to maximum capacity...

In [1]:
import h5py
import multiprocessing
import dask
import dask.array as da
from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
from dask.diagnostics.profile_visualize import visualize
from cachey import nbytes
import bokeh
from bokeh.io import output_notebook
output_notebook()
from functools import reduce
import operator
import allel

In [2]:
# setup input data
callset_path = '/data/coluzzi/ag1000g/data/phase1/release/AR3/variation/main/hdf5/ag1000g.phase1.ar3.pass.3R.h5'
callset = h5py.File(callset_path, mode='r')
genotype = callset['3R/calldata/genotype']
genotype

<HDF5 dataset "genotype": shape (13167162, 765, 2), type "|i1">

In [22]:
genotype.compression, genotype.compression_opts

('gzip', 3)

In [3]:
# how many cores on this computer?
multiprocessing.cpu_count()

8

In [14]:
# when I made the HDF5 file I set the chunks a bit too small, so let's operate on bigger chunks
chunks = (genotype.chunks[0], genotype.chunks[1] * 20, genotype.chunks[2])

# run the allele count computation via Dask
gd = allel.model.dask.GenotypeDaskArray.from_array(genotype, chunks=chunks)
ac = gd.count_alleles(max_allele=3)
with ResourceProfiler(dt=1) as rprof:
    ac.compute(num_workers=multiprocessing.cpu_count())
visualize(rprof);

As you can see from the plot above, I do manage to use ~2 cores (200% CPU) but I don't get anywhere near full CPU utilisation (800%). The limiting factor is related to I/O, in particular [h5py](h5py.org) which I've used to pull input data out of an [HDF5 file](https://www.hdfgroup.org/HDF5/). The h5py library is a totally awesome piece of software that I use every day, but HDF5 is not designed to support multi-threaded data access. Also, h5py holds onto the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), a Python technicality which means other threads cannot run while h5py is doing anything, even if the other threads want to do something completely unrelated to HDF5 I/O.

## Zarr

Recently I've been working on [Zarr](zarr.readthedocs.io/), a new Python library for chunked, compressed, N-dimensional data. Previously I [introduced Zarr](http://alimanfoo.github.io/2016/04/14/to-hdf5-and-beyond.html) and showed how it can be used to get fast access into large multi-dimensional arrays. The other thing Zarr can do is let you read or write to an array from multiple threads or processes in parallel. Also, Zarr releases the GIL during compression and decompression, so other threads can carry on working.

Here's the allele count example again, but this time using a Zarr array as the input data source:

In [5]:
import zarr
zarr.__version__

'1.0.0b6'

In [18]:
# setup a Zarr array, copying in genotype data from the HDF5 file
# (N.B., let's use the similar compression options as the HDF5 file for a fairer
# comparison, although other compressors might be faster)
genotype_zarr = zarr.array(genotype, chunks=chunks, compression='blosc',
                           compression_opts=dict(cname='zlib', clevel=3, shuffle=0))
genotype_zarr

zarr.core.Array((13167162, 765, 2), int8, chunks=(6553, 200, 2), order=C)
  compression: blosc; compression_opts: {'clevel': 3, 'cname': 'zlib', 'shuffle': 0}
  nbytes: 18.8G; nbytes_stored: 645.8M; ratio: 29.8; initialized: 8040/8040
  store: builtins.dict

In [21]:
# run allele count computation via dask
gdz = allel.model.dask.GenotypeDaskArray.from_array(genotype_zarr, chunks=chunks)
acz = gdz.count_alleles(max_allele=3)
with ResourceProfiler(dt=1) as rprof:
    acz.compute(num_workers=multiprocessing.cpu_count())
visualize(rprof);

This time I get nearly full CPU usage. Also the computation is about 4 times faster, which is about what you'd expect if I am utilising 4 times more CPU.

## Distributed computing

I'm currently focused on making better use of multi-core processors, but others like [Matt Rocklin](@@TODO) are working on frameworks for large-scale distributed computing. After I released Zarr v0.4.0 back in @@MONTH, Matt got in touch to suggest a reorganization of the code so that Zarr arrays could be stored in distributed systems like S3 or HDFS. Earlier this week I [released Zarr v1.0.0](@@TODO) which includes a new storage architecture and a number of other improvements. 

Here is Matt on using the new version of Zarr with Dask and S3 on a 20 node cluster...

@@TODO youtube

Dask’s distributed scheduler looks seriously cool. It’s exciting to think that computations I’m currently coding to run in parallel via Dask on my multi-core laptop could in future be scaled up to a large compute cluster without any extra work at all.

To go with the new Zarr release there is some [new documentation](@@TODO), including a [tutorial](@@TODO), [API reference](@@TODO) and [storage specification](@@TODO). Please bear in mind that Zarr is still a young project, so if you do take it for a spin, any [feedback is appreciated](@@TODO).

## Further reading

* Zarr
* Dask
* h5py
* h5py parallel