From b43f1985d71fadf3f907feeab2b529408dc05588 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Sun, 4 Nov 2018 10:58:07 +0100 Subject: [PATCH] Capture stdout and stderr for kubernetes runner There is currently no way to discriminate stdout and stderr from kubernetes logs (https://github.com/kubernetes/kubernetes/issues/28167), but Galaxy really needs to know about the stderr. To work around this we capture the stdout and stderr into files using the approach outined in https://stackoverflow.com/a/692407. --- lib/galaxy/jobs/command_factory.py | 27 ++++++++++++++++++++++----- lib/galaxy/jobs/runners/__init__.py | 26 ++++++++++++++++++++------ lib/galaxy/jobs/runners/kubernetes.py | 9 ++++++--- 3 files changed, 48 insertions(+), 14 deletions(-) diff --git a/lib/galaxy/jobs/command_factory.py b/lib/galaxy/jobs/command_factory.py index bd2aa503bcd3..848214a7dd2a 100644 --- a/lib/galaxy/jobs/command_factory.py +++ b/lib/galaxy/jobs/command_factory.py @@ -30,6 +30,8 @@ def build_command( create_tool_working_directory=True, remote_command_params={}, metadata_directory=None, + stdout_file=None, + stderr_file=None, ): """ Compose the sequence of commands necessary to execute a job. This will @@ -103,6 +105,8 @@ def build_command( if include_work_dir_outputs: __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params) + if stdout_file and stderr_file: + commands_builder.capture_stdout_stderr(stdout_file, stderr_file) commands_builder.capture_return_code() if include_metadata and job_wrapper.requires_setting_metadata: @@ -227,24 +231,37 @@ def __init__(self, initial_command=u''): # the last thing that happens is an exit with return code. self.return_code_captured = False - def prepend_command(self, command): + def prepend_command(self, command, sep=";"): if command: - self.commands = u"%s; %s" % (command, + self.commands = u"%s%s %s" % (command, + sep, self.commands) return self def prepend_commands(self, commands): return self.prepend_command(u"; ".join(c for c in commands if c)) - def append_command(self, command): + def append_command(self, command, sep=';'): if command: - self.commands = u"%s; %s" % (self.commands, - command) + self.commands = u"%s%s %s" % (self.commands, + sep, + command) return self def append_commands(self, commands): self.append_command(u"; ".join(c for c in commands if c)) + def capture_stdout_stderr(self, stdout_file, stderr_file): + self.prepend_command("""out="${TMPDIR:-/tmp}/out.$$" err="${TMPDIR:-/tmp}/err.$$" +mkfifo "$out" "$err" +trap 'rm "$out" "$err"' EXIT +tee -a stdout.log < "$out" & +tee -a stderr.log < "$err" >&2 &""", + sep="") + self.append_command("> '{stdout_file}' 2> '{stderr_file}'".format(stdout_file=stdout_file, + stderr_file=stderr_file), + sep="") + def capture_return_code(self): if not self.return_code_captured: self.return_code_captured = True diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index 3ddec1e070eb..4eb6ddaaf936 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -193,8 +193,13 @@ def parse_destination_params(self, params): """ raise NotImplementedError() - def prepare_job(self, job_wrapper, include_metadata=False, include_work_dir_outputs=True, - modify_command_for_container=True): + def prepare_job(self, + job_wrapper, + include_metadata=False, + include_work_dir_outputs=True, + modify_command_for_container=True, + stdout_file=None, + stderr_file=None): """Some sanity checks that all runners' queue_job() methods are likely to want to do """ job_id = job_wrapper.get_id_tag() @@ -220,7 +225,9 @@ def prepare_job(self, job_wrapper, include_metadata=False, include_work_dir_outp job_wrapper, include_metadata=include_metadata, include_work_dir_outputs=include_work_dir_outputs, - modify_command_for_container=modify_command_for_container + modify_command_for_container=modify_command_for_container, + stdout_file=stdout_file, + stderr_file=stderr_file, ) except Exception as e: log.exception("(%s) Failure preparing job" % job_id) @@ -243,8 +250,13 @@ def stop_job(self, job_wrapper): def recover(self, job, job_wrapper): raise NotImplementedError() - def build_command_line(self, job_wrapper, include_metadata=False, include_work_dir_outputs=True, - modify_command_for_container=True): + def build_command_line(self, + job_wrapper, + include_metadata=False, + include_work_dir_outputs=True, + modify_command_for_container=True, + stdout_file=None, + stderr_file=None): container = self._find_container(job_wrapper) if not container and job_wrapper.requires_containerization: raise Exception("Failed to find a container when required, contact Galaxy admin.") @@ -254,7 +266,9 @@ def build_command_line(self, job_wrapper, include_metadata=False, include_work_d include_metadata=include_metadata, include_work_dir_outputs=include_work_dir_outputs, modify_command_for_container=modify_command_for_container, - container=container + container=container, + stdout_file=stdout_file, + stderr_file=stderr_file, ) def get_work_dir_outputs(self, job_wrapper, job_working_directory=None, tool_working_directory=None): diff --git a/lib/galaxy/jobs/runners/kubernetes.py b/lib/galaxy/jobs/runners/kubernetes.py index fb68f78e8241..efb61a9c91a2 100644 --- a/lib/galaxy/jobs/runners/kubernetes.py +++ b/lib/galaxy/jobs/runners/kubernetes.py @@ -89,13 +89,16 @@ def queue_job(self, job_wrapper): # We currently don't need to include_metadata or include_work_dir_outputs, as working directory is the same # where galaxy will expect results. log.debug("Starting queue_job for job " + job_wrapper.get_id_tag()) - if not self.prepare_job(job_wrapper, include_metadata=False, modify_command_for_container=False, yield_return_code=False): - return - ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper, job_destination=job_wrapper.job_destination) + if not self.prepare_job(job_wrapper, + include_metadata=False, + modify_command_for_container=False, + stdout_file=ajs.output_file, + stderr_file=ajs.error_file): + return script = self.get_job_file(job_wrapper, exit_code_path=ajs.exit_code_file, shell=job_wrapper.shell) try: