<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo\">


# Dask DataFrames

## When to use `dask.dataframe`

* We have seen DataFrames as list of `delayed` objects
* Now: treat them as a single DataFrame
* Pandas is great for tabular datasets that fit in memory.
* Dask becomes useful when the dataset is larger than your machine's RAM.

## `dask.dataframe` basics

* `dask.dataframe` implements a blocked parallel `DataFrame`
* mimics a large subset of the Pandas `DataFrame`.
* One operation on a Dask `DataFrame` triggers many pandas operations

## Related Documentation

*  [Dask DataFrame documentation](http://dask.pydata.org/en/latest/dataframe.html)
*  [Pandas documentation](http://pandas.pydata.org/)

## Main Take-aways

1.  Dask.dataframe should be familiar to Pandas users
2.  The partitioning of dataframes is important for efficient queries

## Setup

We create artifical data.

In [None]:
import os
import numpy as np
import pandas as pd

data_dir = 'data'

names = ['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank', 'George',
'Hannah', 'Ingrid', 'Jerry', 'Kevin', 'Laura', 'Michael', 'Norbert', 'Oliver',
'Patricia', 'Quinn', 'Ray', 'Sarah', 'Tim', 'Ursula', 'Victor', 'Wendy',
'Xavier', 'Yvonne', 'Zelda']

k = 100


def account_params(k):
    ids = np.arange(k, dtype=int)
    names2 = np.random.choice(names, size=k, replace=True)
    wealth_mag = np.random.exponential(100, size=k)
    wealth_trend = np.random.normal(10, 10, size=k)
    freq = np.random.exponential(size=k)
    freq /= freq.sum()

    return ids, names2, wealth_mag, wealth_trend, freq

def account_entries(n, ids, names, wealth_mag, wealth_trend, freq):
    indices = np.random.choice(ids, size=n, replace=True, p=freq)
    amounts = ((np.random.normal(size=n) + wealth_trend[indices])
                                         * wealth_mag[indices])

    return pd.DataFrame({'id': indices,
                         'names': names[indices],
                         'amount': amounts.astype('i4')},
                         columns=['id', 'names', 'amount'])


def accounts(n, k):
    ids, names, wealth_mag, wealth_trend, freq = account_params(k)
    df = account_entries(n, ids, names, wealth_mag, wealth_trend, freq)
    return df


def json_entries(n, *args):
    df = account_entries(n, *args)
    g = df.groupby(df.id).groups

    data = []
    for k in g:
        sub = df.iloc[g[k]]
        d = dict(id=int(k), name=sub['names'].iloc[0],
                transactions=[{'transaction-id': int(i), 'amount': int(a)}
                              for i, a in list(zip(sub.index, sub.amount))])
        data.append(d)

    return data

def accounts_json(n, k):
    args = account_params(k)
    return json_entries(n, *args)

def accounts_csvs(num_files, n, k):
    fn = os.path.join(data_dir, 'accounts.%d.csv' % (num_files - 1))

    if os.path.exists(fn):
        return

    print("Create CSV accounts for dataframe exercise")

    args = account_params(k)

    for i in range(num_files):
        df = account_entries(n, *args)
        df.to_csv(os.path.join(data_dir, 'accounts.%d.csv' % i),
                  index=False)


In [None]:
accounts_csvs(3, 1000000, 500)

import os
import dask
filename = os.path.join('data', 'accounts.*.csv')

## Read data into a DataFrame

This works just like `pandas.read_csv`, except on multiple csv files at once.

In [None]:
filename

In [None]:
import dask.dataframe as dd
df = dd.read_csv(filename)
# load and count number of rows
df.head()

In [None]:
len(df)

## What happened here?

- Dask investigated the input path and found that there are three matching files 
- a set of jobs was intelligently created for each chunk - one per original CSV file in this case
- each file was loaded into a pandas dataframe, had `len()` applied to it
- the subtotals were combined to give you the final grant total.

### Real Data

Lets try this with the New York Taxi Data we just converted to Apache Parquet.

In [None]:
df = dd.read_parquet(os.path.join('taxi-data-parquet/', '*.parquet'))

Notice that the respresentation of the dataframe object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and types.

In [None]:
df

We can view the start and end of the data

In [None]:
df.head()

In [None]:
df.tail()

## Computations with `dask.dataframe`

We compute the maximum of the `trip_distance` column. With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums

```python
maxes = []
for fn in filenames:
    df = pd.read_parquet(fn)
    maxes.append(df.trip_distance.max())
    
final_max = max(maxes)
```

In [None]:
%time df['trip_distance'].max().compute()

1.  As with `dask.delayed`, we need to call `.compute()` when we're done.  Up until this point everything is lazy.
2.  Dask will delete intermediate results as soon as possible.
    -  Lets us handle datasets that are larger than memory
    -  Repeated computations will have to load all of the data in each time
    

As with `Delayed` objects, you can view the underlying task graph using the `.visualize` method:

In [None]:
# notice the parallelism
df['trip_distance'].max().visualize(rankdir="LR")

## Comparision to Pandas

* Pandas is more mature and fully featured
* If your data fits in memory then you should use Pandas.
* When you see `MemoryError: `, then it's time for `dask`.
* When you want more than 1 CPU, also use `dask`    

## Performance comparison

Dask.dataframe operations use `pandas` operations internally, but:

1. Dask introduces overhead, around 1ms per task (negligible)
2. When Pandas releases the GIL (coming to `groupby` in the next version): in-process parallelism
3. With the GIL, we need several processes

## Dask DataFrame Data Model

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" width="30%">

In [None]:
import dask.dataframe as dd

df = dd.read_parquet('taxi-data-parquet/*.parquet')
df

In [None]:
%%time
df['trip_distance'].mean().compute()

In [None]:
%%time
df = df.persist()

In [None]:
%%time
df['trip_distance'].mean().compute()

In [None]:
tasks = df.groupby('passenger_count')['trip_distance'].mean()
tasks

In [None]:
tasks.visualize(rankdir='LR')

In [None]:
tasks.compute()

In [None]:
df['VendorID'].nunique().visualize(rankdir='LR')

In [None]:
df['VendorID'].nunique().compute()

In [None]:
df['VendorID'].unique().compute()

### Custom code and Dask Dataframe

`dask.dataframe` provides a few methods to make applying custom functions to Dask DataFrames easier:

- [`map_partitions`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions)
- [`map_overlap`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_overlap)
- [`reduction`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction)

In [None]:
# Look at the docs for `map_partitions`

help(df['trip_distance'].map_partitions)

The basic idea is to apply a function that operates on a DataFrame to each partition.
In this case, we'll apply `pd.to_timedelta`.

In [None]:
def trip_time(df):
    return df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']

duration = df.map_partitions(trip_time)

In [None]:
duration.visualize(rankdir="LR")

In [None]:
duration.tail()

## `dask.dataframe` limitations

Dask.dataframe only covers a small but well-used portion of the Pandas API.
This limitation is for two reasons:

1.  The Pandas API is *huge*
2.  Some operations are genuinely hard to do in parallel (e.g. sort)

Operations like ``set_index`` work, but are slower than in Pandas.

### What definitely works?

* Trivially parallelizable operations (fast):
    *  Elementwise operations:  ``df.x + df.y``
    *  Row-wise selections:  ``df[df.x > 0]``
    *  Loc:  ``df.loc[4.0:10.5]``
    *  Common aggregations:  ``df.x.max()``
    *  Is in:  ``df[df.x.isin([1, 2, 3])]``
    *  Datetime/string accessors:  ``df.timestamp.month``

* Cleverly parallelizable operations (also fast):
    *  groupby-aggregate (with common aggregations): ``df.groupby(df.x).y.max()``
    *  value_counts:  ``df.x.value_counts``
    *  Drop duplicates:  ``df.x.drop_duplicates()``
    *  Join on index:  ``dd.merge(df1, df2, left_index=True, right_index=True)``

* Operations requiring a shuffle (slow-ish, unless on index)
    *  Set index:  ``df.set_index(df.x)``
    *  groupby-apply (with anything):  ``df.groupby(df.x).apply(myfunc)``
    *  Join not on the index:  ``pd.merge(df1, df2, on='name')``

* Ingest operations
    *  Files: ``dd.read_csv, dd.read_parquet, dd.read_json, dd.read_orc``, etc.
    *  Pandas: ``dd.from_pandas``
    *  Anything supporting numpy slicing: ``dd.from_array``
    *  From any set of functions creating sub dataframes via ``dd.from_delayed``.
    *  Dask.bag: ``mybag.to_dataframe(columns=[...])``