Skip to content

Commit

Permalink
Addition of new state names for Flux state mapping (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrankD412 committed Dec 1, 2022
1 parent db2cb0f commit 6d0bd1b
Showing 1 changed file with 63 additions and 34 deletions.
97 changes: 63 additions & 34 deletions maestrowf/interfaces/script/_flux/flux0_26_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@
from math import ceil
import os

from maestrowf.abstracts.enums import CancelCode, JobStatusCode, State, \
StepPriority, SubmissionCode
from maestrowf.abstracts.enums import (
CancelCode,
JobStatusCode,
State,
StepPriority,
SubmissionCode,
)
from maestrowf.abstracts.interfaces.flux import FluxInterface

LOGGER = logging.getLogger(__name__)
Expand All @@ -20,11 +25,11 @@ class FluxInterface_0260(FluxInterface):

flux_handle = None
# Integer urgency associated with each Maestro step priority. Every
# priority is listed exactly once so no entry silently overrides another.
_urgencies = {
    StepPriority.HELD: 0,
    StepPriority.MINIMAL: 1,
    StepPriority.LOW: 9,
    StepPriority.MEDIUM: 16,
    StepPriority.HIGH: 24,
    StepPriority.EXPEDITE: 31,
}

Expand All @@ -43,8 +48,17 @@ def get_flux_urgency(cls, urgency) -> int:

@classmethod
def submit(
cls, nodes, procs, cores_per_task, path, cwd, walltime,
ngpus=0, job_name=None, force_broker=True, urgency=StepPriority.MEDIUM
cls,
nodes,
procs,
cores_per_task,
path,
cwd,
walltime,
ngpus=0,
job_name=None,
force_broker=True,
urgency=StepPriority.MEDIUM,
):
try:
cls.connect_to_flux()
Expand All @@ -59,20 +73,32 @@ def submit(
if force_broker:
LOGGER.debug(
"Launch under Flux sub-broker. [force_broker=%s, "
"nodes=%d]", force_broker, nodes
"nodes=%d]",
force_broker,
nodes,
)
ngpus_per_slot = int(ceil(ngpus / nodes))
jobspec = flux.job.JobspecV1.from_nest_command(
[path], num_nodes=nodes, cores_per_slot=cores_per_task,
num_slots=procs, gpus_per_slot=ngpus_per_slot)
[path],
num_nodes=nodes,
cores_per_slot=cores_per_task,
num_slots=procs,
gpus_per_slot=ngpus_per_slot,
)
else:
LOGGER.debug(
"Launch under root Flux broker. [force_broker=%s, "
"nodes=%d]", force_broker, nodes
"nodes=%d]",
force_broker,
nodes,
)
jobspec = flux.job.JobspecV1.from_command(
[path], num_tasks=procs, num_nodes=nodes,
cores_per_task=cores_per_task, gpus_per_task=ngpus)
[path],
num_tasks=procs,
num_nodes=nodes,
cores_per_task=cores_per_task,
gpus_per_task=ngpus,
)

LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
if job_name:
Expand All @@ -88,23 +114,23 @@ def submit(

# Submit our job spec.
jobid = flux.job.submit(
cls.flux_handle, jobspec, waitable=True,
urgency=urgency
cls.flux_handle, jobspec, waitable=True, urgency=urgency
)
submit_status = SubmissionCode.OK
retcode = 0

LOGGER.info("Submission returned status OK. -- "
"Assigned identifier (%s)", jobid)
LOGGER.info(
"Submission returned status OK. -- "
"Assigned identifier (%s)",
jobid,
)
except ConnectionResetError as exception:
LOGGER.error(
"Submission failed -- Message (%s).", exception)
LOGGER.error("Submission failed -- Message (%s).", exception)
jobid = -1
retcode = -2
submit_status = SubmissionCode.ERROR
except Exception as exception:
LOGGER.error(
"Submission failed -- Message (%s).", exception)
LOGGER.error("Submission failed -- Message (%s).", exception)
jobid = -1
retcode = -1
submit_status = SubmissionCode.ERROR
Expand Down Expand Up @@ -148,8 +174,7 @@ def get_statuses(cls, joblist):
# all systems.
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))

jobs_rpc = flux.job.list.JobList(cls.flux_handle, ids=joblist)

Expand Down Expand Up @@ -181,11 +206,10 @@ def cancel(cls, joblist):
# all systems.
cls.connect_to_flux()

LOGGER.debug(
"Handle address -- %s", hex(id(cls.flux_handle)))
LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
LOGGER.debug(
"Attempting to cancel jobs.\nJoblist:\n%s",
"\n".join(str(j) for j in joblist)
"\n".join(str(j) for j in joblist),
)

cancel_code = CancelCode.OK
Expand All @@ -203,17 +227,22 @@ def cancel(cls, joblist):

@staticmethod
def state(state):
    """Translate a job state abbreviation into a Maestro ``State``.

    :param state: Short state string reported for a job, such as ``"R"``
        or ``"CD"``.
    :returns: The ``State`` member mapped to the abbreviation, or
        ``State.UNKNOWN`` when the abbreviation is not recognized (an
        error is logged in that case).
    """
    # A single lookup table keeps every abbreviation in one place, so no
    # entry can be shadowed by an earlier test of the same string.
    known_states = {
        "D": State.PENDING,
        "PD": State.PENDING,  # both "D" and "PD" report a pending job
        "S": State.QUEUED,
        "R": State.RUNNING,
        "C": State.FINISHING,
        "CD": State.FINISHED,
        "F": State.FAILED,
        "CA": State.CANCELLED,
        "TO": State.TIMEDOUT,
    }

    try:
        return known_states[state]
    except (KeyError, TypeError):
        # KeyError: abbreviation not in the table. TypeError: an unhashable
        # value was passed; treat it as unknown rather than raising.
        LOGGER.error("Unhandled state: %s", state)
        return State.UNKNOWN

0 comments on commit 6d0bd1b

Please sign in to comment.