Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-299 Resuming canceled execution with frozen task fails
  • Loading branch information
mxmrlv committed Jul 10, 2017
1 parent c46c94b commit b30a7edd8a56e21d54e93058b97b3d6162f82fc2
Showing 11 changed files with 251 additions and 71 deletions.
@@ -123,6 +123,10 @@ def _workflow_resume(workflow_context, *args, **kwargs):
with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.PENDING
# Any non ended task would be put back to pending state
for task in execution.tasks:
if not task.has_ended():
task.status = task.PENDING


@events.on_cancelling_workflow_signal.connect
@@ -49,7 +49,7 @@ def close(self):
"""
pass

def terminate(self, ctx):
def terminate(self, task_id):
"""
Terminate the executing task
:return:
@@ -36,9 +36,10 @@ class ThreadExecutor(BaseExecutor):
Note: This executor is incapable of running plugin operations.
"""

def __init__(self, pool_size=1, *args, **kwargs):
def __init__(self, pool_size=1, close_timeout=5, *args, **kwargs):
super(ThreadExecutor, self).__init__(*args, **kwargs)
self._stopped = False
self._close_timeout = close_timeout
self._queue = Queue.Queue()
self._pool = []
for i in range(pool_size):
@@ -54,7 +55,10 @@ def _execute(self, ctx):
def close(self):
self._stopped = True
for thread in self._pool:
thread.join()
if self._close_timeout is None:
thread.join()
else:
thread.join(self._close_timeout)

def _processor(self):
while not self._stopped:
@@ -87,8 +87,10 @@ def _operation_mapping():
@pytest.fixture
def executor():
result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
yield result
result.close()
try:
yield result
finally:
result.close()


@pytest.fixture
@@ -509,8 +509,10 @@ def mock_workflow(ctx, graph):
@pytest.fixture
def executor(self):
result = process.ProcessExecutor()
yield result
result.close()
try:
yield result
finally:
result.close()

@pytest.fixture
def workflow_context(self, tmpdir):
@@ -277,8 +277,10 @@ def _upload(self, source, path):
@pytest.fixture
def executor(self):
result = process.ProcessExecutor()
yield result
result.close()
try:
yield result
finally:
result.close()

@pytest.fixture
def workflow_context(self, tmpdir):
@@ -14,6 +14,7 @@
# limitations under the License.

import json
import time
from threading import Thread, Event
from datetime import datetime

@@ -23,7 +24,7 @@
from aria.modeling import exceptions as modeling_exceptions
from aria.modeling import models
from aria.orchestrator import exceptions
from aria.orchestrator.events import on_cancelled_workflow_signal
from aria.orchestrator import events
from aria.orchestrator.workflow_runner import WorkflowRunner
from aria.orchestrator.workflows.executor.process import ProcessExecutor
from aria.orchestrator.workflows import api
@@ -46,9 +47,10 @@
resource_storage as resource
)

events = {
custom_events = {
'is_resumed': Event(),
'is_active': Event(),
'execution_cancelled': Event(),
'execution_ended': Event()
}

@@ -57,6 +59,10 @@ class TimeoutError(BaseException):
pass


class FailingTask(BaseException):
pass


def test_undeclared_workflow(request):
# validating a proper error is raised when the workflow is not declared in the service
with pytest.raises(exceptions.UndeclaredWorkflowError):
@@ -318,43 +324,57 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,

class TestResumableWorkflows(object):

def test_resume_workflow(self, workflow_context, executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
self._create_interface(workflow_context, node, mock_resuming_task)
def _create_initial_workflow_runner(
self, workflow_context, workflow, executor, inputs=None):

service = workflow_context.service
service.workflows['custom_workflow'] = tests_mock.models.create_operation(
'custom_workflow',
operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
operation_kwargs={
'function': '{0}.{1}'.format(__name__, workflow.__name__),
'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
}
)
workflow_context.model.service.update(service)

wf_runner = WorkflowRunner(
service_id=workflow_context.service.id,
inputs={},
inputs=inputs or {},
model_storage=workflow_context.model,
resource_storage=workflow_context.resource,
plugin_manager=None,
workflow_name='custom_workflow',
executor=executor)
return wf_runner

@staticmethod
def _wait_for_active_and_cancel(workflow_runner):
if custom_events['is_active'].wait(60) is False:
raise TimeoutError("is_active wasn't set to True")
workflow_runner.cancel()
if custom_events['execution_cancelled'].wait(60) is False:
raise TimeoutError("Execution did not end")

def test_resume_workflow(self, workflow_context, thread_executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
self._create_interface(workflow_context, node, mock_resuming_task)

wf_runner = self._create_initial_workflow_runner(
workflow_context, mock_parallel_workflow, thread_executor)

wf_thread = Thread(target=wf_runner.execute)
wf_thread.daemon = True
wf_thread.start()

# Wait for the execution to start
if events['is_active'].wait(5) is False:
raise TimeoutError("is_active wasn't set to True")
wf_runner.cancel()

if events['execution_ended'].wait(60) is False:
raise TimeoutError("Execution did not end")
self._wait_for_active_and_cancel(wf_runner)

tasks = workflow_context.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
events['is_resumed'].set()
assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
assert any(task.status == task.RETRYING for task in tasks)
custom_events['is_resumed'].set()
assert any(task.status == task.RETRYING for task in tasks)

# Create a new workflow runner, with an existing execution id. This would cause
# the old execution to restart.
@@ -365,7 +385,7 @@ def test_resume_workflow(self, workflow_context, executor):
resource_storage=workflow_context.resource,
plugin_manager=None,
execution_id=wf_runner.execution.id,
executor=executor)
executor=thread_executor)

new_wf_runner.execute()

@@ -374,9 +394,93 @@ def test_resume_workflow(self, workflow_context, executor):
assert node.attributes['invocations'].value == 3
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

    def test_resume_started_task(self, workflow_context, thread_executor):
        """Cancel an execution while its only task is frozen in STARTED state,
        then resume the same execution with a fresh executor and verify the
        task runs again and succeeds.
        """
        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
        # Counts operation runs across both the original and the resumed execution.
        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
        self._create_interface(workflow_context, node, mock_stuck_task)

        wf_runner = self._create_initial_workflow_runner(
            workflow_context, mock_single_task_workflow, thread_executor)

        # Run the workflow on a daemon thread so the test thread can drive
        # cancellation and resumption.
        wf_thread = Thread(target=wf_runner.execute)
        wf_thread.daemon = True
        wf_thread.start()

        self._wait_for_active_and_cancel(wf_runner)
        # Filter out stub tasks; only the real operation task matters here.
        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
        assert node.attributes['invocations'].value == 1
        # The task was stuck mid-run, so cancelling leaves it in STARTED.
        assert task.status == task.STARTED
        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
                                              wf_runner.execution.CANCELLING)
        # Unblock mock_stuck_task's polling loop so the resumed run can finish.
        custom_events['is_resumed'].set()

        new_thread_executor = thread.ThreadExecutor()
        try:
            # Passing the existing execution id resumes the cancelled execution.
            new_wf_runner = WorkflowRunner(
                service_id=wf_runner.service.id,
                inputs={},
                model_storage=workflow_context.model,
                resource_storage=workflow_context.resource,
                plugin_manager=None,
                execution_id=wf_runner.execution.id,
                executor=new_thread_executor)

            new_wf_runner.execute()
        finally:
            new_thread_executor.close()

        # Wait for it to finish and assert changes.
        assert node.attributes['invocations'].value == 2
        assert task.status == task.SUCCESS
        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

def test_resume_failed_task(self, workflow_context, thread_executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
self._create_interface(workflow_context, node, mock_failed_before_resuming)

wf_runner = self._create_initial_workflow_runner(
workflow_context, mock_single_task_workflow, thread_executor)
wf_thread = Thread(target=wf_runner.execute)
wf_thread.setDaemon(True)
wf_thread.start()

self._wait_for_active_and_cancel(wf_runner)

task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
assert node.attributes['invocations'].value == 2
assert task.status == task.STARTED
assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
wf_runner.execution.CANCELLING)

custom_events['is_resumed'].set()
assert node.attributes['invocations'].value == 2

# Create a new workflow runner, with an existing execution id. This would cause
# the old execution to restart.
new_thread_executor = thread.ThreadExecutor()
try:
new_wf_runner = WorkflowRunner(
service_id=wf_runner.service.id,
inputs={},
model_storage=workflow_context.model,
resource_storage=workflow_context.resource,
plugin_manager=None,
execution_id=wf_runner.execution.id,
executor=new_thread_executor)

new_wf_runner.execute()
finally:
new_thread_executor.close()

# Wait for it to finish and assert changes.
assert node.attributes['invocations'].value == task.max_attempts - 1
assert task.status == task.SUCCESS
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

@staticmethod
@pytest.fixture
def executor():
def thread_executor():
result = thread.ThreadExecutor()
try:
yield result
@@ -417,16 +521,23 @@ def _engine(workflow_func, workflow_context, executor):

@pytest.fixture(autouse=True)
def register_to_events(self):
def execution_cancelled(*args, **kwargs):
custom_events['execution_cancelled'].set()

def execution_ended(*args, **kwargs):
events['execution_ended'].set()
custom_events['execution_ended'].set()

on_cancelled_workflow_signal.connect(execution_ended)
events.on_cancelled_workflow_signal.connect(execution_cancelled)
events.on_failure_workflow_signal.connect(execution_ended)
yield
on_cancelled_workflow_signal.disconnect(execution_ended)
events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
events.on_failure_workflow_signal.disconnect(execution_ended)
for event in custom_events.values():
event.clear()


@workflow
def mock_workflow(ctx, graph):
def mock_parallel_workflow(ctx, graph):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
graph.add_tasks(
api.task.OperationTask(
@@ -441,8 +552,51 @@ def mock_resuming_task(ctx):
ctx.node.attributes['invocations'] += 1

if ctx.node.attributes['invocations'] != 1:
events['is_active'].set()
if not events['is_resumed'].isSet():
custom_events['is_active'].set()
if not custom_events['is_resumed'].isSet():
# if resume was called, increase by one. o/w fail the execution - second task should
# fail as long it was not a part of resuming the workflow
raise BaseException("wasn't resumed yet")
raise FailingTask("wasn't resumed yet")


@workflow
def mock_single_task_workflow(ctx, graph):
    """Build a workflow graph containing a single 'create' lifecycle operation
    task on the dependency node, configured with generous retries so resume
    scenarios have attempts left to work with."""
    dependency_node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
    single_task = api.task.OperationTask(
        dependency_node,
        interface_name='aria.interfaces.lifecycle',
        operation_name='create',
        retry_interval=1,
        max_attempts=10,
    )
    graph.add_tasks(single_task)


@operation
def mock_failed_before_resuming(ctx):
    """
    The task should run at most ctx.task.max_attempts - 1 times, and only then pass.
    Overall, the number of invocations should be ctx.task.max_attempts - 1.
    """
    ctx.node.attributes['invocations'] += 1

    if ctx.node.attributes['invocations'] == 2:
        # Second invocation: announce activity so the test can cancel the
        # workflow, then freeze this thread.
        custom_events['is_active'].set()
        # unfreeze the thread only when all of the invocations are done
        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
            time.sleep(5)

    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
        # pass only just before the end.
        return
    else:
        # fail o.w.
        raise FailingTask("stop this task")


@operation
def mock_stuck_task(ctx):
    """Increment the node's invocation counter, then block — polling every five
    seconds — until the test signals that the workflow was resumed."""
    ctx.node.attributes['invocations'] += 1
    resumed = custom_events['is_resumed']
    active = custom_events['is_active']
    while not resumed.isSet():
        # Announce that the task is running so the test can cancel the workflow.
        if not active.isSet():
            active.set()
        time.sleep(5)

0 comments on commit b30a7ed

Please sign in to comment.