ARIA-214 Dry execution changes the state of non-implemented operations
mxmrlv committed May 7, 2017
1 parent 3e1ed14 commit 0ec237071ebdeb28cd2feabbc1b51854543d398d
Showing 9 changed files with 31 additions and 54 deletions.
@@ -163,9 +163,6 @@ def __init__(self, api_task, *args, **kwargs):
         self._task_id = task_model.id
         self._update_fields = None

-    def execute(self):
-        super(OperationTask, self).execute()
-
     @contextmanager
     def _update(self):
         """
@@ -48,11 +48,7 @@ def build_execution_graph(
             execution_graph, dependencies, default=[start_task])

         if isinstance(api_task, api.task.OperationTask):
-            if api_task.implementation:
-                operation_task = core_task.OperationTask(api_task, executor=default_executor)
-            else:
-                operation_task = core_task.OperationTask(api_task,
-                                                         executor=base.EmptyOperationExecutor())
+            operation_task = core_task.OperationTask(api_task, executor=default_executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
@@ -25,13 +25,22 @@ class BaseExecutor(logger.LoggerMixin):
     """
     Base class for executors for running tasks
     """
+    def _execute(self, task):
+        raise NotImplementedError

     def execute(self, task):
         """
         Execute a task
         :param task: task to execute
         """
-        raise NotImplementedError
+        if task.implementation:
+            self._execute(task)
+        else:
+            # In this case the task is missing an implementation. This task still gets to an
+            # executor, but since there is nothing to run, we by default simply skip the execution
+            # itself.
+            self._task_started(task)
+            self._task_succeeded(task)

     def close(self):
         """
@@ -52,12 +61,6 @@ def _task_succeeded(task):
         events.on_success_task_signal.send(task)


-class StubTaskExecutor(BaseExecutor):
+class StubTaskExecutor(BaseExecutor):  # pylint: disable=abstract-method
     def execute(self, task):
         task.status = task.SUCCESS
-
-
-class EmptyOperationExecutor(BaseExecutor):
-    def execute(self, task):
-        events.start_task_signal.send(task)
-        events.on_success_task_signal.send(task)
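
For context, a minimal, self-contained sketch of the dispatch pattern the base executor adopts above. The names are hypothetical stand-ins, not the actual ARIA classes or signals: the point is only that execute() is the single entry point, concrete executors override _execute(), and a task with no implementation is reported as started and then succeeded without any concrete executor ever running it.

# Toy illustration of the pattern above; ToyTask/ToyBaseExecutor are
# hypothetical stand-ins, not the real aria.orchestrator.workflows.executor API.
class ToyTask(object):
    def __init__(self, implementation=None):
        self.implementation = implementation
        self.events = []                      # records which lifecycle events fired


class ToyBaseExecutor(object):
    def execute(self, task):
        if task.implementation:
            self._execute(task)               # concrete executors override _execute
        else:
            # nothing to run: still report a normal started -> succeeded cycle
            self._task_started(task)
            self._task_succeeded(task)

    def _execute(self, task):
        raise NotImplementedError

    @staticmethod
    def _task_started(task):
        task.events.append('started')

    @staticmethod
    def _task_succeeded(task):
        task.events.append('succeeded')


class ToyInlineExecutor(ToyBaseExecutor):
    def _execute(self, task):
        task.events.append('ran {0}'.format(task.implementation))


executor = ToyInlineExecutor()
implemented = ToyTask('some.module.operation')
empty = ToyTask()
executor.execute(implemented)                 # -> ['ran some.module.operation']
executor.execute(empty)                       # -> ['started', 'succeeded'], nothing ran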
@@ -42,7 +42,7 @@ def __init__(self, app, *args, **kwargs):
         self._receiver_thread.start()
         self._started_queue.get(timeout=30)

-    def execute(self, task):
+    def _execute(self, task):
         self._tasks[task.id] = task
         inputs = dict(inp.unwrap() for inp in task.inputs.values())
         inputs['ctx'] = task.context
@@ -21,31 +21,31 @@
 from .base import BaseExecutor


-class DryExecutor(BaseExecutor):
+class DryExecutor(BaseExecutor):  # pylint: disable=abstract-method
     """
     Executor which dry runs tasks - prints task information without causing any side effects
     """
-
     def execute(self, task):
         # updating the task manually instead of calling self._task_started(task),
         # to avoid any side effects raising that event might cause
         with task._update():
             task.started_at = datetime.utcnow()
             task.status = task.STARTED

-        if hasattr(task.actor, 'source_node'):
-            name = '{source_node.name}->{target_node.name}'.format(
-                source_node=task.actor.source_node, target_node=task.actor.target_node)
-        else:
-            name = task.actor.name
+        if task.implementation:
+            if hasattr(task.actor, 'source_node'):
+                name = '{source_node.name}->{target_node.name}'.format(
+                    source_node=task.actor.source_node, target_node=task.actor.target_node)
+            else:
+                name = task.actor.name

-        task.context.logger.info(
-            '<dry> {name} {task.interface_name}.{task.operation_name} started...'
-            .format(name=name, task=task))
+            task.context.logger.info(
+                '<dry> {name} {task.interface_name}.{task.operation_name} started...'
+                .format(name=name, task=task))

-        task.context.logger.info(
-            '<dry> {name} {task.interface_name}.{task.operation_name} successful'
-            .format(name=name, task=task))
+            task.context.logger.info(
+                '<dry> {name} {task.interface_name}.{task.operation_name} successful'
+                .format(name=name, task=task))

         # updating the task manually instead of calling self._task_succeeded(task),
         # to avoid any side effects raising that event might cause
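
As a rough, hypothetical illustration of why the dry executor above writes task.started_at and task.status directly rather than calling self._task_started(task) / self._task_succeeded(task): those helpers send the real workflow events, and any subscribed handler would run exactly as in a real execution, which is the side effect a dry run must avoid. None of the names below are ARIA's; they only mirror the shape of the comments in the diff.

# Hypothetical sketch, not ARIA code: contrasts "send the signal" with
# "update the task fields directly" as DryExecutor does above.
from datetime import datetime

HANDLERS = []                                  # stand-in for real event subscribers


def on_task_started(task):
    # a real handler might persist model changes, schedule retries, etc.
    task.handler_calls += 1


HANDLERS.append(on_task_started)


class ToyTask(object):
    def __init__(self):
        self.status = 'pending'
        self.started_at = None
        self.handler_calls = 0


def signalled_start(task):                     # roughly what _task_started() amounts to
    for handler in HANDLERS:
        handler(task)                          # side effects happen here


def dry_start(task):                           # what the dry executor does instead
    task.started_at = datetime.utcnow()
    task.status = 'started'


signalled = ToyTask()
signalled_start(signalled)
assert signalled.handler_calls == 1            # a real start runs the handlers

dry = ToyTask()
dry_start(dry)
assert dry.handler_calls == 0                  # no handler side effects in a dry run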
@@ -116,7 +116,7 @@ def close(self):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)

-    def execute(self, task):
+    def _execute(self, task):
         self._check_closed()
         self._tasks[task.id] = task

@@ -46,7 +46,7 @@ def __init__(self, pool_size=1, *args, **kwargs):
             thread.start()
             self._pool.append(thread)

-    def execute(self, task):
+    def _execute(self, task):
         self._queue.put(task)

     def close(self):
@@ -24,7 +24,6 @@
     api,
     core,
     exceptions,
-    executor
 )

 from tests import mock, storage
@@ -71,8 +70,7 @@ def _create_node_operation_task(self, ctx, node):
             node,
             interface_name=NODE_INTERFACE_NAME,
             operation_name=NODE_OPERATION_NAME)
-        core_task = core.task.OperationTask(api_task=api_task,
-                                            executor=executor.base.EmptyOperationExecutor())
+        core_task = core.task.OperationTask(api_task=api_task, executor=None)
         return api_task, core_task

     def _create_relationship_operation_task(self, ctx, relationship):
@@ -81,8 +79,7 @@ def _create_relationship_operation_task(self, ctx, relationship):
             relationship,
             interface_name=RELATIONSHIP_INTERFACE_NAME,
             operation_name=RELATIONSHIP_OPERATION_NAME)
-        core_task = core.task.OperationTask(api_task=api_task,
-                                            executor=executor.base.EmptyOperationExecutor())
+        core_task = core.task.OperationTask(api_task=api_task, executor=None)
         return api_task, core_task

     def test_node_operation_task_creation(self, ctx):
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-import logging
 import os
 import Queue

@@ -66,7 +65,7 @@ def handler(_, exception=None, **kwargs):
     def test_closed(self, executor):
         executor.close()
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=None)
+            executor.execute(task=MockTask(implementation='some.implementation'))
         assert 'closed' in exc_info.value.message
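
The change to test_closed follows from the new BaseExecutor.execute(): the method now reads task.implementation before delegating, so passing task=None would fail on that attribute access instead of exercising the closed-executor path. Supplying a task whose implementation is set lets the call reach the concrete _execute(), where the RuntimeError containing 'closed' that the test asserts on is raised. A hypothetical minimal stand-in (the real MockTask in this test module carries more state) only needs that one attribute:

class MinimalTask(object):
    # hypothetical sketch, not the test module's actual MockTask
    def __init__(self, implementation=None):
        self.implementation = implementation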


@@ -82,18 +81,3 @@ def mock_plugin(plugin_manager, tmpdir):
     source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
     plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
     return plugin_manager.install(source=plugin_path)
-
-
-class MockContext(object):
-
-    def __init__(self, *args, **kwargs):
-        self.logger = logging.getLogger('mock_logger')
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
-        self.serialization_dict = {'context_cls': self.__class__, 'context': {}}
-
-    def __getattr__(self, item):
-        return None
-
-    @classmethod
-    def deserialize_from_dict(cls, **kwargs):
-        return cls()
