# 4.2 Introducing Dask

<img src="./images/Pandas_Dask_DataFrames.png" width="1000"/>

## *Subjects covered*

* Warming up with a short example of data cleaning using Dask DataFrames
* Visualizing DAGs generated by Dask workloads with graphviz
* Exploring how the Dask task scheduler applies the concept of DAGs to coordinate execution of code


## *Content*

- [Installing Dask and additional packages](#Installing-Dask-and-additional-packages)
- [Hello Dask - A first look at the DataFrame API](#Hello-Dask---A-first-look-at-the-DataFrame-API)
    - [Examining the metadata of Dask objects](#Examining-the-metadata-of-Dask-objects)
    - [Running computations with the compute method](#Running-computations-with-the-compute-method)
    - [Making complex computations more efficient with persist](#Making-complex-computations-more-efficient-with-persist)
- [Visualising DAGs](#Visualising-DAGs)
    - [Visualising a simple DAG using Dask Delayed objects](#Visualising-a-simple-DAG-using-Dask-Delayed-objects)
    - [Visualising more complex DAGs with loops and collections](#Visualising-more-complex-DAGs-with-loops-and-collections)
    - [Reducing DAG complexity with persist](#Reducing-DAG-complexity-with-persist)
- [Task scheduling](#Task-scheduling)
- [Summary](#Summary)

## Installing Dask and additional packages

### With Anaconda - Dask and Dask-ML

* If Dask and Dask-ML are not already installed, run the following at the command line:

`conda install dask`

`conda install dask-ml`

### With Anaconda - graphviz, pyarrow and dill

`conda install -c conda-forge pyarrow`

`conda install -c conda-forge dill`

`conda install graphviz`

`conda install python-graphviz`

### Without Anaconda

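This option tends to be more troublesome, so using Anaconda is strongly recommended (see above). Note that on PyPI the Python bindings are published as `graphviz` (`python-graphviz` is the conda package name), and the Graphviz system binaries must be installed separately.

`pip install ipython jupyter dask graphviz pandas numpy matplotlib seaborn bokeh pyarrow sqlalchemy holoviews geoviews dask-ml nltk dill`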
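Whichever route you choose, a quick way to confirm the installation is to check that the key modules can be found. The sketch below uses only the standard library; the list of module names is an assumption based on the packages installed above (`dask_ml` is the import name of the `dask-ml` package).

```python
from importlib.util import find_spec

# Import names assumed from the install commands above
required_modules = ["dask", "dask_ml", "graphviz", "pyarrow", "dill"]

# find_spec returns None when a top-level module cannot be located
status = {name: find_spec(name) is not None for name in required_modules}

for name, found in status.items():
    print(f"{name:10s} {'found' if found else 'MISSING'}")
```

Any module reported as missing can then be installed with the matching command from this section.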

## Hello Dask - A first look at the DataFrame API

A first attempt at using the Dask DataFrame API for exploratory analysis:
* Reading a data file
* Scanning data for missing values
* Dropping columns that
    * are missing too much data
    * are not useful for analysis

### Examining the metadata of Dask objects

First import all needed modules.

In [2]:
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from matplotlib import pyplot as plt

Next, read the data from a CSV file. The files are stored in the folder `nyc-parking-tickets`. The following command reads the 2017 file into a Dask DataFrame.

In [2]:
df = dd.read_csv('nyc-parking-tickets/*2017.csv')
df

**Dask DataFrame Structure** (`npartitions=33`). Every value is shown as `...` because no data has been loaded yet; only the column names and inferred datatypes are known:

| Column | dtype |
|---|---|
| Summons Number | int64 |
| Plate ID | object |
| Registration State | object |
| Plate Type | object |
| Issue Date | object |
| Violation Code | int64 |
| Vehicle Body Type | object |
| Vehicle Make | object |
| Issuing Agency | object |
| Street Code1 | int64 |
| Street Code2 | int64 |
| Street Code3 | int64 |
| Vehicle Expiration Date | int64 |
| Violation Location | float64 |
| Violation Precinct | int64 |
| Issuer Precinct | int64 |
| Issuer Code | int64 |
| Issuer Command | object |
| Issuer Squad | object |
| Violation Time | object |
| Time First Observed | object |
| Violation County | object |
| Violation In Front Of Or Opposite | object |
| House Number | object |
| Street Name | object |
| Intersecting Street | object |
| Date First Observed | int64 |
| Law Section | int64 |
| Sub Division | object |
| Violation Legal Code | object |
| Days Parking In Effect | object |
| From Hours In Effect | object |
| To Hours In Effect | object |
| Vehicle Color | object |
| Unregistered Vehicle? | float64 |
| Vehicle Year | int64 |
| Meter Number | object |
| Feet From Curb | int64 |
| Violation Post Code | object |
| Violation Description | object |
| No Standing or Stopping Violation | float64 |
| Hydrant Violation | float64 |
| Double Parking Violation | float64 |


**Inspecting the Dask DataFrame**

<img src="./images/Inspecting_Dask_DataFrame.png" width="900"/>

**Why we see what we see**

* Pandas would display a **sample** of the data
* Dask shows **only metadata** of DataFrame
* Dask tries to **intelligently infer** datatypes from the data, just as Pandas does
* Its ability to do so accurately is **limited** by the fact that Dask was built to handle **medium** and **large** datasets that **can’t be loaded into RAM at once**
* Such datasets may be **scattered across multiple physical machines** in a **distributed filesystem**
* Rather than scanning everything, `read_csv` infers datatypes from a **small sample** read from the start of the data, so values that only appear later in the file can contradict the inferred type

**Best practice**: explicitly set datatypes rather than relying on Dask's inference process

**More information provided by the DataFrame metadata**

The DataFrame’s metadata provides insight into how Dask’s scheduler is deciding to break up the work of processing this file.

* The `npartitions` value shows how many partitions the DataFrame is split into
* The 2017 file
    * is slightly over 2 GB in size
    * ==> 33 partitions of roughly 64 MB
* Instead of loading the entire file into RAM all at once, each Dask worker thread processes the file one 64 MB chunk at a time
* These chunks are called partitions
* See figure in next slide

<img src="./images/Dask_DataFrame_partitions.png" width="900"/>

**Dask splits large data files into multiple partitions and works on one partition at a time**

* In the figure, the DataFrame consists of **two** partitions
* Therefore, the **single Dask DataFrame** is made up of **two smaller Pandas DataFrames**
* Each partition can be loaded **into memory** and worked on **one at a time** or **in parallel**
* In this case, the worker node **first** picks up partition 1 and processes it, and **saves the result** in a **temporary holding space**
* **Next** it picks up partition 2 and processes it, saving the result to a **temporary holding space**
* **Finally**, it **combines** the results and **ships them down to our client**, which **displays** the result
* The worker node **can work on smaller pieces** of the data at a time
    * work can be **distributed out to many machines**
    * or, in the case of a **local cluster**, work can proceed on very large datasets **without resulting in out-of-memory errors**
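The partition-by-partition pattern described above can be sketched in plain Python. This is only an illustration of the idea, not how Dask is implemented: the helper names `split_into_partitions` and `count_missing` and the sample values are made up.

```python
def split_into_partitions(rows, n_partitions):
    """Split a list into n_partitions contiguous chunks of (almost) equal size."""
    size = -(-len(rows) // n_partitions)  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]


def count_missing(partition):
    """Work done on one partition: count the missing (None) values."""
    return sum(1 for value in partition if value is None)


rows = [7, None, 3, None, None, 8]            # made-up data
partitions = split_into_partitions(rows, 2)   # two partitions, as in the figure

# Process one partition at a time and keep each partial result
partial_results = [count_missing(p) for p in partitions]

# Combine the partial results into the final answer
total_missing = sum(partial_results)
print(partitions, partial_results, total_missing)
```

Because each partial result depends only on its own partition, the same loop could just as well hand the partitions to several workers in parallel.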

* The DataFrame consists of 99 tasks
* ==> Dask created a DAG with 99 nodes to process the data
* The graph consists of 99 nodes because each partition requires three operations to be created
    * reading the raw data
    * splitting the data into appropriately sized blocks
    * initialising the underlying DataFrame object



* In total, 33 partitions with 3 tasks per partition result in 99 tasks
* If we had 33 workers in our worker pool, the entire file could be worked on simultaneously
* With just one worker, Dask will cycle through the partitions one at a time
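The numbers above follow from simple arithmetic, sketched here. The exact file size is an assumption (2,100 MB, chosen to match "slightly over 2 GB"); the 64 MB block size and the three tasks per partition come from the text.

```python
import math

MB = 1024 ** 2

file_size_bytes = 2100 * MB    # assumed size: "slightly over 2 GB"
block_size_bytes = 64 * MB     # partition size from the text
tasks_per_partition = 3        # read, split into blocks, initialise DataFrame

n_partitions = math.ceil(file_size_bytes / block_size_bytes)
n_tasks = n_partitions * tasks_per_partition

# Number of "rounds" needed when each worker handles one partition at a time
rounds_one_worker = math.ceil(n_partitions / 1)
rounds_33_workers = math.ceil(n_partitions / 33)

print(n_partitions, n_tasks, rounds_one_worker, rounds_33_workers)
```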



**Counting missing values in a DataFrame**

In [3]:
missing_values = df.isnull().sum()
missing_values

Dask Series Structure:
npartitions=1
Date First Observed    int64
Violation Time           ...
dtype: int64
Dask Name: dataframe-sum-agg, 166 tasks

**Code comments**

* Syntax for counting null values looks a lot like Pandas
* However, the resulting Series object doesn’t give us the output we might expect
* Instead of getting the missing counts, Dask returns some metadata information about the expected result
* Dask hasn’t actually done any processing yet because it uses *lazy computation*
* This means that what Dask has actually done under the hood is prepare another DAG, which was then stored in the `missing_values` variable
* The data isn’t computed until the task graph is explicitly executed
* This behavior makes it possible to build up complex task graphs quickly without having to wait for each intermediate step to finish



* The task count has now grown to a total of 166
    * 99 tasks from the DAG used to read the data file and create the DataFrame named `df`
    * 66 tasks (2 per partition) to check for nulls and sum
    * 1 task to collect all the pieces together into a single Series object and return the answer

**Calculate the percent of missing values in the DataFrame**

In [4]:
missing_count = ((missing_values / df.index.size) * 100)
missing_count

Dask Series Structure:
npartitions=1
Date First Observed    float64
Violation Time             ...
dtype: float64
Dask Name: mul, 235 tasks

**Note**
* The number of tasks has increased again
* The datatype of the resulting Series changed from `int64` to `float64`, because of the division operation

### Running computations with the compute method

In [5]:
with ProgressBar():
    missing_count_pct = missing_count.compute()
missing_count_pct

[########################################] | 100% Completed |  1min  3.1s


Summons Number                         0.000000
Plate ID                               0.006739
Registration State                     0.000000
Plate Type                             0.000000
Issue Date                             0.000000
Violation Code                         0.000000
Vehicle Body Type                      0.395361
Vehicle Make                           0.676199
Issuing Agency                         0.000000
Street Code1                           0.000000
Street Code2                           0.000000
Street Code3                           0.000000
Vehicle Expiration Date                0.000000
Violation Location                    19.183510
Violation Precinct                     0.000000
Issuer Precinct                        0.000000
Issuer Code                            0.000000
Issuer Command                        19.093212
Issuer Squad                          19.101506
Violation Time                         0.000583
Time First Observed                   92

* Calling `.compute()` makes Dask execute the task graph and return the result, which the notebook then displays
* The DAG is a logical representation of the result, but the actual values aren't calculated (materialised) until explicitly computed

Now let's drop all columns with 60% missing values or more. Note that this is an arbitrarily picked limit for this example.

In [6]:
columns_to_drop = missing_count_pct[missing_count_pct > 60].index
with ProgressBar():
    df_dropped = df.drop(columns_to_drop, axis=1).persist()

[########################################] | 100% Completed | 51.0s


Note that `columns_to_drop` is a Pandas object (an `Index`), not a Dask object.

In [7]:
columns_to_drop

Index(['Time First Observed', 'Intersecting Street', 'Violation Legal Code',
       'Unregistered Vehicle?', 'Meter Number',
       'No Standing or Stopping Violation', 'Hydrant Violation',
       'Double Parking Violation'],
      dtype='object')

In [8]:
type(columns_to_drop)

pandas.core.indexes.base.Index

* Pandas objects and Dask objects can be mixed, since each partition of a Dask DataFrame is a Pandas DataFrame
* The Pandas object is made available to all threads, so they can use it in their computations
* With a cluster, the Pandas object will be serialised and broadcast to all worker nodes

### Making complex computations more efficient with persist

* As soon as a node in the active task graph emits results, its intermediate work is discarded in order to minimize memory usage
* It would be inefficient to re-read the columns into memory every time we want to make an additional calculation just to drop them again
* If we want to do something additional with the filtered data (for example, look at the first five rows of the DataFrame), we would have to go to the trouble of re-running the entire chain of transformations again
* To avoid repeating the same calculations many times over, Dask allows us to store intermediate results of a computation so they can be reused. Using the `.persist()` method of the Dask DataFrame tells Dask to try to keep as much of the intermediate result in memory as possible
* In case Dask needs some of the memory being used by the persisted DataFrame, it will select a number of partitions to drop from memory
* These dropped partitions will be recalculated on the fly when needed, and although it may take some time to recalculate the missing partitions, it is still likely to be much faster than recomputing the entire DataFrame

Now let's check the top rows of the new DataFrame.

In [9]:
df_dropped.head(10)

| | Summons Number | Plate ID | Registration State | Plate Type | Issue Date | Violation Code | Vehicle Body Type | Vehicle Make | Issuing Agency | Street Code1 | ... | Law Section | Sub Division | Days Parking In Effect | From Hours In Effect | To Hours In Effect | Vehicle Color | Vehicle Year | Feet From Curb | Violation Post Code | Violation Description |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 5092469481 | GZH7067 | NY | PAS | 07/10/2016 | 7 | SUBN | TOYOT | V | 0 | ... | 1111 | D | | | | GY | 2001 | 0 | | FAILURE TO STOP AT RED LIGHT |
| 1 | 5092451658 | GZH7067 | NY | PAS | 07/08/2016 | 7 | SUBN | TOYOT | V | 0 | ... | 1111 | D | | | | GY | 2001 | 0 | | FAILURE TO STOP AT RED LIGHT |
| 2 | 4006265037 | FZX9232 | NY | PAS | 08/23/2016 | 5 | SUBN | FORD | V | 0 | ... | 1111 | C | | | | BK | 2004 | 0 | | BUS LANE VIOLATION |
| 3 | 8478629828 | 66623ME | NY | COM | 06/14/2017 | 47 | REFG | MITSU | T | 10610 | ... | 408 | l2 | Y | 0700A | 0700P | WH | 2007 | 0 | 04 | 47-Double PKG-Midtown |
| 4 | 7868300310 | 37033JV | NY | COM | 11/21/2016 | 69 | DELV | INTER | T | 10510 | ... | 408 | h1 | Y | 0700A | 0700P | WHITE | 2007 | 0 | 31 6 | 69-Failure to Disp Muni Recpt |
| 5 | 5096917368 | FZD8593 | NY | PAS | 06/13/2017 | 7 | SUBN | ME/BE | V | 0 | ... | 1111 | D | | | | WH | 2012 | 0 | | FAILURE TO STOP AT RED LIGHT |
| 6 | 1413609545 | X20DCM | NJ | PAS | 08/03/2016 | 40 | SDN | TOYOT | P | 54070 | ... | 408 | C3 | BBBBBBB | ALL | ALL | WHITE | 0 | 1 | | |
| 7 | 4628525523 | 326SF9 | MA | PAS | 12/21/2016 | 36 | UT | BMW | V | 0 | ... | 1180 | B | | | | | 2001 | 0 | | PHTO SCHOOL ZN SPEED VIOLATION |
| 8 | 4627113330 | HCA5464 | NY | OMS | 11/21/2016 | 36 | SUBN | DODGE | V | 0 | ... | 1180 | B | | | | BK | 2016 | 0 | | PHTO SCHOOL ZN SPEED VIOLATION |
| 9 | 4006478550 | VAD7274 | VA | PAS | 10/05/2016 | 5 | 4D | BMW | V | 0 | ... | 1111 | C | | | | BLK | 2008 | 0 | | BUS LANE VIOLATION |


## Visualising DAGs

* Let's peek "under the hood" and see the actual DAGs that the task schedulers create
* Dask uses the graphviz library to generate visual representations of the DAGs created by the task scheduler
* You will be able to inspect the DAG backing any Dask Delayed object by calling the `.visualize()` method on the object

### Visualising a simple DAG using Dask Delayed objects

* Let's step away from the Dask DataFrame object example (high-level API)
* Step down a level of abstraction to the Dask Delayed object (low-level API)
* The DAGs that Dask creates for even simple DataFrame operations can grow quite large and be hard to visualise
* Therefore, for convenience, we’ll use Dask Delayed objects for this example so we have better control over composition of the DAG

In [None]:
# Import the delayed function directly from the top-level dask package
from dask import delayed
from dask.diagnostics import ProgressBar

def inc(i):
    return i + 1

def add(x, y):
    return x + y

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

#z.visualize()

<img src="./images/DAG_comp_01.png" width="300"/>

### Visualising more complex DAGs with loops and collections

In [None]:
def add_two(x):
    return x + 2

def sum_two_numbers(x,y):
    return x + y

def multiply_four(x):
    return x * 4

data = [1, 5, 8, 10]

step1 = [delayed(add_two)(i) for i in data]
total = delayed(sum)(step1)
#total.visualize()

* Here, instead of creating a Delayed object from a single function call, the Delayed constructor is placed inside a list comprehension that iterates over the list of numbers
* `step1` becomes a list of Delayed objects instead of a list of integers
* The `sum` function is passed the list of Delayed objects
* As before, this code ultimately represents a graph
* Variable `total` is a Delayed object ==> can be visualised

<img src="./images/DAG_comp_02.png" width="650"/>

* Dask draws DAGs from the bottom up
* The four numbers in the list called `data` correspond to four nodes at the bottom of the DAG
* The circles on the Dask DAGs represent function calls
    * `add_two` called four times
    * `sum` called one time
* Squares on the DAG represent intermediate results
* Just like with the DataFrame, Dask doesn’t actually compute the answer until the `.compute()` method is called on the `total` object
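To see the deferred execution in action, the graph can be built and then computed explicitly. This sketch repeats the definitions so that it runs on its own; the variable name `result` is introduced here for illustration.

```python
from dask import delayed


def add_two(x):
    return x + 2


data = [1, 5, 8, 10]

# Building the graph: no function has been called yet
step1 = [delayed(add_two)(i) for i in data]
total = delayed(sum)(step1)

# Executing the graph: add_two runs four times, then sum runs once
result = total.compute()
print(result)  # (1+2) + (5+2) + (8+2) + (10+2) = 32
```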


**DAG with values superimposed over the computation**

<img src="./images/DAG_comp_03.png" width="800"/>

Add another degree of complexity to the DAG by multiplying every number by four before collecting the result.

In [None]:
def add_two(x):
    return x + 2

def sum_two_numbers(x,y):
    return x + y

def multiply_four(x):
    return x * 4

data = [1, 5, 8, 10]

step1 = [delayed(add_two)(i) for i in data]
step2 = [delayed(multiply_four)(j) for j in step1]
total = delayed(sum)(step2)
#total.visualize()

**DAG for Delayed object** `total`

<img src="./images/DAG_comp_04.png" width="600"/>

### Reducing DAG complexity with persist

Add another layer to the DAG by adding the sum in `total` back to each of the original numbers and then computing the sum of these numbers.

In [None]:
def add_two(x):
    return x + 2

def sum_two_numbers(x,y):
    return x + y

def multiply_four(x):
    return x * 4

data = [1, 5, 8, 10]

step1 = [delayed(add_two)(i) for i in data]
step2 = [delayed(multiply_four)(j) for j in step1]
total = delayed(sum)(step2)

data2 = [delayed(sum_two_numbers)(k, total) for k in data]
total2 = delayed(sum)(data2)
#total2.visualize()

**DAG for** `total2` **before using** `.persist()`

<img src="./images/DAG_comp_05.png" width="500"/>

**Why persisting calculations can help reduce DAG complexity**

* Every time you call the `compute` method on a Delayed object, Dask will step through the **complete DAG** to generate the result
* This may be fine for simple calculations, but it quickly becomes inefficient with larger DAGs, because the same calculations are repeated over and over again
* Workaround: persist intermediate results that need to be reused
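The effect can be made visible by counting how often a function actually runs. This is a sketch under a few assumptions: the helper `tracked_multiply_four` and the `calls` list are made up for illustration, and the synchronous scheduler is selected only to keep the run deterministic.

```python
from dask import delayed

calls = []  # records every real execution of the function below


def tracked_multiply_four(x):
    calls.append(x)
    return x * 4


data = [1, 5, 8, 10]
total = delayed(sum)([delayed(tracked_multiply_four)(i) for i in data])

# Without persist: every compute() walks the complete DAG again
total.compute(scheduler="synchronous")
total.compute(scheduler="synchronous")
calls_without_persist = len(calls)   # 4 calls per compute, 2 computes

# With persist: the intermediate result is computed once and then reused
calls.clear()
total_persisted = total.persist(scheduler="synchronous")
first = (total_persisted + 1).compute(scheduler="synchronous")
second = (total_persisted + 2).compute(scheduler="synchronous")
calls_with_persist = len(calls)

print(calls_without_persist, calls_with_persist, first, second)
```

The two follow-up computations on `total_persisted` start from the stored sum instead of re-running the four multiplications.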

**Rewrite code and use** `.persist()`

In [None]:
def add_two(x):
    return x + 2

def sum_two_numbers(x,y):
    return x + y

def multiply_four(x):
    return x * 4

data = [1, 5, 8, 10]

step1 = [delayed(add_two)(i) for i in data]
step2 = [delayed(multiply_four)(j) for j in step1]
total = delayed(sum)(step2)
total_persisted = total.persist()
#total_persisted.visualize()

data2 = [delayed(sum_two_numbers)(l, total_persisted) for l in data]
total2 = delayed(sum)(data2)
#total2.visualize()

**Chaining a DAG from a persisted DAG -** `total2` **after using** `.persist()`

<img src="./images/DAG_comp_06.png" width="800"/>

**DAG for NYC data**

<img src="./images/DAG_NYC_data.png" />

## Task scheduling

* Deferring execution until `compute` is called is advantageous considering the time it might take to churn through petabytes of data
* You can define the complete chain of transformations that Dask should perform on the data without having to wait for one computation to finish before defining the next


**Lazy computations**
* Lazy computations allow Dask to split work into smaller logical pieces, which helps avoid loading the entire data structure that it’s operating on into memory
* NYC parking ticket example
    * Dask divided the 2 GB file into 33 chunks of roughly 64 MB each
    * Dask operated on 8 chunks at a time
    * ==> maximum memory consumption for the entire operation didn’t exceed 512 MB
    * This is important as the size of the datasets you work on stretches into the terabyte and petabyte range
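The memory bound can be illustrated with a standard-library thread pool. This is a simplified stand-in for the Dask scheduler, not its actual implementation: 33 chunks are processed by 8 worker threads, and a counter tracks how many chunks are in flight at the same time. The function `process_chunk` and its trivial workload are made up.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

CHUNK_MB = 64    # chunk size from the text
N_CHUNKS = 33    # number of partitions from the text
N_WORKERS = 8    # chunks processed at a time, from the text

lock = threading.Lock()
state = {"in_flight": 0, "peak": 0}


def process_chunk(chunk_id):
    """Stand-in for real work on one chunk; tracks concurrent chunks."""
    with lock:
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
    result = chunk_id * 2  # placeholder computation
    with lock:
        state["in_flight"] -= 1
    return result


with ThreadPoolExecutor(max_workers=N_WORKERS) as pool:
    results = list(pool.map(process_chunk, range(N_CHUNKS)))

# At most N_WORKERS chunks are ever held in memory together
peak_memory_mb = state["peak"] * CHUNK_MB
upper_bound_mb = N_WORKERS * CHUNK_MB
print(len(results), upper_bound_mb)
```

However many chunks the file has, the upper bound depends only on the chunk size and the number of workers, which is why the approach scales to much larger datasets.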

## Summary

- Computations on Dask DataFrames are structured by the **task scheduler** using **DAGs**.
- Computations are constructed **lazily**, and the *compute* method is called to execute the computation and retrieve the results.
- You can call the *visualize* method on **any** Dask object to see a **visual representation** of the underlying DAG.
- Computations can be **streamlined** by using the *persist* method to store and reuse **intermediate results** of complex computations.