<img src="images/dask_horizontal.svg" align="left" width="30%">

# Introducing Dask

**First,...**

<img src="images/should-i-use-dask.png" width="50%">

Dask is a parallel computing library that scales existing Python libraries. This tutorial introduces Dask and, more generally, parallel data analysis.


## Learning Objectives 

- Describe components that make up Dask
- Deploy a local Dask Distributed Cluster and access the diagnostics dashboard


## Prerequisites


| Concepts | Importance | Notes |
| --- | --- | --- |
| Familiarity with Python | Helpful | |


- **Time to learn**: *5 minutes*



## Dask Components 

Dask is composed of two main parts:

- **Dask Collections**
- **Dynamic Task Scheduling**

<img src="images/Dask Overview (Light).png" width="80%">

1. High-level collection APIs:
  - **Dask Array**: Parallel NumPy Arrays
  - **Dask DataFrame**: Parallel Pandas DataFrames
  - **Dask Bag**: Parallel lists
  - **Dask ML**: Parallel Scikit-learn


2. Low-level collection APIs:
  - **Dask Delayed**: Lazy parallel objects
  - **Dask Futures**: Eager parallel objects


3. Task Scheduling
  - **Scheduler**: 
    - creates and manages directed acyclic graphs (DAGs)
    - distributes tasks to workers
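To make the high-level collection APIs concrete, here is a minimal sketch using Dask Array. The array shape and chunk size are arbitrary values chosen for illustration; the point is that NumPy-style code builds a task graph that runs chunk by chunk when `.compute()` is called.

```python
import dask.array as da

# A 1000 x 1000 array of ones, split into 250 x 250 chunks (a 4 x 4 grid of blocks).
x = da.ones((1000, 1000), chunks=(250, 250))

# NumPy-style operations only build a task graph; nothing is computed yet.
total = x.sum()

# compute() executes the graph, summing the chunks in parallel and combining them.
result = total.compute()
print(x.numblocks, result)
```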
    
    
    
<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Lazy evaluation vs eager evaluation</p>
    <ul>
        
        <li> Lazy evaluation: objects are evaluated just in time, when their results are needed </li> 
    
<li>Eager evaluation: objects are evaluated immediately, regardless of whether their results are needed yet </li>
    </ul>
</div>
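A minimal sketch contrasting the two styles, using a hypothetical `inc` helper: `dask.delayed` (lazy) only records the call until `.compute()` is requested, while `Client.submit` (eager) returns a future whose task starts running on the cluster right away. `processes=False` is an assumption made here only to keep the example inside a single Python process.

```python
import dask
from dask.distributed import Client


def inc(x):
    """Hypothetical helper used only for this illustration."""
    return x + 1


# Lazy: dask.delayed records the call in a task graph; nothing runs yet.
lazy = dask.delayed(inc)(1)
lazy_result = lazy.compute()  # the graph is executed only here

# Eager: a future starts executing as soon as it is submitted.
# processes=False runs the workers as threads inside this process.
with Client(processes=False) as client:
    future = client.submit(inc, 1)  # execution begins immediately
    eager_result = future.result()  # block until the result is available

print(lazy_result, eager_result)
```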
    


## Advantages of using Dask

- **Familiarity**: Dask collections such as Dask Array and Dask DataFrame provide NumPy- and pandas-compatible APIs.
- **Responsive**: Dask is designed with interactive computing in mind. 
    - It provides rapid feedback and diagnostics to aid humans
- **Scale up and scale down**: Dask scales well from a single machine (laptop) to clusters (100s of machines).
    - This ease of transition from a single machine to moderate clusters makes it easy for users to prototype their workflows on their local machines and seamlessly transition to a cluster when needed.
    - It also gives users a lot of flexibility when choosing the best way to deploy and run their workflows.
- **Flexibility**: Dask supports interfacing with popular cluster resource managers such as PBS, SLURM, and Kubernetes with a minimal amount of effort.

<img src="images/Dask Cluster Manager (Light)(1).png" width="80%">

## Task Graphs

Dask represents distributed/parallel computations with task graphs, more specifically [directed acyclic graphs](https://en.wikipedia.org/wiki/Directed_acyclic_graph).

- A task is a function that you want to call and its corresponding inputs
- A task graph is a collection of (1) the functions we want to call and their inputs, and (2) the dependencies between them.


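Directed acyclic graphs are made up of nodes (tasks) connected by directed edges (dependencies). Every edge points one way, from a task to the tasks that depend on it, and no path ever loops back to where it started, so the graph has a clearly defined start and end and no cycles.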
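Normally the collections generate task graphs for you, but a tiny graph can be written by hand using Dask's low-level dictionary graph format to see the pieces described above. In this sketch, the keys `"x"`, `"y"`, `"z"` and the `inc` helper are hypothetical; each value is either plain data or a task tuple `(function, *arguments)`, and any argument that names another key becomes a dependency edge.

```python
from operator import add

from dask.threaded import get


def inc(i):
    """Hypothetical helper: add one to its input."""
    return i + 1


# Each key is a node; task tuples are (function, *arguments).
# Arguments that are themselves keys become edges (dependencies).
dsk = {
    "x": 1,               # plain data, no dependencies
    "y": (inc, "x"),      # y depends on x
    "z": (add, "y", 10),  # z depends on y
}

# Requesting "z" makes the scheduler run x -> y -> z in dependency order.
result = get(dsk, "z")
print(result)
```

Because `z` depends on `y`, which depends on `x`, the scheduler can only run the tasks in that order; independent branches in a larger graph are what Dask runs in parallel.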

<img src="images/dask-task-stream.gif">

---
## The Dask Squad - Key Players

### Dask Client
The Client is the interface between your Python code and the scheduler; it is the primary API provided by `dask.distributed`.

The `dask.distributed` system is composed of a single centralized scheduler and one or more worker processes. [Deploying](https://docs.dask.org/en/latest/setup.html) a remote Dask cluster involves some additional effort, but running locally just involves creating a `LocalCluster` object and connecting it to a `Client` object, which lets you interact with the "cluster" (local threads or processes on your machine). For more information, see [here](https://docs.dask.org/en/latest/setup/single-distributed.html).

<img src="images/Distributed Overview (Light).png">

Note that `LocalCluster()` accepts many optional [arguments](https://distributed.dask.org/en/latest/local-cluster.html#api) for configuring the number of processes and threads, memory limits, and other settings.
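As a sketch of how a few of those arguments fit together, the cluster below sets `n_workers`, `threads_per_worker`, `memory_limit`, and `processes`. The specific values are hypothetical and should be tuned to your own machine; `processes=False` is used only to keep the sketch lightweight (drop it to get one process per worker).

```python
from dask.distributed import Client, LocalCluster

# Hypothetical sizing: 2 workers, 2 threads each, 1 GiB memory cap per worker.
# processes=False runs the workers as threads inside this Python process.
with LocalCluster(
    n_workers=2,
    threads_per_worker=2,
    memory_limit="1GiB",
    processes=False,
) as cluster, Client(cluster) as client:
    nthreads = client.nthreads()  # {worker address: thread count}
    print(cluster.dashboard_link)  # URL of the diagnostics dashboard

print(nthreads)
```

Using the cluster and client as context managers closes both automatically when the block ends, which avoids leaving stray workers running.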

### Dask Schedulers

As we have seen so far, Dask allows you to construct graphs of tasks with dependencies, as well as have graphs created automatically for you using functional, NumPy, or Xarray syntax on data collections. None of this would be very useful if there weren't also a way to execute these graphs in a parallel and memory-aware way.

Dask comes with four available schedulers:

- "threads" (aka "threading"): a scheduler backed by a thread pool
- "processes" (aka "multiprocessing"): a scheduler backed by a process pool
- "single-threaded" (aka "sync"): a synchronous scheduler, good for debugging
- distributed: a distributed scheduler for executing graphs on multiple machines, used through a `dask.distributed` `Client` (see below).

To select one of these schedulers for a computation, you can specify it when asking for a result, e.g.,
```python
import dask

myvalue = dask.delayed(sum)([1, 2, 3])  # a small lazy computation
myvalue.compute(scheduler="single-threaded")  # for debugging
```

You can also set a default scheduler either temporarily
```python
with dask.config.set(scheduler='processes'):
    # set temporarily for this block only
    # all compute calls within this block will use the specified scheduler
    myvalue.compute()
    anothervalue.compute()
```

Or globally
```python
# set until further notice
dask.config.set(scheduler='processes')
```
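To see that the choice of scheduler changes how a graph is executed but not what it computes, here is a small sketch that runs the same graph once per call with the thread-pool scheduler and once under a temporary `sync` default. The `square` and `total` functions are hypothetical; the process-based scheduler is left out only to keep the example free of subprocess setup.

```python
import dask


@dask.delayed
def square(x):
    """Hypothetical task: square one number."""
    return x * x


@dask.delayed
def total(values):
    """Hypothetical task: add up a list of results."""
    return sum(values)


# Build the graph lazily: five independent squares feeding one sum.
result = total([square(i) for i in range(5)])

# Choose the scheduler per call...
threaded = result.compute(scheduler="threads")

# ...or temporarily change the default for a block.
with dask.config.set(scheduler="sync"):
    synchronous = result.compute()

print(threaded, synchronous)
```

The five `square` tasks do not depend on each other, so the threaded scheduler is free to run them concurrently, while `sync` runs them one after another, which makes stepping through with a debugger much easier.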

## Distributed Dask clusters for HPC and Cloud environments

Dask can be deployed on distributed infrastructure, such as an HPC system or a cloud computing system. A growing ecosystem of Dask deployment projects facilitates easy deployment and scaling of Dask clusters on a wide variety of computing systems.

### HPC

#### Dask Jobqueue (https://jobqueue.dask.org/)

- `dask_jobqueue.PBSCluster`
- `dask_jobqueue.SLURMCluster`
- `dask_jobqueue.LSFCluster`
- etc.

#### Dask MPI (https://mpi.dask.org/)

- `dask_mpi.initialize`

### Cloud

#### Dask Kubernetes (https://kubernetes.dask.org/)

- `dask_kubernetes.KubeCluster`

#### Dask Cloud Provider (https://cloudprovider.dask.org)

- `dask_cloudprovider.FargateCluster`
- `dask_cloudprovider.ECSCluster`

#### Dask Gateway (https://gateway.dask.org/)

- `dask_gateway.GatewayCluster`


---
## Spinning Up a Cluster
Below, we will create a `LocalCluster` and connect that cluster to a client.

### Imports

```python
import dask
from dask.distributed import Client, LocalCluster
```

### Create the Cluster and Client

```python
cluster = LocalCluster()
cluster
```


Now that we have our cluster, we can connect it to our client! Click the Dashboard link in the output below to view the diagnostics dashboard.

```python
client = Client(cluster)
client
```

| Client / cluster property | Value |
| --- | --- |
| Connection method | Cluster object |
| Cluster type | `distributed.LocalCluster` |
| Dashboard | http://127.0.0.1:50410/status |
| Status | running |
| Using processes | True |
| Workers | 4 |
| Total threads | 16 (4 per worker) |
| Total memory | 16.00 GiB (4.00 GiB per worker) |


Your dashboard should look something like this! We have not sent any tasks to the client or scheduler yet, so it appears fairly empty.


![dask-dashboard](images/dask_dashboard.png)

There are a **number** of different dashboards we can access here, but this main "Status" page is typically sufficient. One other tab you may be interested in is the "Workers" tab.

We see the same story as above: 4 workers, with a total of **16 GiB of memory**. Note also that each worker has 4 threads, for a total of 16 threads. The machine used for this example (a 16-inch MacBook Pro) has **8 CPU cores** with two threads each, for a total of **16 threads**. By default, `LocalCluster` divides all of the available CPU threads and memory among its workers; this can be changed by passing additional arguments to `LocalCluster` or to whichever cluster class you are using.

![](images/workers.png)
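To check the default resource split on your own machine without opening the dashboard, a sketch like the following reads the per-worker thread counts back from the client. It assumes `processes=False`, in which case `LocalCluster` starts a single worker that uses all available threads rather than several worker processes, so expect a different worker count than the 4-worker layout shown above.

```python
import os

from dask.distributed import Client, LocalCluster

# No sizing arguments: LocalCluster chooses defaults from the machine.
# processes=False keeps this sketch inside a single Python process.
with LocalCluster(processes=False) as cluster, Client(cluster) as client:
    per_worker = client.nthreads()  # {worker address: thread count}

total_threads = sum(per_worker.values())
print(f"{len(per_worker)} worker(s), {total_threads} thread(s) in total")
print(f"os.cpu_count() reports {os.cpu_count()} logical CPUs")
```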

To change the configuration, we first close down our current client and cluster.

```python
client.close()   # disconnect the client first
cluster.close()  # then shut down the scheduler and workers
```

Then, if we want two workers with more memory on each worker, we can pass `n_workers=2` to our `LocalCluster`.

```python
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
client
```


| Client / cluster property | Value |
| --- | --- |
| Connection method | Cluster object |
| Cluster type | `distributed.LocalCluster` |
| Dashboard | http://127.0.0.1:50865/status |
| Status | running |
| Using processes | True |
| Workers | 2 |
| Total threads | 16 (8 per worker) |
| Total memory | 16.00 GiB (8.00 GiB per worker) |


When you open the dashboard, you will notice that there are now **two workers**, each with **8 GiB of memory** and **8 threads**.

This may be helpful when you are trying to place **more data** on each worker.

![](images/less_workers.png)

## Summary

Dask has several key players, the main components being:
- The client (what connects your analysis code with the scheduler)
- The scheduler (what distributes the different tasks to your workers)
- The workers (the components actually executing your code)

The `cluster` connects the scheduler and the workers, whereas the client provides a connector between your code and the cluster.

We spun up a `LocalCluster`, and provided an overview of the dashboard as well as how to configure your cluster setup.

### What's Next
Additional notebooks will cover actually sending tasks to your scheduler and cluster, with more detail about:
- An overview of distributed computing
- An overview of `dask.array`
- How `Xarray` interfaces with `dask.array`

---

## Resources and references

* Reference
    *  [Docs](https://dask.org/)
    *  [Examples](https://examples.dask.org/)
    *  [Code](https://github.com/dask/dask/)
    *  [Blog](https://blog.dask.org/)
*  Ask for help
    *   [`dask`](http://stackoverflow.com/questions/tagged/dask) tag on Stack Overflow, for usage questions
    *   [GitHub discussions](https://github.com/dask/dask/discussions) for general, non-bug, discussion, and usage questions
    *   [GitHub issues](https://github.com/dask/dask/issues/new) for bug reports and feature requests