Skip to content

Commit

Permalink
support workflow convergence nodes
Browse files Browse the repository at this point in the history
* remove convergence restriction in API
* change task manager logic to be aware of and support convergence nodes
  • Loading branch information
chrismeyersfsu committed Oct 9, 2018
1 parent 1edede2 commit 3ce96f8
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 9 deletions.
2 changes: 0 additions & 2 deletions awx/api/views/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3465,8 +3465,6 @@ def is_valid_relation(self, parent, sub, created=False):
if not find:
sub_node = graph[sub.pk]
parent_node = graph[parent.pk]
if sub_node['metadata']['parent'] is not None:
return {"Error": _("Multiple parent relationship not allowed.")}
sub_node['metadata']['parent'] = parent_node
iter_node = sub_node
while iter_node is not None:
Expand Down
20 changes: 20 additions & 0 deletions awx/main/migrations/0050_v331_workflow_convergence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-09-28 14:23
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``WorkflowJobNode.do_not_run`` flag used by workflow
    convergence support: nodes that can never run are marked so the
    task manager can finish the workflow without waiting on them."""

    dependencies = [('main', '0049_v330_validate_instance_capacity_adjustment')]

    operations = [
        migrations.AddField(
            model_name='workflowjobnode',
            name='do_not_run',
            # Defaults to False: existing nodes remain runnable.
            field=models.BooleanField(default=False),
        ),
    ]
3 changes: 3 additions & 0 deletions awx/main/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class WorkflowJobNode(WorkflowNodeBase):
default={},
editable=False,
)
do_not_run = models.BooleanField(
default=False
)

def get_absolute_url(self, request=None):
    """Return the API detail URL for this workflow job node."""
    route_kwargs = {'pk': self.pk}
    return reverse('api:workflow_job_node_detail', kwargs=route_kwargs, request=request)
Expand Down
2 changes: 1 addition & 1 deletion awx/main/scheduler/dag_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def short_string_obj(obj):
for n in self.nodes:
doc += "%s [color = %s]\n" % (
short_string_obj(n['node_object']),
"red" if n['node_object'].status == 'running' else "black",
"red" if getattr(n['node_object'], 'status', 'N/A') == 'running' else "black",
)
for from_node, to_node, label in self.edges:
doc += "%s -> %s [ label=\"%s\" ];\n" % (
Expand Down
49 changes: 45 additions & 4 deletions awx/main/scheduler/dag_workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@

# Python
import copy

# AWX
from awx.main.scheduler.dag_simple import SimpleDAG

Expand Down Expand Up @@ -30,19 +33,19 @@ def bfs_nodes_to_run(self):
obj = n['node_object']
job = obj.job

if not job:
if not job and obj.do_not_run is False:
nodes_found.append(n)
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'successful']:
elif job and job.status not in ['failed', 'successful']:
continue
elif job.status == 'failed':
elif job and job.status == 'failed':
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status == 'successful':
elif job and job.status == 'successful':
children_success = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
Expand Down Expand Up @@ -97,3 +100,41 @@ def is_workflow_done(self):
# have the job result.
return False, False
return True, is_failed

def mark_dnr_nodes(self):
    """Mark nodes that can never run as ``do_not_run``.

    Breadth-first walk from the root nodes, following only "dead" edges:
    success edges out of a failed job, failure edges out of a successful
    job, and every edge out of a node already marked do-not-run. A
    visited node with no job yet is marked ``do_not_run`` when all of its
    parents have either already run or are themselves do-not-run, i.e.
    no live path can still reach it.

    Returns the list of node objects that were newly marked, so the
    caller can persist the flag.
    """
    root_nodes = self.get_root_nodes()
    # BFS frontier; extended in-place as dead-edge children are found.
    nodes = copy.copy(root_nodes)
    nodes_marked_do_not_run = []

    for index, n in enumerate(nodes):
        obj = n['node_object']
        job = obj.job

        if not job and obj.do_not_run is False and n not in root_nodes:
            parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
            # The node can still run while at least one parent is both
            # un-run and not marked do-not-run.
            # NOTE(review): a parent whose job already ran counts as
            # resolved here regardless of which edge reached us; assumes
            # live edges were dispatched by bfs_nodes_to_run -- confirm.
            all_parents_dnr = True
            for p in parent_nodes:
                if not p.job and p.do_not_run is False:
                    all_parents_dnr = False
                    break
            if all_parents_dnr:
                obj.do_not_run = True
                nodes_marked_do_not_run.append(n)

        if obj.do_not_run:
            # A do-not-run node fires none of its edges, so children on
            # ALL edge types are candidates. (Bug fix: the previous code
            # computed the success children but left them out of the
            # traversal, so they were never marked.)
            children_success = self.get_dependencies(obj, 'success_nodes')
            children_failed = self.get_dependencies(obj, 'failure_nodes')
            children_always = self.get_dependencies(obj, 'always_nodes')
            nodes.extend(children_success + children_failed + children_always)
        elif job and job.status == 'failed':
            # Only the success edges are dead after a failure.
            nodes.extend(self.get_dependencies(obj, 'success_nodes'))
        elif job and job.status == 'successful':
            # Only the failure edges are dead after a success.
            nodes.extend(self.get_dependencies(obj, 'failure_nodes'))
    return [n['node_object'] for n in nodes_marked_do_not_run]

2 changes: 2 additions & 0 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def process_finished_workflow_jobs(self, workflow_jobs):
else:
is_done, has_failed = dag.is_workflow_done()
if not is_done:
workflow_nodes = dag.mark_dnr_nodes()
map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes)
continue
result.append(workflow_job.id)
workflow_job.status = 'failed' if has_failed else 'successful'
Expand Down
55 changes: 53 additions & 2 deletions awx/main/tests/functional/models/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,59 @@ def test_workflow_not_finished(self):
self.assertFalse(has_failed)


@pytest.mark.django_db
class TestWorkflowDNR():
    """Tests for marking unreachable workflow nodes as do_not_run."""

    @pytest.fixture
    def workflow_job_fn(self):
        # Factory fixture: states[i] seeds node[i] with a job in that
        # status; a falsy entry (e.g. None) leaves the node without a job.
        # (The default is a tuple rather than a list to avoid a shared
        # mutable default argument.)
        def fn(states=('new', 'new', 'new', 'new', 'new', 'new')):
            """
            Workflow topology:
                           node[0]
                            /\
                          s/  \f
                          /    \
                     node[1]  node[3]
                       /          \
                     s/            \f
                     /              \
                node[2]          node[4]
                     \              /
                      \            /
                       \          /
                        s        f
                         \      /
                          \    /
                          node[5]
            """
            wfj = WorkflowJob.objects.create()
            jt = JobTemplate.objects.create(name='test-jt')
            nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)]
            for node, state in zip(nodes, states):
                if state:
                    node.job = jt.create_job()
                    node.job.status = state
                    node.job.save()
                    node.save()
            nodes[0].success_nodes.add(nodes[1])
            nodes[1].success_nodes.add(nodes[2])
            nodes[0].failure_nodes.add(nodes[3])
            nodes[3].failure_nodes.add(nodes[4])
            nodes[2].success_nodes.add(nodes[5])
            nodes[4].failure_nodes.add(nodes[5])
            return wfj, nodes
        return fn

    def test_workflow_dnr_because_parent(self, workflow_job_fn):
        # node[0] succeeded, so the failure branch node[3] -> node[4]
        # can never run and both should be marked do_not_run.
        wfj, nodes = workflow_job_fn(states=['successful', None, None, None, None, None,])
        dag = WorkflowDAG(workflow_job=wfj)
        workflow_nodes = dag.mark_dnr_nodes()
        assert 2 == len(workflow_nodes)
        assert nodes[3] in workflow_nodes
        assert nodes[4] in workflow_nodes


@pytest.mark.django_db
class TestWorkflowJob:
@pytest.fixture
Expand Down Expand Up @@ -192,8 +245,6 @@ def test_topology_validator(self, wfjt):
nodes[2].always_nodes.add(node_assoc)
# test cycle validation
assert test_view.is_valid_relation(node_assoc, nodes[0]) == {'Error': 'Cycle detected.'}
# test multi-ancestor validation
assert test_view.is_valid_relation(node_assoc, nodes[1]) == {'Error': 'Multiple parent relationship not allowed.'}
# test mutex validation
test_view.relationship = 'failure_nodes'
node_assoc_1 = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt)
Expand Down

0 comments on commit 3ce96f8

Please sign in to comment.