Skip to content

Commit

Permalink
Addition of reservation to bsub headers. (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
Francesco Di Natale committed Jul 14, 2021
1 parent 60e007a commit 8665f9f
Showing 1 changed file with 18 additions and 14 deletions.
32 changes: 18 additions & 14 deletions maestrowf/interfaces/script/lsfscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,18 @@ def __init__(self, **kwargs):
self.add_batch_parameter("bank", kwargs.pop("bank"))
self.add_batch_parameter("queue", kwargs.pop("queue"))
self.add_batch_parameter("nodes", kwargs.pop("nodes", "1"))
reservation = kwargs.get("reservation", None)
if reservation:
self.add_batch_parameter("reservation", reservation)

self._exec = "#!/bin/bash"
self._header = {
"nodes": "#BSUB -nnodes {nodes}",
"queue": "#BSUB -q {queue}",
"bank": "#BSUB -G {bank}",
"walltime": "#BSUB -W {walltime}",
"job-name": "#BSUB -J {job-name}",
"output": "#BSUB -o {output}",
"reservation": "#BSUB -U {reservation}",
"error": "#BSUB -e {error}",
}

Expand All @@ -95,7 +98,7 @@ def __init__(self, **kwargs):
"reservation": "-J",
}

self._extension = ".lsf.sh"
self._extension = "lsf.sh"

def get_header(self, step):
"""
Expand All @@ -105,16 +108,23 @@ def get_header(self, step):
:returns: A string of the header based on internal batch parameters and
the parameter step.
"""
run = dict(step.run)
batch_header = dict(self._batch)
batch_header["nodes"] = run.pop("nodes", self._batch["nodes"])
batch_header["nodes"] = step.run.get("nodes", self._batch["nodes"])
batch_header["job-name"] = step.name.replace(" ", "_")
batch_header["output"] = "{}.%J.out".format(batch_header["job-name"])
batch_header["error"] = "{}.%J.err".format(batch_header["job-name"])

# Updte the batch header with the values from the step's resources
batch_header.update(
{
resource: value for (resource, value) in step.run.items()
if value
}
)

# LSF requires an hour and minutes format. We need to attempt to split
# and correct if we get something that's coming in as HH:MM:SS
walltime = run.pop("walltime")
walltime = step.run.get("walltime")
wt_split = walltime.split(":")
if len(wt_split) == 3:
# If wall time is specified in three parts, we'll just calculate
Expand All @@ -128,9 +138,10 @@ def get_header(self, step):

batch_header["walltime"] = walltime

modified_header = [self._exec]
modified_header = ["#!{}".format(self._exec)]
for key, value in self._header.items():
modified_header.append(value.format(**batch_header))
if key in batch_header:
modified_header.append(value.format(**batch_header))

return "\n".join(modified_header)

Expand Down Expand Up @@ -187,13 +198,6 @@ def submit(self, step, path, cwd, job_map=None, env=None):
identiifer.
"""
args = ["bsub"]

if "reservation" in self._batch:
args += [
"-U",
self._batch["reservation"]
]

args += ["-cwd", cwd, "<", path]
cmd = " ".join(args)
LOGGER.debug("cwd = %s", cwd)
Expand Down

0 comments on commit 8665f9f

Please sign in to comment.