# What is Dask?

Dask is a flexible library for parallel computing in Python.

Dask is composed of two parts:

- Dynamic task scheduling optimized for computation. This is similar to Airflow, Luigi, Celery, or Make, but optimized for interactive computational workloads.
- “Big Data” collections like parallel arrays, dataframes, and lists that extend common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory or distributed environments. These parallel collections run on top of dynamic task schedulers.
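As a small illustration of the second part, here is a minimal sketch, assuming only that `dask` and `numpy` are installed. It wraps a NumPy array in a chunked Dask array. Dask records the operations as a task graph and processes each chunk as a separate task. Nothing runs until `compute()` is called. Outside an active `Client`, that call uses Dask's default local scheduler.

```python
import numpy as np
import dask.array as da

# A 1000 x 1000 NumPy array held entirely in memory
x_np = np.arange(1_000_000, dtype=np.float64).reshape(1000, 1000)

# The same data as a Dask array split into 4 x 4 = 16 chunks of 250 x 250;
# each chunk is processed as a separate task the scheduler can run in parallel
x_da = da.from_array(x_np, chunks=(250, 250))

# Same NumPy-style API, but this only records a task graph (nothing runs yet)
lazy_mean = x_da.mean()

# compute() executes the graph and returns a concrete value
result = lazy_mean.compute()
print(x_da.numblocks, result, x_np.mean())
```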

### Types of Dask Clusters

There are different ways to set up your Dask cluster:

- Local mode: allows for parallelism but not distributed computing
- Dask cluster using the Dask Web UI: enables both distributed computing and parallelism, but requires you to enter the scheduler address manually from the Dask Web UI
- Dask cluster using an automatic lookup service: the easiest to use, and enables both distributed computing and parallelism


### Setting up the Dask Cluster - Local Mode

To set up Dask in local mode (on your laptop, without distributed characteristics), use the code below, which creates a `LocalCluster`. Uncomment it before running.

In [1]:
## Set up the cluster in local mode if you only need parallelism (uncomment to run)

# from dask.distributed import LocalCluster, Client
# cluster = LocalCluster(n_workers = 2)
# client = Client(cluster)
# client
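The following is a self-contained sketch of the local-mode setup. It uses `LocalCluster` and `Client` as context managers so the cluster shuts down when the block exits. The settings `processes=False` (workers run as threads in the current process) and `dashboard_address=None` (no dashboard) are choices made here to keep the example light. The original setup does not require them. The `square` function is a hypothetical stand-in for real work.

```python
from dask.distributed import Client, LocalCluster

def square(x):
    return x * x

# Two single-threaded workers running inside this Python process
# (processes=False); dashboard_address=None turns the dashboard off
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False,
                  dashboard_address=None) as cluster, Client(cluster) as client:
    # Each submit call sends one task to a worker and returns a Future
    futures = [client.submit(square, i) for i in range(5)]
    # gather waits for all the futures and returns their results in order
    results = client.gather(futures)

print(results)  # [0, 1, 4, 9, 16]
```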

### Setting up the Dask Cluster - Cluster with Automatic Lookup 

Setting up the cluster to run in distributed mode requires connecting a client to the scheduler. The code below does this by reading the scheduler's host and port from environment variables.

In [2]:
from dask.distributed import Client
import os

service_host = os.environ["DASK_SCHEDULER_SERVICE_HOST"]
service_port = os.environ["DASK_SCHEDULER_SERVICE_PORT"]
client = Client(f"{service_host}:{service_port}")

# look at the client and scheduler

client

# you should now be connected to the cluster
# Dashboard link from the client object is clickable but will not route in Domino
# Use the embedded Dask Web UI tab instead


+-------------+----------------+----------------+----------------+
| Package     | client         | scheduler      | workers        |
+-------------+----------------+----------------+----------------+
| blosc       | 1.9.2          | 1.10.2         | 1.10.2         |
| cloudpickle | 1.6.0          | 2.0.0          | 2.0.0          |
| dask        | 2021.06.2      | 2022.01.0      | 2022.01.0      |
| distributed | 2021.06.2      | 2022.01.0      | 2022.01.0      |
| lz4         | 3.1.1          | 3.1.10         | 3.1.10         |
| msgpack     | 1.0.0          | 1.0.3          | 1.0.3          |
| numpy       | 1.20.3         | 1.21.1         | 1.21.1         |
| pandas      | MISSING        | 1.3.0          | 1.3.0          |
| python      | 3.8.11.final.0 | 3.8.12.final.0 | 3.8.12.final.0 |
| toolz       | 0.11.1         | 0.11.2         | 0.11.2         |
+-------------+----------------+----------------+----------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is

Client

| Connection method | Dashboard |
| --- | --- |
| Direct | http://dask-633b0445cbb8c95f99ad4172-dask-scheduler.domino-compute.svc.cluster.local:8787/status |

Scheduler

| Comm | Dashboard | Workers | Total threads | Total memory | Started |
| --- | --- | --- | --- | --- | --- |
| tcp://10.0.102.170:8786 | http://10.0.102.170:8787/status | 2 | 2 | 8.00 GiB | 3 minutes ago |

Workers

| Comm | Nanny | Dashboard | Threads | Memory | Memory usage | CPU usage | Read bytes | Write bytes | Local directory |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| tcp://10.0.113.105:3000 | tcp://10.0.113.105:3001 | http://10.0.113.105:8787/status | 1 | 4.00 GiB | 127.48 MiB | 0.0% | 286.6 B | 0.95 kiB | /tmp/dask-worker-space/worker-80u6a1z0 |
| tcp://10.0.101.3:3000 | tcp://10.0.101.3:3001 | http://10.0.101.3:8787/status | 1 | 4.00 GiB | 128.72 MiB | 2.0% | 286.0 B | 132.0 B | /tmp/dask-worker-space/worker-1hoy7niz |

On both workers: 0 tasks executing, ready, in memory, or in flight; 0 B spilled; last seen just now.
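The connection code above assumes both environment variables are set. If either one is missing, `os.environ[...]` raises a `KeyError`. Below is a minimal sketch of a more defensive variant. The names `scheduler_address` and `connect` are hypothetical helpers. Falling back to a `LocalCluster` is also an assumption about what you would want when no cluster is advertised.

```python
import os
from typing import Mapping, Optional

from dask.distributed import Client, LocalCluster

def scheduler_address(environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Build 'host:port' from the scheduler service variables, or return None."""
    host = environ.get("DASK_SCHEDULER_SERVICE_HOST")
    port = environ.get("DASK_SCHEDULER_SERVICE_PORT")
    if host and port:
        return f"{host}:{port}"
    return None

def connect() -> Client:
    """Hypothetical helper: join the advertised cluster, else start a local one."""
    address = scheduler_address()
    if address is not None:
        # Same call as in the notebook: connect to the remote scheduler
        return Client(address)
    # Assumed fallback (not part of the original setup): a two-worker local cluster
    return Client(LocalCluster(n_workers=2, threads_per_worker=1))
```

Calling `client = connect()` in place of the three connection lines would give the same client on the cluster and a local one elsewhere.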


### What can Dask do?

Dask is a parallel and distributed computing API that is well suited to scaling data processing and machine learning workloads.
While you work through this tutorial, the 'Dask Web UI' tab at the top right of the screen shows real-time monitoring of the executions, so you can see how the compute resources are being used.

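We will see that the user CPU time reported by `%%time` drops sharply when using Dask, even on a small task like calculating pi. Keep in mind that `%%time` only measures the notebook's own process, while Dask runs the work on the workers. Wall-clock time (the real time that passes as we experience it) drops only when the work is split into tasks that can run at the same time, and the gain usually justifies the overhead only once the data is large. When data is so large that it will not fit onto a single node, Dask can still do its job.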

This follows the old adage: don't distribute until you need to, because you won't notice much of a speed-up for small tasks.
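The difference between CPU time and wall-clock time explains most of the timings below. The following sketch uses the standard library's `time.perf_counter` (elapsed real time) and `time.process_time` (CPU time of the current process only). It shows why waiting, such as the `time.sleep(10)` inside `pi`, adds wall time without adding CPU time. The `measure` helper is illustrative and is not part of the notebook.

```python
import time

def measure(fn):
    """Return (wall_seconds, cpu_seconds) for one call of fn in this process."""
    wall_start = time.perf_counter()   # elapsed real time
    cpu_start = time.process_time()    # CPU time (user + system) of this process only
    fn()
    return time.perf_counter() - wall_start, time.process_time() - cpu_start

# Sleeping waits without using the CPU: wall time grows, CPU time barely moves
wall, cpu = measure(lambda: time.sleep(0.5))
print(f"sleep: wall={wall:.2f}s cpu={cpu:.2f}s")

# A pure-Python loop keeps the CPU busy, so CPU time tracks wall time
wall_busy, cpu_busy = measure(lambda: sum(i * i for i in range(2_000_000)))
print(f"busy:  wall={wall_busy:.2f}s cpu={cpu_busy:.2f}s")
```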

Let's look at calculating pi to see what is going on here.

First we'll import the libraries we need.

In [3]:
import dask
import dask.array as da
import numpy as np
import dask.dataframe as dd
import pandas as pd
import time
import seaborn as sn

### Calculating Pi With and Without Dask: Watch How the Code Is Written

Let's calculate pi without Dask and see how long it takes. Pay attention to the CPU times (user and total) and to the wall-clock time. This example shows why Dask code needs to be written correctly.

In [4]:

## Compute pi without Dask

def pi(N):
    # Simulate a slow job: 10 seconds of waiting that use no CPU
    time.sleep(10)

    # Initialize denominator
    k = 1

    # Initialize sum
    s = 0

    for i in range(N):

        # even index elements are positive
        if i % 2 == 0:
            s += 4/k
        else:
            s -= 4/k

        # denominator is odd
        k += 2
    return s


In [5]:
%%time
pi(10**7)

CPU times: user 1.23 s, sys: 0 ns, total: 1.23 s
Wall time: 11.2 s


3.1415925535897915
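For reference, the loop in `pi` adds up the first $N$ terms of the Leibniz series for π, with `k = 2i + 1` as the odd denominator. Because the series alternates in sign, the error after $N$ terms is at most the size of the first omitted term:

$$
\pi = 4\sum_{i=0}^{\infty}\frac{(-1)^i}{2i+1},
\qquad
S_N = \sum_{i=0}^{N-1}\frac{4\,(-1)^i}{2i+1},
\qquad
\left|\pi - S_N\right| \le \frac{4}{2N+1}.
$$

For $N = 10^7$ the bound is about $2 \times 10^{-7}$. This is consistent with the result above, which is about $1 \times 10^{-7}$ below π. The $10^3$-term runs later in this notebook return 3.140592653839794, which is within the bound $4/2001 \approx 0.002$.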

### Calculating Pi Using Dask the Wrong Way 

To calculate pi using Dask, we can wrap the function with `dask.delayed` (usable as the decorator `@dask.delayed`, or as `delayed` after `from dask import delayed`) and then call `.compute()` to retrieve the output of our pi calculation. We can also use the client's `submit` method to compute pi. `delayed` builds a lazy task graph whose tasks can run in parallel, which is especially helpful for custom algorithms. `client.submit` sends a task to the cluster right away and returns a future for its result. All of the code below runs and calls the right functions, but it is not an effective way to write code for Dask.
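The difference between the two entry points can be seen in a small sketch. The `square` function is hypothetical, and a throwaway in-process `LocalCluster` stands in for the notebook's cluster. `dask.delayed` returns a lazy `Delayed` object that does nothing until `compute()` is called. `client.submit` starts the work immediately and returns a `Future`.

```python
import dask
from dask.delayed import Delayed
from dask.distributed import Client, Future, LocalCluster

def square(x):
    return x * x

# delayed: only records the call in a task graph; nothing runs yet
lazy = dask.delayed(square)(4)

with LocalCluster(n_workers=1, processes=False, dashboard_address=None) as cluster:
    with Client(cluster) as client:
        # submit: schedules the task on the cluster right away
        future = client.submit(square, 4)
        submitted_result = future.result()  # blocks until the worker finishes

        # With a Client active, compute() also runs the delayed graph on the cluster
        lazy_result = lazy.compute()

print(isinstance(lazy, Delayed), isinstance(future, Future))
print(submitted_result, lazy_result)
```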

What do we mean by the right and wrong way?

*Dask looks to spread individual tasks over different workers.*

- Wrong: if we have one large function, Dask sends that one large task to a single worker.
- Right: if we split the calculation into smaller functions, Dask can send those smaller tasks to multiple workers and then combine the results at the end.

Take a close look at the total CPU time and the wall-clock time. The CPU time reported in the notebook is many times lower when using Dask, because the work now runs on the workers, but the wall-clock time is the same.

What do you think is wrong with the code in this procedure? Why doesn't the wall-clock time improve? Hint: is Dask creating a 'future' calculation that is performed only when the function is actually called, in other words, a calculation deferred to runtime?
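One way to check your answer is to count how many tasks each approach gives the scheduler. The sketch below uses a toy version of the series. `whole_series` and `one_term` are illustrative names and do not come from the notebook. It inspects each graph through the `__dask_graph__()` method of Dask collections. Wrapping the whole computation yields a graph with a single task, which can only ever occupy one worker. The per-term version yields one task per term plus the final sum.

```python
import dask

def whole_series(n_terms):
    # All terms computed inside one function call
    return sum(4 / (2 * i + 1) * (-1) ** i for i in range(n_terms))

def one_term(i):
    # A single term of the same series
    return 4 / (2 * i + 1) * (-1) ** i

n_terms = 100

# "Wrong" shape: one delayed call wrapping the whole computation
monolithic = dask.delayed(whole_series)(n_terms)

# "Right" shape: one delayed call per term, plus a final sum
split = dask.delayed(sum)([dask.delayed(one_term)(i) for i in range(n_terms)])

# Count the tasks the scheduler could hand out to workers
monolithic_tasks = len(monolithic.__dask_graph__())
split_tasks = len(split.__dask_graph__())
print(monolithic_tasks, split_tasks)
```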

In [6]:

## Redefine the original pi function (the same function as above, without Dask)


def pi(N):
    # Simulate a slow job: 10 seconds of waiting that use no CPU
    time.sleep(10)

    # Initialize denominator
    k = 1

    # Initialize sum
    s = 0

    for i in range(N):

        # even index elements are positive
        if i % 2 == 0:
            s += 4/k
        else:
            s -= 4/k

        # denominator is odd
        k += 2
    return s


In [7]:
%%time

#Use the client to execute the original pi function

future = client.submit(pi, 10**7)  # returns a Future immediately
pi_client = future.result()        # block until the worker returns the value
print(pi_client)

3.1415925535897915
CPU times: user 9.87 ms, sys: 1.22 ms, total: 11.1 ms
Wall time: 11.2 s


In [8]:
%%time

#Use dask delayed to execute the original pi function

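from dask import delayed

# Use a new name so the pi function itself is not overwritten
pi_delayed = delayed(pi)(10**7)  # builds a lazy task; nothing runs yet
pi_delayed.compute()             # runs it (on the cluster, since a Client is active)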

CPU times: user 11.1 ms, sys: 1.12 ms, total: 12.2 ms
Wall time: 11.2 s


3.1415925535897915

So we can see above that whether we call pi directly or through Dask (using `delayed` or `submit`), the wall-clock time is about 11 seconds. What's the problem here? Why isn't Dask making the code run faster?

### Speeding Up the Calculation of Pi: Write the Code So It Can Be Parallelized or Distributed

Here we create a function that computes only one term of the series at a time. We call it once per term, collect the results in a list, and add them up with a single `sum` call. Because each call is its own small task, this version of pi runs in parallel on Dask, and you will notice an immediate time improvement.
Note: this is not the only way to parallelize a function, but it is effective here because it breaks a large task into smaller ones that can run in parallel. The code below calculates pi in three ways, two of which use Dask. All three methods are correct and work:

- without Dask

- with parallelism and lazy evaluation (the `delayed` function)

- with parallelism and distribution across the cluster (the `client.submit` method, which returns futures)

Which way is fastest? Why do you think so? Hint: `client.submit` distributes tasks over the cluster.

In [9]:
#Write a pi function that only computes one term at a time vs. earlier where the entire calculation 
# was defined within the function

def pi_single_term(i):
    time.sleep(0.01)
    # denominator is odd
    k = 2*i + 1
    # even index elements are positive
    if i % 2 == 0:
        x = 4/k
    # odd index elements are negative    
    else:
        x = - 4/k
    return x

In [10]:
%%time
## compute pi without Dask, using the new function 
s = []
for i in range(10**3):
    s.append(pi_single_term(i))
sum(s)

CPU times: user 20.7 ms, sys: 0 ns, total: 20.7 ms
Wall time: 10.1 s


3.140592653839794

In [11]:
%%time
## compute pi with Dask using Delayed

s = []
for i in range(10**3):
    s.append(dask.delayed(pi_single_term)(i))
dask.delayed(sum)(s).compute()

CPU times: user 78.8 ms, sys: 18.2 ms, total: 97 ms
Wall time: 5.62 s


3.140592653839794
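Because a `Client` is active, `compute()` above sends the tasks to the cluster. The next sketch shows the same effect on a single machine with no cluster. It passes `scheduler="threads"` to use Dask's local threaded scheduler. It also uses a hypothetical `slow_term` with a shorter delay so that the example finishes quickly. `time.sleep` releases Python's GIL, so the sleeping tasks overlap across threads.

```python
import time
import dask

def slow_term(i):
    # Same series term as pi_single_term, with a short artificial delay
    time.sleep(0.05)
    return 4 / (2 * i + 1) * (-1) ** i

n_terms = 40

# Serial baseline: 40 calls x 0.05 s, roughly 2 s of waiting
start = time.perf_counter()
serial = sum(slow_term(i) for i in range(n_terms))
serial_wall = time.perf_counter() - start

# One delayed task per term, then a single sum task at the end
tasks = [dask.delayed(slow_term)(i) for i in range(n_terms)]
total = dask.delayed(sum)(tasks)

# Run the graph on 8 local threads instead of a cluster
start = time.perf_counter()
parallel = total.compute(scheduler="threads", num_workers=8)
parallel_wall = time.perf_counter() - start

print(f"serial:   {serial:.6f} in {serial_wall:.2f}s")
print(f"threaded: {parallel:.6f} in {parallel_wall:.2f}s")
```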

In [12]:
%%time
## compute pi with Dask using Client

s=[]
for i in range(10**3):
    s.append(client.submit(pi_single_term, i))
client.submit(sum, s).result()

CPU times: user 313 ms, sys: 43.9 ms, total: 357 ms
Wall time: 5.59 s


3.140592653839794
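The loop of `client.submit` calls above can also be written with `client.map`, which submits one task per input in a single call. The sketch below re-declares `pi_single_term` so that it runs on its own. A throwaway in-process `LocalCluster` with two single-threaded workers stands in for the notebook's cluster. That substitution is an assumption made only for the example.

```python
import time
from dask.distributed import Client, LocalCluster

def pi_single_term(i):
    time.sleep(0.01)
    k = 2 * i + 1                            # odd denominator
    return 4 / k if i % 2 == 0 else -4 / k   # alternate signs

with LocalCluster(n_workers=2, threads_per_worker=1, processes=False,
                  dashboard_address=None) as cluster, Client(cluster) as client:
    # One future per term, created in a single call
    futures = client.map(pi_single_term, range(10**3))
    # Sum on the cluster; the list of futures is resolved to their results first
    pi_estimate = client.submit(sum, futures).result()

print(pi_estimate)
```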