Skip to content

Commit

Permalink
Additional flexibility to walltime parsing for Flux (#369)
Browse files Browse the repository at this point in the history
* Tweaks to make walltime more flexible.

* Correction to walltime check in Flux backend.

* Additional tweaks for robustness for convert walltime.

* Allow more types for walltime to be specified.

* Reintroduce conversion call.
  • Loading branch information
Francesco Di Natale committed Jul 24, 2021
1 parent 8665f9f commit dc7bf7a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 22 deletions.
7 changes: 2 additions & 5 deletions maestrowf/interfaces/script/_flux/flux0_18_0.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
import errno
import logging
from math import ceil
Expand Down Expand Up @@ -96,10 +95,8 @@ def submit(
jobspec.cwd = cwd
jobspec.environment = dict(os.environ)

if walltime and walltime != "inf":
seconds = datetime.strptime(walltime, "%H:%M:%S")
seconds = seconds - datetime(1900, 1, 1)
jobspec.duration = seconds
if walltime > 0:
jobspec.duration = walltime

jobspec.stdout = f"{job_name}.{{{{id}}}}.out"
jobspec.stderr = f"{job_name}.{{{{id}}}}.err"
Expand Down
7 changes: 2 additions & 5 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
import logging
from math import ceil
import os
Expand Down Expand Up @@ -59,10 +58,8 @@ def submit(
jobspec.cwd = cwd
jobspec.environment = dict(os.environ)

if walltime and walltime != "inf":
seconds = datetime.strptime(walltime, "%H:%M:%S")
seconds = seconds - datetime(1900, 1, 1)
jobspec.duration = seconds
if walltime > 0:
jobspec.duration = walltime

jobspec.stdout = f"{job_name}.{{{{id}}}}.out"
jobspec.stderr = f"{job_name}.{{{{id}}}}.err"
Expand Down
31 changes: 20 additions & 11 deletions maestrowf/interfaces/script/fluxscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
###############################################################################

"""Flux Scheduler interface implementation."""
from datetime import datetime
import logging
from math import ceil
import os
Expand Down Expand Up @@ -106,14 +105,24 @@ def extension(self):
return self._extension

def _convert_walltime_to_seconds(self, walltime):
if not walltime:
LOGGER.debug("Encountered inf walltime!")
return "inf"
# Convert walltime to seconds.
LOGGER.debug("Converting %s to seconds...", walltime)
wt = \
(datetime.strptime(walltime, "%H:%M:%S") - datetime(1900, 1, 1))
return int(wt.total_seconds())
if isinstance(walltime, int) or isinstance(walltime, float):
LOGGER.debug("Encountered numeric walltime = %s", str(walltime))
return int(float(walltime) * 60.0)
elif ":" in walltime:
# Convert walltime to seconds.
LOGGER.debug("Converting %s to seconds...", walltime)
seconds = 0.0
for i, value in enumerate(walltime.split(":")[::-1]):
seconds += float(value) * (60.0 ** i)
return seconds
elif not walltime:
return 0
else:
msg = \
f"Walltime value '{walltime}' is not an integer or colon-" \
f"separated string."
LOGGER.error(msg)
raise ValueError(msg)

def get_header(self, step):
"""
Expand Down Expand Up @@ -169,11 +178,11 @@ def submit(self, step, path, cwd, job_map=None, env=None):
:returns: The return status of the submission command and job
identiifer.
"""
# walltime = self._convert_walltime_to_seconds(step.run["walltime"])
nodes = step.run.get("nodes")
processors = step.run.get("procs", 0)
force_broker = step.run.get("use_broker", True)
walltime = step.run.get("walltime", "inf")
walltime = \
self._convert_walltime_to_seconds(step.run.get("walltime", 0))

# Compute cores per task
cores_per_task = step.run.get("cores per task", None)
Expand Down
7 changes: 6 additions & 1 deletion maestrowf/specification/schemas/yamlspecification.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@
{"type": "integer", "minimum": 1},
{"type": "string", "pattern": "^\\$\\(\\w+\\)$"}
]},
"walltime": {"type": "string", "minLength": 1},
"walltime": {
"anyOf": [
{"type": "string", "minLength": 1},
{"type": "integer", "minimum": 0},
{"type": "string", "pattern": "^\\$\\(\\w+\\)$"}
]},
"reservation": {"type": "string", "minLength": 1},
"exclusive": {
"anyOf": [
Expand Down

0 comments on commit dc7bf7a

Please sign in to comment.