<img src="images/hdd.jpg" width="20%" align="right">

DataFrame Storage
================

Decompressing text and parsing CSV files is expensive.  One of the most effective strategies for medium-sized data is to use a binary storage format like HDF5.  Often the resulting speedup is large enough that you can switch back to plain Pandas instead of using dask.

In this section we'll learn how to efficiently arrange and store your datasets in on-disk binary formats.
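To make the cost of text parsing concrete, here is a minimal sketch that uses only the Python standard library (not HDF5 itself).  It stores the same integers once as CSV text and once as fixed-width binary, then sums both.  The sample data and the function names `sum_from_csv` and `sum_from_binary` are assumptions made up for this illustration.

```python
import array
import csv
import io
import timeit

# Hypothetical sample data: 100,000 integer "amounts".
amounts = [(i * 37) % 10007 - 5000 for i in range(100_000)]

# Text representation: one integer per CSV row.
text_buf = io.StringIO()
csv.writer(text_buf).writerows([a] for a in amounts)
csv_text = text_buf.getvalue()

# Binary representation: fixed-width signed 64-bit integers.
binary_blob = array.array("q", amounts).tobytes()


def sum_from_csv(text):
    """Parse every row of text and convert each field to an int."""
    return sum(int(row[0]) for row in csv.reader(io.StringIO(text)))


def sum_from_binary(blob):
    """Reinterpret the raw bytes as integers; no text parsing is needed."""
    values = array.array("q")
    values.frombytes(blob)
    return sum(values)


csv_total = sum_from_csv(csv_text)
binary_total = sum_from_binary(binary_blob)

# Timings vary by machine, so no expected numbers are given here.
print("csv   :", timeit.timeit(lambda: sum_from_csv(csv_text), number=5))
print("binary:", timeit.timeit(lambda: sum_from_binary(binary_blob), number=5))
```

Both paths produce the same total; the binary path simply skips the string-to-number conversion that dominates the CSV path.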

### Setup

Create data if we don't have any

In [1]:
from prep import accounts_csvs
accounts_csvs(3, 1000000, 500)

### Read CSV

First we read our CSV data as before.

In [2]:
import os
filename = os.path.join('data', 'accounts.*.csv')
filename

'data/accounts.*.csv'

In [3]:
import dask.dataframe as dd
df = dd.read_csv(filename)
df.head()

Unnamed: 0,id,names,amount
0,110,Victor,-245
1,364,Hannah,3259
2,81,Oliver,1118
3,199,Oliver,-3513
4,257,Edith,-84


### Write to HDF5

Pandas contains a specialized HDF5 format, `HDFStore`.  The ``dd.DataFrame.to_hdf`` method works exactly like the ``pd.DataFrame.to_hdf`` method.

In [4]:
target = os.path.join('data', 'accounts.h5')
target

'data/accounts.h5'

In [5]:
%%time
df.to_hdf(target, '/data')

CPU times: user 1.82 s, sys: 202 ms, total: 2.02 s
Wall time: 1.97 s


In [6]:
df2 = dd.read_hdf(target, '/data')
df2.head()

Unnamed: 0,id,names,amount
0,110,Victor,-245
1,364,Hannah,3259
2,81,Oliver,1118
3,199,Oliver,-3513
4,257,Edith,-84


### Compare CSV to HDF5 speeds

We run a simple computation that reads part of our dataset and compare performance between the CSV files and our newly created HDF5 file.

In [7]:
%time df.amount.sum().compute()

CPU times: user 720 ms, sys: 40.6 ms, total: 761 ms
Wall time: 701 ms


2811228523

In [8]:
%time df2.amount.sum().compute()

CPU times: user 426 ms, sys: 103 ms, total: 529 ms
Wall time: 422 ms


2811228523

Sadly, the improvement is modest (about 420 ms versus 700 ms of wall time).  The culprit here is the `names` column, which has object dtype and is thus hard to store efficiently.

### Categoricals

We can use Pandas categoricals to replace our object dtypes with a numerical representation.  This takes a bit more time up front, but results in better performance.

In [9]:
%%time
df.categorize(columns=['names']).to_hdf(target, '/data2')

CPU times: user 2.94 s, sys: 219 ms, total: 3.16 s
Wall time: 2.99 s


In [10]:
df2 = dd.read_hdf(target, '/data2')
df2.head()

Unnamed: 0,id,names,amount
0,110,Victor,-245
1,364,Hannah,3259
2,81,Oliver,1118
3,199,Oliver,-3513
4,257,Edith,-84


In [11]:
%%time
df2.amount.sum().compute()

CPU times: user 276 ms, sys: 24.7 ms, total: 301 ms
Wall time: 174 ms


2811228523

This is significantly faster.  It's not only the file format but also how we represent our variables that influences storage performance.
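To see what the categorical representation looks like, here is a small pandas sketch.  The sample series is hypothetical and much smaller than the tutorial's dataset; it only illustrates that each distinct string is stored once while the rows hold compact integer codes.

```python
import pandas as pd

# Hypothetical sample: a handful of names repeated many times.
names = pd.Series(["Victor", "Hannah", "Oliver", "Oliver", "Edith"] * 20_000)

# Convert the string column to a categorical.
cat_names = names.astype("category")

# Each distinct string is stored once in the categories ...
categories = list(cat_names.cat.categories)
# ... and every row holds only a small integer code pointing at one of them.
first_codes = cat_names.cat.codes.head().tolist()

# Compare the memory footprint of the two representations.
string_bytes = names.memory_usage(deep=True)
category_bytes = cat_names.memory_usage(deep=True)
print(categories, first_codes)
print(string_bytes, category_bytes)
```

Fixed-width integer codes are much easier to write to and read from a binary store than variable-length Python strings, which is a plausible reason for the speedup measured above.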

However, this can still be better.  We had to read all of the columns (`id`, `names`, and `amount`) in order to compute the sum of one (`amount`).  We'll improve further on this with `castra`, an on-disk column-store.  First, though, we learn how to set an index in a dask.dataframe.
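The benefit of a column-store can be sketched with the standard library alone.  This is not how Castra is implemented; it is an assumed toy layout in which every field is a 64-bit integer, meant only to show that summing one column touches a third of the bytes when columns are stored separately.

```python
import struct
from array import array

# Hypothetical records: (id, name_code, amount), all 64-bit integers.
records = [(i % 500, i % 26, (i * 37) % 10007 - 5000) for i in range(10_000)]

# Row-oriented layout: the three fields of each record sit side by side.
row_format = struct.Struct("<qqq")
row_store = b"".join(row_format.pack(*rec) for rec in records)

# Column-oriented layout: each field lives in its own contiguous buffer.
column_store = {
    "id": array("q", (r[0] for r in records)).tobytes(),
    "name_code": array("q", (r[1] for r in records)).tobytes(),
    "amount": array("q", (r[2] for r in records)).tobytes(),
}


def sum_amount_rows(blob):
    """Walk every record, touching all bytes, to pick out one field."""
    return sum(amount for _, _, amount in row_format.iter_unpack(blob))


def sum_amount_columns(store):
    """Load only the bytes that belong to the amount column."""
    values = array("q")
    values.frombytes(store["amount"])
    return sum(values)


bytes_read_rows = len(row_store)
bytes_read_columns = len(column_store["amount"])
print(bytes_read_rows, bytes_read_columns)
```

Both functions return the same sum, but the column-oriented version never reads the `id` or `name_code` bytes.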

`set_index`
------------

As we're about to learn, the index is even more important in `dask.dataframe` than it was in `pandas`.  The index determines how we parallelize our computations and how efficiently we can index into parts of our dataset.  
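A rough way to picture this: once the data is sorted by the index, each partition covers a known range of index values, so a lookup only needs to open one partition.  (`dask.dataframe` records such boundaries in its `divisions` attribute.)  The sketch below is a plain-Python illustration with made-up data; `partition_for` and `lookup` are hypothetical helpers, not dask functions.

```python
from bisect import bisect_right

# Hypothetical partitions of (id, amount) rows, already sorted by id.
partitions = [
    [(0, 75), (3, -84), (40, 189)],
    [(100, -621), (100, -524), (150, 12)],
    [(200, 7), (364, 3259), (499, -1)],
]

# Partition i holds ids in [divisions[i], divisions[i + 1]);
# the last partition also includes the final boundary value.
divisions = [0, 100, 200, 499]


def partition_for(key):
    """Return the index of the only partition that can contain key."""
    if not divisions[0] <= key <= divisions[-1]:
        raise KeyError(key)
    # bisect_right counts the boundaries <= key; clamp the result so
    # the final boundary maps to the last partition.
    return min(bisect_right(divisions, key) - 1, len(partitions) - 1)


def lookup(key):
    """Scan a single partition instead of the whole dataset."""
    return [row for row in partitions[partition_for(key)] if row[0] == key]


print(lookup(100))
```

Without a sorted index, the same lookup would have to scan every partition.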

In [12]:
%%time
# By default `DataFrame.set_index` is *not lazily evaluated*
df3 = df.set_index('id')

CPU times: user 2.26 s, sys: 81.7 ms, total: 2.34 s
Wall time: 1.87 s


In [13]:
df3.head()

id,names,amount
0,Xavier,75
0,Xavier,-84
0,Xavier,189
0,Xavier,117
0,Xavier,83


### But now we can perform lookups with `.loc`

In [16]:
df3.loc[100].head()

id,names,amount
100,Bob,-621
100,Bob,-524
100,Bob,-601
100,Bob,-586
100,Bob,-622


### Castra

Additionally, once we have a proper index we can use `castra`, an on-disk column-store that is partitioned along the index.

In [18]:
%%time
if os.path.exists('accounts.castra'):
    import shutil
    shutil.rmtree('accounts.castra')

c = df3.to_castra('accounts.castra', categories=['names'])
df4 = c.to_dask()

CPU times: user 2.65 s, sys: 312 ms, total: 2.96 s
Wall time: 2.69 s


In [19]:
%%time
df4.head()

CPU times: user 21 ms, sys: 0 ns, total: 21 ms
Wall time: 19.8 ms


id,names,amount
0,Xavier,75
0,Xavier,-74
0,Xavier,-88
0,Xavier,44
0,Xavier,21


In [20]:
%%time
df4.loc[0:4].compute()

CPU times: user 26.7 ms, sys: 0 ns, total: 26.7 ms
Wall time: 25.5 ms


id,names,amount
0,Xavier,75
0,Xavier,-74
0,Xavier,-88
0,Xavier,44
0,Xavier,21
0,Xavier,101
0,Xavier,85
0,Xavier,-8
0,Xavier,13
0,Xavier,75


In [21]:
%%time
df4.amount.sum().compute()

CPU times: user 72.4 ms, sys: 20.7 ms, total: 93.1 ms
Wall time: 57.4 ms


2811228523

In [23]:
# %%time
df4.names.drop_duplicates().compute()

id
0       Xavier
1        Wendy
3        Edith
4        Quinn
5          Ray
7        Sarah
8       Hannah
9        Alice
10      Ingrid
11         Dan
12       Jerry
13       Zelda
14       Kevin
17      Yvonne
21     Norbert
23      George
24       Frank
30     Charlie
31     Michael
33         Tim
34      Victor
39      Oliver
46      Ursula
50       Laura
52         Bob
71    Patricia
Name: names, dtype: category
Categories (26, object): [Xavier, Wendy, Edith, Quinn, ..., Ursula, Laura, Bob, Patricia]

## Conclusion

Storage choices strongly impact performance.  We moved from text-based CSV files to the binary Castra format and saw the wall time of our `amount` sum drop from about 700 ms to under 60 ms.

We also used `DataFrame.set_index` to organize our data along a special column.  A common recipe for success with `dask.dataframe` is as follows:

1.  Read in your data however it was delivered to you

        df = dd.read_csv('myfiles.*.csv')
    
2.  Set your index 

        df2 = df.set_index('column-name')
        
3.  Base your computation on a Castra file

        c = df2.to_castra('/path/to/new/file.castra', 
                          categories=['list', 'of', 'columns', 'to', 'categorize'])
        df3 = c.to_dask()
        
4.  Perform efficient queries

        # 'key-column' is a placeholder; groupby needs a column to group on
        df3.loc['2014':'2015'].groupby('key-column').events.count().compute()