# MPI single task

Lest start from imports

In [None]:
import os
from jobqueue_features import (
    MPIEXEC,
    CustomSLURMCluster,
)

from jobqueue_features import on_cluster, mpi_task, get_task_mpi_comm

Let's define task script path.

In [None]:
script_path = os.path.abspath(
    os.path.join(os.getcwd(), "docker_config", "slurm", "tutorial_tasks", "resources", "helloworld.py")
)

Let's define cluster configuration.

In [None]:
common_kwargs = {
    "walltime": "00:04:00",
    "cores_per_node": 2,
    "minimum_cores": 2,
    "hyperthreading_factor": 1,
    "ntasks_per_node": 2,
    "memory": "512 MB",
    "mpi_mode": True,
    "env_extra": [
        "export OMPI_ALLOW_RUN_AS_ROOT_CONFIRM=1",
        "export OMPI_ALLOW_RUN_AS_ROOT=1",
    ],
    "mpi_launcher": MPIEXEC,
    "local_directory": "/tmp",
    "queue": "batch",
}

Let's start cluster.

In [None]:
custom_cluster = CustomSLURMCluster(
    name="mpiCluster",
    nodes=2,
    **common_kwargs
)

Let's define tasks.

In [None]:
@on_cluster(cluster=custom_cluster)
@mpi_task(cluster_id=custom_cluster.name)
def task1(task_name):
    from mpi4py import MPI

    comm = get_task_mpi_comm()
    size = comm.Get_size()
    name = MPI.Get_processor_name()
    all_nodes = comm.gather(name, root=0)
    if all_nodes:
        all_nodes = list(set(all_nodes))
        all_nodes.sort()
    else:
        all_nodes = []
    # Since it is a return  value it will only get printed by root
    return_string = "Running %d tasks of type %s on nodes %s." % (
        size,
        task_name,
        all_nodes,
    )
    return return_string

@on_cluster(cluster=custom_cluster)
@mpi_task(cluster_id=custom_cluster.name)
def task2(name, task_name="default"):
    comm = get_task_mpi_comm()
    rank = comm.Get_rank()
    # This only appears in the slurm job output
    return_string = "Hi %s, my rank is %d for task of type %s" % (
        name,
        rank,
        task_name,
    )
    return return_string

First run our tasks:

In [None]:
tasks = []
tasks.append(
    task1("task1")
)
tasks.append(
    task1("task1, 2nd iteration")
)
tasks.append(
    task2("Alan", task_name="Task 2")
)

Then check the results of it:

In [None]:
for task in tasks:
    print(task.result())

It's alive!