Skip to content

Commit

Permalink
use settings correctly
Browse files Browse the repository at this point in the history
use settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME instead of a hardcoded
name
cache the controlplane_ig object during the after lock init to avoid
an uneccesary query
eliminate mistakenly duplicated AWX_CONTROL_PLANE_TASK_IMPACT and use
only AWX_CONTROL_NODE_TASK_IMPACT
  • Loading branch information
kdelee committed Feb 2, 2022
1 parent b8c6414 commit b2afc9d
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 14 deletions.
23 changes: 15 additions & 8 deletions awx/main/scheduler/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def after_lock_init(self):
"""
instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop')
self.real_instances = {i.hostname: i for i in instances}
self.controlplane_ig = None

instances_partial = [
SimpleNamespace(
Expand All @@ -86,6 +87,8 @@ def after_lock_init(self):
instances_by_hostname = {i.hostname: i for i in instances_partial}

for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = rampart_group
self.graph[rampart_group.name] = dict(
graph=DependencyGraph(),
execution_capacity=0,
Expand Down Expand Up @@ -298,7 +301,12 @@ def start_task(self, task, rampart_group, dependent_tasks=None, instance=None):
if rampart_group is not None:
self.consume_capacity(task, rampart_group.name, instance=instance)
if task.controller_node:
self.consume_capacity(task, 'controlplane', instance=self.real_instances[task.controller_node], impact=settings.AWX_CONTROL_NODE_TASK_IMPACT)
self.consume_capacity(
task,
settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME,
instance=self.real_instances[task.controller_node],
impact=settings.AWX_CONTROL_NODE_TASK_IMPACT,
)

def post_commit():
if task.status != 'failed' and type(task) is not WorkflowJob:
Expand Down Expand Up @@ -461,7 +469,6 @@ def generate_dependencies(self, undeped_tasks):
def process_pending_tasks(self, pending_tasks):
running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()}
tasks_to_update_job_explanation = []
controlplane_ig = InstanceGroup.objects.get(name='controlplane')
for task in pending_tasks:
if self.start_task_limit <= 0:
break
Expand Down Expand Up @@ -494,8 +501,8 @@ def process_pending_tasks(self, pending_tasks):
else:
control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
control_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance(
task, self.graph['controlplane']['instances'], impact=control_impact, capacity_type='control'
) or InstanceGroup.find_largest_idle_instance(self.graph['controlplane']['instances'], capacity_type='control')
task, self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], impact=control_impact, capacity_type='control'
) or InstanceGroup.find_largest_idle_instance(self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], capacity_type='control')
if not control_instance:
self.task_needs_capacity(task, tasks_to_update_job_explanation)
logger.debug(f"Skipping task {task.log_format} in pending, not enough capacity left on controlplane to control new tasks")
Expand All @@ -508,16 +515,16 @@ def process_pending_tasks(self, pending_tasks):
task.execution_node = control_instance.hostname
control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact)
control_instance.jobs_running += 1
self.graph['controlplane']['graph'].add_job(task)
self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task)
execution_instance = self.real_instances[control_instance.hostname]
self.start_task(task, controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
continue

for rampart_group in preferred_instance_groups:
if rampart_group.is_container_group:
control_instance.jobs_running += 1
self.graph['controlplane']['graph'].add_job(task)
self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task)
self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
break
Expand All @@ -539,7 +546,7 @@ def process_pending_tasks(self, pending_tasks):
control_instance = execution_instance
task.controller_node = execution_instance.hostname

control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_PLANE_TASK_IMPACT)
control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_NODE_TASK_IMPACT)
task.log_lifecycle("controller_node_chosen")
if control_instance != execution_instance:
control_instance.jobs_running += 1
Expand Down
3 changes: 2 additions & 1 deletion awx/main/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)

from django.core.cache import cache
from django.conf import settings


def pytest_addoption(parser):
Expand Down Expand Up @@ -82,7 +83,7 @@ def instance_group_factory():
@pytest.fixture
def controlplane_instance_group(instance_factory, instance_group_factory):
"""There always has to be a controlplane instancegroup and at least one instance in it"""
return create_instance_group("controlplane", create_instance('hybrid-1', node_type='hybrid', capacity=500))
return create_instance_group(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, create_instance('hybrid-1', node_type='hybrid', capacity=500))


@pytest.fixture
Expand Down
3 changes: 2 additions & 1 deletion awx/main/tests/factories/fixtures.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json

from django.contrib.auth.models import User
from django.conf import settings

from awx.main.models import (
Organization,
Expand Down Expand Up @@ -35,7 +36,7 @@ def mk_instance(persisted=True, hostname='instance.example.org', node_type='hybr

instance = Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname, node_type=node_type, capacity=capacity)[0]
if node_type in ('control', 'hybrid'):
mk_instance_group(name='controlplane', instance=instance)
mk_instance_group(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, instance=instance)
return instance


Expand Down
4 changes: 3 additions & 1 deletion awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def IS_TESTING(argv=None):
AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = os.getenv('MY_POD_NAMESPACE', 'default')
# Timeout when waiting for pod to enter running state. If the pod is still in pending state , it will be terminated. Valid time units are "s", "m", "h". Example : "5m" , "10s".
AWX_CONTAINER_GROUP_POD_PENDING_TIMEOUT = "2h"
AWX_CONTROL_PLANE_TASK_IMPACT = 5

# How much capacity controlling a task costs a hybrid or control node
AWX_CONTROL_NODE_TASK_IMPACT = 5

# Internationalization
# https://docs.djangoproject.com/en/dev/topics/i18n/
Expand Down
3 changes: 0 additions & 3 deletions awx/settings/development.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@
# Load default settings.
from .defaults import * # NOQA

# How much capacity controlling a task costs a node
AWX_CONTROL_NODE_TASK_IMPACT = 5

# awx-manage shell_plus --notebook
NOTEBOOK_ARGUMENTS = ['--NotebookApp.token=', '--ip', '0.0.0.0', '--port', '8888', '--allow-root', '--no-browser']

Expand Down

0 comments on commit b2afc9d

Please sign in to comment.