Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
ARIA-294 Workflow tasks execution is not in order
  • Loading branch information
mxmrlv committed Jun 28, 2017
1 parent 7bba3ab commit b1b1ee4457dce0b851277e5289faa2178f638f48
Showing 10 changed files with 134 additions and 101 deletions.
@@ -396,14 +396,9 @@ def abort(message=None):
def retry(message=None, retry_interval=None):
raise TaskRetryException(message, retry_interval=retry_interval)

@declared_attr
def dependency_fk(self):
return relationship.foreign_key('task', nullable=True)

@declared_attr
def dependencies(cls):
# symmetric relationship causes funky graphs
return relationship.one_to_many_self(cls, 'dependency_fk')
return relationship.many_to_many(cls, self=True)

def has_ended(self):
return self.status in (self.SUCCESS, self.FAILED)
@@ -90,35 +90,6 @@ def one_to_one_self(model_class, fk):
)


def one_to_many_self(model_class, fk, dict_key=None):
"""
Declare a one-to-many relationship property. The property value would be a list or dict of
instances of the same model.
You will need an associated foreign key to our own table.
*This utility method should only be used during class creation.*
:param model_class: The class in which this relationship will be declared
:type model_class: type
:param fk: Foreign key name
:type fk: basestring
:param dict_key: If set the value will be a dict with this key as the dict key; otherwise will
be a list
:type dict_key: basestring
"""
return _relationship(
model_class,
model_class.__tablename__,
relationship_kwargs={
'remote_side': '{model_class}.{remote_column}'.format(
model_class=model_class.__name__, remote_column=fk)
},
back_populates=False,
dict_key=dict_key
)


def one_to_one(model_class,
other_table,
fk=None,
@@ -162,11 +133,12 @@ def one_to_one(model_class,


def one_to_many(model_class,
child_table,
child_fk=None,
other_table=None,
other_fk=None,
dict_key=None,
back_populates=None,
rel_kwargs=None):
rel_kwargs=None,
self=False):
"""
Declare a one-to-many relationship property. The property value would be a list or dict of
instances of the child table's model.
@@ -181,28 +153,38 @@ def one_to_many(model_class,
:param model_class: The class in which this relationship will be declared
:type model_class: type
:param child_table: Child table name
:type child_table: basestring
:param child_fk: Foreign key name at the child table (no need specify if there's no ambiguity)
:type child_fk: basestring
:type other_table: basestring
:param other_fk: Foreign key name at the child table (no need specify if there's no ambiguity)
:type other_fk: basestring
:param dict_key: If set the value will be a dict with this key as the dict key; otherwise will
be a list
:type dict_key: basestring
:param back_populates: Override name of matching many-to-one property at child table; set to
false to disable
:type back_populates: basestring|bool
"""
rel_kwargs = rel_kwargs or {}
rel_kwargs.setdefault('cascade', 'all')
if back_populates is None:
back_populates = model_class.__tablename__
relationship_kwargs = rel_kwargs or {}
if self:
assert other_fk
other_table_name = model_class.__tablename__
back_populates = False
relationship_kwargs['remote_side'] = '{model}.{column}'.format(model=model_class.__name__,
column=other_fk)

else:
assert other_table
other_table_name = other_table
if back_populates is None:
back_populates = model_class.__tablename__
relationship_kwargs.setdefault('cascade', 'all')

return _relationship(
model_class,
child_table,
other_table_name,
back_populates=back_populates,
other_fk=child_fk,
other_fk=other_fk,
dict_key=dict_key,
relationship_kwargs=rel_kwargs)
relationship_kwargs=relationship_kwargs)


