Skip to content

Commit

Permalink
Align implementations of the PBS and the Torque scheduler regarding t…
Browse files Browse the repository at this point in the history
…he use of `qstat`
  • Loading branch information
ekouts committed Nov 20, 2020
1 parent 8b6a23f commit 330c7b4
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 125 deletions.
117 changes: 102 additions & 15 deletions reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@
_run_strict = functools.partial(osext.run_command, check=True)


JOB_STATES = {
'Q': 'QUEUED',
'H': 'HELD',
'R': 'RUNNING',
'E': 'EXITING',
'T': 'MOVED',
'W': 'WAITING',
'S': 'SUSPENDED',
'C': 'COMPLETED',
}


class _PbsJob(sched.Job):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down Expand Up @@ -156,24 +168,99 @@ def finished(self, job):

return job.completed

def _poll_job(self, job):
if job is None:
def _update_nodelist(self, job, nodespec):
if job.nodelist is not None:
return

with osext.change_dir(job.workdir):
output_ready = (os.path.exists(job.stdout) and
os.path.exists(job.stderr))
job._nodelist = [x.split('/')[0] for x in nodespec.split('+')]
job._nodelist.sort()

done = job.cancelled or output_ready
if done:
t_now = time.time()
if job.completion_time is None:
job._completion_time = t_now
def poll(self, *jobs):
if jobs:
# Filter out non-jobs
jobs = [job for job in jobs if job is not None]

time_from_finish = t_now - job.completion_time
if time_from_finish > PBS_OUTPUT_WRITEBACK_WAIT:
job._completed = True
if not jobs:
return

completed = osext.run_command(
f'qstat -f {" ".join(job.jobid for job in jobs)}'
)

# Depending on the configuration, completed jobs will remain on the job
# list for a limited time, or be removed upon completion.
# If qstat cannot find any of the job IDs, it will return 153.
# Otherwise, it will return with return code 0 and print information
# only for the jobs it could find.
if completed.returncode == 153:
self.log('Return code is 153: jobids not known by scheduler, '
'assuming all jobs completed')
for job in jobs:
job._state = 'COMPLETED'

return

if completed.returncode != 0:
raise JobSchedulerError(
f'qstat failed with exit code {completed.returncode} '
f'(standard error follows):\n{completed.stderr}'
)

# Store information for each job separately
jobinfo = {}
for job_raw_info in completed.stdout.split('\n\n'):
jobid_match = re.search(
r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
)
if jobid_match:
jobid = jobid_match.group('jobid')
jobinfo[jobid] = job_raw_info

def poll(self, *jobs):
for job in jobs:
self._poll_job(job)
if job.jobid not in jobinfo:
self.log(f'Job {job.jobid} not known to scheduler, '
f'assuming job completed')
job._state = 'COMPLETED'
job._completed = True
continue

info = jobinfo[job.jobid]
state_match = re.search(
r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
)
if not state_match:
self.log(f'Job state not found (job info follows):\n{info}')
continue

state = state_match.group('state')
job._state = JOB_STATES[state]
nodelist_match = re.search(
r'exec_host = (?P<nodespec>[\S\t\n]+)',
info, re.MULTILINE
)
if nodelist_match:
nodespec = nodelist_match.group('nodespec')
nodespec = re.sub(r'[\n\t]*', '', nodespec)
self._update_nodelist(job, nodespec)

if job.state == 'COMPLETED':
exitcode_match = re.search(
r'^\s*exit_status = (?P<code>\d+)',
info, re.MULTILINE,
)
if exitcode_match:
job._exitcode = int(exitcode_match.group('code'))

# We report a job as finished only when its stdout/stderr are
# written back to the working directory
stdout = os.path.join(job.workdir, job.stdout)
stderr = os.path.join(job.workdir, job.stderr)
out_ready = os.path.exists(stdout) and os.path.exists(stderr)
done = job.cancelled or out_ready
if done:
job._completed = True
elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
job.max_pending_time):
if (time.time() - job.submit_time >= job.max_pending_time):
self.cancel(job)
job._exception = JobError('maximum pending time exceeded')
109 changes: 0 additions & 109 deletions reframe/core/schedulers/torque.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,115 +18,6 @@
from reframe.core.schedulers.pbs import PbsJobScheduler, _run_strict


