Skip to content

Commit

Permalink
Capture stdout and stderr for kubernetes runner
Browse files Browse the repository at this point in the history
There is currently no way to discriminate stdout and stderr
from kubernetes logs (kubernetes/kubernetes#28167),
but Galaxy really needs to know about the stderr.
To work around this, we capture the stdout and stderr into files using
the approach outlined in https://stackoverflow.com/a/692407.
  • Loading branch information
mvdbeek committed Feb 19, 2019
1 parent 5c5f199 commit b43f198
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 14 deletions.
27 changes: 22 additions & 5 deletions lib/galaxy/jobs/command_factory.py
Expand Up @@ -30,6 +30,8 @@ def build_command(
create_tool_working_directory=True,
remote_command_params={},
metadata_directory=None,
stdout_file=None,
stderr_file=None,
):
"""
Compose the sequence of commands necessary to execute a job. This will
Expand Down Expand Up @@ -103,6 +105,8 @@ def build_command(
if include_work_dir_outputs:
__handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params)

if stdout_file and stderr_file:
commands_builder.capture_stdout_stderr(stdout_file, stderr_file)
commands_builder.capture_return_code()

if include_metadata and job_wrapper.requires_setting_metadata:
Expand Down Expand Up @@ -227,24 +231,37 @@ def __init__(self, initial_command=u''):
# the last thing that happens is an exit with return code.
self.return_code_captured = False

def prepend_command(self, command, sep=";"):
    """Put ``command`` in front of the commands accumulated so far.

    :param command: shell fragment to prepend; falsy values are ignored.
    :param sep: text inserted directly after ``command``. A single space
        always follows it. Pass ``""`` when ``command`` already ends in a
        control operator such as ``&``.
    :returns: ``self`` so calls can be chained.
    """
    if command:
        self.commands = u"%s%s %s" % (command, sep, self.commands)
    return self

def prepend_commands(self, commands):
    """Prepend several commands at once.

    Falsy entries of ``commands`` are dropped; the remaining ones are
    joined with ``"; "`` and handed to ``prepend_command`` as a single
    fragment, whose return value is passed back to the caller.
    """
    kept = [fragment for fragment in commands if fragment]
    joined = u"; ".join(kept)
    return self.prepend_command(joined)

def append_command(self, command, sep=';'):
    """Add ``command`` after the commands accumulated so far.

    :param command: shell fragment to append; falsy values are ignored.
    :param sep: text inserted directly after the existing commands. A
        single space always follows it. Pass ``""`` to attach something
        (e.g. a redirection) to the command that is currently last.
    :returns: ``self`` so calls can be chained.
    """
    if command:
        self.commands = u"%s%s %s" % (self.commands, sep, command)
    return self

def append_commands(self, commands):
    """Append several commands at once.

    Falsy entries of ``commands`` are dropped; the remaining ones are
    joined with ``"; "`` and appended as a single fragment.

    :returns: ``self``, matching ``prepend_commands`` so that both
        helpers can be used in a call chain.
    """
    self.append_command(u"; ".join(c for c in commands if c))
    return self

def capture_stdout_stderr(self, stdout_file, stderr_file):
    """Wrap the commands so stdout and stderr are also saved to files.

    A setup block is prepended that creates two named pipes and starts
    one background ``tee`` per pipe. Each ``tee`` writes what it reads to
    the requested file and passes it on to the shell's own stdout /
    stderr. A redirection into the two pipes is then appended to the
    commands. An ``EXIT`` trap removes the pipes again.

    :param stdout_file: path that receives a copy of standard output.
    :param stderr_file: path that receives a copy of standard error.
    """
    # NOTE(review): the paths are only wrapped in single quotes, so a path
    # that itself contains a single quote would break the generated script.
    setup_lines = [
        # Double-underscore names keep the helper variables away from
        # names a job script is likely to use.
        '__out="${TMPDIR:-/tmp}/out.$$" __err="${TMPDIR:-/tmp}/err.$$"',
        'mkfifo "$__out" "$__err"',
        """trap 'rm "$__out" "$__err"' EXIT""",
        # ``tee`` without ``-a`` truncates the target, as a plain ``>``
        # redirection to the file would.
        "tee '{stdout_file}' < \"$__out\" &".format(stdout_file=stdout_file),
        "tee '{stderr_file}' < \"$__err\" >&2 &".format(stderr_file=stderr_file),
    ]
    # The setup block ends in ``&``; an additional ``;`` right after it
    # would be a shell syntax error, hence the empty separator.
    self.prepend_command("\n".join(setup_lines), sep="")
    # Empty separator: the redirection has to attach to the command that
    # currently ends ``self.commands`` instead of starting a new one.
    self.append_command('> "$__out" 2> "$__err"', sep="")

def capture_return_code(self):
if not self.return_code_captured:
self.return_code_captured = True
Expand Down
26 changes: 20 additions & 6 deletions lib/galaxy/jobs/runners/__init__.py
Expand Up @@ -193,8 +193,13 @@ def parse_destination_params(self, params):
"""
raise NotImplementedError()

def prepare_job(self, job_wrapper, include_metadata=False, include_work_dir_outputs=True,
modify_command_for_container=True):
def prepare_job(self,
job_wrapper,
include_metadata=False,
include_work_dir_outputs=True,
modify_command_for_container=True,
stdout_file=None,
stderr_file=None):
"""Some sanity checks that all runners' queue_job() methods are likely to want to do
"""
job_id = job_wrapper.get_id_tag()
Expand All @@ -220,7 +225,9 @@ def prepare_job(self, job_wrapper, include_metadata=False, include_work_dir_outp
job_wrapper,
include_metadata=include_metadata,
include_work_dir_outputs=include_work_dir_outputs,
modify_command_for_container=modify_command_for_container
modify_command_for_container=modify_command_for_container,
stdout_file=stdout_file,
stderr_file=stderr_file,
)
except Exception as e:
log.exception("(%s) Failure preparing job" % job_id)
Expand All @@ -243,8 +250,13 @@ def stop_job(self, job_wrapper):
def recover(self, job, job_wrapper):
raise NotImplementedError()

def build_command_line(self, job_wrapper, include_metadata=False, include_work_dir_outputs=True,
modify_command_for_container=True):
def build_command_line(self,
job_wrapper,
include_metadata=False,
include_work_dir_outputs=True,
modify_command_for_container=True,
stdout_file=None,
stderr_file=None):
container = self._find_container(job_wrapper)
if not container and job_wrapper.requires_containerization:
raise Exception("Failed to find a container when required, contact Galaxy admin.")
Expand All @@ -254,7 +266,9 @@ def build_command_line(self, job_wrapper, include_metadata=False, include_work_d
include_metadata=include_metadata,
include_work_dir_outputs=include_work_dir_outputs,
modify_command_for_container=modify_command_for_container,
container=container
container=container,
stdout_file=stdout_file,
stderr_file=stderr_file,
)

def get_work_dir_outputs(self, job_wrapper, job_working_directory=None, tool_working_directory=None):
Expand Down
9 changes: 6 additions & 3 deletions lib/galaxy/jobs/runners/kubernetes.py
Expand Up @@ -89,13 +89,16 @@ def queue_job(self, job_wrapper):
# We currently don't need to include_metadata or include_work_dir_outputs, as working directory is the same
# where galaxy will expect results.
log.debug("Starting queue_job for job " + job_wrapper.get_id_tag())
if not self.prepare_job(job_wrapper, include_metadata=False, modify_command_for_container=False, yield_return_code=False):
return

ajs = AsynchronousJobState(files_dir=job_wrapper.working_directory,
job_wrapper=job_wrapper,
job_destination=job_wrapper.job_destination)

if not self.prepare_job(job_wrapper,
include_metadata=False,
modify_command_for_container=False,
stdout_file=ajs.output_file,
stderr_file=ajs.error_file):
return

script = self.get_job_file(job_wrapper, exit_code_path=ajs.exit_code_file, shell=job_wrapper.shell)
try:
Expand Down

0 comments on commit b43f198

Please sign in to comment.