def many_to_one(model_class,
@@ -244,10 +226,11 @@ def many_to_one(model_class,


def many_to_many(model_class,
other_table,
other_table=None,
prefix=None,
dict_key=None,
other_property=None):
other_property=None,
self=False):
"""
Declare a many-to-many relationship property. The property value would be a list or dict of
instances of the other table's model.
@@ -280,8 +263,11 @@ def many_to_many(model_class,
this_column_name = '{0}_id'.format(this_table)
this_foreign_key = '{0}.id'.format(this_table)

other_column_name = '{0}_id'.format(other_table)
other_foreign_key = '{0}.id'.format(other_table)
if self:
other_table = this_table

other_column_name = '{0}_{1}'.format(other_table, 'self_ref_id' if self else 'id')
other_foreign_key = '{0}.{1}'.format(other_table, 'id')

secondary_table_name = '{0}_{1}'.format(this_table, other_table)

@@ -299,13 +285,20 @@ def many_to_many(model_class,
other_foreign_key
)

return _relationship(
model_class,
other_table,
relationship_kwargs={'secondary': secondary_table},
backref_kwargs={'name': other_property, 'uselist': True} if other_property else None,
dict_key=dict_key
)
kwargs = {'relationship_kwargs': {'secondary': secondary_table}}

if self:
kwargs['back_populates'] = NO_BACK_POP
kwargs['relationship_kwargs']['primaryjoin'] = \
getattr(model_class, 'id') == getattr(secondary_table.c, this_column_name)
kwargs['relationship_kwargs']['secondaryjoin'] = \
getattr(model_class, 'id') == getattr(secondary_table.c, other_column_name)
else:
kwargs['backref_kwargs'] = \
{'name': other_property, 'uselist': True} if other_property else None
kwargs['dict_key'] = dict_key

return _relationship(model_class, other_table, **kwargs)


def _relationship(model_class,
@@ -368,14 +361,6 @@ def _get_secondary_table(metadata,
return Table(
name,
metadata,
Column(
first_column,
Integer,
ForeignKey(first_foreign_key)
),
Column(
second_column,
Integer,
ForeignKey(second_foreign_key)
)
Column(first_column, Integer, ForeignKey(first_foreign_key)),
Column(second_column, Integer, ForeignKey(second_foreign_key))
)
@@ -320,7 +320,7 @@ def parent(cls):

@declared_attr
def children(cls):
return relationship.one_to_many_self(cls, 'parent_type_fk')
return relationship.one_to_many(cls, other_fk='parent_type_fk', self=True)

# region foreign keys

@@ -483,7 +483,7 @@ def capabilities(cls):
@declared_attr
def outbound_relationships(cls):
return relationship.one_to_many(
cls, 'relationship', child_fk='source_node_fk', back_populates='source_node',
cls, 'relationship', other_fk='source_node_fk', back_populates='source_node',
rel_kwargs=dict(
order_by='Relationship.source_position',
collection_class=ordering_list('source_position', count_from=0)
@@ -493,7 +493,7 @@ def outbound_relationships(cls):
@declared_attr
def inbound_relationships(cls):
return relationship.one_to_many(
cls, 'relationship', child_fk='target_node_fk', back_populates='target_node',
cls, 'relationship', other_fk='target_node_fk', back_populates='target_node',
rel_kwargs=dict(
order_by='Relationship.target_position',
collection_class=ordering_list('target_position', count_from=0)
@@ -493,7 +493,7 @@ def capability_templates(cls):

@declared_attr
def requirement_templates(cls):
return relationship.one_to_many(cls, 'requirement_template', child_fk='node_template_fk')
return relationship.one_to_many(cls, 'requirement_template', other_fk='node_template_fk')

@declared_attr
def properties(cls):
@@ -97,8 +97,8 @@ def __init__(self, model_storage, resource_storage, plugin_manager,
if not self._is_resume:
workflow_fn = self._get_workflow_fn()
self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
graph_compiler.GraphCompiler(self._workflow_context, executor.__class__).compile(
self._tasks_graph)
compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
compiler.compile(self._tasks_graph)

self._engine = engine.Engine(executors={executor.__class__: executor})

@@ -37,7 +37,6 @@ def compile(self,
:param end_stub_type: internal use
:param depends_on: internal use
"""
task_graph = task_graph or self._task_graph
depends_on = list(depends_on)

# Insert start marker
@@ -110,8 +109,7 @@ def _get_tasks_from_dependencies(self, dependencies):
"""
tasks = []
for dependency in dependencies:
if getattr(dependency, 'actor', False):
# This is
if isinstance(dependency, (api.task.StubTask, api.task.OperationTask)):
dependency_name = dependency.id
else:
dependency_name = self._end_graph_suffix(dependency.id)
@@ -350,11 +350,11 @@ def test_resume_workflow(self, workflow_context, executor):
if events['execution_ended'].wait(60) is False:
raise TimeoutError("Execution did not end")

first_task, second_task = workflow_context.model.task.list(filters={'_stub_type': None})
assert first_task.status == first_task.SUCCESS
assert second_task.status in (second_task.FAILED, second_task.RETRYING)
tasks = workflow_context.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
events['is_resumed'].set()
assert second_task.status in (second_task.FAILED, second_task.RETRYING)
assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)

# Create a new workflow runner, with an existing execution id. This would cause
# the old execution to restart.
@@ -370,7 +370,7 @@ def test_resume_workflow(self, workflow_context, executor):
new_wf_runner.execute()

# Wait for it to finish and assert changes.
assert second_task.status == second_task.SUCCESS
assert all(task.status == task.SUCCESS for task in tasks)
assert node.attributes['invocations'].value == 3
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED

0 comments on commit b1b1ee4

Please sign in to comment.