Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-120-Builtin-workflows-relationship-operations-execution-order
  • Loading branch information
mxmrlv committed Mar 28, 2017
1 parent 0e10793 commit 07cbfcdabc207452acda33fa9f1ababf97e5d260
Show file tree
Hide file tree
Showing 16 changed files with 323 additions and 186 deletions.
@@ -308,7 +308,7 @@ def many_to_many(model_class,
if prefix is not None:
secondary_table = '{0}_{1}'.format(prefix, secondary_table)
if other_property is None:
other_property = '{0}_{1}'.format(prefix, this_table)
other_property = '{0}_{1}'.format(prefix, formatting.pluralize(this_table))

backref_kwargs = backref_kwargs or {}
backref_kwargs.setdefault('uselist', True)
@@ -90,6 +90,8 @@ def __init__(self,
self.ignore_failure = (self.workflow_context._task_ignore_failure
if ignore_failure is None else ignore_failure)
self.runs_on = runs_on
self.interface_name = interface_name
self.operation_name = operation_name

# Wrap inputs
inputs = copy.deepcopy(inputs) if inputs else {}
@@ -101,27 +103,38 @@ def __init__(self,
# model, because they are different from the operation inputs. If we do this, then the two
# kinds of inputs should *not* be merged here.

operation = self._get_operation(interface_name, operation_name)
operation = self._get_operation()
if operation is None:
raise exceptions.OperationNotFoundException(
'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
.format(operation_name, interface_name, actor_type, actor.name))
.format(self.operation_name, self.interface_name, actor_type, actor.name))

self.plugin = None
if operation.plugin_specification:
self.plugin = OperationTask._find_plugin(operation.plugin_specification)
if self.plugin is None:
raise exceptions.PluginNotFoundException(
'Could not find plugin of operation "{0}" on interface "{1}" for {2} "{3}"'
.format(operation_name, interface_name, actor_type, actor.name))
.format(self.operation_name, self.interface_name, actor_type, actor.name))

self.implementation = operation.implementation
self.inputs = OperationTask._merge_inputs(operation.inputs, inputs)

self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
name=actor.name,
interface=interface_name,
operation=operation_name)
interface=self.interface_name,
operation=self.operation_name)

def __repr__(self):
return self.name

def _get_operation(self):
interface = self.actor.interfaces.get(self.interface_name)
if interface:
return interface.operations.get(self.operation_name)
return None



