Skip to content

Commit

Permalink
Pass combined artifacts from nested workflows into downstream nodes (#…
Browse files Browse the repository at this point in the history
…12223)

* Track combined artifacts on workflow jobs

* Avoid schema change for passing nested workflow artifacts

* Basic support for nested workflow artifacts, add test

* Forgot that only does not work with polymorphic

* Remove incorrect field

* Consolidate logic and prevent recursion with UJ artifacts method

* Stop trying to do precedence by status, filter for obvious ones

* Review comments about sets

* Fix up bug with convergence node paths and artifacts
  • Loading branch information
AlanCoding committed Jun 23, 2022
1 parent f7982a0 commit 783b744
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 4 deletions.
6 changes: 6 additions & 0 deletions awx/main/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,12 @@ def display_artifacts(self):
return "$hidden due to Ansible no_log flag$"
return artifacts

def get_effective_artifacts(self, **kwargs):
"""Return unified job artifacts (from set_stats) to pass downstream in workflows"""
if isinstance(self.artifacts, dict):
return self.artifacts
return {}

@property
def is_container_group_task(self):
return bool(self.instance_group and self.instance_group.is_container_group)
Expand Down
4 changes: 4 additions & 0 deletions awx/main/models/unified_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1204,6 +1204,10 @@ def workflow_node_id(self):
pass
return None

def get_effective_artifacts(self, **kwargs):
"""Return unified job artifacts (from set_stats) to pass downstream in workflows"""
return {}

def get_passwords_needed_to_start(self):
return []

Expand Down
25 changes: 23 additions & 2 deletions awx/main/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ def get_job_kwargs(self):
for parent_node in self.get_parent_nodes():
is_root_node = False
aa_dict.update(parent_node.ancestor_artifacts)
if parent_node.job and hasattr(parent_node.job, 'artifacts'):
aa_dict.update(parent_node.job.artifacts)
if parent_node.job:
aa_dict.update(parent_node.job.get_effective_artifacts(parents_set=set([self.workflow_job_id])))
if aa_dict and not is_root_node:
self.ancestor_artifacts = aa_dict
self.save(update_fields=['ancestor_artifacts'])
Expand Down Expand Up @@ -682,6 +682,27 @@ def get_ancestor_workflows(self):
wj = wj.get_workflow_job()
return ancestors

def get_effective_artifacts(self, **kwargs):
"""
For downstream jobs of a workflow nested inside of a workflow,
we send aggregated artifacts from the nodes inside of the nested workflow
"""
artifacts = {}
job_queryset = (
UnifiedJob.objects.filter(unified_job_node__workflow_job=self)
.defer('job_args', 'job_cwd', 'start_args', 'result_traceback')
.order_by('finished', 'id')
.filter(status__in=['successful', 'failed'])
.iterator()
)
parents_set = kwargs.get('parents_set', set())
new_parents_set = parents_set | {self.id}
for job in job_queryset:
if job.id in parents_set:
continue
artifacts.update(job.get_effective_artifacts(parents_set=new_parents_set))
return artifacts

def get_notification_templates(self):
return self.workflow_job_template.notification_templates

Expand Down
4 changes: 2 additions & 2 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,11 @@ def process_finished_workflow_jobs(self, workflow_jobs):
workflow_job.save(update_fields=update_fields)
status_changed = True
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
if workflow_job.spawned_by_workflow:
schedule_task_manager()
return result

@timeit
Expand Down
36 changes: 36 additions & 0 deletions awx/main/tests/functional/models/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
# Django
from django.test import TransactionTestCase
from django.core.exceptions import ValidationError
from django.utils.timezone import now


class TestWorkflowDAGFunctional(TransactionTestCase):
Expand Down Expand Up @@ -381,3 +382,38 @@ def test_workflow_ancestors_recursion_prevention(organization):
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj) # well, this is a problem
# mostly, we just care that this assertion finishes in finite time
assert wfj.get_ancestor_workflows() == []


@pytest.mark.django_db
class TestCombinedArtifacts:
@pytest.fixture
def wfj_artifacts(self, job_template, organization):
wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='has_artifacts')
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt, launch_type='workflow')
job = job_template.create_unified_job(_eager_fields=dict(artifacts={'foooo': 'bar'}, status='successful', finished=now()))
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=job_template, job=job)
return wfj

def test_multiple_types(self, project, wfj_artifacts):
project_update = project.create_unified_job()
WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=project, job=project_update)

assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'bar'}

def test_precedence_based_on_time(self, wfj_artifacts, job_template):
later_job = job_template.create_unified_job(
_eager_fields=dict(artifacts={'foooo': 'zoo'}, status='successful', finished=now()) # finished later, should win
)
WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=job_template, job=later_job)

assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'zoo'}

def test_bad_data_with_artifacts(self, organization):
# This is toxic database data, this tests that it doesn't create an infinite loop
wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='child')
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt, launch_type='workflow')
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj)
job = Job.objects.create(artifacts={'foo': 'bar'}, status='successful')
WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
# mostly, we just care that this assertion finishes in finite time
assert wfj.get_effective_artifacts() == {'foo': 'bar'}

0 comments on commit 783b744

Please sign in to comment.