<center>
<table>
  <tr>
    <td><img src="http://www.nasa.gov/sites/all/themes/custom/nasatwo/images/nasa-logo.svg" width="100"/> </td>
     <td><img src="https://github.com/astg606/py_materials/blob/master/logos/ASTG_logo.png?raw=true" width="80"/> </td>
     <td> <img src="https://www.nccs.nasa.gov/sites/default/files/NCCS_Logo_0.png" width="130"/> </td>
    </tr>
</table>
</center>

        
<center>
<h1><font color= "blue" size="+3">ASTG Python Courses</font></h1>
</center>

---

<center><h1> <font color="red">Introduction to Dask</font></h1></center>


## <font color="red">Reference Document</font>

- <a href="https://docs.dask.org/en/latest/why.html">Why Dask?</a>
- <a href="https://github.com/dask/dask-tutorial">dask-tutorial</a>
- <a href="https://www.manning.com/books/data-science-with-python-and-dask">Data Science with Python and Dask</a>
- <a href="https://www.manifold.ai/dask-and-machine-learning-preprocessing-tutorial">Dask and Machine Learning: Preprocessing Tutorial</a>
- <a href="https://carpentries-incubator.github.io/lesson-parallel-python/aio/index.html">Parallel Programming in Python</a>
- <a href="https://www.youtube.com/watch?v=uGy5gT2vLdI&feature=youtu.be"> Working with the Python DASK library (video)</a>
- <a href="https://www.youtube.com/watch?v=t_GRK4L-bnw&feature=youtu.be">Who uses Dask (video)</a>

![fig_dask](https://miro.medium.com/max/1000/1*D6mSsdWECFLn6wJne4VTjg.png)


# <font color="red"> What is Dask?</font>

- A flexible library for parallel computing in Python that makes it easy to build intuitive workflows for ingesting and analyzing large, distributed datasets. 
- A native parallel analytics tool designed to integrate seamlessly with Numpy, Pandas, and Scikit-Learn. 
- An out-of-core (data is read into memory from disk on an as-needed basis) parallelization library that seamlessly integrates with existing NumPy and Pandas data structures to address the following:
     * **The available dataset does not fit in memory of a single machine.**
     * **The data processing task is time consuming and needs to be scaled and sped up.**
- Orchestrates parallel threads or processes for us and helps speed up processing times.
   - Works by distributing larger computations and breaking them down into smaller computations through a task scheduler and task workers.

Dask consists of several different components and APIs, which can be categorized into three layers: the scheduler, low-level APIs, and high-level APIs.

- Dask provides a few high-level constructs called Dask Bags, Dask DataFrames, and Dask Arrays. They provide an easy-to-use interface to parallelize many of the typical data transformations in Machine Learning (ML) workflows. 
- Dask allows the creation of highly customized job execution graphs by using their extensive Python API (e.g., `dask.delayed`) and integration with existing data structures.


![fig_layers](http://bicortex.com/bicortex/wp-content/post_content//2019/06/Dask_APIs_Architecture.png)
Image Source: bicortex.com


The diagram below describes the steps Dask takes to manipulate data.

- The operation is broken down into a sequence of operations on smaller partitions of our data (without having to read the whole dataset into memory).
- Dask reads each partition as it is needed and computes the intermediate results. 
- The intermediate results are aggregated into the final result.
- Dask handles all of that sequencing internally for us. 
- On a single machine, Dask can use threads or processes to parallelize these operations. 

![fig_proc](https://www.manifold.ai/hs-fs/hubfs/Blog%20Post%20Illos/ML%20pipelines%20-%20dask%20single%20machine.jpeg?width=600&name=ML%20pipelines%20-%20dask%20single%20machine.jpeg)
Image Source: www.manifold.ai
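
The partition, compute, and aggregate steps described above can be sketched in plain Python, without Dask, to make the idea concrete. This is only an illustration under simplifying assumptions: the "dataset" is a small in-memory list, and the helper names `iter_partitions` and `partial_sum_count` are hypothetical, not part of Dask's API.

```python
# Toy illustration of a blocked (partitioned) mean. Not Dask's implementation.

def iter_partitions(values, size):
    """Yield consecutive slices of `values` holding at most `size` items."""
    for start in range(0, len(values), size):
        yield values[start:start + size]

def partial_sum_count(partition):
    """Intermediate result for one partition: (sum, number of items)."""
    return sum(partition), len(partition)

data = list(range(1, 101))   # stand-in for a dataset too large for memory

# Process one partition at a time and keep only the small intermediate results.
partials = [partial_sum_count(p) for p in iter_partitions(data, size=30)]

# Aggregate the intermediate results into the final answer.
total = sum(s for s, _ in partials)
count = sum(n for _, n in partials)
blocked_mean = total / count

print(len(partials), blocked_mean)
```

Only one partition and a handful of `(sum, count)` pairs need to be held at any time, which is what allows the same pattern to work on data that does not fit in memory.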


**Advantages of Using Dask**

- Fully implemented in Python and natively scales NumPy, Pandas, and scikit-learn.
- Can be used effectively to work with both medium datasets on a single machine and large datasets on a cluster.
- Can be used as a general framework for parallelizing most Python objects.
- Has a very low configuration and maintenance overhead.



>Dask provides high-level Array, Bag, and DataFrame collections that mimic NumPy, lists, and Pandas but can operate in parallel on datasets that don’t fit into main memory. Dask’s high-level collections are alternatives to NumPy and Pandas for large datasets.

**Recall on Processes and Threads**

- A process is an execution of a program. 
- A thread is a single execution sequence within the process.
- A process can contain multiple threads.
- Threads are used for small tasks, whereas processes are used for more ‘heavyweight’ tasks. 

![threads](https://pediaa.com/wp-content/uploads/2018/07/Difference-Between-Process-and-Thread-Comparison-Summary-684x1024.jpg)
Image Source: pediaa.com


**Standard Python (CPython) executes Python bytecode in only one thread at a time because of the Global Interpreter Lock (GIL).**

>Dask offers an easy and consistent way to parallelize computations that scales from a single laptop to clusters with thousands of cores. It's based on a task scheduler that distributes Python function calls across multiple threads, processes or cluster nodes.
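
To make the difference between serial and threaded execution concrete, the sketch below uses only the standard library (`concurrent.futures`), not Dask. It relies on the fact that `time.sleep` releases the interpreter lock, so waiting tasks can overlap in threads; CPU-bound pure-Python work would not speed up this way. The function name `slow_square` is made up for the illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    """Pretend to wait on I/O, then return x squared."""
    time.sleep(0.2)
    return x * x

inputs = [1, 2, 3, 4]

# Serial: the four waits happen one after another (about 0.8 s).
start = time.perf_counter()
serial_results = [slow_square(x) for x in inputs]
serial_time = time.perf_counter() - start

# Threaded: the four waits overlap (about 0.2 s).
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    threaded_results = list(pool.map(slow_square, inputs))
threaded_time = time.perf_counter() - start

print(f"serial: {serial_time:.2f} s, threaded: {threaded_time:.2f} s")
```

Dask's schedulers automate this kind of orchestration and add dependency tracking between tasks.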

### Import Modules

Uncomment the next two cells if in Google Colab.

In [None]:
#!python -m pip install dask[dataframe] --upgrade

In [None]:
#!pip install memory_profiler

In [None]:
import warnings
warnings.filterwarnings('ignore')

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import os
import dask
import dask.array as da
import dask.dataframe as dd
from dask.diagnostics import ProgressBar 

In [None]:
print(f"Numpy version:  {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Dask   version: {dask.__version__}")

In [None]:
from memory_profiler import memory_usage
import memory_profiler
%load_ext memory_profiler

**We may want to first determine the system information.**

In [None]:
import math
def convert_size(size):
    """
      Convert a size in bytes to a human-readable string (B, KB, MB, ...).
    """
    if size == 0:
        return '0B'
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    i = int(math.floor(math.log(size, 1024)))
    p = math.pow(1024, i)
    s = round(size / p, 2)
    return " ".join([str(s), size_name[i]])

In [None]:
import platform
import psutil

print("="*20, "System Information", "="*20)
uname = platform.uname()
print(f"           System: {uname.system}")
print(f"        Node Name: {uname.node}")
print(f"          Release: {uname.release}")
print(f"          Version: {uname.version}")
print(f"          Machine: {uname.machine}")
print(f"        Processor: {uname.processor}")
print("="*20, "CPU Information", "="*20)
cpufreq = psutil.cpu_freq()
print("# logical cores = # physical cores times # threads ")
print("                    that can run on each physical core.")
print(f"   Physical cores: {psutil.cpu_count(logical=False)}")
print(f"    Logical cores: {psutil.cpu_count(logical=True)}")
print(f"Current frequency: {cpufreq.current}")
print(f"    Min frequency: {cpufreq.min}")
print(f"    Max frequency: {cpufreq.max}")
print("="*20, "Memory Information", "="*20)
svmem = psutil.virtual_memory()
print(f"     Total memory: {convert_size(svmem.total)}")
print(f" Available memory: {convert_size(svmem.available)}")
print("="*60)

### Setting Up the Progress Bar

- You can use Dask's built-in Progress Bar to track the progress of any `get()` or `compute()` call. 
- Here we will use the global registration where the Progress Bar will be displayed for all computations.

In [None]:
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()

# <font color="red"> Parallelize Code with `dask.delayed`</font>

- A simple way to parallelize the code.
- Allows users to delay function calls into a task graph with dependencies.
- Systems like `dask.dataframe` are built with `dask.delayed`.

**Simple Example**

Consider the following functions:

In [None]:
import time

def increment(x):
    time.sleep(1.0)
    return x + 1

def double(x):
    time.sleep(1.0)
    return 2 * x

def add(x, y):
    time.sleep(1.0)
    return x + y

In [None]:
%%time

x = increment(1)
y = increment(2)
z = add(x, y)

- We wrap the functions `increment` and `add` with `dask.delayed` to parallelize them.
- By wrapping the functions, we record what we want to compute as tasks in a graph that will be run later on parallel hardware.

In [None]:
xd = dask.delayed(increment)(1)
yd = dask.delayed(increment)(2)
zd = dask.delayed(add)(xd, yd)
zd

- We call the delayed version by passing the arguments exactly as before, but the original function is not actually called yet.
- A delayed object is made, which keeps track of the function to call and the arguments to pass to it.
- We use the `visualize` method (which relies on the `graphviz` package) to get a visual representation of the operations to be performed.

In [None]:
zd.visualize(rankdir='LR')
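
To see what "keeping track of the function to call and the arguments to pass" means, here is a toy stand-in written in plain Python. It is not how `dask.delayed` is implemented: the class name `Lazy`, its `compute` method, and the helper functions are invented for this sketch, and everything runs serially. It only shows the idea of recording calls first and evaluating the graph later.

```python
class Lazy:
    """Record a function call without running it (toy model of a delayed object)."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate dependencies first, then call the recorded function.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)

run_log = []

def plus_one(x):
    run_log.append(x)   # lets us observe when the function really runs
    return x + 1

def plus(x, y):
    return x + y

a = Lazy(plus_one, 1)
b = Lazy(plus_one, 2)
c = Lazy(plus, a, b)

log_before = list(run_log)   # still empty: nothing has run yet
answer = c.compute()         # the recorded calls execute now
log_after = list(run_log)

print(log_before, answer, log_after)
```

A real scheduler walks the same kind of dependency structure, but it can run independent branches (here `a` and `b`) at the same time.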

- Note that we have not actually computed the value of `zd` yet.
- We need to apply the `compute` method to get the answer. 
- <font color="red">It is only here that the data are loaded into memory for calculations</font>.
- The calculations are carried out using a local thread pool.

In [None]:
%%time
dask.compute(zd)
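
The cells above wrap each call as `dask.delayed(func)(args)`. The same kind of graph can be built by applying `@dask.delayed` as an actual decorator at definition time, as in the sketch below. The names `increment_d` and `add_d` are chosen here only to avoid overwriting the functions defined earlier, and the `time.sleep` calls are left out to keep the example short.

```python
import dask

@dask.delayed
def increment_d(x):
    return x + 1

@dask.delayed
def add_d(x, y):
    return x + y

# Calling the decorated functions builds the task graph; nothing runs yet.
total_d = add_d(increment_d(1), increment_d(2))

# compute() executes the graph and returns the plain Python result.
result_d = total_d.compute()
print(result_d)
```

The decorator form is convenient when a function is always meant to be lazy; the call form keeps the original function available for ordinary eager use.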

**Using `delayed` in Loops**

Consider the sequential code with two for-loops:

In [None]:
%%time

n = 10
data = [i+1 for i in range(n)]

out = list()
for x in data:
    y = increment(x)
    z = double(y)
    out.append(z)
    
total = 0
for z in out:
    total = add(total, z)

total

We can parallelize the above using the `delayed` decorator:

In [None]:
n = 10
data = [i+1 for i in range(n)]

out = list()
for x in data:
    y = dask.delayed(increment)(x)
    z = dask.delayed(double)(y)
    out.append(z)
    
totald = 0
for z in out:
    totald = dask.delayed(add)(totald, z)

totald

We can also get the visual representation through a task graph.

In [None]:
totald.visualize()

In [None]:
%%time
dask.compute(totald)

### Exercise 1

Use the `delayed` decorator to parallelize the code below:

In [None]:
def is_odd(x):
    return x%2

In [None]:
%%time

n = 10
data = [i+1 for i in range(n)]

results = list()

for x in data:
    if is_odd(x):
        y = double(x)
    else:
        y = increment(x)
    results.append(y)

total = sum(results)
print(total)

<p>
<p>

<details><summary><b>Click here to access the solution</b></summary>
<p>


```python
n = 10
data = [i+1 for i in range(n)]

results = list()

for x in data:
    if is_odd(x):
        y = dask.delayed(double)(x)
    else:
        y = dask.delayed(increment)(x)
    results.append(y)

total = dask.delayed(sum)(results)
print(total.compute())
```

</p>
</details>

### Example: Palindromic Words

- A palindromic word is a word whose characters read the same backward as forward. 
- Some examples of palindromes are `redivider`, `deified`, `civic`, `radar`, `level`, `rotor`, `kayak`, `reviver`, `racecar`, `madam`, and `refer`.

We want to find the number of palindromes from a list of words.

In [None]:
def is_palindrome(s):
    return s.upper() == s.upper()[::-1]

In [None]:
list_words = [
    'complete', 'abstraction', 'from', 'compass', 'sights', 'sounds', 
    'Human', 'shapes', 'interferences', 'troubles', 'joys', 'were', 
    'they', 'were', 'there', "man", 'seemed', 'shaded', 'hemisphere', 
    'globe', 'sentient', 'being', 'save', 'himself', "rather", 
    "Abba", "Aibohphobia", "Bib", "Bob", "Civic", "Deified", 
    "Detartrated", "Dewed", "Eve", "Hannah", "Kayak", "Level", 
    "Madam", "Malayalam", "Minim", "Mom", "Murdrum", "Noon", "Nun", 
    "Otto", "Peep", "Pop", "Racecar", "Radar", "Redder", "Refer",
    "Repaper", "Rotator", "Rotavator", "Rotor", "Sagas", 
    "Sis", "Solo", "Stats", "Tattarrattat", "Tenet",
    'redivider', 'deified', 'civic', 'radar', 'level',
    'Being', 'not', 'without', 'frequent', 'consciousness',
    'that', 'there', 'was', 'some', 'charm', 'this', 'life', 'stood',
    'still', 'after', 'looking', 'sky', 'useful', 'instrument',
    'regarded', 'appreciative', 'spirit', 'work', 'art',
    'superlatively', 'beautiful', 'moment', 'seemed',
    'impressed', 'with', 'speaking', 'loneliness', 'scene',
    "brother", "system", "SISteR", "TEXT", "paREnts", "python",
    "Numpy", "Dask", "PanDaS"
] 

len(list_words)

**Using Regular Python**

In [None]:
%%time
palindromes_py = [is_palindrome(s) for s in list_words]
total_py = sum(palindromes_py)
total_py

**Using Dask**

In [None]:
palindromes_da = [dask.delayed(is_palindrome)(s) for s in list_words]
total_da = dask.delayed(sum)(palindromes_da)

In [None]:
total_da.visualize()

In [None]:
%%time
result = total_da.compute()
result

We can do the same computation with a Dask Bag, which groups the items into partitions rather than creating one task per word:

In [None]:
import dask.bag as db
bag = db.from_sequence(list_words)
bag.map(is_palindrome).visualize()

In [None]:
%%time
result= sum(bag.map(is_palindrome).compute())
result

**<font color="red">Important Lessons</font>**

- The `delayed` decorator adds overhead.
- Avoid it for tasks that take very little time, because the scheduling overhead can outweigh the gain.
- Call `delayed` on the function, not on its result.
- Break up computations into many pieces. You achieve parallelism by having many delayed calls, not by using only a single one: Dask will not look inside a function decorated with `delayed` and parallelize that code internally.
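
The "function, not the result" lesson is easy to get wrong, so here is a small sketch of both spellings. The function `slow_double` and the `executed` list are made up for the demonstration, and the synchronous scheduler is used only so that the bookkeeping list is updated from a single thread.

```python
import dask

executed = []

def slow_double(x):
    executed.append(x)   # record when the function body really runs
    return 2 * x

# Wrong: slow_double(3) runs immediately; only its result (6) is wrapped.
eager = dask.delayed(slow_double(3))
ran_eagerly = executed == [3]

# Right: wrap the function, then call the wrapper; nothing runs yet.
executed.clear()
lazy = dask.delayed(slow_double)(3)
ran_before_compute = bool(executed)

value = lazy.compute(scheduler="synchronous")
print(ran_eagerly, ran_before_compute, value)
```

In the first form the work is done serially while the graph is being built, so there is nothing left for Dask to parallelize.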

### Exercise 2

Use Dask to parallelize the code below (calculations of `pi`):

In [None]:
%%time

import random

def approximate_pi(num_samples):
    num_points_circ = 0

    for i in range(num_samples):
        # Select an arbitrary point in [-1,1]x[-1,1]
        x = random.uniform(-1, 1)
        y = random.uniform(-1, 1)

        # Check if the point is inside the circle
        if x**2 + y**2 < 1.0:
            num_points_circ += 1

    return 4 * num_points_circ / num_samples

def mean(*args):
    return sum(args) / len(args)

number_samples = 10**6
number_experiments = 10

pi_approx = mean(*[approximate_pi(number_samples) for i in range(number_experiments)])

print("Approximation of Pi: {}".format(pi_approx))

<p>
<p>
<details><summary><b>Click here to access the solution</b></summary>
<p>


```python
number_samples = 10**6
number_experiments = 10

pi_approx = dask.delayed(mean)(*[dask.delayed(approximate_pi)(number_samples) for i in range(number_experiments)])

print("Approximation of Pi: {}".format(pi_approx.compute()))
```

</p>
</details>

# <font color="red"> Dask Array</font>

- Dask arrays coordinate many Numpy arrays, arranged into chunks within a grid. 
    - _Parallel_: Uses all of the cores on your computer
    - _Larger-than-memory_: Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.
    - _Blocked Algorithms_: Perform large computations by performing many smaller computations
- They support a large subset of the Numpy API.

![fig_array](https://miro.medium.com/max/1388/1*JfQnXJ5_R104bPyE8_XhwQ.png)

**Create a Dask Array**

- Create a 10000x40000 array of random numbers, represented as many NumPy arrays of size 1000x1000 (or smaller if the array cannot be divided evenly). 
- There are 400 (10x40) NumPy arrays of size 1000x1000.

In [None]:
x = da.random.random((10000, 40000), chunks=(1000, 1000))
x

The array:
- Occupies about 2.98 GiB.
- Is organized in 400 chunks of `1000x1000` NumPy arrays.
- Each chunk occupies about 7.63 MiB.

Similar information can be obtained from:

In [None]:
print(f"     Type: {type(x)}")
print(f"    Shape: {x.shape}")
print(f"     Size: {x.size}")
print(f"Num bytes: {x.nbytes} B or {convert_size(x.nbytes)}")
print(f"   Chunks: {x.chunks}")
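
The chunk count and sizes can also be checked by hand from the shape and chunk shape passed to `da.random.random` above. The sketch below is plain arithmetic with the standard library; it assumes 8-byte `float64` elements, which is what `da.random.random` produces by default.

```python
import math

shape = (10000, 40000)       # array shape used above
chunk_shape = (1000, 1000)   # chunk shape used above
itemsize = 8                 # bytes per float64 element

# Number of chunks along each axis, multiplied together.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunk_shape))
chunk_bytes = math.prod(chunk_shape) * itemsize
total_bytes = math.prod(shape) * itemsize

print(f"chunks:    {n_chunks}")
print(f"per chunk: {chunk_bytes / 2**20:.2f} MiB")
print(f"total:     {total_bytes / 2**30:.2f} GiB")
```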

We can use Numpy syntax:

In [None]:
y = 2.0 + x.T
y.shape

In [None]:
mu = x.mean(axis=0)
mu

In [None]:
z = y[::2, 5000:].mean(axis=1)
z

In [None]:
z.visualize(rankdir="LR")

Use the **`compute()`** function if you want your result as a NumPy array.

In [None]:
mu[0].compute()

In [None]:
w = z.compute()
print(type(w), w.shape )

**Persist Data in Memory**

- If you have the available RAM for your dataset then you can persist data in memory.
- This allows future computations to be much faster.

In [None]:
%time y.sum().compute()

In [None]:
y = y.persist()

In [None]:
%time y[0, 0].compute()

In [None]:
%time y.sum().compute()

**Numpy against Dask**

In [None]:
def f_numpy():
    x = np.random.normal(10, 0.1, size=(20000, 20000)) 
    y = x.mean(axis=0)[::100]

`%%memit` 

- Measures the memory use of the code in a cell (the line magic `%memit` measures a single statement).
- Reports the peak memory and the incremental memory growth.

In [None]:
%%memit
f_numpy()

In [None]:
%%time
f_numpy()

In [None]:
def f_dask():
    x = da.random.normal(10, 0.1, size=(20000, 20000), 
                         chunks=(1000, 1000))
    y = x.mean(axis=0)[::100].compute() 

In [None]:
%%memit
f_dask()

In [None]:
%%time
f_dask()

Changing the chunk shape might improve performance:

In [None]:
def f_dask2():
    x = da.random.normal(10, 0.1, size=(20000, 20000), 
                         chunks=(2000, 500))
    y = x.mean(axis=0)[::100].compute() 

In [None]:
%%time
f_dask2()

**Dask finished faster but used more total CPU time: splitting the array into chunks let Dask transparently parallelize the computation across cores.**

**<font color="red">Things to Consider</font>**

- If your data fits in RAM and you are not performance bound, then using NumPy might be the right choice. Dask adds another layer of complexity which may get in the way.
- **If you are just looking for speedups rather than scalability then you may want to consider using Numba for manipulating Numpy arrays.**
- How to select the chunk size?
     - Too small: huge overheads.
     - Poorly aligned with data: inefficient reading.
     - It is recommended to have a chunk size of at least 100 MB.
     - Choose a chunk size that is large in order to reduce the number of chunks that Dask has to think about (which affects overhead) but also small enough so that many of them can fit in memory at once. Dask will often have as many chunks in memory as twice the number of active threads.
   

**Avoid Oversubscribing Threads**
     
- By default Dask will run as many concurrent tasks as you have logical cores. 
- It assumes that each task will consume about one core.
- Many array-computing libraries (used in Dask) are themselves multi-threaded, which can cause contention and low performance.
- For better performance, we need to explicitly specify the use of one thread:

```bash
   export OMP_NUM_THREADS=1
   export MKL_NUM_THREADS=1
   export OPENBLAS_NUM_THREADS=1
```
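
When working in a notebook rather than a shell, the same variables can be set from Python. This is a minimal sketch, assuming it runs before NumPy (or any other library linked against these threading runtimes) is first imported in the process, because such libraries typically read the variables once at load time.

```python
import os

thread_vars = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")

# Limit the internal thread pools of OpenMP, MKL, and OpenBLAS to one thread each.
for var in thread_vars:
    os.environ[var] = "1"

print({var: os.environ[var] for var in thread_vars})
```

If NumPy has already been imported, restart the kernel and run this cell first for the settings to take effect.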

## <font color="red">Memory Profiling</font>

- We use the `memory_profiler` package to track memory usage.
- It is written entirely in Python and monitors the memory used by a process running Python code; it can also report memory usage line by line.
- We call `memory_usage()` and pass the `interval` parameter to set how often (in seconds) the memory usage is sampled.

In [None]:
def sum_with_numpy():
    # Serial implementation
    np.arange(10**8).sum()

def sum_with_dask():
    # Parallel implementation
    work = da.arange(10**8).sum()
    work.compute()

memory_numpy = memory_usage(sum_with_numpy, interval=0.01)
memory_dask = memory_usage(sum_with_dask, interval=0.01)

# Plot results
plt.plot(memory_numpy, label='numpy')
plt.plot(memory_dask, label='dask')
plt.xlabel('Time step')
plt.ylabel('Memory / MB')
plt.legend(loc='best')
plt.show()

You can also use Dask's profiling tools:

In [None]:
from dask.diagnostics import Profiler, ResourceProfiler
work = da.arange(10**8).sum()
with Profiler() as prof, ResourceProfiler(dt=0.001) as rprof:
    result2 = work.compute()

from bokeh.plotting import output_notebook
from dask.diagnostics import visualize
visualize([prof,rprof], output_notebook())

In [None]:
with ResourceProfiler(dt=0.001) as rprof2:
    result = np.arange(10**8).sum()
visualize([rprof2], output_notebook())

# <font color="red"> Dask DataFrames</font>

- Pandas is great for tabular datasets that fit in memory. 
- Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM. 
- Dask DataFrames:
     - Coordinate many Pandas DataFrames, partitioned along an index. 
     - Support a large subset of the Pandas API.
- One operation on a Dask DataFrame triggers many Pandas operations on the constituent pandas DataFrames in a way that is mindful of potential parallelism and memory constraints.
- Some of the operations that are really fast if you use Dask Dataframes:
     - Arithmetic operations (multiplying or adding to a Series)
     - Common aggregations (`mean`, `min`, `max`, `sum`, etc.)
     - Calling `apply`
     - Calling `value_counts()`, `drop_duplicates()` or `corr()`
     - Filtering with `loc`, `isin`, and row-wise selection

![fig_df](https://pythondata.com/wp-content/uploads/2016/11/Screen-Shot-2016-11-24-at-6.52.24-PM-168x300.png)

### <font color="green"> NYC Flights Dataset</font>

The data cover flights (in the 1990s) departing from the three airports in the New York City area.

Download the remote data:

In [None]:
import urllib.request

print("\t Downloading NYC dataset...", end="\n", flush=True)

url = "https://storage.googleapis.com/dask-tutorial-data/nycflights.tar.gz"
filename, header = urllib.request.urlretrieve(url, "nycflights.tar.gz")

print("\t Done!", flush=True)

In [None]:
!ls -lrt

Extract the `.csv` files from the tar file:

In [None]:
import tarfile

with tarfile.open(filename, mode="r:gz") as flights:
     flights.extractall("data/")

In [None]:
!ls -lrt data/nycflights

Read all the files at once:

In [None]:
import os

df = dd.read_csv(os.path.join("data", "nycflights", "*.csv"), 
                parse_dates={"Date": [0, 1, 2]})
df

- The representation of the dataframe object contains no data. 
- `pandas.read_csv` reads in the entire file before inferring datatypes.
- `dask.dataframe.read_csv` only reads in a sample from the beginning of the file (or first file). These inferred datatypes are then enforced when reading all partitions.

We can display the first few rows:

In [None]:
df.head()

If we display the last few rows, we have a problem:

In [None]:
df.tail()

- There is an issue with the data types of a few columns.
- The datatypes inferred from the sample are incorrect.
- We can fix it by reading the files again and specifying the appropriate data types.

In [None]:
df = dd.read_csv(os.path.join("data", "nycflights", "*.csv"), 
                parse_dates={"Date": [0, 1, 2]},
                dtype={'TailNum': str,
                       'CRSElapsedTime': float,
                       'Cancelled': bool})

In [None]:
df.tail()
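
Why a sample can mislead dtype inference is easy to reproduce with pandas alone. The sketch below does not use Dask's actual sampling logic; it mimics it by reading only the first rows of a made-up, miniature CSV (the column names and values are hypothetical).

```python
import io
import pandas as pd

# Hypothetical miniature file: the first rows look like integers,
# but a later row has a missing delay.
csv_text = "flight,delay\n1,10\n2,5\n3,7\n4,\n"

sample = pd.read_csv(io.StringIO(csv_text), nrows=3)   # what a sample would see
full = pd.read_csv(io.StringIO(csv_text))              # the whole file

print(sample["delay"].dtype)   # inferred from the first rows only
print(full["delay"].dtype)     # floats are needed to hold the missing value

# Declaring the dtype up front removes the guesswork.
declared = pd.read_csv(io.StringIO(csv_text), nrows=3, dtype={"delay": float})
print(declared["delay"].dtype)
```

Passing `dtype=` to `dd.read_csv`, as done above, plays the same role as the last call: every partition is parsed with the declared types instead of the guessed ones.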

### <font color="blue">Perform Operations as with `Pandas DataFrames`</font>

**Maximum value of a column**:

- We now want to compute the maximum of the `DepDelay` column.
- With `Pandas`, we would loop over each file to find the individual maximums, then find the final maximum over all the individual maximums.
- `dask.dataframe` allows us to write pandas-like code that operates on larger-than-memory datasets in parallel.

In [None]:
df.DepDelay.max().visualize()

In [None]:
%time df.DepDelay.max().compute()

If we do the same thing in `Pandas`, we will have:

In [None]:
%%time

import glob

list_files = glob.glob("data/nycflights/*csv")
   
maxes = list()
for file_name in list_files:
    pddf = pd.read_csv(file_name)
    maxes.append(pddf.DepDelay.max())

final_max = max(maxes)

print("Final Maximum: ", final_max)

**Plotting**

In [None]:
df[df.Dest == 'PIT'].compute().plot(kind='scatter', 
                                    x="DayOfWeek", 
                                    y="DepDelay")

**Other Operations**

Number of non-cancelled flights:

In [None]:
len(df[~df.Cancelled])

Number of non-cancelled flights departing from each airport:

In [None]:
df[~df.Cancelled].groupby('Origin').Origin.count().compute()

Average departure delay for each day of the week:

In [None]:
df.groupby("DayOfWeek").DepDelay.mean().compute()

Group by destinations and count:

In [None]:
df.groupby("Dest").count().compute()

In [None]:
df.groupby("Dest")["ArrDelay"].mean().compute()

In [None]:
df[df.ArrDelay+df.DepDelay>30.0].groupby("Dest").Dest.count().compute()

**Sharing Intermediate Results**

- We sometimes do the same operation more than once. 
- For most operations, `dask.dataframe` hashes the arguments, allowing duplicate computations to be shared, and only computed once.

In [None]:
non_cancelled = df[~df.Cancelled]
mean_delay = non_cancelled.DepDelay.mean()
std_delay = non_cancelled.DepDelay.std()

In [None]:
%%time
mean_delay_res = mean_delay.compute()
std_delay_res = std_delay.compute()

We pass both to a single `compute` call:

In [None]:
%%time

mean_delay_res, std_delay_res = dask.compute(mean_delay, std_delay)

The task graphs for both results are merged when calling `dask.compute`, allowing shared operations to be done only once instead of twice.

In [None]:
dask.visualize(mean_delay, std_delay)
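
The effect of merging graphs can be observed directly with a small `dask.delayed` example that counts how often a shared upstream step runs. The functions `load` and `mean` and the `load_calls` counter are invented for this sketch; the synchronous scheduler is used only so the counter is updated from a single thread.

```python
import dask

load_calls = []

def load():
    load_calls.append(1)   # count how often the shared step runs
    return [1, 2, 3, 4]

def mean(values):
    return sum(values) / len(values)

shared = dask.delayed(load)()            # common upstream task
total = dask.delayed(sum)(shared)
average = dask.delayed(mean)(shared)

# Two separate compute calls: the shared task runs once per call.
total.compute(scheduler="synchronous")
average.compute(scheduler="synchronous")
separate_calls = len(load_calls)

# One combined call: the graphs are merged and the shared task runs once.
load_calls.clear()
total_res, average_res = dask.compute(total, average, scheduler="synchronous")
combined_calls = len(load_calls)

print(separate_calls, combined_calls, total_res, average_res)
```

For the flights data, the shared step is reading and filtering the CSV files, which is by far the most expensive part of both statistics.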

### Exercise 3

- Consider the code below that computes the mean departure delay per airport. 
- Parallelize the code using Dask.

In [None]:
%%time 

sum_delays = list()
count_delays = list()

for file_name in list_files:
    pddf = pd.read_csv(file_name)
    by_origin = pddf.groupby('Origin')
    loc_total = by_origin.DepDelay.sum()
    loc_count = by_origin.DepDelay.count()
    sum_delays.append(loc_total)
    count_delays.append(loc_count)

total_delays = sum(sum_delays)
n_flights = sum(count_delays)
mean_delays = total_delays / n_flights
print("Mean delays: {}".format(mean_delays))

<p>
<p>

<details><summary><b>Click here to access the solution</b></summary>
<p>


```python
%%time 

df.groupby("Origin")["DepDelay"].mean().compute()
```

</p>
</details>

### <font color="blue">Example of Machine Learning with Dask</font>

Grab columns from the Dask DataFrame:

In [None]:
df_train = df[["CRSDepTime", "CRSArrTime", "Cancelled"]]
df_train

You can query the shape (note that the number of rows is a delayed value):

In [None]:
df_train.shape

In [None]:
num_cols = len(df_train.columns)
print(num_cols)

**Basic EDA**

We can get descriptive statistics:

In [None]:
df_train.describe().compute()

Perform searches and operations on the data:

In [None]:
df_train.isnull().sum().compute()

**Create the Model**

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

model = keras.Sequential()
model.add(layers.Dense(20, 
                       input_dim=num_cols - 1,  # features only; the last column is the label
                       activation='relu'))
model.add(layers.Dense(1,  activation="sigmoid"))

model.compile(loss="binary_crossentropy", optimizer="sgd", )

In [None]:
#tf.keras.utils.plot_model(model, show_shapes=True, show_layer_names=True)

**Train the Model**

Generate batches of data:

In [None]:
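def batch_data_generator(df, fraction=0.01):
    while True:
        # Draw one random sample and materialize it with a single compute,
        # so that features and labels come from the same rows.
        batch = df.sample(frac=fraction).compute()
        X = batch.iloc[:, :-1].to_numpy(dtype="float32")  # features
        y = batch.iloc[:, -1].to_numpy(dtype="float32")   # label (last column)
        yield (X, y)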

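We never run out of memory during training because only one batch is held in memory at a time. The number of steps per epoch and the batch size are related by: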

```
   steps_per_epoch * batch_size = number_of_rows_in_train_data
```
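
As a worked instance of this relation, the sketch below derives the number of steps for a sampling fraction of 1%. The row count is a made-up figure for illustration, not the size of the flights dataset.

```python
import math

n_rows = 2_000_000   # hypothetical number of training rows
fraction = 0.01      # sampling fraction used for each batch

batch_size = round(n_rows * fraction)              # rows drawn per step
steps_per_epoch = math.ceil(n_rows / batch_size)   # steps to draw about n_rows samples

print(batch_size, steps_per_epoch)
```

Because each batch is an independent random sample, an "epoch" defined this way draws roughly as many rows as the dataset contains but does not visit every row exactly once.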

In [None]:
model.fit(batch_data_generator(df_train),
          steps_per_epoch=100)

# <font color="red"> Task Schedulers</font>

- After Dask generates the task graphs, it needs to execute them on parallel hardware. 
- This is the role of a task scheduler. 
- There are different task schedulers. Each will consume a task graph and compute the same result, but with different performance characteristics.

![schedulers](https://docs.dask.org/en/latest/_images/dask-overview.svg)

Image Source: [https://docs.dask.org/en/latest/](https://docs.dask.org/en/latest/)


Dask networks are composed of three pieces:
- **Centralized scheduler**: Manages workers and assigns the tasks that need to be completed by them.
- **Workers**: Are threads, processes, or separate machines in a cluster. They execute the computations from the computation graph: do the calculations, hold onto results, and communicate results to each other.
- **One or multiple clients**: Interact with users (through Jupyter notebooks or scripts) and submit work to the scheduler for execution on the workers.


![networks](https://miro.medium.com/max/700/0*9JHQAjTVoKbm2f4X.png)
Image Source: [Steven Gon](https://gongster.medium.com/dask-an-introduction-and-tutorial-b42f901bcff5)

To execute the task graphs there are two types of schedulers:
* **Single machine**: Provides basic features on a local process or thread pool. It is simple and cheap to use, although it can only be used on a single machine and does not scale.
* **Distributed**: Offers more features, but also requires a bit more effort to set up. It can run locally or distributed across a cluster.

## <font color="blue"> Single Machine Scheduler</font>

Consider the following example:

In [None]:
n = 10
data = [i+1 for i in range(n)]

out = list()
for x in data:
    y = dask.delayed(increment)(x)
    z = dask.delayed(double)(y)
    out.append(z)
    
totald = 0
for z in out:
    totald = dask.delayed(add)(totald, z)

**Single thread**

- The single-threaded synchronous scheduler executes all computations in the local thread with no parallelism at all.
- It is useful for debugging or profiling.

In [None]:
%time totald.compute(scheduler='synchronous')

**Local threads**

Uses `multiprocessing.pool.ThreadPool`

Use all the processors:

In [None]:
%time totald.compute(scheduler='threads')

Use some of the processors:

In [None]:
%time totald.compute(scheduler='threads', num_workers=2)

We can choose to use a single thread:

In [None]:
%time totald.compute(scheduler='single-threaded')

**Local processes**

- The multiprocessing scheduler executes computations with a local `multiprocessing.Pool`.
- Every task and all of its dependencies are shipped to a local process, executed, and then their result is shipped back to the main process. 
- Moving data to remote processes and back can introduce performance penalties, particularly when the data being transferred between processes is large. 
- The multiprocessing scheduler is an excellent choice when workflows are relatively linear, and so does not involve significant inter-task data transfer as well as when inputs and outputs are both small, like filenames and counts.

In [None]:
import multiprocessing
print (multiprocessing.cpu_count())

Use all the processors:

In [None]:
%time result = totald.compute(scheduler='processes')

Use some of the processors:

In [None]:
%time result = totald.compute(scheduler='processes', num_workers=2)

In [None]:
from dask.diagnostics import ProgressBar
pbar = ProgressBar()
pbar.register()

In [None]:
result = totald.compute(scheduler='processes', num_workers=2)

### Threads or Processes?

- **Use the threaded scheduler** if your computation is dominated by non-Python code, as is primarily the case when operating on numeric data in NumPy arrays, Pandas DataFrames, or using any of the other C/C++/Cython based projects in the ecosystem.
   - It is lightweight.
   - Little overhead.
   - Transferring data between tasks is not expensive because everything happens in the same process.
- **Use the multiprocessing scheduler** if your computation is dominated by processing pure Python objects like strings, dictionaries, or lists.
   - It is lightweight.
   - Every task and all of its dependencies are shipped to a local process, executed, and then their result is shipped back to the main process.
   - Moving data to remote processes and back can introduce performance penalties, particularly when the data being transferred between processes is large. 
   - Is an excellent choice when workflows are relatively linear, and so does not involve significant inter-task data transfer as well as when inputs and outputs are both small, like filenames and counts.
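
As a minimal sketch of how a scheduler can be selected, the example below runs the same pure-Python computation with two local schedulers. The `count_vowels` helper and the word list are hypothetical and only stand in for a pure-Python workload; `scheduler='processes'` can be passed in the same way, as in the cells above.

```python
import dask


@dask.delayed
def count_vowels(word):
    # Pure-Python string work: it holds the GIL, so threads bring little speedup.
    return sum(ch in "aeiou" for ch in word)


words = ["dask", "scheduler", "thread", "process"]
total_vowels = dask.delayed(sum)([count_vowels(w) for w in words])

# Option 1: choose the scheduler for a single call.
per_call = total_vowels.compute(scheduler="threads")

# Option 2: choose the scheduler for a block of code.
with dask.config.set(scheduler="synchronous"):
    in_block = total_vowels.compute()

print(per_call, in_block)
```

The choice of scheduler only changes how the tasks are executed, so both calls should return the same value; only the run time differs.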

## <font color="blue">Distributed Scheduler</font>

- The Dask distributed scheduler can either be set up on a cluster or run locally on a personal machine. 
- It is a centrally managed, distributed, dynamic task scheduler. 
     - The central dask-scheduler process coordinates the actions of several dask-worker processes spread across multiple machines and the concurrent requests of several clients.
     - The scheduler is asynchronous and event-driven, simultaneously responding to requests for computation from multiple clients and tracking the progress of multiple workers.
     - The event-driven and asynchronous nature makes it flexible to concurrently handle a variety of workloads coming from multiple users at the same time while also handling a fluid worker population with failures and additions. 
     - Workers communicate amongst each other for bulk data transfer over TCP.
- To set up `dask.distributed`, we need to create a client by instantiating the `Client` class from `dask.distributed`. 
- It will internally create a dask scheduler and dask workers. 
- We will get the **link of the dashboard** where we can analyze tasks running in parallel. 
- We can pass a number of workers (using the `n_workers` argument) and threads to use per worker process (using the `threads_per_worker` argument).
- As soon as you create a client, Dask will automatically start using it.
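
The points above can be sketched as follows, assuming the `distributed` package is installed. The `inc` helper is hypothetical. Passing `processes=False` keeps the workers as threads inside the current process, which can be convenient for quick experiments, and `dashboard_address=":0"` asks for a random free port for the dashboard. The `with` block closes the client, and the local cluster it started, on exit.

```python
import dask
from dask.distributed import Client


def inc(x):
    return x + 1


with Client(processes=False, n_workers=1, threads_per_worker=2,
            dashboard_address=":0") as client:
    # Address of the diagnostics dashboard for this local cluster
    print(client.dashboard_link)

    # Once a client exists, compute() uses it by default.
    lazy_total = dask.delayed(sum)([dask.delayed(inc)(i) for i in range(5)])
    result = lazy_total.compute()

    # Work can also be sent to the cluster explicitly as a future.
    future = client.submit(inc, 10)
    submitted = future.result()

print(result, submitted)
```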

In [None]:
from dask.distributed import Client
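# Client() with no arguments would start a local cluster with default settings.
# Here we set the number of workers and the threads per worker explicitly:
client = Client(n_workers=3, threads_per_worker=4)
client.cluster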

If you are not using JupyterLab with the `dask-labextension`, you can click the `Dashboard` link to open the diagnostics dashboard.

In [None]:
import random
import time

def random_slow_add(x, y):
    time.sleep(random.randrange(3,10))
    return x + y

In [None]:
results = list()

for x in data:
    y = dask.delayed(random_slow_add)(x, 1)
    results.append(y)
    
total = dask.delayed(sum)(results)

In [None]:
%time result = total.compute()
result

Shut down the cluster:

In [None]:
client.close()

**<font color="red">Things to Consider</font>**

- Each Dask task has overhead (about 1 ms). If you have a lot of tasks, this overhead can add up. It is a good idea to give each task more than a few seconds of work.
- To better understand how your program is performing, check the [Dask Performance Diagnostics](https://distributed.dask.org/en/latest/diagnosing-performance.html) documentation. You can also view the [video](https://docs.dask.org/en/stable/diagnostics-distributed.html) to find out how to group your work into fewer, more substantial tasks. This might mean that you call lazy operations at once instead of individually. This might also mean repartitioning your dataframe(s).
- A good rule of thumb for choosing the number of threads per Dask worker is to choose the square root of the number of cores per node. 
     - In general more threads per worker are good for a program that spends most of its time in NumPy, SciPy, Numba, etc., and fewer threads per worker are better for simpler programs that spend most of their time in the Python interpreter.
- The Dask scheduler runs on a single thread, so assigning it its own node is a waste.
- There is no hard limit on Dask scaling. However, the task overhead will eventually start to swamp your calculation, depending on how long each task takes to compute. 
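
To illustrate the point about task overhead, here is a minimal sketch that computes the same sum twice: once with one task per value and once with the values grouped into batches. The helpers `inc` and `inc_batch` and the batch size of 100 are assumptions chosen for illustration, not recommended values.

```python
import dask


def inc(x):
    return x + 1


def inc_batch(chunk):
    # One task processes a whole chunk of values.
    return sum(inc(x) for x in chunk)


values = list(range(1000))

# Fine-grained: 1000 tiny tasks, each paying the per-task overhead.
fine = dask.delayed(sum)([dask.delayed(inc)(x) for x in values])

# Coarse-grained: 10 tasks, each doing 100 times more work.
batch_size = 100
chunks = [values[i:i + batch_size] for i in range(0, len(values), batch_size)]
coarse = dask.delayed(sum)([dask.delayed(inc_batch)(c) for c in chunks])

fine_result = fine.compute(scheduler="threads")
coarse_result = coarse.compute(scheduler="threads")
print(fine_result == coarse_result)
```

Wrapping each call with `%time` in the notebook gives a rough idea of how much of the run time is scheduling overhead rather than useful work.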

## <font color="blue"> Example with DataFrame</font>

Build a Pandas DataFrame with 100000 rows and two columns with values selected randomly between 0 and 999.

In [None]:
num_rows = 100000
df = pd.DataFrame({'X':np.random.randint(1000, size=num_rows),
                   'Y':np.random.randint(1000, size=num_rows)})
df

Write a function that computes the sum of the squares of the two columns, `X` and `Y`, for each row of the DataFrame.

In [None]:
def add_squares(df):
    return df.X**2 + df.Y**2

Measure the time it takes to call the function:

In [None]:
%%timeit
df['add_squares'] = df.apply(add_squares,axis=1)

In [None]:
df

### <font color="green">Parallelize using Dask `map_partitions`</font>

We construct a Dask DataFrame from a pandas DataFrame using the `from_pandas` function and specify the number of partitions (`npartitions`) to break the DataFrame into.

```python
   ddf = dd.from_pandas(df, npartitions=N)
```

`dd` is the name under which Dask DataFrames were imported, and `npartitions` is an argument telling Dask how many partitions to split the DataFrame into.

Each partition can run on a different thread, and scheduling and communication between them will become too costly if there are too many partitions.

We will break the DataFrame into 4 partitions (the number of available cores):

In [None]:
ddf = dd.from_pandas(df, npartitions=4)

We will apply the `add_squares` function to each of these partitions:

In [None]:
%%time

ddf['z'] = ddf.map_partitions(add_squares, 
                               meta=(None, 'int64')).compute()

In [None]:
def myfunc(x, y):
    return y * (x**2 + 1)

In [None]:
%%time

df1 = df.apply(lambda row: myfunc(row.X, row.Y), axis=1)

In [None]:
import multiprocessing
ddf = dd.from_pandas(df, npartitions=4*multiprocessing.cpu_count())
ddf

In [None]:
%%time

ddfz = ddf.map_partitions(
    lambda data: data.apply(lambda row: myfunc(row.X, row.Y), axis=1)
).compute(scheduler='processes')