Skip to content

Commit

Permalink
Tool-centric annotation of out of memory errors.
Browse files Browse the repository at this point in the history
- Update explicit stdio definitions as well as detect_errors-based definitions to allow this.
- Update runner code to allow resubmission in such cases.
  • Loading branch information
jmchilton committed Dec 22, 2017
1 parent 87b2540 commit f017de3
Show file tree
Hide file tree
Showing 16 changed files with 222 additions and 36 deletions.
14 changes: 9 additions & 5 deletions lib/galaxy/jobs/__init__.py
Expand Up @@ -34,7 +34,7 @@
from galaxy.util.xml_macros import load
from .datasets import (DatasetPath, NullDatasetPathRewriter,
OutputsToWorkingDirectoryPathRewriter, TaskPathRewriter)
from .output_checker import check_output
from .output_checker import check_output, DETECTED_JOB_STATE

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -1122,6 +1122,7 @@ def finish(
stdout,
stderr,
tool_exit_code=None,
check_output_detected_state=None,
remote_working_directory=None,
remote_metadata_directory=None,
):
Expand Down Expand Up @@ -1162,8 +1163,11 @@ def finish(
# The job's stdout and stderr will be set accordingly.

# We set final_job_state to use for dataset management, but *don't* set
# job.state until after dataset collection to prevent history issues
if self.check_tool_output(stdout, stderr, tool_exit_code, job) and not tool_provided_metadata.has_failed_outputs():
# job.state until after dataset discovery to prevent history issues
if check_output_detected_state is None:
check_output_detected_state = self.check_tool_output(stdout, stderr, tool_exit_code, job)

if check_output_detected_state == DETECTED_JOB_STATE.OK and not tool_provided_metadata.has_failed_outputs():
final_job_state = job.states.OK
else:
final_job_state = job.states.ERROR
Expand Down Expand Up @@ -1893,7 +1897,7 @@ def set_runner(self, runner_url, external_id):
self.sa_session.add(task)
self.sa_session.flush()

def finish(self, stdout, stderr, tool_exit_code=None):
def finish(self, stdout, stderr, tool_exit_code=None, **kwds):
# DBTODO integrate previous finish logic.
# Simple finish for tasks. Just set the flag OK.
"""
Expand Down Expand Up @@ -1922,7 +1926,7 @@ def finish(self, stdout, stderr, tool_exit_code=None):
# Check what the tool returned. If the stdout or stderr matched
# regular expressions that indicate errors, then set an error.
# The same goes if the tool's exit code was in a given range.
if (self.check_tool_output(stdout, stderr, tool_exit_code, task)):
if self.check_tool_output(stdout, stderr, tool_exit_code, task) == DETECTED_JOB_STATE.OK:
task.state = task.states.OK
else:
task.state = task.states.ERROR
Expand Down
4 changes: 3 additions & 1 deletion lib/galaxy/jobs/error_level.py
Expand Up @@ -8,12 +8,14 @@ class StdioErrorLevel(object):
LOG = 1
WARNING = 2
FATAL = 3
MAX = 3
FATAL_OOM = 4
MAX = 4
descs = {
NO_ERROR: 'No error',
LOG: 'Log',
WARNING: 'Warning',
FATAL: 'Fatal error',
FATAL_OOM: 'Out of memory error',
}

@staticmethod
Expand Down
35 changes: 21 additions & 14 deletions lib/galaxy/jobs/output_checker.py
Expand Up @@ -2,28 +2,33 @@
import traceback
from logging import getLogger

from galaxy.util.bunch import Bunch
from .error_level import StdioErrorLevel

log = getLogger(__name__)

# Outcomes that check_output() reports for a finished tool execution.
DETECTED_JOB_STATE = Bunch(
# No fatal condition was detected (also used when the check itself raised).
OK='ok',
# The highest error level matched was StdioErrorLevel.FATAL_OOM.
OUT_OF_MEMORY_ERROR='oom_error',
# Any other fatal condition, including tools without stdio checks that
# wrote something to standard error.
GENERIC_ERROR='generic_error',
)


def check_output(tool, stdout, stderr, tool_exit_code, job):
"""
Check the output of a tool - given the stdout, stderr, and the tool's
exit code, return True if the tool exited succesfully and False
otherwise. No exceptions should be thrown. If this code encounters
an exception, it returns True so that the workflow can continue;
exit code, return DETECTED_JOB_STATE.OK if the tool exited successfully or
error type otherwise. No exceptions should be thrown. If this code encounters
an exception, it returns OK so that the workflow can continue;
otherwise, a bug in this code could halt workflow progress.
Note that, if the tool did not define any exit code handling or
any stdio/stderr handling, then it reverts back to previous behavior:
if stderr contains anything, then DETECTED_JOB_STATE.GENERIC_ERROR is returned.
Note that the job id is just for messages.
"""
# By default, the tool succeeded. This covers the case where the code
# has a bug but the tool was ok, and it lets a workflow continue.
success = True
state = DETECTED_JOB_STATE.OK

try:
# Check exit codes and match regular expressions against stdout and
Expand Down Expand Up @@ -57,7 +62,7 @@ def check_output(tool, stdout, stderr, tool_exit_code, job):
stderr = tool_msg + "\n" + stderr
max_error_level = max(max_error_level,
stdio_exit_code.error_level)
if max_error_level >= StdioErrorLevel.FATAL:
if max_error_level >= StdioErrorLevel.MAX:
break

if max_error_level < StdioErrorLevel.FATAL:
Expand Down Expand Up @@ -105,11 +110,13 @@ def check_output(tool, stdout, stderr, tool_exit_code, job):

# If we encountered a fatal error, then we'll need to set the
# job state accordingly. Otherwise the job is ok:
if max_error_level >= StdioErrorLevel.FATAL:
if max_error_level == StdioErrorLevel.FATAL_OOM:
state = DETECTED_JOB_STATE.OUT_OF_MEMORY_ERROR
elif max_error_level >= StdioErrorLevel.FATAL:
log.debug("Tool exit code indicates an error, failing job.")
success = False
state = DETECTED_JOB_STATE.GENERIC_ERROR
else:
success = True
state = DETECTED_JOB_STATE.OK

# When there are no regular expressions and no exit codes to check,
# default to the previous behavior: when there's anything on stderr
Expand All @@ -121,22 +128,22 @@ def check_output(tool, stdout, stderr, tool_exit_code, job):
if stderr:
peak = stderr[0:250]
log.debug("Tool produced standard error failing job - [%s]" % peak)
success = False
state = DETECTED_JOB_STATE.GENERIC_ERROR
else:
success = True
state = DETECTED_JOB_STATE.OK

# On any exception, return DETECTED_JOB_STATE.OK.
except Exception:
tb = traceback.format_exc()
log.warning("Tool check encountered unexpected exception; " +
"assuming tool was successful: " + tb)
success = True
state = DETECTED_JOB_STATE.OK

# Store the modified stdout and stderr in the job:
if job is not None:
job.set_streams(stdout, stderr)

return success
return state


def __regex_err_msg(match, regex):
Expand Down
23 changes: 21 additions & 2 deletions lib/galaxy/jobs/runners/__init__.py
Expand Up @@ -17,6 +17,7 @@
import galaxy.jobs
from galaxy import model
from galaxy.jobs.command_factory import build_command
from galaxy.jobs.output_checker import DETECTED_JOB_STATE
from galaxy.jobs.runners.util.env import env_to_statement
from galaxy.jobs.runners.util.job_script import (
job_script,
Expand Down Expand Up @@ -406,6 +407,23 @@ def mark_as_resubmitted(self, job_state, info=None):
job_state.job_wrapper.change_state(model.Job.states.QUEUED)
self.app.job_manager.job_handler.dispatcher.put(job_state.job_wrapper)

def _finish_or_resubmit_job(self, job_state, stdout, stderr, exit_code):
    """Check the tool output, then either let a failure handler take the job or finish it.

    The job wrapper inspects ``stdout``, ``stderr`` and ``exit_code``. When the
    detected state is not OK, a matching runner state is recorded on
    ``job_state`` and the 'failure' runner state handlers are invoked. If a
    handler marks the state as handled, nothing more is done here; otherwise
    the wrapper's ``finish`` is called with the already-detected state.
    """
    job = job_state.job_wrapper.get_job()
    detected = job_state.job_wrapper.check_tool_output(stdout, stderr, exit_code, job)
    # Flush with streams...
    self.sa_session.add(job)
    self.sa_session.flush()

    if detected != DETECTED_JOB_STATE.OK:
        runner_states = JobState.runner_states
        # Out of memory gets its own runner state; every other detected
        # problem is reported as a tool-detected error.
        job_state.runner_state = (
            runner_states.MEMORY_LIMIT_REACHED
            if detected == DETECTED_JOB_STATE.OUT_OF_MEMORY_ERROR
            else runner_states.TOOL_DETECT_ERROR
        )
        self._handle_runner_state('failure', job_state)
        # Was resubmitted or something - I think we are done with it.
        if job_state.runner_state_handled:
            return

    job_state.job_wrapper.finish(
        stdout,
        stderr,
        exit_code,
        check_output_detected_state=detected,
    )


class JobState(object):
"""
Expand All @@ -417,7 +435,8 @@ class JobState(object):
JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER='Job output not returned from cluster',
UNKNOWN_ERROR='unknown_error',
GLOBAL_WALLTIME_REACHED='global_walltime_reached',
OUTPUT_SIZE_LIMIT='output_size_limit'
OUTPUT_SIZE_LIMIT='output_size_limit',
TOOL_DETECT_ERROR='tool_detected', # job runner interaction worked fine but the tool indicated error
)

