<img src="img/pandas_logo.png" align="right" width="40%">

DataFrame
==========

The `dask.dataframe` module implements a blocked parallel `DataFrame` object that mimics a subset of the Pandas `DataFrame`. One dask `DataFrame` comprises several in-memory pandas `DataFrame`s separated along the index. One operation on a dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.
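
The blocked idea can be sketched with plain pandas. This is a minimal illustration, not dask's implementation: the data, the hand-made `partitions` list, and the variable names are all assumptions made up for the example. It shows how one logical operation (a mean) turns into one pandas operation per block plus a cheap combination step.

```python
import pandas as pd

# A small in-memory frame standing in for a larger dataset (illustrative data).
pdf = pd.DataFrame({"id": [1, 2, 3, 4, 5, 6],
                    "amount": [10, -20, 30, 40, -50, 60]})

# Split the frame into blocks along the index, by hand.
partitions = [pdf.iloc[0:2], pdf.iloc[2:4], pdf.iloc[4:6]]

# One logical operation (a mean) becomes one pandas operation per block...
partial_sums = [part["amount"].sum() for part in partitions]
partial_counts = [len(part) for part in partitions]

# ...followed by a cheap combination step.
blocked_mean = sum(partial_sums) / sum(partial_counts)

# The blocked result agrees with the single-frame pandas result.
print(blocked_mean)
print(pdf["amount"].mean())
```

Only one block needs to be in memory at a time for the per-block step, which is what makes the approach attractive for larger datasets.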

**Related Documentation**

*  [Dask DataFrame documentation](http://dask.pydata.org/en/latest/dataframe.html)
*  [Pandas documentation](http://pandas.pydata.org/)

**Main Take-aways**

1.  Dask.dataframe should be familiar to Pandas users
2.  The index grows to include partitions, which are important for efficient queries

### Setup

We create artificial data.

In [6]:
from prep import accounts_csvs
accounts_csvs(3, 5000000, 500)

In [17]:
import os
filename = os.path.join('data', 'accounts.*.csv')

## Load Data from CSVs and inspect the dask graph

### `dask.dataframe.read_csv`

This works just like `pandas.read_csv`, except on multiple csv files at once.
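
Conceptually, reading a glob of CSV files resembles the following pandas-only sketch. The temporary files, column values, and variable names are illustrative assumptions; dask itself reads lazily and may split a large file into more than one partition, so this is an analogy rather than a description of its internals.

```python
import glob
import os
import tempfile

import pandas as pd

# Write three tiny CSV files into a temporary directory (illustrative data).
tmpdir = tempfile.mkdtemp()
for i in range(3):
    path = os.path.join(tmpdir, "accounts.%d.csv" % i)
    pd.DataFrame({"id": [i, i + 10], "amount": [100 * i, -5]}).to_csv(path, index=False)

# Expand the glob pattern by hand and read each file with pandas.
pattern = os.path.join(tmpdir, "accounts.*.csv")
paths = sorted(glob.glob(pattern))
frames = [pd.read_csv(p) for p in paths]

# Each file becomes one block; concatenating them gives the full table.
combined = pd.concat(frames, ignore_index=True)
print(len(paths), "files,", len(combined), "rows")
```

Unlike this eager version, `dd.read_csv` does not have to hold every block in memory at once.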

In [18]:
filename

'data/accounts.*.csv'

In [19]:
import dask.dataframe as dd
df = dd.read_csv(filename)

In [22]:
df

dd.DataFrame<from-de..., npartitions=6>

However, unlike Pandas, operations on dask.dataframes don't trigger immediate computation; instead they add key-value pairs to an underlying dask graph.
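
To make "key-value pairs in a graph" concrete, here is a toy sketch in pure Python. It follows the dask convention that a graph is a dict and a task is a tuple whose first element is a callable, but the `get` function below is a hypothetical, simplified evaluator written for this example, not dask's scheduler.

```python
from operator import add, mul

# A dask-style graph: a plain dict mapping keys to values or tasks.
# A task is a tuple whose first element is a callable.
graph = {
    "x": 1,
    "y": 2,
    "z": (add, "x", "y"),     # z = x + y
    "w": (mul, "z", 10),      # w = z * 10
}

def get(graph, key):
    """Toy recursive evaluator; dask's real schedulers are far more capable."""
    value = graph[key]
    if isinstance(value, tuple) and value and callable(value[0]):
        func, args = value[0], value[1:]
        # Arguments that name other keys are computed first.
        resolved = [get(graph, a) if isinstance(a, str) and a in graph else a
                    for a in args]
        return func(*resolved)
    return value

# Building the dict did no arithmetic; work happens only on request.
print(get(graph, "w"))  # → 30
```

Adding an operation to a dask `DataFrame` corresponds to adding more entries to such a dict; nothing runs until a result is requested.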

Some operations trigger direct computation, like `len` and `head`:

In [20]:
%time len(df)

CPU times: user 4.92 s, sys: 380 ms, total: 5.3 s
Wall time: 2.37 s


15000000

In [13]:
df.head()

|   | id | names | amount |
|---|---|---|---|
| 0 | 374 | Xavier | 53 |
| 1 | 366 | Victor | 868 |
| 2 | 12 | Charlie | -60 |
| 3 | 490 | Charlie | 4 |
| 4 | 228 | Michael | 4006 |


In [15]:
import pandas as pd  # needed for the `pd` display option below
pd.options.display.memory_usage = 'deep'

In [21]:
df.info(memory_usage=True)

<class 'dask.dataframe.core.DataFrame'>
Data columns (total 3 columns):
id          int64
names       object
amount      int64
dtypes: object(1), int64(2)
memory usage: 343.3 MB


### How does this compare to Pandas?

#### Features and Size

Pandas is more mature and fully featured than `dask.dataframe`.  If your data fits in memory then you should use Pandas.  The `dask.dataframe` module gives you a limited `pandas` experience when you operate on datasets that don't fit comfortably in memory.

During this tutorial we provide a small dataset consisting of a few CSV files.  This dataset is 45MB on disk that expands to about 400MB in memory (the difference is caused by using `object` dtype for strings).  This dataset is small enough that you would normally use Pandas.

We've chosen this size so that exercises finish quickly.  Dask.dataframe only really becomes meaningful for problems significantly larger than this, when Pandas breaks with the dreaded 

    MemoryError:  ...
    
#### Speed

Dask.dataframe operations use `pandas` operations internally.  Generally they run at about the same speed except in the following two cases:

1.  Dask introduces a bit of overhead, around 1ms per task.  This is usually negligible.
2.  When Pandas releases the GIL (coming to `groupby` in the next version), `dask.dataframe` can call several pandas operations in parallel, increasing speed roughly in proportion to the number of cores.
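
The two points above can be pictured with a small thread-pool sketch. The partitions, the `partition_sum` helper, and the use of `ThreadPoolExecutor` are assumptions chosen for illustration; any real speedup depends on whether the underlying pandas operation releases the GIL, so no timings are claimed here.

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

# Illustrative partitions standing in for the constituent pandas frames.
partitions = [
    pd.DataFrame({"amount": [1, 2, 3]}),
    pd.DataFrame({"amount": [4, 5]}),
    pd.DataFrame({"amount": [6]}),
]

def partition_sum(part):
    # One pandas operation on one partition; think of this as one task.
    return part["amount"].sum()

# Run the per-partition tasks in a thread pool, then combine the results.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial = list(pool.map(partition_sum, partitions))
total = sum(partial)

# Back-of-the-envelope overhead, using the ~1 ms per task figure quoted above.
overhead_seconds = len(partitions) * 1e-3
print(total, overhead_seconds)
```

With only a handful of tasks the estimated overhead is a few milliseconds, which is why it is usually negligible next to the pandas work itself.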

Exercise: Recall and use Pandas API
----------------------------------------

If you are already familiar with the Pandas API then you should have a firm grasp on how to use `dask.dataframe`.  There are a couple of small changes.

As noted above, computations on dask `DataFrame` objects don't perform work; instead they build up a dask graph.  We can evaluate this dask graph at any time using the `.compute()` method.

In [24]:
result = df.amount.mean()  # create lazily evaluated result
result

dd.Scalar<series-..., dtype=float64>

In [26]:
result.compute()           # perform actual computation

934.0851584

<div class="alert alert-success">
    <b>EXERCISE</b>: Recall and use the Pandas API on the accounts data.

Try the following exercises

<ol>
<li> Use the `head()` method to get the first ten rows. </li> 
<li> Use the `drop_duplicates()` method to find all of the distinct names</li>
<li> Use selections `df[...]` to find how many positive and negative amounts there are</li>
<li> Use groupby `df.groupby(df.A).B.func()` to get the average amount per user ID</li>
<li> Combine your answers to (3) and (4) to compute the average withdrawal (negative amount) per name </li>
</ol>

<p> This section should be easy if you are familiar with Pandas.  If you aren't, that's OK too.  You may find the [pandas documentation](http://pandas.pydata.org/) a useful read in the future.  Don't worry, future sections in this tutorial will not depend on this knowledge. </p>

</div>


In [None]:
# 1. Use the `head()` method to get the first ten rows
#    Note: head computes by default, so it doesn't need an explicit call to .compute()


In [None]:
# 2. Use the `drop_duplicates()` method to find all of the distinct names


In [None]:
# 3. Use selections `df[...]` to find how many positive and negative amounts there are


In [None]:
# 3. Use selections `df[...]` to find how many positive and negative amounts there are


In [None]:
# 4. Use groupby `df.groupby(df.A).B.func()` to get the average amount per user ID 


In [None]:
# 5. Combine your answers to 3 and 4 to compute the average withdrawal (negative amount) per name


In [None]:
# 1. Use the `head()` method to get the first ten rows
df.head(10)

# 2. Use the `drop_duplicates()` method to find all of the distinct names
df.names.drop_duplicates().compute()

# 3. Use selections `df[...]` to find how many positive and negative amounts
# there are
len(df[df.amount < 0])  # number of negative amounts

len(df[df.amount > 0])  # number of positive amounts

# 4. Use groupby `df.groupby(df.A).B.func()` to get the average amount per user
# ID
df.groupby(df.id).amount.mean().compute()

# 5. Combine your answers to 3 and 4 to compute the average withdrawal
# (negative amount) per name
df2 = df[df.amount < 0]
df2.groupby(df2.names).amount.mean().compute()


<!--    <img src="img/dask-dataframe.svg" align="right" width="40%"> -->

Divisions and the Index
---------------------------

The Pandas index associates a value to each record/row of your data.  Operations that align with the index, like `loc`, can be a bit faster as a result.

In `dask.dataframe` this index becomes even more important.  Recall that one dask `DataFrame` consists of several Pandas `DataFrame`s.  These dataframes are separated along the index by value.  For example, when working with time series we may partition our large dataset by month.

Recall that these many partitions of our data may not all live in memory at the same time, instead they might live on disk; we simply have tasks that can materialize these pandas `DataFrames` on demand.

Partitioning your data can greatly improve efficiency.  Operations like `loc`, `groupby`, and `merge/join` along the index are *much more efficient* than operations along other columns.  You can see how your dataset is partitioned with the `.divisions` attribute.  Note that data that comes out of simple data sources like CSV files isn't intelligently indexed by default.  In these cases the values for `.divisions` will be `None`.

In [41]:
df = dd.read_csv(filename)
df.divisions

(None, None, None, None, None, None, None)

In [42]:
df

dd.DataFrame<from-de..., npartitions=6>

However, if we set the index to some new column, dask will divide our data roughly evenly along that column and create new divisions for us.  Warning: `set_index` triggers immediate computation.

In [43]:
df2 = df.set_index('names')
df2.divisions

('Alice', 'Bob', 'Frank', 'Kevin', 'Quinn', 'Ursula', 'Zelda')

In [44]:
df2.head()

| names | id | amount |
|---|---|---|
| Alice | 208 | 289 |
| Alice | 124 | 1877 |
| Alice | 215 | 217 |
| Alice | 436 | 3610 |
| Alice | 442 | 163 |


In [47]:
len(df2)

15000000

We see here the minimum and maximum values ("Alice" and "Zelda") as well as several intermediate values that separate our data well. 
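
A rough way to picture how divisions are chosen and then used is sketched below in pure Python. Both helpers, `choose_divisions` and `partition_for`, are hypothetical names written for this example; picking boundaries at evenly spaced positions of a sorted list is a simplification, and dask's own boundary estimation may differ.

```python
import bisect

def choose_divisions(sorted_values, npartitions):
    """Pick npartitions + 1 boundaries at evenly spaced positions.

    A simplification for illustration only.
    """
    last = len(sorted_values) - 1
    return tuple(sorted_values[(i * last) // npartitions]
                 for i in range(npartitions + 1))

def partition_for(divisions, value):
    """Return the index of the partition whose range contains value."""
    i = bisect.bisect_right(divisions, value) - 1
    # Clamp so the maximum value falls in the last partition.
    return min(len(divisions) - 2, max(0, i))

# The divisions shown in the output above.
divisions = ("Alice", "Bob", "Frank", "Kevin", "Quinn", "Ursula", "Zelda")
print(partition_for(divisions, "Edith"))   # → 1, between 'Bob' and 'Frank'
print(partition_for(divisions, "Zelda"))   # → 5, the last partition

print(choose_divisions(list(range(0, 101)), 4))  # → (0, 25, 50, 75, 100)
```

Because the boundaries are sorted, finding the right partition is a binary search over a short tuple, so a lookup by index value can skip every other partition.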

In [29]:
df2.npartitions

3

In [30]:
df2.head()

| names | id | amount |
|---|---|---|
| Alice | 303 | 216 |
| Alice | 445 | 295 |
| Alice | 204 | -5 |
| Alice | 53 | 32 |
| Alice | 204 | 24 |


Operations like `loc` only need to load the relevant partitions

In [31]:
df2.loc['Edith']

dd.DataFrame<loc-ae3..., npartitions=1, divisions=('Edith', 'Edith')>

In [51]:
df2.loc['Edith'].compute()

| names | id | amount |
|---|---|---|
| Edith | 472 | 92 |
| Edith | 396 | 24 |
| Edith | 195 | 2007 |
| Edith | 40 | 453 |
| Edith | 408 | 944 |
| Edith | 210 | 2409 |
| Edith | 444 | 2934 |
| Edith | 40 | 502 |
| Edith | 195 | 1946 |
| Edith | 245 | -3 |


## DataFrame Storage


Efficient storage can dramatically improve performance, particularly when operating repeatedly from disk.

Decompressing text and parsing CSV files is expensive.  One of the most effective strategies with medium data is to use a binary storage format like HDF5.  Often the performance gains from doing this are large enough that you can switch back to using Pandas instead of `dask.dataframe`.

In this section we'll learn how to efficiently arrange and store your datasets in on-disk binary formats.  We'll use the following:

1.  [Pandas `HDFStore`](http://pandas.pydata.org/pandas-docs/stable/io.html#io-hdf5) format on top of `HDF5`
2.  Categoricals for storing text data numerically

**Main Take-aways**

1.  Storage formats affect performance by an order of magnitude
2.  Text data will keep even a fast format like HDF5 slow
3.  A combination of binary formats, column storage, and partitioned data turns one second wait times into 80ms wait times.

### Write to HDF5

Pandas contains a specialized HDF5 format, `HDFStore`.  The ``dd.DataFrame.to_hdf`` method works exactly like the ``pd.DataFrame.to_hdf`` method.

In [52]:
target = os.path.join('data', 'accounts.h5')
target

'data/accounts.h5'

In [53]:
%time df.to_hdf(target, '/data')

CPU times: user 19.7 s, sys: 1 s, total: 20.7 s
Wall time: 17.8 s


(None,)

In [54]:
df2 = dd.read_hdf(target, '/data')
df2.head()

|   | id | names | amount |
|---|---|---|---|
| 0 | 57 | Yvonne | 631 |
| 1 | 427 | Ray | -767 |
| 2 | 91 | Quinn | 969 |
| 3 | 87 | Victor | 267 |
| 4 | 400 | Dan | 628 |


### Compare CSV to HDF5 speeds

We do a simple computation that requires reading a column of our dataset and compare performance between CSV files and our newly created HDF5 file.  Which do you expect to be faster?

In [55]:
%time df.amount.sum().compute()

CPU times: user 5.06 s, sys: 624 ms, total: 5.68 s
Wall time: 2.51 s


14011277376

In [56]:
%time df2.amount.sum().compute()

CPU times: user 13.2 s, sys: 540 ms, total: 13.7 s
Wall time: 13.7 s


14011277376

Sadly, HDF5 is no faster here.  In the run shown above it is actually slower (13.7 s versus 2.51 s wall time).

The culprit here is the `names` column, which is of `object` dtype and thus hard to store efficiently.  There are two problems here:

1.  How do we store text data like `names` efficiently on disk?
2.  Why did we have to read the `names` column when all we wanted was `amount`?

### 1.  Store text efficiently with categoricals

We can use Pandas categoricals to replace our object dtypes with a numerical representation.  This takes a bit more time up front, but results in better performance.

More on categoricals at the [pandas docs](http://pandas.pydata.org/pandas-docs/stable/categorical.html) and [this blogpost](http://matthewrocklin.com/blog/work/2015/06/18/Categoricals).
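
The numerical representation can be inspected directly in pandas. The sketch below uses made-up names and an arbitrary repeat count; it only illustrates what a categorical stores and how its memory footprint compares with `object` dtype, not the HDF5 timings in this section.

```python
import pandas as pd

# Illustrative data: a few distinct names repeated many times.
names = pd.Series(["Alice", "Bob", "Charlie", "Alice"] * 2500, dtype=object)
as_category = names.astype("category")

# Each distinct string is stored once...
print(list(as_category.cat.categories))
# ...and every row holds only a small integer code.
print(as_category.cat.codes.head(4).tolist())
print(as_category.cat.codes.dtype)

# Compare the in-memory footprint of the two representations.
object_bytes = names.memory_usage(deep=True)
category_bytes = as_category.memory_usage(deep=True)
print(object_bytes, category_bytes)
```

The fewer distinct values a text column has relative to its length, the more a categorical saves.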

In [57]:
# Categorize data, then store in HDFStore
%time df.categorize(columns=['names']).to_hdf(target, '/data2')

CPU times: user 18.8 s, sys: 1.83 s, total: 20.7 s
Wall time: 13.8 s


(None,)

In [58]:
# It looks the same
df2 = dd.read_hdf(target, '/data2')
df2.head()

|   | id | names | amount |
|---|---|---|---|
| 0 | 57 | Yvonne | 631 |
| 1 | 427 | Ray | -767 |
| 2 | 91 | Quinn | 969 |
| 3 | 87 | Victor | 267 |
| 4 | 400 | Dan | 628 |


In [59]:
# But loads more quickly
%time df2.amount.sum().compute()

CPU times: user 872 ms, sys: 104 ms, total: 976 ms
Wall time: 959 ms


14011277376

This is significantly faster.  This tells us that it's not only the file type that we use but also how we represent our variables that influences storage performance.

However, this can still be better.  We had to read all of the columns (`names` and `amount`) in order to compute the sum of one (`amount`).  To avoid this, we could use a columnar on-disk store such as Parquet.

Limitations
-------------

### What doesn't work?

Dask.dataframe only covers a small but well-used portion of the Pandas API.
This limitation is for two reasons:

1.  The Pandas API is *huge*
2.  Some operations are genuinely hard to do in parallel (e.g. sort)

Additionally, some important operations like ``set_index`` work, but are slower
than in Pandas because they may write out to disk.

Finally, `dask.dataframe` is quite new and non-trivial bugs are frequently reported (and quickly fixed).

### What definitely works?

* Trivially parallelizable operations (fast):
    *  Elementwise operations:  ``df.x + df.y``
    *  Row-wise selections:  ``df[df.x > 0]``
    *  Loc:  ``df.loc[4.0:10.5]``
    *  Common aggregations:  ``df.x.max()``
    *  Is in:  ``df[df.x.isin([1, 2, 3])]``
    *  Datetime/string accessors:  ``df.timestamp.dt.month``
* Cleverly parallelizable operations (also fast):
    *  groupby-aggregate (with common aggregations): ``df.groupby(df.x).y.max()``
    *  value_counts:  ``df.x.value_counts()``
    *  Drop duplicates:  ``df.x.drop_duplicates()``
    *  Join on index:  ``dd.merge(df1, df2, left_index=True, right_index=True)``
* Operations requiring a shuffle (slow-ish, unless on index):
    *  Set index:  ``df.set_index(df.x)``
    *  groupby-apply (with anything):  ``df.groupby(df.x).apply(myfunc)``
    *  Join not on the index:  ``dd.merge(df1, df2, on='name')``
* Ingest operations
    *  CSVs: ``dd.read_csv``
    *  Pandas: ``dd.from_pandas``
    *  Anything supporting numpy slicing: ``dd.from_array``
    *  Dask.bag: ``mybag.to_dataframe(columns=[...])``
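
As an illustration of why groupby-aggregate counts as "cleverly parallelizable", the following pandas-only sketch computes a per-name mean in two stages. The partitions and values are made up, and this is one plausible way to structure the computation rather than a description of dask's internals.

```python
import pandas as pd

# Illustrative partitions of an accounts table.
partitions = [
    pd.DataFrame({"names": ["Alice", "Bob", "Alice"], "amount": [10, 20, 30]}),
    pd.DataFrame({"names": ["Bob", "Alice"], "amount": [60, 50]}),
]

# Step 1: aggregate within each partition (independent, parallelizable tasks).
partials = [part.groupby("names")["amount"].agg(["sum", "count"])
            for part in partitions]

# Step 2: combine the small per-partition results and aggregate once more.
combined = pd.concat(partials).groupby(level=0).sum()
mean_per_name = combined["sum"] / combined["count"]

print(mean_per_name)
```

The intermediate results have one row per group per partition, so the combination step is small even when the partitions are large. An arbitrary `groupby-apply` function offers no such decomposition, which is why it needs a shuffle instead.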