Add MPI-capable tasks (#9)
* Add proof of concept for tasks with MPI capabilities (in the same namespace)

* Tidy up a bit

* Careful about (ab)using globals

* Make script executable

* Fix spelling error

* Start incorporating the necessary code

* Only import MPI if I need to

* Add some error checking

* Move some more functions to the module, improve errors

* Add the wrapper script for dask_worker

* Simplify

* Prepare for wrapping mpi_dask_worker.py call

* Add dask mpi launcher

* Run black

* Provide traceback when aborting

* Enable switch for forking MPI programs

* Run black on codebase

* Add default MPI launcher to tests

* Restore state in tests

* Prepare for more tests

* Correct warning string

* Correct warning string

* Correct warning string

* Imports done incorrectly

* Add an example to test MPI functionality

* Add launcher to example

* Use trivial queue

* Fix typo

* Only add additional kwargs if they are needed

* No forking other processes when using MPI to launch dask

* Add missing kwargs

* Use simple queues in example

* Move requirements in CI

* Only do the dask worker import where it is needed

* Debugging

* Debugging

* Serialize before submitting the task

* Serialize before submitting the task

* Serialize before submitting the task

* Be more careful with args and kwargs

* Get rid of a warning

* Remove unnecessary print statements

* Make sure we can run multiple tasks

* Make sure we can run multiple tasks

* Make sure we can run multiple tasks

* Flush strings

* Remove bad print command

* See if nanny works

* Only do import when we need to

* Only do import when we need to

* Revert last commit

* Address comment

* Tidy up before adding additional tests

* Remove dask worker directories

* Add initial set of new tests

* Additional tests

* Final additional tests

* Make sure Py2 is also passing
ocaisa committed May 29, 2019
1 parent 71ba25a commit 696f815
Showing 14 changed files with 607 additions and 106 deletions.
5 changes: 2 additions & 3 deletions .travis.yml
@@ -14,10 +14,9 @@ branches:

# command to install dependencies
install:
- pip install -r requirements.txt
# Enable running MPI tests
# Enable running MPI tests (and installing MPI4PY)
- sudo apt-get install -y -q openmpi-bin libopenmpi-dev
- pip install mpi4py
- pip install -r requirements.txt

# command to run tests
script:
37 changes: 37 additions & 0 deletions examples/forked_mpi_task_srun.py
@@ -0,0 +1,37 @@
from __future__ import print_function
import os

from dask.distributed import LocalCluster

from jobqueue_features.clusters import CustomSLURMCluster
from jobqueue_features.decorators import on_cluster, mpi_task
from jobqueue_features.mpi_wrapper import mpi_wrap
from jobqueue_features.functions import set_default_cluster

# import logging
# logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

# set_default_cluster(LocalCluster)
set_default_cluster(CustomSLURMCluster)

custom_cluster = CustomSLURMCluster(
name="mpiCluster", walltime="00:04:00", nodes=2, mpi_mode=True, fork_mpi=True
)


@on_cluster(cluster=custom_cluster, cluster_id="mpiCluster")
@mpi_task(cluster_id="mpiCluster")
def mpi_wrap_task(**kwargs):
    return mpi_wrap(**kwargs)


# @on_cluster() # LocalCluster
def main():
    script_path = os.path.join(os.path.dirname(__file__), "resources", "helloworld.py")
    t = mpi_wrap_task(executable="python", exec_args=script_path)
    print(t.result()["out"])
    print(t.result()["err"])


if __name__ == "__main__":
    main()
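
The resources/helloworld.py script launched by this example is not part of this diff. A minimal mpi4py hello-world along the following lines would be enough to exercise the forked launch path; the file name comes from the example above, the body is an assumption:

# Hypothetical stand-in for examples/resources/helloworld.py (not in this diff):
# each MPI rank reports its rank, the communicator size and its host name.
from __future__ import print_function

from mpi4py import MPI


def main():
    comm = MPI.COMM_WORLD
    print(
        "Hello from rank %d of %d on %s"
        % (comm.Get_rank(), comm.Get_size(), MPI.Get_processor_name())
    )


if __name__ == "__main__":
    main()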
3 changes: 3 additions & 0 deletions examples/mpi_tasks_complex.py
@@ -25,6 +25,7 @@
walltime="00:15:00",
nodes=2,
mpi_mode=True,
fork_mpi=True,
queue_type="gpus",
maximum_scale=5,
env_extra=[
@@ -46,6 +47,7 @@
walltime="00:15:00",
nodes=4,
mpi_mode=True,
fork_mpi=True,
maximum_scale=10,
queue_type="knl",
python="python",
@@ -68,6 +70,7 @@
walltime="00:15:00",
nodes=2,
mpi_mode=True,
fork_mpi=True,
maximum_scale=10,
env_extra=[
"module --force purge",
63 changes: 47 additions & 16 deletions examples/mpi_tasks_srun.py
@@ -1,36 +1,67 @@
from __future__ import print_function
import os

from dask.distributed import LocalCluster
import sys

from jobqueue_features.clusters import CustomSLURMCluster
from jobqueue_features.decorators import on_cluster, mpi_task
from jobqueue_features.mpi_wrapper import mpi_wrap
from jobqueue_features.functions import set_default_cluster
from jobqueue_features.mpi_wrapper import SRUN

# import logging
# logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

# set_default_cluster(LocalCluster)
set_default_cluster(CustomSLURMCluster)
# logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.DEBUG)

custom_cluster = CustomSLURMCluster(
name="mpiCluster", walltime="00:04:00", nodes=2, mpi_mode=True, queue_type="gpus"
name="mpiCluster", walltime="00:03:00", nodes=2, mpi_mode=True, mpi_launcher=SRUN
)


@mpi_task(cluster_id="mpiCluster")
def mpi_wrap_task(**kwargs):
    return mpi_wrap(**kwargs)
def task1(task_name):
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    name = MPI.Get_processor_name()
    all_nodes = comm.gather(name, root=0)
    if all_nodes:
        all_nodes = set(all_nodes)
    else:
        all_nodes = []
    # Since it is a return value it will only get printed by root
    return_string = "Running %d tasks of type %s on nodes %s." % (
        size,
        task_name,
        all_nodes,
    )
    # The flush is required to ensure that the print statements appear in the job log
    # files
    print(return_string)
    sys.stdout.flush()
    return return_string


@mpi_task(cluster_id="mpiCluster")
def task2(name, task_name="default"):
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    # This only appears in the slurm job output
    return_string = "Hi %s, my rank is %d for task of type %s" % (name, rank, task_name)
    # The flush is required to ensure that the print statements appear in the job log
    # files
    print(return_string)
    sys.stdout.flush()
    return return_string


# @on_cluster() # LocalCluster
@on_cluster(cluster=custom_cluster, cluster_id="mpiCluster")
def main():
    script_path = os.path.join(os.path.dirname(__file__), "resources", "helloworld.py")
    t = mpi_wrap_task(executable="python", exec_args=script_path)
    print(t.result()["out"])
    print(t.result()["err"])
    t1 = task1("task1")
    t2 = task1("task1, 2nd iteration")
    t3 = task2("Alan", task_name="Task 2")
    print(t1.result())
    print(t2.result())
    print(t3.result())


if __name__ == "__main__":
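
The decorated calls above return Dask futures (hence the .result() calls), so results can also be collected as they finish. A small sketch, assuming dask.distributed is available on the client side:

# Sketch only: gather results from the MPI tasks as they complete, assuming
# task1/task2 return dask.distributed futures as in the example above.
from dask.distributed import as_completed

futures = [
    task1("task1"),
    task1("task1, 2nd iteration"),
    task2("Alan", task_name="Task 2"),
]
for future in as_completed(futures):
    print(future.result())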
12 changes: 11 additions & 1 deletion jobqueue_features/__init__.py
@@ -4,7 +4,17 @@
from .custom_exceptions import ClusterException
from .decorators import task, on_cluster, mpi_task
from .functions import set_default_cluster
from .mpi_wrapper import mpi_wrap, MPIEXEC, SRUN
from .mpi_wrapper import (
    mpi_wrap,
    MPIEXEC,
    SRUN,
    which,
    serialize_function_and_args,
    deserialize_and_execute,
    mpi_deserialize_and_execute,
    verify_mpi_communicator,
    flush_and_abort,
)

from ._version import get_versions

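
With this change the MPI helpers are re-exported at package level, so user code can import them directly instead of reaching into jobqueue_features.mpi_wrapper, for example:

# The launcher constants and serialization helpers are now part of the public
# package namespace.
from jobqueue_features import SRUN, mpi_wrap, serialize_function_and_args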
File renamed without changes.
47 changes: 47 additions & 0 deletions jobqueue_features/cli/mpi_dask_worker.py
@@ -0,0 +1,47 @@
#!/usr/bin/env python
"""
Distribution of MPI enabled tasks
"""

from jobqueue_features.mpi_wrapper import (
    mpi_deserialize_and_execute,
    serialize_function_and_args,
    shutdown_mpitask_worker,
    verify_mpi_communicator,
)


# Add the no-nanny option so we don't fork additional processes
MPI_DASK_WRAPPER_MODULE = "jobqueue_features.cli.mpi_dask_worker --no-nanny"


def prepare_for_mpi_tasks(root=0, comm=None):

    if comm is None:
        from mpi4py import MPI

        comm = MPI.COMM_WORLD
    verify_mpi_communicator(comm)

    rank = comm.Get_rank()

    if rank == root:
        # Start dask so root reports to scheduler and accepts tasks
        # Task distribution is part of task itself (via our wrapper)
        from distributed.cli import dask_worker

        dask_worker.go()

        # As a final task, send a shutdown to the other MPI ranks
        serialized_object = serialize_function_and_args(shutdown_mpitask_worker)
        mpi_deserialize_and_execute(
            serialized_object=serialized_object, root=root, comm=comm
        )
    else:
        while True:
            # Calling with no serialized_object means these are non-root processes
            mpi_deserialize_and_execute(root=root, comm=comm)


if __name__ == "__main__":
    prepare_for_mpi_tasks()
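
serialize_function_and_args and mpi_deserialize_and_execute live in jobqueue_features.mpi_wrapper, which is not shown in this section. Conceptually the pattern is: the root rank pickles a callable with its arguments, broadcasts it, and every rank executes it collectively. A self-contained sketch of that idea using cloudpickle and mpi4py directly (an illustration, not the library's actual implementation):

# Conceptual sketch (not the library code): root serializes a callable plus its
# arguments, broadcasts the bytes, and every rank deserializes and executes.
import cloudpickle
from mpi4py import MPI


def broadcast_and_run(func=None, args=(), root=0, comm=MPI.COMM_WORLD):
    payload = None
    if comm.Get_rank() == root:
        payload = cloudpickle.dumps((func, args))
    # Non-root ranks pass func=None; the broadcast supplies the real payload.
    func, args = cloudpickle.loads(comm.bcast(payload, root=root))
    return func(*args)


def report(label):
    comm = MPI.COMM_WORLD
    return "%s: rank %d of %d" % (label, comm.Get_rank(), comm.Get_size())


if __name__ == "__main__":
    # Only root supplies the work; the other ranks just join the broadcast.
    if MPI.COMM_WORLD.Get_rank() == 0:
        print(broadcast_and_run(report, ("demo",)))
    else:
        print(broadcast_and_run())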
77 changes: 66 additions & 11 deletions jobqueue_features/clusters.py
Expand Up @@ -3,6 +3,9 @@
from dask.distributed import Client, LocalCluster
from typing import TypeVar, Dict # noqa

from .cli.mpi_dask_worker import MPI_DASK_WRAPPER_MODULE
from .mpi_wrapper import mpi_wrap

import logging

logger = logging.getLogger(__name__)
@@ -84,13 +87,16 @@ class CustomClusterMixin(object):
    gpu_job_extra : List[str]
        Extra scheduler arguments when requesting a GPU job
    warnings : List[str]
        A string that holds any desired warning (is turned into a list of
        warnings in self.warnings)
        A string that holds any desired warning (is turned into a list of warnings in
        self.warnings)
    mpi_mode : bool
        Whether the cluster is to run MPI tasks (jobqueue only manages a single
        core, the rest are by the mpi_launcher)
        Whether the cluster is to run MPI tasks
    mpi_launcher : str
        The command that launches MPI jobs (srun, mpiexec, mpirun,...)
    fork_mpi : bool
        If true, assume all tasks for the cluster fork MPI processes (using mpi_wrap())
        rather than the task itself being MPI-enabled (jobqueue will then only manage
        a single core, the rest are managed by the mpi_launcher)
    nodes : int
        The number of nodes required for MPI
    ntasks_per_node : int
@@ -115,6 +121,7 @@ class CustomClusterMixin(object):
    warnings = None  # type: List[str]
    mpi_mode = None  # type: bool
    mpi_launcher = None  # type: str
    fork_mpi = None  # type: bool
    nodes = None  # type: int
    ntasks_per_node = None  # type: int
    cpus_per_task = None  # type: int
@@ -140,9 +147,14 @@ def update_init_kwargs(self, **kwargs): # type: (Dict[...]) -> Dict[...]
        self._get_gpu_job_extra(kwargs.get("gpu_job_extra"))
        self._get_warnings(kwargs.get("warning"))

        # Now do MPI related kwargs
        # Now do MPI related kwargs.
        # Check if tasks use MPI interface or will fork MPI processes
        self._get_fork_mpi(kwargs.get("fork_mpi"))
        # Gather parameters for distribution of MPI/OpenMP processes (this also
        # modifies the cores reported to dask by the worker)
        kwargs = self._update_kwargs_cores(**kwargs)
        # Check for any updates to other modifiable jobqueue values: name, queue, memory
        # Check for any updates to other modifiable jobqueue values:
        # name, queue, memory
        kwargs = self._update_kwargs_modifiable(**kwargs)
        # update job_extra as needed, first check if we should initialise it
        kwargs = self._update_kwargs_job_extra(**kwargs)
@@ -263,6 +275,13 @@ def _get_mpi_launcher(self, mpi_launcher, default=None): # type: (str) -> None
"be set via the mpi_launcher kwarg or the yaml configuration"
)

    def _get_fork_mpi(self, fork_mpi, default=False):  # type: (bool) -> None
        self.fork_mpi = (
            fork_mpi
            if isinstance(fork_mpi, bool)
            else self.get_kwarg(name="fork-mpi", default=default)
        )

    def _get_maximum_scale(self, maximum_scale, default=1):
        # type: (int) -> None
        self.maximum_scale = maximum_scale if maximum_scale is not None else default
@@ -378,9 +397,12 @@ def _update_kwargs_cores(self, **kwargs): # type: (Dict[...]) -> Dict[...]
            self.openmp_env_extra = self.get_kwarg("openmp-env-extra") or []

            # We need to "trick" jobqueue into managing an MPI job, we will pretend
            # there is only one core available but we will actually allocate more. It
            # will then schedule tasks to this Cluster type that can in turn fork out
            # MPI executables using our wrapper
            # there is only one core available (root) but we will actually allocate more.
            # It will then schedule tasks to this Cluster type that can, depending on
            # the value of 'fork_mpi', either:
            # - Leverage all available processes via the task's MPI capabilities and
            #   MPI.COMM_WORLD
            # - or fork out MPI executables using our wrapper
            kwargs.update({"cores": 1})

        else:
@@ -504,7 +526,7 @@ def __init__(self, **kwargs):
            mpi_job_extra.extend(kwargs["job_extra"])
        kwargs.update({"job_extra": mpi_job_extra})
        super(CustomSLURMCluster, self).__init__(**kwargs)
        self._update_script_nodes()
        self._update_script_nodes(**kwargs)
        self.client = Client(self)  # type: Client
        # Log all the warnings that we may have accumulated
        if self.warnings:
@@ -513,14 +535,47 @@ def __init__(self, **kwargs):
                logger.debug(warning)
            logger.debug("\n")

    def _update_script_nodes(self):  # type: () -> None
    def _update_script_nodes(self, **kwargs):  # type: () -> None
        # If we're not in mpi_mode no need to do anything
        if not self.mpi_mode:
            return

        # When in MPI mode, after jobqueue has initialised we update the jobscript with
        # the `real` number of MPI tasks
        self.job_header = self.job_header.replace(
            "#SBATCH -n 1\n", "#SBATCH -n {}\n".format(self.mpi_tasks)
        )

        # The default for jobqueue is not to use an MPI launcher (since it is not MPI
        # aware). However, if self.fork_mpi=False then the tasks intended for this
        # cluster are MPI-enabled. In order to give them an MPI environment we need to
        # use our custom wrapper and launch with our MPI launcher
        if not self.fork_mpi:
            command_template = self._command_template
            dask_worker_module = "distributed.cli.dask_worker"
            if dask_worker_module in command_template:
                command_template = command_template.replace(
                    dask_worker_module, MPI_DASK_WRAPPER_MODULE
                )
            else:
                raise RuntimeError(
                    "Python module {} not found in command template:\n{}".format(
                        dask_worker_module, command_template
                    )
                )
            # The first part of the string is the python executable to use for the
            # worker
            python, arguments = command_template.split(" ", 1)
            # Wrap the launch command with our mpi wrapper
            command_template = mpi_wrap(
                python, exec_args=arguments, return_wrapped_command=True, **kwargs
            )
            self.warnings.append(
                "Replaced command template\n\t{}\nwith\n\t{}\nin jobscript".format(
                    self._command_template, command_template
                )
            )
            self._command_template = command_template


ClusterType = TypeVar("ClusterType", JobQueueCluster, LocalCluster, CustomSLURMCluster)

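Taken together, the mixin now supports two MPI modes; from user code they look roughly like the sketch below, which is based on the examples in this commit (cluster names and node counts are illustrative):

# Sketch of the two MPI modes added in this commit, based on the examples above.
from jobqueue_features.clusters import CustomSLURMCluster
from jobqueue_features.decorators import mpi_task, on_cluster
from jobqueue_features.mpi_wrapper import SRUN, mpi_wrap

# fork_mpi=False (the default): the dask worker itself is started through the
# MPI launcher, so tasks can use MPI.COMM_WORLD directly.
native_cluster = CustomSLURMCluster(
    name="nativeMPI", nodes=2, mpi_mode=True, mpi_launcher=SRUN
)

# fork_mpi=True: jobqueue manages a single core and each task forks an external
# MPI executable via mpi_wrap().
forked_cluster = CustomSLURMCluster(
    name="forkedMPI", nodes=2, mpi_mode=True, fork_mpi=True, mpi_launcher=SRUN
)


@on_cluster(cluster=forked_cluster, cluster_id="forkedMPI")
@mpi_task(cluster_id="forkedMPI")
def forked_task(**kwargs):
    return mpi_wrap(**kwargs)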