<a href="https://www.dask.org/" target="_blank">
<img src="http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg"
     align="right"
     width="30%"
     alt="Dask logo">
</a>

# Scalability

In this notebook, we explain how Dask scales large-scale data analytics from multi-core local machines to large distributed clusters in the cloud.

**Introduction**

Dask employs the **client-server model** to map computations onto the cores of a single machine or onto a distributed cluster.

* On the client side, a Python/Notebook application sends tasks (computations) to a **Dask Cluster**.
* A Dask Cluster is composed of a **scheduler** and the **workers**.
* The scheduler receives tasks (computations) and decides which worker will perform each task.
* The workers perform computations and store/share results with other workers.

<center>
<img src="img/dask-cluster.png" width="80%"/>
</center>
<center>
<a href="https://tutorial.dask.org/00_overview.html" target="_blank" width="30%"> Reference: Dask Tutorial Documentation </a>
</center>
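
The client, scheduler, and worker roles described above can be sketched with a few tasks. This is a minimal illustration, not a production setup: the helper `square` is hypothetical, and `processes=False` and `dashboard_address=None` are assumptions chosen only to keep the scheduler and workers lightweight inside the current process.

```python
from dask.distributed import Client


def square(n):
    """Hypothetical task used only for this illustration."""
    return n * n


# Assumption: an in-process cluster with two single-threaded workers
# and no dashboard, so the sketch stays small and quick to start.
client = Client(
    processes=False,
    n_workers=2,
    threads_per_worker=1,
    dashboard_address=None,
)

# The client sends one task per input; the scheduler assigns each to a worker.
futures = client.map(square, range(8))

# Results stay on the workers until the client gathers them.
squares = client.gather(futures)

# A follow-up task can consume results that already live on the workers.
total = client.submit(sum, futures).result()

print(squares, total)

# Closing the client also stops the cluster it created.
client.close()
```

The futures returned by `client.map` are references to results held by the workers, which is why the final `sum` task can run without first moving the data back to the client.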

**Content**

1. Cluster in a Local Machine.
2. Cluster in a High Performance Computing System.
3. Cluster in a Cloud Computing System.

**Learning outcomes**

## 1. Cluster in a Local Machine

The local cluster is the best option for researchers who are **just learning Dask** or **just starting a large-scale data analysis**. This configuration lets you run preliminary tests before deploying the solution on large supercomputers or cloud computing infrastructure.

You can define a local cluster in two ways [2]:

* Implicitly: Dask creates a default local cluster for you.
* Explicitly: you define the local cluster yourself using the Dask library.

### 1.1. Implicit cluster definition

In the implicit mode, the user doesn't have to define the cluster. Once the user defines a computation and calls the `compute` method, Dask runs it on a default local cluster that it sets up automatically. The default configuration uses all of the computer's available cores. More information can be found in [2]. Let's take a look at this.

__1. Import required libraries, define required variables and functions__

In [None]:
import dask.array as da

In [None]:
x = da.random.random((10000,10000), chunks=(1000,1000))
x

__2. Visualize the computations to be performed per array chunk.__

_Hint: These computations will be performed on the cores available in your computer._

In [None]:
x.visualize() 

__3. Compute the result__

_Hint: Dask submits all the computations to the scheduler; the scheduler is then in charge of distributing the computations among the available computing cores._

In [None]:
x.compute() # This uses Dask default local cluster
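
Calling `compute` on the whole array brings every chunk back into local memory (a 10000 x 10000 array of 64-bit floats is about 800 MB). A minimal sketch of an alternative, assuming you only need a summary statistic: reduce first, then compute. The smaller shape used here is an assumption made only to keep the example quick.

```python
import dask.array as da

# Assumption: a smaller array than the one above, with the same chunk size.
x = da.random.random((4000, 4000), chunks=(1000, 1000))

n_chunks = x.npartitions   # number of chunks the array is split into
size_mb = x.nbytes / 1e6   # size of the full array if it were materialized

# The reduction is evaluated chunk by chunk; only a scalar is returned.
mean_value = x.mean().compute()

print(n_chunks, size_mb, mean_value)
```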

__4. Get `compute` method documentation for available parameters.__

_Hint: Advanced users can choose to execute their computations using `threads` or `processes` by setting the scheduler parameter, `.compute(scheduler=...)`. They can also run their computations sequentially by setting the scheduler option to `synchronous`._

In [None]:
help(x.compute)
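
The scheduler options mentioned in the hint can be sketched as follows. The array of ones is an assumption chosen so that the result is easy to check, and `num_workers=2` is just an illustrative value.

```python
import dask.array as da

# Assumption: a small, deterministic array so every scheduler gives the same sum.
x = da.ones((1000, 1000), chunks=(100, 100))
total = x.sum()

# Thread pool in the current process (the default for Dask arrays).
threaded = total.compute(scheduler="threads")

# Single-threaded, sequential execution; convenient for debugging.
sequential = total.compute(scheduler="synchronous")

# The pool-based schedulers also accept the number of workers to use.
limited = total.compute(scheduler="threads", num_workers=2)

# scheduler="processes" uses a pool of processes instead of threads. In a
# standalone script it should be called under `if __name__ == "__main__":`.

print(threaded, sequential, limited)
```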

### 1.2. Explicit cluster definition

Advanced users prefer to define the cluster themselves, i.e., explicitly, which gives them more flexibility in configuring the cluster. For example, they can define how many **workers** they want to use and how many `cores` and how much `memory` each worker should have. Finally, when you define the cluster yourself, you can access the [Dask Dashboard](https://www.youtube.com/watch?v=EX_voquHdk0&t=293s).

In the explicit mode, you can define a cluster in one of two ways:
* By defining the client
* By defining the cluster and the client

#### 1.2.1. By defining the client

__1. Import required libraries, define required variables and functions__

In [None]:
from dask.distributed import Client

__2. Create a cluster by defining the client__

_Hint: Creating the client implies creating a cluster._

_Note: **Be careful**: do not execute the cell below multiple times, because each run creates a new cluster._

_Activities:_ 

1. Run the cell below.
2. Use the option `Launch dashboard in JupyterLab`; this will display the Dask Dashboard.
3. In the Dashboard, you will need to change the IP address from `127.0.0.1` to `192.5.87.217`.
4. The Dashboard has plenty of options for evaluating the performance of the cluster while it performs a computation. Take a look at some of the options, for instance `cluster`.

In [None]:
client = Client()
client

__3. Shut down the cluster__

_Hint: This is **MANDATORY**. Once you finish using a cluster, you must shut it down so that it releases the computing resources it was using._

In [None]:
# Shutdown the connected scheduler and workers 
client.shutdown()

#### 1.2.2. By defining the cluster and the client

__1. Import required libraries, define required variables and functions__
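
A minimal sketch of this approach, assuming `LocalCluster` from `dask.distributed`: the cluster is created first with an explicit number of workers, threads, and memory per worker, and the client is then connected to it. The specific values, `processes=False` (workers run as threads in the current process), and `dashboard_address=":0"` (a random free port) are assumptions made to keep the example lightweight. Remove `processes=False` to get one process per worker.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster

# Define the cluster explicitly: workers, threads per worker, memory per worker.
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    memory_limit="1GiB",
    processes=False,         # assumption: in-process workers for a quick start
    dashboard_address=":0",  # assumption: let Dask pick a free dashboard port
) as cluster:
    # Connect a client to the cluster that was just defined.
    with Client(cluster) as client:
        n_workers = len(client.scheduler_info()["workers"])

        # With a client active, compute() runs on this cluster.
        x = da.ones((1000, 1000), chunks=(100, 100))
        total = x.sum().compute()

print(n_workers, total)
```

Using the cluster and the client as context managers shuts both down when the block ends, so the computing resources are released even if the computation fails.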

## 2. Cluster in a High Performance Computing System

The High Performance Computing (HPC) system is intended for researchers with a clear research objective and an understanding of how to use Dask to scale their experiments beyond the capabilities of a single computer. Depending on how Dask is configured on the HPC system, it can bring significant advantages in communication-intensive computations.

__1. Import required libraries, define required variables and functions__

In [None]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

__2. Create a Dask cluster__

_Hint: Here you define how many `cores` and how much `memory` every Dask worker will have._

In [None]:
cluster = SLURMCluster(
    cores=1,
    memory="2 GB"
)

__3. Deploy the workers of your Dask cluster__

In [None]:
cluster.scale(4)

__4. Check whether the Dask workers were deployed in the High Performance Computing System.__

In [None]:
!squeue -u `whoami`

In [None]:
import dask.array as da

In [None]:
x = da.ones(shape=(1000,1000),chunks=(10,10))
x

In [None]:
z = x + x
z

In [None]:
z.visualize()
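
The cells above build the task graph but do not yet run it on the cluster. A minimal sketch of the remaining step, assuming a client is connected to the cluster before calling `compute`: the helper `double_and_sum` is hypothetical, and larger chunks than above are used as an assumption to keep the number of tasks small (chunks of 10 x 10 split the array into 10,000 pieces). Because SLURM is not available everywhere, the sketch exercises the helper with an in-process `LocalCluster` stand-in. On the HPC system you would pass the `SLURMCluster` object created earlier instead.

```python
import dask.array as da
from dask.distributed import Client, LocalCluster


def double_and_sum(cluster):
    """Hypothetical helper: connect to `cluster`, compute sum(x + x), disconnect."""
    with Client(cluster) as client:
        x = da.ones(shape=(1000, 1000), chunks=(100, 100))
        z = x + x
        # compute() sends the task graph to the scheduler of `cluster`.
        return z.sum().compute()


# Assumption: a local stand-in so the sketch runs without a SLURM queue.
with LocalCluster(
    n_workers=1,
    threads_per_worker=2,
    processes=False,
    dashboard_address=":0",
) as stand_in:
    result = double_and_sum(stand_in)

print(result)
```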

## 3. Cluster in a Cloud Computing System

In [1]:
import coiled

In [2]:
cluster = coiled.Cluster(name='ss1',n_workers=5)

In [3]:
from dask.distributed import Client

In [4]:
client = Client(cluster)
client

| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | coiled.Cluster |
| Dashboard | https://cluster-dbaff.dask.host/1p01cD5dMdzLyn9v/status |
| Workers | 5 |
| Total threads | 20 |
| Total memory | 74.28 GiB |
| Scheduler comm | tls://10.0.36.245:8786 |
| Scheduler dashboard | proxy/8787/status |
| Started | Just now |

| Worker comm | Nanny | Dashboard | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tls://10.0.40.2:45705 | tls://10.0.40.2:42321 | proxy/8787/status | 4 | 14.86 GiB | /scratch/dask-scratch-space/worker-q6hg609_ |
| tls://10.0.37.95:33299 | tls://10.0.37.95:45235 | proxy/8787/status | 4 | 14.86 GiB | /scratch/dask-scratch-space/worker-jqi7tuzx |
| tls://10.0.40.24:42275 | tls://10.0.40.24:41119 | proxy/8787/status | 4 | 14.86 GiB | /scratch/dask-scratch-space/worker-ndjbe4ms |
| tls://10.0.33.96:32853 | tls://10.0.33.96:38487 | proxy/8787/status | 4 | 14.85 GiB | /scratch/dask-scratch-space/worker-8r_kd4ni |
| tls://10.0.40.25:36709 | tls://10.0.40.25:43455 | proxy/8787/status | 4 | 14.86 GiB | /scratch/dask-scratch-space/worker-aw_f0ppp |


In [5]:
import dask

# generate random timeseries of data
df = dask.datasets.timeseries("2000", "2005", partition_freq="2w").persist()

# perform a groupby with an aggregation
df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()

| name | x | y |
|---|---|---|
| Laura | -1797.07617 | 1.0 |
| Bob | -1613.952826 | 1.0 |
| Patricia | 1602.427319 | 1.0 |
| Michael | 1739.535661 | 1.0 |
| Xavier | 205.926563 | 0.999999 |
| Norbert | 2537.161149 | 1.0 |
| Tim | -1144.599646 | 0.999999 |
| Hannah | 284.8451 | 1.0 |
| Alice | 1249.997793 | 1.0 |
| Ingrid | -1306.448132 | 1.0 |


In [None]:
client.close()      # disconnect the client first
cluster.shutdown()  # then shut down the cluster to release the cloud resources

Close the connection to Coiled.

# [Exercise 3](labs/Lab3.ipynb)

In [7]:
client = Client()
client 

| Field | Value |
|---|---|
| Connection method | Cluster object |
| Cluster type | distributed.LocalCluster |
| Dashboard | proxy/8787/status |
| Workers | 4 |
| Total threads | 4 |
| Total memory | 14.85 GiB |
| Status | running |
| Using processes | True |
| Scheduler comm | tcp://127.0.0.1:40921 |
| Scheduler dashboard | proxy/8787/status |
| Started | Just now |

| Worker comm | Nanny | Dashboard | Total threads | Memory | Local directory |
|---|---|---|---|---|---|
| tcp://127.0.0.1:37791 | tcp://127.0.0.1:34789 | proxy/40173/status | 1 | 3.71 GiB | /scratch/dask-scratch-space/worker-uq8c9bkw |
| tcp://127.0.0.1:33263 | tcp://127.0.0.1:44593 | proxy/41281/status | 1 | 3.71 GiB | /scratch/dask-scratch-space/worker-ugzbsx9g |
| tcp://127.0.0.1:37229 | tcp://127.0.0.1:33631 | proxy/34371/status | 1 | 3.71 GiB | /scratch/dask-scratch-space/worker-g7uh81ta |
| tcp://127.0.0.1:34795 | tcp://127.0.0.1:39473 | proxy/39731/status | 1 | 3.71 GiB | /scratch/dask-scratch-space/worker-5bkjws_t |


# References

1. Deploy Dask Clusters - https://docs.dask.org/en/stable/deploying.html
2. Single-Machine Scheduler - https://www.devdoc.net/python/dask-2.23.0-doc/setup/single-machine.html
3. Getting Started with Dask: A Dask Setup Guide - https://www.youtube.com/watch?v=TQM9zIBzNBo&t=82s
4. Scheduler Overview - https://docs.dask.org/en/stable/scheduler-overview.html
5. JupyterLab Extension: How to Integrate Dask Dashboards & JupyterLab in 5 Minutes - https://www.youtube.com/watch?v=EX_voquHdk0&t=293s

6. Dask on HPC Introduction - https://www.youtube.com/watch?v=FXsgmwpRExM&t=9s
7. dask_jobqueue.SLURMCluster - https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html
8. Configure Dask-Jobqueue - https://jobqueue.dask.org/en/latest/configuration-setup.html