Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MPI-capable tasks #9

Merged
merged 61 commits into from
May 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
120be38
Add proof of concept for tasks with MPI capabilities (in the same nam…
May 20, 2019
e9ca2bc
Tidy up a bit
May 20, 2019
e22b42b
Carfeul about (ab)using globals
May 20, 2019
091fd1e
Make script executable
May 20, 2019
6c2b8b7
Fix spelling error
May 20, 2019
8fa6bce
Merge branch 'mpi_enabled_tasks' of github.com:E-CAM/jobqueue_feature…
May 20, 2019
78bdffd
Merge branch 'master' into mpi_enabled_tasks
May 20, 2019
978176a
Start incorporating the necessary code
May 20, 2019
b00e9c0
Only import MPI if I need to
May 20, 2019
5c2715b
Add some error checking
May 21, 2019
356ea7e
Move some more functions to the module, improve errors
May 21, 2019
e147ba4
Add the wrapper script for dask_worker
May 21, 2019
af3a043
Simplify
May 21, 2019
7939481
Prepare for wrapping mpi_dask_worker.py call
May 21, 2019
7046a87
Add dask mpi launcher
May 21, 2019
4745140
Run black
May 21, 2019
11fcb70
Provide traceback when aborting
May 21, 2019
4b1c8fa
Enable switch for forking MPI programs
May 22, 2019
6317d56
Run black on codebase
May 22, 2019
d0eebce
Add default MPI launcher to tests
May 22, 2019
7ac93a4
Restore state in tests
May 22, 2019
628e94b
Prepare for more tests
May 22, 2019
6a14b26
Correct warning string
May 22, 2019
aec9be0
Correct warning string
May 23, 2019
f7b4fa9
Correct warning string
May 23, 2019
145556c
Imports done incorrectly
May 23, 2019
4b5d6d3
Add an example to test MPI functionality
May 23, 2019
d6f81a8
Add launcher to example
May 23, 2019
0e2e83d
Use trivial queue
May 23, 2019
e372d0c
Fix typo
May 23, 2019
0b5ea90
Only add additional kwargs if they are needed
May 23, 2019
7597b80
No forking other processes when using MPI to launch dask
May 23, 2019
09d5cf8
Add missing kwargs
May 23, 2019
1b9d8b8
Use simple queues in example
May 23, 2019
4552e29
Move requirements in CI
May 23, 2019
e2f0aaf
Only do the dask worker import where it is needed
May 24, 2019
8c830cd
Merge remote-tracking branch 'origin/mpi_enabled_tasks' into mpi_enab…
May 24, 2019
1ce44a6
Debugging
May 24, 2019
14f5e5d
Debugging
May 24, 2019
6a7168a
Serialize before submitting the task
May 24, 2019
0bf2dd1
Serialize before submitting the task
May 24, 2019
73450af
Serialize before submitting the task
May 24, 2019
af29b1f
Be more careful with args and kwargs
May 24, 2019
85be2ba
Get rid of a warning
May 24, 2019
040fd7a
Remove unnecessary print statements
May 24, 2019
55b3da0
Make sure we can run multiple tasks
May 24, 2019
60048dc
Make sure we can run multiple tasks
May 24, 2019
f3543a7
Make sure we can run multiple tasks
May 24, 2019
84a5d25
Flush strings
May 24, 2019
32a7e86
REmove bad print command
May 24, 2019
7bf1799
See if nanny works
May 24, 2019
a6c171e
Only do import when we need to
May 24, 2019
1788b86
Only do import when we need to
May 24, 2019
707a4f0
Revert last commit
May 24, 2019
db6f089
Address comment
May 29, 2019
584c33e
Tidy up before adding additional tests
May 29, 2019
4f0aace
Remove dask worker directories
May 29, 2019
e59b5a7
Add initial set of new tests
May 29, 2019
d48d54e
Additional tests
May 29, 2019
b586dc9
Final additional tests
May 29, 2019
6d3c6ea
Make sure Py2 is also passing
May 29, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ branches:

# command to install dependencies
install:
- pip install -r requirements.txt
# Enable running MPI tests
# Enable running MPI tests (and installing MPI4PY)
- sudo apt-get install -y -q openmpi-bin libopenmpi-dev
- pip install mpi4py
- pip install -r requirements.txt

# command to run tests
script:
Expand Down
37 changes: 37 additions & 0 deletions examples/forked_mpi_task_srun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import print_function
import os

from dask.distributed import LocalCluster

from jobqueue_features.clusters import CustomSLURMCluster
from jobqueue_features.decorators import on_cluster, mpi_task
from jobqueue_features.mpi_wrapper import mpi_wrap
from jobqueue_features.functions import set_default_cluster

# import logging
# logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

# set_default_cluster(LocalCluster)
set_default_cluster(CustomSLURMCluster)

custom_cluster = CustomSLURMCluster(
name="mpiCluster", walltime="00:04:00", nodes=2, mpi_mode=True, fork_mpi=True
)


@on_cluster(cluster=custom_cluster, cluster_id="mpiCluster")
@mpi_task(cluster_id="mpiCluster")
def mpi_wrap_task(**kwargs):
return mpi_wrap(**kwargs)


# @on_cluster() # LocalCluster
def main():
script_path = os.path.join(os.path.dirname(__file__), "resources", "helloworld.py")
t = mpi_wrap_task(executable="python", exec_args=script_path)
print(t.result()["out"])
print(t.result()["err"])


if __name__ == "__main__":
main()
3 changes: 3 additions & 0 deletions examples/mpi_tasks_complex.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
walltime="00:15:00",
nodes=2,
mpi_mode=True,
fork_mpi=True,
queue_type="gpus",
maximum_scale=5,
env_extra=[
Expand All @@ -46,6 +47,7 @@
walltime="00:15:00",
nodes=4,
mpi_mode=True,
fork_mpi=True,
maximum_scale=10,
queue_type="knl",
python="python",
Expand All @@ -68,6 +70,7 @@
walltime="00:15:00",
nodes=2,
mpi_mode=True,
fork_mpi=True,
maximum_scale=10,
env_extra=[
"module --force purge",
Expand Down
63 changes: 47 additions & 16 deletions examples/mpi_tasks_srun.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,67 @@
from __future__ import print_function
import os

from dask.distributed import LocalCluster
import sys

from jobqueue_features.clusters import CustomSLURMCluster
from jobqueue_features.decorators import on_cluster, mpi_task
from jobqueue_features.mpi_wrapper import mpi_wrap
from jobqueue_features.functions import set_default_cluster
from jobqueue_features.mpi_wrapper import SRUN

# import logging
# logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

# set_default_cluster(LocalCluster)
set_default_cluster(CustomSLURMCluster)
# logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.DEBUG)

custom_cluster = CustomSLURMCluster(
name="mpiCluster", walltime="00:04:00", nodes=2, mpi_mode=True, queue_type="gpus"
name="mpiCluster", walltime="00:03:00", nodes=2, mpi_mode=True, mpi_launcher=SRUN
)


@mpi_task(cluster_id="mpiCluster")
def mpi_wrap_task(**kwargs):
return mpi_wrap(**kwargs)
def task1(task_name):
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
name = MPI.Get_processor_name()
all_nodes = comm.gather(name, root=0)
if all_nodes:
all_nodes = set(all_nodes)
else:
all_nodes = []
# Since it is a return value it will only get printed by root
return_string = "Running %d tasks of type %s on nodes %s." % (
size,
task_name,
all_nodes,
)
# The flush is required to ensure that the print statements appear in the job log
# files
print(return_string)
sys.stdout.flush()
return return_string


@mpi_task(cluster_id="mpiCluster")
def task2(name, task_name="default"):
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
# This only appears in the slurm job output
return_string = "Hi %s, my rank is %d for task of type %s" % (name, rank, task_name)
# The flush is required to ensure that the print statements appear in the job log
# files
print(return_string)
sys.stdout.flush()
return return_string


# @on_cluster() # LocalCluster
@on_cluster(cluster=custom_cluster, cluster_id="mpiCluster")
def main():
script_path = os.path.join(os.path.dirname(__file__), "resources", "helloworld.py")
t = mpi_wrap_task(executable="python", exec_args=script_path)
print(t.result()["out"])
print(t.result()["err"])
t1 = task1("task1")
t2 = task1("task1, 2nd iteration")
t3 = task2("Alan", task_name="Task 2")
print(t1.result())
print(t2.result())
print(t3.result())


if __name__ == "__main__":
Expand Down
12 changes: 11 additions & 1 deletion jobqueue_features/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@
from .custom_exceptions import ClusterException
from .decorators import task, on_cluster, mpi_task
from .functions import set_default_cluster
from .mpi_wrapper import mpi_wrap, MPIEXEC, SRUN
from .mpi_wrapper import (
mpi_wrap,
MPIEXEC,
SRUN,
which,
serialize_function_and_args,
deserialize_and_execute,
mpi_deserialize_and_execute,
verify_mpi_communicator,
flush_and_abort,
)

from ._version import get_versions

Expand Down
47 changes: 47 additions & 0 deletions jobqueue_features/cli/mpi_dask_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python
"""
Distribution of MPI enabled tasks
"""

from jobqueue_features.mpi_wrapper import (
mpi_deserialize_and_execute,
serialize_function_and_args,
shutdown_mpitask_worker,
verify_mpi_communicator,
)


# Add the no-nanny option so we don't fork additional processes
MPI_DASK_WRAPPER_MODULE = "jobqueue_features.cli.mpi_dask_worker --no-nanny"


def prepare_for_mpi_tasks(root=0, comm=None):

if comm is None:
from mpi4py import MPI

comm = MPI.COMM_WORLD
verify_mpi_communicator(comm)

rank = comm.Get_rank()

if rank == root:
# Start dask so root reports to scheduler and accepts tasks
# Task distribution is part of task itself (via our wrapper)
from distributed.cli import dask_worker

dask_worker.go()

# As a final task, send a shutdown to the other MPI ranks
serialized_object = serialize_function_and_args(shutdown_mpitask_worker)
mpi_deserialize_and_execute(
serialized_object=serialized_object, root=root, comm=comm
)
else:
while True:
# Calling with no serialized_object means these are non-root processes
mpi_deserialize_and_execute(root=root, comm=comm)


if __name__ == "__main__":
prepare_for_mpi_tasks()
77 changes: 66 additions & 11 deletions jobqueue_features/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
from dask.distributed import Client, LocalCluster
from typing import TypeVar, Dict # noqa

from .cli.mpi_dask_worker import MPI_DASK_WRAPPER_MODULE
from .mpi_wrapper import mpi_wrap

import logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -84,13 +87,16 @@ class CustomClusterMixin(object):
gpu_job_extra : List[str]
Extra scheduler arguments when requesting a GPU job
warnings : List[str]
A string that holds any desired warning (is turned into a list of
warnings in self.warnings)
A string that holds any desired warning (is turned into a list of warnings in
self.warnings)
mpi_mode : bool
Whether the cluster is to run MPI tasks (jobqueue only manages a single
core, the rest are by the mpi_launcher)
Whether the cluster is to run MPI tasks
mpi_launcher : str
The command that launches MPI jobs (srun, mpiexec, mpirun,...)
fork_mpi: bool
If true, assume all tasks for the cluster fork MPI processes (using mpi_wrap())
rather than that the task itself is MPI-enabled (jobqueue will then only manage
a single core, the rest are managed by the mpi_launcher)
nodes : int
The number of nodes required for MPI
ntasks_per_node : int
Expand All @@ -115,6 +121,7 @@ class CustomClusterMixin(object):
warnings = None # type: List[str]
mpi_mode = None # type: bool
mpi_launcher = None # type: str
fork_mpi = None # type: bool
nodes = None # type: int
ntasks_per_node = None # type: int
cpus_per_task = None # type: int
Expand All @@ -140,9 +147,14 @@ def update_init_kwargs(self, **kwargs): # type: (Dict[...]) -> Dict[...]
self._get_gpu_job_extra(kwargs.get("gpu_job_extra"))
self._get_warnings(kwargs.get("warning"))

# Now do MPI related kwargs
# Now do MPI related kwargs.
# Check if tasks use MPI interface or will fork MPI processes
self._get_fork_mpi(kwargs.get("fork_mpi"))
# Gather parameters for distribution of MPI/OpenMP processes (this also
# modifies the cores reported to dask by the worker)
kwargs = self._update_kwargs_cores(**kwargs)
# Check for any updates to other modifiable jobqueue values: name, queue, memory
# Check for any updates to other modifiable jobqueue values:
# name, queue, memory
kwargs = self._update_kwargs_modifiable(**kwargs)
# update job_extra as needed, first check if we should initialise it
kwargs = self._update_kwargs_job_extra(**kwargs)
Expand Down Expand Up @@ -263,6 +275,13 @@ def _get_mpi_launcher(self, mpi_launcher, default=None): # type: (str) -> None
"be set via the mpi_launcher kwarg or the yaml configuration"
)

def _get_fork_mpi(self, fork_mpi, default=False): # type: (bool) -> None
self.fork_mpi = (
fork_mpi
if isinstance(fork_mpi, bool)
else self.get_kwarg(name="fork-mpi", default=default)
)

def _get_maximum_scale(self, maximum_scale, default=1):
# type: (int) -> None
self.maximum_scale = maximum_scale if maximum_scale is not None else default
Expand Down Expand Up @@ -378,9 +397,12 @@ def _update_kwargs_cores(self, **kwargs): # type: (Dict[...]) -> Dict[...]
self.openmp_env_extra = self.get_kwarg("openmp-env-extra") or []

# We need to "trick" jobqueue into managing an MPI job, we will pretend
# there is on one core available but we will actually allocate more. It
# will then schedule tasks to this Cluster type that can in turn fork out
# MPI executables using our wrapper
# there is on one core available (root) but we will actually allocate more.
# It will then schedule tasks to this Cluster type that can, depending on
# the value of 'fork_mpi', either:
# - Leverage all available processes via the tasks MPI capabilities and
# MPI.COMM_WORLD
# - or fork out MPI executables using our wrapper
kwargs.update({"cores": 1})

else:
Expand Down Expand Up @@ -504,7 +526,7 @@ def __init__(self, **kwargs):
mpi_job_extra.extend(kwargs["job_extra"])
kwargs.update({"job_extra": mpi_job_extra})
super(CustomSLURMCluster, self).__init__(**kwargs)
self._update_script_nodes()
self._update_script_nodes(**kwargs)
self.client = Client(self) # type: Client
# Log all the warnings that we may have accumulated
if self.warnings:
Expand All @@ -513,14 +535,47 @@ def __init__(self, **kwargs):
logger.debug(warning)
logger.debug("\n")

def _update_script_nodes(self): # type: () -> None
def _update_script_nodes(self, **kwargs): # type: () -> None
# If we're not in mpi_mode no need to do anything
if not self.mpi_mode:
return

# When in MPI mode, after jobqueue has initialised we update the jobscript with
# the `real` number of MPI tasks
self.job_header = self.job_header.replace(
"#SBATCH -n 1\n", "#SBATCH -n {}\n".format(self.mpi_tasks)
)

# The default for jobqueue is not to use an MPI launcher (since it is not MPI
# aware). However, if self.fork_mpi=False then the tasks intended for this
# cluster are MPI-enabled. In order to give them an MPI environment we need to
# use our custom wrapper and launch with our MPI launcher
if not self.fork_mpi:
command_template = self._command_template
dask_worker_module = "distributed.cli.dask_worker"
if dask_worker_module in command_template:
command_template = command_template.replace(
dask_worker_module, MPI_DASK_WRAPPER_MODULE
)
else:
raise RuntimeError(
"Python module {} not found in command template:\n{}".format(
dask_worker_module, command_template
)
)
# The first part of the string is the python executable to use for the
# worker
python, arguments = command_template.split(" ", 1)
# Wrap the launch command with our mpi wrapper
command_template = mpi_wrap(
python, exec_args=arguments, return_wrapped_command=True, **kwargs
)
self.warnings.append(
"Replaced command template\n\t{}\nwith\n\t{}\nin jobscript".format(
self._command_template, command_template
)
)
self._command_template = command_template


ClusterType = TypeVar("ClusterType", JobQueueCluster, LocalCluster, CustomSLURMCluster)