<img src="images/dask_horizontal.svg"
     width="45%"
     alt="Dask logo\">

# Dask DataFrame

In the last exercise in the Dask Delayed notebook, we wrote a function which used `dask.delayed` to parallelize loading multiple CSV files into a Pandas DataFrame. In this notebook, we'll introduce and use Dask's DataFrame interface to automatically build similiar parallel computations for tabular data computations. Dask DataFrames look and feel like Pandas DataFrames, but they run on the same infrastructure that powers `dask.delayed`.

## The Dask DataFrame data model

For the most part, a Dask DataFrame feels like a Pandas DataFrame. However, internally a Dask DataFrame is composed of many Pandas DataFrames (see the image below). 

<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" width="30%">

Dask DataFrames are partitioned along their index into different **partitions** where each parition is a normal Pandas DataFrame. These Pandas objects may live on disk or on other machines.

Dask DataFrames implement a large subset of the Pandas API which are backed by blocked algorithms that allow for parallel and out-of-core computation. For many purposes Dask DataFrames can serve as drop-in replacements for Pandas DataFrames. Much like the Dask Delayed interface, Dask DataFrames are lazily evaluated. You can use use the DataFrame API to automatically build up a task graph representing complex computations and then call `compute()` to to evaluate the graph in parallel. 

## When to use Dask DataFrames

Pandas is great for tabular datasets that fit in memory. If your data fits in memory then you should use Pandas. **Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM** where you would normally run into `MemoryError`s.

```python
    MemoryError:  ...
```

During this tutorial, the example NYC dataset we're working with is only about 200MB so that you can download it in a reasonable time and exercises finish quickly, but Dask Dataframes will scale to datasets much larger than the memory on your local machine. Furthermore, as we'll talk about in the next notebook, Dask's distributed scheduler allows you to run the same DataFrame computation across a cluster.

# Getting started with Dask DataFrames

Let's use Dask DataFrame's to explore our NYC flight dataset. Dask's `read_csv` function will automatically example wildcard characters like `"*"` which can, for example, be used to load an entire directory of CSV files.

In [None]:
%run prep.py -d flights

In [None]:
import os

files = os.path.join('data', 'nycflights', '*.csv')
files

In [None]:
import dask.dataframe as dd

df = dd.read_csv(files,
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={"TailNum": str,
                        "CRSElapsedTime": float,
                        "Cancelled": bool})
df

Notice that the respresentation of the dataframe object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

In [None]:
df.columns

In [None]:
df.dtypes

Dask DataFrames have an `.npartitions` attribute which tells you how many Pandas DataFrames make up a Dask DataFrame.

In [None]:
df.npartitions

Let's look at the task graph for our Dask DataFrame to get a sense for where these paritions are coming from:

In [None]:
df.visualize()

Each paritition in our Dask DataFrame is the result of calling Pandas' `read_csv` on an input CSV file in our dataset.

We can view the start of the data with `df.head()`

In [None]:
df.head()

# Computations with Dask DataFrames

Since Dask DataFrames implement a Pandas-like API, we can write familiar looking Pandas code using our Dask DataFrames. For example, let's compute the largest flight depature delay.

In [None]:
max_delay = df["DepDelay"].max()
max_delay

The above cell looks exactly like what we would do using Pandas and constructs a task graph that we can compute in parallel. Let's look at the task graph to get a feel for how Dask's blocked algorithms work:

In [None]:
max_delay.visualize()

To evaluate the result for `max_delay`, call its `compute()` method:

In [None]:
%time max_delay.compute()

This writes the delayed computation for us and then runs it.  

Some things to note:

1.  As with `dask.delayed`, we need to call `.compute()` when we're done.  Up until this point everything is lazy.
2.  Dask will delete intermediate results (like the full pandas dataframe for each file) as soon as possible.
    -  This lets us handle datasets that are larger than memory
    -  This means that repeated computations will have to load all of the data in each time (run the code above again, is it faster or slower than you would expect?)
    
As with `Delayed` objects, you can view the underlying task graph using the `.visualize` method:

# Exercises

In this section we do a few Dask DataFrame computations. If you are comfortable with Pandas, then these should be very familiar. You will have to think about when to call `compute`.

## Exercise 1: In total, how many non-canceled flights were taken?


In [None]:
# Your solution here

In [None]:
%load solutions/dataframe-1.py

## Exercise 2: In total, how many non-cancelled flights were taken from each airport?