@classmethod
def for_node(cls,
@@ -198,12 +211,6 @@ def for_relationship(cls,
ignore_failure=ignore_failure,
inputs=inputs)

def _get_operation(self, interface_name, operation_name):
interface = self.actor.interfaces.get(interface_name)
if interface is not None:
return interface.operations.get(operation_name)
return None

@staticmethod
def _find_plugin(plugin_specification):
workflow_context = context.workflow.current.get()
@@ -12,12 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ..api.task import OperationTask
from .. import exceptions


def create_node_task(interface_name, operation_name, node):
def create_node_task(node, interface_name, operation_name):
"""
Returns a new operation task if the operation exists in the node, otherwise returns None.
"""
@@ -31,24 +30,59 @@ def create_node_task(interface_name, operation_name, node):
return None


def create_relationship_tasks(interface_name, operation_name, runs_on, node):
def create_relationships_tasks(
node, interface_name, source_operation_name=None, target_operation_name=None):
"""
Returns a list of operation tasks for each outbound relationship of the node if the operation
exists there.
Creates a relationship task (source and target) for all of a node_instance relationships.
:param basestring source_operation_name: the relationship operation name.
:param basestring interface_name: the name of the interface.
:param source_operation_name:
:param target_operation_name:
:param NodeInstance node: the source_node
:return:
"""

sequence = []
sub_tasks = []
for relationship in node.outbound_relationships:
try:
sequence.append(
OperationTask.for_relationship(relationship=relationship,
interface_name=interface_name,
operation_name=operation_name,
runs_on=runs_on))
relationship_operations = relationship_tasks(
relationship,
interface_name,
source_operation_name=source_operation_name,
target_operation_name=target_operation_name)
sub_tasks.append(relationship_operations)
except exceptions.OperationNotFoundException:
# We will skip relationships which do not have the operation
pass
return sequence
return sub_tasks


def relationship_tasks(
relationship, interface_name, source_operation_name=None, target_operation_name=None):
"""
Creates a relationship task source and target.
:param Relationship relationship: the relationship instance itself
:param source_operation_name:
:param target_operation_name:
:return:
"""
operations = []
if source_operation_name:
operations.append(
OperationTask.for_relationship(relationship=relationship,
interface_name=interface_name,
operation_name=source_operation_name,
runs_on='source')
)
if target_operation_name:
operations.append(
OperationTask.for_relationship(relationship=relationship,
interface_name=interface_name,
operation_name=target_operation_name,
runs_on='target')
)

return operations


def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
@@ -18,19 +18,21 @@
"""

from ... import workflow
from ....modeling.models import Task
from .utils import (create_node_task, create_relationship_tasks)
from .utils import (
create_node_task,
create_relationships_tasks
)


NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard'
NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure'

NORMATIVE_CREATE = 'create'
NORMATIVE_CONFIGURE = 'configure'
NORMATIVE_START = 'start'
NORMATIVE_STOP = 'stop'
NORMATIVE_DELETE = 'delete'

NORMATIVE_CONFIGURE = 'configure'
NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source'
NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target'
NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source'
@@ -39,6 +41,7 @@
NORMATIVE_ADD_SOURCE = 'add_source'
NORMATIVE_ADD_TARGET = 'add_target'
NORMATIVE_REMOVE_TARGET = 'remove_target'
NORMATIVE_REMOVE_SOURCE = 'remove_source'
NORMATIVE_TARGET_CHANGED = 'target_changed'


@@ -56,6 +59,7 @@
'NORMATIVE_POST_CONFIGURE_TARGET',
'NORMATIVE_ADD_SOURCE',
'NORMATIVE_ADD_TARGET',
'NORMATIVE_REMOVE_SOURCE',
'NORMATIVE_REMOVE_TARGET',
'NORMATIVE_TARGET_CHANGED',
'install_node',
@@ -67,40 +71,20 @@

@workflow(suffix_template='{node.name}')
def install_node(graph, node, **kwargs):
sequence = []

# Create
sequence.append(
create_node_task(
NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE,
node))
sequence = [create_node_task(node,
NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]

# Configure
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_SOURCE,
Task.RUNS_ON_SOURCE,
node)
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_TARGET,
Task.RUNS_ON_TARGET,
node)
sequence.append(
create_node_task(
NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE,
node))
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_SOURCE,
Task.RUNS_ON_SOURCE,
node)
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_TARGET,
Task.RUNS_ON_TARGET,
node)

sequence += create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_PRE_CONFIGURE_SOURCE,
NORMATIVE_PRE_CONFIGURE_TARGET)
sequence.append(create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
sequence += create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_POST_CONFIGURE_SOURCE,
NORMATIVE_POST_CONFIGURE_TARGET)
# Start
sequence += _create_start_tasks(node)

@@ -113,10 +97,9 @@ def uninstall_node(graph, node, **kwargs):
sequence = _create_stop_tasks(node)

# Delete
sequence.append(
create_node_task(
NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE,
node))
sequence.append(create_node_task(node,
NORMATIVE_STANDARD_INTERFACE,
NORMATIVE_DELETE))

graph.sequence(*sequence)

@@ -132,43 +115,16 @@ def stop_node(graph, node, **kwargs):


def _create_start_tasks(node):
sequence = []
sequence.append(
create_node_task(
NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START,
node))
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_SOURCE,
Task.RUNS_ON_SOURCE,
node)
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_TARGET,
Task.RUNS_ON_TARGET,
node)
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_TARGET_CHANGED,
Task.RUNS_ON_TARGET,
node)
sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
sequence += create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
return sequence


def _create_stop_tasks(node):
sequence = []
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_REMOVE_TARGET,
Task.RUNS_ON_TARGET,
node)
sequence += \
create_relationship_tasks(
NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_TARGET_CHANGED,
Task.RUNS_ON_TARGET,
node)
sequence.append(
create_node_task(
NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP,
node))
sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
sequence += create_relationships_tasks(node,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
return sequence
@@ -20,7 +20,6 @@
Implementation of storage handlers for workflow and operation events.
"""

import re
from datetime import (
datetime,
timedelta,
@@ -126,11 +125,9 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):


def _update_node_state_if_necessary(task, is_transitional=False):
match = re.search(r'^(?:tosca.interfaces.node.lifecycle.Standard|Standard):(\S+)@node',
task.name)
if match:
if task.interface_name in ['tosca.interfaces.node.lifecycle.Standard', 'Standard']:
node = task.runs_on
state = node.determine_state(op_name=match.group(1), is_transitional=is_transitional)
state = node.determine_state(op_name=task.operation_name, is_transitional=is_transitional)
if state:
node.state = state
task.context.model.node.update(node)
@@ -106,6 +106,8 @@ class OperationTask(BaseTask):
def __init__(self, api_task, *args, **kwargs):
super(OperationTask, self).__init__(id=api_task.id, **kwargs)
self._workflow_context = api_task._workflow_context
self.interface_name = api_task.interface_name
self.operation_name = api_task.operation_name
model_storage = api_task._workflow_context.model
plugin = api_task.plugin

@@ -27,9 +27,10 @@
from .topology import create_simple_topology_two_nodes


def simple(tmpdir, inmemory=False, context_kwargs=None):
def simple(tmpdir, inmemory=False, context_kwargs=None, topology=None):
initiator = init_inmemory_model_storage if inmemory else None
initiator_kwargs = {} if inmemory else dict(base_dir=tmpdir)
topology = topology or create_simple_topology_two_nodes

model_storage = aria.application_model_storage(
sql_mapi.SQLAlchemyModelAPI, initiator=initiator, initiator_kwargs=initiator_kwargs)
@@ -38,13 +39,11 @@ def simple(tmpdir, inmemory=False, context_kwargs=None):
api_kwargs=dict(directory=os.path.join(tmpdir, 'resources'))
)

service_id = create_simple_topology_two_nodes(model_storage)

final_kwargs = dict(
name='simple_context',
model_storage=model_storage,
resource_storage=resource_storage,
service_id=service_id,
service_id=topology(model_storage),
workflow_name=models.WORKFLOW_NAME,
task_max_attempts=models.TASK_MAX_ATTEMPTS,
task_retry_interval=models.TASK_RETRY_INTERVAL

0 comments on commit 07cbfcd

Please sign in to comment.