JOB_STATES = {
'Q': 'QUEUED',
'H': 'HELD',
'R': 'RUNNING',
'E': 'EXITING',
'T': 'MOVED',
'W': 'WAITING',
'S': 'SUSPENDED',
'C': 'COMPLETED',
}


@register_scheduler('torque')
class TorqueJobScheduler(PbsJobScheduler):
TASKS_OPT = '-l nodes={num_nodes}:ppn={num_cpus_per_node}'

def _update_nodelist(self, job, nodespec):
if job.nodelist is not None:
return

job._nodelist = [x.split('/')[0] for x in nodespec.split('+')]
job._nodelist.sort()

def poll(self, *jobs):
if jobs:
# Filter out non-jobs
jobs = [job for job in jobs if job is not None]

if not jobs:
return

completed = osext.run_command(
f'qstat -f {" ".join(job.jobid for job in jobs)}'
)

# Depending on the configuration, completed jobs will remain on the job
# list for a limited time, or be removed upon completion.
# If qstat cannot find any of the job IDs, it will return 153.
# Otherwise, it will return with return code 0 and print information
# only for the jobs it could find.
if completed.returncode == 153:
self.log('Return code is 153: jobids not known by scheduler, '
'assuming all jobs completed')
for job in jobs:
job._state = 'COMPLETED'

return

if completed.returncode != 0:
raise JobSchedulerError(
f'qstat failed with exit code {completed.returncode} '
f'(standard error follows):\n{completed.stderr}'
)

# Store information for each job separately
jobinfo = {}
for job_raw_info in completed.stdout.split('\n\n'):
jobid_match = re.search(
r'^Job Id:\s*(?P<jobid>\S+)', job_raw_info, re.MULTILINE
)
if jobid_match:
jobid = jobid_match.group('jobid')
jobinfo[jobid] = job_raw_info

for job in jobs:
if job.jobid not in jobinfo:
self.log(f'Job {job.jobid} not known to scheduler, '
f'assuming job completed')
job._state = 'COMPLETED'
job._completed = True
continue

info = jobinfo[job.jobid]
state_match = re.search(
r'^\s*job_state = (?P<state>[A-Z])', info, re.MULTILINE
)
if not state_match:
self.log(f'Job state not found (job info follows):\n{info}')
continue

state = state_match.group('state')
job._state = JOB_STATES[state]
nodelist_match = re.search(
r'exec_host = (?P<nodespec>[\S\t\n]+)',
info, re.MULTILINE
)
if nodelist_match:
nodespec = nodelist_match.group('nodespec')
nodespec = re.sub(r'[\n\t]*', '', nodespec)
self._update_nodelist(job, nodespec)

if job.state == 'COMPLETED':
exitcode_match = re.search(
r'^\s*exit_status = (?P<code>\d+)',
info, re.MULTILINE,
)
if exitcode_match:
job._exitcode = int(exitcode_match.group('code'))

# We report a job as finished only when its stdout/stderr are
# written back to the working directory
stdout = os.path.join(job.workdir, job.stdout)
stderr = os.path.join(job.workdir, job.stderr)
out_ready = os.path.exists(stdout) and os.path.exists(stderr)
done = job.cancelled or out_ready
if done:
job._completed = True
elif (job.state in ['QUEUED', 'HELD', 'WAITING'] and
job.max_pending_time):
if (time.time() - job.submit_time >= job.max_pending_time):
self.cancel(job)
job._exception = JobError('maximum pending time exceeded')
2 changes: 1 addition & 1 deletion unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def test_guess_num_tasks(minimal_job, scheduler):


def test_submit_max_pending_time(make_job, exec_ctx, scheduler):
if scheduler.registered_name in ('local', 'pbs'):
if scheduler.registered_name in ('local'):
pytest.skip(f"max_pending_time not supported by the "
f"'{scheduler.registered_name}' scheduler")

Expand Down

0 comments on commit 330c7b4

Please sign in to comment.