Skip to content

Commit

Permalink
[18.01] Fix missing job stdout/stderr on Pulsar transfer failures.
Browse files Browse the repository at this point in the history
Shouldn't break backward compatiblity at all without it - but to get the outputs Pulsar requires an update (galaxyproject/pulsar#159) as well.
  • Loading branch information
jmchilton committed Feb 14, 2018
1 parent adaacbc commit 7120739
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 5 deletions.
15 changes: 10 additions & 5 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def check_watched_item(self, job_state):
job_state = self._update_job_state_for_status(job_state, status)
return job_state

def _update_job_state_for_status(self, job_state, pulsar_status):
def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
if pulsar_status == "complete":
self.mark_as_finished(job_state)
return None
Expand All @@ -261,7 +261,7 @@ def _update_job_state_for_status(self, job_state, pulsar_status):
else:
message = LOST_REMOTE_ERROR
if not job_state.job_wrapper.get_job().finished:
self.fail_job(job_state, message)
self.fail_job(job_state, message, full_status=full_status)
return None
if pulsar_status == "running" and not job_state.running:
job_state.running = True
Expand Down Expand Up @@ -517,10 +517,15 @@ def finish_job(self, job_state):
log.exception("Job wrapper finish method failed")
job_wrapper.fail("Unable to finish job", exception=True)

def fail_job(self, job_state, message=GENERIC_REMOTE_ERROR):
def fail_job(self, job_state, message=GENERIC_REMOTE_ERROR, full_status=None):
"""Seperated out so we can use the worker threads for it."""
self.stop_job(self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id))
job_state.job_wrapper.fail(getattr(job_state, "fail_message", message))
stdout = ""
stderr = ""
if full_status:
stdout = full_status.get("stdout", "")
stderr = full_status.get("stderr", "")
job_state.job_wrapper.fail(getattr(job_state, "fail_message", message), stdout=stdout, stderr=stderr)

def check_pid(self, pid):
try:
Expand Down Expand Up @@ -744,7 +749,7 @@ def __async_update(self, full_status):
job_id = full_status["job_id"]
job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(job_id)
job_state = self._job_state(job, job_wrapper)
self._update_job_state_for_status(job_state, full_status["status"])
self._update_job_state_for_status(job_state, full_status["status"], full_status=full_status)
except Exception:
log.exception("Failed to update Pulsar job status for job_id %s", job_id)
raise
Expand Down
16 changes: 16 additions & 0 deletions test/functional/tools/fail_writing_work_dir_file.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<tool id="fail_writing_work_dir_file" name="fail_writing_work_dir_file" version="0.1.0">
<command><![CDATA[
echo 'Some output' ;
#if not $failbool
echo 'Hello World' > 'foo.txt'
#end if
]]></command>
<inputs>
<param name="failbool" type="boolean" label="The failure property" checked="false" />
</inputs>
<outputs>
<data name="out_file1" format="txt" from_work_dir="foo.txt" />
</outputs>
<help>
</help>
</tool>
1 change: 1 addition & 0 deletions test/functional/tools/samples_tool_conf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
<tool file="identifier_collection.xml" />
<tool file="identifier_in_actions.xml" />
<tool file="fail_identifier.xml" />
<tool file="fail_writing_work_dir_file.xml" />
<tool file="tool_directory.xml" />
<tool file="output_action_change_format.xml" />
<tool file="collection_paired_test.xml" />
Expand Down

0 comments on commit 7120739

Please sign in to comment.