# Introduction to dask-mpi

Before proceeding with this notebook, we suggest reading ["Introduction to dask"](Dask.ipynb).

## Initialization
### Interactive jobs

When dask is used interactively (e.g. in a notebook, as here), dask-mpi needs to run in the background as a server, started with a command such as
```bash
mpirun -n $((N+1)) dask-mpi --no-nanny --scheduler-file scheduler.json --nthreads 1
```
where `N+1` is the total number of MPI processes: one scheduler plus `N` workers.
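Since the process count is easy to get wrong by one, it may help to assemble the command programmatically. The following is a minimal sketch that only builds the argument list of the command above; the function name `dask_mpi_command` is hypothetical and not part of dask-mpi.

```python
def dask_mpi_command(n_workers, scheduler_file="scheduler.json", nthreads=1):
    """Build the argument list for the mpirun command shown above.

    Hypothetical helper, not part of dask-mpi: one extra MPI process
    is requested because one process hosts the scheduler.
    """
    if n_workers < 1:
        raise ValueError("at least one worker is required")
    n_processes = n_workers + 1  # N workers + 1 scheduler
    return [
        "mpirun", "-n", str(n_processes),
        "dask-mpi", "--no-nanny",
        "--scheduler-file", scheduler_file,
        "--nthreads", str(nthreads),
    ]

cmd = dask_mpi_command(8)
print(" ".join(cmd))
```

The resulting list can be passed to any process launcher that accepts an argument list.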

Then in the notebook we connect to the server by doing
```python
from dask.distributed import Client
client = Client(scheduler_file="scheduler.json")
```
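The scheduler file is how the client learns where the scheduler is listening. As a rough sketch, assuming the file is a JSON object with an `address` entry (an assumption about its layout, which may differ between versions), it can be inspected before connecting. The helper name `read_scheduler_address` is hypothetical, and the demonstration below writes a stand-in file rather than one produced by dask-mpi.

```python
import json
import os
import tempfile

def read_scheduler_address(path):
    """Return the "address" entry of a scheduler file, or None if absent.

    Hypothetical helper; assumes the file holds a JSON object.
    """
    with open(path) as f:
        info = json.load(f)
    return info.get("address")

# Stand-in file for demonstration only; a real one is written by the scheduler
demo_dir = tempfile.mkdtemp()
demo_file = os.path.join(demo_dir, "scheduler.json")
with open(demo_file, "w") as f:
    json.dump({"address": "tcp://127.0.0.1:8786"}, f)

address = read_scheduler_address(demo_file)
print(address)
```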

### Batch jobs

When dask is used in a script, the script needs to be executed in parallel with a command such as
```bash
mpirun -n $((N+1)) python script.py
```
and `script.py` should begin with
```python
from dask_mpi import initialize
initialize(nthreads=1, nanny=False)

from dask.distributed import Client
client = Client()
```

For more details about dask-mpi refer to its [documentation](https://mpi.dask.org/en/latest/index.html).

## Example
In the following we start the server and connect to it.

In [1]:
import sh
import tempfile

# Since dask-mpi produces several files, we work in a temporary directory
tmppath = tempfile.mkdtemp()
sh.cd(tmppath)

# Here we set the number of workers
workers = 8
threads_per_worker = 1

# The command runs in the background (_bg=True); stdout and stderr are written to log.out and log.err inside tmppath
server = sh.mpirun("-n", workers+1, "dask-mpi", "--no-nanny", "--nthreads", threads_per_worker,
          "--scheduler-file", "scheduler.json", _bg = True, _out="log.out", _err="log.err")


In [2]:
from dask.distributed import Client
client = Client(scheduler_file=tmppath+"/scheduler.json")
client

Client
  Scheduler: tcp://192.168.10.16:46261
  Dashboard: http://192.168.10.16:8787/status
Cluster
  Workers: 8
  Cores: 8
  Memory: 16.68 GB


## Workers

Information about the workers can be obtained using
```python
client.scheduler_info()["workers"]
```
which returns a dictionary whose keys are the worker addresses and whose values hold the most recent information the scheduler has received from each worker.


In [3]:
workers = list(client.scheduler_info()["workers"].keys())
workers

['tcp://192.168.10.16:33753',
 'tcp://192.168.10.16:34363',
 'tcp://192.168.10.16:34605',
 'tcp://192.168.10.16:37009',
 'tcp://192.168.10.16:39157',
 'tcp://192.168.10.16:40921',
 'tcp://192.168.10.16:45143',
 'tcp://192.168.10.16:45551']

In [4]:
# The information known about a worker is, for example
client.scheduler_info()["workers"][workers[0]]

{'type': 'Worker',
 'id': 8,
 'host': '192.168.10.16',
 'resources': {},
 'local_directory': '/tmp/tmps33b4cfl/worker-e4qqnt8g',
 'name': 8,
 'nthreads': 1,
 'memory_limit': 2085506048,
 'last_seen': 1601214775.2609286,
 'services': {},
 'metrics': {'cpu': 0.0,
  'memory': 72876032,
  'time': 1601214775.2127116,
  'read_bytes': 0.0,
  'write_bytes': 0.0,
  'num_fds': 37,
  'executing': 0,
  'in_memory': 0,
  'ready': 0,
  'in_flight': 0,
  'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}},
 'nanny': None}
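Because the workers dictionary is plain Python data, it can be aggregated directly. The sketch below sums `nthreads` and `memory_limit` over a stand-in dictionary with the same field names as the output above; the helper `summarize_workers` is hypothetical, and giving every worker the same `memory_limit` as the first one is an assumption made for illustration.

```python
def summarize_workers(workers_info):
    """Aggregate thread count and memory limit over a workers dictionary."""
    total_threads = sum(w["nthreads"] for w in workers_info.values())
    total_memory = sum(w["memory_limit"] for w in workers_info.values())
    return {
        "workers": len(workers_info),
        "nthreads": total_threads,
        "memory_limit_GB": total_memory / 1e9,
    }

# Stand-in data: worker addresses from the listing above, and (as an
# assumption) the memory limit of the first worker copied to all of them
ports = [33753, 34363, 34605, 37009, 39157, 40921, 45143, 45551]
sample = {
    f"tcp://192.168.10.16:{port}": {"nthreads": 1, "memory_limit": 2085506048}
    for port in ports
}

summary = summarize_workers(sample)
print(summary)
```

Under that assumption the total memory rounds to 16.68 GB, in agreement with the cluster summary printed when the client was created. With a live client, `client.scheduler_info()["workers"]` would be passed in place of `sample`.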

## Distributed operations
We can initialize a group of workers for performing a task using the function
```python
client.scatter(data, workers=workers, broadcast=False, hash=False)
```
which hands each element of the list `data` to one of the workers in round-robin order. The argument `workers` is either `None` (use all the workers) or a list selected among the available workers.

Each element of the list should contain the information that the receiving worker needs to carry out its part of the task.
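To make the round-robin idea concrete, here is a small pure-Python illustration. It is not dask's implementation, only a sketch of the assignment pattern; the function `round_robin` and the worker names are hypothetical.

```python
from itertools import cycle

def round_robin(items, workers):
    """Pair each item with a worker, cycling through the workers in order."""
    return list(zip(items, cycle(workers)))

# Five items over two (hypothetical) workers: the workers alternate
assignment = round_robin(range(5), ["worker-a", "worker-b"])
print(assignment)
```

When the list has exactly as many elements as there are workers, as in the example that follows, each worker receives one element.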

Here is a dummy example.

In [5]:
dummy = range(len(workers))
group = client.scatter(dummy, workers=workers, broadcast=False, hash=False)
group

[<Future: finished, type: builtins.int, key: int-db0f3161d56946bb8642270285ee7122>,
 <Future: finished, type: builtins.int, key: int-1929910361ed416ab7c98d664cb6f21d>,
 <Future: finished, type: builtins.int, key: int-245b2c7b5cdc48de93b62cbfd2e21acd>,
 <Future: finished, type: builtins.int, key: int-50941fa65e3f484c955683743bd2b1e5>,
 <Future: finished, type: builtins.int, key: int-ddc03a75227a475ca29e54ca56fb458a>,
 <Future: finished, type: builtins.int, key: int-14fc35189ce1442fb546530be8e53ca0>,
 <Future: finished, type: builtins.int, key: int-ef49600552284520a63800e9138055ff>,
 <Future: finished, type: builtins.int, key: int-a29b2929f8ae4cf5ad2e24e432c2b47a>]

In [6]:
client.who_has(group)

{'int-ef49600552284520a63800e9138055ff': ('tcp://192.168.10.16:45143',),
 'int-db0f3161d56946bb8642270285ee7122': ('tcp://192.168.10.16:33753',),
 'int-50941fa65e3f484c955683743bd2b1e5': ('tcp://192.168.10.16:37009',),
 'int-245b2c7b5cdc48de93b62cbfd2e21acd': ('tcp://192.168.10.16:34605',),
 'int-14fc35189ce1442fb546530be8e53ca0': ('tcp://192.168.10.16:40921',),
 'int-ddc03a75227a475ca29e54ca56fb458a': ('tcp://192.168.10.16:39157',),
 'int-1929910361ed416ab7c98d664cb6f21d': ('tcp://192.168.10.16:34363',),
 'int-a29b2929f8ae4cf5ad2e24e432c2b47a': ('tcp://192.168.10.16:45551',)}

In [7]:
[g.result() for g in group]

[0, 1, 2, 3, 4, 5, 6, 7]

To check that the elements are actually distributed, we ask for the MPI rank of the process that holds each of them.

In [8]:
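def get_rank(*args, comm=None):
    """Return the MPI rank of the calling process; positional arguments are ignored."""
    if comm is None:
        # Default to the world communicator, which also includes the scheduler
        from mpi4py.MPI import COMM_WORLD as comm
    return comm.rank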

ranks = client.map(get_rank, group)
ranks = [rank.result() for rank in ranks]
ranks

[8, 1, 2, 3, 7, 5, 6, 4]

Note that `rank = 0` is not in the list: that process runs the scheduler, not a worker.

Thus any MPI operation needs to run on a communicator that involves only the workers and not the scheduler.

In [9]:
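def create_comm(*args, ranks=None, comm=None):
    """Create a communicator that contains only the processes listed in `ranks`."""
    assert ranks, "ranks must be a non-empty list of MPI ranks"
    if comm is None:
        from mpi4py.MPI import COMM_WORLD as comm
    # Create_group is collective only over the processes of the new group,
    # so the scheduler (rank 0) does not need to take part in the call
    return comm.Create_group(comm.group.Incl(ranks))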

comms = client.map(create_comm, group, workers=workers, ranks=ranks, actor=True)
comms

[<Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-0>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-1>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-2>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-3>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-4>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-5>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-6>,
 <Future: pending, key: create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-7>]

In [10]:
comms = [comm.result() for comm in comms]
comms

[<Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-0>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-1>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-2>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-3>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-4>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-5>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-6>,
 <Actor: Intracomm, key=create_comm-899b7920-c821-45b7-b7bb-4278c163ccf7-7>]

In [11]:
client.scheduler.workers()

<coroutine object PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc at 0x7fb381684ef0>

In [12]:
[comm.rank for comm in comms]

[0, 1, 2, 3, 4, 5, 6, 7]
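The ranks in the new communicator run from 0 to 7 in the order of the list, which is consistent with how MPI group inclusion numbers processes: the process whose old rank is `ranks[i]` receives rank `i` in the new group. The pure-Python sketch below reproduces that mapping for the `ranks` list obtained earlier; it does not call MPI, and the name `new_rank_of` is only illustrative.

```python
# The `ranks` list obtained earlier (ranks in the world communicator)
world_ranks = [8, 1, 2, 3, 7, 5, 6, 4]

# Group inclusion gives new rank i to the process whose old rank is world_ranks[i]
new_rank_of = {world: new for new, world in enumerate(world_ranks)}
print(new_rank_of)
```

The scheduler's rank 0 does not appear among the keys, since it was never included in the group.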

In [13]:
reductions = [comm.allreduce(1) for comm in comms]
[r.result() for r in reductions]

[8, 8, 8, 8, 8, 8, 8, 8]
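Every worker obtains 8 because the default operation of `allreduce` in mpi4py is a sum, and each of the 8 workers contributes the value 1. As a sketch of the semantics only, the following emulates a sum all-reduce in plain Python without any MPI call; the function `simulate_allreduce` is hypothetical.

```python
def simulate_allreduce(contributions):
    """Emulate a sum all-reduce: every participant ends up with the global sum."""
    total = sum(contributions)
    return [total for _ in contributions]

# Eight participants contributing 1 each, as in the cell above
results = simulate_allreduce([1] * 8)
print(results)
```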