# Dask for Distributed Computing in Python

## Introduction and installation

In [1]:
%%html
<!-- Colors: https://encycolorpedia.com/-->
<!-- Comments ommited due to a bug in Jupyter-->

<style>

    a:link { 
        color: #0000EE; 
    }
    a:visited { 
        color: #551A8B; 
    }
    a:active { 
        color: #EE0000; 
    }

    h1 { 
        font-size: 30px; 
        color: rgba(220, 20, 60, 1) !important;  
    }

    h2 {
     font-size: 25px;
     color: rgba(255, 140, 0, 1); /* Orange */		 
    }	 

    h3 {
     font-size: 20px;
     color:rgba(204, 85, 0, 1); /* Dark orange */		 
    }	 
        
    td {
      text-align: center;
    }

    div.highlight_red {    
        background-color: rgba(179, 0, 0, .1);
        background-opacity : 0.5;
        }

    div.highlight_red .title_box {
        background-color: rgba(179, 0, 0, .6);
        width: 100%;
    }

    div.highlight_green {    
        background-color: rgba(	19, 98, 7, .1);
        background-opacity : 0.5;
        }

    div.highlight_green .title_box {
        background-color: rgba(	19, 130, 7, .6);
        width: 100%;
    }

    div.highlight_turquoise {    
        background-color: rgba(	40, 154, 164, .1);
        background-opacity : 0.5;
        }

    div.highlight_turquoise .title_box {
        background-color: rgba(	40, 154, 164, .6);
        width: 100%;
    }

    div.highlight_purple {    
        background-color: rgba(120, 81, 169, .1);
        background-opacity : 0.5;
        }

    div.highlight_purple .title_box {
        background-color: rgba(120, 81, 169, .6);
        width: 100%;
    }

    div.highlight_blue {    
        background-color: rgba(	65, 105, 225, .1);
        background-opacity : 0.5;
    }

    div.highlight_blue .title_box {
        background-color: rgba(	65, 105, 225, .6);
        width: 100%;
    }

    .title{
        text-indent: 1%;
        padding: .25em;
        font-weight: bold;
        font-size: 18px;
        color : white;
    }

    .content{
        text-indent: 2%;
        padding: 1em;
        font-size: 14px;
    }

</style>

<div class="highlight_blue">
<div class="title_box">
    <div class="title">
        ☞ Summary
    </div>
</div>


<div class="content">

We provide a short tutorial on <a href="https://docs.dask.org/en/stable/" >Dask</a>, a Python library for parallel & distributed computing. In short, Dask is a **very pythonic** way of employing distributed computing and scaling common numerical libraries, such as ```numpy```. Some convenient functionalities, include:

- A very numpy/pandas-like behavior and syntax for many of its main functionalities
- Handling of very large objects and parallel computations, such as very large matrices or computationally expensive loops...
- ... with little overhead, due to the lazy handling of arrays and computations. The framework is very pythonic and scalable, being easy to adapt even for existing code.
- This means that one can handle large data even on lower-end systems, such as laptops, defining _Local Clusters_.
- Similarly, Dask is also very convenient in HPC environments, where it can be used as a scheduler. It integrates into the existing python workflow very seamlessly: one is often able to parallelize existing codebases with very little effort.
- Dask also comes with its own data structures and data types, which might be very convenient
- Helpful dashboard for debugging and monitoring

</div>

From <a href="https://docs.dask.org/en/stable/why.html#:~:text=Dask%20can%20enable%20efficient%20parallel,it%20doesn't%20have%20to.">why Dask?</a>: 

> "Moreover, Dask is co-developed with these libraries to ensure that they evolve consistently, minimizing friction when transitioning from a local laptop, to a multi-core workstation, and then to a distributed cluster. Analysts familiar with Pandas/Scikit-Learn/Numpy will be immediately familiar with their Dask equivalents, and have much of their intuition carry over to a scalable context."

The jargon here probably will not make much sense now, but feel free to come back to these diagrams anytime. When using Dask, we will basically be dealing with three data types (depending on how you count it):

- Usual types and data strctures from Python and its libraries: ints, floats, numpy arrays, lists and so on.
- The two types which fall under the umbrella of the **low-level API**:
  - **Delayed objects** -- which are lazy evatuations/task graphs which are only computed when necessary/desired -- greatly saving memory and computation time if properly managed.
  - **Futures** -- which work more or less as tokens or pointers to asynchronous computations running in the background. Instructions can be given to these functions, and they are performed only when the underlying computations have been completed -- this eliminates the need of "manually" checking whether the dependencies necessary for a given calculations have been done or not, greatly simplifying the workflow. This is useful for interactive and real-time computations.
- Types which fall under the umbrella of the **hight-level API**, namely objects such as **Dask arrays**. **dataframes** and so on -- which are based on their numpy/pandas counterpart and provide a high-level abstraction for these data types within the Dask framework.

This is illustrated in the diagram below:

<div style="text-align:center;">
<img src="images/dask_objects.png" alt="Dask Types">
</div>

