<center><img src="../../fig/ICHEC_Logo.jpg" alt="Drawing" style="width: 500px;"/>

# <center>Dask MPI</center>
******
***





- So far we have run Dask on a single node.

- The local cluster approach does not work across multiple nodes.

- The issue is how the workers communicate with the scheduler.

- As we have seen, we can run across multiple nodes using MPI.

### Operation

- dask-mpi splits the scheduler and workers over MPI processes.

- Rank 0 runs the scheduler.

- Rank 1 runs the Python script.

- Ranks 2 and above are the workers.

- dask-mpi is built on top of mpi4py.
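The rank layout above can be sketched in plain Python. The helper `role_for_rank` is a hypothetical name used only for illustration; it is not part of dask-mpi, and it simply restates the assignment described in the list.

```python
def role_for_rank(rank: int) -> str:
    """Return the role described above for a given MPI rank.

    Illustrative helper only; dask-mpi does this assignment internally.
    """
    if rank < 0:
        raise ValueError("MPI ranks are non-negative")
    if rank == 0:
        return "scheduler"
    if rank == 1:
        return "client script"
    return "worker"


# Roles for a launch with six MPI processes (ranks 0 to 5)
roles = {rank: role_for_rank(rank) for rank in range(6)}
for rank, role in roles.items():
    print(f"rank {rank}: {role}")
```

With six processes, two ranks are taken by the scheduler and the script, which leaves four workers.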

### Calculate Pi

- To illustrate how this is done we will use the previous example.

- Below is the complete Python code.

- Only one additional import is needed: `initialize` from `dask_mpi`.

In [None]:
%%writefile dask_MPI_calculate_pi.py
import numpy as np

import dask
import dask.array as da
from dask.distributed import Client
from dask_mpi import initialize

import time

- The function itself is unchanged from last time when we used dask arrays.

In [None]:
%%writefile -a dask_MPI_calculate_pi.py

def dask_calculate_pi(size_in_bytes,nchunks):
    
    """Calculate pi using a Monte Carlo method."""
    
    rand_array_shape = (int(size_in_bytes / 8 / 2), 2)
    chunk_size = int(rand_array_shape[0]/nchunks)
    print(chunk_size)
    
    # 2D random array with positions (x, y)
    xy = da.random.uniform(low=0.0, high=1.0, size=rand_array_shape, chunks=chunk_size)
    print(f" Created xy\n {xy}\n")
    print(f" Number of partitions/chunks is {xy.numblocks}\n")
    
    
    # check if position (x, y) is in unit circle
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1

    # pi is the fraction of points in circle x 4
    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size
    
    result = pi.compute()

    print(f"\nfrom {xy.nbytes / 1e9} GB randomly chosen positions")
    print(f"   pi estimate: {result}")
    print(f"   pi error: {abs(result - np.pi)}\n")
    
    return result
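To see what the sizing arithmetic at the top of the function produces, here is a minimal sketch that repeats it without Dask. The helper `chunk_layout` is a hypothetical name, and the 8 bytes per value assumes the default float64 dtype, which is what the `/ 8` in the function implies.

```python
def chunk_layout(size_in_bytes: int, nchunks: int, itemsize: int = 8):
    """Repeat the sizing arithmetic of dask_calculate_pi (illustration only)."""
    # Each row holds one (x, y) pair, so two values of `itemsize` bytes
    rows = int(size_in_bytes / itemsize / 2)
    # Rows per chunk, as passed to `chunks=` in the function
    chunk_rows = int(rows / nchunks)
    # Memory taken by one chunk of shape (chunk_rows, 2)
    chunk_bytes = chunk_rows * 2 * itemsize
    return rows, chunk_rows, chunk_bytes


# Same arguments as the call in the main body of the script
rows, chunk_rows, chunk_bytes = chunk_layout(100_000_000_000, 40)
print(f"rows: {rows}")
print(f"rows per chunk: {chunk_rows}")
print(f"GB per chunk: {chunk_bytes / 1e9}")
```

For the 100 GB array split into 40 chunks, each chunk holds 156,250,000 points and takes 2.5 GB.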

- The changes come in the main body of the code.

- The system of MPI processes is created by calling `initialize`, which starts the scheduler and the workers.

- You still need to create a `Client` to connect to the scheduler.

- However, this time `Client()` is called without any arguments.

- You can see that the parameters are set in the `initialize` call and not through `Client()`.

- One thing you may notice is that the number of workers has not been set.

In [None]:
%%writefile -a dask_MPI_calculate_pi.py

if __name__ == '__main__':
    initialize(nthreads=4,memory_limit='40GB')

    client = Client()


    t0 = time.time()
    print(client)
    dask_calculate_pi(100000000000,40)
    t1 = time.time()
    print("time taken for dask is " + str(t1-t0))
    
    client.close()

- The number of workers is set at execution time, when the job is launched.

- Below is an example Slurm script.

In [None]:
%%writefile dask_MPI_calculate_pi.slurm
#!/bin/bash
#SBATCH --nodes=2
#SBATCH --time=01:00:00
#SBATCH -A course
#SBATCH --job-name=calpi
#SBATCH -p CourseDevQ
#SBATCH --reservation=CourseMay


module purge
module load conda openmpi/gcc/3.1.2
module list

source activate /ichec/home/users/course00/conda_HPC

cd $SLURM_SUBMIT_DIR


mpirun -n 6 -npernode 3 python -u dask_MPI_calculate_pi.py

exit 0

- The number of workers is set by the number of MPI processes we create.

- In this case there are 4 workers: of the 6 processes, rank 0 is the scheduler and rank 1 runs the Python script.

- The workers come into play when there are parallel tasks to run.

- To show that it works across multiple nodes, the script requests 3 processes per node.

- This version is not faster than using plain Dask, but it allows more memory per worker.
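The worker count described above can be checked with a short calculation. This is a sketch under stated assumptions: `worker_resources` is a hypothetical helper, and it assumes that `nthreads` and `memory_limit` in `initialize` apply to each worker separately.

```python
def worker_resources(n_processes: int, nthreads: int, memory_limit_gb: float) -> dict:
    """Estimate the resources of a dask-mpi job (illustration only).

    Assumes rank 0 is the scheduler, rank 1 runs the script, and the
    thread and memory settings apply per worker.
    """
    n_workers = n_processes - 2
    if n_workers < 1:
        raise ValueError("need at least 3 MPI processes to have one worker")
    return {
        "workers": n_workers,
        "total_threads": n_workers * nthreads,
        "total_memory_gb": n_workers * memory_limit_gb,
    }


# Values from the example: mpirun -n 6, nthreads=4, memory_limit='40GB'
resources = worker_resources(n_processes=6, nthreads=4, memory_limit_gb=40)
print(resources)
```

Changing the `-n` value passed to `mpirun` is therefore the only change needed to add or remove workers.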

## Exercise

- We cannot run this example through the notebook.

- But you can verify that it works by running the cells above to write the files, then submitting the Slurm script from a terminal with `sbatch dask_MPI_calculate_pi.slurm`.

|<img src="../../fig/notebooks/Terminalicon2.png" height=100 width=100>|
|:--:|
| dask_MPI_calculate_pi |

# Summary

- We can use Dask over multiple nodes on Kay by using dask-mpi.

- This is more scalable than the LocalCluster in terms of number of workers and memory per worker.

- Using Dask arrays we can handle larger-than-memory problems.

# Links

__[Dask MPI](http://mpi.dask.org/en/latest/)__