Merge pull request #7004 from bernt-matthias/topic/univa3

A new runner for DRMAA (currently UNIVA)

jmchilton committed Nov 19, 2018
2 parents f968545 + 55f5235 commit 72b2781
Showing 4 changed files with 706 additions and 39 deletions.
3 changes: 2 additions & 1 deletion lib/galaxy/dependencies/__init__.py
@@ -89,7 +89,8 @@ def check_mysql_python(self):

def check_drmaa(self):
return ("galaxy.jobs.runners.drmaa:DRMAAJobRunner" in self.job_runners or
"galaxy.jobs.runners.slurm:SlurmJobRunner" in self.job_runners)
"galaxy.jobs.runners.slurm:SlurmJobRunner" in self.job_runners or
"galaxy.jobs.runners.drmaauniva:DRMAAUnivaJobRunner" in self.job_runners)

def check_pbs_python(self):
return "galaxy.jobs.runners.pbs:PBSJobRunner" in self.job_runners
106 changes: 68 additions & 38 deletions lib/galaxy/jobs/runners/drmaa.py
@@ -24,7 +24,7 @@

log = logging.getLogger(__name__)

__all__ = ('DRMAAJobRunner', )
__all__ = ('DRMAAJobRunner',)

RETRY_EXCEPTIONS_LOWER = frozenset(['invalidjobexception', 'internalexception'])

@@ -68,6 +68,9 @@ def __init__(self, app, nworkers, **kwargs):
(exc.__class__.__name__, str(exc)))
from pulsar.managers.util.drmaa import DrmaaSessionFactory

# make the drmaa library also available to subclasses
self.drmaa = drmaa

# Subclasses may need access to state constants
self.drmaa_job_states = drmaa.JobState

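The added lines store the drmaa module object on the runner as self.drmaa, so subclasses such as the new DRMAAUnivaJobRunner can reach DRMAA exception classes and state constants without importing the library themselves at module load time. The following is a hedged sketch of how a subclass defined in this same module (lib/galaxy/jobs/runners/drmaa.py) might use these attributes; the subclass and its methods are illustrative only, not the actual DRMAAUnivaJobRunner code.

    class ExampleUnivaRunner(DRMAAJobRunner):
        """Illustrative subclass; not the real DRMAAUnivaJobRunner."""

        def _ignore_vanished_job(self, exc, job_id):
            # self.drmaa is the drmaa module stored by DRMAAJobRunner.__init__(),
            # so no module-level "import drmaa" is needed in the subclass.
            if isinstance(exc, self.drmaa.InvalidJobException):
                log.warning("(%s) job id is no longer known to the DRM", job_id)
                return True
            return False

        def _is_terminal(self, state):
            # self.drmaa_job_states is drmaa.JobState, also set in __init__()
            return state in (self.drmaa_job_states.DONE,
                             self.drmaa_job_states.FAILED)
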
@@ -227,6 +230,9 @@ def _complete_terminal_job(self, ajs, drmaa_state, **kwargs):
be overridden by subclasses to improve post-mortem and reporting of
failures.
Returns True if job was not actually terminal, None otherwise.
(Note: this base implementation always returns None; it does not itself
determine whether the job was really terminal. Subclasses overriding this
method are expected to make that determination.)
"""
if drmaa_state == drmaa.JobState.FAILED:
if ajs.job_wrapper.get_state() != model.Job.states.DELETED:
@@ -241,6 +247,65 @@ def _complete_terminal_job(self, ajs, drmaa_state, **kwargs):
if ajs.job_wrapper.get_state() != model.Job.states.DELETED:
self.work_queue.put((self.finish_job, ajs))

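The base behaviour shown above is simple: a FAILED job is handed to fail_job and a DONE job to finish_job, unless the job was deleted. The docstring note stresses that the real terminal-state analysis is left to subclasses, which can return True when a job turns out not to be terminal after all. A purely hypothetical override illustrating that contract is sketched below; the _job_still_running helper does not exist in Galaxy, and the actual DRMAAUnivaJobRunner performs considerably more detailed post-mortem work.

    class ExampleTerminalCheckRunner(DRMAAJobRunner):
        """Illustrative subclass; not the real DRMAAUnivaJobRunner."""

        def _complete_terminal_job(self, ajs, drmaa_state, **kwargs):
            if drmaa_state == self.drmaa_job_states.FAILED:
                # Hypothetical DRM-specific check: the job only looked FAILED
                # for a moment and is in fact still running.
                if self._job_still_running(ajs.job_id):  # hypothetical helper
                    return True  # signal that the job was not actually terminal
            # Otherwise fall back to the default FAILED/DONE handling above.
            return super(ExampleTerminalCheckRunner, self)._complete_terminal_job(
                ajs, drmaa_state, **kwargs)
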
def check_watched_item(self, ajs, new_watched):
"""
look at a single watched job, determine its state, and deal with errors
that could happen in this process. to be called from check_watched_items()
returns the state or None if exceptions occured
in the latter case the job is appended to new_watched if a
1 drmaa.InternalException,
2 drmaa.InvalidJobExceptionnot, or
3 drmaa.DrmCommunicationException occured
(which causes the job to be tested again in the next iteration of check_watched_items)
- the job is finished as errored if any other exception occurs
- the job is finished OK or errored after the maximum number of retries
depending on the exception
Note that None is returned in all cases where the loop in check_watched_items
is to be continued
"""
external_job_id = ajs.job_id
galaxy_id_tag = ajs.job_wrapper.get_id_tag()
state = None
try:
assert external_job_id not in (None, 'None'), '(%s/%s) Invalid job id' % (galaxy_id_tag, external_job_id)
state = self.ds.job_status(external_job_id)
# Reset exception retries
for retry_exception in RETRY_EXCEPTIONS_LOWER:
setattr(ajs, retry_exception + '_retries', 0)
except (drmaa.InternalException, drmaa.InvalidJobException) as e:
ecn = type(e).__name__
retry_param = ecn.lower() + '_retries'
state_param = ecn.lower() + '_state'
retries = getattr(ajs, retry_param, 0)
log.warning("(%s/%s) unable to check job status because of %s exception for %d consecutive tries: %s", galaxy_id_tag, external_job_id, ecn, retries + 1, e)
if self.runner_params[retry_param] > 0:
if retries < self.runner_params[retry_param]:
# will retry check on next iteration
setattr(ajs, retry_param, retries + 1)
new_watched.append(ajs)
return None
if self.runner_params[state_param] == model.Job.states.OK:
log.warning("(%s/%s) job will now be finished OK", galaxy_id_tag, external_job_id)
self.work_queue.put((self.finish_job, ajs))
elif self.runner_params[state_param] == model.Job.states.ERROR:
log.warning("(%s/%s) job will now be errored", galaxy_id_tag, external_job_id)
self.work_queue.put((self.fail_job, ajs))
else:
raise Exception("%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[state_param])
return None
except drmaa.DrmCommunicationException as e:
log.warning("(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e)
new_watched.append(ajs)
return None
except Exception:
# so we don't kill the monitor thread
log.exception("(%s/%s) unable to check job status: %s" % (galaxy_id_tag, external_job_id))
log.warning("(%s/%s) job will now be errored" % (galaxy_id_tag, external_job_id))
ajs.fail_message = "Cluster could not complete job"
self.work_queue.put((self.fail_job, ajs))
return None
return state

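The retry handling above is driven by two runner parameters per retryable exception, derived from RETRY_EXCEPTIONS_LOWER: <exception>_retries (how many consecutive failed status checks to tolerate) and <exception>_state (whether to finish the job OK or errored once the retries are exhausted). The standalone sketch below replays that bookkeeping outside of Galaxy; the parameter values are example settings, not Galaxy's defaults.

    # Standalone illustration of the retry bookkeeping in check_watched_item();
    # the *_retries / *_state values are example settings, not Galaxy defaults.
    RETRY_EXCEPTIONS_LOWER = frozenset(['invalidjobexception', 'internalexception'])

    runner_params = {
        'internalexception_retries': 3,       # tolerate 3 consecutive failures
        'internalexception_state': 'ok',      # then finish the job as OK
        'invalidjobexception_retries': 0,     # no retries for invalid job ids
        'invalidjobexception_state': 'error'  # fail such jobs right away
    }


    class WatchedJob(object):
        """Stand-in for the AsynchronousJobState objects the runner tracks."""


    def handle_status_exception(ajs, exception_class_name):
        retry_param = exception_class_name.lower() + '_retries'
        state_param = exception_class_name.lower() + '_state'
        retries = getattr(ajs, retry_param, 0)
        if runner_params[retry_param] > 0 and retries < runner_params[retry_param]:
            setattr(ajs, retry_param, retries + 1)
            return 'retry'                    # keep watching the job
        return runner_params[state_param]     # finish the job 'ok' or 'error'


    if __name__ == '__main__':
        job = WatchedJob()
        # Three InternalExceptions in a row are retried, the fourth gives up:
        print([handle_status_exception(job, 'InternalException') for _ in range(4)])
        # -> ['retry', 'retry', 'retry', 'ok']
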
def check_watched_items(self):
"""
Called by the monitor thread to look at each watched job and deal
Expand All @@ -251,43 +316,8 @@ def check_watched_items(self):
external_job_id = ajs.job_id
galaxy_id_tag = ajs.job_wrapper.get_id_tag()
old_state = ajs.old_state
try:
assert external_job_id not in (None, 'None'), '(%s/%s) Invalid job id' % (galaxy_id_tag, external_job_id)
state = self.ds.job_status(external_job_id)
# Reset exception retries
for retry_exception in RETRY_EXCEPTIONS_LOWER:
setattr(ajs, retry_exception + '_retries', 0)
except (drmaa.InternalException, drmaa.InvalidJobException) as e:
ecn = type(e).__name__
retry_param = ecn.lower() + '_retries'
state_param = ecn.lower() + '_state'
retries = getattr(ajs, retry_param, 0)
log.warning("(%s/%s) unable to check job status because of %s exception for %d consecutive tries: %s", galaxy_id_tag, external_job_id, ecn, retries + 1, e)
if self.runner_params[retry_param] > 0:
if retries < self.runner_params[retry_param]:
# will retry check on next iteration
setattr(ajs, retry_param, retries + 1)
new_watched.append(ajs)
continue
if self.runner_params[state_param] == model.Job.states.OK:
log.warning("(%s/%s) job will now be finished OK", galaxy_id_tag, external_job_id)
self.work_queue.put((self.finish_job, ajs))
elif self.runner_params[state_param] == model.Job.states.ERROR:
log.warning("(%s/%s) job will now be errored", galaxy_id_tag, external_job_id)
self.work_queue.put((self.fail_job, ajs))
else:
raise Exception("%s is set to an invalid value (%s), this should not be possible. See galaxy.jobs.drmaa.__init__()", state_param, self.runner_params[state_param])
continue
except drmaa.DrmCommunicationException as e:
log.warning("(%s/%s) unable to communicate with DRM: %s", galaxy_id_tag, external_job_id, e)
new_watched.append(ajs)
continue
except Exception:
# so we don't kill the monitor thread
log.exception("(%s/%s) unable to check job status" % (galaxy_id_tag, external_job_id))
log.warning("(%s/%s) job will now be errored" % (galaxy_id_tag, external_job_id))
ajs.fail_message = "Cluster could not complete job"
self.work_queue.put((self.fail_job, ajs))
state = self.check_watched_item(ajs, new_watched)
if state is None:
continue
if state != old_state:
log.debug("(%s/%s) state change: %s" % (galaxy_id_tag, external_job_id, self.drmaa_job_state_strings[state]))
