Skip to content

Commit

Permalink
Flux adapter (Spectrum MPI) (#82)
Browse files Browse the repository at this point in the history
Introduce a Flux Adapter specifically for Spectrum MPI.

* first attempt at a flux adapter, very basic, probably not functional

Bugfix: Use the reservation only if one is specified AND it is set (non-empty).

Bugfix: If reservation is not empty, replace bank.

Fixed an issue where check_status is able to return nothing.

Correction to cast an int to a str in a join.

switching to flux api, many splash-specific hacks unfortunately

flux reworked

tweak

Addition of a short-circuit clause to avoid RPC.

Avoid making an RPC for an already empty job list.

fail gracefully for joblists that aren't lists

Correction to previous short circuit code.

use a sub-instance for multi-node jobs, explicitly generate an mpirun hostlist

fix hostlist usage

get mpirun going between nodes, use rsh, round-robin across nodes

remove autobind from adapter

add ncores

tweak to mpirun generation

switch ncores def

clear PMI_

split out err, rename output

split out err, rename output

split out err, rename output

allow cancel to fall back to kill, then check done

make use of ncores parameter in submission

switch to using cores per task

Updates to how additional arguments are passed.

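Previously we were popping items out of the dictionaries that were passed in, which removed those entries from the dictionaries themselves. Now we read the values with get and pass along a copy of the dictionary.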

rework for mpich launch

Update to add "cores per task" as a run parameter.

removing path

tweak for better submission parameters

* Corrected key assignment in options.

* Tweak to pass walltime in seconds.

* Correction of a missing time formatter.

* Addition of different complete status codes.

* Addition of "runrequest" to encountered states.

* Corrections to flux_status call and cast

* Correction to how walltime is passed.

* Correction of "walltime" var to "wt".

* Update to allow for GPUs to be specified.

* Correction of a missed comma.

* Corrections to style.

* More style changes.

* Corrections for style in study.

* Style correction for adapter import.

* Renamed FluxScriptAdapter to prepend Spectrum variant name.

* Moved Flux runtime importing internal to the class.

* Correction to fix import of the renamed Spectrum variant.

* go back to mpirun

* Minor style fix.
  • Loading branch information
trws authored and FrankD412 committed May 7, 2018
1 parent 2baf994 commit 7f8b237
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 20 deletions.
18 changes: 13 additions & 5 deletions maestrowf/abstracts/interfaces/schedulerscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,12 @@ def _substitute_parallel_command(self, step_cmd, **kwargs):
err_msg = "{} attempting to allocate {} {} for a parallel call with" \
" a maximum allocation of {}"

nodes = kwargs.pop("nodes")
procs = kwargs.pop("procs")
nodes = kwargs.get("nodes")
procs = kwargs.get("procs")
addl_args = dict(kwargs)
addl_args.pop("nodes")
addl_args.pop("procs")

LOGGER.debug("nodes=%s; procs=%s", nodes, procs)
LOGGER.debug("step_cmd=%s", step_cmd)
# See if the command contains a launcher token in it.
Expand Down Expand Up @@ -205,7 +209,9 @@ def _substitute_parallel_command(self, step_cmd, **kwargs):
LOGGER.error(msg)
raise ValueError(msg)

pcmd = self.get_parallelize_command(_procs, _nodes, **kwargs)
pcmd = self.get_parallelize_command(
_procs, _nodes, **addl_args
)
cmd = cmd.replace(match.group(), pcmd)

# Verify that the total nodes/procs used is within maximum.
Expand All @@ -227,7 +233,7 @@ def _substitute_parallel_command(self, step_cmd, **kwargs):
# any parameters, replace it there with full nodes and procs.
# Otherwise, just return the command. A user may simply want to run
# an unparallelized code in a submission.
pcmd = self.get_parallelize_command(procs, nodes, **kwargs)
pcmd = self.get_parallelize_command(procs, nodes, **addl_args)
# Catch the case where the launcher token appears on its own
if self.launcher_var in step_cmd:
LOGGER.debug("'%s' found in cmd -- %s",
Expand Down Expand Up @@ -262,7 +268,9 @@ def get_scheduler_command(self, step):

# If the user is requesting nodes, we need to request the nodes and
# set up the command with scheduling.
if "nodes" in step.run or "procs" in step.run:
_procs = step.run.get("procs")
_nodes = step.run.get("nodes")
if _procs or _nodes:
to_be_scheduled = True
cmd = self._substitute_parallel_command(
step.run["cmd"],
Expand Down
2 changes: 1 addition & 1 deletion maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ def check_study_status(self):
logger.info("No jobs found.")
return retcode, step_status
else:
msg = "Unknown Error (Code = {retcode})".format(retcode)
msg = "Unknown Error (Code = {})".format(retcode)
logger.error(msg)
return retcode, step_status

Expand Down
20 changes: 11 additions & 9 deletions maestrowf/datastructures/core/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,17 @@ def __init__(self):
self.name = ""
self.description = ""
self.run = {
"cmd": "",
"depends": "",
"pre": "",
"post": "",
"restart": "",
"nodes": "",
"procs": "",
"walltime": "",
"reservation": ""
"cmd": "",
"depends": "",
"pre": "",
"post": "",
"restart": "",
"nodes": "",
"procs": "",
"gpus": "",
"cores per task": 1,
"walltime": "",
"reservation": ""
}

def apply_parameters(self, combo):
Expand Down
15 changes: 11 additions & 4 deletions maestrowf/interfaces/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,23 @@
"""Collection of custom adapters for interfacing with various systems."""
import logging

from maestrowf.interfaces.script import LocalScriptAdapter, SlurmScriptAdapter
from maestrowf.interfaces.script import \
LocalScriptAdapter, \
SlurmScriptAdapter, \
SpectrumFluxScriptAdapter

__all__ = ("SlurmScriptAdapter", "ScriptAdapterFactory")
__all__ = (
"LocalScriptAdapter", "SlurmScriptAdapter", "SpectrumFluxScriptAdapter",
"ScriptAdapterFactory"
)
LOGGER = logging.getLogger(__name__)


class ScriptAdapterFactory(object):
factories = {
"slurm": SlurmScriptAdapter,
"local": LocalScriptAdapter,
"slurm": SlurmScriptAdapter,
"local": LocalScriptAdapter,
"flux-spectrum": SpectrumFluxScriptAdapter,
}

@classmethod
Expand Down
6 changes: 5 additions & 1 deletion maestrowf/interfaces/script/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,9 @@
###############################################################################
from maestrowf.interfaces.script.localscriptadapter import LocalScriptAdapter
from maestrowf.interfaces.script.slurmscriptadapter import SlurmScriptAdapter
from maestrowf.interfaces.script.fluxscriptadapter import \
SpectrumFluxScriptAdapter

__all__ = ("LocalScriptAdapter", "SlurmScriptAdapter")
__all__ = (
"LocalScriptAdapter", "SlurmScriptAdapter", "SpectrumFluxScriptAdapter"
)

0 comments on commit 7f8b237

Please sign in to comment.