# OMP and MPI Routines in DASK

Dask is a flexible parallel computing library for analytics. It is designed to scale from single machines to clusters of machines. In this notebook, we will explore how to use Dask in parallelization routines often encounterd in OpenMP and MPI. This contains content from the official Dask documentation, modified and arranged in ways I found helpful.

In [2]:
import dask
import dask.array as da
import numpy as np

## OpenMP Routines

OpenMP is an API for parallel programming that supports multi-platform **shared memory** multiprocessing programming in C, C++, and Fortran. It consists of a set of compiler directives, library routines, and environment variables that influence run-time behavior. For simple parallelization, OpenMP is often used to parallelize loops. In this simple example, we are adding two arrays in parallel using OpenMP.

```c
#include <omp.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[]) {
    int n = 1000000;
    int i;
    double *a = (double *)malloc(n * sizeof(double));
    double *b = (double *)malloc(n * sizeof(double));
    double *c = (double *)malloc(n * sizeof(double));

    #pragma omp parallel for
    for (i = 0; i < n; i++) {
        a[i] = b[i] + c[i];
    }

    free(a);
    free(b);
    free(c);
    return 0;
}
```

A similar example in Python using Dask is shown below. Unlike omp parallel for, parallelization is achieved with dask without explicit loop parallelization.

In [None]:
# set up vectors
a = da.random.random((1000000), chunks=100000)
b = da.random.random((1000000), chunks=100000)
c = a + b
c.compute()

Here, we do not need to use any special directives to parallelize the task. Dask will automatically parallelize the loop for us. Computations are usually not done until the `compute` method is called. In this compute method, we can specify parameters such as the number of workers to use. Another parameter is the `scheduler` which can be used to specify the type of scheduler to use. The default scheduler is the `threaded` scheduler. Other schedulers include the `processes`, `distributed`, `synchronous` or `single-threaded` which can be used to force serial computation for debugging.

In [None]:
# c.compute(num_workers=4)
c.compute(num_workers=4, scheduler='single-threaded')

