<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">


# Dask DataFrame - parallelized pandas

Looks and feels like pandas, but **parallel and distributed** - `dask.dataframe`.

At its core, the `dask.dataframe` module implements a "blocked parallel" `DataFrame` object that looks and feels like the pandas API, but for parallel and distributed workflows. One Dask `DataFrame` is composed of many in-memory pandas `DataFrame`s separated along the index. One operation on a Dask `DataFrame` triggers many pandas operations on the constituent pandas `DataFrame`s in a way that is mindful of potential parallelism and memory constraints.


<img src="https://docs.dask.org/en/stable/_images/dask-dataframe.svg"
     align="right"
     width="30%"
     alt="Dask DataFrame is composed of pandas DataFrames"/>

## When to use `dask.dataframe`

pandas is great for tabular datasets that fit in memory. A general rule of thumb for pandas is:

> "Have 5 to 10 times as much RAM as the size of your dataset"
>
> ~ Wes McKinney (2017) in [10 things I hate about pandas](https://wesmckinney.com/blog/apache-arrow-pandas-internals/)

Here "size of dataset" means dataset size on _the disk_.

Dask becomes useful when the datasets exceed the above rule.
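
The rule of thumb can be written as a quick check. This is a rough sketch: the function name `fits_pandas_rule_of_thumb` and the default factor of 5 (the lower end of the quoted range) are choices made here for illustration only.

```python
def fits_pandas_rule_of_thumb(dataset_bytes, ram_bytes, factor=5):
    """Return True if RAM is at least `factor` times the on-disk dataset size."""
    return ram_bytes >= factor * dataset_bytes


GIB = 1024 ** 3

# A 2 GiB dataset on a 16 GiB machine satisfies the rule (16 >= 5 * 2)
print(fits_pandas_rule_of_thumb(2 * GIB, 16 * GIB))

# A 4 GiB dataset on the same machine does not (16 < 5 * 4)
print(fits_pandas_rule_of_thumb(4 * GIB, 16 * GIB))
```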

## Create datasets

Download the dataset you will be using in this notebook:

In [None]:
%run prep.py -d higgs

## Set up your local cluster

Create a local Dask cluster and connect it to the client. Don't worry about this bit of code for now; you will learn more in the Distributed notebook.

In [None]:
import dask
from dask.distributed import Client

client = Client(n_workers=4)
client

### Dask Diagnostic Dashboard

Dask Distributed provides a useful Dashboard to visualize the state of your cluster and computations.

If you're on **JupyterLab or Binder**, you can use the [Dask JupyterLab extension](https://github.com/dask/dask-labextension) (which should be already installed in your environment) to open the dashboard plots:
* Click on the Dask logo in the left sidebar
* Click on the magnifying glass icon, which will automatically connect to the active dashboard (if that doesn't work, you can type/paste the dashboard link http://127.0.0.1:8787 in the field)
* Click on **"Task Stream"**, **"Progress Bar"**, and **"Worker Memory"**, which will open these plots in new tabs
* Re-organize the tabs to suit your workflow!

Alternatively, click on the dashboard link displayed in the Client details above: http://127.0.0.1:8787/status. It will open a new browser tab with the Dashboard.

## Reading and working with datasets

Let's read in simulated proton-proton (pp) collisions from the LHC and plot the Higgs peak in the invariant mass spectrum.

By convention, we import the module `dask.dataframe` as `dd`, and call the corresponding `DataFrame` object `ddf`.

**Note**: The term "Dask DataFrame" is slightly overloaded. Depending on the context, it can refer to the module or the DataFrame object. To avoid confusion, throughout this notebook:
- `dask.dataframe` (note the all lowercase) refers to the API, and
- `DataFrame` (note the CamelCase) refers to the object.

The following filename includes a glob pattern `*`, so all files in the path matching that pattern will be read into the same `DataFrame`.

In [None]:
import dask.dataframe as dd

higgs = dd.read_csv("data/higgs/higgs_*.csv")
higgs

Dask has not loaded the data yet. So far it has:
- investigated the input path and found that there are ten matching files
- intelligently created a set of jobs for each chunk -- one per original CSV file in this case

Notice that the representation of the `DataFrame` object contains no data - Dask has just done enough to read the start of the first file, and infer the column names and dtypes.

Some functions, like `len` and `head`, trigger a computation immediately. Specifically, calling `len` will:
- load the actual data (that is, load each file into a pandas DataFrame)
- then apply the corresponding functions to each pandas DataFrame (also known as a partition)
- combine the subtotals to give you the final grand total

In [None]:
# load and count number of rows
len(higgs)

You can view the start of the data as you would in pandas (`tail` works the same way for the end):

In [None]:
higgs.head()

## Computations with `dask.dataframe`

Let's compute the maximum `diphoton_pt` in the dataset.

With just pandas, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums.

In [None]:
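from glob import glob

import pandas as pd

maxes = []

# Match only the CSV files, so stray files in the directory are not read
for path in sorted(glob("data/higgs/higgs_*.csv")):
    df = pd.read_csv(path)
    maxes.append(df.diphoton_pt.max())  # maximum within this file

# The overall maximum is the maximum of the per-file maximums
print(max(maxes))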

`dask.dataframe` lets us write pandas-like code that operates on larger-than-memory datasets in parallel.

In [None]:
higgs.diphoton_pt.max().compute()

Let's do something even more exciting and plot the spectrum of the invariant diphoton mass for all events that have a leading photon with a transverse momentum greater than 80 GeV.

In [None]:
pt_cut = higgs["photon1_pt"] > 80e3
selected_myy = higgs["diphoton_mass"][pt_cut]

### Lazy Evaluation

Most Dask collections, including Dask `DataFrame`, are evaluated lazily: Dask constructs the logic of your computation (called a task graph) immediately but evaluates it only when necessary. You can view this task graph using `.visualize()`.

You will learn more about this in the Delayed notebook, but for now, note that we need to call `.compute()` to trigger actual computations.

So the code above executed immediately because the only thing that happened was that the task graph was constructed. Let's first have a look at the graph:

In [None]:
selected_myy.visualize()

Now, let's perform the computation:

In [None]:
%%time
selected_myy_materialized = selected_myy.compute()

Then plot the invariant mass spectrum:

In [None]:
from matplotlib import pyplot as plt

_ = plt.hist(selected_myy_materialized, bins=50)
plt.xlabel("Diphoton mass (MeV)")
plt.ylabel("Counts")

## Sharing Intermediate Results

When computing all of the above, we sometimes did the same operation more than once. For most operations, `dask.dataframe` hashes the arguments, so identical operations get identical task names and duplicate computations can be shared and computed only once.

For example, let's compute the mean and standard deviation of the diphoton mass for the events selected above. Since Dask operations are lazy, those values aren't the final results yet. They're just the steps required to get the result.

If you compute them with two separate calls to `.compute()`, there is no sharing of intermediate computations.

In [None]:
pt_cut = higgs["photon1_pt"] > 80e3
selected_myy = higgs["diphoton_mass"][pt_cut]
mean_myy = selected_myy.mean()
std_myy = selected_myy.std()

In [None]:
%%time

mean_myy_res = mean_myy.compute()
std_myy_res = std_myy.compute()

### `dask.compute`

Now let's pass both to a single `dask.compute` call.

In [None]:
%%time

mean_myy_res, std_myy_res = dask.compute(mean_myy, std_myy)

Using `dask.compute` takes roughly half the time. This is because the task graphs for both results are merged when calling `dask.compute`, so shared operations run once instead of twice. In particular, using `dask.compute` does the following only once:

- the calls to `read_csv`
- the filter (`[pt_cut]`)
- some of the necessary reductions (`sum`, `count`)
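
To make the shared reductions concrete, here is a pure-Python sketch of how a mean and a standard deviation can both be derived from the same per-partition subtotals. It illustrates the idea only and is not Dask's actual implementation, which may use a numerically more careful formula; the helper names `partial_stats` and `combine` and the toy partitions are hypothetical.

```python
import math


def partial_stats(values):
    """Per-partition reductions: count, sum, and sum of squares."""
    return len(values), sum(values), sum(v * v for v in values)


def combine(partials):
    """Merge per-partition subtotals into an overall mean and sample std."""
    n = sum(p[0] for p in partials)
    s = sum(p[1] for p in partials)
    ss = sum(p[2] for p in partials)
    mean = s / n
    var = (ss - n * mean ** 2) / (n - 1)  # sample variance (ddof=1)
    return mean, math.sqrt(var)


# Two toy "partitions"; each one is reduced exactly once
partitions = [[1.0, 2.0, 3.0], [4.0, 5.0]]
partials = [partial_stats(p) for p in partitions]

# Both results reuse the same subtotals instead of rereading the data
mean, std = combine(partials)
print(mean, std)
```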

### `.persist()`

While using a distributed scheduler (you will learn more about schedulers in the upcoming notebooks), you can keep some _data that you want to use often_ in the _distributed memory_. 

`persist` generates "Futures" (more on this later as well) and stores them in the same structure as your output. You can use `persist` with any data or computation that fits in memory.

In [None]:
selected_myy = selected_myy.persist()  # returns control immediately

## Close your local Dask Cluster

It's good practice to always close any Dask cluster you create:

In [None]:
client.shutdown()