Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve task manager performance for task dependencies #5787

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions awx/main/migrations/0108_v370_unifiedjob_dependencies_processed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 2.2.8 on 2020-02-06 16:43

from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds the UnifiedJob.dependencies_processed flag used by the task
    # manager to remember that a pending job's potential dependencies
    # (project/inventory updates) have already been generated, so they are
    # only computed once per job.

    dependencies = [
        ('main', '0107_v370_workflow_convergence_api_toggle'),
    ]

    operations = [
        migrations.AddField(
            model_name='unifiedjob',
            name='dependencies_processed',
            # default=False: existing and newly-created jobs start unprocessed;
            # editable=False: internal scheduler state, never user-facing.
            field=models.BooleanField(default=False, editable=False, help_text='If True, the task manager has already processed potential dependencies for this job.'),
        ),
    ]
5 changes: 5 additions & 0 deletions awx/main/models/unified_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,11 @@ class Meta:
editable=False,
help_text=_("The date and time the job was queued for starting."),
)
dependencies_processed = models.BooleanField(
default=False,
editable=False,
help_text=_("If True, the task manager has already processed potential dependencies for this job.")
)
finished = models.DateTimeField(
null=True,
default=None,
Expand Down
92 changes: 16 additions & 76 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Project,
ProjectUpdate,
SystemJob,
UnifiedJob,
WorkflowApproval,
WorkflowJob,
WorkflowJobTemplate
Expand Down Expand Up @@ -74,21 +75,6 @@ def get_tasks(self, status_list=('pending', 'waiting', 'running')):
key=lambda task: task.created)
return all_tasks


def get_latest_project_update_tasks(self, all_sorted_tasks):
project_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
project_ids.add(task.project_id)
return ProjectUpdate.objects.filter(id__in=project_ids)

def get_latest_inventory_update_tasks(self, all_sorted_tasks):
inventory_ids = set()
for task in all_sorted_tasks:
if isinstance(task, Job):
inventory_ids.add(task.inventory_id)
return InventoryUpdate.objects.filter(id__in=inventory_ids)

def get_running_workflow_jobs(self):
graph_workflow_jobs = [wf for wf in
WorkflowJob.objects.filter(status='running')]
Expand Down Expand Up @@ -200,9 +186,6 @@ def process_finished_workflow_jobs(self, workflow_jobs):
schedule_task_manager()
return result

def get_dependent_jobs_for_inv_and_proj_update(self, job_obj):
return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()]

def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
from awx.main.tasks import handle_work_error, handle_work_success

Expand Down Expand Up @@ -364,10 +347,6 @@ def get_latest_inventory_update(self, inventory_source):
def should_update_inventory_source(self, job, latest_inventory_update):
now = tz_now()

# Already processed dependencies for this job
if job.dependent_jobs.all():
return False

