In [None]:
# IGNORE THIS CELL WHICH CUSTOMIZES LAYOUT AND STYLING OF THE NOTEBOOK !
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import warnings

import matplotlib.pyplot as plt

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings = lambda *a, **kw: None
from IPython.core.display import HTML

HTML(open("../documents/custom.html", "r").read())

<br/>
<span style="background:#f0f0e0;padding:1em">Copyright (c) 2020-2022 ETH Zurich, Scientific IT Services. This work is licensed under <a href="https://creativecommons.org/licenses/by-nc/4.0/">CC BY-NC 4.0</a></span><br/>
<br/>

<p style="font-size: 2.5em; font-weight: bold;">Section 6b: Parallel Computing on an HPC Cluster</p>

# Pythonic solutions for Scaling Up and Out

<p>
<img src="./images/Scale_Up_Out.png" width="640">
</p>

**Main Idea**: The general strategy for scaling in a sustainable way is to develop on a laptop (several cores) and deploy on an HPC cluster. 

On the one hand, we can run the PC code **locally** on a single, but bigger, node of the HPC cluster, which is called **scaling up**.  

On the other hand, there are several mature solutions that allow us to start on a PC and then run **distributed** over several nodes of the cluster, which is called **scaling out**:
- [IPython Parallel](https://ipyparallel.readthedocs.io/en/latest/) - `ipyparallel`
    - mature solution that provides a high flexibility
- [Dask](https://docs.dask.org/en/latest/) - `dask`
    - newer solution that is focused around tasks and comes with batteries included (e.g. interface for NumPy, Pandas, scikit-learn, ...)
- [MPI for Python](https://mpi4py.readthedocs.io/en/stable/) - `mpi4py`  (not covered in this course)
    - for HPC-heavy users 
- [mpipool](https://github.com/mpipool/mpipool) - `mpipool` (not covered in this course)
    - a wrapper of `mpi4py` that mimics the interface provided by the multiprocessing "pools" available in the Python standard library, i.e. `concurrent.futures.ProcessPoolExecutor` and `multiprocessing.Pool`
- [Ray](https://www.ray.io/) - `ray` (not covered in this course)
    - a tool focused mainly on machine learning tasks
    
[MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface) stands for Message Passing Interface. It is the standard that defines the API (in C and Fortran) and the semantics for the communication between processes. It is used by many applications that scale out from a PC to supercomputers, especially if the underlying processing cannot easily be split into independent tasks. There are several implementations, e.g. [Open MPI](https://www.open-mpi.org/) and [MPICH](https://www.mpich.org/). For further information on MPI on Euler, see the [scicomp wiki](https://scicomp.ethz.ch/wiki/MPI_on_Euler).

Alternatively one can use a workflow management system that has HPC integration. 



## Scaling Up - Using PC code on an HPC Cluster Node

In order to run the code developed on our PC on the HPC cluster and profit from scaling up, we should make sure that:
1. the computing environment is compatible,
2. the job is submitted to only one node.

Compatible **computing environment** implies compatible:
- platform (x86-64 on Euler) 
- operating system (Linux on Euler)
- system applications
- Python version 
- Python packages 

The platform and the operating system are fixed on Euler, so there is little flexibility.  
The system applications, the Python version and the Python packages offer more flexibility. We can achieve good compatibility by using the following ([containers](https://scicomp.ethz.ch/wiki/Singularity) are also an option):
- on **our PC** a **Python version** available on Euler modules (e.g. Python 3.8, 3.9, or 3.10),
- **virtual environments** `venv` on both our PC and Euler, where we can install the desired Python packages.

In case important applications or system libraries are missing on Euler, please contact the HPC team (cluster-support@id.ethz.ch).

<div class="alert alert-warning">
  <strong>Warning!</strong>
    
  conda environments [are not recommended by the HPC team](https://scicomp.ethz.ch/wiki/Conda), since these environments consist of **many** small files which put pressure on the parallel file system.
</div>

Next, we have to make sure that the job submitted to Slurm is scheduled on a **single node**.  
This can be guaranteed using `--nodes 1`. At the moment, the maximum number of cores per node is 128 on Euler VI and Euler VII (please check https://scicomp.ethz.ch/wiki/Euler).  
In case we need 32 cores on a single node we can request them using:

```bash
# 32 cores on the same node
$ sbatch -n 32 --nodes 1 --wrap "COMMAND TO EXECUTE"
```

We would need more than one core only if our code can profit from parallelization. Otherwise we should ask for only one core `-n 1`. 

<div class="alert alert-warning">
  <strong>Warning!</strong> 
  
  `os.cpu_count()` or `multiprocessing.cpu_count()` returns the number of cores available on the **node**, and **not** the number available to the **submitted job**. Therefore, if we request only a small number of cores, we will heavily oversubscribe them.<br/>
    We should use the environment variable `SLURM_NTASKS` instead, which provides the number of processors allocated to the job (the number after `sbatch -n`).
</div>
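
A minimal sketch of how a script might choose its worker count in line with the warning above. The helper name `allocated_cores` is hypothetical. `os.sched_getaffinity` exists only on some platforms (e.g. Linux), so it is used only when present, and it is an assumption that it reflects the cores granted to the job on a given cluster.

```python
import os


def allocated_cores():
    """Return the number of cores this process should use (hypothetical helper)."""
    # Inside a Slurm job, SLURM_NTASKS holds the value passed to `sbatch -n`.
    ntasks = os.environ.get("SLURM_NTASKS")
    if ntasks is not None:
        return int(ntasks)
    # Outside Slurm: prefer the CPUs this process is allowed to run on ...
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # ... and fall back to the number of cores of the whole machine.
    return os.cpu_count() or 1


print(allocated_cores())
```

On a PC the function falls through to the local core count, so the same script can be used unchanged on both systems.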

**Select a CPU model**

In case we want a specific node, we can request it using `--constraint=model_name`, where the models corresponding to the nodes are:
- Euler IV: `XeonGold_6150`
- Euler V: `XeonGold_5118`
- Euler VI: `EPYC_7742`
- Euler VII: `EPYC_7H12`

```bash
# 12 cores on a single node on Euler VI
$ sbatch -n 12 --constraint=EPYC_7742 --nodes 1 --wrap ....
```

## Exercise Scaling Up "Compute Pi" on Euler (Hands-on) [10min]

**You should replace `<...>` with something sensible.**

In this exercise we will run the "Compute Pi" exercise from Section 5, where 

```python
num_workers = int(os.environ.get("SLURM_NTASKS", os.cpu_count()))
```

that is, if `SLURM_NTASKS` environment variable is available it is used, otherwise `os.cpu_count()` is used.

In [None]:
pycat euler_scripts/pi_scale_up_hpc.py

1. Login to the cluster and load the new environment system

```bash
$ ssh <username>@euler.ethz.ch

# on the cluster
$ env2lmod
```

2. Load the fast-python module and activate the corresponding virtual environment

```bash
$ module load gcc/8.2.0 fast_python_workshop_cpu/2022.1.0
$ venv_cpu_init
```

3. Locate `~/euler_scripts/pi_scale_up_hpc.py` and submit it using the `sbatch --wrap ...` option, once with 1 core and once with 3 cores

```bash
$ sbatch <...> "python ~/euler_scripts/pi_scale_up_hpc.py"
```

## Scaling Out - General Considerations

Scaling out means using multiple nodes at the same time.  
Even using several cores on one node efficiently can be challenging, so dealing with multiple nodes can be **demanding**.  
So, before starting to scale out our code we should check carefully whether we really have to do it.   
Moreover we should keep in mind the message from Amdahl's and Gustafson's laws and therefore focus on the weak scaling case, i.e. **increase the number of processors together with the problem size**. 
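
The two laws mentioned above can be written down in a few lines. This is a sketch with an arbitrarily chosen parallel fraction `p = 0.9`. In Amdahl's law `p` is the parallelizable fraction of a fixed-size problem, while in Gustafson's law it is the parallel fraction of the run time of the scaled problem.

```python
def amdahl_speedup(p, n):
    """Speedup for a fixed problem size on n processors: 1 / ((1 - p) + p / n)."""
    return 1.0 / ((1.0 - p) + p / n)


def gustafson_speedup(p, n):
    """Scaled speedup when the problem grows with n: (1 - p) + p * n."""
    return (1.0 - p) + p * n


# p = 0.9 is only an illustrative value
for n in (1, 4, 16, 64):
    print(n, round(amdahl_speedup(0.9, n), 2), round(gustafson_speedup(0.9, n), 2))
```

For a fixed problem size the speedup is bounded by `1 / (1 - p)` no matter how many processors are added, which is why growing the problem together with the number of processors is the more promising regime.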

Again, the **computing environment** has to be compatible on all nodes. On the Euler cluster this is ensured automatically, so we only have to follow the advice given in the previous subsection.

The solutions that we will present next - IPython Parallel and Dask - have a similar pattern for the parallelism called **Manager-Workers** approach:
- the **Manager** is performing some initial calculation and **schedules** work to the workers,
- the **Workers** are doing the work.

![](./images/manager_worker.svg)

One key thing that we will have to do when scaling out is to **share data** between nodes (at least between the manager and the worker).

There are two main ways of sharing data between two nodes:
- via the network,
- using the parallel file system (write to a file from one node and read from the file from another node).

Using the network is generally faster (see below), so it is highly recommended to use it (or to use a library that relies on it).

Once we have the code ready, we should test it on a **small test job**, e.g. submit to 2-3 nodes, with 1-2 cores per node.  
This can be achieved using the `--nodes` option:

```bash
# 9 cores, 3 cores per node
$ sbatch -n 9 --nodes 3
```

###  Recap: Latency Comparison Numbers (~2012)

```
                                             Real time             
L1 cache reference ......................... 0.5 ns                
Execute typical instruction ................   1 ns                
L2 cache reference ........................... 7 ns                
Main memory reference ...................... 100 ns                
Send 2K bytes over 1 Gbps network ....... 20,000 ns  =  20 µs      
SSD random read ........................ 150,000 ns  = 150 µs      
Read 1 MB sequentially from memory ..... 250,000 ns  = 250 µs      
Read 1 MB sequentially from SSD  ..... 1,000,000 ns  =   1 ms      
Send packet CA->Netherlands->CA .... 150,000,000 ns  = 150 ms      
```

Source: https://gist.github.com/jboner/2841832

## Scaling Out - IPython Parallel

[IPython Parallel](https://ipyparallel.readthedocs.io/en/latest/intro.html) is a mature library, formerly part of the IPython package, that
> enables all types of parallel applications to be developed, executed, debugged, and monitored interactively. Hence, the `I` in IPython. 

**Note**: We will focus on **programmatic ways** of using IPython Parallel since they allow us to scale out to Euler and run batch jobs.  
**Note**: We will use new features of IPython Parallel available starting from **version 7**.

### Demo IPython Parallel "Hello World" on multi-node
We will test a simple script `euler_scripts/ipyparallel_demo.py`

In [None]:
pycat euler_scripts/ipyparallel_demo.py

**To run on Euler**

```bash
$ env2lmod
$ module load gcc/8.2.0 fast_python_workshop_cpu/2022.1.0
$ venv_cpu_init
$ sbatch -n 2 --nodes 2 --wrap "python ~/euler_scripts/ipyparallel_demo.py"
$ less slurm-....out

INFO:ipyparallel.cluster.cluster.1665571093-7pqt:Starting 2 engines with <class 'ipyparallel.cluster.launcher.MPIEngineSetLauncher'>
INFO:ipyparallel.cluster.cluster.1665571093-7pqt:Stopping controller
INFO:ipyparallel.cluster.cluster.1665571093-7pqt:Stopping engine(s): 1665571094
[{'cwd': '/cluster/home/chadhat',
  'hostname': 'eu-a2p-278',
  'pid': 88425,
  'python': '3.10.4 (main, May 30 2022, 08:01:42) [GCC 8.2.0]'},
 {'cwd': '/cluster/home/chadhat',
  'hostname': 'eu-a2p-279',
  'pid': 1664,
  'python': '3.10.4 (main, May 30 2022, 08:01:42) [GCC 8.2.0]'}]
```

By checking the **`hostname`** we notice that the job was running on 2 different nodes, with the hostnames `eu-a2p-278` and `eu-a2p-279`.

### Architecture

IPython Parallel relies on an **IPython Cluster** (which follows the **Manager-Workers** pattern) that consists of: 
- **IPython Engine (Workers)**:  kernel(s) able to run Python code enabling therefore parallel and distributed computing (each kernel is a **process**). The engines cannot communicate with each other to share data.
- **IPython Controller (Manager)**: an interface and single point of contact for working with the engine(s)
    - **IPython Hub**: keeps track of everything (engine connection, schedulers, clients, task requests and results)
    - **IPython Schedulers**: all actions that run on an engine go through it, and it provides a non-blocking interface to the engines, i.e. it returns immediately

The connection to the cluster is done via the object `Client`. For the execution a `View` is used:
 - `DirectView` class for explicit access
 - `LoadBalancedView` class for a `pool`-like interface 
<p>
<img src="./images/ipython_parallel.png" width="400">
<div>Source: <a href=https://ipyparallel.readthedocs.io/en/latest/tutorial/intro.html#architecture-overview>IPython Parallel - Architecture Overview</a></div>
</p>

### IPython Cluster on PC and Euler

We can use IPython clusters both on our PC and on Euler.

The main actions that we can perform are:
- configure,
- start,
- stop.

JupyterLab and Jupyter Notebook provide extensions to create a cluster using the graphical user interface.  
IPython Parallel also provides a command line interface to start a cluster.

However, next we will focus on the programmatic approach introduced in version 7 that can be done directly in a Python script or Jupyter Notebook.

**Configure the IPython Cluster on our PC**

In [None]:
import ipyparallel as ipp

print(f"{ipp.version_info=}")

n_engines = 3

## Configure
cluster = ipp.Cluster(n=n_engines)
cluster

- by convention `ipyparallel` is imported as `ipp`
- we define the IPython Cluster that we want to start, and it is good practice to already provide the desired number of engines, e.g. `n=3`.

So far we have only configured the cluster. As we can see, it has an auto-generated identifier named `cluster_id` and a predefined `profile` called `default`.

**Configure the IPython Cluster on Euler**

```python
import os

import ipyparallel as ipp

## use the number of cores allocated by Slurm, or all local cores on a PC
n_engines = int(os.environ.get("SLURM_NTASKS", os.cpu_count()))

cluster = ipp.Cluster(
    n=n_engines,
    controller_ip="*",
    engine_launcher_class="MPI",
    location="server.local",
)
```

The arguments `controller_ip="*"`, `engine_launcher_class="MPI"`, and `location="server.local"` are needed to run on Euler on multiple nodes.  
It requires the [`mpi4py` package](https://mpi4py.readthedocs.io/en/stable/) and an MPI implementation, e.g. the `openmpi/4.1.4` module.

**Start the IPython Cluster**

In [None]:
## Start
cluster.start_cluster_sync()

## Connect a client
client = cluster.connect_client_sync()

## Make sure all engines are connected
client.wait_for_engines(n=n_engines)

**Stop the IPython Cluster**

In [None]:
## Stop the cluster
cluster.stop_cluster_sync()

### Direct Interface - `DirectView` 1/2

The direct interface gives us direct access to the engines - the workhorses of our IPython Parallel cluster.


In [None]:
import ipyparallel as ipp

n_engines = 3

## Configure
cluster = ipp.Cluster(n=n_engines)

## Start
cluster.start_cluster_sync()

## Connect a client
client = cluster.connect_client_sync()

## Make sure all engines are connected
client.wait_for_engines(n=n_engines)

`client.block = True` will make sure that all calls will block until all engines are done.

In [None]:
client.block = True

The **engines** are identified by integers:

In [None]:
client.ids

and we can create a `DirectView` using the **list access** to the client.
This can be done on a given engine:

In [None]:
## DirectView associated with a given engine, e.g. 0
client[0]

Or even with all engines:

In [None]:
dview = client[:]
dview

**Calling functions** - `apply()`

The `DirectView` is similar to the `ProcessPoolExecutor` from `concurrent.futures`.   
We can use it to call functions, and in that case each engine that is associated with the view will call it.

We define a self-contained function that we will run both locally and on engines.
<div class="alert alert-warning">
  <strong>Warning!</strong> 
  The engines should run the imports as well!
</div>

In [None]:
def summary(secs=1):
    ## the imports are inside the function, so they also run on the engines
    import os
    import sys
    import time

    time.sleep(secs)
    print(f"I slept for {secs} seconds")

    ## describe the process that executed the function
    return {
        "cwd": os.getcwd(),
        "python": sys.version,
        "hostname": os.uname().nodename,
        "pid": os.getpid(),
    }

We can call the function **locally**:

In [None]:
summary()

or on **engine 0** (using the list access) and `apply` method:

In [None]:
client[0].apply(summary, 2)

and the value is returned.

We can call a generic function `f(*args,**kwargs)` using `view.apply(f, *args, **kwargs)`. 

We can call it on all engines with the help of the previously defined `dview = client[:]`:

In [None]:
dview.apply(summary, 2)

In this case the values are returned in a list, where for each engine the corresponding value is returned.

In case we run with `block = False`, it immediately returns an `AsyncResult` - a `concurrent.futures.Future` subclass.  
So we can reuse the methods that we learned in the previous section, e.g. `done()` or `result()`.

In [None]:
dview.block = False
result = dview.apply(summary, 4)
result

In [None]:
result.done()  ## or .ready() only for IPython Parallel

In [None]:
result.result()  ## or .get() only for IPython Parallel

In [None]:
result.done()

As we can see, the output printed on the engines is missing. In order to get it we can use the `.display_outputs()` method:

In [None]:
result.display_outputs()

In [None]:
dview.block = True

**Parallel `map()`**  

The DirectView `map()` method can be used to parallelize the `map()` built-in function, where the order of the gathered results is kept by default.

In [None]:
dview.map(lambda x: x**2, range(7))

As we can see, for `block=True` the result of the `map` is returned as a list of values, which is similar to the `map` from `concurrent.futures`.  
However, for non-blocking views, an `AsyncMapResult` is returned, an `AsyncResult` subclass that can be iterated over to obtain the individual results.

It is interesting to check more carefully what `map` is actually doing. We define:

In [None]:
import time


def sleeping(secs, time_start):
    import time

    time.sleep(secs)
    return (secs, int(time.time() - time_start))

In [None]:
time_start = time.time()
%time dview.map(sleeping, range(8,0,-1),8*[time_start])

![](./images/Direct_Interface.svg)

As you can see this is not the most efficient way of running these tasks.  
The **scheduler** is scattering the data on the engines, running the function, and the results are returned back.

We can monitor the progress (done tasks) of the non-blocking case using the client method `wait_interactive`:

In [None]:
dview.map(sleeping, range(8, 0, -1), 8 * [time_start], block=False)
client.wait_interactive()

**Can we do it better?**

IPython Parallel provides a way to schedule such tasks in a better way via the `LoadBalancedView`.

### Task interface - `LoadBalancedView`

The task interface provides dynamic load balancing and is constructed from the client using the `load_balanced_view()` method:

In [None]:
lview = client.load_balanced_view()
lview.block = True
lview

It can be used for the parallelized version of `map()`.  
It is recommended for functions where the execution time per item varies significantly.  
It should be preferred compared to the direct view because the tasks are **dynamically load balanced**.  
The IPython Scheduler, which is part of the IPython Controller, is responsible for assigning the jobs. The default scheme used to schedule the jobs is called **least load** and assigns tasks to the engine with the fewest outstanding tasks. By default only one task can be outstanding on each engine.

In [None]:
import time


def sleeping(secs, time_start):
    import time

    time.sleep(secs)
    return (secs, int(time.time() - time_start))

In [None]:
time_start = time.time()
%time lview.map(sleeping, range(8,0,-1),8*[time_start])

![](./images/Task_Interface.svg)

We can also monitor this case:

In [None]:
lview.map(sleeping, range(8, 0, -1), 8 * [time_start], block=False)
client.wait_interactive()

But in this case, we really monitor the 8 tasks, and not the engines.
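
The difference between the two timings can be reproduced with a small pure-Python model. This is only a sketch under simplifying assumptions, not the scheduler implemented by IPython Parallel. The direct interface is modelled as contiguous blocks that are as even as possible, the task interface as "the next task goes to the engine that becomes free first", and all communication overhead is ignored. The function names are hypothetical.

```python
import heapq


def makespan_static(durations, n_engines):
    """Contiguous blocks per engine; each engine runs its block sequentially."""
    base, extra = divmod(len(durations), n_engines)
    finish_times, start = [], 0
    for engine in range(n_engines):
        size = base + (1 if engine < extra else 0)
        finish_times.append(sum(durations[start:start + size]))
        start += size
    return max(finish_times)


def makespan_dynamic(durations, n_engines):
    """Each task goes, in order, to the engine that becomes free first."""
    free_at = [0] * n_engines  # min-heap with the time at which each engine is idle
    heapq.heapify(free_at)
    for duration in durations:
        earliest = heapq.heappop(free_at)
        heapq.heappush(free_at, earliest + duration)
    return max(free_at)


durations = list(range(8, 0, -1))
print(makespan_static(durations, 3), makespan_dynamic(durations, 3))
```

For the eight tasks used in this section and three engines, the model gives 21 seconds for the static split and 13 seconds for the dynamic one, while the lower bound for 36 seconds of total work on three engines is 12 seconds.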

**Task Dependencies**

The task interface allows one to build dependencies between functions, and therefore to construct a directed acyclic graph (DAG).  
This is an advanced topic that we do not cover in this course.  
In case you are interested in this topic please see https://ipyparallel.readthedocs.io/en/latest/tutorial/task.html#dependencies.

### Direct Interface - `DirectView` 2/2

Next we will try to understand better what `DirectView` is doing in `map`. In fact, it takes care of the following tasks:
- **share data** between the current kernel and the engines (the scatter and the gather), and
- **execute** code or functions on engines.

**Sharing data**

For sharing Python objects one can use a dictionary-style interface where one can update a variable using the key (see the code below `dview["b"] = 3`).  
IPython Parallel will pickle the object (serialize it), send it to the engine and unpickle it.  
The values of a variable are returned in a list, where for each engine the corresponding value is returned.

In [None]:
dview["b"] = 3
dview["b"]

In case multiple variables are updated one can store them in a dictionary and update (`push`) them once.  
To return (`pull`) them, a tuple with the variable names is required.

In [None]:
dview.push({"x": 10, "y": 11})
dview.pull(("x", "y"))
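
The serialization round trip described above can be illustrated with the standard library alone. This sketch uses plain `pickle` and no cluster. IPython Parallel has its own serialization layer on top, so the snippet only shows the principle that the receiving side works on an independent copy.

```python
import pickle

payload = {"x": 10, "y": [1, 2, 3]}

# What leaves the local process is a byte string ...
wire_bytes = pickle.dumps(payload)

# ... and what the receiving side rebuilds is an independent copy.
received = pickle.loads(wire_bytes)
received["y"].append(4)

print(len(wire_bytes), "bytes to transfer")
print(payload["y"], received["y"])
```

A change made to the received object does not affect the original one, which is why updated values have to be pulled back explicitly.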

**Scatter and Gather**

In case a bigger object needs to be partitioned to different engines, IPython Parallel provides `scatter`. For the inverse operation, i.e. returning scattered partitions, one can use `gather`:

In [None]:
l = list(range(8, 0, -1))
l

In [None]:
dview.scatter("l", l)
dview["l"]

In [None]:
dview.gather("l")

So now we understand how `DirectView.map()` works.
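
The three steps can be mimicked in pure Python. This is a sketch that runs serially and without a cluster. The functions `scatter`, `gather` and `direct_map` are hypothetical stand-ins, and the partitioning into contiguous, nearly equal blocks is an assumption about the default behaviour.

```python
def scatter(seq, n_engines):
    """Split seq into n_engines contiguous blocks of nearly equal size."""
    base, extra = divmod(len(seq), n_engines)
    blocks, start = [], 0
    for engine in range(n_engines):
        size = base + (1 if engine < extra else 0)
        blocks.append(seq[start:start + size])
        start += size
    return blocks


def gather(blocks):
    """Concatenate the per-engine blocks back into one list."""
    return [item for block in blocks for item in block]


def direct_map(func, seq, n_engines):
    """scatter -> apply func to every block -> gather (executed serially here)."""
    partial = [[func(item) for item in block] for block in scatter(seq, n_engines)]
    return gather(partial)


values = list(range(8, 0, -1))
print(scatter(values, 3))
print(direct_map(lambda x: x**2, values, 3))
```

Because the blocks are fixed before any work starts, an engine that receives the expensive items determines the total run time.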

![](./images/Direct_Interface.svg)

<div class="alert alert-warning">
  <strong>Warning!</strong> 
    
  Sharing big objects can be very expensive. We should try to generate the data directly on the engine. For instance, instead of passing the contents of a file from the local kernel to the engine we can pass only the file path and read the file directly on the engine.<br/>
  NumPy arrays are not copied and are read-only. In case we want to modify them we have to copy them explicitly. 
  
  See: https://ipyparallel.readthedocs.io/en/6.2.0/details.html
</div>

In [None]:
import numpy as np

In [None]:
my_array = np.zeros((4,4))


def set_value(a):
    a[0, :] = 1
    return a


client[0].apply_sync(set_value, my_array)

In [None]:
my_array.flags.writeable

In [None]:
my_array = np.zeros((4,4))


def set_value(a):
    if not a.flags.writeable:
        a = a.copy()
    a[0, :] = 1
    return a


client[0].apply_sync(set_value, my_array)

**Running code**

So far we used the `DirectView` to apply functions with `dview.apply()` and `dview.map()`.  
Moreover we can use the `DirectView` also to:
- execute code: `dview.execute("code")`, which corresponds to the `exec()` built-in function executed on the desired engines, and
- run code from a file: `dview.run(path)`, which reads the contents of a file and passes them to `dview.execute`.

Next we will execute an assignment directly on the engines:

In [None]:
dview.execute("a=3")
dview["a"]

And next we will execute the code from a file.  
We check the content using `%pycat` magic:

In [None]:
%pycat ./now.py

In [None]:
dview.run("./now.py")
dview["d"]

In [None]:
dview.execute("print(numpy.__version__)")

In [None]:
result = dview.execute("import numpy; print(numpy.__version__)")

In [None]:
result.display_outputs()

<div class="alert alert-warning">
  <strong>Warning!</strong> 
  All packages need to be imported inside the function or loaded beforehand on the engines.
</div>

### Executor API - `ViewExecutor`

Every `View` has an `.executor` property to provide the API from `concurrent.futures`.  
Moreover, the client has an `executor()` method that provides an executor corresponding to a `LoadBalancedView`.

In [None]:
executor = client.executor()
executor

In [None]:
%%time
time_start = time.time()
results = executor.map(sleeping, range(8, 0, -1), 8 * [time_start])
list(results)
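
Since the executor follows the `concurrent.futures` API, code written against that API does not need to know which backend it runs on. The sketch below uses the standard library `ThreadPoolExecutor` as a stand-in so that it runs without a cluster. The helper `run_all` is hypothetical, and passing `client.executor()` to it instead is an assumption that is not exercised here.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleeping(secs, time_start):
    time.sleep(secs)
    return (secs, round(time.time() - time_start, 2))


def run_all(executor, durations):
    """Works with any object that follows the concurrent.futures Executor API."""
    time_start = time.time()
    return list(executor.map(sleeping, durations, len(durations) * [time_start]))


# Durations are scaled down to hundredths of a second to keep the sketch fast.
durations = [d / 100 for d in range(8, 0, -1)]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = run_all(executor, durations)
print(results)
```

When an IPython Parallel executor is used, the `time` module additionally has to be imported on the engines, as discussed in the warning above.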

### In Practice


When we use IPython Parallel we have to take care of the `import` statements since each kernel should know what has to be imported (and how in case of renaming).
Instead of adding this to each function (the way that we did it so far) we can do all imports using the direct view.

The benefit is that we can focus on the parallelization and minimize the changes to the original code.

In [None]:
## Stop the previous cluster
cluster.stop_cluster_sync()

Next we will use this approach for the new `sleeping` function.

In [None]:
import time

import ipyparallel as ipp


## we want to use IPython Parallel with this function
def sleeping(secs, time_start):

    time.sleep(secs)
    return (secs, int(time.time() - time_start))


n_engines = 3

## Configure the cluster
cluster = ipp.Cluster(n=n_engines)

## Start the ipython parallel cluster
cluster.start_cluster_sync()

## Connect a client to the cluster
client = cluster.connect_client_sync()

## Make sure all engines are connected
client.wait_for_engines(n=n_engines)

We `execute` the imports on all engines:

In [None]:
dview = client[:]
dview.block = True
dview.execute("import time", block=False)
client.wait_interactive()

In [None]:
lview = client.load_balanced_view()
lview.block = True
time_start = time.time()
results = lview.map(sleeping, range(8, 0, -1), 8 * [time_start], block=False)
client.wait_interactive()

In [None]:
list(results)

**Tips**

Use both interfaces: 
- the direct interface is used to load libraries, assign values to engine-local variables, etc;
- the task interface is used to benefit from the dynamic load balancing.  

**Note**: IPython Parallel can be used for heterogeneous computing. `UnmetDependency` from `ipyparallel.error` can instruct the scheduler not to run tasks on some engines (see [here](https://ipyparallel.readthedocs.io/en/latest/tutorial/task.html?highlight=unmet#functional-dependencies)).

### Exercise Scaling Out "Compute Pi" using IPython Parallel (Hands-on) [20min]

Adapt the previous example "euler_scripts/pi_scale_up_hpc.py" to use IPython Parallel and test it locally and then on Euler using 3 engines.  
Try to keep the function `approx_pi` unchanged.  
Which interface (`DirectView` vs `LoadBalancedView`) do you want to use to replace the `ProcessPoolExecutor`?  
*Note*: In case you want to increase the number of processors you should increase the number of points as well.

In [None]:
pycat euler_scripts/pi_scale_up_hpc.py

### Further Reading
- https://ipyparallel.readthedocs.io/en/latest/tutorial/index.html
- SciPy 2014 Tutorial (the API is slightly outdated):
    - Video: [Part 1](https://www.youtube.com/watch?v=y4hgalfhc1Y&t=1146s&ab_channel=Enthought), [Part 2](https://www.youtube.com/watch?v=-9ijnHPCYhY&ab_channel=Enthought), [Part 3](https://www.youtube.com/watch?v=U5mhpKkIx2Y&t=2637s&ab_channel=Enthought)
    - https://github.com/minrk/IPython-parallel-tutorial/blob/master/Index.ipynb

## Scaling Out - Dask
> Dask is a flexible library for parallel computing in Python.

Dask consists of:
- a **dynamic task scheduler** that is responsible for scheduling tasks,
- "Big Data" **collections** that provide interfaces similar to NumPy, Pandas, and Python iterators.  

Dask allows one to perform out-of-core computation on data that does not fit in memory (using the collections), and even to use distributed environments like HPC clusters via the `dask.distributed` package. 

*Dask also provides a single-machine scheduler in the `dask` package, but it does not scale out, and therefore the `dask.distributed` scheduler is the recommended one.*


### Demo Dask "Hello World" on multi-node
We will first test a simple script: `euler_scripts/dask_demo.py`

In [None]:
pycat euler_scripts/dask_demo.py

**To run on Euler**

```bash
$ env2lmod
$ module load gcc/8.2.0 fast_python_workshop_cpu/2022.1.0
$ venv_cpu_init
$ sbatch -n 4 --nodes 2 --wrap "mpirun python ~/euler_scripts/dask_demo.py"

[{'cwd': '/cluster/home/chadhat',
  'hostname': 'eu-a2p-277',
  'pid': 79574,
  'python': '3.10.4 (main, May 30 2022, 08:01:42) [GCC 8.2.0]'},
 {'cwd': '/cluster/home/chadhat',
  'hostname': 'eu-a2p-278',
  'pid': 124228,
  'python': '3.10.4 (main, May 30 2022, 08:01:42) [GCC 8.2.0]'},
 {'cwd': '/cluster/home/chadhat',
  'hostname': 'eu-a2p-277',
  'pid': 79574,
  'python': '3.10.4 (main, May 30 2022, 08:01:42) [GCC 8.2.0]'},
 {'cwd': '/cluster/home/chadhat',
  'hostname': 'eu-a2p-278',
  'pid': 124228,
```

By checking **`hostname`** we see that the job was running on 2 different nodes, with the hostnames `eu-a2p-277` and `eu-a2p-278`.

### Architecture of `dask.distributed`

The **Dask Distributed Cluster** (which follows the **Manager-Workers** pattern) consists of: 
- **Dask Workers**: they compute tasks, store and serve computed results (each worker is a **process** on its own that can have several threads, and it sends work to a `concurrent.futures.ThreadPoolExecutor`). They can communicate with each other to share data.
- **Dask Scheduler (Manager)**: it sends tasks to run on workers.

In case we decide to use threads we have to make sure that: 
1. our code is not already using multithreading, and
2. the problem can take advantage of multithreading (e.g. it is I/O bound, or the GIL is released)

The connection to the cluster is done via the `Client` object which submits tasks to the scheduler to be executed.

<p>
<img src="./images/dask_architecture.png" width="600">
<div>Source: <a href=https://github.com/dask/dask/issues/4471#issuecomment-896799678>Dask Distributed - Architecture Diagram</a></div>
</p>


### Dask Distributed Cluster on PC and Euler

We can use Dask clusters both on our PC and on Euler.

The main actions that we can perform are:
- configure and start, and
- stop.

JupyterLab provides extensions to create a Dask cluster using a graphical user interface.

Next we will focus on the programmatic approach that can be used directly from a Python script or Jupyter Notebook.  
On Euler we will have to use also the command line interface.

**Configure and Start the Dask Distributed Cluster on our PC - `LocalCluster`**

In [None]:
from dask.distributed import Client, LocalCluster

## Configure and start
cluster = LocalCluster(n_workers=3, threads_per_worker=1)
cluster

In [None]:
client = Client(cluster)
client.wait_for_workers(3)

We set up a local cluster with 3 workers, each worker with 1 thread.


In [None]:
cluster.scheduler

In case the distributed cluster is unresponsive we can reset it using the `restart()` method.

In [None]:
client.restart();

**Configure and Start the Dask Distributed Cluster on Euler - `dask-mpi`**

In order to have a good integration with the Euler cluster we have used [Dask-MPI](http://mpi.dask.org/en/latest/). 

For **batch jobs** you have to run them as [MPI jobs on Euler](https://scicomp.ethz.ch/wiki/Using_the_batch_system#MPI), e.g. one has to submit a Python script `script.py` on Euler with
```bash
$ sbatch [options] --wrap "mpirun python script.py"
```

The script needs to call the [`initialize()` method](http://mpi.dask.org/en/latest/generated/dask_mpi.core.initialize.html#dask-mpi-core-initialize) provided by Dask-MPI.


> Initialize a Dask cluster using mpi4py.
> Using mpi4py, MPI rank 0 launches the Scheduler, MPI rank 1 passes through to the client script, and all other MPI ranks launch workers.

Source: http://mpi.dask.org/en/latest/batch.html

```python
import os

from dask_mpi import initialize

## memory in bytes on Euler
mem = (
    1024 * 1024 * int(os.environ["SLURM_MEM_PER_CPU"])
    if os.environ.get("SLURM_MEM_PER_CPU")
    else "auto"
)

# run within MPI env
initialize(nthreads=1, memory_limit=mem, local_directory="~/dask-mpi-workers")
```
The first process (MPI rank 0) is used for the scheduler, the next one (MPI rank 1) for the client script, and all others are used for the actual workers.

Therefore the number of workers is equal to the number of processors (passed via `-n` to Slurm) minus 2.
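
The arithmetic behind the role assignment and the memory limit can be sketched as follows. Both helper names are hypothetical and nothing here calls Dask-MPI. The memory conversion assumes, as in the script above, that the per-core value is given in megabytes.

```python
def dask_mpi_layout(ntasks):
    """Map the number of MPI ranks to the roles described above (hypothetical helper)."""
    if ntasks < 3:
        raise ValueError("need at least 3 ranks: scheduler, client and one worker")
    return {"scheduler": 1, "client": 1, "workers": ntasks - 2}


def memory_limit_bytes(mem_per_cpu_mb):
    """Convert a per-core memory value in MB to bytes; None falls back to 'auto'."""
    if mem_per_cpu_mb is None:
        return "auto"
    return 1024 * 1024 * int(mem_per_cpu_mb)


print(dask_mpi_layout(4))
print(memory_limit_bytes("2048"))
```

A job submitted with `sbatch -n 4`, as in the demo above, therefore ends up with 2 workers, which is consistent with the two distinct `hostname`/`pid` pairs in its output.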

The [`dask-mpi` Python package](http://mpi.dask.org/) is required to run on Euler on multiple nodes. It requires the [`mpi4py` package](https://mpi4py.readthedocs.io/en/stable/) and an MPI implementation, e.g. the `openmpi/4.1.4` module.


<div class="alert alert-warning">
  <strong>Warning!</strong> 
  In case you want to use Dask-MPI with interactive jobs you have to use a different approach - please see  <a href="http://mpi.dask.org/en/latest/interactive.html">Dask-MPI with Interactive Jobs</a>. <br />
</div>

<div class="alert alert-warning">
  <strong>Warning!</strong> 
  Do not use  <a href="http://jobqueue.dask.org/en/latest/">Dask-Jobqueue</a>. <br />
  Each worker runs as a separate batch job, and therefore we can easily submit many small batch jobs that can impact the entire cluster negatively.
</div>

### Dask Arrays - `dask.array`

> Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays that would not fit in the memory using all of our cores. 

It is one of Dask's built-in **collections**.  

**Creating Dask Arrays**  

We start with a familiar NumPy 2-dimensional array:

In [None]:
## Define and start the cluster
cluster = LocalCluster(n_workers=3, threads_per_worker=1)
client = Client(cluster)

In [None]:
import numpy as np

x_np = np.random.rand(9, 12)
x_np

In order to use **Dask Array** we have to replace `numpy` with `dask.array`. 
We can create a Dask Array from a NumPy array using the `from_array()` function.

To do so, we **chunk** the initial array into many smaller NumPy arrays.

**Dask Array** then operates on these chunks independently, which corresponds precisely to a **domain decomposition**:

In [None]:
import dask.array as da

x_da = da.from_array(x_np, chunks=(3, 4))
x_da

`chunks=(3, 4)` gives the chunk **size** along each of the dimensions:
- 3 chunks, each of **size** 3 along the first dimension of size 9, and
- 3 chunks, each of **size** 4 along the second dimension of size 12.
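
The arithmetic behind this split can be reproduced in plain Python. The sketch below uses two hypothetical helpers (`blocks_per_dimension` and `explicit_chunks`) and assumes that a dimension which is not evenly divisible ends with one smaller chunk; on a real Dask Array the corresponding information is available via the `.numblocks` and `.chunks` attributes.

```python
import math


def blocks_per_dimension(shape, chunks):
    """Number of chunks along each dimension for uniform chunk sizes."""
    return tuple(math.ceil(n / c) for n, c in zip(shape, chunks))


def explicit_chunks(shape, chunks):
    """Expand uniform chunk sizes into the size of every single chunk."""
    result = []
    for n, c in zip(shape, chunks):
        full, rest = divmod(n, c)
        # `full` chunks of size c, plus one smaller chunk for the remainder
        result.append((c,) * full + ((rest,) if rest else ()))
    return tuple(result)


print(blocks_per_dimension((9, 12), (3, 4)))  # (3, 3)
print(explicit_chunks((9, 12), (3, 4)))       # ((3, 3, 3), (4, 4, 4))
print(explicit_chunks((10, 12), (3, 4)))      # ((3, 3, 3, 1), (4, 4, 4))
```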

One can even specify the size of each individual block (see below):
- 3 chunks of: size 1, size 4, and size 4 along the first dimension of size 9, and
- 3 chunks of: size 3, size 3, and size 6 along the second dimension of size 12.

In [None]:
x_da = da.from_array(x_np, chunks=((1, 4, 4), (3, 3, 6)))
x_da

In [None]:
import dask
dask.config.get('array.chunk-size')

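Specifying the chunks explicitly allows for high flexibility; the `array.chunk-size` setting shown above is the target size Dask uses when chunks are chosen automatically.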


<p>
<img src="./images/dask-array-black-text.svg" width="400">
<div>Source: <a href="https://docs.dask.org/en/latest/array.html#design">Dask Arrays</a></div>
</p>

**Chunk size**: In practice the chunk size should be chosen such that the chunks a worker processes at the same time fit into its memory, while staying as large as this limit allows (see more [here](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes)).
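
A quick back-of-the-envelope estimate helps when picking a chunk size. The sketch below assumes `float64` data (8 bytes per element) and a hypothetical 20000 x 20000 array split into 2000 x 2000 chunks; `chunk_nbytes` is an illustrative helper, not a Dask function.

```python
import math


def chunk_nbytes(chunk_shape, itemsize=8):
    """Memory footprint of one chunk in bytes (itemsize=8 means float64)."""
    return math.prod(chunk_shape) * itemsize


nbytes = chunk_nbytes((2000, 2000))
n_chunks = (20000 // 2000) ** 2

print(nbytes)                   # 32000000 bytes, i.e. 32 MB per chunk
print(n_chunks)                 # 100 chunks
print(n_chunks * nbytes / 1e9)  # 3.2 GB for the whole array
```

Keep in mind that a worker usually holds several chunks, plus intermediate results, at the same time.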

Dask Arrays can be loaded from and stored to various sources, e.g. HDF5, a file format relevant for Big Data use cases.  
This is an advanced topic that we do not cover in this course.  
If you are interested in this topic, please see https://docs.dask.org/en/latest/array-creation.html .

**Computation with Dask Array**

Dask Arrays support a subset of the NumPy methods. So we can expect the main NumPy operations to work with Dask Arrays, but not all of them.

For example, we can easily compute a mean with NumPy as follows:

In [None]:
res_np = x_np.mean()
res_np

The same method also exists for Dask Arrays:

In [None]:
res_da = x_da.mean()
res_da

However, Dask is **lazy**.  
Dask first plans how to split and parallelize the computation by building a so-called **task graph**. This step automatically takes care of a potential **functional decomposition**.
Up to this point, no computation has started.
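
To make the idea of a task graph more concrete, here is a toy sketch of a chunked mean. It is loosely modelled on the dictionary-based graph format described in the Dask documentation, but `toy_get` is a hypothetical, purely sequential evaluator and not Dask's scheduler.

```python
from operator import add


def toy_get(graph, key):
    """Recursively evaluate `key` in a dict-based task graph (sequential toy)."""
    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Arguments that name another entry of the graph are evaluated first.
        values = [
            toy_get(graph, a) if isinstance(a, str) and a in graph else a
            for a in args
        ]
        return func(*values)
    return task  # plain data, e.g. a chunk


# Building the graph does not compute anything ...
graph = {
    "chunk-0": [1.0, 2.0, 3.0],
    "chunk-1": [4.0, 5.0, 6.0],
    "sum-0": (sum, "chunk-0"),
    "sum-1": (sum, "chunk-1"),
    "total": (add, "sum-0", "sum-1"),
    "mean": (lambda total, n: total / n, "total", 6),
}

# ... the work only happens once a result is requested.
print(toy_get(graph, "mean"))  # 3.5
```

The two `sum-*` tasks do not depend on each other, which is exactly what allows a real scheduler to run them in parallel.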

The computation is triggered by the `.compute()` method, which returns the result on completion.

The task graph created by Dask can be visualized using the `.visualize()` method.

In [None]:
res_da.visualize()

In [None]:
res_da.compute()

As we can see, a Dask Array is able to split the computation into tasks. Next we will use the Dask Dashboard to check that the tasks are indeed executed in parallel.

**Stop the Dask Cluster**

In [None]:
## Stop the cluster
client.shutdown()

<div class="alert alert-warning">
  <strong>Warning!</strong> 
  Keep in mind that Dask Array API is a subset of NumPy API. <br />
  Check <a href="https://docs.dask.org/en/latest/array-api.html">dask.array API</a> for the details.  
</div>

### Dask Dashboard - JupyterLab extension

The main tool for live diagnostics is the Dask dashboard.  
The dashboard is accessible from the `client.dashboard_link` property. Pay attention to the link below corresponding to the `Dashboard`, e.g. [http://127.0.0.1:8787/status](http://127.0.0.1:8787/status).

In [None]:
client.dashboard_link

The JupyterLab extension allows us to integrate the dashboard directly into JupyterLab.


<div class="alert alert-success">
  <strong>Exercise: Dask Dashboard [5 min] - Task Interface</strong>
    
1. Organize your workspace to mimic the image provided below (insert the link in the `DASK DASHBOARD URL`).
</div>

![](./images/dask_dashboard.png)

In order to see the plots in action we will define a more intensive computation on a larger Dask Array:

In [None]:
x = da.random.random((20000, 20000), chunks=(2000, 2000))

In [None]:
res = (da.sin(x) + da.cos(x.T)).sum()

res.visualize()

In [None]:
res.compute()

In [None]:
res.compute()

### Delayed

> Sometimes problems do not fit into one of the collections like `dask.array` or `dask.dataframe`. In these cases, users can parallelize custom algorithms using the simpler `dask.delayed` interface. This allows one to create graphs directly with a simple annotation of normal python code.

It is one of the built-in Dask **collections**.  

In [None]:
import time


def sleeping(secs):

    time.sleep(secs)
    return secs


def add(x, y):
    return x + y

Next we define a simple dependency between three tasks:

In [None]:
x = sleeping(1)
y = sleeping(2)
z = add(x, y)
z

Next we annotate this computation with `dask.delayed` and get the "lazy" Dask task graph.

In [None]:
import dask

x = dask.delayed(sleeping)(1)
y = dask.delayed(sleeping)(2)
z = dask.delayed(add)(x, y)
z.visualize()

In [None]:
z.compute()

Therefore Dask Delayed allows us to express the functional decomposition explicitly.  
This can be easily combined with **domain decomposition**.  
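
The deferred execution behind this interface can be illustrated with a few lines of plain Python. The following is a simplified sketch, not Dask's implementation: `Lazy` and `lazy` are hypothetical names, and unlike Dask the toy runs everything sequentially.

```python
class Lazy:
    """Records a function call instead of executing it."""

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate the dependencies first, then the function itself.
        args = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*args)


def lazy(func):
    """Wrap `func` so that calling it returns a Lazy object."""
    def wrapper(*args):
        return Lazy(func, args)
    return wrapper


calls = []  # records which inputs were actually processed


def double(value):
    calls.append(value)
    return 2 * value


def plus(left, right):
    return left + right


lazy_total = lazy(plus)(lazy(double)(1), lazy(double)(2))
calls_before = list(calls)     # [] since nothing has run yet
result = lazy_total.compute()  # now double() and plus() are executed
print(calls_before, result, calls)  # [] 6 [1, 2]
```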

Suppose that we want to apply `sleeping` to every number in a list and compute the `sum` of the results at the end.


In [None]:
data = range(8, 0, -1)

output = []
for x in data:
    a = dask.delayed(sleeping)(x)
    output.append(a)

total = dask.delayed(sum)(output)
total.visualize()

**`@dask.delayed` decorator**

Alternatively one can decorate the functions directly:

In [None]:
@dask.delayed
def sleeping(secs):

    time.sleep(secs)
    return secs


data = range(8, 0, -1)

output = []
for x in data:
    a = sleeping(x)
    output.append(a)

total = dask.delayed(sum)(output)
total.visualize()

In [None]:
total.compute()

In [None]:
%time total.compute()

In [None]:
sum(range(8, 0, -1))

### Futures
> Dask supports a real-time task framework that extends Python’s `concurrent.futures` interface. This interface is good for arbitrary task scheduling like `dask.delayed`, but is **immediate** rather than **lazy**, which provides some more flexibility in situations where the computations may evolve over time.

The Dask `Client` implements the `concurrent.futures` interface that we introduced in the previous section.  
So we can simply use:
- `client.submit()` to pass a function to the workers for execution (it returns immediately),
- `.result()` to get the result locally in the master process represented by the notebook.

On HPC clusters the code execution can happen even on a different node, so we have to keep in mind that getting the data locally can be "expensive" in terms of execution-time. 
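
As a reminder of the interface that the Dask `Client` mirrors, here is a minimal sketch using only the standard library. It uses a local thread pool, so nothing is sent to a cluster; `slow_square` is a hypothetical example function.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(value):
    time.sleep(0.2)
    return value * value


with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(slow_square, 3)  # returns immediately
    done_right_after_submit = future.done()   # most likely False
    value = future.result()                   # blocks until the task finished
    done_after_result = future.done()         # True

print(value)  # 9
```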

In [None]:
import time


def sleeping(secs):

    time.sleep(secs)
    return secs


future = client.submit(sleeping, 4)
future

In [None]:
future.done()

In [None]:
future.result()

In [None]:
future.done()

In [None]:
%%time
future = client.submit(sleeping, 5)
future.result()

In [None]:
%%time
future = client.submit(sleeping, 5)
future.result()

Why did the second call not take 5 seconds?

**Pure functions**

By default, `dask.distributed` assumes that the functions are [**pure**](https://toolz.readthedocs.io/en/latest/purity.html) (in contrast, `dask.delayed` defaults to `pure=False`), i.e.:
- the function depends only on its inputs (no hidden states),
- the evaluation does not cause side effects, e.g. updating a global variable. 

The scheduler avoids redundant computation based on this assumption: if the result is already in memory, it is reused. This is the memoization that we learned about in Section 3 (Caching and memoization).
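
The effect of this assumption can be mimicked with a toy cache. The sketch below is a strong simplification under stated assumptions: `ToyScheduler` is a hypothetical class, and its keys are built from the function name and the arguments, whereas Dask derives its keys by hashing the function and its arguments.

```python
import itertools
import uuid


class ToyScheduler:
    """Caches results under a key built from the function name and arguments."""

    def __init__(self):
        self.results = {}

    def submit(self, func, *args, pure=True):
        if pure:
            key = (func.__name__, args)              # same call, same key
        else:
            key = (func.__name__, uuid.uuid4().hex)  # always a fresh key
        if key not in self.results:
            self.results[key] = func(*args)
        return self.results[key]


counter = itertools.count()


def next_value(offset):
    """Impure on purpose: the result depends on hidden state."""
    return offset + next(counter)


scheduler = ToyScheduler()
a = scheduler.submit(next_value, 10)              # computed
b = scheduler.submit(next_value, 10)              # taken from the cache
c = scheduler.submit(next_value, 10, pure=False)  # computed again
print(a, b, c)  # 10 10 11
```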

We can change the default behavior by using the `pure=False` keyword:

In [None]:
%%time
future = client.submit(sleeping, 5, pure=False)
future.result()

A typical example of a non-pure function is a random number generator.  
We define a function that returns a list of random numbers:

In [None]:
from random import random


def my_random(size):
    return [random() for i in range(size)]

In [None]:
my_random(3)

In [None]:
my_random(3)

We see that the results are different.  
Next we will submit it twice.

In [None]:
client.submit(my_random, 3).result()

In [None]:
client.submit(my_random, 3).result()

But we got the same result, since Dask treated the function as pure.

<div class="alert alert-warning">
  <strong>Warning!</strong>
    
  By default, Dask assumes that submitted functions are [**pure**](https://toolz.readthedocs.io/en/latest/purity.html).

</div>

**Parallel `map()`**  

Dask Futures provides a `client.map()` method, which parallelizes the built-in `map()` function. 

`client.map()` returns a list of Futures, and as in the `concurrent.futures` case we can use `as_completed` to collect them as they finish.
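
For comparison, the same pattern with the standard library looks as follows. This is a sketch with a local thread pool and a hypothetical `nap` function; it illustrates the difference between completion order and submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def nap(secs):
    time.sleep(secs)
    return secs


delays = [0.3, 0.2, 0.1]
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(nap, d) for d in delays]
    # as_completed yields every future as soon as it has finished,
    # so short tasks are typically reported first.
    finished = [f.result() for f in as_completed(futures)]
    # Reading the futures one by one keeps the submission order instead.
    ordered = [f.result() for f in futures]

print(finished)  # typically [0.1, 0.2, 0.3]
print(ordered)   # [0.3, 0.2, 0.1]
```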

In [None]:
import time


def sleeping(secs, time_start):
    time.sleep(secs)
    return (secs, int(time.time() - time_start))

In [None]:
%%time

from dask.distributed import as_completed

time_start = time.time()
results = client.map(sleeping, range(8, 0, -1), 8 * [time_start], pure=False)
for tmp in as_completed(results):
    print(tmp.result())

Alternatively, the results can be collected in one go, which is potentially more efficient, using the `Client.gather` method:

In [None]:
%%time
time_start = time.time()
results = client.map(sleeping, range(8, 0, -1), 8 * [time_start], pure=False)
client.gather(results)

In [None]:
client.shutdown()

### Exercise Scaling Out "Compute Pi" using Dask Futures (Hands-on) [20min]

Adapt the previous example "euler_scripts/pi_scale_up_hpc.py" to use Dask Futures and test it locally and then on Euler using 3 workers (5 processors).    
Try to keep the `approx_pi` function unchanged.  
*Note*: In case you want to increase the number of processors you should increase the number of points as well.

In [None]:
pycat euler_scripts/pi_scale_up_hpc.py

### Further Reading
- https://dask.org/
- https://docs.dask.org/en/latest/array.html
- https://docs.dask.org/en/latest/delayed.html
- https://distributed.dask.org/en/latest/
- https://tutorial.dask.org/
- https://www.youtube.com/watch?v=_u0OQm9qf_A&t=3583s&ab_channel=Dask