From 8a00b5fce2c737058f10c1e3d3a92bdcfe53e882 Mon Sep 17 00:00:00 2001 From: Dan Kilman Date: Thu, 5 Jan 2017 13:00:01 +0200 Subject: [PATCH] ARIA-55 Implement task retry and abort mechanism --- aria/orchestrator/exceptions.py | 16 ++++ .../workflows/core/events_handler.py | 25 +++-- aria/storage/base_model.py | 10 +- .../workflows/core/test_engine.py | 94 ++++++++++++++++++- .../workflows/executor/test_executor.py | 1 + .../executor/test_process_executor.py | 1 + 6 files changed, 136 insertions(+), 11 deletions(-) diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index 74e9002d..bd5238e0 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -30,3 +30,19 @@ class PluginAlreadyExistsError(AriaError): Raised when a plugin with the same package name and package version already exists """ pass + + +class TaskRetryException(RuntimeError): + """ + Used internally when ctx.task.retry is called + """ + def __init__(self, message, retry_interval): + super(TaskRetryException, self).__init__(message) + self.retry_interval = retry_interval + + +class TaskAbortException(RuntimeError): + """ + Used internally when ctx.task.abort is called + """ + pass diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index d05cbcb3..c973ad9e 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -27,7 +27,7 @@ ) from ... import events - +from ... 
import exceptions @events.sent_task_signal.connect def _task_sent(task, *args, **kwargs): @@ -43,18 +43,25 @@ def _task_started(task, *args, **kwargs): @events.on_failure_task_signal.connect -def _task_failed(task, *args, **kwargs): +def _task_failed(task, exception, *args, **kwargs): with task._update(): - should_retry = ( - (task.retry_count < task.max_attempts - 1 or - task.max_attempts == task.INFINITE_RETRIES) and - # ignore_failure check here means the task will not be retries and it will be marked as - # failed. The engine will also look at ignore_failure so it won't fail the workflow. - not task.ignore_failure) + should_retry = all([ + not isinstance(exception, exceptions.TaskAbortException), + task.retry_count < task.max_attempts - 1 or task.max_attempts == task.INFINITE_RETRIES, + # ignore_failure check here means the task will not be retried and it will be marked + # as failed. The engine will also look at ignore_failure so it won't fail the + # workflow. + not task.ignore_failure + ]) if should_retry: + retry_interval = None + if isinstance(exception, exceptions.TaskRetryException): + retry_interval = exception.retry_interval + if retry_interval is None: + retry_interval = task.retry_interval task.status = task.RETRYING task.retry_count += 1 - task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval) + task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) else: task.ended_at = datetime.utcnow() task.status = task.FAILED diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py index d1aebf2d..97c541cc 100644 --- a/aria/storage/base_model.py +++ b/aria/storage/base_model.py @@ -52,8 +52,8 @@ orm, ) +from ..orchestrator.exceptions import TaskAbortException, TaskRetryException from .structure import ModelMixin - from .type import ( List, Dict @@ -675,3 +675,11 @@ def as_node_instance(cls, instance, **kwargs): @classmethod def as_relationship_instance(cls, instance, **kwargs): return 
cls(relationship_instance=instance, **kwargs) + + @staticmethod + def abort(message=None): + raise TaskAbortException(message) + + @staticmethod + def retry(message=None, retry_interval=None): + raise TaskRetryException(message, retry_interval=retry_interval) diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index a6b55bab..d9b50a99 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -382,6 +382,78 @@ def mock_workflow(ctx, graph): assert global_test_holder.get('sent_task_signal_calls') == 1 +class TestTaskRetryAndAbort(BaseTest): + message = 'EXPECTED_ERROR' + + def test_task_retry_default_interval(self, workflow_context, executor): + default_retry_interval = 0.1 + + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_task_retry, ctx, + inputs={'message': self.message}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_retry_custom_interval(self, workflow_context, executor): + default_retry_interval = 100 + custom_retry_interval = 0.1 + + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_task_retry, ctx, + inputs={'message': self.message, + 'retry_interval': custom_retry_interval}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + execution_start = time.time() + with 
pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + execution_end = time.time() + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + assert (execution_end - execution_start) < default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_abort(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_task_abort, ctx, + inputs={'message': self.message}, + retry_interval=100, + max_attempts=100) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + @operation def mock_success_task(**_): pass @@ -408,7 +480,27 @@ def mock_conditional_failure_task(failure_count, **_): invocations.append(time.time()) +@operation def mock_sleep_task(seconds, **_): + _add_invocation_timestamp() + time.sleep(seconds) + + +@operation +def mock_task_retry(ctx, message, retry_interval=None, **_): + _add_invocation_timestamp() + retry_kwargs = {} + if retry_interval is not None: + retry_kwargs['retry_interval'] = retry_interval + ctx.task.retry(message, **retry_kwargs) + + +@operation +def mock_task_abort(ctx, message, **_): + _add_invocation_timestamp() + ctx.task.abort(message) + + +def _add_invocation_timestamp(): invocations = global_test_holder.setdefault('invocations', []) invocations.append(time.time()) - time.sleep(seconds) 
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index cd00cd5e..2486a1ee 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -107,6 +107,7 @@ def __init__(self, func, inputs=None): self.retry_count = 0 self.max_attempts = 1 self.plugin_fk = None + self.ignore_failure = False for state in model.Task.STATES: setattr(self, state.upper(), state) diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index e321388b..687e245a 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -130,6 +130,7 @@ def __init__(self, plugin, operation): self.max_attempts = 1 self.plugin_fk = plugin.id self.plugin = plugin + self.ignore_failure = False for state in aria_model.Task.STATES: setattr(self, state.upper(), state)