Bugfix/slurm job checking (#405)
jwhite242 committed Aug 12, 2022
1 parent 06fd2f2 commit db2cb0f
Showing 3 changed files with 252 additions and 36 deletions.
2 changes: 1 addition & 1 deletion maestrowf/abstracts/interfaces/schedulerscriptadapter.py
@@ -230,7 +230,7 @@ def _substitute_parallel_command(self, step_cmd, **kwargs):
LOGGER.error(msg)
raise ValueError(msg)

if total_nodes > nodes:
if nodes and total_nodes > nodes:
msg = "Total nodes ({}) requested exceeds the " \
"maximum requested ({})".format(total_nodes, nodes)
LOGGER.error(msg)
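The one-line guard above is the whole fix in this file: nodes can be falsy (e.g., None) when a step never requests a node count, and comparing an int against None raises a TypeError in Python 3. A minimal sketch of the changed check (the function name is illustrative, not part of the adapter):

    # Illustrative stand-in for the guarded comparison above.
    def exceeds_requested(total_nodes, nodes):
        # Old: "if total_nodes > nodes:" -> TypeError when nodes is None.
        # New: short-circuit when nodes is None or 0, skipping the check
        # for steps that never requested a node count.
        return bool(nodes) and total_nodes > nodes

    assert exceeds_requested(4, 2) is True
    assert exceeds_requested(2, 4) is False
    assert exceeds_requested(4, None) is False  # no request, no check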
247 changes: 212 additions & 35 deletions maestrowf/interfaces/script/slurmscriptadapter.py
@@ -245,47 +245,53 @@ def submit(self, step, path, cwd, job_map=None, env=None):
"Submission returned an error (see next line).\n%s", err)
return SubmissionRecord(SubmissionCode.ERROR, retcode)

def check_jobs(self, joblist):
def _check_jobs_squeue(self, joblist, status):
"""
For the given job list, query execution status.
This method uses the scontrol show job <jobid> command and does a
This method uses the squeue command to query the scheduler and does a
regex search for job information.
:param joblist: A list of job identifiers to be queried.
:returns: The return code of the status query, and a dictionary of job
identifiers to their status.
:param status: Dictionary of jobid:job status to fill out
:returns: The return code of the status query and the filled-in status dictionary
"""
# TODO: This method needs to be updated to use sacct.
# squeue options:
# -u = username to search queues for.
# -t = list of job states to search for. 'all' for all states.

# The squeue command output is split with the following indices
# used for specific information:
# 0 - Job Identifier
# 1 - Queue
# 2 - Job name
# 3 - User
# 4 - State [Passed to _state]
# 5 - Current Execution Time
# 6 - Assigned Node Count
# 7 - Hostname and assigned node identifier list

cmd = "squeue -u $USER -t all"

# Indices of needed columns in squeue output
data_row_offset = 1 # Just header, no header/row separator
state_index = 4
jobid_index = 0

# status = {}
# for jobid in joblist:
# LOGGER.debug("Looking for jobid %s", jobid)
# status[jobid] = None

p = start_process(cmd)
output, err = p.communicate()
retcode = p.wait()

status = {}
for jobid in joblist:
LOGGER.debug("Looking for jobid %s", jobid)
status[jobid] = None

if retcode == 0:
for job in output.split("\n")[1:]:
if retcode == 0: # Successfully checked scheduler, parse output
for job in output.split("\n")[data_row_offset:]:
LOGGER.debug("Job Entry: %s", job)
# The squeue command output is split with the following indices
# used for specific information:
# 0 - Job Identifier
# 1 - Queue
# 2 - Job name
# 3 - User
# 4 - State [Passed to _state]
# 5 - Current Execution Time
# 6 - Assigned Node Count
# 7 - Hostname and assigned node identifier list

job_split = re.split(r"\s+", job)
state_index = 4
jobid_index = 0

# Check for blank entry in first column
if job_split[0] == "":
LOGGER.debug("Removing blank entry from head of status.")
job_split = job_split[1:]
@@ -296,21 +302,189 @@ def check_jobs(self, joblist):
continue

if job_split[jobid_index] in status:
LOGGER.debug("ID Found. %s -- %s", job_split[state_index],
LOGGER.debug("ID Found. %s -- %s",
job_split[state_index],
self._state(job_split[state_index]))

status[job_split[jobid_index]] = \
self._state(job_split[state_index])

if any([jstatus is None for _, jstatus in status.items()]):
missing_jobids = [jobid for jobid, jstatus in status.items()
if jstatus is None]
LOGGER.debug(
"Lost track of Job Entries using 'squeue': %s",
', '.join([str(jobid) for jobid in missing_jobids]))

return JobStatusCode.OK, status

elif retcode == 1:
LOGGER.warning("User '%s' has no jobs executing. Returning.",
getpass.getuser())
return JobStatusCode.NOJOBS, status

elif retcode == 127:
LOGGER.warning("Could not find 'squeue' command. Returning."),
return JobStatusCode.ERROR, status

else:
LOGGER.error("Error code '%s' seen. Unexpected behavior "
"encountered.")
return JobStatusCode.ERROR, status
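
To make the parsing above concrete, here is a self-contained sketch of the same loop run against a fabricated squeue listing (the sample text and job ids are made up; the column layout follows the comments above):

    import re

    # Fabricated sample resembling `squeue -u $USER -t all` output.
    SAMPLE = (" JOBID PARTITION  NAME   USER ST  TIME NODES NODELIST(REASON)\n"
              " 12345    pdebug hello jwhite  R  0:42     1 node0001\n"
              " 12346    pdebug   bye jwhite PD  0:00     1 (Priority)")

    status = {"12345": None, "12346": None}
    data_row_offset, jobid_index, state_index = 1, 0, 4

    for row in SAMPLE.split("\n")[data_row_offset:]:
        fields = re.split(r"\s+", row)
        if fields[0] == "":
            fields = fields[1:]  # drop blank head from left padding
        if fields[jobid_index] in status:
            status[fields[jobid_index]] = fields[state_index]

    print(status)  # {'12345': 'R', '12346': 'PD'}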

def _check_jobs_sacct(self, joblist, status):
"""
For the given job list, query execution status.
This method uses the sacct --jobs=<jobid> command and does a
regex search for job information.
:param joblist: A list of job identifiers to be queried.
:param status: Dictionary of jobid:jobstate for job status
:returns: The return code of the status query, and a dictionary of job
identifiers to their status.
.. note:: slurm versions > 21.08 enable json/yaml output options
.. note:: While more robust than squeue, testing reveals this
cmd is not always available to users
"""
cmd = "sacct -u $USER --jobs={}"
# Note: can add similar columns as squeue defaults to if needed
# sacct -u $USER --jobs=jobid1,jobid2,jobid3 \
# --format=jobid,partition,jobname,user,state,time,nnodes,\
# nodelist,reason
# NOTE: --jobs works differently from querying without a fixed list ->
# not specifying this requires manual specification of time frames
# and could be error prone when resuming studies some time later

# columns exposed in sacct
# 1 - JobID (includes entries for job steps too: jobid.step)
# 2 - JobName (includes job step names)
# 3 - Partition
# 4 - Account
# 5 - AllocCPUs
# 6 - State
# 7 - ExitCode

# First two rows define columns and then header separators '----'
data_row_offset = 2
state_index = 5
jobid_index = 0

cmd = cmd.format(','.join(joblist))
p = start_process(cmd)
output, err = p.communicate()
retcode = p.wait()

for jobid in joblist:
LOGGER.debug("Looking for jobid %s with sacct", jobid)
status[jobid] = None

if retcode == 0:
for job in output.split("\n")[data_row_offset:]:
LOGGER.debug("Job Entry: %s", job)
job_split = re.split(r"\s+", job)

LOGGER.debug("Entry split: %s", job_split)
if not job_split:
LOGGER.debug("Continuing...")
continue

if job_split[jobid_index] in status:
LOGGER.debug("ID Found. %s -- %s", job_split[state_index],
self._state(job_split[state_index]))
status[job_split[jobid_index]] = \
self._state(job_split[state_index])

if any([jstatus is None for _, jstatus in status.items()]):
missing_jobids = [jobid for jobid, jstatus in status.items()
if jstatus is None]
LOGGER.debug(
"Lost track of Job Entries using 'sacct': %s",
', '.join([str(jobid) for jobid in missing_jobids])
)

return JobStatusCode.OK, status

elif retcode == 1:
# NOTE: can this actually happen with sacct?
LOGGER.warning("Could not find user '%s's jobs: %s. Returning.",
[jobid for jobid, jstatus in status.items()
if jstatus is None],
getpass.getuser(),
)
return JobStatusCode.NOJOBS, status

elif retcode == 127:
LOGGER.warning("Could not find 'sacct' command. Returning."),
return JobStatusCode.ERROR, status

else:
LOGGER.error("Error code '%s' seen. Unexpected behavior "
"encountered.")
return JobStatusCode.ERROR, status
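
A matching sketch for the sacct path, showing the fixed --jobs list being assembled and the two header rows being skipped (the listing is fabricated; real sacct output also includes per-step rows such as 12345.batch, which simply fail the membership test):

    import re

    joblist = ["12345", "12346"]
    cmd = "sacct -u $USER --jobs={}".format(",".join(joblist))
    # -> "sacct -u $USER --jobs=12345,12346"

    # Fabricated sample; the '----' separator row is why
    # data_row_offset is 2 here instead of 1.
    SAMPLE = ("JobID JobName Partition Account AllocCPUS State ExitCode\n"
              "------ ------- --------- ------- --------- ------- --------\n"
              "12345 hello pdebug wbronze 1 COMPLETED 0:0\n"
              "12345.batch batch wbronze 1 COMPLETED 0:0\n"
              "12346 bye pdebug wbronze 1 RUNNING 0:0")

    status = {jobid: None for jobid in joblist}
    data_row_offset, jobid_index, state_index = 2, 0, 5

    for row in SAMPLE.split("\n")[data_row_offset:]:
        fields = re.split(r"\s+", row)
        if fields[jobid_index] in status:  # skips '12345.batch' etc.
            status[fields[jobid_index]] = fields[state_index]

    print(status)  # {'12345': 'COMPLETED', '12346': 'RUNNING'}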

def check_jobs(self, joblist):
"""
For the given job list, query execution status.
This method queries squeue first and falls back to sacct for any
jobs that squeue no longer reports.
:param joblist: A list of job identifiers to be queried.
:returns: The return code of the status query, and a dictionary of job
identifiers to their status.
"""
status = {}
for jobid in joblist:
LOGGER.debug("Looking for jobid %s", jobid)
status[jobid] = None

job_status_codes = []
job_status_code, status = self._check_jobs_squeue(joblist, status)

job_status_codes.append(job_status_code)

# Fallback -> check with sacct if squeue can't find it
if any([jstatus is None for _, jstatus in status.items()]):
missing_jobids = [jobid for jobid, jstatus in status.items()
if jstatus is None]

job_status_code, status = self._check_jobs_sacct(missing_jobids,
status)

job_status_codes.append(job_status_code)

# Check for any jobs still missing and mark them as lost
if any([jstatus is None for _, jstatus in status.items()]):
missing_jobids = [jobid for jobid, jstatus in status.items()
if jstatus is None]
LOGGER.debug("Temporarily lost track of Job Entries: %s",
', '.join([str(jobid) for jobid in missing_jobids]))

# for jobid in missing_jobids:
# status[jobid] = State.LOST

# Possible status codes:
# OK -> checking status worked
# NOJOBS -> checking status worked, no jobs found
# ERROR -> job check cmd not found, or unknown error
# Any OK result wins out; NOJOBS is returned only if every check
# reported NOJOBS, and ERROR otherwise.

# is_status_ok = [status_code == JobStatusCode.OK
# for status_code in job_status_codes]
if any([status_code == JobStatusCode.OK
for status_code in job_status_codes]):
return JobStatusCode.OK, status

elif all([status_code == JobStatusCode.NOJOBS
for status_code in job_status_codes]):
return JobStatusCode.NOJOBS, status
# elif all([status_code == JobStatusCode.ERROR
# for status_code in job_status_code]):
# return JobStatusCode.ERROR, status
else:
return JobStatusCode.ERROR, status
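
The code-combination logic at the end reduces to a small truth table: one successful query is enough to trust the merged status map, NOJOBS only when every backend reported NOJOBS, and ERROR otherwise. A standalone sketch with a stand-in enum:

    from enum import Enum

    class JobStatusCode(Enum):  # stand-in for the real enum
        OK = 0
        NOJOBS = 1
        ERROR = 2

    def resolve(codes):
        # Mirrors check_jobs: any OK wins; all NOJOBS -> NOJOBS.
        if any(code == JobStatusCode.OK for code in codes):
            return JobStatusCode.OK
        if all(code == JobStatusCode.NOJOBS for code in codes):
            return JobStatusCode.NOJOBS
        return JobStatusCode.ERROR

    assert resolve([JobStatusCode.ERROR, JobStatusCode.OK]) == JobStatusCode.OK
    assert resolve([JobStatusCode.NOJOBS] * 2) == JobStatusCode.NOJOBS
    assert resolve([JobStatusCode.NOJOBS, JobStatusCode.ERROR]) == JobStatusCode.ERROR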

def cancel_jobs(self, joblist):
"""
For the given job list, cancel each job.
@@ -344,21 +518,24 @@ def _state(self, slurm_state):
:returns: A Study.State enum corresponding to parameter job_state.
"""
LOGGER.debug("Received SLURM State -- %s", slurm_state)
if slurm_state == "R":
if slurm_state == "R" or slurm_state == "RUNNING":
return State.RUNNING
elif slurm_state == "PD":
elif slurm_state == "PD" or slurm_state == "PENDING":
return State.PENDING
elif slurm_state == "CG":
elif slurm_state == "CG" or slurm_state == "COMPLETING":
# NOTE: this doesn't appear to show up with sacct, so maybe remove?
return State.FINISHING
elif slurm_state == "CD":
elif slurm_state == "CD" or slurm_state == "COMPLETED":
return State.FINISHED
elif slurm_state == "NF":
elif slurm_state == "NF" or slurm_state == "NODE_FAIL":
return State.HWFAILURE
elif slurm_state == "TO":
elif slurm_state == "TO" or slurm_state == "TIMEOUT":
return State.TIMEDOUT
elif slurm_state == "ST" or slurm_state == "F":
elif (slurm_state == "ST" or
slurm_state == "F" or
slurm_state == "FAILED"):
return State.FAILED
elif slurm_state == "CA":
elif slurm_state == "CA" or slurm_state == "CANCELLED":
return State.CANCELLED
else:
return State.UNKNOWN
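
Because every branch now accepts both a squeue short code and the corresponding sacct long name, the mapping could equally be table-driven; a hedged sketch of that alternative (plain strings stand in for the State enum, and the pairs mirror the branches above):

    # Hypothetical table-driven variant of _state; not the shipped code.
    STATE_MAP = {
        "R": "RUNNING",    "RUNNING": "RUNNING",
        "PD": "PENDING",   "PENDING": "PENDING",
        "CG": "FINISHING", "COMPLETING": "FINISHING",
        "CD": "FINISHED",  "COMPLETED": "FINISHED",
        "NF": "HWFAILURE", "NODE_FAIL": "HWFAILURE",
        "TO": "TIMEDOUT",  "TIMEOUT": "TIMEDOUT",
        "ST": "FAILED",    "F": "FAILED", "FAILED": "FAILED",
        "CA": "CANCELLED", "CANCELLED": "CANCELLED",
    }

    def to_state(slurm_state):
        return STATE_MAP.get(slurm_state, "UNKNOWN")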
39 changes: 39 additions & 0 deletions samples/hello_world/hello_bye_parameterized_slurm.yaml
@@ -0,0 +1,39 @@
description:
name: hello_bye_world
description: A study that says hello and bye to multiple people.

batch:
type : slurm
host : rzgenie
bank : wbronze
queue : pdebug

env:
variables:
OUTPUT_PATH: ./sample_output/hello_world
labels:
OUT_FORMAT: $(GREETING)_$(NAME).txt

study:
- name: hello_world
description: Say hello to someone!
run:
cmd: |
$(LAUNCHER) echo "$(GREETING), $(NAME)!" > $(OUT_FORMAT)
procs: 1

- name: bye_world
description: Say bye to someone!
run:
cmd: |
$(LAUNCHER)[1p] echo "Bye, World!" > bye.txt
procs: 1
depends: [hello_world]

global.parameters:
NAME:
values: [Pam, Jim, Michael, Dwight]
label: NAME.%%
GREETING:
values: [Hello, Ciao, Hey, Hi]
label: GREETING.%%

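For reference, the two value lists in global.parameters yield four parameter combinations. A quick sketch of how the OUT_FORMAT label expands (assuming Maestro's one-to-one, index-wise pairing of equal-length value lists; a cross product would need a parameter generator instead):

    names = ["Pam", "Jim", "Michael", "Dwight"]
    greetings = ["Hello", "Ciao", "Hey", "Hi"]

    # One combination (and one output file) per index.
    for greeting, name in zip(greetings, names):
        print("{}_{}.txt".format(greeting, name))
    # Hello_Pam.txt, Ciao_Jim.txt, Hey_Michael.txt, Hi_Dwight.txt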