<font color="white">.</font> | <font color="white">.</font> | <font color="white">.</font>
-- | -- | --
![RDS](https://library.columbia.edu/content/dam/templates/libraryweb/banners/banner_research-data-services.png) | <h1><font size="+6">Data Club</font></h1> | ![FRC](https://rcfoundations.research.columbia.edu/sites/default/files/logo/foundation.png)

---

<center><h1> <font color="green">Introduction to Dask</font></h1></center>

## <font color="grey">Helpful References</font>

- <a href="https://docs.dask.org/en/latest/why.html">Why Dask?</a>
- <a href="https://github.com/dask/dask-tutorial">dask-tutorial</a>
- <a href="https://www.manning.com/books/data-science-with-python-and-dask">Data Science with Python and Dask</a>
- <a href="https://www.manifold.ai/dask-and-machine-learning-preprocessing-tutorial">Dask and Machine Learning: Preprocessing Tutorial</a>
- <a href="https://carpentries-incubator.github.io/lesson-parallel-python/aio/index.html">Parallel Programming in Python</a>
- <a href="https://www.youtube.com/watch?v=uGy5gT2vLdI&feature=youtu.be"> Working with the Python DASK library (video)</a>
- <a href="https://www.youtube.com/watch?v=t_GRK4L-bnw&feature=youtu.be">Who uses Dask (video)</a>

* Note: This notebook borrows extensively from the NASA Center for Climate Simulation Advanced Software Technology Group Python series. Check out their full class list <a href="https://www.nccs.nasa.gov/nccs-users/user-events/python-classes">here</a>.

![fig_dask](https://miro.medium.com/max/1000/1*D6mSsdWECFLn6wJne4VTjg.png)


# <font color="green"> What is Dask?</font>

- A flexible library for parallel computing in Python 
- Works alongside NumPy, Pandas, and scikit-learn
- A Python library to address these problems:
     * **Available data does not fit in a single machine's memory.**
     * **Data processing task needs to be sped up.**
- Orchestrates parallel threads or processes and helps speed up processing times.

- Dask components and APIs in three layers: 
    * the scheduler
    * low-level APIs
    * and high-level APIs.

- High-level constructs 
    * Dask Bags
    * Dask DataFrames
    * Dask Arrays. 
    
- Highly customized job execution graphs & integration with existing data structures.


![fig_layers](http://bicortex.com/bicortex/wp-content/post_content//2019/06/Dask_APIs_Architecture.png)
Image Source: bicortex.com


![fig_proc](https://www.manifold.ai/hs-fs/hubfs/Blog%20Post%20Illos/ML%20pipelines%20-%20dask%20single%20machine.jpeg?width=600&name=ML%20pipelines%20-%20dask%20single%20machine.jpeg)
Image Source: www.manifold.ai


**Advantages of Using Dask**

- Natively scales NumPy, Pandas, and scikit-learn.
- Effective with both medium datasets on a single machine and large datasets on a cluster.
- A general framework for parallelizing Python objects.
- Low configuration and maintenance overhead.


**Recall on Processes and Threads**

- A process is an execution of a program. 
- A thread is a single execution sequence within the process.
- A process can contain multiple threads.
- Threads perform small tasks, whereas processes are used for more ‘heavyweight’ tasks. 

| Process | Thread |
| --- | --- |
| Resource intensive | Uses fewer resources |
| Processes do not share memory with each other. | Threads share memory with each other within the same process. |
| Inter process communication is slow. | Inter thread communication is fast. |
| Expensive context switching between processes.  | Cheap context switching between threads of the same process. |
| If one is blocked, no other can execute until that one unblocks. | While one is blocked and waiting, a second in the same task can run. |
| Runs independently & is isolated from other processes | One thread can read, write or change another thread's data |

Because of the Global Interpreter Lock (GIL), standard CPython builds can execute Python bytecode in only one thread at a time.

### Import Modules

In [None]:
!python -m pip install dask[dataframe] --upgrade

In [None]:
!pip install memory_profiler

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import dask
import dask.array as da
import dask.dataframe as dd
from dask.diagnostics import ProgressBar 

In [None]:
print("Pandas version: ", pd.__version__)
print("Dask   version: ", dask.__version__)

In [None]:
from memory_profiler import memory_usage
import memory_profiler
%load_ext memory_profiler

# <font color="green"> Parallelize Code with `dask.delayed`</font>

- A simple way to parallelize the code.
- Delay function calls into a task graph with dependencies.
- Systems like `dask.dataframe` are built with `dask.delayed`.

**Simple Example**

Consider the following functions:

- Use the `dask.delayed` decorator to parallelize the functions `increment` and `add`.
- By decorating the functions, we record tasks to be computed later as graphs on parallel hardware.

- Call the delayed version by passing the arguments, as before, but the original function isn't called yet.
- A delayed object is made, which keeps track of the function to call and the arguments to pass it.
- The `visualize` method (needs the `graphviz` package) represents the operations to be performed.

- **total** not physically calculated yet.
- Apply the `compute` method to get the answer. 
- <font color="red">Only now are the data loaded into memory for calculations</font>.
- Calculations done using a local thread pool.
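A minimal sketch of the steps above, assuming `increment` adds one to its argument and `add` sums two values (the function bodies are illustrative, not taken from the original cells):

```python
import dask


@dask.delayed
def increment(x):
    # Runs only when the task graph is executed
    return x + 1


@dask.delayed
def add(x, y):
    return x + y


# Calling the decorated functions records tasks; nothing is computed yet
a = increment(1)
b = increment(2)
total = add(a, b)

# total.visualize() would draw the task graph (needs the graphviz package)

# compute() executes the graph; delayed objects default to a local thread pool
result = total.compute()
print(result)
```

The two `increment` calls do not depend on each other, so the scheduler is free to run them at the same time before `add` combines them.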

**Using `delayed` in Loops**

Consider the sequential code with two for-loops:

We can parallelize the above using the `delayed` decorator:
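The original cells are not reproduced here, so the sketch below uses two hypothetical helper functions, `square` and `double`, to show the pattern: first the sequential loops, then the same loops wrapped with `dask.delayed`.

```python
import dask


def square(x):
    return x * x


def double(x):
    return 2 * x


data = list(range(5))

# Sequential version: two plain for-loops
seq_total = 0
for x in data:
    seq_total += square(x)
for x in data:
    seq_total += double(x)

# Delayed version: each call becomes a task in the graph
parts = []
for x in data:
    parts.append(dask.delayed(square)(x))
for x in data:
    parts.append(dask.delayed(double)(x))

# The final reduction is also delayed, so it waits for all the parts
lazy_total = dask.delayed(sum)(parts)
par_total = lazy_total.compute()

print(seq_total, par_total)
```

With functions this cheap the delayed version will not be faster; the benefit appears once each call does a meaningful amount of work.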

We can also get the visual representation through a task graph.

### Exercise 1

Use the `delayed` decorator to parallelize the code below:

### Example: Palindromic Words

- A palindromic word is a word whose characters read the same backward as forward. 
- Some examples of palindromes are `redivider`, `deified`, `civic`, `radar`, `level`, `rotor`, `kayak`, `reviver`, `racecar`, `madam`, and `refer`.

We want to find the number of palindromes from a list of words.

In [None]:
string_list = ['redivider', 'deified', 'civic', 'radar', 'level',
               'rotor', 'kayak', 'reviver', 'racecar', 'madam', 'refer',
               'Being',  'man', 'not', 'without', 'frequent', 'consciousness',
               'that', 'there', 'was', 'some', 'charm', 'this', 'life', 'stood',
               'still', 'after', 'looking', 'sky', 'useful', 'instrument',
               'regarded', 'appreciative', 'spirit', 'work', 'art',
               'superlatively', 'beautiful', 'moment', 'seemed',
               'impressed', 'with', 'speaking', 'loneliness', 'scene',
               'rather', 'complete', 'abstraction', 'from', 'compass',
               'sights', 'sounds', 'man', 'Human', 'shapes', 'interferences',
               'troubles', 'joys', 'were', 'they', 'were', 'there',
               'seemed', 'shaded', 'hemisphere', 'globe', 'sentient', 'being',
               'save', 'himself']

**Using Regular Python**
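One possible plain-Python version, shown on a short sample of the word list so the block stands alone (the helper name `is_palindrome` is an assumption):

```python
def is_palindrome(word):
    # A word is a palindrome if it equals its own reverse
    return word == word[::-1]


words = ['redivider', 'civic', 'radar', 'Being', 'man', 'level', 'charm', 'kayak']

# Count the words that pass the test
n_palindromes = sum(1 for w in words if is_palindrome(w))
print(n_palindromes)
```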

**Using Dask**

The same computations go faster with a Dask Bag:
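A hedged sketch of the Bag version, again on a short sample and with an assumed helper `is_palindrome`. The threaded scheduler is requested explicitly to keep the example simple; on a list this small any speed-up is unlikely, and gains would only be expected on much larger inputs.

```python
import dask.bag as db


def is_palindrome(word):
    return word == word[::-1]


words = ['redivider', 'civic', 'radar', 'Being', 'man', 'level', 'charm', 'kayak']

# Split the list into partitions that can be processed independently
bag = db.from_sequence(words, npartitions=2)

# filter() and count() are lazy; compute() runs them
n_palindromes = bag.filter(is_palindrome).count().compute(scheduler="threads")
print(n_palindromes)
```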

**<font color="red">Important Lessons</font>**

- The `delayed` decorator adds overhead.
- No need to use it on fast tasks.
- Call `delayed` on the function, not the result.
- Break up computations into many pieces. You achieve parallelism by having many delayed calls: Dask will not parallelize the code inside a function decorated with `delayed`.

### Exercise 2

Use Dask to parallelize the code below (calculations of `pi`):

# <font color="red"> Dask Array</font>

- Dask arrays coordinate many Numpy arrays, arranged into chunks within a grid. 
    - _Parallel_: Uses all of the cores on your computer
    - _Larger-than-memory_: Lets you work with data larger than your available memory by breaking arrays into small pieces, operating on those pieces in an order that minimizes the computation's memory footprint, and efficiently streaming data from disk.
    - _Blocked Algorithms_: Perform large computations by performing many smaller computations
- They support a large subset of the Numpy API.

![fig_array](https://miro.medium.com/max/1388/1*JfQnXJ5_R104bPyE8_XhwQ.png)

**Create a Dask Array**

- Create a 20000x20000 array of random numbers, representing many numpy arrays of size 1000x1000 (or smaller if the array cannot be divided evenly). 
- There are 400 (20x20) numpy arrays of size 1000x1000.
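A short sketch of the array described above. Creating it is lazy, so no random numbers are generated until a result is requested:

```python
import dask.array as da

# 20000x20000 array made of 1000x1000 NumPy chunks
x = da.random.random((20000, 20000), chunks=(1000, 1000))

print(x.chunksize)    # shape of the largest chunk
print(x.numblocks)    # number of chunks along each axis
print(x.npartitions)  # total number of chunks
```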

We can use Numpy syntax:

Use the **`compute()`** function if you want your result as a NumPy array.
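As an illustration, here is a small array of ones (a stand-in chosen so the result is easy to check) manipulated with NumPy-style syntax and then materialized with `compute()`:

```python
import numpy as np
import dask.array as da

x = da.ones((2000, 2000), chunks=(500, 500))

# NumPy-style expression; still lazy at this point
y = (x + x.T).sum(axis=0)

# compute() returns an ordinary NumPy array
result = y.compute()
print(type(result), result.shape)
```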

**Persist Data in Memory**

- If you have the available RAM for your dataset then you can persist data in memory.
- This allows future computations to be much faster.
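A minimal sketch of persisting, using a small random array as a stand-in for a dataset that fits in RAM:

```python
import dask.array as da

x = da.random.random((2000, 2000), chunks=(500, 500))

# persist() computes the chunks once and keeps them in memory;
# the result is still a Dask array
x = x.persist()

# Later computations reuse the in-memory chunks instead of regenerating them
col_means = x.mean(axis=0).compute()
total = x.sum().compute()
print(col_means.shape, total)
```

Without `persist()`, each `compute()` call here would generate a fresh set of random numbers.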

**Numpy against Dask**

Changing the chunk size might provide better performance:
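One way to change the chunk size of an existing array is `rechunk`. The sizes below are arbitrary illustrations, not tuned values:

```python
import dask.array as da

# Many small chunks: more tasks for the scheduler to manage
x = da.random.random((4000, 4000), chunks=(250, 250))
print(x.numblocks)

# Fewer, larger chunks
y = x.rechunk((1000, 1000))
print(y.numblocks)
```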

**Dask finished faster but used more total CPU time: with this chunk size, Dask was able to transparently parallelize the computation across cores.**

**<font color="red">Things to Consider</font>**

- Use NumPy if your data fits in RAM to avoid Dask's extra layer of complexity.
- If you need speedups rather than scalability then use Numba for manipulating Numpy arrays.
- How to select the chunk size?
     - Too small: huge overheads.
     - Poorly aligned with data: inefficient reading.
     - Optimal chunk size: at least 100 MB.
     - Choose a chunk size large enough to reduce the number of chunks that Dask has to manage but small enough that many can fit in memory at once. Dask will often have as many chunks in memory as twice the number of active threads.
   

**Avoid Oversubscribing Threads**
     
- By default Dask runs as many concurrent tasks as you have logical cores. 
- It assumes that each task will consume about one core.
- Many array-computing libraries (used in Dask) are themselves multi-threaded, which can cause contention and low performance.
- For better performance, we need to explicitly specify the use of one thread:

```bash
export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export OPENBLAS_NUM_THREADS=1
```

## <font color="green">Memory Profiling</font>

- We use the `memory_profiler` package to track memory usage.
- It is written in Python and monitors the memory of processes running Python code, including line-by-line memory usage. 
- We call `memory_usage()` and pass the `interval` parameter to set how often memory usage is sampled.

You can also use Dask's profiling options:
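As one example, `dask.diagnostics.Profiler` records every task run by a local scheduler. The array below is only a stand-in workload:

```python
import dask.array as da
from dask.diagnostics import Profiler

x = da.random.random((2000, 2000), chunks=(500, 500))

# Everything computed inside the block is recorded
with Profiler() as prof:
    total = x.sum().compute(scheduler="threads")

# Each record holds the task key plus its start and end times
n_tasks = len(prof.results)
slowest = max(prof.results, key=lambda t: t.end_time - t.start_time)
print(n_tasks, "tasks; slowest took", slowest.end_time - slowest.start_time, "s")
```

`ResourceProfiler` and `CacheProfiler` from the same module follow the same context-manager pattern.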

# <font color="red"> Dask DataFrames</font>

- Use Pandas for tabular data that fit in memory. 
- Dask DataFrames:
     - Coordinate many Pandas DataFrames, partitioned along an index. 
     - Support a large subset of the Pandas API.
     
     
- One operation on a Dask DataFrame triggers many Pandas operations on the constituent pandas DataFrames.
- Speedy Dask Dataframe operations:
     - Arithmetic operations 
     - Common aggregations (`mean`, `min`, `max`, `sum`, etc.)
     - Calling `apply`
     - Calling `value_counts()`, `drop_duplicates()` or `corr()`
     - Filtering with `loc`, `isin`, and row-wise selection

![fig_df](https://pythondata.com/wp-content/uploads/2016/11/Screen-Shot-2016-11-24-at-6.52.24-PM-168x300.png)

### <font color="green"> NYC Flights Dataset</font>

The data covers flights out of the three airports in the New York City area in the 1990s.

Download the remote data:

In [None]:
import urllib.request

print("\t Downloading NYC dataset...", end="\n", flush=True)

url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
filename, header = urllib.request.urlretrieve(url, "nycflights.tar.gz")

print("\t Done!", flush=True)

Extract the `.csv` files from the tar file:
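A sketch using the standard-library `tarfile` module. The helper name `extract_csv_files` is an assumption, and the call is guarded so the block does nothing if the archive has not been downloaded:

```python
import os
import tarfile


def extract_csv_files(archive_path, dest_dir="."):
    """Extract a .tar.gz archive and return the names of the CSV members."""
    os.makedirs(dest_dir, exist_ok=True)
    with tarfile.open(archive_path, mode="r:gz") as tar:
        tar.extractall(dest_dir)
        return sorted(m.name for m in tar.getmembers() if m.name.endswith(".csv"))


# Only runs when the archive from the download step is present
if os.path.exists("nycflights.tar.gz"):
    csv_files = extract_csv_files("nycflights.tar.gz")
    print(len(csv_files), "CSV files extracted")
```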

Read all the files at once:

- The representation of the dataframe object contains no data. 
- `pandas.read_csv` reads in the entire file before inferring datatypes.
- `dask.dataframe.read_csv` only reads in a sample from the beginning of the file (or first file). These inferred datatypes are then enforced when reading all partitions.

Let's view the first rows:

Trying to view last rows causes a problem:

- There is an issue with the data types of a few columns.
- The datatypes inferred in the sample are incorrect.
- We can fix it by reading the files again and specify the appropriate data types.

### <font color="blue">Perform Operations as with `Pandas DataFrames`</font>

**Maximum value of a column**:

- Let's compute the maximum of the `DepDelay` column.
- With `Pandas`, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maxima.
- `dask.dataframe` allows us to write pandas-like code that operates on larger-than-memory datasets in parallel.

If we do the same thing in `Pandas`, we will have:
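A sketch of the manual pandas approach, using two small stand-in CSV files written to a temporary folder:

```python
import glob
import os
import tempfile

import pandas as pd

tmp_dir = tempfile.mkdtemp()
pd.DataFrame({"DepDelay": [5.0, 12.0]}).to_csv(os.path.join(tmp_dir, "1990.csv"), index=False)
pd.DataFrame({"DepDelay": [3.0, 40.0]}).to_csv(os.path.join(tmp_dir, "1991.csv"), index=False)

# Load one file at a time and keep only its maximum
maxes = []
for fn in sorted(glob.glob(os.path.join(tmp_dir, "*.csv"))):
    part = pd.read_csv(fn)
    maxes.append(part["DepDelay"].max())

# The overall maximum is the maximum of the per-file maxima
final_max = max(maxes)
print(final_max)
```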

**Plotting**

**Other Operations**

Number of non-cancelled flights:

Number of non-cancelled flights from each airport:

Average departure delay from each airport:

Group by destinations and count:

**Sharing Intermediate Results**

- We sometimes do the same operation more than once. 
- For most operations, `dask.dataframe` hashes the arguments, allowing duplicate computations to be shared, and only computed once.

We pass both to a single `compute` call:

The task graphs for both results are merged when calling dask.compute, allowing shared operations to only be done once instead of twice.

### Exercise 3

- Consider the code below that computes the mean departure delay per airport. 
- Parallelize the code using Dask.

### <font color="blue">Example of Machine Learning with Dask</font>

Grab columns from the Dask DataFrame:

You can query the shape (note that the number of rows is delayed):

**Basic EDA**

We can get descriptive statistics:

Perform searches and operations on the data:

**Create the Model**

In [None]:
#tf.keras.utils.plot_model(model, show_shapes=True, show_layer_names=True)

**Train the Model**

Generate batches of data:

We never run out of memory while doing the training:

```
   steps_per_epoch * batch_size = number_of_rows_in_train_data
```

# <font color="red"> Schedulers</font>

- After Dask generates the task graphs, it needs to execute them on parallel hardware. 
- It is the role of a task scheduler. 
- There are different task schedulers. Each will consume a task graph and compute the same result, but with different performance characteristics.

![schedulers](https://docs.dask.org/en/latest/_images/dask-overview.svg)

Image Source: [https://docs.dask.org/en/latest/](https://docs.dask.org/en/latest/)

To execute the task graphs there are two types of schedulers:
* **Single machine**: Provides basic features on a local process or thread pool. It is simple and cheap to use, but it runs on a single machine only and does not scale beyond it.
* **Distributed**: Offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster.

## <font color="blue"> Single Machine Scheduler</font>

Consider the following example:

**Single thread**

- The single-threaded synchronous scheduler executes all computations in the local thread with no parallelism at all.
- It is useful for debugging.
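A small sketch, using a hypothetical `square` task, of selecting the synchronous scheduler for one `compute` call:

```python
import dask


@dask.delayed
def square(x):
    return x * x


# Sum of squares of 0..9 as a task graph
total = dask.delayed(sum)([square(i) for i in range(10)])

# Runs every task in the calling thread, one after another,
# so debuggers and print statements behave as usual
result = total.compute(scheduler="synchronous")
print(result)
```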

**Local threads**

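Uses a local thread pool (`concurrent.futures.ThreadPoolExecutor` in recent Dask releases; older releases used `multiprocessing.pool.ThreadPool`).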

Use all the processors

Use some of the processors:
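Both cases can be sketched with the `scheduler` and `num_workers` keywords of `compute`; the `square` task is a hypothetical placeholder:

```python
import dask


@dask.delayed
def square(x):
    return x * x


total = dask.delayed(sum)([square(i) for i in range(10)])

# All processors: the thread pool size defaults to the number of cores
result_all = total.compute(scheduler="threads")

# Some of the processors: cap the pool at two threads
result_two = total.compute(scheduler="threads", num_workers=2)

# The scheduler can also be set for a whole block of code
with dask.config.set(scheduler="threads"):
    result_ctx = total.compute()

print(result_all, result_two, result_ctx)
```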

**Local processes**

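- The multiprocessing scheduler executes computations with a local process pool (`concurrent.futures.ProcessPoolExecutor` in recent Dask releases; older releases used `multiprocessing.Pool`).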
- Every task and all of its dependencies are shipped to a local process, executed, and then their result is shipped back to the main process. 
- Moving data to remote processes and back can introduce performance penalties, particularly when the data being transferred between processes is large. 
- The multiprocessing scheduler is an excellent choice when workflows are relatively linear, and so does not involve significant inter-task data transfer as well as when inputs and outputs are both small, like filenames and counts.

Use all the processors:

Use some of the processors:

## <font color="blue">Distributed Scheduler</font>

- The Dask distributed scheduler can either be set up on a cluster or on a personal machine. 
- It is a centrally managed, distributed, dynamic task scheduler. 
     - The central dask-scheduler process coordinates the actions of several dask-worker processes spread across multiple machines and the concurrent requests of several clients.
     - The scheduler is asynchronous and event-driven, simultaneously responding to requests for computation from multiple clients and tracking the progress of multiple workers.
     - The event-driven and asynchronous nature makes it flexible to concurrently handle a variety of workloads coming from multiple users at the same time while also handling a fluid worker population with failures and additions. 
     - Workers communicate amongst each other for bulk data transfer over TCP.
- To set up `dask.distributed`, we need to create client instance by calling `Client` class from `dask.distributed`. 
- It will internally create a dask scheduler and dask workers. 
- We will get the **link of the dashboard** where we can analyze tasks running in parallel. 
- We can pass a number of workers (using the `n_workers` argument) and threads to use per worker process (using the `threads_per_worker` argument).
- As soon as you create a client, Dask will automatically start using it.
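A minimal sketch, assuming the `distributed` package is installed. `processes=False` is a choice made here to keep the workers inside the current process as threads; the worker and thread counts are arbitrary.

```python
import dask.array as da
from dask.distributed import Client

# Creating the client starts a local scheduler and worker(s);
# leaving the with-block shuts them down again
with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
    print(client.dashboard_link)  # address of the diagnostics dashboard

    # compute() now runs on the distributed scheduler automatically
    x = da.random.random((2000, 2000), chunks=(500, 500))
    mean = x.mean().compute()

print(mean)
```

Outside a `with` block, `client.close()` performs the same shutdown.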

If you aren’t using JupyterLab with the `dask-labextension`, you can click the `Dashboard` link to open the diagnostics dashboard.

Shut down the cluster:

**<font color="red">Things to Consider</font>**

- Each Dask task has overhead (about 1 ms). With a lot of tasks this overhead can add up. It is a good idea to give each task more than a few seconds of work.
- To better understand how your program is performing, check the [Dask Performance Diagnostics](https://distributed.dask.org/en/latest/diagnosing-performance.html) documentation. You can also view the [video](https://docs.dask.org/en/stable/diagnostics-distributed.html) to find out how to group your work into fewer, more substantial tasks. This might mean calling several lazy operations at once instead of individually. It might also mean repartitioning your dataframe(s).
- A good rule of thumb for choosing number of threads per Dask worker is to choose the square root of the number of cores per node. 
     - In general more threads per worker are good for a program that spends most of its time in NumPy, SciPy, Numba, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter.
- The Dask scheduler runs on a single thread, so assigning it its own node is a waste.
- There is no hard limit on Dask scaling. The task overhead though will eventually start to swamp your calculation depending on how long each task takes to compute. 

## <font color="blue"> Example with DataFrame</font>

Build a Pandas DataFrame with 100000 rows and two columns with values selected randomly between 1 and 1000.

Write a function that computes the sum of squares for each column of the DataFrame.

Measure the time it takes to call the function:
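The original cells are not shown, so the sketch below makes two assumptions: the columns are named `a` and `b`, and "sum of squares" is read as the row-wise value a² + b². The function name `add_squares` follows the text further down.

```python
import time

import numpy as np
import pandas as pd

# 100000 rows, two columns of random integers between 1 and 1000
rng = np.random.default_rng(42)
df = pd.DataFrame({
    "a": rng.integers(1, 1001, size=100_000),
    "b": rng.integers(1, 1001, size=100_000),
})


def add_squares(frame):
    # Row-wise a**2 + b**2, vectorized over the whole frame
    return frame["a"] ** 2 + frame["b"] ** 2


start = time.perf_counter()
result = add_squares(df)
elapsed = time.perf_counter() - start
print(f"{elapsed:.4f} s")
```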

### <font color="green">Parallelize using Dask `map_partitions`</font>

We construct a Dask DataFrame from a pandas DataFrame using the `from_pandas` function and specify the number of partitions (`npartitions`) to break this DataFrame into.

```python
   ddf = dd.from_pandas(df, npartitions=N)
```

`dd` is the name under which `dask.dataframe` was imported, and `npartitions` tells Dask how many partitions to split the DataFrame into.

Each partition can be processed by a different thread; with too many partitions, the scheduling and communication overhead becomes too costly.

We will break into 4 partitions (number of available cores):

We will apply the `add_squares` function to each of these partitions: