Skip to content

Commit 647cf55

Browse files
committed
Tweaks to the Pulsar job runner's handling of async messages.
- Report a more appropriate message when responding to a "failed" status. - Respond to the new "lost" message (galaxyproject/pulsar@61ed774). - Do not fail "lost" or "failed" jobs that Galaxy may think have already completed (roughly).
1 parent aad36d0 commit 647cf55

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

lib/galaxy/jobs/runners/pulsar.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
NO_REMOTE_GALAXY_FOR_METADATA_MESSAGE = "Pulsar misconfiguration - Pulsar client configured to set metadata remotely, but remote Pulsar isn't properly configured with a galaxy_home directory."
3333
NO_REMOTE_DATATYPES_CONFIG = "Pulsar client is configured to use remote datatypes configuration when setting metadata externally, but Pulsar is not configured with this information. Defaulting to datatypes_conf.xml."
3434
GENERIC_REMOTE_ERROR = "Failed to communicate with remote job server."
35+
FAILED_REMOTE_ERROR = "Remote job server indicated a problem running or monitoring this job."
36+
LOST_REMOTE_ERROR = "Remote job server could not determine this job's state."
3537

3638
# Is there a good way to infer some default for this? Can only use
3739
# url_for from web threads. https://gist.github.com/jmchilton/9098762
@@ -170,8 +172,13 @@ def _update_job_state_for_status(self, job_state, pulsar_status):
170172
if pulsar_status == "complete":
171173
self.mark_as_finished(job_state)
172174
return None
173-
if pulsar_status == "failed":
174-
self.fail_job(job_state)
175+
if pulsar_status in ["failed", "lost"]:
176+
if pulsar_status == "failed":
177+
message = FAILED_REMOTE_ERROR
178+
else:
179+
message = LOST_REMOTE_ERROR
180+
if not job_state.job_wrapper.get_job().finished:
181+
self.fail_job(job_state, message)
175182
return None
176183
if pulsar_status == "running" and not job_state.running:
177184
job_state.running = True
@@ -398,12 +405,12 @@ def finish_job( self, job_state ):
398405
log.exception("Job wrapper finish method failed")
399406
job_wrapper.fail("Unable to finish job", exception=True)
400407

401-
def fail_job( self, job_state ):
408+
def fail_job( self, job_state, message=GENERIC_REMOTE_ERROR ):
402409
"""
403410
Separated out so we can use the worker threads for it.
404411
"""
405412
self.stop_job( self.sa_session.query( self.app.model.Job ).get( job_state.job_wrapper.job_id ) )
406-
job_state.job_wrapper.fail( getattr( job_state, "fail_message", GENERIC_REMOTE_ERROR ) )
413+
job_state.job_wrapper.fail( getattr( job_state, "fail_message", message ) )
407414

408415
def check_pid( self, pid ):
409416
try:

0 commit comments

Comments
 (0)