Skip to content

Commit

Permalink
Flux Adapter Clean up and Addition of Job Priority (#379)
Browse files Browse the repository at this point in the history
* Additional logic for walltime parsing.

* Removal of forcing broker when multinode.

* Renamed the use_broker setting to nested.

* Addition of urgencies to job submission.

* Correction to Urgency mapping.

* Bugfix from isdigit to isnumeric.

* Addition of from_str for StepUrgency

* Added parsing of urgency to FluxAdapter.

* Addition of a float type urgency.

* Removed exception when finding entries that are not strings.

* Update to add get_priority to SchedulerScriptAdapter

* Addition of urgency mapping for Flux backend.

* Update to the Flux adapter to support urgency modifications.

* Rename StepUrgency to StepPriority

* Pass on types that can't be substituted.

* Removal of annotations to support 3.6

* Corrections to the passing of urgency values.

* Change enums to lower case.

* Addition of sensible capitalization of priority.

* Tweaks to Flux example for new keys.

* Removal of resource checks.
  • Loading branch information
FrankD412 committed Mar 12, 2022
1 parent a4bc459 commit aaa7eff
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 41 deletions.
31 changes: 30 additions & 1 deletion maestrowf/abstracts/enums/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
"""Package for providing enumerations for interfaces"""
from enum import Enum

__all__ = ("JobStatusCode", "State", "SubmissionCode", "StudyStatus")
__all__ = (
"JobStatusCode", "State", "SubmissionCode", "StepPriority", "StudyStatus"
)


class SubmissionCode(Enum):
Expand Down Expand Up @@ -75,3 +77,30 @@ class StudyStatus(Enum):
RUNNING = 1 # The Study is currently running
FAILURE = 2 # The Study has finished, but 1 or more steps failed
CANCELLED = 3 # The Study has finished, but was cancelled


class StepPriority(Enum):
"""Scheduler priority for submitted jobs"""
HELD = 0
MINIMAL = 1
LOW = 2
MEDIUM = 3
HIGH = 4
EXPEDITE = 5

@classmethod
def from_str(cls, priority):
_priority = priority.lower()

if _priority == "held":
return cls.HELD
if _priority == "minimal":
return cls.MINIMAL
if _priority == "medium":
return cls.MEDIUM
if _priority == "high":
return cls.HIGH
if _priority == "expedite":
return cls.EXPEDITE

raise ValueError(f"Priority '{priority}' not valid.")
40 changes: 32 additions & 8 deletions maestrowf/abstracts/interfaces/flux.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import ABC, abstractclassmethod, abstractmethod, \
abstractstaticmethod
from abc import ABC, abstractmethod
import logging

from maestrowf.abstracts.enums import StepPriority

LOGGER = logging.getLogger(__name__)

try:
Expand Down Expand Up @@ -45,16 +46,30 @@ def connect_to_flux(cls):
except ImportError:
pass

@abstractclassmethod
@classmethod
@abstractmethod
def get_flux_urgency(cls, urgency) -> int:
"""
Map a fixed enumeration or floating point priority to a Flux urgency.
:param priority: Float or StepPriority enum representing priorty.
:returns: An integery mapping the urgency parameter to a Flux urgency.
"""
raise NotImplementedError()

@classmethod
@abstractmethod
def get_statuses(cls, joblist):
"""
Return the statuses from a given Flux handle and joblist.
:param joblist: A list of jobs to check the status of.
:return: A dictionary of job identifiers to statuses.
"""
...

@abstractstaticmethod
@classmethod
@abstractmethod
def state(state):
"""
Map a scheduler specific job state to a Study.State enum.
Expand All @@ -63,8 +78,10 @@ def state(state):
:param state: A string of the state returned by Flux
:return: The mapped Study.State enumeration
"""
...

@abstractclassmethod
@classmethod
@abstractmethod
def parallelize(cls, procs, nodes=None, **kwargs):
"""
Create a parallelized Flux command for launching.
Expand All @@ -74,11 +91,13 @@ def parallelize(cls, procs, nodes=None, **kwargs):
:param kwargs: Extra keyword arguments.
:return: A string of a Flux MPI command.
"""
...

@abstractclassmethod
@classmethod
@abstractmethod
def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
npgus=0, job_name=None, force_broker=False
npgus=0, job_name=None, force_broker=False, urgency=StepPriority.MEDIUM
):
"""
Submit a job using this Flux interface's submit API.
Expand All @@ -92,19 +111,23 @@ def submit(
:param ngpus: The number of GPUs to request on submission.
:param job_name: A name string to assign the submitted job.
:param force_broker: Forces the script to run under a Flux sub-broker.
:param urgency: Enumerated scheduling priority for the submitted job.
:return: A string representing the jobid returned by Flux submit.
:return: An integer of the return code submission returned.
:return: SubmissionCode enumeration that reflects result of submission.
"""
...

