diff --git a/awx/api/generics.py b/awx/api/generics.py
index dddd9d9e6fa0..c320df09affb 100644
--- a/awx/api/generics.py
+++ b/awx/api/generics.py
@@ -98,7 +98,6 @@ def post(self, request, *args, **kwargs):
             current_user = UserSerializer(self.request.user)
             current_user = smart_str(JSONRenderer().render(current_user.data))
             current_user = urllib.parse.quote('%s' % current_user, '')
-            ret.set_cookie('current_user', current_user, secure=settings.SESSION_COOKIE_SECURE or None)
             ret.setdefault('X-API-Session-Cookie-Name', getattr(settings, 'SESSION_COOKIE_NAME', 'awx_sessionid'))
 
         return ret
diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py
index 810f53f7fe98..b020b84f6344 100644
--- a/awx/main/analytics/subsystem_metrics.py
+++ b/awx/main/analytics/subsystem_metrics.py
@@ -166,7 +166,11 @@ def __init__(self, auto_pipe_execute=False, instance_name=None):
         elif settings.IS_TESTING():
             self.instance_name = "awx_testing"
         else:
-            self.instance_name = Instance.objects.me().hostname
+            try:
+                self.instance_name = Instance.objects.me().hostname
+            except Exception as e:
+                self.instance_name = settings.CLUSTER_HOST_ID
+                logger.info(f'Instance {self.instance_name} seems to be unregistered, error: {e}')
 
         # metric name, help_text
         METRICSLIST = [
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index b4f00a54db1f..9b25ecc876b6 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -22,7 +22,7 @@
 
 from awx.main.models import UnifiedJob
 from awx.main.dispatch import reaper
-from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity
+from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -328,12 +328,14 @@ def __init__(self, *args, **kwargs):
             # Get same number as max forks based on memory, this function takes memory as bytes
             self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30)
 
+            # add magic prime number of extra workers to ensure
+            # we have a few extra workers to run the heartbeat
+            self.max_workers += 7
+
         # max workers can't be less than min_workers
         self.max_workers = max(self.min_workers, self.max_workers)
 
-    def debug(self, *args, **kwargs):
-        self.cleanup()
-        return super(AutoscalePool, self).debug(*args, **kwargs)
+        self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT
 
     @property
     def should_grow(self):
@@ -351,6 +353,7 @@ def full(self):
     def debug_meta(self):
         return 'min={} max={}'.format(self.min_workers, self.max_workers)
 
+    @log_excess_runtime(logger)
    def cleanup(self):
         """
         Perform some internal account and cleanup. This is run on
@@ -406,7 +409,7 @@ def cleanup(self):
                         w.managed_tasks[current_task['uuid']]['started'] = time.time()
                     age = time.time() - current_task['started']
                     w.managed_tasks[current_task['uuid']]['age'] = age
-                    if age > (60 * 5):
+                    if age > self.task_manager_timeout:
                         logger.error(f'run_task_manager has held the advisory lock for >{self.task_manager_timeout}s, sending SIGTERM to {w.pid}')  # noqa
                         os.kill(w.pid, signal.SIGTERM)
 
@@ -417,16 +420,17 @@ def cleanup(self):
                 idx = random.choice(range(len(self.workers)))
                 self.write(idx, m)
 
-        # if we are not in the dangerous situation of queue backup then clear old waiting jobs
-        if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
-            reaper.reap_waiting()
-
-        # if the database says a job is running on this node, but it's *not*,
+        # if the database says a job is running or queued on this node, but it's *not*,
         # then reap it
         running_uuids = []
         for worker in self.workers:
             worker.calculate_managed_tasks()
             running_uuids.extend(list(worker.managed_tasks.keys()))
+
+        # if we are not in the dangerous situation of queue backup then clear old waiting jobs
+        if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
+            reaper.reap_waiting(excluded_uuids=running_uuids)
+
         reaper.reap(excluded_uuids=running_uuids)
 
     def up(self):
@@ -456,6 +460,10 @@ def write(self, preferred_queue, body):
                     w.put(body)
                     break
             else:
+                task_name = 'unknown'
+                if isinstance(body, dict):
+                    task_name = body.get('task')
+                logger.warn(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
                 return super(AutoscalePool, self).write(preferred_queue, body)
         except Exception:
             for conn in connections.all():
diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index e87346515532..dd19c1338cb0 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -2,6 +2,7 @@
 import logging
 import sys
 import json
+import time
 from uuid import uuid4
 
 from django.conf import settings
@@ -75,7 +76,7 @@ def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
             msg = f'{cls.name}: Queue value required and may not be None'
             logger.error(msg)
             raise ValueError(msg)
-        obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name}
+        obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
         guid = get_guid()
         if guid:
             obj['guid'] = guid
diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py
index 0bf322a507de..0ed145508c8c 100644
--- a/awx/main/dispatch/reaper.py
+++ b/awx/main/dispatch/reaper.py
@@ -2,6 +2,7 @@
 import logging
 
 from django.db.models import Q
+from django.conf import settings
 from django.utils.timezone import now as tz_now
 from django.contrib.contenttypes.models import ContentType
 
@@ -20,40 +21,48 @@ def startup_reaping():
     job_ids = []
     for j in jobs:
         job_ids.append(j.id)
-        j.status = 'failed'
-        j.start_args = ''
-        j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
-        j.save(update_fields=['status', 'start_args', 'job_explanation'])
-        if hasattr(j, 'send_notification_templates'):
-            j.send_notification_templates('failed')
-        j.websocket_emit_status('failed')
+        reap_job(
+            j,
+            'failed',
+            job_explanation='Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.',
+        )
     if job_ids:
         logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')
 
 
-def reap_job(j, status):
-    if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
+def reap_job(j, status, job_explanation=None):
+    j.refresh_from_db(fields=['status', 'job_explanation'])
+    status_before = j.status
+    if status_before not in ('running', 'waiting'):
         # just in case, don't reap jobs that aren't running
         return
     j.status = status
     j.start_args = ''  # blank field to remove encrypted passwords
-    j.job_explanation += ' '.join(
-        (
-            'Task was marked as running but was not present in',
-            'the job queue, so it has been marked as failed.',
+    if j.job_explanation:
+        j.job_explanation += ' '  # Separate messages for readability
+    if job_explanation is None:
+        j.job_explanation += ' '.join(
+            (
+                'Task was marked as running but was not present in',
+                'the job queue, so it has been marked as failed.',
+            )
         )
-    )
+    else:
+        j.job_explanation += job_explanation
     j.save(update_fields=['status', 'start_args', 'job_explanation'])
     if hasattr(j, 'send_notification_templates'):
         j.send_notification_templates('failed')
     j.websocket_emit_status(status)
-    logger.error('{} is no longer running; reaping'.format(j.log_format))
+    logger.error(f'{j.log_format} is no longer {status_before}; reaping')
 
 
-def reap_waiting(instance=None, status='failed', grace_period=60):
+def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None):
     """
     Reap all jobs in waiting for this instance.
     """
+    if grace_period is None:
+        grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
+
     me = instance
     if me is None:
         try:
@@ -63,11 +72,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60):
             return
     now = tz_now()
     jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
+    if excluded_uuids:
+        jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
-        reap_job(j, status)
+        reap_job(j, status, job_explanation=job_explanation)
 
 
-def reap(instance=None, status='failed', excluded_uuids=[]):
+def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
     """
     Reap all jobs in running for this instance.
""" @@ -81,6 +92,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]): workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id jobs = UnifiedJob.objects.filter( Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) - ).exclude(celery_task_id__in=excluded_uuids) + ) + if excluded_uuids: + jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: - reap_job(j, status) + reap_job(j, status, job_explanation=job_explanation) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 46418828b6d0..b982cb8ab4f5 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -17,6 +17,7 @@ from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch import pg_bus_conn +from awx.main.utils.common import log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -81,6 +82,9 @@ def control(self, body): logger.error('unrecognized control message: {}'.format(control)) def process_task(self, body): + if isinstance(body, dict): + body['time_ack'] = time.time() + if 'control' in body: try: return self.control(body) @@ -101,6 +105,7 @@ def process_task(self, body): self.total_messages += 1 self.record_statistics() + @log_excess_runtime(logger) def record_statistics(self): if time.time() - self.last_stats > 1: # buffer stat recording to once per second try: diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 6b419a00d848..c73239483beb 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -182,9 +182,9 @@ def flush(self, force=False): consecutive_errors = 0 except Exception as exc_indv: consecutive_errors += 1 - if consecutive_errors >= 5: - raise logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') + if consecutive_errors >= 5: + raise metrics_singular_events_saved += events_saved if events_saved == 0: raise diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py index e1fe196ddbfb..04f63002c530 100644 --- a/awx/main/dispatch/worker/task.py +++ b/awx/main/dispatch/worker/task.py @@ -3,6 +3,7 @@ import importlib import sys import traceback +import time from kubernetes.config import kube_config @@ -60,8 +61,19 @@ def run_callable(self, body): # the callable is a class, e.g., RunJob; instantiate and # return its `run()` method _call = _call().run + + log_extra = '' + logger_method = logger.debug + if ('time_ack' in body) and ('time_pub' in body): + time_publish = body['time_ack'] - body['time_pub'] + time_waiting = time.time() - body['time_ack'] + if time_waiting > 5.0 or time_publish > 5.0: + # If task too a very long time to process, add this information to the log + log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher' + logger_method = logger.info # don't print kwargs, they often contain launch-time secrets - logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) + logger_method(f'task {uuid} starting {task}(*{args}){log_extra}') + return _call(*args, **kwargs) def perform_work(self, body): diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index e9a2b7ebf5dd..2fc35a75d280 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -7,7 +7,7 @@ from 
 from django.db import connection as django_connection
 
-from awx.main.dispatch import get_local_queuename, reaper
+from awx.main.dispatch import get_local_queuename
 from awx.main.dispatch.control import Control
 from awx.main.dispatch.pool import AutoscalePool
 from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
@@ -53,8 +53,6 @@ def handle(self, *arg, **options):
             # (like the node heartbeat)
             periodic.run_continuously()
 
-            reaper.startup_reaping()
-            reaper.reap_waiting(grace_period=0)
             consumer = None
 
         try:
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 782ca59344c4..ab9d226b3ff4 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -203,7 +203,7 @@ def is_lost(self, ref_time=None):
             return True
         if ref_time is None:
             ref_time = now()
-        grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
+        grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * settings.CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE
         if self.node_type in ('execution', 'hop'):
             grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
         return self.last_seen < ref_time - timedelta(seconds=grace_period)
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 954ced13fd92..55ab1de4775c 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -608,9 +608,12 @@ def run(self, pk, **kwargs):
                 status = 'failed'
             elif status == 'canceled':
                 self.instance = self.update_model(pk)
-                if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback():
-                    self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
+                if getattr(self.instance, 'cancel_flag', False) is False:
                     status = 'failed'
+                    if signal_callback():
+                        self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
+                    else:
+                        self.runner_callback.delay_update(job_explanation="The running ansible process received a shutdown signal.")
         except ReceptorNodeNotFound as exc:
             self.runner_callback.delay_update(job_explanation=str(exc))
         except Exception:
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 1534460bd2af..cc2dea7c1c34 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -15,7 +15,7 @@
 from django.conf import settings
 from django.db import transaction, DatabaseError, IntegrityError
 from django.db.models.fields.related import ForeignKey
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
 from django.contrib.auth.models import User
 from django.utils.translation import gettext_lazy as _
@@ -103,6 +103,8 @@ def dispatch_startup():
     # apply_cluster_membership_policies()
 
     cluster_node_heartbeat()
+    reaper.startup_reaping()
+    reaper.reap_waiting(grace_period=0)
     m = Metrics()
     m.reset_values()
 
@@ -503,12 +505,23 @@ def cluster_node_heartbeat():
 
     if this_inst:
         startup_event = this_inst.is_lost(ref_time=nowtime)
+        last_last_seen = this_inst.last_seen
         this_inst.local_health_check()
         if startup_event and this_inst.capacity != 0:
-            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
+            logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. Prior last_seen {last_last_seen}')
             return
+        elif not last_last_seen:
+            logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}')
+        elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2):
+            logger.warning(f'Heartbeat skew - interval={(nowtime - last_last_seen).total_seconds():.4f}, expected={settings.CLUSTER_NODE_HEARTBEAT_PERIOD}')
     else:
-        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
+        if settings.AWX_AUTO_DEPROVISION_INSTANCES:
+            (changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID)
+            if changed:
+                logger.warning(f'Recreated instance record {this_inst.hostname} after unexpected removal')
+            this_inst.local_health_check()
+        else:
+            raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
     # IFF any node has a greater version than we do, then we'll shutdown services
     for other_inst in instance_list:
         if other_inst.node_type in ('execution', 'hop'):
@@ -528,8 +541,9 @@ def cluster_node_heartbeat():
 
     for other_inst in lost_instances:
         try:
-            reaper.reap(other_inst)
-            reaper.reap_waiting(this_inst, grace_period=0)
+            explanation = "Job reaped due to instance shutdown"
+            reaper.reap(other_inst, job_explanation=explanation)
+            reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation)
         except Exception:
             logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
         try:
@@ -594,7 +608,8 @@ def awx_k8s_reaper():
     for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
         logger.debug("Checking for orphaned k8s pods for {}.".format(group))
         pods = PodManager.list_active_jobs(group)
-        for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES):
+        time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
+        for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
             logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
             try:
                 pm = PodManager(job)
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index 19394247b35d..8011216b54a3 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -6,6 +6,7 @@
 import json
 import yaml
 import logging
+import time
 import os
 import subprocess
 import re
@@ -1153,3 +1154,19 @@ def wrapper_cleanup_new_process(*args, **kwargs):
         return func(*args, **kwargs)
 
     return wrapper_cleanup_new_process
+
+
+def log_excess_runtime(func_logger, cutoff=5.0):
+    def log_excess_runtime_decorator(func):
+        @wraps(func)
+        def _new_func(*args, **kwargs):
+            start_time = time.time()
+            return_value = func(*args, **kwargs)
+            delta = time.time() - start_time
+            if delta > cutoff:
+                func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s')
+            return return_value
+
+        return _new_func
+
+    return log_excess_runtime_decorator
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index ef389e515167..8acd7f7352d3 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -427,6 +427,10 @@ def IS_TESTING(argv=None):
 
 # heartbeat period can factor into some forms of logic, so it is maintained as a setting here
 CLUSTER_NODE_HEARTBEAT_PERIOD = 60
+
+# Number of missed heartbeats until a node gets marked as lost
+CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE = 2
+
 RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60  # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
 
 EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30  # once every 30 minutes check if an execution node errors have been resolved
@@ -1018,3 +1022,14 @@ def IS_TESTING(argv=None):
 
 # Mount exposed paths as hostPath resource in k8s/ocp
 AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False
+
+# Time out task managers if they take longer than this many seconds
+TASK_MANAGER_TIMEOUT = 300
+
+# Number of seconds _in addition to_ the task manager timeout a job can stay
+# in waiting without being reaped
+JOB_WAITING_GRACE_PERIOD = 60
+
+# Number of seconds after a container group job's finish time to wait
+# before the awx_k8s_reaper task will tear down the pods
+K8S_POD_REAPER_GRACE_PERIOD = 60
diff --git a/awx/sso/views.py b/awx/sso/views.py
index 67921b2fa47a..00a392f5b3e5 100644
--- a/awx/sso/views.py
+++ b/awx/sso/views.py
@@ -45,7 +45,6 @@ def dispatch(self, request, *args, **kwargs):
             current_user = UserSerializer(self.request.user)
             current_user = smart_str(JSONRenderer().render(current_user.data))
             current_user = urllib.parse.quote('%s' % current_user, '')
-            response.set_cookie('current_user', current_user, secure=settings.SESSION_COOKIE_SECURE or None)
             response.setdefault('X-API-Session-Cookie-Name', getattr(settings, 'SESSION_COOKIE_NAME', 'awx_sessionid'))
 
         return response
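
For illustration only (not part of the patch): a minimal sketch of how the log_excess_runtime decorator added to awx.main.utils.common can be applied. The logger name, the cutoff value, and the wrapped function below are assumptions made up for this example; the patch itself applies the decorator with the default cutoff to AutoscalePool.cleanup() and AWXConsumerBase.record_statistics().

    import logging
    import time

    from awx.main.utils.common import log_excess_runtime

    # Assumed logger name; any logging.Logger works here.
    logger = logging.getLogger('awx.main.dispatch')


    @log_excess_runtime(logger, cutoff=2.0)
    def slow_cleanup():
        # Stand-in for periodic housekeeping work; only calls that run longer
        # than the cutoff produce a log line, so the normal fast path stays quiet.
        time.sleep(3)


    slow_cleanup()  # logs roughly: Running 'slow_cleanup' took 3.00s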