Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait_finished method for job API (regarding #240) #242

Merged
merged 9 commits into from
Sep 9, 2022
86 changes: 69 additions & 17 deletions pyslurm/pyslurm.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ cdef class config:
Ctl_dict['cpu_freq_govs'] = self.__Config_ptr.cpu_freq_govs
Ctl_dict['cred_type'] = slurm.stringOrNone(self.__Config_ptr.cred_type, '')
Ctl_dict['debug_flags'] = self.__Config_ptr.debug_flags
Ctl_dict['def_mem_per_cp'] = self.__Config_ptr.def_mem_per_cpu
Ctl_dict['def_mem_per_cpu'] = self.__Config_ptr.def_mem_per_cpu
Ctl_dict['dependency_params'] = slurm.stringOrNone(self.__Config_ptr.dependency_params, '')
Ctl_dict['eio_timeout'] = self.__Config_ptr.eio_timeout
Ctl_dict['enforce_part_limits'] = bool(self.__Config_ptr.enforce_part_limits)
Expand Down Expand Up @@ -1027,16 +1027,16 @@ cdef class partition:

if record.def_mem_per_cpu & slurm.MEM_PER_CPU:
if record.def_mem_per_cpu == slurm.MEM_PER_CPU:
Part_dict['def_mem_per_cp'] = "UNLIMITED"
Part_dict['def_mem_per_cpu'] = "UNLIMITED"
Part_dict['def_mem_per_node'] = None
else:
Part_dict['def_mem_per_cp'] = record.def_mem_per_cpu & (~slurm.MEM_PER_CPU)
Part_dict['def_mem_per_cpu'] = record.def_mem_per_cpu & (~slurm.MEM_PER_CPU)
Part_dict['def_mem_per_node'] = None
elif record.def_mem_per_cpu == 0:
Part_dict['def_mem_per_cp'] = None
Part_dict['def_mem_per_cpu'] = None
Part_dict['def_mem_per_node'] = "UNLIMITED"
else:
Part_dict['def_mem_per_cp'] = None
Part_dict['def_mem_per_cpu'] = None
Part_dict['def_mem_per_node'] = record.def_mem_per_cpu

if record.default_time == slurm.INFINITE:
Expand Down Expand Up @@ -1778,35 +1778,55 @@ cdef class job:

return retList

def find_id(self, jobid):
"""Retrieve job ID data.
cdef _load_single_job(self, jobid):
"""
Uses slurm_load_job to setup the self._job_ptr for a single job given by the jobid.
After calling this, the job pointer can be used in other methods
to operate on the information of the job.

This method accepts both string and integer formats of the jobid. It
calls slurm_xlate_job_id() to convert the jobid appropriately.
This works for single jobs and job arrays.
This method accepts both string and integer formats of the jobid. It
calls slurm_xlate_job_id to convert the jobid appropriately.

:param str jobid: Job id key string to search
:returns: List of dictionary of values for given job id
:rtype: `list`
Raises a ValueError if the jobid does not correspond to an existing job.

:param str jobid: The jobid
:returns: void
:rtype: None.
"""
cdef:
int apiError
int rc

# jobid can be given as int or string
if isinstance(jobid, int) or isinstance(jobid, long):
jobid = str(jobid).encode("UTF-8")
else:
jobid = jobid.encode("UTF-8")

# convert jobid appropriately for slurm
jobid_xlate = slurm.slurm_xlate_job_id(jobid)

# load the job which sets the self._job_ptr pointer
rc = slurm.slurm_load_job(&self._job_ptr, jobid_xlate, self._ShowFlags)

if rc == slurm.SLURM_SUCCESS:
return list(self.get_job_ptr().values())
else:
if rc != slurm.SLURM_SUCCESS:
apiError = slurm.slurm_get_errno()
raise ValueError(slurm.stringOrNone(slurm.slurm_strerror(apiError), ''), apiError)

def find_id(self, jobid):
    """Look up a job by its id and return the matching records.

    The job is loaded through the internal helper _load_single_job,
    which accepts the jobid as either a string or an integer and raises
    a ValueError when the job cannot be loaded. Single jobs and job
    arrays are both supported.

    :param str jobid: Job id key string to search
    :returns: List of dictionary of values for given job id
    :rtype: `list`
    """
    # Loading first makes the job data available to get_job_ptr().
    self._load_single_job(jobid)
    job_records = self.get_job_ptr()
    return list(job_records.values())

def find_user(self, user):
"""Retrieve a user's job data.

Expand Down Expand Up @@ -2883,6 +2903,38 @@ cdef class job:
#return "Submitted batch job %s" % job_id
return job_id

def wait_finished(self, jobid, poll_interval=5):
    """Block until the job given by the jobid finishes.

    This works for single jobs, as well as job arrays. Once per polling
    cycle the job is re-loaded through _load_single_job and every
    returned record is checked with IS_JOB_COMPLETED; the call returns
    after the first cycle in which all records are completed.

    :param jobid: The job id of the slurm job.
        To reference a job with job array set, use the first/"master" jobid
        (the same as given by squeue)
    :param poll_interval: Number of seconds to sleep before each poll.
        Defaults to 5.
    :returns: The exit code of the slurm job. For a job array this is the
        highest exit code of all its records; a record for which
        WIFEXITED is false is counted as exit code 1.
    :rtype: `int`
    :raises ValueError: propagated from _load_single_job when the job
        cannot be loaded.
    """
    # Sentinel that any real exit status will exceed in the max() below.
    exit_status = -9999
    complete = False
    while not complete:
        # Assume completion; any unfinished record resets this flag.
        complete = True
        p_time.sleep(poll_interval)
        self._load_single_job(jobid)
        for i in range(self._job_ptr.record_count):
            self._record = &self._job_ptr.job_array[i]
            if IS_JOB_COMPLETED(self._job_ptr.job_array[i]):
                if WIFEXITED(self._record.exit_code):
                    exit_status_arrayjob = WEXITSTATUS(self._record.exit_code)
                else:
                    exit_status_arrayjob = 1
                # set exit code to the highest of all jobs in job array
                exit_status = max(exit_status, exit_status_arrayjob)
            else:
                # go on with the next iteration, until all jobs in array are completed
                complete = False
        # Release this cycle's message before the next load replaces the pointer.
        slurm.slurm_free_job_info_msg(self._job_ptr)
    return exit_status


def slurm_pid2jobid(uint32_t JobPID=0):
"""Get the slurm job id from a process id.
Expand Down
76 changes: 76 additions & 0 deletions tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,79 @@ def test_job_kill():
# time.sleep(3)
# test_job_search_after = pyslurm.job().find_id(test_job_id)[0]
# assert_equals(test_job_search_after.get("job_state"), "FAILED")


def test_job_wait_finished():
    """Job: Test job().wait_finished().

    Submits two batch jobs one after the other, waits for each with
    wait_finished() and checks the reported exit code as well as the
    job state before and after waiting.
    """
    # (wrap command, expected final job state, expected exit code)
    scenarios = (
        ("sleep 30", "COMPLETED", 0),
        # "exit 1" should yield failure ending
        ("sleep 300; exit 1", "FAILED", 1),
    )

    for wrap_cmd, expected_state, expected_code in scenarios:
        job_desc = {
            "wrap": wrap_cmd,
            "job_name": "pyslurm_test_job",
            "ntasks": 1,
            "cpus_per_task": 1,
        }
        job_id = pyslurm.job().submit_batch_job(job_desc)
        state_before = pyslurm.job().find_id(job_id)[0]["job_state"]

        # wait for the job to finish
        returned_code = pyslurm.job().wait_finished(job_id)

        state_after = pyslurm.job().find_id(job_id)[0]["job_state"]
        assert state_before != "COMPLETED"
        assert state_after == expected_state
        assert returned_code == expected_code


def test_job_wait_finished_w_arrays():
    """Job: Test job().wait_finished() with job arrays.

    Submits two three-task job arrays one after the other, waits for
    each with wait_finished() and checks the reported exit code as well
    as the job state before and after waiting.
    """
    # (wrap command, expected final job state, expected exit code)
    scenarios = (
        ("sleep 30; exit 0", "COMPLETED", 0),
        # test for exit codes: maximum exit code of all array jobs.
        # use array ID as exit code to yield different exit codes: 0, 1, 2;
        # exit code 2 (the maximum of all) should yield FAILED for the entire job
        ("sleep 30; exit $SLURM_ARRAY_TASK_ID", "FAILED", 2),
    )

    for wrap_cmd, expected_state, expected_code in scenarios:
        job_desc = {
            "wrap": wrap_cmd,
            "job_name": "pyslurm_array_test_job",
            "ntasks": 1,
            "cpus_per_task": 1,
            "array_inx": "0,1,2",
        }
        job_id = pyslurm.job().submit_batch_job(job_desc)
        state_before = pyslurm.job().find_id(job_id)[0]["job_state"]
        # wait for the job to finish
        returned_code = pyslurm.job().wait_finished(job_id)
        state_after = pyslurm.job().find_id(job_id)[0]["job_state"]
        assert state_before != "COMPLETED"
        assert state_after == expected_state
        assert returned_code == expected_code