@abstractclassmethod
@classmethod
@abstractmethod
def cancel(cls, joblist):
"""
Cancel a job using this Flux interface's cancellation API.
:param joblist: A list of job identifiers to cancel.
:return: CancelCode enumeration that reflects result of cancellation.
"""
...

@property
@abstractmethod
Expand All @@ -117,3 +140,4 @@ def key(self):
:return: A string of the name of a FluxInterface class.
"""
...
10 changes: 10 additions & 0 deletions maestrowf/abstracts/interfaces/schedulerscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,13 @@ def _state(self, job_state):
:returns: A Study.State enum corresponding to parameter job_state.
"""
pass

def get_priority(self, priority):
"""
Map a fixed enumeration or floating point priority to a batch priority.
:param priority: Float or StepPriority enum representing priorty.
:returns: A string, integer, or float value representing the mapped
priority to the batch scheduler.
"""
raise NotImplementedError()
11 changes: 5 additions & 6 deletions maestrowf/datastructures/core/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ def _get_used_parameters(self, item, params):
"""
if not item:
return
elif isinstance(item, int):
return
elif isinstance(item, str):
for key in self.parameters.keys():
_ = r"\{}\({}\.*\w*\)".format(self.token, key)
Expand All @@ -344,10 +342,11 @@ def _get_used_parameters(self, item, params):
for each in item.values():
self._get_used_parameters(each, params)
else:
msg = "Encountered an object of type '{}'. Expected a str, list," \
" int, or dict.".format(type(item))
logger.error(msg)
raise ValueError(msg)
msg = \
"Encountered an object of type '{}'. Passing."\
.format(type(item))
logger.debug(msg)
return

def get_used_parameters(self, step):
"""
Expand Down
33 changes: 28 additions & 5 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import os

from maestrowf.abstracts.enums import CancelCode, JobStatusCode, State, \
SubmissionCode
StepPriority, SubmissionCode
from maestrowf.abstracts.interfaces.flux import FluxInterface

LOGGER = logging.getLogger(__name__)
Expand All @@ -19,11 +19,32 @@ class FluxInterface_0260(FluxInterface):
key = "0.26.0"

flux_handle = None
_urgencies = {
StepPriority.HELD: 0,
StepPriority.MINIMAL: 1,
StepPriority.LOW: 9,
StepPriority.MEDIUM: 16,
StepPriority.HIGH: 24,
StepPriority.EXPEDITE: 31,
}

@classmethod
def get_flux_urgency(cls, urgency) -> int:
if isinstance(urgency, str):
LOGGER.debug("Found string urgency: %s", urgency)
urgency = StepPriority.from_str(urgency)

if isinstance(urgency, StepPriority):
LOGGER.debug("StepUrgency urgency of '%s' given..", urgency)
return cls._urgencies[urgency]
else:
LOGGER.debug("Float urgency of '%s' given..", urgency)
return ceil(float(urgency) * 31)

