From 3ce96f89153ba0c49f112cb61dae4fd6eb45fd74 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Thu, 27 Sep 2018 15:47:51 -0400 Subject: [PATCH] support workflow convergence nodes * remove convergence restriction in API * change task manager logic to be aware of and support convergence nodes --- awx/api/views/__init__.py | 2 - .../0050_v331_workflow_convergence.py | 20 +++++++ awx/main/models/workflow.py | 3 + awx/main/scheduler/dag_simple.py | 2 +- awx/main/scheduler/dag_workflow.py | 49 +++++++++++++++-- awx/main/scheduler/task_manager.py | 2 + .../tests/functional/models/test_workflow.py | 55 ++++++++++++++++++- 7 files changed, 124 insertions(+), 9 deletions(-) create mode 100644 awx/main/migrations/0050_v331_workflow_convergence.py diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 8d502eb5c934..0db6d5b74f21 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -3465,8 +3465,6 @@ def is_valid_relation(self, parent, sub, created=False): if not find: sub_node = graph[sub.pk] parent_node = graph[parent.pk] - if sub_node['metadata']['parent'] is not None: - return {"Error": _("Multiple parent relationship not allowed.")} sub_node['metadata']['parent'] = parent_node iter_node = sub_node while iter_node is not None: diff --git a/awx/main/migrations/0050_v331_workflow_convergence.py b/awx/main/migrations/0050_v331_workflow_convergence.py new file mode 100644 index 000000000000..2e6edd42d7b2 --- /dev/null +++ b/awx/main/migrations/0050_v331_workflow_convergence.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-09-28 14:23 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0049_v330_validate_instance_capacity_adjustment'), + ] + + operations = [ + migrations.AddField( + model_name='workflowjobnode', + name='do_not_run', + field=models.BooleanField(default=False), + ), + ] diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 198595424f99..9833c7d95f4a 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -183,6 +183,9 @@ class WorkflowJobNode(WorkflowNodeBase): default={}, editable=False, ) + do_not_run = models.BooleanField( + default=False + ) def get_absolute_url(self, request=None): return reverse('api:workflow_job_node_detail', kwargs={'pk': self.pk}, request=request) diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index c7bde9410180..0a078f9821ee 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -51,7 +51,7 @@ def short_string_obj(obj): for n in self.nodes: doc += "%s [color = %s]\n" % ( short_string_obj(n['node_object']), - "red" if n['node_object'].status == 'running' else "black", + "red" if getattr(n['node_object'], 'status', 'N/A') == 'running' else "black", ) for from_node, to_node, label in self.edges: doc += "%s -> %s [ label=\"%s\" ];\n" % ( diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 3f7657f5713a..2aaf361fda3e 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -1,4 +1,7 @@ +# Python +import copy + # AWX from awx.main.scheduler.dag_simple import SimpleDAG @@ -30,19 +33,19 @@ def bfs_nodes_to_run(self): obj = n['node_object'] job = obj.job - if not job: + if not job and obj.do_not_run is False: nodes_found.append(n) # Job is about to run or is running. Hold our horses and wait for # the job to finish. We can't proceed down the graph path until we # have the job result. - elif job.status not in ['failed', 'successful']: + elif job and job.status not in ['failed', 'successful']: continue - elif job.status == 'failed': + elif job and job.status == 'failed': children_failed = self.get_dependencies(obj, 'failure_nodes') children_always = self.get_dependencies(obj, 'always_nodes') children_all = children_failed + children_always nodes.extend(children_all) - elif job.status == 'successful': + elif job and job.status == 'successful': children_success = self.get_dependencies(obj, 'success_nodes') children_always = self.get_dependencies(obj, 'always_nodes') children_all = children_success + children_always @@ -97,3 +100,41 @@ def is_workflow_done(self): # have the job result. return False, False return True, is_failed + + def mark_dnr_nodes(self): + root_nodes = self.get_root_nodes() + nodes = copy.copy(root_nodes) + nodes_marked_do_not_run = [] + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job and obj.do_not_run is False and n not in root_nodes: + parent_nodes = [p['node_object'] for p in self.get_dependents(obj)] + all_parents_dnr = True + for p in parent_nodes: + if not p.job and p.do_not_run is False: + all_parents_dnr = False + break + #all_parents_dnr = reduce(lambda p: bool(p.do_not_run == True), parent_nodes) + if all_parents_dnr: + obj.do_not_run = True + nodes_marked_do_not_run.append(n) + + if obj.do_not_run: + children_success = self.get_dependencies(obj, 'success_nodes') + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job and job.status == 'failed': + children_failed = self.get_dependencies(obj, 'success_nodes') + children_all = children_failed + nodes.extend(children_all) + elif job and job.status == 'successful': + children_success = self.get_dependencies(obj, 'failure_nodes') + children_all = children_success + nodes.extend(children_all) + return [n['node_object'] for n in nodes_marked_do_not_run] + diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 3607e53241f7..b2a670b17ede 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -226,6 +226,8 @@ def process_finished_workflow_jobs(self, workflow_jobs): else: is_done, has_failed = dag.is_workflow_done() if not is_done: + workflow_nodes = dag.mark_dnr_nodes() + map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes) continue result.append(workflow_job.id) workflow_job.status = 'failed' if has_failed else 'successful' diff --git a/awx/main/tests/functional/models/test_workflow.py b/awx/main/tests/functional/models/test_workflow.py index 0514fc8bda52..c826ecb4980a 100644 --- a/awx/main/tests/functional/models/test_workflow.py +++ b/awx/main/tests/functional/models/test_workflow.py @@ -99,6 +99,59 @@ def test_workflow_not_finished(self): self.assertFalse(has_failed) +@pytest.mark.django_db +class TestWorkflowDNR(): + 'success', 'new' + + @pytest.fixture + def workflow_job_fn(self): + def fn(states=['new', 'new', 'new', 'new', 'new', 'new']): + """ + Workflow topology: + node[0] + /\ + s/ \f + / \ + node[1] node[3] + / \ + s/ \f + / \ + node[2] node[4] + \ / + \ / + \ / + s f + \ / + \ / + node[5] + """ + wfj = WorkflowJob.objects.create() + jt = JobTemplate.objects.create(name='test-jt') + nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)] + for node, state in zip(nodes, states): + if state: + node.job = jt.create_job() + node.job.status = state + node.job.save() + node.save() + nodes[0].success_nodes.add(nodes[1]) + nodes[1].success_nodes.add(nodes[2]) + nodes[0].failure_nodes.add(nodes[3]) + nodes[3].failure_nodes.add(nodes[4]) + nodes[2].success_nodes.add(nodes[5]) + nodes[4].failure_nodes.add(nodes[5]) + return wfj, nodes + return fn + + def test_workflow_dnr_because_parent(self, workflow_job_fn): + wfj, nodes = workflow_job_fn(states=['successful', None, None, None, None, None,]) + dag = WorkflowDAG(workflow_job=wfj) + workflow_nodes = dag.mark_dnr_nodes() + assert 2 == len(workflow_nodes) + assert nodes[3] in workflow_nodes + assert nodes[4] in workflow_nodes + + @pytest.mark.django_db class TestWorkflowJob: @pytest.fixture @@ -192,8 +245,6 @@ def test_topology_validator(self, wfjt): nodes[2].always_nodes.add(node_assoc) # test cycle validation assert test_view.is_valid_relation(node_assoc, nodes[0]) == {'Error': 'Cycle detected.'} - # test multi-ancestor validation - assert test_view.is_valid_relation(node_assoc, nodes[1]) == {'Error': 'Multiple parent relationship not allowed.'} # test mutex validation test_view.relationship = 'failure_nodes' node_assoc_1 = WorkflowJobTemplateNode.objects.create(workflow_job_template=wfjt)