diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 221959243de7..b1df4a2853b6 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -70,6 +70,7 @@ def after_lock_init(self): """ instances = Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop') self.real_instances = {i.hostname: i for i in instances} + self.controlplane_ig = None instances_partial = [ SimpleNamespace( @@ -86,6 +87,8 @@ def after_lock_init(self): instances_by_hostname = {i.hostname: i for i in instances_partial} for rampart_group in InstanceGroup.objects.prefetch_related('instances'): + if rampart_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME: + self.controlplane_ig = rampart_group self.graph[rampart_group.name] = dict( graph=DependencyGraph(), execution_capacity=0, @@ -298,7 +301,12 @@ def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): if rampart_group is not None: self.consume_capacity(task, rampart_group.name, instance=instance) if task.controller_node: - self.consume_capacity(task, 'controlplane', instance=self.real_instances[task.controller_node], impact=settings.AWX_CONTROL_NODE_TASK_IMPACT) + self.consume_capacity( + task, + settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, + instance=self.real_instances[task.controller_node], + impact=settings.AWX_CONTROL_NODE_TASK_IMPACT, + ) def post_commit(): if task.status != 'failed' and type(task) is not WorkflowJob: @@ -461,7 +469,6 @@ def generate_dependencies(self, undeped_tasks): def process_pending_tasks(self, pending_tasks): running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] - controlplane_ig = InstanceGroup.objects.get(name='controlplane') for task in pending_tasks: if self.start_task_limit <= 0: break @@ -494,8 +501,8 @@ def process_pending_tasks(self, pending_tasks): else: control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT control_instance = InstanceGroup.fit_task_to_most_remaining_capacity_instance( - task, self.graph['controlplane']['instances'], impact=control_impact, capacity_type='control' - ) or InstanceGroup.find_largest_idle_instance(self.graph['controlplane']['instances'], capacity_type='control') + task, self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], impact=control_impact, capacity_type='control' + ) or InstanceGroup.find_largest_idle_instance(self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['instances'], capacity_type='control') if not control_instance: self.task_needs_capacity(task, tasks_to_update_job_explanation) logger.debug(f"Skipping task {task.log_format} in pending, not enough capacity left on controlplane to control new tasks") @@ -508,16 +515,16 @@ def process_pending_tasks(self, pending_tasks): task.execution_node = control_instance.hostname control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - control_impact) control_instance.jobs_running += 1 - self.graph['controlplane']['graph'].add_job(task) + self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task) execution_instance = self.real_instances[control_instance.hostname] - self.start_task(task, controlplane_ig, task.get_jobs_fail_chain(), execution_instance) + self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True continue for rampart_group in preferred_instance_groups: if rampart_group.is_container_group: control_instance.jobs_running += 1 - self.graph['controlplane']['graph'].add_job(task) + self.graph[settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME]['graph'].add_job(task) self.start_task(task, rampart_group, task.get_jobs_fail_chain(), None) found_acceptable_queue = True break @@ -539,7 +546,7 @@ def process_pending_tasks(self, pending_tasks): control_instance = execution_instance task.controller_node = execution_instance.hostname - control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_PLANE_TASK_IMPACT) + control_instance.remaining_capacity = max(0, control_instance.remaining_capacity - settings.AWX_CONTROL_NODE_TASK_IMPACT) task.log_lifecycle("controller_node_chosen") if control_instance != execution_instance: control_instance.jobs_running += 1 diff --git a/awx/main/tests/conftest.py b/awx/main/tests/conftest.py index 94596aea1baa..0400f025d2d1 100644 --- a/awx/main/tests/conftest.py +++ b/awx/main/tests/conftest.py @@ -15,6 +15,7 @@ ) from django.core.cache import cache +from django.conf import settings def pytest_addoption(parser): @@ -82,7 +83,7 @@ def instance_group_factory(): @pytest.fixture def controlplane_instance_group(instance_factory, instance_group_factory): """There always has to be a controlplane instancegroup and at least one instance in it""" - return create_instance_group("controlplane", create_instance('hybrid-1', node_type='hybrid', capacity=500)) + return create_instance_group(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, create_instance('hybrid-1', node_type='hybrid', capacity=500)) @pytest.fixture diff --git a/awx/main/tests/factories/fixtures.py b/awx/main/tests/factories/fixtures.py index d6cf1ba0c583..df2390434b1f 100644 --- a/awx/main/tests/factories/fixtures.py +++ b/awx/main/tests/factories/fixtures.py @@ -1,6 +1,7 @@ import json from django.contrib.auth.models import User +from django.conf import settings from awx.main.models import ( Organization, @@ -35,7 +36,7 @@ def mk_instance(persisted=True, hostname='instance.example.org', node_type='hybr instance = Instance.objects.get_or_create(uuid=settings.SYSTEM_UUID, hostname=hostname, node_type=node_type, capacity=capacity)[0] if node_type in ('control', 'hybrid'): - mk_instance_group(name='controlplane', instance=instance) + mk_instance_group(name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, instance=instance) return instance diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 753de85ce94c..afef002567d2 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -72,7 +72,9 @@ def IS_TESTING(argv=None): AWX_CONTAINER_GROUP_DEFAULT_NAMESPACE = os.getenv('MY_POD_NAMESPACE', 'default') # Timeout when waiting for pod to enter running state. If the pod is still in pending state , it will be terminated. Valid time units are "s", "m", "h". Example : "5m" , "10s". AWX_CONTAINER_GROUP_POD_PENDING_TIMEOUT = "2h" -AWX_CONTROL_PLANE_TASK_IMPACT = 5 + +# How much capacity controlling a task costs a hybrid or control node +AWX_CONTROL_NODE_TASK_IMPACT = 5 # Internationalization # https://docs.djangoproject.com/en/dev/topics/i18n/ diff --git a/awx/settings/development.py b/awx/settings/development.py index dc84a6711f10..70b64643dde3 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -21,9 +21,6 @@ # Load default settings. from .defaults import * # NOQA -# How much capacity controlling a task costs a node -AWX_CONTROL_NODE_TASK_IMPACT = 5 - # awx-manage shell_plus --notebook NOTEBOOK_ARGUMENTS = ['--NotebookApp.token=', '--ip', '0.0.0.0', '--port', '8888', '--allow-root', '--no-browser']