*Hint*: use [`df.groupby`](https://pandas.pydata.org/pandas-docs/stable/groupby.html).

In [None]:
# Your solution here

In [None]:
%load solutions/dataframe-2.py

## Exercise 3: What was the average departure delay from each airport?

Note, this is the same computation you did in the previous notebook (is this approach faster or slower?)

In [None]:
# Your solution here

In [None]:
%load solutions/dataframe-3.py

## Exercise 4: What day of the week has the worst average departure delay?

In [None]:
# Your solution here

In [None]:
%load solutions/dataframe-4.py

# Performance tip: Share intermediate results

When doing computations in the above exercises, we sometimes did the same operation more than once (e.g. `read_csv`). For most operations, Dask DataFrames hashes the arguments, allowing duplicate computations to be shared, and only computed once.

For example, let's compute the mean and standard deviation for departure delay of all non-canceled flights. Since Dask operations are lazy, those values aren't the final results yet. They're just the recipe required to get the result.

If we compute them with two calls to compute, there is no sharing of intermediate computations.

In [None]:
non_cancelled = df[~df["Cancelled"]]
mean_delay = non_cancelled["DepDelay"].mean()
std_delay = non_cancelled["DepDelay"].std()

In [None]:
%%time

mean_delay_result = mean_delay.compute()
std_delay_result = std_delay.compute()

Let's now try computing `mean_delay` and `std_delay` with a single `dask.compute` call.

In [None]:
import dask

%time mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

Using `dask.compute` takes roughly 1/2 the time. This is because the task graphs for both results are merged when calling `dask.compute`, allowing shared operations (like `read_csv`) to only be done once instead of twice. In particular, using `dask.compute` only does the following once:

- The calls to `read_csv`
- The filter (`df[~df["Cancelled"]]`)
- The `"DepDelay"` column indexing
- Some of the necessary reductions (`sum`, `count`)

To see what the merged task graphs between multiple results look like (and what's shared), you can use the `dask.visualize` function:

In [None]:
dask.visualize(mean_delay, std_delay)

# Applying custom code to Dask DataFrames: Converting `CRSDepTime` to a timestamp

This dataset stores timestamps as `HHMM`, which are read in as integers in `read_csv`:

In [None]:
crs_dep_time = df.CRSDepTime.head(10)
crs_dep_time

To convert these to timestamps of scheduled departure time, we need to convert these integers into `pd.Timedelta` objects, and then combine them with the `Date` column.

In pandas we'd do this using the `pd.to_timedelta` function, and a bit of arithmetic:

In [None]:
import pandas as pd

# Get the first 10 dates to complement our `crs_dep_time`
date = df.Date.head(10)

# Get hours as an integer, convert to a timedelta
hours = crs_dep_time // 100
hours_timedelta = pd.to_timedelta(hours, unit='h')

# Get minutes as an integer, convert to a timedelta
minutes = crs_dep_time % 100
minutes_timedelta = pd.to_timedelta(minutes, unit='m')

# Apply the timedeltas to offset the dates by the departure time
departure_timestamp = date + hours_timedelta + minutes_timedelta
departure_timestamp

### Custom code and Dask Dataframe

We could swap out `pd.to_timedelta` for `dd.to_timedelta` and do the same operations on the entire dask DataFrame. But let's say that Dask hadn't implemented a `dd.to_timedelta` that works on Dask DataFrames. What would you do then?

Dask DataFrames have a few methods to make applying custom functions to Dask DataFrames easier:

- [`map_partitions`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions)
- [`map_overlap`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_overlap)
- [`reduction`](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.reduction)

Here we'll just be discussing `map_partitions`, which we can use to implement `to_timedelta` on our own:

In [None]:
# Look at the docs for `map_partitions`

df.CRSDepTime.map_partitions?

The basic idea is to apply a function that operates on a DataFrame to each partition.
In this case, we'll apply `pd.to_timedelta`.

In [None]:
hours = df.CRSDepTime // 100
# hours_timedelta = pd.to_timedelta(hours, unit='h')
hours_timedelta = hours.map_partitions(pd.to_timedelta, unit='h')

minutes = df.CRSDepTime % 100
# minutes_timedelta = pd.to_timedelta(minutes, unit='m')
minutes_timedelta = minutes.map_partitions(pd.to_timedelta, unit='m')

departure_timestamp = df.Date + hours_timedelta + minutes_timedelta

In [None]:
departure_timestamp

In [None]:
departure_timestamp.head()

## Exercise 5: Rewrite above to use a single call to `map_partitions`

This will be slightly more efficient than two separate calls, as it reduces the number of tasks in the graph.

In [None]:
# Your solution here

def compute_departure_timestamp(df):
    # TODO: implement this
    pass

departure_timestamp = df.map_partitions(compute_departure_timestamp)
departure_timestamp.head()

In [None]:
%load solutions/dataframe-5.py

# Additional Resources

* [DataFrame documentation](https://docs.dask.org/en/latest/dataframe.html)
* [DataFrame screencast](https://youtu.be/AT2XtFehFSQ)
* [DataFrame API](https://docs.dask.org/en/latest/dataframe-api.html)
* [DataFrame examples](https://examples.dask.org/dataframe.html)
* [Pandas documentation](https://pandas.pydata.org/pandas-docs/stable/)