def __init__(self, job_wrapper, job_destination):
Expand Down Expand Up @@ -656,7 +675,7 @@ def finish_job(self, job_state):
job_state.cleanup()

try:
job_state.job_wrapper.finish(stdout, stderr, exit_code)
self._finish_or_resubmit_job(job_state, stdout, stderr, exit_code)
except Exception:
log.exception("(%s/%s) Job wrapper finish method failed" % (galaxy_id_tag, external_job_id))
job_state.job_wrapper.fail("Unable to finish job", exception=True)
Expand Down
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/runners/local.py
Expand Up @@ -143,9 +143,13 @@ def queue_job(self, job_wrapper):
return

self._handle_metadata_if_needed(job_wrapper)

job_destination = job_wrapper.job_destination
job_state = JobState(job_wrapper, job_destination)
job_state.stop_job = False
# Finish the job!
try:
job_wrapper.finish(stdout, stderr, exit_code)
self._finish_or_resubmit_job(job_state, stdout, stderr, exit_code)
except Exception:
log.exception("Job wrapper finish method failed")
self._fail_job_local(job_wrapper, "Unable to finish job")
Expand Down
33 changes: 31 additions & 2 deletions lib/galaxy/tools/parser/util.py
Expand Up @@ -2,28 +2,52 @@
from .interface import ToolStdioRegex


def error_on_exit_code():
def error_on_exit_code(out_of_memory_exit_code=None):
exit_codes = []

if out_of_memory_exit_code:
exit_code_oom = ToolStdioExitCode()
exit_code_oom.range_start = int(out_of_memory_exit_code)
exit_code_oom.range_end = int(out_of_memory_exit_code)
_set_oom(exit_code_oom)
exit_codes.append(exit_code_oom)

exit_code_lower = ToolStdioExitCode()
exit_code_lower.range_start = float("-inf")
exit_code_lower.range_end = -1
_set_fatal(exit_code_lower)
exit_codes.append(exit_code_lower)
exit_code_high = ToolStdioExitCode()
exit_code_high.range_start = 1
exit_code_high.range_end = float("inf")
_set_fatal(exit_code_high)
return [exit_code_lower, exit_code_high], []
exit_codes.append(exit_code_high)
return exit_codes, []


def aggressive_error_checks():
    """Return exit-code checks plus regexes for out-of-memory and generic error messages.

    The result is an ``(exit_codes, regexes)`` pair: the exit-code checks come
    from ``error_on_exit_code()``, and the regexes list the out-of-memory
    patterns first, followed by the generic error patterns.
    """
    exit_codes, _ignored_regexes = error_on_exit_code()
    # these regexes are processed as case insensitive by default
    oom_patterns = (
        "MemoryError",
        "std::bad_alloc",
        "java.lang.OutOfMemoryError",
        "Out of memory",
    )
    error_patterns = ("exception:", "error:")
    regexes = [_oom_regex(pattern) for pattern in oom_patterns]
    regexes.extend(_error_regex(pattern) for pattern in error_patterns)
    return exit_codes, regexes


def _oom_regex(match):
    """Build a ToolStdioRegex for ``match`` at the out-of-memory error level.

    The regex is applied to both standard output and standard error.
    """
    oom_regex = ToolStdioRegex()
    _set_oom(oom_regex)
    oom_regex.match = match
    oom_regex.stdout_match = oom_regex.stderr_match = True
    return oom_regex


def _error_regex(match):
regex = ToolStdioRegex()
_set_fatal(regex)
Expand All @@ -33,6 +57,11 @@ def _error_regex(match):
return regex


def _set_oom(obj):
    """Set ``obj.error_level`` to the out-of-memory fatal level."""
    # Imported at call time, as in the original; presumably this avoids a
    # circular import - TODO confirm.
    from galaxy.jobs.error_level import StdioErrorLevel as _levels
    obj.error_level = _levels.FATAL_OOM


def _set_fatal(obj):
    """Set ``obj.error_level`` to the generic fatal level."""
    # Imported at call time, as in the original; presumably this avoids a
    # circular import - TODO confirm.
    from galaxy.jobs.error_level import StdioErrorLevel as _levels
    obj.error_level = _levels.FATAL
8 changes: 7 additions & 1 deletion lib/galaxy/tools/parser/xml.py
Expand Up @@ -327,9 +327,15 @@ def parse_stdio(self):
detect_errors = None
if command_el is not None:
detect_errors = command_el.get("detect_errors")

