# Dask is Much Faster with Parquet Files than HDF5
Michael Wood-Vasey  
*Last Run*: 2018-10-18  

Dask is 10-100x faster with Parquet files than HDF5 when retrieving only two columns.  It takes 0.7 seconds to compute average g-r color across 6.9 million rows for a single-file Parquet store.  

*Time [seconds] to compute average g-r color, by scheduler and format*

| Dask Scheduler   | HDF5 | Parquet (hive)   | Parquet (simple)  |
| :--------------- | ---: | ---------------: | ----------------: |
| threads          |  70  |   7.0            |  0.7              |
| synchronous      |  70  |   7.0            |  0.7              |
| multiprocessing  | 300  |  12.5            | 12.5              |
| dask.distributed |  70  |   3.5            |  0.7              |

This significantly improved behavior makes sense given the column-store nature of Parquet.  The performance difference between `hive` (a directory with metadata plus multiple files) and `simple` (a single file) is a bit surprising, but it clearly motivates the use of `simple` for queries like this two-column mean aggregation.
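
The "10-100x" figure can be checked directly from the table above. The following is a minimal sketch that only re-uses the rounded timings quoted in the table; the dictionary and variable names are illustrative and not part of the original notebook.

```python
# Rounded timings [seconds] copied from the summary table above.
hdf5 = {'threads': 70, 'synchronous': 70, 'multiprocessing': 300, 'dask.distributed': 70}
hive = {'threads': 7.0, 'synchronous': 7.0, 'multiprocessing': 12.5, 'dask.distributed': 3.5}
simple = {'threads': 0.7, 'synchronous': 0.7, 'multiprocessing': 12.5, 'dask.distributed': 0.7}

# Speedup of each Parquet layout relative to HDF5, per scheduler.
speedup_hive = {s: hdf5[s] / hive[s] for s in hdf5}
speedup_simple = {s: hdf5[s] / simple[s] for s in hdf5}

for s in hdf5:
    print(f'{s:17s} hive: {speedup_hive[s]:6.1f}x   simple: {speedup_simple[s]:6.1f}x')
```

With these rounded inputs the speedups range from 10x (hive, `threads`) to 100x (simple, `threads`), with the `multiprocessing` scheduler at 24x for both layouts.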

This is a successor to the Dask+HDF5 tests in [object_catalog_performance_dask.ipynb](object_catalog_performance_dask.ipynb),
where the Dask performance was 70-300 seconds to calculate an average g-r color.  The results in the table above for the HDF5 files are from that notebook.

We specifically look at four different scheduling options:

1. `threads`
2. `multiprocessing`
3. `synchronous` (simple serial)
4. `dask.distributed`

For this relatively simple aggregation, the `synchronous` scheduler, a simple serial approach, matched the fastest parallel schedulers on the single-file Parquet store.  Its speed suggests that reading dominates the run time and that parallelizing the read does not improve performance.

### Future Work
1. More complicated analyses, such as binning to make a color-color or color-magnitude density, should be explored.  This problem might distribute more work across nodes and benefit from `dask.distributed`.
2. These tests were run against a 'hive'-formatted Parquet store and also a 'simple' single-file Parquet store.  The `hive` format was much slower for this simple query.  It is worth investigating other use cases with different access patterns to see if there are cases where a cluster-based analysis benefits from a `hive`-formatted Parquet store with a partitioning scheme well-matched to the analysis.
3. When we get to the point of exploring datasets that don't fit in the memory of a single node, it will also be worth revisiting `synchronous` vs. other Dask schedulers.
4. Is there a natural file size on the NERSC GPFS system that should guide how large the Parquet files are for best performance?

### Logistics

This was developed and run in the NERSC JupyterLab environment.
I've specified `pyarrow` as the Parquet engine because that was the installation I could get on NERSC with the snappy compression library.

In [1]:
import os

from bokeh.io import output_notebook

import dask
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster, progress

import numpy as np

In [2]:
# Load Bokeh into the Notebook
output_notebook()

In [3]:
tract = 4850

data_dir = '/global/projecta/projectdirs/lsst/global/in2p3/Run1.1/summary'

datafile_tract = os.path.join(data_dir, 'dpdd_object_tract_%d.parquet' % tract)
datafile_all_hive = os.path.join(data_dir, 'dpdd_object.parquet')
datafile_all_simple = os.path.join(data_dir, 'dpdd_object_simple.parquet')

In [4]:
# Specify the columns we need.  This allows for significant performance advantages
# particularly when reading a column-based storage format such as Parquet.
columns_to_read = ['mag_g', 'mag_r']
parquet_engine = 'pyarrow'
read_parquet_kws = {'columns': columns_to_read, 'engine': parquet_engine}
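
To see why naming the columns matters for a column-based format, here is a toy sketch using only the standard library. It lays out the same table row-by-row and column-by-column and counts how many bytes a reader must touch to get `mag_g` and `mag_r`. The extra column names, the row count, and the values are hypothetical, and real Parquet files add row groups, compression, and metadata that this sketch ignores.

```python
import random
from array import array

N_ROWS = 1000
# 'mag_g' and 'mag_r' match the notebook; the other column names are hypothetical.
COLUMNS = ['mag_g', 'mag_r', 'mag_i', 'ra', 'dec']
WANTED = ['mag_g', 'mag_r']

random.seed(0)
table = {name: array('d', (random.uniform(15.0, 28.0) for _ in range(N_ROWS)))
         for name in COLUMNS}

# Row-oriented layout: every record stores all of its columns contiguously,
# so fetching two columns still means scanning every record.
row_bytes = b''.join(
    array('d', (table[name][i] for name in COLUMNS)).tobytes()
    for i in range(N_ROWS)
)
bytes_read_row = len(row_bytes)

# Column-oriented layout: each column is its own contiguous block,
# so only the requested blocks need to be read.
col_bytes = {name: table[name].tobytes() for name in COLUMNS}
bytes_read_col = sum(len(col_bytes[name]) for name in WANTED)

# Decode just the two requested columns and compute the mean g-r color.
g = array('d')
g.frombytes(col_bytes['mag_g'])
r = array('d')
r.frombytes(col_bytes['mag_r'])
mean_g_r = sum(a - b for a, b in zip(g, r)) / N_ROWS

fraction_read = bytes_read_col / bytes_read_row
print(f'row layout: {bytes_read_row} bytes, column layout: {bytes_read_col} bytes')
```

In this toy layout the columnar read touches 2 of 5 columns, i.e. 40% of the bytes; for a wide catalog the fraction would be correspondingly smaller.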

In [5]:
da_df_one_tract = dd.read_parquet(datafile_tract, **read_parquet_kws)
da_df_hive = dd.read_parquet(datafile_all_hive, **read_parquet_kws)
da_df_simple = dd.read_parquet(datafile_all_simple, **read_parquet_kws)

Let's quickly check the read time performance.  
Summary: Reading the hive format is expensive at about 1.7 seconds in the run below, roughly 80 times that of reading the simple format (about 22 ms).

In [6]:
%timeit _ = dd.read_parquet(datafile_tract, **read_parquet_kws)
%timeit _ = dd.read_parquet(datafile_all_hive, **read_parquet_kws)
%timeit _ = dd.read_parquet(datafile_all_simple, **read_parquet_kws)

37.5 ms ± 498 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
1.71 s ± 31.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
22.1 ms ± 1.16 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


Specify the computation to be done.

In [7]:
df_one_tract = np.mean(da_df_one_tract['mag_g'] - da_df_one_tract['mag_r'])
df_hive = np.mean(da_df_hive['mag_g'] - da_df_hive['mag_r'])
df_simple = np.mean(da_df_simple['mag_g'] - da_df_simple['mag_r'])

Time actually doing the computation:

The single tract is one 20th the size and takes one 10th the time.

In [8]:
%timeit df_one_tract.compute()

185 ms ± 5.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
%timeit df_hive.compute()

5.81 s ± 83.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%timeit df_simple.compute()

488 ms ± 38.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Use default `threads` scheduler from Dask.  This is the default for dataframes.

In [11]:
with dask.config.set(scheduler='threads'):
    df_hive = np.mean(da_df_hive['mag_g'] - da_df_hive['mag_r'])
    %timeit df_hive.compute()

5.76 s ± 23.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
with dask.config.set(scheduler='threads'):
    df_simple = np.mean(da_df_simple['mag_g'] - da_df_simple['mag_r'])
    %timeit df_simple.compute()

483 ms ± 11.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Dask `multiprocessing` scheduler:

In [13]:
with dask.config.set(scheduler='multiprocessing'):
    df_hive = np.mean(da_df_hive['mag_g'] - da_df_hive['mag_r'])
    %timeit df_hive.compute()

8.86 s ± 178 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [14]:
with dask.config.set(scheduler='multiprocessing'):
    # Note: this cell repeats the hive measurement from In [13];
    # it does not time the simple store.
    df_hive = np.mean(da_df_hive['mag_g'] - da_df_hive['mag_r'])
    %timeit df_hive.compute()

9.11 s ± 251 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Dask `synchronous` scheduler:

In [15]:
with dask.config.set(scheduler='synchronous'):
    df_hive = np.mean(da_df_hive['mag_g'] - da_df_hive['mag_r'])
    %timeit df_hive.compute()

4.88 s ± 157 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [16]:
with dask.config.set(scheduler='synchronous'):
    df_simple = np.mean(da_df_simple['mag_g'] - da_df_simple['mag_r'])
    %timeit df_simple.compute()

601 ms ± 59 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


## Can I Explicitly Set Up a Local Dask Cluster to Go Faster?

No.

The single-machine `threads` or `synchronous` schedulers run in 0.5-0.7 seconds for Parquet simple, 7 seconds for Parquet hive.

A LocalCluster configuration with n_workers=1 takes ~10 seconds.
As we increase the number of workers, we approach and, at n=4 workers, surpass the single-threaded performance for Parquet hive, roughly doubling performance with 3 seconds at 16 workers.

The Parquet simple format yields the same performance for all.

In [17]:
for n in (1, 4, 8, 16):
    cluster = LocalCluster(n_workers=n, threads_per_worker=2)
    client = Client(cluster)
    display(client)

    df_hive = np.mean(da_df_hive['mag_g'] - da_df_hive['mag_r'])
    %timeit _ = df_hive.compute()

    # Shut down this cluster before starting the next, larger one.
    client.close()
    cluster.close()

Client  Scheduler: tcp://127.0.0.1:41170  Dashboard: http://127.0.0.1:8787/status  
Cluster  Workers: 1  Cores: 2  Memory: 16.91 GB


8.66 s ± 108 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Client  Scheduler: tcp://127.0.0.1:33190  Dashboard: http://127.0.0.1:44983/status  
Cluster  Workers: 4  Cores: 8  Memory: 67.62 GB


3.12 s ± 38.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Client  Scheduler: tcp://127.0.0.1:32940  Dashboard: http://127.0.0.1:42467/status  
Cluster  Workers: 8  Cores: 16  Memory: 135.24 GB


2.11 s ± 111 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Client  Scheduler: tcp://127.0.0.1:45134  Dashboard: http://127.0.0.1:33435/status  
Cluster  Workers: 16  Cores: 32  Memory: 270.49 GB


2.23 s ± 73.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
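
The hive timings above can be condensed into a speedup and a per-worker efficiency relative to the single-worker cluster. This is a small sketch that only re-uses the mean `%timeit` values printed above; the variable names are illustrative.

```python
# Mean wall-clock times [seconds] for the hive store, copied from the outputs above,
# keyed by the number of LocalCluster workers.
hive_timings_s = {1: 8.66, 4: 3.12, 8: 2.11, 16: 2.23}

baseline = hive_timings_s[1]
speedup = {n: baseline / t for n, t in hive_timings_s.items()}
# Efficiency of 1.0 would mean perfect linear scaling with the worker count.
efficiency = {n: speedup[n] / n for n in hive_timings_s}

for n in hive_timings_s:
    print(f'n_workers={n:2d}  speedup={speedup[n]:.2f}x  efficiency={efficiency[n]:.2f}')
```

The speedup flattens at roughly 4x between 8 and 16 workers, so the efficiency per worker keeps dropping as workers are added, which is consistent with the read being the bottleneck.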


In [18]:
for n in (1, 4, 8, 16):
    cluster = LocalCluster(n_workers=n, threads_per_worker=2)
    client = Client(cluster)
    display(client)

    df_simple = np.mean(da_df_simple['mag_g'] - da_df_simple['mag_r'])
    %timeit _ = df_simple.compute()

    # Shut down this cluster before starting the next, larger one.
    client.close()
    cluster.close()

Client  Scheduler: tcp://127.0.0.1:44273  Dashboard: http://127.0.0.1:38238/status  
Cluster  Workers: 1  Cores: 2  Memory: 16.91 GB


585 ms ± 24.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Client  Scheduler: tcp://127.0.0.1:44141  Dashboard: http://127.0.0.1:41265/status  
Cluster  Workers: 4  Cores: 8  Memory: 67.62 GB


The slowest run took 75.15 times longer than the fastest. This could mean that an intermediate result is being cached.
508 ms ± 317 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Client  Scheduler: tcp://127.0.0.1:42059  Dashboard: http://127.0.0.1:36895/status  
Cluster  Workers: 8  Cores: 16  Memory: 135.24 GB


The slowest run took 46.28 times longer than the fastest. This could mean that an intermediate result is being cached.
412 ms ± 250 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


Client  Scheduler: tcp://127.0.0.1:39061  Dashboard: http://127.0.0.1:37860/status  
Cluster  Workers: 16  Cores: 32  Memory: 270.49 GB


The slowest run took 47.79 times longer than the fastest. This could mean that an intermediate result is being cached.
143 ms ± 204 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
