Skip to content

Commit

Permalink
Flux for Spectrum bugfixes (#116)
Browse files Browse the repository at this point in the history
* Additional arguments can be passed through batch.

* Correction of the use of extend to append.

* Removal of OMPI vars from env.

* Reversal of env altering.

* Addition of mpi exe to batch.

* Removal of -gpu flag.

* Addition of "allocated" to PENDING set.

* Correction of the check_status method call.

* Correction of the Flux import

* Addition of the EnvironmentError to try/catch for check_status.

* Addition of jobid to submission INFO logging.
  • Loading branch information
FrankD412 committed Jul 4, 2018
1 parent bc47d46 commit 1122fb2
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions maestrowf/interfaces/script/fluxscriptadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@ def __init__(self, **kwargs):
# NOTE: These libraries are compiled at runtime when an allocation
# is spun up.
self.flux = __import__("flux")
self.kvs = __import__("flux.kvs")
self.kvs = __import__("flux.kvs", globals(), locals(), ["kvs"])

# NOTE: Host doesn't seem to matter for FLUX. sbatch assumes that the
# current host is where submission occurs.
self.add_batch_parameter("nodes", kwargs.pop("nodes", "1"))
self._mpi_exe = kwargs.pop("mpi")
self._addl_args = kwargs.pop("args", [])

self._exec = "#!/bin/bash"
# Header is only for informational purposes.
Expand Down Expand Up @@ -162,10 +164,11 @@ def get_parallelize_command(self, procs, nodes=None, **kwargs):
"-u", "PMI_FD",
"-u", "PMI_RANK",
"-u", "PMI_SIZE",
"mpirun",
"-gpu",
"-mca", "plm", "rsh",
"--map-by", "node"]
self._mpi_exe]

for item in self._addl_args:
args.append(item)

args.extend(["-hostfile", "$HOSTF"])
args.extend([
"-n",
Expand Down Expand Up @@ -259,7 +262,8 @@ def submit(self, step, path, cwd, job_map=None, env=None):
LOGGER.warning("Job creation failed")
return SubmissionCode.ERROR, -1

LOGGER.info("Submission returned status OK.")
LOGGER.info("Submission returned status OK. -- "
"Assigned identifier (%s)", resp["jobid"])
return SubmissionCode.OK, resp["jobid"]

def check_jobs(self, joblist):
Expand Down Expand Up @@ -336,7 +340,13 @@ def check_jobs(self, joblist):
"Error seen on path {} Unexpected behavior encountered."
.format(path)
)
# NOTE: I don't know if we should actually be returning here.
# It feels like we may not want to.
return JobStatusCode.ERROR, status
except EnvironmentError:
LOGGER.warning("Job ID (%s) not found in kvs. Setting state"
"to UNKNOWN.", jobid)
status[jobid] = self._state("unknown")

if not status:
return JobStatusCode.NOJOBS, status
Expand Down Expand Up @@ -371,7 +381,7 @@ def cancel_jobs(self, joblist):
)

if retcode != 0:
status = self.check_jobs([job])
retcode, status = self.check_jobs([job])
if status and status.get(job, None) in term_status:
retcode = 0

Expand All @@ -393,14 +403,16 @@ def _state(self, flux_state):
return State.RUNNING
elif flux_state == "pending" or flux_state == "runrequest":
return State.PENDING
elif flux_state == "submitted":
elif flux_state == "submitted" or flux_state == "allocated":
return State.PENDING
elif flux_state == "failed":
return State.FAILED
elif flux_state == "cancelled" or flux_state == "killed":
return State.CANCELLED
elif flux_state == "complete":
return State.FINISHED
elif flux_state == "unknown":
return State.UNKNOWN
else:
return State.UNKNOWN

Expand Down

0 comments on commit 1122fb2

Please sign in to comment.