From 647cf55a11a71e1fc7743b685ded0b6dd1b0fffd Mon Sep 17 00:00:00 2001 From: John Chilton Date: Fri, 10 Apr 2015 15:39:36 -0400 Subject: [PATCH] Tweaks to the Pulsar job runners handling of async messags. - Report a more appropriate message for responding to "failed" status. - Respond to the new "lost" message (https://github.com/galaxyproject/pulsar/commit/61ed774ae6c67e7f0d891999def7233d1572013c). - Do not fail "lost" or "failed" jobs that may Galaxy may think have already completed (roughly). --- lib/galaxy/jobs/runners/pulsar.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index f1da51ff963d..0463f26ec22c 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -32,6 +32,8 @@ NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory." NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml." GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server." +FAILED_REMOTE_ERROR = "Remote job server indicated a problem running or monitoring this job." +LOST_REMOTE_ERROR = "Remote job server could not determine this job's state." # Is there a good way to infer some default for this? Can only use # url_for from web threads. https://gist.github.com/jmchilton/9098762 @@ -170,8 +172,13 @@ def _update_job_state_for_status(self, job_state, pulsar_status): if pulsar_status == "complete": self.mark_as_finished(job_state) return None - if pulsar_status == "failed": - self.fail_job(job_state) + if pulsar_status in ["failed", "lost"]: + if pulsar_status == "failed": + message = FAILED_REMOTE_ERROR + else: + message = LOST_REMOTE_ERROR + if not job_state.job_wrapper.get_job().finished: + self.fail_job(job_state, message) return None if pulsar_status == "running" and not job_state.running: job_state.running = True @@ -398,12 +405,12 @@ def finish_job( self, job_state ): log.exception("Job wrapper finish method failed") job_wrapper.fail("Unable to finish job", exception=True) - def fail_job( self, job_state ): + def fail_job( self, job_state, message=GENERIC_REMOTE_ERROR ): """ Seperated out so we can use the worker threads for it. """ self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) ) - job_state.job_wrapper.fail( getattr( job_state, "fail_message", GENERIC_REMOTE_ERROR ) ) + job_state.job_wrapper.fail( getattr( job_state, "fail_message", message ) ) def check_pid( self, pid ): try: