From f017de30bf7b0188f8872a9842c38750aab9a410 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Fri, 22 Dec 2017 09:08:18 -0500 Subject: [PATCH] Tool-centric annotation of out of memory errors. - Update explicit stdio definitions as well as detect_errors-based definitions to allow this. - Update runner code to allow resubmission in such cases. --- lib/galaxy/jobs/__init__.py | 14 +++++--- lib/galaxy/jobs/error_level.py | 4 ++- lib/galaxy/jobs/output_checker.py | 35 +++++++++++-------- lib/galaxy/jobs/runners/__init__.py | 23 ++++++++++-- lib/galaxy/jobs/runners/local.py | 6 +++- lib/galaxy/tools/parser/util.py | 33 +++++++++++++++-- lib/galaxy/tools/parser/xml.py | 8 ++++- lib/galaxy/tools/xsd/galaxy.xsd | 20 +++++++++-- test/functional/tools/exit_code_oom.xml | 33 +++++++++++++++++ test/functional/tools/samples_tool_conf.xml | 1 + .../resubmission_small_memory_job_conf.xml | 17 +++++++++ ..._memory_resubmission_to_large_job_conf.xml | 22 ++++++++++++ test/integration/test_job_resubmission.py | 28 +++++++++++++-- test/unit/jobs/test_job_output_checker.py | 6 ++-- test/unit/jobs/test_runner_local.py | 5 ++- test/unit/tools/test_parsing.py | 3 +- 16 files changed, 222 insertions(+), 36 deletions(-) create mode 100644 test/functional/tools/exit_code_oom.xml create mode 100644 test/integration/resubmission_small_memory_job_conf.xml create mode 100644 test/integration/resubmission_small_memory_resubmission_to_large_job_conf.xml diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 82107f160940..e30c7f3da4a5 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -34,7 +34,7 @@ from galaxy.util.xml_macros import load from .datasets import (DatasetPath, NullDatasetPathRewriter, OutputsToWorkingDirectoryPathRewriter, TaskPathRewriter) -from .output_checker import check_output +from .output_checker import check_output, DETECTED_JOB_STATE log = logging.getLogger(__name__) @@ -1122,6 +1122,7 @@ def finish( stdout, stderr, 
tool_exit_code=None, + check_output_detected_state=None, remote_working_directory=None, remote_metadata_directory=None, ): @@ -1162,8 +1163,11 @@ def finish( # The job's stdout and stderr will be set accordingly. # We set final_job_state to use for dataset management, but *don't* set - # job.state until after dataset collection to prevent history issues - if self.check_tool_output(stdout, stderr, tool_exit_code, job) and not tool_provided_metadata.has_failed_outputs(): + # job.state until after dataset discovery to prevent history issues + if check_output_detected_state is None: + check_output_detected_state = self.check_tool_output(stdout, stderr, tool_exit_code, job) + + if check_output_detected_state == DETECTED_JOB_STATE.OK and not tool_provided_metadata.has_failed_outputs(): final_job_state = job.states.OK else: final_job_state = job.states.ERROR @@ -1893,7 +1897,7 @@ def set_runner(self, runner_url, external_id): self.sa_session.add(task) self.sa_session.flush() - def finish(self, stdout, stderr, tool_exit_code=None): + def finish(self, stdout, stderr, tool_exit_code=None, **kwds): # DBTODO integrate previous finish logic. # Simple finish for tasks. Just set the flag OK. """ @@ -1922,7 +1926,7 @@ def finish(self, stdout, stderr, tool_exit_code=None): # Check what the tool returned. If the stdout or stderr matched # regular expressions that indicate errors, then set an error. # The same goes if the tool's exit code was in a given range. 
- if (self.check_tool_output(stdout, stderr, tool_exit_code, task)): + if self.check_tool_output(stdout, stderr, tool_exit_code, task) == DETECTED_JOB_STATE.OK: task.state = task.states.OK else: task.state = task.states.ERROR diff --git a/lib/galaxy/jobs/error_level.py b/lib/galaxy/jobs/error_level.py index e8bce87f06bf..d3836cc7ba87 100644 --- a/lib/galaxy/jobs/error_level.py +++ b/lib/galaxy/jobs/error_level.py @@ -8,12 +8,14 @@ class StdioErrorLevel(object): LOG = 1 WARNING = 2 FATAL = 3 - MAX = 3 + FATAL_OOM = 4 + MAX = 4 descs = { NO_ERROR: 'No error', LOG: 'Log', WARNING: 'Warning', FATAL: 'Fatal error', + FATAL_OOM: 'Out of memory error', } @staticmethod diff --git a/lib/galaxy/jobs/output_checker.py b/lib/galaxy/jobs/output_checker.py index 7cd5549c58a3..3c0aada0cdd6 100644 --- a/lib/galaxy/jobs/output_checker.py +++ b/lib/galaxy/jobs/output_checker.py @@ -2,28 +2,33 @@ import traceback from logging import getLogger +from galaxy.util.bunch import Bunch from .error_level import StdioErrorLevel log = getLogger(__name__) +DETECTED_JOB_STATE = Bunch( + OK='ok', + OUT_OF_MEMORY_ERROR='oom_error', + GENERIC_ERROR='generic_error', +) + def check_output(tool, stdout, stderr, tool_exit_code, job): """ Check the output of a tool - given the stdout, stderr, and the tool's - exit code, return True if the tool exited succesfully and False - otherwise. No exceptions should be thrown. If this code encounters - an exception, it returns True so that the workflow can continue; + exit code, return DETECTED_JOB_STATE.OK if the tool exited successfully or + error type otherwise. No exceptions should be thrown. If this code encounters + an exception, it returns OK so that the workflow can continue; otherwise, a bug in this code could halt workflow progress. Note that, if the tool did not define any exit code handling or any stdio/stderr handling, then it reverts back to previous behavior: if stderr contains anything, then False is returned.
- - Note that the job id is just for messages. """ # By default, the tool succeeded. This covers the case where the code # has a bug but the tool was ok, and it lets a workflow continue. - success = True + state = DETECTED_JOB_STATE.OK try: # Check exit codes and match regular expressions against stdout and @@ -57,7 +62,7 @@ def check_output(tool, stdout, stderr, tool_exit_code, job): stderr = tool_msg + "\n" + stderr max_error_level = max(max_error_level, stdio_exit_code.error_level) - if max_error_level >= StdioErrorLevel.FATAL: + if max_error_level >= StdioErrorLevel.MAX: break if max_error_level < StdioErrorLevel.FATAL: @@ -105,11 +110,13 @@ def check_output(tool, stdout, stderr, tool_exit_code, job): # If we encountered a fatal error, then we'll need to set the # job state accordingly. Otherwise the job is ok: - if max_error_level >= StdioErrorLevel.FATAL: + if max_error_level == StdioErrorLevel.FATAL_OOM: + state = DETECTED_JOB_STATE.OUT_OF_MEMORY_ERROR + elif max_error_level >= StdioErrorLevel.FATAL: log.debug("Tool exit code indicates an error, failing job.") - success = False + state = DETECTED_JOB_STATE.GENERIC_ERROR else: - success = True + state = DETECTED_JOB_STATE.OK # When there are no regular expressions and no exit codes to check, # default to the previous behavior: when there's anything on stderr @@ -121,22 +128,22 @@ def check_output(tool, stdout, stderr, tool_exit_code, job): if stderr: peak = stderr[0:250] log.debug("Tool produced standard error failing job - [%s]" % peak) - success = False + state = DETECTED_JOB_STATE.GENERIC_ERROR else: - success = True + state = DETECTED_JOB_STATE.OK # On any exception, return True. 
except Exception: tb = traceback.format_exc() log.warning("Tool check encountered unexpected exception; " + "assuming tool was successful: " + tb) - success = True + state = DETECTED_JOB_STATE.OK # Store the modified stdout and stderr in the job: if job is not None: job.set_streams(stdout, stderr) - return success + return state def __regex_err_msg(match, regex): diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index b770c9b9f1be..cf568c4d323e 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -17,6 +17,7 @@ import galaxy.jobs from galaxy import model from galaxy.jobs.command_factory import build_command +from galaxy.jobs.output_checker import DETECTED_JOB_STATE from galaxy.jobs.runners.util.env import env_to_statement from galaxy.jobs.runners.util.job_script import ( job_script, @@ -406,6 +407,23 @@ def mark_as_resubmitted(self, job_state, info=None): job_state.job_wrapper.change_state(model.Job.states.QUEUED) self.app.job_manager.job_handler.dispatcher.put(job_state.job_wrapper) + def _finish_or_resubmit_job(self, job_state, stdout, stderr, exit_code): + job = job_state.job_wrapper.get_job() + check_output_detected_state = job_state.job_wrapper.check_tool_output(stdout, stderr, exit_code, job) + # Flush with streams... + self.sa_session.add(job) + self.sa_session.flush() + if check_output_detected_state != DETECTED_JOB_STATE.OK: + job_runner_state = JobState.runner_states.TOOL_DETECT_ERROR + if check_output_detected_state == DETECTED_JOB_STATE.OUT_OF_MEMORY_ERROR: + job_runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED + job_state.runner_state = job_runner_state + self._handle_runner_state('failure', job_state) + # Was resubmitted or something - I think we are done with it. 
+ if job_state.runner_state_handled: + return + job_state.job_wrapper.finish(stdout, stderr, exit_code, check_output_detected_state=check_output_detected_state) + class JobState(object): """ @@ -417,7 +435,8 @@ class JobState(object): JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER='Job output not returned from cluster', UNKNOWN_ERROR='unknown_error', GLOBAL_WALLTIME_REACHED='global_walltime_reached', - OUTPUT_SIZE_LIMIT='output_size_limit' + OUTPUT_SIZE_LIMIT='output_size_limit', + TOOL_DETECT_ERROR='tool_detected', # job runner interaction worked fine but the tool indicated error ) def __init__(self, job_wrapper, job_destination): @@ -656,7 +675,7 @@ def finish_job(self, job_state): job_state.cleanup() try: - job_state.job_wrapper.finish(stdout, stderr, exit_code) + self._finish_or_resubmit_job(job_state, stdout, stderr, exit_code) except Exception: log.exception("(%s/%s) Job wrapper finish method failed" % (galaxy_id_tag, external_job_id)) job_state.job_wrapper.fail("Unable to finish job", exception=True) diff --git a/lib/galaxy/jobs/runners/local.py b/lib/galaxy/jobs/runners/local.py index dc9bdfac3097..96a20bea277b 100644 --- a/lib/galaxy/jobs/runners/local.py +++ b/lib/galaxy/jobs/runners/local.py @@ -143,9 +143,13 @@ def queue_job(self, job_wrapper): return self._handle_metadata_if_needed(job_wrapper) + + job_destination = job_wrapper.job_destination + job_state = JobState(job_wrapper, job_destination) + job_state.stop_job = False # Finish the job! 
try: - job_wrapper.finish(stdout, stderr, exit_code) + self._finish_or_resubmit_job(job_state, stdout, stderr, exit_code) except Exception: log.exception("Job wrapper finish method failed") self._fail_job_local(job_wrapper, "Unable to finish job") diff --git a/lib/galaxy/tools/parser/util.py b/lib/galaxy/tools/parser/util.py index 03a967e558df..b831f161c372 100644 --- a/lib/galaxy/tools/parser/util.py +++ b/lib/galaxy/tools/parser/util.py @@ -2,28 +2,52 @@ from .interface import ToolStdioRegex -def error_on_exit_code(): +def error_on_exit_code(out_of_memory_exit_code=None): + exit_codes = [] + + if out_of_memory_exit_code: + exit_code_oom = ToolStdioExitCode() + exit_code_oom.range_start = int(out_of_memory_exit_code) + exit_code_oom.range_end = int(out_of_memory_exit_code) + _set_oom(exit_code_oom) + exit_codes.append(exit_code_oom) + exit_code_lower = ToolStdioExitCode() exit_code_lower.range_start = float("-inf") exit_code_lower.range_end = -1 _set_fatal(exit_code_lower) + exit_codes.append(exit_code_lower) exit_code_high = ToolStdioExitCode() exit_code_high.range_start = 1 exit_code_high.range_end = float("inf") _set_fatal(exit_code_high) - return [exit_code_lower, exit_code_high], [] + exit_codes.append(exit_code_high) + return exit_codes, [] def aggressive_error_checks(): exit_codes, _ = error_on_exit_code() # these regexes are processed as case insensitive by default regexes = [ + _oom_regex("MemoryError"), + _oom_regex("std::bad_alloc"), + _oom_regex("java.lang.OutOfMemoryError"), + _oom_regex("Out of memory"), _error_regex("exception:"), _error_regex("error:") ] return exit_codes, regexes +def _oom_regex(match): + regex = ToolStdioRegex() + _set_oom(regex) + regex.match = match + regex.stdout_match = True + regex.stderr_match = True + return regex + + def _error_regex(match): regex = ToolStdioRegex() _set_fatal(regex) @@ -33,6 +57,11 @@ def _error_regex(match): return regex +def _set_oom(obj): + from galaxy.jobs.error_level import StdioErrorLevel + 
obj.error_level = StdioErrorLevel.FATAL_OOM + + def _set_fatal(obj): from galaxy.jobs.error_level import StdioErrorLevel obj.error_level = StdioErrorLevel.FATAL diff --git a/lib/galaxy/tools/parser/xml.py b/lib/galaxy/tools/parser/xml.py index 08c6649ec76f..d6de65c88afb 100644 --- a/lib/galaxy/tools/parser/xml.py +++ b/lib/galaxy/tools/parser/xml.py @@ -327,9 +327,15 @@ def parse_stdio(self): detect_errors = None if command_el is not None: detect_errors = command_el.get("detect_errors") + if detect_errors and detect_errors != "default": if detect_errors == "exit_code": - return error_on_exit_code() + oom_exit_code = None + if command_el is not None: + oom_exit_code = command_el.get("oom_exit_code", None) + if oom_exit_code is not None: + int(oom_exit_code) + return error_on_exit_code(out_of_memory_exit_code=oom_exit_code) elif detect_errors == "aggressive": return aggressive_error_checks() else: diff --git a/lib/galaxy/tools/xsd/galaxy.xsd b/lib/galaxy/tools/xsd/galaxy.xsd index 506f0c0e8121..56b24d6a5845 100644 --- a/lib/galaxy/tools/xsd/galaxy.xsd +++ b/lib/galaxy/tools/xsd/galaxy.xsd @@ -2669,8 +2669,9 @@ If present on the ``command`` tag, this attribute can be one of: * ``default`` no-op fallback to ``stdio`` tags and erroring on standard error output (for legacy tools). * ``exit_code`` error if tool exit code is not 0. (The @jmchilton recommendation). -* ``aggressive`` error if tool exit code is not 0 or either ``Exception:`` or ``Error:`` - appears in standard error/output. (The @bgruening recommendation). +* ``aggressive`` error if tool exit code is not 0 or ``Exception:``, ``Error:``, or + various messages related to being out of memory appear in the standard error or output. + (The @bgruening recommendation). For newer tools with ``profile>=16.04``, the default behavior is ``exit_code``. 
Legacy tools default to ``default`` behavior described above (erroring if the tool @@ -2700,6 +2701,11 @@ deprecated and using the ``$__tool_directory__`` variable is superior. + + + Only used if ``detect_errors="exit_code"``, tells Galaxy the specified exit code indicates an out of memory error. Galaxy instances may be configured to retry such jobs on resources with more memory. + + This attribute defines the programming language in which the tool's executable file is written. Any language can be used (tools can be written in Python, C, Perl, Java, etc.). The executable file must be in the same directory of the XML file. If instead this attribute is not specified, the tag content should be a Bash command calling executable(s) available in the $PATH. @@ -4838,9 +4844,11 @@ The following is an example of the tag: ```xml - + + ``` @@ -4912,6 +4920,12 @@ The following is an example of regular expressions that may be used: source="stdout" level="fatal" description="Unknown error encountered" /> + + + + '$out_file1'; + echo "\$GALAXY_MEMORY_MB"; + : \${GALAXY_MEMORY_MB:=20}; + echo "\$GALAXY_MEMORY_MB"; + if [ "\$GALAXY_MEMORY_MB" -lt 10 ]; + then + exit 42; + else + exit 0; + fi + ]]> + + + + + + + + + + + + + + + + + + + diff --git a/test/functional/tools/samples_tool_conf.xml b/test/functional/tools/samples_tool_conf.xml index 5bd62a7cbb8b..2660bc005d23 100644 --- a/test/functional/tools/samples_tool_conf.xml +++ b/test/functional/tools/samples_tool_conf.xml @@ -54,6 +54,7 @@ + diff --git a/test/integration/resubmission_small_memory_job_conf.xml b/test/integration/resubmission_small_memory_job_conf.xml new file mode 100644 index 000000000000..a919ca586285 --- /dev/null +++ b/test/integration/resubmission_small_memory_job_conf.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + 4 + + + + diff --git a/test/integration/resubmission_small_memory_resubmission_to_large_job_conf.xml b/test/integration/resubmission_small_memory_resubmission_to_large_job_conf.xml new file mode
100644 index 000000000000..086fea6ef4aa --- /dev/null +++ b/test/integration/resubmission_small_memory_resubmission_to_large_job_conf.xml @@ -0,0 +1,22 @@ + + + + + + + + + + + + + 4 + + + + + 40 + + + + diff --git a/test/integration/test_job_resubmission.py b/test/integration/test_job_resubmission.py index 8401b0be8015..3bdc2b427685 100644 --- a/test/integration/test_job_resubmission.py +++ b/test/integration/test_job_resubmission.py @@ -8,6 +8,8 @@ JOB_RESUBMISSION_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_job_conf.xml") JOB_RESUBMISSION_DEFAULT_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_default_job_conf.xml") JOB_RESUBMISSION_DYNAMIC_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_dynamic_job_conf.xml") +JOB_RESUBMISSION_SMALL_MEMORY_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_small_memory_job_conf.xml") +JOB_RESUBMISSION_SMALL_MEMORY_RESUBMISSION_TO_LARGE_JOB_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_small_memory_resubmission_to_large_job_conf.xml") JOB_RESUBMISSION_JOB_RESOURCES_CONFIG_FILE = os.path.join(SCRIPT_DIRECTORY, "resubmission_job_resource_parameters_conf.xml") @@ -15,12 +17,12 @@ class _BaseResubmissionIntegerationTestCase(integration_util.IntegrationTestCase framework_tool_and_types = True def _assert_job_passes(self, resource_parameters={}): - self._run_tool_test("simple_constructs", resource_parameters=resource_parameters) + self._run_tool_test("exit_code_oom", resource_parameters=resource_parameters) def _assert_job_fails(self, resource_parameters={}): exception_thrown = False try: - self._run_tool_test("simple_constructs", resource_parameters=resource_parameters) + self._run_tool_test("exit_code_oom", resource_parameters=resource_parameters) except Exception: exception_thrown = True @@ -128,3 +130,25 @@ def handle_galaxy_config_kwds(cls, config): def test_dynamic_resubmission(self): self._assert_job_passes() + + +# Verify the test tool fails if only a 
small amount of memory is allocated. +class JobResubmissionSmallMemoryIntegrationTestCase(_BaseResubmissionIntegerationTestCase): + + @classmethod + def handle_galaxy_config_kwds(cls, config): + config["job_config_file"] = JOB_RESUBMISSION_SMALL_MEMORY_JOB_CONFIG_FILE + + def test_dynamic_resubmission(self): + self._assert_job_fails() + + +# Verify the test tool passes once it is resubmitted from the small memory destination to a larger one. +class JobResubmissionSmallMemoryResubmitsToLargeIntegrationTestCase(_BaseResubmissionIntegerationTestCase): + + @classmethod + def handle_galaxy_config_kwds(cls, config): + config["job_config_file"] = JOB_RESUBMISSION_SMALL_MEMORY_RESUBMISSION_TO_LARGE_JOB_CONFIG_FILE + + def test_dynamic_resubmission(self): + self._assert_job_passes() diff --git a/test/unit/jobs/test_job_output_checker.py b/test/unit/jobs/test_job_output_checker.py index c14490ba6c35..9c258d2cecfc 100644 --- a/test/unit/jobs/test_job_output_checker.py +++ b/test/unit/jobs/test_job_output_checker.py @@ -1,7 +1,7 @@ from unittest import TestCase from galaxy.jobs.error_level import StdioErrorLevel -from galaxy.jobs.output_checker import check_output +from galaxy.jobs.output_checker import check_output, DETECTED_JOB_STATE from galaxy.model import Job from galaxy.tools.parser.interface import ToolStdioRegex from galaxy.util.bunch import Bunch @@ -87,10 +87,10 @@ def __add_regex(self, regex): self.tool.stdio_regexes.append(regex) def __assertSuccessful(self): - self.assertTrue(self.__check_output()) + assert self.__check_output() == DETECTED_JOB_STATE.OK def __assertNotSuccessful(self): - self.assertFalse(self.__check_output()) + assert self.__check_output() != DETECTED_JOB_STATE.OK def __check_output(self): return check_output(self.tool, self.stdout, self.stderr, self.tool_exit_code, self.job) diff --git a/test/unit/jobs/test_runner_local.py b/test/unit/jobs/test_runner_local.py index 0733bd772099..6b3c26cc9ce0 100644 --- a/test/unit/jobs/test_runner_local.py +++
b/test/unit/jobs/test_runner_local.py @@ -156,6 +156,9 @@ def __init__(self, app, test_directory, tool): build_dependency_shell_commands=lambda: [] ) + def check_tool_output(*args, **kwds): + return "ok" + def wait_for_external_id(self): """Test method for waiting til an external id has been registered.""" external_id = None @@ -203,7 +206,7 @@ def fail(self, message, exception): self.fail_message = message self.fail_exception = exception - def finish(self, stdout, stderr, exit_code): + def finish(self, stdout, stderr, exit_code, **kwds): self.stdout = stdout self.stderr = stderr self.exit_code = exit_code diff --git a/test/unit/tools/test_parsing.py b/test/unit/tools/test_parsing.py index ccc5a5621188..6086ac555f5b 100644 --- a/test/unit/tools/test_parsing.py +++ b/test/unit/tools/test_parsing.py @@ -244,7 +244,8 @@ def test_exit_code(self): """) exit, regexes = tool_source.parse_stdio() assert len(exit) == 2, exit - assert len(regexes) == 2, regexes + # error:, exception: various memory exception... + assert len(regexes) > 2, regexes def test_sanitize_option(self): assert self._tool_source.parse_sanitize() is True