ARIA-414 Current events handler mechanism relies on sqlalchemy
mxmrlv authored and aviaefrat committed Dec 4, 2017
1 parent 3c662b8 commit bc2701c005adbd9f737b534c3e2d7957a9981149
Showing 9 changed files with 183 additions and 169 deletions.
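
In outline: every event handler used to mutate the context inside `with ctx.persist_changes:`, a context-manager property whose code after `yield` persisted the mutated object, and the handlers now end with an explicit `model.task.update(...)` or `model.execution.update(...)` call instead, removing the implicit reliance on sqlalchemy session behavior. A minimal sketch of the two shapes, using stand-in class names rather than the real ARIA contexts:

    from contextlib import contextmanager

    class BeforeContext(object):
        # before: handlers mutate self.task inside `with ctx.persist_changes:`
        # and persistence happens implicitly when the block exits
        @property
        @contextmanager
        def persist_changes(self):
            yield
            self.model.task.update(self.task)

    class AfterContext(object):
        # after: each handler persists explicitly at the end
        def on_task_sent(self):
            task = self.task
            task.status = task.SENT
            self.model.task.update(task)

The hunks below repeat this same transformation for each handler.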
@@ -18,7 +18,6 @@
 """

 import threading
-from contextlib import contextmanager

 import aria
 from aria.utils import file
@@ -106,12 +105,6 @@ def close(self):
         self.model.log._session.remove()
         self.model.log._engine.dispose()

-    @property
-    @contextmanager
-    def persist_changes(self):
-        yield
-        self.model.task.update(self.task)
-

 class NodeOperationContext(BaseOperationContext):
     """
@@ -93,12 +93,6 @@ def nodes(self):
             }
         )

-    @property
-    @contextmanager
-    def persist_changes(self):
-        yield
-        self._model.execution.update(self.execution)
-

 class _CurrentContext(threading.local):
     """
@@ -28,126 +28,130 @@

 @events.sent_task_signal.connect
 def _task_sent(ctx, *args, **kwargs):
-    with ctx.persist_changes:
-        ctx.task.status = ctx.task.SENT
+    task = ctx.task
+    task.status = ctx.task.SENT
+    ctx.model.task.update(task)
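
(A note on the dispatch: each handler subscribes to a signal from `aria.orchestrator.events`. Assuming those are blinker signals, which is what this style of `.connect` decorator suggests, the mechanism is roughly this self-contained sketch; the signal name and the placeholder sender are invented:

    from blinker import signal

    sent_task_signal = signal('sent_task_signal')   # stand-in for events.sent_task_signal

    @sent_task_signal.connect
    def _task_sent(ctx, *args, **kwargs):
        # the sender passed to send() arrives as the first argument
        print('persisting task status for %r' % (ctx,))

    sent_task_signal.send('fake-ctx')               # fires every connected handler

The sender is the operation or workflow context, which is why every handler can reach `ctx.model` directly.)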


 @events.start_task_signal.connect
 def _task_started(ctx, *args, **kwargs):
-    with ctx.persist_changes:
-        ctx.task.started_at = datetime.utcnow()
-        ctx.task.status = ctx.task.STARTED
-        _update_node_state_if_necessary(ctx, is_transitional=True)
+    task = ctx.task
+    ctx.task.started_at = datetime.utcnow()
+    ctx.task.status = ctx.task.STARTED
+    _update_node_state_if_necessary(ctx, is_transitional=True)
+    ctx.model.task.update(task)


 @events.on_failure_task_signal.connect
 def _task_failed(ctx, exception, *args, **kwargs):
-    with ctx.persist_changes:
-        should_retry = all([
-            not isinstance(exception, exceptions.TaskAbortException),
-            ctx.task.attempts_count < ctx.task.max_attempts or
-            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
-            # ignore_failure check here means the task will not be retried and it will be marked
-            # as failed. The engine will also look at ignore_failure so it won't fail the
-            # workflow.
-            not ctx.task.ignore_failure
-        ])
-        if should_retry:
-            retry_interval = None
-            if isinstance(exception, exceptions.TaskRetryException):
-                retry_interval = exception.retry_interval
-            if retry_interval is None:
-                retry_interval = ctx.task.retry_interval
-            ctx.task.status = ctx.task.RETRYING
-            ctx.task.attempts_count += 1
-            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
-        else:
-            ctx.task.ended_at = datetime.utcnow()
-            ctx.task.status = ctx.task.FAILED
+    task = ctx.task
+    should_retry = all([
+        not isinstance(exception, exceptions.TaskAbortException),
+        task.attempts_count < task.max_attempts or
+        task.max_attempts == task.INFINITE_RETRIES,
+        # ignore_failure check here means the task will not be retried and it will be marked
+        # as failed. The engine will also look at ignore_failure so it won't fail the
+        # workflow.
+        not task.ignore_failure
+    ])
+    if should_retry:
+        retry_interval = None
+        if isinstance(exception, exceptions.TaskRetryException):
+            retry_interval = exception.retry_interval
+        if retry_interval is None:
+            retry_interval = ctx.task.retry_interval
+        task.status = task.RETRYING
+        task.attempts_count += 1
+        task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+    else:
+        task.ended_at = datetime.utcnow()
+        task.status = task.FAILED
+    ctx.model.task.update(task)
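
(The retry decision combines three independent conditions. A standalone rendering of that check, with hypothetical numbers, and assuming `INFINITE_RETRIES` is a sentinel that never compares as exhausted, such as -1:

    from datetime import datetime, timedelta

    INFINITE_RETRIES = -1    # assumed sentinel value

    def should_retry(is_abort, attempts_count, max_attempts, ignore_failure):
        # mirrors the all([...]) above: an abort, an exhausted finite attempt
        # budget, or ignore_failure each independently prevent a retry
        return all([
            not is_abort,
            attempts_count < max_attempts or max_attempts == INFINITE_RETRIES,
            not ignore_failure,
        ])

    assert should_retry(False, attempts_count=1, max_attempts=3, ignore_failure=False)
    assert should_retry(False, attempts_count=99, max_attempts=INFINITE_RETRIES,
                        ignore_failure=False)
    assert not should_retry(True, 1, 3, False)      # TaskAbortException case
    assert not should_retry(False, 1, 3, True)      # ignore_failure case

    # on retry, the task is re-queued retry_interval seconds in the future:
    due_at = datetime.utcnow() + timedelta(seconds=30)

Note that `ignore_failure` plays a double role: here it blocks the retry, and the engine consults it again so the failed task does not fail the workflow.)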


 @events.on_success_task_signal.connect
 def _task_succeeded(ctx, *args, **kwargs):
-    with ctx.persist_changes:
-        ctx.task.ended_at = datetime.utcnow()
-        ctx.task.status = ctx.task.SUCCESS
-        ctx.task.attempts_count += 1
+    task = ctx.task
+    ctx.task.ended_at = datetime.utcnow()
+    ctx.task.status = ctx.task.SUCCESS
+    ctx.task.attempts_count += 1

-        _update_node_state_if_necessary(ctx)
+    _update_node_state_if_necessary(ctx)
+    ctx.model.task.update(task)


 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        # the execution may already be in the process of cancelling
-        if execution.status in (execution.CANCELLING, execution.CANCELLED):
-            return
-        execution.status = execution.STARTED
-        execution.started_at = datetime.utcnow()
+    execution = workflow_context.execution
+    # the execution may already be in the process of cancelling
+    if execution.status in (execution.CANCELLING, execution.CANCELLED):
+        return
+    execution.status = execution.STARTED
+    execution.started_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)


 @events.on_failure_workflow_signal.connect
 def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        execution.error = str(exception)
-        execution.status = execution.FAILED
-        execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)


 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        execution.status = execution.SUCCEEDED
-        execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    execution.status = execution.SUCCEEDED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)


 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        # _workflow_cancelling function may have called this function already
-        if execution.status == execution.CANCELLED:
-            return
-        # the execution may have already been finished
-        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-        else:
-            execution.status = execution.CANCELLED
-            execution.ended_at = datetime.utcnow()
+    execution = workflow_context.execution
+    # _workflow_cancelling function may have called this function already
+    if execution.status == execution.CANCELLED:
+        return
+    # the execution may have already been finished
+    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+    else:
+        execution.status = execution.CANCELLED
+        execution.ended_at = datetime.utcnow()
+    workflow_context.model.execution.update(execution)
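
(The guard ordering here matters: an already-cancelled execution returns early and skips the update, an already-ended one is only logged, and anything else transitions to CANCELLED with an `ended_at` stamp. A condensed, self-contained version of that chain, with plain strings standing in for the model's status constants:

    def cancelled_transition(status):
        # returns (new_status, state_changed); the CANCELLED check first makes
        # the handler idempotent, ended states are merely logged
        if status == 'cancelled':
            return status, False                  # _workflow_cancelling already did it
        elif status in ('succeeded', 'failed'):
            return status, False                  # too late to cancel; only log
        return 'cancelled', True                  # normal path: also stamps ended_at

    assert cancelled_transition('started') == ('cancelled', True)
    assert cancelled_transition('cancelled') == ('cancelled', False)
    assert cancelled_transition('failed') == ('failed', False)

`_workflow_cancelling` below mirrors the same guards for the transition into CANCELLING.)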


 @events.on_resume_workflow_signal.connect
 def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        execution.status = execution.PENDING
-        # Any non ended task would be put back to pending state
-        for task in execution.tasks:
-            if not task.has_ended():
-                task.status = task.PENDING
-
-        if retry_failed:
-            for task in execution.tasks:
-                if task.status == task.FAILED and not task.ignore_failure:
-                    task.attempts_count = 0
-                    task.status = task.PENDING
+    execution = workflow_context.execution
+    execution.status = execution.PENDING
+    # Any non ended task would be put back to pending state
+    for task in execution.tasks:
+        if not task.has_ended():
+            task.status = task.PENDING
+
+    if retry_failed:
+        for task in execution.tasks:
+            if task.status == task.FAILED and not task.ignore_failure:
+                task.attempts_count = 0
+                task.status = task.PENDING
+    workflow_context.model.execution.update(execution)
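
(Resume thus distinguishes two task populations: tasks that never ended go back to PENDING unconditionally, while FAILED tasks are only revived when `retry_failed` is set and `ignore_failure` is not. A self-contained sketch with a stand-in task class, not ARIA's task model:

    class FakeTask(object):
        PENDING, FAILED, SUCCESS = 'pending', 'failed', 'success'

        def __init__(self, status, ignore_failure=False):
            self.status = status
            self.ignore_failure = ignore_failure
            self.attempts_count = 1

        def has_ended(self):
            return self.status in (self.FAILED, self.SUCCESS)

    def resume(tasks, retry_failed=False):
        # non-ended tasks go back to PENDING; with retry_failed, failed tasks
        # (unless ignore_failure) also get a fresh attempt budget
        for task in tasks:
            if not task.has_ended():
                task.status = task.PENDING
        if retry_failed:
            for task in tasks:
                if task.status == task.FAILED and not task.ignore_failure:
                    task.attempts_count = 0
                    task.status = task.PENDING

    tasks = [FakeTask('started'), FakeTask('failed')]
    resume(tasks, retry_failed=True)
    assert [t.status for t in tasks] == ['pending', 'pending']

Resetting `attempts_count` to 0 is what lets a previously exhausted task pass the `should_retry` budget check on its next failure.)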



 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
-    with workflow_context.persist_changes:
-        execution = workflow_context.execution
-        if execution.status == execution.PENDING:
-            return _workflow_cancelled(workflow_context=workflow_context)
-        # the execution may have already been finished
-        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-        else:
-            execution.status = execution.CANCELLING
+    execution = workflow_context.execution
+    if execution.status == execution.PENDING:
+        return _workflow_cancelled(workflow_context=workflow_context)
+    # the execution may have already been finished
+    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+    else:
+        execution.status = execution.CANCELLING
+    workflow_context.model.execution.update(execution)


 def _update_node_state_if_necessary(ctx, is_transitional=False):
@@ -71,5 +71,6 @@ def _task_succeeded(ctx):

 class StubTaskExecutor(BaseExecutor):  # pylint: disable=abstract-method
     def execute(self, ctx, *args, **kwargs):
-        with ctx.persist_changes:
-            ctx.task.status = ctx.task.SUCCESS
+        task = ctx.task
+        task.status = ctx.task.SUCCESS
+        ctx.model.task.update(task)
@@ -27,28 +27,29 @@ class DryExecutor(base.BaseExecutor):
     Dry task executor: prints task information without causing any side effects.
     """
     def execute(self, ctx):
-        with ctx.persist_changes:
-            # updating the task manually instead of calling self._task_started(task),
-            # to avoid any side effects raising that event might cause
-            ctx.task.started_at = datetime.utcnow()
-            ctx.task.status = ctx.task.STARTED
-
-            dry_msg = u'<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
-            logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
-
-            if hasattr(ctx.task.actor, 'source_node'):
-                name = u'{source_node.name}->{target_node.name}'.format(
-                    source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
-            else:
-                name = ctx.task.actor.name
-
-            if ctx.task.function:
-                logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
-                logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
-            else:
-                logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
-
-            # updating the task manually instead of calling self._task_succeeded(task),
-            # to avoid any side effects raising that event might cause
-            ctx.task.ended_at = datetime.utcnow()
-            ctx.task.status = ctx.task.SUCCESS
+        task = ctx.task
+        # updating the task manually instead of calling self._task_started(task),
+        # to avoid any side effects raising that event might cause
+        task.started_at = datetime.utcnow()
+        task.status = task.STARTED
+
+        dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+        logger = ctx.logger.info if task.function else ctx.logger.debug
+
+        if hasattr(task.actor, 'source_node'):
+            name = '{source_node.name}->{target_node.name}'.format(
+                source_node=task.actor.source_node, target_node=task.actor.target_node)
+        else:
+            name = task.actor.name
+
+        if task.function:
+            logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+            logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+        else:
+            logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+
+        # updating the task manually instead of calling self._task_succeeded(task),
+        # to avoid any side effects raising that event might cause
+        task.ended_at = datetime.utcnow()
+        task.status = task.SUCCESS
+        ctx.model.task.update(task)
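
(For reference, the `<dry>` log line is pure string formatting over the task and its actor. A toy reproduction, with invented node names rather than ARIA's models, of how relationship tasks get their `source->target` label while node tasks use the actor's name:

    class _Node(object):
        def __init__(self, name):
            self.name = name

    class _RelationshipActor(object):
        source_node = _Node('app_1')
        target_node = _Node('db_1')

    class _Task(object):
        interface_name = 'Standard'
        operation_name = 'create'
        actor = _RelationshipActor()

    dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
    # relationship tasks are labelled source->target, node tasks by actor.name
    if hasattr(_Task.actor, 'source_node'):
        name = '{0}->{1}'.format(_Task.actor.source_node.name,
                                 _Task.actor.target_node.name)
    else:
        name = _Task.actor.name

    print(dry_msg.format(name=name, task=_Task, suffix='started...'))
    # <dry> app_1->db_1 Standard.create started...

)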
@@ -25,10 +25,12 @@ class MockContext(object):

     INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS

-    def __init__(self, storage, task_kwargs=None):
+    def __init__(self, storage=None, task_kwargs=None):
         self.logger = logging.getLogger('mock_logger')
         self._task_kwargs = task_kwargs or {}
-        self._storage = storage
+        import mock
+        self._storage = storage or mock.MagicMock()
+        self._storage_kwargs = self._storage.serialization_dict if storage else None
         self.task = MockTask(storage, **task_kwargs)
         self.states = []
         self.exception = None
@@ -38,7 +40,7 @@ def serialization_dict(self):
         return {
             'context_cls': self.__class__,
             'context': {
-                'storage_kwargs': self._storage.serialization_dict,
+                'storage_kwargs': self._storage_kwargs,
                 'task_kwargs': self._task_kwargs
             }
         }
@@ -55,13 +57,11 @@ def model(self):

     @classmethod
     def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
-        return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
-                   task_kwargs=(task_kwargs or {}))
-
-    @property
-    @contextmanager
-    def persist_changes(self):
-        yield
+        if storage_kwargs:
+            return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
+                       task_kwargs=task_kwargs)
+        else:
+            return cls(task_kwargs=task_kwargs)


 class MockActor(object):
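
(The net effect of the mock changes: `MockContext` now works without real storage, falling back to a `MagicMock`, and the serialization round-trip degrades gracefully to `storage_kwargs=None`. A condensed, self-contained model of that contract, not the test module's actual class:

    class TinyContext(object):
        # storage is optional; serialization records None when no real
        # storage was given, and instantiate_from_dict branches on that
        def __init__(self, storage=None, task_kwargs=None):
            self._task_kwargs = task_kwargs or {}
            self._storage_kwargs = storage.serialization_dict if storage else None

        @property
        def serialization_dict(self):
            return {'context_cls': self.__class__,
                    'context': {'storage_kwargs': self._storage_kwargs,
                                'task_kwargs': self._task_kwargs}}

        @classmethod
        def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
            if storage_kwargs:
                raise NotImplementedError  # would rebuild real storage here
            return cls(task_kwargs=task_kwargs)

    blob = TinyContext().serialization_dict
    restored = blob['context_cls'].instantiate_from_dict(**blob['context'])
    assert isinstance(restored, TinyContext)

)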