One can always easily convert between these types and paralellize (or gather) computations using the interfaces in Dask. We will see this in detail in the upcoming sections, but we give a graphical summary here for reference. Note how within this framework we have ideas such as the notion of [MapReduce](https://en.wikipedia.org/wiki/MapReduce) in our workflow:

<br>
<div style="text-align:center;">
<img src="images/dask_objects_conversion.png" alt="Dask Types Conversion">
</div>

#### ⚙️ Instructions

Although you can run and test many of these functions on your local machine, it would be ideal to already do that inside the login or interactive node of a cluster. One should do port-forwarding through (i) a port for Jupyter-lab and (ii) another one for the dashboard functionalities in Dask. This is done through ports ```8888``` and ```8787```, respectively (by default):

```
ssh -L 8888:localhost:8888 -L 8787:localhost:8787 -J user@gate-adress.com user@adress.com
```

We start by importing some libraries and making the necessary installations. Note that dask, or at least some other tools and dependencies such as 
<a href="https://www.open-mpi.org">open MPI</a> and <a href="https://mpi4py.readthedocs.io/en/stable/tutorial.html">mpi4py</a>, might be already available on the Cluster (if that is your case).

```
conda create -n dask_env
conda activate dask_env
conda install dask dask-jobqueue dask-mpi mpi4py -c conda-forge
```

<div class="highlight_green">
<div class="title_box">
    <div class="title">
        ❐ Packages
    </div>
</div>


<div class="content">

We have installed four packages here:

- <a href="https://www.dask.org">Dask</a>, the base library 
- <a href="https://www.dask.org">Dask-Jobqueue</a>, a Dask package for deployment in typical queuing systems found in HPC, such as SLURM 
- <a href="https://mpi.dask.org/en/latest/index.html">Dask-mpi</a>, an interface between Dask and existing MPI environments. Finally, we install mpi4py as a prerequisite (if necessary, since it is often included in many HPC systems)
</div>

#### First glance: dask array

In [11]:
import numpy as np
import dask
import dask.array as da
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import os
import sys

As mentioned above, Dask has many convenient capabilities, such as dealing with <a href="https://examples.dask.org/array.html">Dask arrays</a>, which helps us manipulating very large arrays through chunks. We take this as a first example of the features of this library:  

<div style="text-align:center;">
<img src="images/dask_array.png" alt="Dask Array">
</div>

Also note how it inherits, by design, much of the syntax from ```numpy```:

In [3]:
# The chunk sizes must add up to the dimension of the full matrix
partitioned_array = da.random.random((10000, 10000), chunks=((7000, 3000), (6000, 2000, 2000)))
partitioned_array

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,320.43 MiB
Shape,"(10000, 10000)","(7000, 6000)"
Dask graph,6 chunks in 1 graph layer,6 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 762.94 MiB 320.43 MiB Shape (10000, 10000) (7000, 6000) Dask graph 6 chunks in 1 graph layer Data type float64 numpy.ndarray",10000  10000,

Unnamed: 0,Array,Chunk
Bytes,762.94 MiB,320.43 MiB
Shape,"(10000, 10000)","(7000, 6000)"
Dask graph,6 chunks in 1 graph layer,6 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


We can access these chunks individually, if necessary:

In [4]:
partitioned_array.blocks[0, 1]

Unnamed: 0,Array,Chunk
Bytes,106.81 MiB,106.81 MiB
Shape,"(7000, 2000)","(7000, 2000)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 106.81 MiB 106.81 MiB Shape (7000, 2000) (7000, 2000) Dask graph 1 chunks in 2 graph layers Data type float64 numpy.ndarray",2000  7000,

Unnamed: 0,Array,Chunk
Bytes,106.81 MiB,106.81 MiB
Shape,"(7000, 2000)","(7000, 2000)"
Dask graph,1 chunks in 2 graph layers,1 chunks in 2 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


## A first example

### The scheduler: dask_jobqueue and single-node processes

<div class="highlight_green">
<div class="title_box">
    <div class="title">
        ❐ Relevant documentation
    </div>
</div>


<div class="content">
<ul>
    <li>Check the documentation for <a href ="https://jobqueue.dask.org/en/latest/index.html">jobqueue</a>, with an explanation on <a href="https://jobqueue.dask.org/en/latest/howitworks.html">how it works</a> on the background and the <a href="https://jobqueue.dask.org/en/latest/configuration.html">configuration</a>. For SLURM Clusters in particular, see <a href="https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html">this link</a>.</li>
</ul>
</div>
</div>

The simplest way of using Dask is probably to paralellize single-node process. Basically, what we will be doing now is use Dask to aid us in scheduling a bunch of different processes which do _not_  communicate with each other directly and operate asynchroniously. For that, we will basically set up a bunch of parameters for jobs which will be sent to the cluster. Dask will then be responsible for organizing and submiting everything through a scheduler, such as SLURM, which we will be using here. 

- More concretely, we can start by defining the number of _cores_ and number of _processes_ which will be used **in a single job**, as well as the total number of concurrent _jobs_:

In [22]:
n_cores = 72
n_processes = 4
n_jobs = 2

The number of cores per process is then just ```n_cores```/```n_processes```. Thus, in our example, each "task" would have 18 cpus available. With that in mind, we can use the ```SLURMCluster``` class from  ```dask_jobqueue```:

In [23]:
# The number of cores/processes should not go beyond the capabilities of a single node/macine!

cluster = SLURMCluster(cores=n_cores                  # Total number of Kernels in the job
                       , processes = n_processes      # Number of Python processes to cut up each job. This will be de number of workers PER JOB
                       , memory="4GB"                 # Memory available for the job 
                       , walltime='00:30:00'
                       , local_directory=os.getcwd()  # Get current directory with os
                       , log_directory=os.getcwd()   
                      )

Perhaps you already have a cluster running?
Hosting the HTTP server on port 39761 instead


<div class="highlight_red">
<div class="title_box">
    <div class="title">
        ⚠ Note
    </div>
</div>

<div class="content">
These are the characteristics of a simple task/worker running in a single node. When we create an object in this class Dask automatically sets up an template for a job script, which it later submits by choosing (scaling) the number of jobs with the command below. The cell above really just sets up the desired parameters.</div>
</div>

In [24]:
# Submits n jobs w/ the configuration above
cluster.scale(jobs=n_jobs)

We can just the number of jobs, as shown above. However, it is also possible to specify either the number of workers, memory or cores. Dask will then choose the corresponding number of jobs. By using the ```job_script``` method we can get an idea of how things are working behind the scenes:

In [26]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e Cluster/Jupyter-Gabriel/Dask-test//dask-worker-%J.err
#SBATCH -o Cluster/Jupyter-Gabriel/Dask-test//dask-worker-%J.out
#SBATCH -n 1
#SBATCH --cpus-per-task=72
#SBATCH --mem=4G
#SBATCH -t 00:30:00

/u/alvesgo/conda-envs/default_env/bin/python -m distributed.cli.dask_worker tcp://130.183.223.16:33457 --name dummy-name --nthreads 36 --memory-limit 1.86GiB --nworkers 2 --nanny --death-timeout 60 --local-directory Cluster/Jupyter-Gabriel/Dask-test/



You can also check the status of these submissions through the terminal just to be sure that everything is working as expected:

In [None]:
# We can see the scheduled jobs running this terminal command:
!squeue -u alvesgo

Node that this approach with Dask creates ```n_jobs```, so it is limited by the maximum number of running jobs on a Cluster (16 in the cluster used here). We should use other APIs, such as dask-mpi, in order to allocate multiple nodes per job. For instance, one might be interested in a single job which uses 50 nodes rather than 50 single node jobs. This might very well be your use case so feel free to check the last part of this notebook.

Finally, note that the we can cleraly print the jobs/cluster information with the built-in functions in the library as as final check:

In [20]:
# Connect to that cluster
client = Client(cluster) 
print(client)

<Client: 'tcp://130.183.223.16:39785' processes=0 threads=0, memory=0 B>


Here we have used the ```Client``` class to establish a connection with the Dask cluster. Per the documentation:

> It provides an asynchronous user interface around functions and futures.

The _client_ is the central process where we controle everything. They receive the results from the _workers_.

The graphical representation on notebooks is also very convenient. Here you can see the number of workers and their hardware specification:

In [None]:
client

<div class="highlight_green">
<div class="title_box">
    <div class="title">
        ❐ Dashboard
    </div>
</div>


<div class="content">
By accessing <a href="https://www.dask.org">http://localhost:8787/status</a> you can find a very complete dashboard containing information about your ongoing jobs and workers. This <a href=~https://docs.dask.org/en/latest/dashboard.html~>page</a> from the documentation provides a nice overview.
</div>
</div>

### Monte Carlo Method

First we define a function which assigns random $(x, y)$ coordinates to the column of a matrix. Each column is a "realization" of the MC method. We then divide this large matrix into chunks, which we send to the workers. For this we use the built-in array format, which is analogous to numpy arrays:

In [217]:
def xy_coord_random(num_runs, num_chunks):

    chunk_size = num_runs//num_chunks

    # Returns a dask array with entries between 0 and run
    return da.random.uniform(low=0, high=1, size=(2, num_runs), chunks=(2, chunk_size))

In [218]:
xy_coord_random(100, 5)

Unnamed: 0,Array,Chunk
Bytes,1.56 kiB,320 B
Shape,"(2, 100)","(2, 20)"
Dask graph,5 chunks in 1 graph layer,5 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 1.56 kiB 320 B Shape (2, 100) (2, 20) Dask graph 5 chunks in 1 graph layer Data type float64 numpy.ndarray",100  2,

Unnamed: 0,Array,Chunk
Bytes,1.56 kiB,320 B
Shape,"(2, 100)","(2, 20)"
Dask graph,5 chunks in 1 graph layer,5 chunks in 1 graph layer
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [212]:
import random

# Total number of monte carlo runs
runs = 1e6
 
def dask_array_MC(num_runs, num_chunks):
    """Computes pi with monte carlo simulations. Performs num_runs/num_chunks on each worker"""

    # Generates the random coordinates in appropriate chunk sizes
    xy_coordinates = xy_coord_random(num_runs, num_chunks) 

    # Computes the distance from the origin
    distance_origin = (xy_coordinates ** 2).sum(axis=0)

    # Counts the points within the circle
    in_circle = (distance_origin < 1)

    # Ratio area (slice of) circle/area square
    pi = 4 * in_circle.mean()

    return pi

We can compute the serial time:

In [223]:
%%time
chunks = 1
print("Estimated value of pi for", chunks, "chunks:", dask_array_MC(runs, 1).compute())

Estimated value of pi for 1 chunks: 3.1428112
CPU times: user 77.9 ms, sys: 0 ns, total: 77.9 ms
Wall time: 489 ms


Since we have $n$ workers, each worker will perform $10^6/n$ simulations. With our current parameters we have $n=16$ workers:

In [224]:
%%time
chunks = 32
print("Estimated value of pi for", chunks, "chunks:", dask_array_MC(runs, 1).compute())

Estimated value of pi for 32 chunks: 3.1422432
CPU times: user 67.8 ms, sys: 0 ns, total: 67.8 ms
Wall time: 477 ms


Note that using a larger number of chunks now _increases_ the wall time. We have more chunks than available workers, so many of these calculations will queue up:

In [225]:
%%time
chunks = 128
print("Estimated value of pi for", chunks, "chunks:", dask_array_MC(runs, 1).compute())

Estimated value of pi for 128 chunks: 3.1410944
CPU times: user 62.7 ms, sys: 90 µs, total: 62.8 ms
Wall time: 444 ms


## dask.delayed  | Parallelizing a task with lazy evaluations

In this section we will show how to extract and use the results from calculations performed by different nodes and workers; centralizing all the peripheral computations.

As example, we define three different matrices $A$, $B$ and $C$. We can parallelize a for loop containing the matrix operations using the interface ```dask.delayed(...)(...)```. This is a wrapper responsible for operating with the function in the _first_ argument in a lazy manner, while the arguments of the function of interest come into the second set of parantheses. The function will essentially build a task graph in the background with our function calls.

- Actually, you can even use the usual decorator synthax in Python in order to avoid too much boiler plate when paralellizing stuff with Dask. <a href="https://docs.dask.org/en/stable/delayed.html#decorator">Check this page</a>, and also [this one](https://docs.dask.org/en/latest/delayed-api.html) in the API.

In [235]:
# Squares a matrix w/ matrix multiplication
def squared(arr):
    return arr@arr

# Defines 3 random matrices of size n
n=5000
mat_A =  np.random.rand(n, n)
mat_B =  np.random.rand(n, n)
mat_C =  np.random.rand(n, n)

# Output containing the (tokens/futures) for each squared value
output = []

# Passes the 'squared' to dask.delayed in order to paralellize it and then appends to the list
for mat in [mat_A, mat_B, mat_C]:
    mat = dask.delayed(squared)(mat) 
    output.append(mat)               

# A + B + C using 'sum'
total = dask.delayed(sum)(output)

Also, Note that the parallellzation stems from the fact that we are _simultaneously_ operating on the matrices $A$, $B$ and $C$. The internal steps of the matrix multiplication however are **not** parallel. The syntax here with ```dask.delayed(squared)``` might look a bit weird, but what is happening behind the scenes is just <a href="https://toolz.readthedocs.io/en/latest/curry.html" >currying</a>. A lot of stuff happening is very functional! See also <a href="https://www.stratascratch.com/blog/go-to-guide-to-currying-in-python/">this link</a> for more details.

- An <a href="https://softwareengineering.stackexchange.com/questions/293851/what-is-it-about-functional-programming-that-makes-it-inherently-adapted-to-para">excellent question on this topic </a> can be found in SE

Regardless of the details, all these operations are done lazily, and when you access ```dask.delayed(...)(...)``` you get a _token_ pointing to the result of your calculation:

In [236]:
total

Delayed('sum-dbeeff7e-3d5a-4ff6-812e-b24a650dc822')

You can then use the method ```compute``` to get the numerical result, that is, a _concrete value_:

In [237]:
total.compute()

array([[3751.96892472, 3744.5341925 , 3750.79766574, ..., 3772.88617264,
        3710.84912013, 3760.78695976],
       [3742.43949997, 3754.26777582, 3773.77063438, ..., 3767.1661149 ,
        3735.62635214, 3760.77346046],
       [3751.35205524, 3753.20059541, 3769.94835268, ..., 3766.99535821,
        3722.44762785, 3788.21781464],
       ...,
       [3774.13223642, 3763.1392995 , 3784.17558326, ..., 3784.93699223,
        3755.32813315, 3786.16227172],
       [3777.17175516, 3769.73576613, 3787.77744033, ..., 3793.49471824,
        3755.29158264, 3791.72912123],
       [3705.88167301, 3740.14915976, 3741.3643459 , ..., 3736.84298373,
        3701.47322593, 3748.37399958]])

We can also get a **graph representation** of these lazy objects:

In [None]:
total.dask

We can clearly see that ``Delayed`` objects act as proxies for the object they wrap, but all operations on them are done lazily by building up a dask graph internally. Compare it with the standard computation ```mat_A@mat_A + mat_B@mat_B + mat_C@mat_C``` to verify that you would get the very same result.

### Comparison with serial code

Now we compare this with what we would get w/ non-parallel code. For that we will try to solve for the eigenvalues for a bunch of large matrices. Additionaly, since we can have at most ```n_jobs``` $\times$```n_processes``` tasks (or workers) going on, this is the number of matrices we will try to diagonalize:

In [239]:
mat_lst = [np.random.rand(n, n) for i in range(n_jobs*n_processes)]

The single node job naturally takes a while:

In [35]:
%%time

# Computation done on 72 cpus on an interactive node in the RAVEN Cluster
output = []

for mat in mat_lst:
    output.append(np.linalg.eigvals(mat))

CPU times: user 5h 25min 21s, sys: 6min 20s, total: 5h 31min 41s
Wall time: 4min 36s


By deploying the workers and paralellizing we get much faster results:

In [240]:
%%time

output = []

for mat in mat_lst:
    # Lazy interfacing of np.linalg.eigvals
    output.append(dask.delayed(np.linalg.eigvals)(mat))

total = dask.delayed(output)

# Starts the computation
total = total.compute()

# For concreteness, prints the first matrix
total[0]

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: user 6.11 s, sys: 7.18 s, total: 13.3 s
Wall time: 55.7 s


array([ 2.49960890e+03 +0.j        , -1.58445192e+01+13.16488534j,
       -1.58445192e+01-13.16488534j, ...,  1.78548352e-01 -0.35003405j,
       -4.11920976e-01 +0.36222131j, -4.11920976e-01 -0.36222131j])

## distributed.Future | The map/reduce paradigm

You might have noticed that we got some annoying (but very helpful) warning in the last section regarding the very large graphs. There is actually a small set of a few other very helpful functions for paralellization which will allow us to do some of these calculations more carefully, sometimes with less overhead.

- See map, submit and gather in the <a href="https://distributed.dask.org/en/stable/quickstart.html">quickstart page for the ```dask.distributed```</a> package. The <a href="https://distributed.dask.org/en/latest/client.html">documentation on ```client```</a> also provides some valuable insight.

<div class="highlight_blue">
<div class="title_box">
    <div class="title">
        ☞ Using futures
    </div>
</div>


<div class="content">

You can use the ```Submit``` and  ```Map```  to submit and perform remote calculations/functions in the distributed workers for **single or many tasks, respectively**. As mentioned before, these functions return just tokens, and no the computation result itself. Instead, the result is stored locally in the cluster. They can be retrieved with the ```Future.result``` or ```Client.gather``` methods.
</div>
</div>



This <a href="https://distributed.dask.org/en/latest/client.html">page on Client</a>, and more importantly, on <a href="https://docs.dask.org/en/latest/futures.html">futures</a>, has a few important details on:

- How future works and the purity of function calls
- And about the Async/await operations and efficiency of computations
- Besides, one important difference when compared to ```dask.delayed``` is that the evaluation is **immediate** (running in the background), **not lazy**

This [short video](https://youtu.be/07EiCpdhtDE) provides a very pedagogical introduction. From the documentation itself:

> "You can pass futures as inputs to submit. Dask automatically handles dependency tracking; once all input futures have completed, they will be moved onto a single worker (if necessary), and then the computation that depends on them will be started."

The future and keys are useful because they often avoid redundancy and computing the same thing over and over again. That is why it is important knowing that ```distribute``` **assumes that functions are pure by default**. One need to flag it as false otherwise!  And perhaps even more conveniently, you do not have to worry so much about dependency tracking, as outlined above -- this delegates a lot of the hardwork to the library! 

### An example

Let's do another silly example. We will square large matrices and then compute their eigenvalues:

In [99]:
%%time

# Redefines a new matrix list
mat_lst = [np.random.rand(n, n) for i in range(n_jobs*n_processes)]

# This is prepared in the scheduler (and then submited to the background):
squared_mat_lst = client.map(squared, mat_lst)

CPU times: user 9.9 s, sys: 917 ms, total: 10.8 s
Wall time: 9.32 s


We get a future object/a token pointing to it:

In [100]:
eig_lst[0]

We can retrieve it to the client with ```.result()```:

In [101]:
eig_lst[0].result()

array([[1275.82259087, 1235.85577784, 1249.04106904, ..., 1237.84133264,
        1243.03154767, 1260.15365486],
       [1287.65928071, 1245.23824462, 1252.98365661, ..., 1263.70408106,
        1255.99548535, 1273.9411894 ],
       [1260.47477711, 1231.30376769, 1243.35862719, ..., 1233.05677994,
        1230.9708148 , 1253.37178606],
       ...,
       [1279.23130428, 1248.45153704, 1257.19584802, ..., 1257.43233617,
        1254.33921521, 1270.38945455],
       [1275.23281068, 1246.66132112, 1245.23571806, ..., 1253.62257434,
        1250.99371266, 1268.62430216],
       [1268.98030849, 1236.69683326, 1239.72837433, ..., 1240.82074379,
        1236.43997673, 1252.2041179 ]])

You may see the Future above either as **pending** or **finished**, since the process is running in the background through the workers. Now we take this list of squared matrices ```squared_mat_lst``` and map ```np.linalg.eigvals``` over them:

In [None]:
%%time

# Maps the function call
mapped_eig_lst = client.map(np.linalg.eigvals, squared_mat_lst)

# Gathers the list of futures with .gather()
gathered_eig_lst = client.gather(mapped_eig_lst)

For concreteness, the first set of eigenvalues is:

In [None]:
gathered_eig_lst[0]

Finally, notice that ```gather``` retrieves data from the distributed workers. Meanwhile, ```Client.scatter```  does the reverse process: it scatters the local desired data to the distributed processes. When we do that we get a future pointing to that data. This might avoid an overhead of too much data movement. We discuss this in the next session.

<div class="highlight_green">
<div class="title_box">
    <div class="title">
        ❐ Remark
    </div>
</div>


<div class="content">
In short, the Dask workflow is very convenient, allowing for both <b>high level</b> and <b>low level</b> use:
    
- High level data structures which can be used out-of-the-box, such as the Dask arrays...
- ...as well a lower level APIs, suck as ```dask.delayed```, which allow us to write more arbitrary parallel code.
</div>
</div>

## Data locality and managing memory | Scattering and moving large data

### Submit()

<div class="highlight_blue">
<div class="title_box">
    <div class="title">
        ☞ Submitting tasks
    </div>
</div>


<div class="content">

As you have probably noticed in the previous examples, ```submit()``` is the routine used in ```dask.distributed``` to send _functions_ to workers, returning a _future_ object which corresponds to the async calculation running in the background in one of workers. The resulting future can be further handled by other Dask functionalities. On top of parallelizing and speeding up computations, ```submit()``` is also useful in managing memory when dealing with a distributed system.
</div>
</div>

For this example, we will generate very large numpy arrays:

In [5]:
def random_matrix(n):
    return np.random.rand(n,n)

Now, let us define a function ```random_matrix(n)``` which computes $c \mathrm{\: Tr\: } {\bf M}$ given a matrix $\bf M$ which is $n \times n$ and a scalar $c$.

In [6]:
# Trace of M times a constant
def dask_trace(M, parameter=1):
    return parameter*np.trace(M)

If we try to generate a large matrix in the client and then send it to the workers through Dask, we'll get a warning when invoking a function by wrapping it with ```dask.delayed```:

In [8]:
dim = 2**11

random_matrix_client = random_matrix(dim)
dask.delayed(dask_trace)(random_matrix_client).compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


1021.1154255373143

Which has more or less the same size as the matrix:

In [12]:
print("Matrix size (in Mb): ", sys.getsizeof(random_matrix_client)/10**6)

Matrix size (in Mb):  33.55456


This is specially annyoing if we want to, let's say, map this over several different workers. There is a lot of data movement involved. How to fix this? There are a lot of options.
We we did in the previous section, we can just use ```submit``` -- which submits a _function application_, cobining it with ```delayed```. 
This command will make the scheduler tell one of the workers to execute the function ```random_matrix``` taking ```dim``` as the argument:

In [44]:
random_matrix_in_worker = client.submit(random_matrix, dim)
random_matrix_in_worker

Note that ```client.submit()``` takes a concrete value/normal python object and transforms it into a **future**. We could then retrieve the result with ```.result()```. For instance, we could use to retrieve the result back to the client and then just use the result for further computations. We could, let's say, take the trace:

In [45]:
np.trace(random_matrix_in_worker.result())

1016.4303913711992

However, bringing the result back to the scheduler is unnecessary. We can call the desired function lazily, as we did before. This is even better, since now we don't even need to bring the (large) intermediate result into play. We can just ask the worker to perform this computatation lazily, without ever bringing the matrix back to the client. After that we can simply use the ```.compute()``` method to evaluate the result:

In [49]:
computation_in_worker = dask.delayed(np.trace)(random_matrix_in_worker)
computation_in_worker

Delayed('trace-c6ad9bf5-f213-4982-8370-f4b86c6aa6d6')

In [51]:
computation_in_worker.compute()

1016.4303913711992

By being more careful we move much less data around and do not get the warning we had before!

### Scatter

<div class="highlight_blue">
<div class="title_box">
    <div class="title">
        ☞ Sending data from the client to the workers
    </div>
</div>


<div class="content">

Another very useful feature is the [data scattering](https://distributed.dask.org/en/stable/locality.html#data-scatter) functionality, which allows us to transmit and distribute data from the clients to the workers, which you can find in the linked documentation. This is very similar to the ```submit()``` method, and is even more important concerning the management of memory across the distributed system or local computer.


If desired, it is possible data to specific workers. However, often we are also interested in sending the _same_ data to _all workers_. This is what we will do in this toy example. In this case we need the option ```broadcast=True```:
</div>
</div>

In [52]:
# Sends data to all workers
scattered_futures = client.scatter([1, 2, 3], broadcast=True)  
scattered_futures

[<Future: finished, type: int, key: int-35dba5d75538a9bbe0b4da4422759a0e>,
 <Future: finished, type: int, key: int-beb4dbf9af069aa2df7b147229965085>,
 <Future: finished, type: int, key: int-f2577a6fc29b900fe7d4c6321346be48>]

<div class="highlight_red">
<div class="title_box">
    <div class="title">
        ⚠ Note
    </div>
</div>

<div class="content">
    
- Note that while <code>submit()</code> sends a <b>function application</b>, <code>scatter()</code> sends  <b>data</b>
- Both methods ```distributed.Client.submit() ``` and  <code>distributed.Client.scatter()</code> return **a future object**
- Also, note that depending on preference or specific implementation, interfacing with <code>delayed()</code> might be prefered when working with a large computation graph
</div>

As an example, let us, once again, use the random matrix ```random_matrix_client``` we had crated **locally** in the client. We can take this large amound of data and **send it to workers** so they can operate with it. This can be troublesome, since whenever the workers request the data we have to send it again -- so ```random_matrix_client``` is essentially being sent everytime:

In [54]:
# Maps the trace function on the local matrix created above with three different parameters as an argument
mapped_results = client.map(lambda c : dask.delayed(dask_trace)(random_matrix_client, c), [0.5, 1, 2])
dask.compute(*client.gather(mapped_results))

This may cause some slowdown.
Consider scattering data ahead of time and using futures.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.


(510.55771276865715, 1021.1154255373143, 2042.2308510746286)

In the code below we fix the problem, since by preemptively scattering the data among all workers, it is readily available to them. 

<div class="highlight_blue">
<div class="title_box">
    <div class="title">
        ☞ The trick
    </div>
</div>


<div class="content">

More concretely, this works as follows: the method ```client.scatter()``` will create a future, and all further computation will point to this future -- thus, there will be no excessive data movement like in the previous example. This is useful since without ```scatter()``` we might need to resend the data whenever it is requested by one of the workers, even with ```submit()``` and alike. So, by combining ```scatter()``` -- for transfering _data_ -- and the other methods we saw before, such as ```submit()```and ```delayed()``` -- for submitting _tasks and operations_, we can greatly improve memory management in our workflow:
</div>
</div>

In [61]:
# Scatters data among workers -- returns  list of futures
scattered_matrix = client.scatter(random_matrix(dim), broadcast=True)  

# Maps the function dask_trace over the data above, wrapping it with delayed
mapped_results_scatter = client.map(lambda c : dask.delayed(dask_trace)(scattered_matrix, c), [0.5, 1, 2])

# Gathers the results form the futures obtained from client.gather() -- which returns a list of delayed objects, then computes them objects w/ dask.compute()
dask.compute(*client.gather(mapped_results_scatter))

(500.4189466208778, 1000.8378932417556, 2001.6757864835113)

Note that here we did not really get any warnings!

### Manually operating with workers

Finally, to conclude, we show how this can be done in a more manual fashion. With ```client.scheduler_info()['workers']``` we can get the address of each worker and then submit tasks to specific workers:

In [69]:
worker_addresses = client.scheduler_info()['workers'].keys()
worker_addresses = list(worker_addresses)
worker_addresses

['tcp://127.0.0.1:38551', 'tcp://127.0.0.1:42935']

<div class="highlight_red">
<div class="title_box">
    <div class="title">
        ⚠ Note
    </div>
</div>

<div class="content">
    
Data broadcasting with ```client.scatter()```, specially replicas, might be actually incompatible with the [active memory manager in Dask](https://distributed.dask.org/en/latest/active_memory_manager.html#reducereplicas). It must be manually disabled! See this [issue](https://github.com/dask/distributed/issues/8657) on Github.
</div>

To test this, we scatter some data:

In [70]:
# Manually stopping the active memory manager so data is truly scattered to ALL workers. Otherwise one of the workers will get it
client.amm.stop()
future = client.scatter(random_matrix(dim), broadcast=True)

And ```client.who_has()``` let us see which workers have access to this future:

In [71]:
client.who_has(future)

Key,Copies,Workers
ndarray-3e5b140800c226ec94d30d5d1014a9a6,2,"tcp://127.0.0.1:42935, tcp://127.0.0.1:38551"


Note that **both** workers can access the matrix in ```future```. Had we passed the argument ```client.scatter(..., broadcast=False)```, only one worker would have gotten the data. These addresses can then be passed as parameters to other methods in the API:

In [72]:
# Functions the toy example
function_A = np.trace
function_B = np.max
function_C = np.min

# Computes the trace
computation_A = client.submit(function_A, future, workers=worker_addresses[0])

# Computes the maximum
computation_B = client.submit(function_B, future, workers=worker_addresses[1])

The first future is in the first client, while the second future in the second client:

In [74]:
client.who_has([computation_A, computation_B])

Key,Copies,Workers
trace-1b32970dcbb17b553284755869faf8b9,1,tcp://127.0.0.1:38551
max-c80fd339a55195987a3f0ed1257f23ad,1,tcp://127.0.0.1:42935


Here we have used different functions in ```client.submit()``` in this toy example because Dask is actually quite smart about this: had we submitted the same function to be applied on the future, Dask would just point to the same fugure in both computations (remember the purity in Dask!):

In [77]:
computation_equal_1 = client.submit(function_C, future, workers=worker_addresses[0])
computation_equal_2 = client.submit(function_C, future, workers=worker_addresses[1])
client.who_has([computation_equal_1, computation_equal_2])

Key,Copies,Workers
min-41181ac1d1cf5f8a69e8801e7da5c982,1,tcp://127.0.0.1:38551


We can see in the result above both futures/computations are stored in the same worker. Finally, we can retrieve the results:

In [79]:
print(client.gather(computation_A))
print(client.gather(computation_B))

1000.5495096049874
0.9999999861343417


<div class="highlight_green">
<div class="title_box">
    <div class="title">
        ❐ Remarks
    </div>
</div>


<div class="content">

- One can find a good summary on ```map```, ```submit```, ```persist``` and ```compute``` on [on this post](https://medium.com/@mameralomari1/dask-map-submit-persist-compute-an-overview-of-differences-570a696e7158#:~:text=map%20allows%20you%20to%20apply,the%20result%20of%20the%20task.). Also refer back to the diagram in the beginning.
- The overall concepts on data locality can also be found <a heref="https://distributed.dask.org/en/stable/locality.html">in the documentation</a>
- An alterntive we have not discussed here, which is similar to <code>compute()</code>, is the <code>persist()</code> method found in the link above. Acoording to the documentation:

> "This function is particularly useful when using distributed systems, because the results will be kept in distributed memory, rather than returned to the local process as with compute."

- A very good point is also made [in this question](https://stackoverflow.com/questions/41806850/dask-difference-between-client-persist-and-client-compute):
  
> "More pragmatically, I recommend using persist when your result is large and needs to be spread among many computers and using compute when your result is small and you want it on just one computer."

- Overall, the difference between persist and compute can be found [in this part of the documentation](https://distributed.dask.org/en/stable/memory.html#:~:text=In%20this%20example,for%20fast%20analyses.). In short, ```compute()``` is blocking and returns the result to the host. This is dangerous if the computation is too large. Meanwhile, ```persist()```, as the name suggests, keeps the date in the distributed cluster. More concretely,  
- Even using Dask, we can have a lot of fine-grained control. There are examples in the documentation where one can assigns different parts/steps in the computation to specific workers
- This nice [question on Stack Overflow](https://stackoverflow.com/questions/41471248/how-to-efficiently-submit-tasks-with-large-arguments-in-dask-distributed) also provides a comprehensive
   example.
- ```client.run()``` can run the same function in all workers at once, but this returns in a dictionary and is done outside the scheduling system
- One should be careful with the policy of reducing replicas in the [active memory management](https://distributed.dask.org/en/latest/active_memory_manager.html#reducereplicas)
</div>
</div>

## Other scheduling methods

### A local client

It is also possible to set up **local workers** using ```distributed.deploy.local.LocalCluster```. This is useful when you do not need or do not have access to a full-fledged cluster/HPC systems and you still want to use the dask features -- which might be helpful nevertheless, even with a smaller number of cores and RAM available. All the commands you'd have to use in a proper cluster is the same, the only difference arises we deploy the client:

In [4]:
from dask.distributed import Client, LocalCluster

# Set up local cluster on your own machine/laptop
cluster_local = LocalCluster(n_workers=2
                             , threads_per_worker = 36
                             , memory_limit="64GB" 
                             , local_directory='data')    

client = Client(cluster_local)

We can then check it:

In [None]:
client

To close and delete; in order to fully reset everything:

In [None]:
client.shutdown()
del(cluster)

### Dask and MPI

Finally, we focus on the [Dask-MPI package](https://docs.dask.org/en/stable/deploying-hpc.html#using-mpi), running it for [interactive jobs](https://mpi.dask.org/en/latest/interactive.html). This package allows us to use many nodes in a single job instead -- instead of sending a buch of different single-node jobs -- this might open up a few possibilities, specially if your cluster limits the number of concurrent jobs you can run, which can be a bottleneck for ```Dask.distributed```.

<div class="highlight_red">
<div class="title_box">
    <div class="title">
        ⚠ Note
    </div>
</div>

<div class="content">
However, this time around we will use a slightly different workflow. The configuration might depend a lot on your current environment. I will perform this test, once again, for a cluster equipped with SLURM. This will be important for two reasons:

- We will first have to **run the mpi command within a job script**, allocating the appropriate number of nodes and cores per node. We also want to match the number of mpi ranks in the command with the total number of cores
- We have to be careful with the nannies depending on the MPI environment. See the warning [here](https://mpi.dask.org/en/latest/interactive.html#:~:text=MPI%20Jobs%20and%20Dask%20Nannies).
</div>
</div>

Also note that we will only be using MPI to start the Dask cluster and not for inter-node communication.

MPI packages might be readily available at your cluster, so be sure to check that beforehand. Nevertheless, we will install the [necessary packages](https://mpi4py.readthedocs.io/en/latest/install.html#using-conda) -- together with Dask-MPI, using ocnda:

```
conda install -c conda-forge mpi4py openmpi dask-mpi
```

You can test whether your mpi installation is properly running with:

In [78]:
%%bash
mpirun -n 5 python -m mpi4py.bench helloworld

Hello, World! I am process 0 of 5 on raven05.
Hello, World! I am process 1 of 5 on raven05.
Hello, World! I am process 2 of 5 on raven05.
Hello, World! I am process 3 of 5 on raven05.
Hello, World! I am process 4 of 5 on raven05.


Once everything is setup, we can start the MPI process with a batch script. Here is an example:

```
#!/usr/bin/env bash

#SBATCH --job-name=dask_mpi_test
#SBATCH -o ./out.%j
#SBATCH -e ./err.%j
#SBATCH -J dask-mpi-test
#SBATCH -p general
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=8
#SBATCH --mem=32GB
#SBATCH -t 00:10:00
#SBATCH --export=ALL          # Might be necessary to export env. varibles, making conda work

# Activate Anaconda environment
source activate dask_env

# Run the MPI command
mpirun -np 8 dask-mpi --worker-class distributed.Worker --scheduler-file scheduler.json
```

We can save this to a ```dask_mpi_test.job``` file and then submit the job:

```
sbatch dask_mpi_test.job
```

The number of ranks in the ```mpirun``` command matches the total number of cores from the job. We also activate our current conda environment before hand with ```source activate dask_env```. After everything is done, the file ```scheduler.json``` should appear in the appropriate directory. Since mpi has launched the clusters, Dask can now connect the client to the workers:

In [94]:
from dask.distributed import Client

client = Client(scheduler_file=os.getcwd() + '/scheduler.json')

In my case I had to write down the absolute path. Further information and logs can be found on the output file of the job. Finally we can explicitly check whether we got the correct number of workers:

In [None]:
client

Finally, to close the client you can use the command:

In [88]:
client.close()

## References

<div class="highlight_purple">
<div class="title_box">
    <div class="title">
        🕮 Further references
    </div>
</div>

<div class="content">


- [1] Duke MIDS Fall 2023 Practical Data Science (IDS 720) Course, <a href="https://www.practicaldatascience.org/html/distributed_computing.html">Distributed Computing with dask</a> </li>
    
- [2] Prabhakar Rangarao, <a href="https://prabhakar-rangarao.medium.com/distributed-computing-with-dask-8001e223df88">Distributed Computing with Dask
</a></li>

- [3] Ciaron Linstead, <a href="https://gitlab.pik-potsdam.de/linstead/cluster-examples/-/tree/ea6db0986555fd5774753d402cb28e68bccc837b/python/dask">Hybrid Python mpi4py + Dask example</a></li>

- [4] <a href="https://docs.dask.org/en/latest/best-practices.html">Dask Best Practices </a></li>
- [5] @willirath, <a href="https://github.com/willirath/dask_jobqueue_workshop_materials"> Workshop materials for a 4h course on Dask Jobqueue </a></li>
- [6] Matthew Rocklin, <a href="https://blog.dask.org/2019/01/31/dask-mpi-experiment"> Running Dask and MPI programs together -- an experiment</a></li>
- [7] Sebastian B. Mohr, <a href="https://hps.vi4io.org/_media/teaching/autumn_term_2022/scap_sebastian_mohr_dask_performance.pdf"> Dask performance </a></li>
- [8] The [Dask documentation](https://docs.dask.org/en/stable/)
- [9] The [dask.distributed API documentation](https://distributed.dask.org/en/stable/api.html)
</div>    
</div>