@classmethod
def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=True
ngpus=0, job_name=None, force_broker=True, urgency=StepPriority.MEDIUM
):
try:
cls.connect_to_flux()
Expand All @@ -35,7 +56,7 @@ def submit(
# for a single node, don't use a broker -- but introduce a flag
# that can force a single node to run in a broker.

if force_broker or nodes > 1:
if force_broker:
LOGGER.debug(
"Launch under Flux sub-broker. [force_broker=%s, "
"nodes=%d]", force_broker, nodes
Expand Down Expand Up @@ -66,8 +87,10 @@ def submit(
jobspec.stderr = f"{job_name}.{{{{id}}}}.err"

# Submit our job spec.
jobid = \
flux.job.submit(cls.flux_handle, jobspec, waitable=True)
jobid = flux.job.submit(
cls.flux_handle, jobspec, waitable=True,
urgency=urgency
)
submit_status = SubmissionCode.OK
retcode = 0

Expand Down
25 changes: 12 additions & 13 deletions maestrowf/interfaces/script/fluxscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ def __init__(self, **kwargs):
self.h = None
# Store the interface we're using
_version = kwargs.pop("version", FluxFactory.latest)
self.add_batch_parameter(
"version", _version)
self.add_batch_parameter("version", _version)
self._interface = FluxFactory.get_interface(_version)

@property
Expand All @@ -108,14 +107,17 @@ def _convert_walltime_to_seconds(self, walltime):
if isinstance(walltime, int) or isinstance(walltime, float):
LOGGER.debug("Encountered numeric walltime = %s", str(walltime))
return int(float(walltime) * 60.0)
elif isinstance(walltime, str) and walltime.isnumeric():
LOGGER.debug("Encountered numeric walltime = %s", str(walltime))
return int(float(walltime) * 60.0)
elif ":" in walltime:
# Convert walltime to seconds.
LOGGER.debug("Converting %s to seconds...", walltime)
seconds = 0.0
for i, value in enumerate(walltime.split(":")[::-1]):
seconds += float(value) * (60.0 ** i)
return seconds
elif not walltime:
elif not walltime or (isinstance(walltime, str) and walltime == "inf"):
return 0
else:
msg = \
Expand Down Expand Up @@ -180,9 +182,11 @@ def submit(self, step, path, cwd, job_map=None, env=None):
"""
nodes = step.run.get("nodes")
processors = step.run.get("procs", 0)
force_broker = step.run.get("use_broker", True)
force_broker = step.run.get("nested", False)
walltime = \
self._convert_walltime_to_seconds(step.run.get("walltime", 0))
urgency = step.run.get("priority", "medium")
urgency = self.get_priority(urgency)

# Compute cores per task
cores_per_task = step.run.get("cores per task", None)
Expand All @@ -203,14 +207,6 @@ def submit(self, step, path, cwd, job_map=None, env=None):

# Calculate nprocs
ncores = cores_per_task * nodes
# Check to make sure that cores_per_task matches if processors
# is specified.
if processors > 0 and processors > ncores:
msg = "Calculated ncores (nodes * cores per task) = {} " \
"-- procs = {}".format(ncores, processors)
LOGGER.error(msg)
raise ValueError(msg)

# Raise an exception if ncores is 0
if ncores <= 0:
msg = "Invalid number of cores specified. " \
Expand All @@ -221,7 +217,7 @@ def submit(self, step, path, cwd, job_map=None, env=None):
jobid, retcode, submit_status = \
self._interface.submit(
nodes, processors, cores_per_task, path, cwd, walltime, ngpus,
job_name=step.name, force_broker=force_broker
job_name=step.name, force_broker=force_broker, urgency=urgency
)

return SubmissionRecord(submit_status, retcode, jobid)
Expand Down Expand Up @@ -325,3 +321,6 @@ def _write_script(self, ws_path, step):
restart_path = None

return to_be_scheduled, script_path, restart_path

def get_priority(self, priority):
return self._interface.get_flux_urgency(priority)
15 changes: 14 additions & 1 deletion maestrowf/specification/schemas/yamlspecification.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,20 @@
{"type": "boolean"},
{"type": "string", "pattern": "^\\$\\(\\w+\\)$"}
]},
"use_broker": {"type": "boolean"},
"nested": {"type": "boolean"},
"priority": {
"anyOf": [
{
"type": "string",
"enum": [
"HELD", "MINIMAL", "LOW", "MEDIUM", "HIGH", "EXPEDITED",
"held", "minimal", "low", "medium", "high", "expedited",
"Held", "Minimal", "Low", "Medium", "High", "Expedited"
]
},
{"type": "number", "minimum": 0.0, "maximum": 1.0}
]
},
"qos": {"type": "string", "minLength": 1}
},
"additionalProperties": false,
Expand Down
11 changes: 5 additions & 6 deletions maestrowf/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,12 @@ def apply_function(item, func):
elif isinstance(item, dict):
return {
key: apply_function(value, func) for key, value in item.items()}
elif isinstance(item, int):
return item
else:
msg = "Encountered an object of type '{}'. Expected a str, list, int" \
", or dict.".format(type(item))
LOGGER.error(msg)
raise ValueError(msg)
msg = \
"Encountered an object of type '{}'. Passing." \
.format(type(item))
LOGGER.debug(msg)
return item


def csvtable_to_dict(fstream):
Expand Down
3 changes: 2 additions & 1 deletion samples/lulesh/lulesh_sample1_unix_flux.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ study:
nodes: 1
procs: 1
cores per task: 1
use_broker: True
nested: True
priority: high
walltime: "00:02:00"

global.parameters:
Expand Down

0 comments on commit aaa7eff

Please sign in to comment.