if detect_errors and detect_errors != "default":
if detect_errors == "exit_code":
return error_on_exit_code()
oom_exit_code = None
if command_el is not None:
oom_exit_code = command_el.get("oom_exit_code", None)
if oom_exit_code is not None:
int(oom_exit_code)
return error_on_exit_code(out_of_memory_exit_code=oom_exit_code)
elif detect_errors == "aggressive":
return aggressive_error_checks()
else:
Expand Down
20 changes: 17 additions & 3 deletions lib/galaxy/tools/xsd/galaxy.xsd
Expand Up @@ -2669,8 +2669,9 @@ If present on the ``command`` tag, this attribute can be one of:
* ``default`` no-op fallback to ``stdio`` tags and erroring on standard error output (for legacy tools).
* ``exit_code`` error if tool exit code is not 0. (The @jmchilton recommendation).
* ``aggressive`` error if tool exit code is not 0 or either ``Exception:`` or ``Error:``
appears in standard error/output. (The @bgruening recommendation).
* ``aggressive`` error if tool exit code is not 0 or ``Exception:``, ``Error:``, or
various messages related to being out of memory appear in the standard error or output.
(The @bgruening recommendation).
For newer tools with ``profile>=16.04``, the default behavior is ``exit_code``.
Legacy tools default to ``default`` behavior described above (erroring if the tool
Expand Down Expand Up @@ -2700,6 +2701,11 @@ deprecated and using the ``$__tool_directory__`` variable is superior.
<xs:documentation></xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="oom_exit_code" type="xs:integer">
<xs:annotation>
<xs:documentation>Only used if ``detect_errors="exit_code"``; tells Galaxy that the specified exit code indicates an out of memory error. Galaxy instances may be configured to retry such jobs on resources with more memory.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="interpreter" type="xs:string" gxdocs:deprecated="true">
<xs:annotation>
<xs:documentation xml:lang="en">This attribute defines the programming language in which the tool's executable file is written. Any language can be used (tools can be written in Python, C, Perl, Java, etc.). The executable file must be in the same directory of the XML file. If instead this attribute is not specified, the tag content should be a Bash command calling executable(s) available in the $PATH. </xs:documentation>
Expand Down Expand Up @@ -4838,9 +4844,11 @@ The following is an example of the <exit_code> tag:
```xml
<stdio>
<exit_code range="2" level="fatal" description="Out of Memory" />
<exit_code range="3:5" level="warning" description="Low disk space" />
<exit_code range="6:" level="fatal" description="Bad input dataset" />
<!-- Catching fatal_oom allows the job runner to potentially resubmit to a resource with more
memory if Galaxy is configured to do this. -->
<exit_code range="2" level="fatal_oom" description="Out of Memory" />
</stdio>
```
Expand Down Expand Up @@ -4912,6 +4920,12 @@ The following is an example of regular expressions that may be used:
source="stdout"
level="fatal"
description="Unknown error encountered" />
<!-- Catching fatal_oom allows the job runner to potentially resubmit to a resource with more
memory if Galaxy is configured to do this. -->
<regex match="out of memory"
source="stdout"
level="fatal_oom"
description="Out of memory error occurred" />
<regex match="[CG]{12}"
description="Fatal error - CG island 12 nts long found" />
<regex match="^Branch A"
Expand Down
33 changes: 33 additions & 0 deletions test/functional/tools/exit_code_oom.xml
@@ -0,0 +1,33 @@
<tool id="exit_code_oom" name="exit_code_oom">
<!-- tool errors out with identified OOM error if less than 10MB are allocated. -->
<command detect_errors="exit_code" oom_exit_code="42"><![CDATA[
echo 'Hello' > '$out_file1';
echo "\$GALAXY_MEMORY_MB";
: \${GALAXY_MEMORY_MB:=20};
echo "\$GALAXY_MEMORY_MB";
if [ "\$GALAXY_MEMORY_MB" -lt 10 ];
then
exit 42;
else
exit 0;
fi
]]></command>
<inputs>
<param name="input" type="integer" label="Dummy" value="6" />
</inputs>
<outputs>
<data name="out_file1" />
</outputs>
<help>
</help>
<tests>
<test>
<param name="input" value="5" />
<output name="out_file1">
<assert_contents>
<has_line line="Hello" />
</assert_contents>
</output>
</test>
</tests>
</tool>
1 change: 1 addition & 0 deletions test/functional/tools/samples_tool_conf.xml
Expand Up @@ -54,6 +54,7 @@
<tool file="version_command_plain.xml" />
<tool file="version_command_interpreter.xml" />
<tool file="version_command_tool_dir.xml" />
<tool file="exit_code_oom.xml" />
<tool file="exit_code_from_file.xml" />
<tool file="gzipped_inputs.xml" />
<tool file="output_order.xml" />
Expand Down

0 comments on commit f017de3

Please sign in to comment.