Generally, if the computation being done is already implemented as a function in dask (which is mostly the case if it's in numpy), then you can just use the function and dask will automatically parallelize it for you. If the computation is not already implemented in dask, then the `dask.delayed` function can be used to make it recognized and treated as a dask function.

```python
import dask

@dask.delayed
def add(x, y):
   return x + y

a = [1, 2, 3, 4, 5]
b = [10, 20, 30, 40, 50]
c = add(a, b)    # no work has happened yet

c = c.compute()  # This triggers all of the above computations
```
In the case where there are overlaps that need to be handled, `map_overlap` can be used. This function has the ability to specify the overlap between the within blocks and boundary conditions for each dimension.

In [None]:
x = np.array([1, 1, 2, 3, 3, 3, 2, 1, 1])
x = da.from_array(x, chunks=5)
def derivative(x):
    return x - np.roll(x, 1)

y = x.map_overlap(derivative, depth=1, boundary=0)
y.compute()

`map_overlap` required inputs of the applied function to be the same array length. In other cases where this is not the case, or more complex operations are needed, `overlap` can be used to specify the overlap between the within blocks and boundary conditions for each dimension. Then, `map_blocks` can be used to apply the function to each block of the array. In all these cases, it is assumed that each block\chunk is handled independently (possibly by different threads).

In [None]:
x2 = da.overlap.overlap(x, depth=1, boundary=0)
y = x2.map_blocks(derivative)
y.compute()
da.overlap.trim_internal(y,{0:1}, boundary=0).compute()

`trim_internal` is a function that removes the overlap from a dask array. It is useful when you want to remove the overlap from a dask array after you have used `map_block` or `overlap.overlap` to add overlap to the array. 

In [15]:
A  = da.random.random((1000, 1000), chunks=(100, 100))
x = da.random.random(1000, chunks=(100,))
s = 5
b = da.dot(A, x) + s
# b.compute(num_workers=10)
b.compute()

array([263.40599389, 252.42654719, 253.06725858, 257.33083158,
       258.49945361, 257.63732228, 249.14777259, 262.1713444 ,
       261.9088465 , 254.84575984, 256.34316149, 247.92399336,
       260.93686225, 261.97536835, 257.88065693, 257.87799054,
       258.25710907, 250.85192789, 242.21893369, 257.91076453,
       251.8752586 , 257.26874207, 258.28753184, 252.2035897 ,
       252.53755057, 259.79976053, 253.08238644, 258.30206696,
       260.52663945, 252.33002278, 250.30884746, 257.52145745,
       244.4181332 , 258.19394442, 255.69127543, 257.55723445,
       250.98985734, 253.15740804, 258.42152554, 246.7024037 ,
       251.3032114 , 260.209792  , 256.14615177, 255.69431968,
       257.95262865, 259.30271262, 252.4115309 , 251.55576528,
       249.20607131, 248.80921865, 240.11557637, 258.65201632,
       258.9228841 , 250.931712  , 256.64019767, 249.99531679,
       257.18282259, 261.11485986, 259.63481209, 259.7350848 ,
       261.09120764, 264.47831382, 251.45760171, 259.39

## MPI Routines in Dask

MPI (Message Passing Interface) is a standardized and portable message-passing system that defines the syntax and semantics of library routines that can be called from C, C++, and Fortran. It allows for the communication between processes in distributed parallel programming. Commonly used routines include reduction, broadcasting, scattering, gathering, and scatter-gather. To do distributed computing in Dask, the `dask.distributed` module can be used.

The first thing you might do when using dask.distributed is to set up a cluster. This step can be skipped if you are using a local cluster. Otherwise there are ways for setting up a local cluster, cloud system or an HPC environment. The next is to create a `Client` object. This object is used to interact with the dask cluster. The `Client` object can be used to submit tasks to the cluster, monitor the cluster, and gather results from the cluster.

```python
# Another way to set up a local cluster
from dask.distributed import LocalCluster
# from dask_kubernetes import KubeCluster

cluster = LocalCluster()          # Fully-featured local Dask cluster
# cluster = KubeCluster()  # example, you can swap out for Kubernetes
client = cluster.get_client()

# Cloud setup
import coiled
cluster = coiled.Cluster(
    n_workers=100,
    region="us-east-2",
    worker_memory="16 GiB",
    spot_policy="spot_with_fallback",
)
client = cluster.get_client()

# HPC setup
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
    cores=24,
    memory="100GB",
    queue="regular",
    account="my-account",
)
cluster.scale(jobs=100)
client = cluster.get_client()


# Now you can use the client to submit work to the cluster
# Dask works as normal and leverages the infrastructure defined above
c.sum().compute()
```

In [58]:
# easy to use dask.distributed to setup a cluster on local machine
from dask.distributed import Client, get_worker
client = Client()  # set up local cluster on your laptop

client # This displays a dashboard link to monitor the cluster
# client.scheduler_info() # This displays the scheduler info
# client.close()  # close the cluster

Perhaps you already have a cluster running?
Hosting the HTTP server on port 49921 instead


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:49921/status,

0,1
Dashboard: http://127.0.0.1:49921/status,Workers: 4
Total threads: 12,Total memory: 15.82 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:49922,Workers: 4
Dashboard: http://127.0.0.1:49921/status,Total threads: 12
Started: Just now,Total memory: 15.82 GiB

0,1
Comm: tcp://127.0.0.1:49953,Total threads: 3
Dashboard: http://127.0.0.1:49954/status,Memory: 3.96 GiB
Nanny: tcp://127.0.0.1:49926,
Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-61uc2tbf,Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-61uc2tbf

0,1
Comm: tcp://127.0.0.1:49962,Total threads: 3
Dashboard: http://127.0.0.1:49963/status,Memory: 3.96 GiB
Nanny: tcp://127.0.0.1:49925,
Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-13lhsxf5,Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-13lhsxf5

0,1
Comm: tcp://127.0.0.1:49959,Total threads: 3
Dashboard: http://127.0.0.1:49960/status,Memory: 3.96 GiB
Nanny: tcp://127.0.0.1:49927,
Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-j98htaje,Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-j98htaje

0,1
Comm: tcp://127.0.0.1:49956,Total threads: 3
Dashboard: http://127.0.0.1:49957/status,Memory: 3.96 GiB
Nanny: tcp://127.0.0.1:49928,
Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-7lj1igeh,Local directory: C:\Users\issah\AppData\Local\Temp\dask-worker-space\worker-7lj1igeh


In [15]:
# Now you can use the client to submit work to the cluster
# Dask works as normal and leverages the infrastructure defined above
a = da.random.random((10), chunks=10)
b = da.random.random((10), chunks=10)
c = a + b   # no work has happened yet
c.sum().compute()

def inc(x):
        return x + 1

x = client.submit(inc, 10) # for a function call on a single input
x.result()

xs = client.map(inc, [1, 2, 3, 4]) # for a function call on multiple inputs
client.gather(xs)

# An example for a multiple input function
# def add(x, y):
#     return x + y

# futures = client.map(add, [1, 2, 3, 4], [10, 20, 30, 40])
# results = client.gather(futures)
# results

10.757894553779016

### Distributing data (Broadcasting\Scattering)

Scattering is a technique used to send data to the various processes. This is useful when we want to send different data to different threads. Broadcasting is when all data is sent to all processes.

The dask documentation states the following:
>"In the common case distributed runs tasks on workers that already hold dependent data. If you have a task f(x) that requires some data x then that task will very likely be run on the worker that already holds x.If a task requires data split among multiple workers, then the scheduler chooses to run the task on the worker that requires the least data transfer to it." 

However, dask provides a way to send data to the workers if more control is needed. Dask clients have a `scatter` method that can be used to send data to the workers. The `scatter` method takes a dictionary of data and returns a dictionary of futures. The futures can be used to refer to the data on the workers. The `scatter` method also has an `broadcast` parameter that can be used to broadcast the data to all workers.

In [59]:
# Demonstrate the scattering/broadcasting of data
def f(x):
    worker = get_worker()  # The worker on which this task is running
    return x+1, worker.address

futures = client.scatter(range(18,30), workers=[0,1])

x = client.map(f, futures)
client.gather(x)


[<Future: finished, type: int, key: int-e9e0ac12a38f591e93f20303313b3b98>, <Future: finished, type: int, key: int-ace910284a9b4d9b877cf9f8e4fd9770>, <Future: finished, type: int, key: int-6234f530ffb77cc79e4130d405077fe2>, <Future: finished, type: int, key: int-54abf7066a03bddb650fbb1616e27bc5>, <Future: finished, type: int, key: int-8af1e866f46a9f199166f3e516c29547>, <Future: finished, type: int, key: int-f14df74c6fe0e49005ce8e0b85ccb8cd>, <Future: finished, type: int, key: int-bb5361e9e63bc408906ec838d1817eb3>, <Future: finished, type: int, key: int-1e9eef22eca3c7724d48d6e38ce21d9d>, <Future: finished, type: int, key: int-7c5fd0057a3c3a37de5af21c4bc781db>, <Future: finished, type: int, key: int-93d434661371fd077c2a114f1b40b39e>, <Future: finished, type: int, key: int-5b4ec30c09ef0f7eca3753c7f722b654>, <Future: finished, type: int, key: int-1c038a6357cd80d62ff8254dd775c7ab>]


[(19, 'tcp://127.0.0.1:49962'),
 (20, 'tcp://127.0.0.1:49962'),
 (21, 'tcp://127.0.0.1:49953'),
 (22, 'tcp://127.0.0.1:49953'),
 (23, 'tcp://127.0.0.1:49953'),
 (24, 'tcp://127.0.0.1:49962'),
 (25, 'tcp://127.0.0.1:49962'),
 (26, 'tcp://127.0.0.1:49962'),
 (27, 'tcp://127.0.0.1:49953'),
 (28, 'tcp://127.0.0.1:49953'),
 (29, 'tcp://127.0.0.1:49953'),
 (30, 'tcp://127.0.0.1:49962')]

Here, we split the data between only two workers using the 'workers=[0,1]' parameter. Hence, when we run functions on this data, the scheduler runs the function on these two workers since already they have the data. 

### Gathering

Gathering is a technique used to collect the results of multiple processes into a single result. In dask, the `gather` method can be used to collect the results of the futures into a single result. The `gather` method takes a list of futures and returns a list of results. 

In [60]:
def add(x, y):
    return x + y

futures = client.map(add, [1, 2, 3, 4], [10, 20, 30, 40])
results = client.gather(futures)
results

[11, 22, 33, 44]

### Reduction

Reduction is a technique used to combine the results of multiple threads into a single result. This is useful when we want to calculate the sum, product, maximum, minimum, or any other operation that can be performed on a set of numbers.

In [70]:
# futures = client.scatter(range(18,30), workers=[0,1])
futures = client.map(add, [1, 2, 3, 4], [10, 20, 30, 40])
results = client.submit(sum, futures) # sum is a built-in function, we use it here as the reduction operator.
results.result()

110