Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-163 Update node state for empty tasks
Additional changes:
 * removed `for_node` and `for_relationship` from the api OperationTask.
 * api based OperationTask could also have an empty implementation.
 * each core task wields its own executor.
 * Reordered some of the helper functions for creating tasks.
 * intoduced 2 new executors: StubTaskExecutor (for stub tasks) and EmptyOperationExecutor (for empty tasks)
  • Loading branch information
mxmrlv committed May 4, 2017
1 parent 0878526 commit 8ca3ff297b71e270eccdd9a2e6b8bf468ccdab5d
Show file tree
Hide file tree
Showing 34 changed files with 361 additions and 429 deletions.
@@ -195,6 +195,8 @@ def emit(self, record):
except BaseException:
self._session.rollback()
raise
finally:
self._session.close()


_default_file_formatter = logging.Formatter(
@@ -21,6 +21,7 @@
from ....modeling import models
from ....modeling import utils as modeling_utils
from ....utils.uuid import generate_uuid
from .. import exceptions


class BaseTask(object):
@@ -71,102 +72,44 @@ def __init__(self,
Do not call this constructor directly. Instead, use :meth:`for_node` or
:meth:`for_relationship`.
"""

actor_type = type(actor).__name__.lower()
assert isinstance(actor, (models.Node, models.Relationship))
assert actor_type in ('node', 'relationship')
assert interface_name and operation_name
super(OperationTask, self).__init__()

self.actor = actor
self.max_attempts = (self.workflow_context._task_max_attempts
if max_attempts is None else max_attempts)
self.retry_interval = (self.workflow_context._task_retry_interval
if retry_interval is None else retry_interval)
self.ignore_failure = (self.workflow_context._task_ignore_failure
if ignore_failure is None else ignore_failure)
self.interface_name = interface_name
self.operation_name = operation_name
self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
self.ignore_failure = \
self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
name=actor.name,
interface=self.interface_name,
operation=self.operation_name)
# Creating OperationTask directly should raise an error when there is no
# interface/operation.

if not has_operation(self.actor, self.interface_name, self.operation_name):
raise exceptions.OperationNotFoundException(
'Could not find operation "{self.operation_name}" on interface '
'"{self.interface_name}" for {actor_type} "{actor.name}"'.format(
self=self,
actor_type=type(actor).__name__.lower(),
actor=actor)
)

operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
self.plugin = operation.plugin
self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
self.implementation = operation.implementation
self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
name=actor.name,
interface=self.interface_name,
operation=self.operation_name)

def __repr__(self):
return self.name

@classmethod
def for_node(cls,
node,
interface_name,
operation_name,
max_attempts=None,
retry_interval=None,
ignore_failure=None,
inputs=None):
"""
Creates an operation on a node.
:param node: The node on which to run the operation
:param interface_name: The interface name
:param operation_name: The operation name within the interface
:param max_attempts: The maximum number of attempts in case the operation fails
(if not specified the defaults it taken from the workflow context)
:param retry_interval: The interval in seconds between attempts when the operation fails
(if not specified the defaults it taken from the workflow context)
:param ignore_failure: Whether to ignore failures
(if not specified the defaults it taken from the workflow context)
:param inputs: Additional operation inputs
"""

assert isinstance(node, models.Node)
return cls(
actor=node,
interface_name=interface_name,
operation_name=operation_name,
max_attempts=max_attempts,
retry_interval=retry_interval,
ignore_failure=ignore_failure,
inputs=inputs)

@classmethod
def for_relationship(cls,
relationship,
interface_name,
operation_name,
max_attempts=None,
retry_interval=None,
ignore_failure=None,
inputs=None):
"""
Creates an operation on a relationship edge.
:param relationship: The relationship on which to run the operation
:param interface_name: The interface name
:param operation_name: The operation name within the interface
:param max_attempts: The maximum number of attempts in case the operation fails
(if not specified the defaults it taken from the workflow context)
:param retry_interval: The interval in seconds between attempts when the operation fails
(if not specified the defaults it taken from the workflow context)
:param ignore_failure: Whether to ignore failures
(if not specified the defaults it taken from the workflow context)
:param inputs: Additional operation inputs
"""

assert isinstance(relationship, models.Relationship)
return cls(
actor=relationship,
interface_name=interface_name,
operation_name=operation_name,
max_attempts=max_attempts,
retry_interval=retry_interval,
ignore_failure=ignore_failure,
inputs=inputs)
class StubTask(BaseTask):
"""
Enables creating empty tasks.
"""


class WorkflowTask(BaseTask):
@@ -199,7 +142,83 @@ def __getattr__(self, item):
return super(WorkflowTask, self).__getattribute__(item)


class StubTask(BaseTask):
def create_task(actor, interface_name, operation_name, **kwargs):
"""
Enables creating empty tasks.
This helper function enables safe creation of OperationTask, if the supplied interface or
operation do not exist, None is returned.
:param actor: the actor for this task
:param interface_name: the name of the interface
:param operation_name: the name of the operation
:param kwargs: any additional kwargs to be passed to the task OperationTask
:return: and OperationTask or None (if the interface/operation does not exists)
"""
try:
return OperationTask(
actor,
interface_name=interface_name,
operation_name=operation_name,
**kwargs
)
except exceptions.OperationNotFoundException:
return None


def create_relationships_tasks(
node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
"""
Creates a relationship task (source and target) for all of a node_instance relationships.
:param basestring source_operation_name: the relationship operation name.
:param basestring interface_name: the name of the interface.
:param source_operation_name:
:param target_operation_name:
:param NodeInstance node: the source_node
:return:
"""
sub_tasks = []
for relationship in node.outbound_relationships:
relationship_operations = create_relationship_tasks(
relationship,
interface_name,
source_operation_name=source_operation_name,
target_operation_name=target_operation_name,
**kwargs)
sub_tasks.append(relationship_operations)
return sub_tasks


def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
target_operation_name=None, **kwargs):
"""
Creates a relationship task source and target.
:param Relationship relationship: the relationship instance itself
:param source_operation_name:
:param target_operation_name:
:return:
"""
operations = []
if source_operation_name:
operations.append(
create_task(
relationship,
interface_name=interface_name,
operation_name=source_operation_name,
**kwargs
)
)
if target_operation_name:
operations.append(
create_task(
relationship,
interface_name=interface_name,
operation_name=target_operation_name,
**kwargs
)
)

return [o for o in operations if o]


def has_operation(actor, interface_name, operation_name):
interface = actor.interfaces.get(interface_name, None)
return interface and interface.operations.get(operation_name, False)
@@ -17,8 +17,8 @@
Builtin execute_operation workflow
"""

from . import utils
from ... import workflow
from ..api import task


@workflow
@@ -65,11 +65,11 @@ def execute_operation(
# registering actual tasks to sequences
for node in filtered_nodes:
graph.add_tasks(
_create_node_task(
node=node,
task.OperationTask(
node,
interface_name=interface_name,
operation_name=operation_name,
operation_kwargs=operation_kwargs
inputs=operation_kwargs
)
)

@@ -99,23 +99,3 @@ def _is_node_by_type(node_type):
_is_node_by_id(node.id),
_is_node_by_type(node.node_template.type))):
yield node


def _create_node_task(
node,
interface_name,
operation_name,
operation_kwargs):
"""
A workflow which executes a single operation
:param node: the node instance to install
:param basestring operation: the operation name
:param dict operation_kwargs:
:return:
"""

return utils.create_node_task(
node=node,
interface_name=interface_name,
operation_name=operation_name,
inputs=operation_kwargs)
@@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
graph.add_dependency(target_node_subgraph, node_sub_workflow)

if target_node in failing_nodes:
dependency = relationship_tasks(
dependency = task.create_relationship_tasks(
relationship=relationship,
operation_name='aria.interfaces.relationship_lifecycle.unlink')
graph.add_tasks(*dependency)
@@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes):
graph.add_dependency(node_sub_workflow, target_node_subworkflow)

if target_node in failing_nodes:
dependent = relationship_tasks(
dependent = task.create_relationship_tasks(
relationship=relationship,
operation_name='aria.interfaces.relationship_lifecycle.establish')
graph.add_tasks(*dependent)
@@ -17,16 +17,15 @@
Builtin install workflow
"""

from .workflows import install_node
from .utils import create_node_task_dependencies
from ..api.task import WorkflowTask
from ... import workflow
from ..api import task as api_task
from . import workflows


@workflow
def install(ctx, graph):
tasks_and_nodes = []
for node in ctx.nodes:
tasks_and_nodes.append((WorkflowTask(install_node, node=node), node))
tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node))
graph.add_tasks([task for task, _ in tasks_and_nodes])
create_node_task_dependencies(graph, tasks_and_nodes)
workflows.create_node_task_dependencies(graph, tasks_and_nodes)
@@ -18,11 +18,11 @@
"""

from .workflows import start_node
from ..api.task import WorkflowTask
from ... import workflow
from ..api import task as api_task


@workflow
def start(ctx, graph):
for node in ctx.model.node.iter():
graph.add_tasks(WorkflowTask(start_node, node=node))
graph.add_tasks(api_task.WorkflowTask(start_node, node=node))
@@ -18,11 +18,11 @@
"""

from .workflows import stop_node
from ..api.task import WorkflowTask
from ..api import task as api_task
from ... import workflow


@workflow
def stop(ctx, graph):
for node in ctx.model.node.iter():
graph.add_tasks(WorkflowTask(stop_node, node=node))
graph.add_tasks(api_task.WorkflowTask(stop_node, node=node))
@@ -17,18 +17,15 @@
Builtin uninstall workflow
"""

from .workflows import uninstall_node
from .utils import create_node_task_dependencies
from ..api.task import WorkflowTask
from ... import workflow
from ..api import task as api_task
from . import workflows


@workflow
def uninstall(ctx, graph):
tasks_and_nodes = []
for node in ctx.nodes:
tasks_and_nodes.append((
WorkflowTask(uninstall_node, node=node),
node))
tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node))
graph.add_tasks([task for task, _ in tasks_and_nodes])
create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)
workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)

0 comments on commit 8ca3ff2

Please sign in to comment.