![IE](../img/ie.png)

# Extra: Big Data Science in Python with Dask

### Juan Luis Cano Rodríguez <jcano@faculty.ie.edu> - Master in Business Analytics and Big Data (2020-06-12)

## Larger than RAM `DataFrame`s 

A decade ago, **pandas** brought a revolution by enabling Python users to perform data analysis and manipulation on desktop computers and laptops. However, by that time the growth in CPU clock speeds had already slowed down dramatically, and technologies like Hadoop had appeared to enable the manipulation of massive datasets ("Big Data").

![Clock speeds](../img/clock.jpg)

Essentially, there are two axes of improvement for accelerating data analysis pipelines:

1. **Scale up**: Better machines, faster CPUs. Progress in recent years has been incremental rather than exponential (Moore's law).
2. **Scale out**: More machines, parallel processing, GPUs, TPUs. Heterogeneous setups add complexity, and not all tasks can be easily parallelized.

![Scale](../img/scale.png)

The ecosystem of solutions that try to improve data analysis workflows in Python is vast, both for scaling up and scaling out. In this session we will study Dask, a Python library and ecosystem that leverages existing APIs (NumPy, pandas) and extends them to support parallel computing.

# Dask

<img src="../img/dask.svg" width="300px" />

From the documentation:

> Dask is a flexible library for parallel computing in Python.
>
> Dask emphasizes the following virtues:
> 
> - **Familiar**: Provides parallelized NumPy array and Pandas DataFrame objects
> - **Flexible**: Provides a task scheduling interface for more custom workloads and integration with other projects.
> - **Native**: Enables distributed computing in pure Python with access to the PyData stack.
> - **Fast**: Operates with low overhead, low latency, and minimal serialization necessary for fast numerical algorithms
> - **Scales up**: Runs resiliently on clusters with 1000s of cores
> - **Scales down**: Trivial to set up and run on a laptop in a single process
> - **Responsive**: Designed with interactive computing in mind, it provides rapid feedback and diagnostics to aid humans

![Dask overview](../img/dask-overview.svg)

## Installation

Installation is trivially simple. With conda:

```
$ conda install dask
```

or, alternatively, with pip:

```
$ pip install "dask[complete]"
```

## Lazy evaluation

Let's do a simple example with `dask.array` to understand how lazy evaluation works.
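As a minimal sketch, we can create a Dask array split into chunks. The shape and chunk sizes below are arbitrary choices made for illustration, not values taken from the original session.

```python
import dask.array as da

# Assumption: a 10_000 x 10_000 array of ones, split into 1_000 x 1_000 chunks.
# Nothing is allocated yet: Dask only records how to build each chunk.
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))

print(x.shape)      # overall shape of the array
print(x.chunksize)  # shape of a single chunk
print(x.numblocks)  # number of chunks along each axis
```

Each chunk is an ordinary NumPy array once it is computed, so the chunk size controls how much memory a single task needs.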

If we try to perform any operation on this array, it does not execute immediately:
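For instance, the sketch below combines an array with its transpose and reduces it along one axis. The array and the operation are illustrative assumptions; the point is that the result is still a Dask array, not a computed value.

```python
import dask.array as da

# Assumption: same illustrative array as before, nothing is computed here.
x = da.ones((10_000, 10_000), chunks=(1_000, 1_000))

# This line returns immediately: it only describes the computation.
y = (x + x.T).sum(axis=0)

print(type(y))  # still a dask.array Array, not a NumPy array
print(y.shape)  # the shape is known without running anything
```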

Instead, Dask builds a task graph with all the necessary operations and their dependencies so we can visualize it and reason about it. This graph is stored in common Python data structures like dicts, lists, tuples and arrays:
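One way to peek at the graph is the `__dask_graph__()` method that Dask collections expose. The sketch below uses a deliberately tiny array (an assumption, chosen to keep the graph small) and converts the graph into a plain dict. The exact number of tasks and the way each task is represented vary between Dask versions.

```python
import dask.array as da

# Assumption: a tiny 4 x 4 array in 2 x 2 chunks, so the graph stays readable.
x = da.ones((4, 4), chunks=(2, 2))
y = (x + 1).sum()

# The graph behaves like a mapping from task keys to task definitions.
graph = dict(y.__dask_graph__())

print(len(graph))  # number of tasks (depends on the Dask version)
for key in list(graph)[:3]:
    print(key)     # a few task keys, for illustration
```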

And we can visualize it:
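A possible way to render the graph is the `.visualize()` method, which relies on Graphviz being installed. The sketch below writes the image to a temporary directory; the file name is a hypothetical choice, and the `try`/`except` is only there so the snippet does not crash on machines without Graphviz.

```python
import os
import tempfile

import dask.array as da

# Assumption: a tiny array, so the rendered graph has only a handful of nodes.
x = da.ones((4, 4), chunks=(2, 2))
y = (x + 1).sum()

# Hypothetical output location for the rendered graph.
output_path = os.path.join(tempfile.gettempdir(), "dask-task-graph.svg")

try:
    y.visualize(filename=output_path)
    print(f"Requested rendering to {output_path}")
except RuntimeError as error:
    # Raised when the Graphviz library or executable cannot be found.
    print(f"Could not render the task graph: {error}")
```

In a notebook, the return value of `.visualize()` is displayed inline when it is the last expression of a cell.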

If we want to actually perform the operation, we need to call `.compute()`:
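A small sketch with a result that is easy to check by hand: the sum of the squares of the integers from 0 to 9. The array is an illustrative assumption.

```python
import dask.array as da

# Assumption: the integers 0..9 split into two chunks of five elements.
x = da.arange(10, chunks=5)
y = (x ** 2).sum()  # still lazy

# .compute() runs the task graph and returns a concrete value.
result = y.compute()
print(result)  # 285
```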

This is the same API we use to transform a Dask array into a normal NumPy array:
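As a sketch, calling `.compute()` on the array itself, rather than on a reduction, gathers all the chunks into a single in-memory NumPy array. The small shape is an assumption; with real data this only works if the whole array fits in RAM.

```python
import dask.array as da
import numpy as np

# Assumption: a small array that comfortably fits in memory.
x = da.ones((4, 4), chunks=(2, 2))

# All chunks are computed and concatenated into one NumPy array.
arr = x.compute()

print(type(arr))
print(arr.shape)
```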

## Demo: NYC taxis

Apart from `dask.array` mimicking the NumPy API, we also have `dask.dataframe` that behaves like pandas DataFrames.

<img src="../img/dask-dataframe.svg" width="300px" />

To study how they work, we will use a subset of the NYC taxi dataset:

```
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-01.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-02.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-03.csv
https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-04.csv
```

(see https://github.com/toddwschneider/nyc-taxi-data for more info)

## Prerequisites

We will need the following packages:

```
$ conda install dask python-graphviz ipywidgets notebook
```

## Getting started

In [None]:
!du -h -s ../data/yellow*.csv

In [None]:
payment_types = {
    1: "Credit Card",
    2: "Cash",
    3: "No Charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}

### Exercise

1. Create a new `tip_fraction` column with the tip amount divided by the fare amount (_note: use only nonzero values of both to avoid division errors and biased results_)
2. Group the dataframe by pickup hour, and compute the mean `tip_fraction` by pickup hour
3. Visualize the mean `tip_fraction` by pickup hour in a line plot

## Limitations

What does this warning mean? In Dask, some operations are partition-sensitive (see https://docs.dask.org/en/latest/dataframe-design.html#partitions). Some of them will give us a warning, and others will fail outright:

<div class="alert alert-danger">This will load all the data in RAM! Do not run it if you are in a constrained environment</div>

In [None]:
# daily_mean.compute()