if latest_inventory_update is None:
return True
'''
Expand All @@ -393,8 +372,6 @@ def get_latest_project_update(self, job):

def should_update_related_project(self, job, latest_project_update):
now = tz_now()
if job.dependent_jobs.all():
return False

if latest_project_update is None:
return True
Expand Down Expand Up @@ -426,18 +403,21 @@ def should_update_related_project(self, job, latest_project_update):
return True
return False

def generate_dependencies(self, task):
dependencies = []
if type(task) is Job:
def generate_dependencies(self, undeped_tasks):
created_dependencies = []
for task in undeped_tasks:
dependencies = []
if not type(task) is Job:
continue
# TODO: Can remove task.project None check after scan-job-default-playbook is removed
if task.project is not None and task.project.scm_update_on_launch is True:
latest_project_update = self.get_latest_project_update(task)
if self.should_update_related_project(task, latest_project_update):
project_task = self.create_project_update(task)
created_dependencies.append(project_task)
dependencies.append(project_task)
else:
if latest_project_update.status in ['waiting', 'pending', 'running']:
dependencies.append(latest_project_update)
dependencies.append(latest_project_update)

# Inventory created 2 seconds behind job
try:
Expand All @@ -452,56 +432,20 @@ def generate_dependencies(self, task):
latest_inventory_update = self.get_latest_inventory_update(inventory_source)
if self.should_update_inventory_source(task, latest_inventory_update):
inventory_task = self.create_inventory_update(task, inventory_source)
created_dependencies.append(inventory_task)
dependencies.append(inventory_task)
else:
if latest_inventory_update.status in ['waiting', 'pending', 'running']:
dependencies.append(latest_inventory_update)
dependencies.append(latest_inventory_update)

if len(dependencies) > 0:
self.capture_chain_failure_dependencies(task, dependencies)
return dependencies

def process_dependencies(self, dependent_task, dependency_tasks):
for task in dependency_tasks:
if self.is_job_blocked(task):
logger.debug("Dependent {} is blocked from running".format(task.log_format))
continue
preferred_instance_groups = task.preferred_instance_groups
found_acceptable_queue = False
idle_instance_that_fits = None
for rampart_group in preferred_instance_groups:
if idle_instance_that_fits is None:
idle_instance_that_fits = rampart_group.find_largest_idle_instance()
if not rampart_group.is_containerized and self.get_remaining_capacity(rampart_group.name) <= 0:
logger.debug("Skipping group {} capacity <= 0".format(rampart_group.name))
continue

execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task)
if execution_instance:
logger.debug("Starting dependent {} in group {} instance {}".format(
task.log_format, rampart_group.name, execution_instance.hostname))
elif not execution_instance and idle_instance_that_fits:
if not rampart_group.is_containerized:
execution_instance = idle_instance_that_fits
logger.debug("Starting dependent {} in group {} on idle instance {}".format(
task.log_format, rampart_group.name, execution_instance.hostname))
if execution_instance or rampart_group.is_containerized:
self.graph[rampart_group.name]['graph'].add_job(task)
tasks_to_fail = [t for t in dependency_tasks if t != task]
tasks_to_fail += [dependent_task]
self.start_task(task, rampart_group, tasks_to_fail, execution_instance)
found_acceptable_queue = True
break
else:
logger.debug("No instance available in group {} to run job {} w/ capacity requirement {}".format(
rampart_group.name, task.log_format, task.task_impact))
if not found_acceptable_queue:
logger.debug("Dependent {} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
UnifiedJob.objects.filter(pk__in = [task.pk for task in undeped_tasks]).update(dependencies_processed=True)
return created_dependencies

def process_pending_tasks(self, pending_tasks):
running_workflow_templates = set([wf.unified_job_template_id for wf in self.get_running_workflow_jobs()])
for task in pending_tasks:
self.process_dependencies(task, self.generate_dependencies(task))
if self.is_job_blocked(task):
logger.debug("{} is blocked from running".format(task.log_format))
continue
Expand Down Expand Up @@ -574,13 +518,6 @@ def timeout_approval_node(self):
def calculate_capacity_consumed(self, tasks):
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)

def would_exceed_capacity(self, task, instance_group):
current_capacity = self.graph[instance_group]['consumed_capacity']
capacity_total = self.graph[instance_group]['capacity_total']
if current_capacity == 0:
return False
return (task.task_impact + current_capacity > capacity_total)

def consume_capacity(self, task, instance_group):
logger.debug('{} consumed {} capacity units from {} with prior total of {}'.format(
task.log_format, task.task_impact, instance_group,
Expand All @@ -598,6 +535,9 @@ def process_tasks(self, all_sorted_tasks):
self.process_running_tasks(running_tasks)

pending_tasks = [t for t in all_sorted_tasks if t.status == 'pending']
undeped_tasks = [t for t in pending_tasks if not t.dependencies_processed]
dependencies = self.generate_dependencies(undeped_tasks)
self.process_pending_tasks(dependencies)
self.process_pending_tasks(pending_tasks)

def _schedule(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def test_multi_group_with_shared_dependency(instance_factory, default_instance_g
pu = p.project_updates.first()
TaskManager.start_task.assert_called_once_with(pu,
default_instance_group,
[j1],
[j1,j2],
default_instance_group.instances.all()[0])
pu.finished = pu.created + timedelta(seconds=1)
pu.status = "successful"
Expand Down Expand Up @@ -193,7 +193,7 @@ def test_instance_group_basic_policies(instance_factory, instance_group_factory)
ig2 = InstanceGroup.objects.get(id=ig2.id)
ig3 = InstanceGroup.objects.get(id=ig3.id)
assert len(ig0.instances.all()) == 1
assert i0 in ig0.instances.all()
assert i0 in ig0.instances.all()
assert len(InstanceGroup.objects.get(id=ig1.id).instances.all()) == 2
assert i1 in ig1.instances.all()
assert i2 in ig1.instances.all()
Expand Down
38 changes: 35 additions & 3 deletions awx/main/tests/functional/task_management/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from awx.main.scheduler import TaskManager
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.utils import encrypt_field
from awx.main.models import WorkflowJobTemplate, JobTemplate
from awx.main.models import WorkflowJobTemplate, JobTemplate, Job


@pytest.mark.django_db
Expand Down Expand Up @@ -307,8 +307,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory
TaskManager().schedule()
pu = p.project_updates.first()
iu = ii.inventory_updates.first()
TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1], instance),
mock.call(iu, default_instance_group, [pu, j1], instance)])
TaskManager.start_task.assert_has_calls([mock.call(iu, default_instance_group, [j1, j2, pu], instance),
mock.call(pu, default_instance_group, [j1, j2, iu], instance)])
pu.status = "successful"
pu.finished = pu.created + timedelta(seconds=1)
pu.save()
Expand Down Expand Up @@ -383,3 +383,35 @@ def test_job_not_blocking_inventory_update(default_instance_group, job_template_
dependency_graph = DependencyGraph(None)
dependency_graph.add_job(job)
assert not dependency_graph.is_job_blocked(inventory_update)


@pytest.mark.django_db
def test_generate_dependencies_only_once(job_template_factory):
    """Dependencies for a pending job are generated on one cycle only.

    The first ``_schedule()`` pass must flip ``dependencies_processed`` to
    True; after that, subsequent passes must not hand the job to
    ``generate_dependencies()`` again.
    """
    objects = job_template_factory('jt', organization='org1')

    job = objects.job_template.create_job()
    job.status = "pending"
    job.name = "job_gen_dep"
    job.save()

    with mock.patch("awx.main.scheduler.TaskManager.start_task"):
        # A freshly created job has not had its dependencies processed.
        assert not job.dependencies_processed

        # One scheduling cycle generates the dependencies and sets the flag.
        TaskManager()._schedule()

        # Re-fetch from the DB and confirm the flag was persisted.
        job = Job.objects.filter(name="job_gen_dep")[0]
        assert job.dependencies_processed

        # Schedule again with generate_dependencies() mocked out; the job
        # must no longer appear in its argument list.
        task_manager = TaskManager()
        task_manager.generate_dependencies = mock.MagicMock()
        task_manager._schedule()

        # call_args is (positional_args, kwargs); [0][0] is the first
        # positional argument passed to generate_dependencies().
        assert task_manager.generate_dependencies.call_args[0][0] == []