# Forking out command line MPI tasks
Imagine we have a workflow that requires us to run an MPI executable (e.g., `mpiexec -n 4 <my_executable>`) and then process the result. We can do that with the library's `mpi_wrap()` function, so that Dask manages the task that launches the job through the MPI runtime.
`mpi_wrap()` returns a dictionary holding the command that was launched and its captured standard output and standard error:
```python
{
    "cmd": cmd_launched, 
    "out": stdout_output, 
    "err": stderr_output
}
```
Using `mpi_wrap()` (currently) requires a little boilerplate code to work as one might expect (see the example below).
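Even without an MPI cluster, you can get a feel for the shape of that dictionary with the standard library. The sketch below is a hypothetical stand-in (`run_and_capture` is not part of the library): it runs a command directly, without any MPI launcher, and returns a dict with the same three keys. It assumes, as the `.decode()` call later in this notebook suggests, that the captured output is raw bytes.

```python
import subprocess
import sys


def run_and_capture(executable, exec_args=""):
    """Hypothetical stand-in for mpi_wrap(): run a command directly (no MPI
    launcher) and return a dict with the same three keys."""
    cmd = [executable] + exec_args.split()
    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    # Captured stdout/stderr are raw bytes, so they must be decoded to print nicely
    return {"cmd": cmd, "out": proc.stdout, "err": proc.stderr}


demo_result = run_and_capture(sys.executable, "-c print(42)")
print(demo_result["out"].decode("UTF-8"))
```

The real `mpi_wrap()` additionally prepends the MPI launcher and the process count to the command; this stand-in only mimics the return value.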

## Interacting with the task
If you need information from the executed task, you can either parse this dictionary (for example, the captured standard output) or interact with the task through the file system (for example, by reading a result file it writes).
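Parsing the dictionary usually comes down to decoding the byte strings and splitting them into lines. A minimal sketch, assuming both `out` and `err` are bytes; `parse_mpi_result` is a hypothetical helper and the result dict below is made up for illustration:

```python
def parse_mpi_result(result, encoding="UTF-8"):
    """Decode the 'out' and 'err' byte strings of an mpi_wrap()-style result
    dict and split stdout into non-empty lines."""
    out = result["out"].decode(encoding)
    err = result["err"].decode(encoding)
    lines = [line for line in out.splitlines() if line.strip()]
    return lines, err


# Made-up example result, for illustration only
fake_result = {
    "cmd": "mpiexec -n 2 python helloworld.py",
    "out": b"Hello from rank 0\nHello from rank 1\n",
    "err": b"",
}
lines, err = parse_mpi_result(fake_result)
print(lines)  # ['Hello from rank 0', 'Hello from rank 1']
```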

```python
import os
from jobqueue_features.clusters import CustomSLURMCluster
from jobqueue_features.decorators import on_cluster, mpi_task
from jobqueue_features.mpi_wrapper import mpi_wrap
from jobqueue_features.clusters_controller import (
    clusters_controller_singleton as controller,
)
```

This time when we declare our cluster we need to add a few additional keyword arguments:
* `mpi_mode`, to let the cluster know we will use MPI
* `fork_mpi`, to indicate that we will be forking MPI processes
* `nodes`, to indicate how many resources we want to use (there are many other options for this)

```python
custom_cluster = CustomSLURMCluster(
    name="mpiCluster",
    mpi_mode=True,
    fork_mpi=True,
    nodes=2,
)
```

```python
custom_cluster
```

We need boilerplate code because our decorators insert some keyword arguments that `mpi_wrap` requires to execute the task (such as the number of processes, since this usually has to be communicated to the MPI runtime). The wrapped function therefore has to accept `**kwargs` and pass them on to `mpi_wrap`.
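To see why the wrapped function must accept `**kwargs`, here is a toy decorator that injects an extra keyword argument. It is only an illustration of the general mechanism, not the `jobqueue_features` implementation; `inject_kwargs` and the `n_procs` argument are hypothetical names.

```python
import functools


def inject_kwargs(**injected):
    """Toy decorator: pass extra keyword arguments to the wrapped function.
    This is NOT how jobqueue_features is implemented; it only shows the idea."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Caller-supplied kwargs take precedence over the injected ones
            return func(*args, **{**injected, **kwargs})
        return wrapper
    return decorator


@inject_kwargs(n_procs=4)  # hypothetical kwarg name
def show_kwargs(**kwargs):
    # Accepting **kwargs is what lets the injected arguments through
    return kwargs


print(show_kwargs(executable="python"))  # {'n_procs': 4, 'executable': 'python'}
```

A function with a fixed signature such as `def f(executable)` would raise a `TypeError` when handed the injected `n_procs`, which is why the boilerplate simply forwards `**kwargs`.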

```python
@on_cluster(cluster=custom_cluster)
@mpi_task(cluster=custom_cluster)
def mpi_wrap_task(**kwargs):
    return mpi_wrap(**kwargs)
```

We can now use our newly created function `mpi_wrap_task()` in the same way we would use `mpi_wrap()`.

```python
def forked_mpi():
    script_path = os.path.join("resources", "helloworld.py")
    t = mpi_wrap_task(
        executable="python",
        exec_args=script_path
    )

    return t
```

Now let's run it.

```python
result = forked_mpi()
```

The result is a future, so let's wait for the response and see what we ran.

```python
final_result = result.result()
print("Ran\n\t", final_result["cmd"])
output = final_result["out"]
# Need to decode the output string so it is easily printed
print(output.decode('UTF-8'))
```

Let's clean up after ourselves.

```python
controller._close()
```

## Scaling up

Let's make a new cluster that has the capability to scale. Each worker will use one node and, since we know we can have at most 2 workers running at the same time, we can also set `maximum_jobs=2`.

```python
multi_fork_cluster = CustomSLURMCluster(
    name="multi_fork_cluster",
    mpi_mode=True,
    fork_mpi=True,
    nodes=1,
    maximum_jobs=2
)
```

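Let's redefine our task to use the new cluster. Because `forked_mpi()` looks up the name `mpi_wrap_task` each time it is called, it will pick up this new definition without having to be redefined itself.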

```python
@on_cluster(cluster=multi_fork_cluster)
@mpi_task(cluster=multi_fork_cluster)
def mpi_wrap_task(**kwargs):
    return mpi_wrap(**kwargs)
```
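Redefining `mpi_wrap_task` works because Python resolves a global name when a function runs, not when it is defined. A small pure-Python sketch of that late binding (the names `task` and `caller` are purely illustrative):

```python
def task():
    return "first definition"


def caller():
    # `task` is looked up in the module's globals each time caller() runs
    return task()


print(caller())  # first definition


def task():  # rebinding the global name, as the cell above does
    return "second definition"


print(caller())  # second definition
```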

Now let's submit a whole batch of tasks (100 of them) to the cluster and check which nodes they run on.

```python
tasks = []
for x in range(100):
    tasks.append(forked_mpi())
```

Now let's count how many outputs mention each node name (`c1` or `c2`).

```python
c1_count = 0
c2_count = 0
for job in tasks:
    result = job.result()["out"]
    if 'c1'.encode() in result:
        c1_count += 1
    elif 'c2'.encode() in result:
        c2_count += 1
print("c1: {} \nc2: {}".format(c1_count, c2_count))
```
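The same tally can be kept in a `collections.Counter`, which scales to any number of node names. A sketch with a hypothetical helper `count_nodes` and made-up outputs; in the notebook the inputs would be `job.result()["out"]` for each future in `tasks`:

```python
from collections import Counter


def count_nodes(outputs, nodes=(b"c1", b"c2")):
    """Count how many byte-string outputs mention each node name
    (first match wins, mirroring the if/elif chain above)."""
    counts = Counter()
    for out in outputs:
        for node in nodes:
            if node in out:
                counts[node.decode()] += 1
                break
    return counts


# Made-up outputs, for illustration only
sample = [b"Hello from c1", b"Hello from c2", b"Hello from c1"]
print(count_nodes(sample))  # Counter({'c1': 2, 'c2': 1})
```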

## Bookkeeping for your `future`s

If you look at the dashboard, though, there is still a worker running and all our results are still held in memory. This is because the futures are still referenced in the current session (through the `tasks` list), so Dask does not yet know we are finished with them.

What happens if that running job dies?

```
# Find the SLURM jobs whose squeue line matches "batch" and cancel them,
# killing the running worker
!squeue | grep batch | awk '{print $1}' | xargs -i scancel {}
```

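Everything gets recalculated (eventually)! Since we still hold the futures, Dask reruns the tasks whose results were lost along with the worker.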

Once we have what we need from a future, we should release it.

Let's also take the opportunity to use a `distributed` feature for working with futures: the function `as_completed()`, which yields futures in the order in which they complete. Watch the dashboard this time to see the difference in behaviour.

```python
from distributed import as_completed

tasks = []
for x in range(100):
    tasks.append(forked_mpi())

c1_count = 0
c2_count = 0
for job in as_completed(tasks):
    result = job.result()["out"]
    # Now we have what we need, cancel the future
    job.cancel()

    if 'c1'.encode() in result:
        c1_count += 1
    elif 'c2'.encode() in result:
        c2_count += 1
print("c1: {} \nc2: {}".format(c1_count, c2_count))
```
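The completion-order pattern is not specific to Dask: the standard library's `concurrent.futures` has an `as_completed()` of its own. The sketch below swaps in a thread pool and a hypothetical `fake_task` so it runs anywhere; it only illustrates processing results as they finish and then dropping references, not how Dask releases results on its scheduler.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_task(i):
    """Hypothetical stand-in for an MPI task: sleep briefly, then report."""
    time.sleep(random.uniform(0, 0.05))
    return i


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(fake_task, i) for i in range(10)]
    finished = []
    for fut in as_completed(futures):
        # Results arrive in completion order, not submission order
        finished.append(fut.result())
    # Drop our references so the futures (and their results) can be freed
    futures.clear()

print(sorted(finished) == list(range(10)))  # True
```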

The cluster is still running; we will clean up after ourselves once the exercise below is done.

## Exercise

1. Write a function `forked_mpi_2`, modelled on `forked_mpi`, that executes the `resources/square.py` script.
2. Get the result, split its output into lines, convert each line to an integer and sum the numbers.
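As a hint for step 2, here is a sketch of the parsing on a made-up byte string. It assumes `resources/square.py` prints one integer per line; `sum_output_lines` is a hypothetical helper name.

```python
def sum_output_lines(out):
    """Split a bytes stdout into lines, convert each non-empty line to int,
    and return the total."""
    lines = out.decode("UTF-8").splitlines()
    return sum(int(line) for line in lines if line.strip())


# Made-up output, for illustration only
print(sum_output_lines(b"0\n1\n4\n9\n"))  # 14
```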

```python
def forked_mpi_2():
    # 1. define the path to resources/square.py
    # 2. make a task with `mpi_wrap_task`
    # 3. return the task
    pass  # remove this line and insert your code here

task = forked_mpi_2()  # 4. get the task (a future)
# 5. get the result from the task
# 6. take the value under the "out" key, split it into lines,
#    map each line to int and sum the numbers
```

```python
controller._close()
```