<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">

# Personal Dask Cluster

Go to `<cluster url>` for your own Dask cluster. Use the first and last name you provided to SciPy as the username and "dask" as the password.
(We don't actually do any authentication.)

## Dask Quickstart

Dask scales Python.
Today, we'll focus on how it scales pandas, but know that it's more general.

Pandas is fundamentally for in-memory datasets.
You can't have a DataFrame larger than your machine's RAM.

Dask dataframe lets you work with larger-than-memory datasets.
Dask breaks large problems into many small problems (task graph).
It then executes those small problems in parallel, with a small memory footprint (scheduler).
It provides user interfaces, like `dask.dataframe` or `dask.array`, which feel like NumPy and pandas.
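
To make the task graph idea concrete, here is a minimal sketch using `dask.delayed`, which this notebook does not otherwise use. The helper names `chunk_sum` and `combine` are made up for illustration; the point is only that one large sum becomes several small tasks plus a final combining task.

```python
import dask


@dask.delayed
def chunk_sum(chunk):
    # One small task: sum a single chunk of the data
    return sum(chunk)


@dask.delayed
def combine(partials):
    # Final task: combine the per-chunk results
    return sum(partials)


data = list(range(100))
chunks = [data[i:i + 25] for i in range(0, 100, 25)]  # four small problems

# Building the graph is lazy; no sums have been computed yet
total = combine([chunk_sum(c) for c in chunks])

# compute() hands the graph to a scheduler, which runs the tasks
result = total.compute()
```

The four `chunk_sum` tasks do not depend on each other, so a scheduler is free to run them in parallel.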

In [None]:
import numpy as np
import pandas as pd
import dask.dataframe as dd

df = dd.from_pandas(
    pd.DataFrame({'A': np.random.choice(['a', 'b', 'c'], size=100),
                  'B': np.random.randn(100),
                  'C': np.random.uniform(size=100)}),
    npartitions=4
)
df

A dask dataframe has most of the same methods as pandas.

In [None]:
df.B + df.C

In [None]:
df[['B', 'C']].sum()

Dask DataFrame's methods are lazy.
This lets Dask build up a large chain of operations that can be executed in parallel.
When you say `df.sum()`, instead of computing the sum immediately, Dask builds up a *task graph*.

```python
df[['B', 'C']].sum().visualize(rankdir='LR')
```

<img src="graph.png">

When you're ready for a concrete result, call `compute`.

In [None]:
df[['B', 'C']].sum().compute()

Calling `compute` hands the task graph to the *scheduler*, which executes the graph in parallel.
Dask has several schedulers, depending on how you want to do the computation (using many threads, processes, or machines).
We'll be using the distributed scheduler, so we can see how dask scales pandas to a cluster of machines.
But Dask also works well on a single machine.
You write normal pandas operations, but the computation happens with a low memory footprint.

# Distributed DataFrames and Efficiency

We will cover the following topics:

1. Persist common intermediate results in memory with `persist`
2. Partitions and partition size
3. Using indices to improve efficiency

In [None]:
from dask_kubernetes import KubeCluster
from dask.distributed import Client

The next cell will start up some workers for you. This may take a few minutes, but the widget will update automatically when the workers are ready. You don't need to do anything with the manual or adaptive scaling.

In [None]:
cluster = KubeCluster(n_workers=8)
cluster

**Be sure to open the diagnostics UI.**

In [None]:
client = Client(cluster)
client

## Moving to distributed

A few things change when moving from local to distributed computing.

1. Environment: Each worker is a separate machine, and needs to have the required libraries installed. This cluster was set up using [Kubernetes](http://dask.pydata.org/en/latest/setup/kubernetes.html#).
2. File system: Previously, every worker (threads, processes, or even the distributed scheduler in local mode) had access to your laptop's file system. In a distributed environment, you'll need some kind of shared file system to read data (cloud storage like S3 or GCS, or a network file system).
3. Communication: Moving data between machines is relatively expensive. When possible, the distributed scheduler will ensure that tasks are scheduled to be run on workers that already have the required data. But some tasks will require data from multiple machines.

## The full airline dataset

We have the full airline dataset stored on `GCS`. This is the same as the one you've been working with, but includes all originating airports and a few extra columns. We change the `read_csv` call slightly to avoid the extra columns.

`dask.dataframe` has support for reading directly from `GCS`, so we can use our `read_csv` call from before.

In [None]:
import dask.dataframe as dd

columns = ['Year', 'Month', 'DayofMonth', 'DayOfWeek', 'DepTime', 'CRSDepTime',
           'ArrTime', 'CRSArrTime', 'UniqueCarrier', 'FlightNum', 'TailNum',
           'ActualElapsedTime', 'CRSElapsedTime', 'AirTime', 'ArrDelay',
           'DepDelay', 'Origin', 'Dest', 'Distance', 'TaxiIn', 'TaxiOut',
           'Cancelled']

df = dd.read_csv('gcs://anaconda-public-data/airline/(199)|(200)*.csv',
                 parse_dates={'Date': [0, 1, 2]},
                 dtype={'TailNum': object,
                        'CRSElapsedTime': float,
                        'Distance': float,
                        'Cancelled': bool},
                 usecols=columns)

In [None]:
df.head()

### Persist data in distributed memory

Every time we run an operation like `df[~df.Cancelled].DepDelay.max().compute()` we read through our dataset from disk.  This can be slow, especially because we're reading data from CSV.  We usually have two options to make this faster:

1.  Persist relevant data in memory, either on our computer or on a cluster
2.  Use a faster on-disk format, like HDF5 or Parquet

In this section we persist our data in memory.  On a single machine this is often done by doing a bit of pre-processing and data reduction with dask dataframe and then `compute`-ing to a Pandas dataframe and using Pandas in the future.  

```python
df = dd.read_csv(...)
df = df[df.Origin == 'LGA']  # filter down to smaller dataset
pdf = df.compute()  # convert to pandas
pdf ... # continue with familiar Pandas workflows
```

However, on a distributed cluster even our cleaned data may be too large for a single machine, so we still can't use Pandas.  In this case we ask Dask to persist data in memory with the `dask.persist` function.  This is what we'll do today.  This will help us to understand when data is lazy and when it is computing.

You can trigger computations using the persist method:

    x = x.persist()

or the dask.persist function for multiple inputs:

    x, y = dask.persist(x, y)

### Exercise

Persist the dataframe into memory.

- How long does the cell take to execute (look at the "busy" indicator in the top-right)?
- After it has persisted how long does it take to compute `df[~df.Cancelled].DepDelay.count().compute()`?
- Looking at the plots in the diagnostic web page (the link was printed above), what is taking up most of the time? (You can hover over rectangles to see what function they represent.)

In [None]:
df = # TODO: persist dataframe in memory

In [None]:
%time _ = df[~df.Cancelled].DepDelay.count().compute()

### Exercise

Repeat the groupby computation from the previous notebooks. What is taking all of the time now?

In [None]:
# What was the average departure delay from each airport?
df[~df.Cancelled].groupby('Origin').DepDelay.mean().nlargest(10).compute()

## Partitions

One `dask.dataframe` is composed of several Pandas dataframes.  The organization of these dataframes can significantly impact performance.  In this section we discuss two factors that commonly impact performance:

1. The number of Pandas dataframes can affect overhead.  If the dataframes are too small then Dask might spend more time deciding what to do than Pandas spends actually doing it.  Ideally, each computation should take hundreds of milliseconds.

2. If we know how the dataframes are sorted, then certain operations become much faster.

### Number of partitions and partition size

When we read in our data from CSV files we potentially get multiple Pandas dataframes for each file. Look at the metadata below to determine a few things about the current partitioning:
- How many partitions are there?
- Are the splits along the index between partitions known? If so, what are they?

In [None]:
# Number of partitions
df.npartitions

In [None]:
# Are the splits between partitions known?
df.known_divisions

In [None]:
# The splits between partitions. If unknown these are all `None`
df.divisions

### Exercise: How large is the DataFrame?

- How would you compute the memory usage of a single pandas DataFrame?
- Given your knowledge of Dask, how would you do it for a Dask DataFrame?

In [None]:
# Your code here...


In [None]:
%load memory-usage.py

## Sorted Index column

*This section doesn't have any exercises.  Just follow along.*

Many dataframe operations like loc-indexing, groupby-apply, and joins are *much* faster on a sorted index.  For example, if we want to get the data for a particular day it *really* helps to know where that day is, otherwise we need to search over all of our data.

The Pandas model gives us a sorted index column.  Dask.dataframe copies this model, and it remembers the min and max values of every partition's index.

By default, our data doesn't have an index.

In [None]:
df.head()

So if we search for a particular day it takes a while because it has to pass through all of the data.

In [None]:
%time df[df.Date == '1992-05-05'].compute()

In [None]:
df[df.Date == '1992-05-05'].visualize(optimize_graph=True)

However, if we set the `Date` column as the index then this operation can be much faster.

Calling `set_index` followed by `persist` results in a new set of dataframe partitions stored in memory, sorted along the index column. To do this dask has to

- Shuffle the data by date, resulting in the same number of output partitions
- Set the index for each partition
- Store the resulting partitions in distributed memory

This can be a (relatively) expensive operation, but allows certain queries to be more optimized. 

Watch the diagnostics page while the next line is running to see how the shuffle and index operation progresses.

In [None]:
%%time
df = df.set_index('Date').persist()

After the index is set, we now have known divisions:

In [None]:
# Number of partitions
df.npartitions

In [None]:
# Are the splits between partitions known?
df.known_divisions

In [None]:
# The splits between partitions.
df.divisions

In [None]:
# The repr for a dask dataframe can also be useful for
# seeing partition information
df

Repeating the same query for all flights on a specific date, we can see that we're much faster after setting the index:

In [None]:
%time df.loc['1992-05-05'].compute()

If you look at the resulting graph, you can see that dask was able to optimize the computation to only look at a single partition:

In [None]:
df.loc['1992-05-05'].visualize(optimize_graph=True)

### Timeseries operations

When the index of a dask dataframe is a known `DatetimeIndex`, traditional pandas timeseries operations are supported. For example, now that we have a sorted index we can resample the `DepDelay` column into 1 month bins.

In [None]:
%matplotlib inline

In [None]:
%%time 
(df.DepDelay
   .resample('1M')
   .mean()
   .fillna(method='ffill')
   .compute()
   .plot(figsize=(10, 5)));

In [None]:
# When you're done with the `airlines` dataset
client.restart()

## Exercise: Explore the NYC Taxi dataset

We have some of the NYC Taxi ride dataset in parquet format stored in GCS.

In [None]:
taxi = dd.read_parquet("gcs://anaconda-public-data/nyc-taxi/nyc.parquet")
taxi.head()

Some questions:

- How large is the dataset? Will it fit in your cluster's RAM if you persist it?
- What's the average tip percent by hour?

In [None]:
# clean up, when finished with the notebook
client.close()
cluster.close()