Skip to content

Commit

Permalink
remove convergence restriction in API
Browse files Browse the repository at this point in the history
  • Loading branch information
chrismeyersfsu committed Oct 9, 2018
1 parent 1edede2 commit 7f088ac
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 4 deletions.
20 changes: 20 additions & 0 deletions awx/main/migrations/0050_v331_workflow_convergence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.11 on 2018-09-28 14:23
from __future__ import unicode_literals

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('main', '0049_v330_validate_instance_capacity_adjustment'),
]

operations = [
migrations.AddField(
model_name='workflowjobnode',
name='do_not_run',
field=models.BooleanField(default=False),
),
]
3 changes: 3 additions & 0 deletions awx/main/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ class WorkflowJobNode(WorkflowNodeBase):
default={},
editable=False,
)
do_not_run = models.BooleanField(
default=False
)

def get_absolute_url(self, request=None):
return reverse('api:workflow_job_node_detail', kwargs={'pk': self.pk}, request=request)
Expand Down
52 changes: 48 additions & 4 deletions awx/main/scheduler/dag_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,25 @@ def bfs_nodes_to_run(self):
obj = n['node_object']
job = obj.job

if not job:
if not job and obj.do_not_run is False:
nodes_found.append(n)
# Job is about to run or is running. Hold our horses and wait for
# the job to finish. We can't proceed down the graph path until we
# have the job result.
elif job.status not in ['failed', 'successful']:
elif job and job.status not in ['failed', 'successful']:
continue
elif job.status == 'failed':
elif job and job.status == 'failed':
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job.status == 'successful':
elif job and job.status == 'successful':
children_success = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
nodes.extend(children_all)
else:
print("Should this run??")
return [n['node_object'] for n in nodes_found]

def cancel_node_jobs(self):
Expand Down Expand Up @@ -97,3 +99,45 @@ def is_workflow_done(self):
# have the job result.
return False, False
return True, is_failed

def mark_dnr_nodes(self):
root_nodes = self.get_root_nodes()
nodes = root_nodes
nodes_found = []
nodes_marked_do_not_run = []

for index, n in enumerate(nodes):
obj = n['node_object']
job = obj.job

if not job and obj.do_not_run is False and n not in root_nodes:
parent_nodes = [p['node_object'] for p in self.get_dependents(obj)]
all_parents_dnr = True
for p in parent_nodes:
if not p.job and p.do_not_run is False:
all_parents_dnr = False
break
#all_parents_dnr = reduce(lambda p: bool(p.do_not_run == True), parent_nodes)
if all_parents_dnr:
obj.do_not_run = True
nodes_marked_do_not_run.append(n)

if obj.do_not_run:
children_success = self.get_dependencies(obj, 'success_nodes')
children_failed = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job and job.status == 'failed':
children_failed = self.get_dependencies(obj, 'success_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_failed + children_always
nodes.extend(children_all)
elif job and job.status == 'successful':
children_success = self.get_dependencies(obj, 'failure_nodes')
children_always = self.get_dependencies(obj, 'always_nodes')
children_all = children_success + children_always
nodes.extend(children_all)
return [n['node_object'] for n in nodes_marked_do_not_run]


2 changes: 2 additions & 0 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def process_finished_workflow_jobs(self, workflow_jobs):
else:
is_done, has_failed = dag.is_workflow_done()
if not is_done:
workflow_nodes = dag.mark_dnr_nodes()
map(lambda n: n.save(update_fields=['do_not_run']), workflow_nodes)
continue
result.append(workflow_job.id)
workflow_job.status = 'failed' if has_failed else 'successful'
Expand Down
54 changes: 54 additions & 0 deletions awx/main/tests/functional/models/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,60 @@ def test_workflow_not_finished(self):
self.assertFalse(has_failed)


@pytest.mark.django_db
class TestWorkflowDNR():
'success', 'new'

@pytest.fixture
def workflow_job_fn(self):
def fn(states=['new', 'new', 'new', 'new', 'new', 'new']):
"""
Workflow topology:
node[0]
/\
s/ \f
/ \
node[1] node[3]
/ \
s/ \f
/ \
node[2] node[4]
\ /
\ /
\ /
s f
\ /
\ /
node[5]
"""
wfj = WorkflowJob.objects.create()
jt = JobTemplate.objects.create(name='test-jt')
nodes = [WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=jt) for i in range(0, 6)]
for node, state in zip(nodes, states):
if state:
node.job = jt.create_job()
node.job.status = state
node.job.save()
node.save()
print("node id of 0 is {}".format(nodes[0].id))
nodes[0].success_nodes.add(nodes[1])
nodes[1].success_nodes.add(nodes[2])
nodes[0].failure_nodes.add(nodes[3])
nodes[3].failure_nodes.add(nodes[4])
nodes[2].success_nodes.add(nodes[5])
nodes[4].failure_nodes.add(nodes[5])
return wfj, nodes
return fn

def test_workflow_dnr_because_parent(self, workflow_job_fn):
wfj, nodes = workflow_job_fn(states=['successful', None, None, None, None, None,])
dag = WorkflowDAG(workflow_job=wfj)
workflow_nodes = dag.mark_dnr_nodes()
assert len(workflow_nodes) == 2
assert nodes[3] in workflow_nodes
assert nodes[4] in workflow_nodes


@pytest.mark.django_db
class TestWorkflowJob:
@pytest.fixture
Expand Down

0 comments on commit 7f088ac

Please sign in to comment.