From 9d52ebe1905fb851298f654f5607bf4fbf2968ae Mon Sep 17 00:00:00 2001
From: Alan Rominger
Date: Fri, 15 Jul 2022 14:09:39 -0400
Subject: [PATCH] Further resiliency changes focused on offline database

Make logs from database outage more manageable

Raise exception if update_model never recovers from problem

Remove unused current_user cookie

Register system again if deleted by another pod

Avoid cases where missing instance would throw error on startup
this gives time for heartbeat to register it

Give specific messages if job was killed due to SIGTERM or SIGKILL (#12435)

* Reap jobs on dispatcher startup to increase clarity, replace existing reaping logic
* Exit jobs if receiving SIGTERM signal
* Fix unwanted reaping on shutdown, let subprocess close out
* Add some sanity tests for signal module
* Add a log for an unhandled dispatcher error
* Refine wording of error messages

Co-authored-by: Elijah DeLee

Split reaper for running and waiting jobs

Avoid running jobs that have already been reaped

Co-authored-by: Elijah DeLee
Co-authored-by: Shane McDonald

Remove unnecessary extra actions

Fix waiting jobs in other cases of reaping

Add logs to debug waiting bottlenecking

Add logs about heartbeat skew

Co-authored-by: Shane McDonald

Replace git shallow clone with shutil.copytree

Introduce build_project_dir method
the base method will create an empty project dir for workdir

Share code between job and inventory tasks with new mixin

combine rest of pre_run_hook logic

structure to hold lock for entire sync process

force sync to run for inventory updates due to UI issues

Remove reference to removed scm_last_revision field

Fix consuming control capacity on container groups

Somehow we lost the critical line where we consume control impact
on container groups

This needs to be forward-ported to devel

Remove debug method that calls cleanup

- It's unclear why this was here.
- Removing it doesn't appear to cause any problems.
- It still gets called during heartbeats.
Log chosen control node for container group tasks

Add grace period settings for task manager timeout,
and pod / job waiting reapers

Co-authored-by: Alan Rominger

Add setting for missed heartbeats before marking node offline

Allow for passing custom job_explanation to reaper methods

Co-authored-by: Alan Rominger

Add extra workers if computing based on memory

Co-authored-by: Elijah DeLee

Apply a failed status if cancel_flag is not set
---
 awx/api/generics.py                           |  1 -
 awx/main/analytics/subsystem_metrics.py       |  6 ++-
 awx/main/dispatch/pool.py                     | 28 ++++++----
 awx/main/dispatch/publish.py                  |  3 +-
 awx/main/dispatch/reaper.py                   | 53 ++++++++++++-------
 awx/main/dispatch/worker/base.py              |  5 ++
 awx/main/dispatch/worker/callback.py          |  4 +-
 awx/main/dispatch/worker/task.py              | 14 ++++-
 .../management/commands/run_dispatcher.py     |  4 +-
 awx/main/models/ha.py                         |  2 +-
 awx/main/tasks/jobs.py                        |  7 ++-
 awx/main/tasks/system.py                      | 27 +++++++---
 awx/main/utils/common.py                      | 17 ++++++
 awx/settings/defaults.py                      | 15 ++++++
 awx/sso/views.py                              |  1 -
 15 files changed, 138 insertions(+), 49 deletions(-)

diff --git a/awx/api/generics.py b/awx/api/generics.py
index dddd9d9e6fa0..c320df09affb 100644
--- a/awx/api/generics.py
+++ b/awx/api/generics.py
@@ -98,7 +98,6 @@ def post(self, request, *args, **kwargs):
 
         current_user = UserSerializer(self.request.user)
         current_user = smart_str(JSONRenderer().render(current_user.data))
         current_user = urllib.parse.quote('%s' % current_user, '')
-        ret.set_cookie('current_user', current_user, secure=settings.SESSION_COOKIE_SECURE or None)
         ret.setdefault('X-API-Session-Cookie-Name', getattr(settings, 'SESSION_COOKIE_NAME', 'awx_sessionid'))
         return ret
diff --git a/awx/main/analytics/subsystem_metrics.py b/awx/main/analytics/subsystem_metrics.py
index 810f53f7fe98..b020b84f6344 100644
--- a/awx/main/analytics/subsystem_metrics.py
+++ b/awx/main/analytics/subsystem_metrics.py
@@ -166,7 +166,11 @@ def __init__(self, auto_pipe_execute=False, instance_name=None):
         elif settings.IS_TESTING():
             self.instance_name = "awx_testing"
         else:
-            self.instance_name = Instance.objects.me().hostname
+            try:
+                self.instance_name = Instance.objects.me().hostname
+            except Exception as e:
+                self.instance_name = settings.CLUSTER_HOST_ID
+                logger.info(f'Instance {self.instance_name} seems to be unregistered, error: {e}')
 
         # metric name, help_text
         METRICSLIST = [
diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index b4f00a54db1f..9b25ecc876b6 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -22,7 +22,7 @@
 
 from awx.main.models import UnifiedJob
 from awx.main.dispatch import reaper
-from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity
+from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -328,12 +328,14 @@ def __init__(self, *args, **kwargs):
             # Get same number as max forks based on memory, this function takes memory as bytes
             self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30)
 
+            # add magic prime number of extra workers to ensure
+            # we have a few extra workers to run the heartbeat
+            self.max_workers += 7
+
         # max workers can't be less than min_workers
         self.max_workers = max(self.min_workers, self.max_workers)
 
-    def debug(self, *args, **kwargs):
-        self.cleanup()
-        return super(AutoscalePool, self).debug(*args, **kwargs)
+        self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT
 
     @property
     def should_grow(self):
@@ -351,6 +353,7 @@ def full(self):
     def debug_meta(self):
         return 'min={} max={}'.format(self.min_workers, self.max_workers)
 
+    @log_excess_runtime(logger)
     def cleanup(self):
         """
         Perform some internal account and cleanup. This is run on
@@ -406,7 +409,7 @@ def cleanup(self):
                         w.managed_tasks[current_task['uuid']]['started'] = time.time()
                     age = time.time() - current_task['started']
                     w.managed_tasks[current_task['uuid']]['age'] = age
-                    if age > (60 * 5):
+                    if age > self.task_manager_timeout:
                         logger.error(f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}')  # noqa
                         os.kill(w.pid, signal.SIGTERM)
 
@@ -417,16 +420,17 @@ def cleanup(self):
             idx = random.choice(range(len(self.workers)))
             self.write(idx, m)
 
-        # if we are not in the dangerous situation of queue backup then clear old waiting jobs
-        if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
-            reaper.reap_waiting()
-
-        # if the database says a job is running on this node, but it's *not*,
+        # if the database says a job is running or queued on this node, but it's *not*,
         # then reap it
         running_uuids = []
         for worker in self.workers:
             worker.calculate_managed_tasks()
             running_uuids.extend(list(worker.managed_tasks.keys()))
+
+        # if we are not in the dangerous situation of queue backup then clear old waiting jobs
+        if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
+            reaper.reap_waiting(excluded_uuids=running_uuids)
+
         reaper.reap(excluded_uuids=running_uuids)
 
     def up(self):
diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index e87346515532..dd19c1338cb0 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -2,6 +2,7 @@
 import logging
 import sys
 import json
+import time
 from uuid import uuid4
 
 from django.conf import settings
@@ -75,7 +76,7 @@ def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
             msg = f'{cls.name}: Queue value required and may not be None'
             logger.error(msg)
             raise ValueError(msg)
-        obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name}
+        obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
         guid = get_guid()
         if guid:
             obj['guid'] = guid
diff --git a/awx/main/dispatch/reaper.py b/awx/main/dispatch/reaper.py
index 0bf322a507de..0ed145508c8c 100644
--- a/awx/main/dispatch/reaper.py
+++ b/awx/main/dispatch/reaper.py
@@ -2,6 +2,7 @@
 import logging
 
 from django.db.models import Q
+from django.conf import settings
 from django.utils.timezone import now as tz_now
 from django.contrib.contenttypes.models import ContentType
 
@@ -20,40 +21,48 @@ def startup_reaping():
     job_ids = []
     for j in jobs:
         job_ids.append(j.id)
-        j.status = 'failed'
-        j.start_args = ''
-        j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
-        j.save(update_fields=['status', 'start_args', 'job_explanation'])
-        if hasattr(j, 'send_notification_templates'):
-            j.send_notification_templates('failed')
-        j.websocket_emit_status('failed')
+        reap_job(
+            j,
+            'failed',
+            job_explanation='Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.',
+        )
     if job_ids:
         logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')
 
 
-def reap_job(j, status):
-    if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
+def reap_job(j, status, job_explanation=None):
+    j.refresh_from_db(fields=['status', 'job_explanation'])
+    status_before = j.status
+    if status_before not in ('running', 'waiting'):
         # just in case, don't reap jobs that aren't running
         return
     j.status = status
     j.start_args = ''  # blank field to remove encrypted passwords
-    j.job_explanation += ' '.join(
-        (
-            'Task was marked as running but was not present in',
-            'the job queue, so it has been marked as failed.',
+    if j.job_explanation:
+        j.job_explanation += ' '  # Separate messages for readability
+    if job_explanation is None:
+        j.job_explanation += ' '.join(
+            (
+                'Task was marked as running but was not present in',
+                'the job queue, so it has been marked as failed.',
+            )
         )
-    )
+    else:
+        j.job_explanation += job_explanation
     j.save(update_fields=['status', 'start_args', 'job_explanation'])
     if hasattr(j, 'send_notification_templates'):
         j.send_notification_templates('failed')
     j.websocket_emit_status(status)
-    logger.error('{} is no longer running; reaping'.format(j.log_format))
+    logger.error(f'{j.log_format} is no longer {status_before}; reaping')
 
 
-def reap_waiting(instance=None, status='failed', grace_period=60):
+def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None):
     """
     Reap all jobs in waiting for this instance.
     """
+    if grace_period is None:
+        grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
+
     me = instance
     if me is None:
         try:
@@ -63,11 +72,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60):
         return
     now = tz_now()
     jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
+    if excluded_uuids:
+        jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
     for j in jobs:
-        reap_job(j, status)
+        reap_job(j, status, job_explanation=job_explanation)
 
 
-def reap(instance=None, status='failed', excluded_uuids=[]):
+def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
     """
     Reap all jobs in running for this instance.
""" @@ -81,6 +92,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]): workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id jobs = UnifiedJob.objects.filter( Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id) - ).exclude(celery_task_id__in=excluded_uuids) + ) + if excluded_uuids: + jobs = jobs.exclude(celery_task_id__in=excluded_uuids) for j in jobs: - reap_job(j, status) + reap_job(j, status, job_explanation=job_explanation) diff --git a/awx/main/dispatch/worker/base.py b/awx/main/dispatch/worker/base.py index 46418828b6d0..b982cb8ab4f5 100644 --- a/awx/main/dispatch/worker/base.py +++ b/awx/main/dispatch/worker/base.py @@ -17,6 +17,7 @@ from awx.main.dispatch.pool import WorkerPool from awx.main.dispatch import pg_bus_conn +from awx.main.utils.common import log_excess_runtime if 'run_callback_receiver' in sys.argv: logger = logging.getLogger('awx.main.commands.run_callback_receiver') @@ -81,6 +82,9 @@ def control(self, body): logger.error('unrecognized control message: {}'.format(control)) def process_task(self, body): + if isinstance(body, dict): + body['time_ack'] = time.time() + if 'control' in body: try: return self.control(body) @@ -101,6 +105,7 @@ def process_task(self, body): self.total_messages += 1 self.record_statistics() + @log_excess_runtime(logger) def record_statistics(self): if time.time() - self.last_stats > 1: # buffer stat recording to once per second try: diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 6b419a00d848..c73239483beb 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -182,9 +182,9 @@ def flush(self, force=False): consecutive_errors = 0 except Exception as exc_indv: consecutive_errors += 1 - if consecutive_errors >= 5: - raise logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}') + if consecutive_errors >= 5: + raise metrics_singular_events_saved += events_saved if events_saved == 0: raise diff --git a/awx/main/dispatch/worker/task.py b/awx/main/dispatch/worker/task.py index e1fe196ddbfb..04f63002c530 100644 --- a/awx/main/dispatch/worker/task.py +++ b/awx/main/dispatch/worker/task.py @@ -3,6 +3,7 @@ import importlib import sys import traceback +import time from kubernetes.config import kube_config @@ -60,8 +61,19 @@ def run_callable(self, body): # the callable is a class, e.g., RunJob; instantiate and # return its `run()` method _call = _call().run + + log_extra = '' + logger_method = logger.debug + if ('time_ack' in body) and ('time_pub' in body): + time_publish = body['time_ack'] - body['time_pub'] + time_waiting = time.time() - body['time_ack'] + if time_waiting > 5.0 or time_publish > 5.0: + # If task too a very long time to process, add this information to the log + log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher' + logger_method = logger.info # don't print kwargs, they often contain launch-time secrets - logger.debug('task {} starting {}(*{})'.format(uuid, task, args)) + logger_method(f'task {uuid} starting {task}(*{args}){log_extra}') + return _call(*args, **kwargs) def perform_work(self, body): diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index e9a2b7ebf5dd..2fc35a75d280 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -7,7 +7,7 @@ from 
 from django.core.management.base import BaseCommand
 from django.db import connection as django_connection
 
-from awx.main.dispatch import get_local_queuename, reaper
+from awx.main.dispatch import get_local_queuename
 from awx.main.dispatch.control import Control
 from awx.main.dispatch.pool import AutoscalePool
 from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
@@ -53,8 +53,6 @@ def handle(self, *arg, **options):
         # (like the node heartbeat)
         periodic.run_continuously()
 
-        reaper.startup_reaping()
-        reaper.reap_waiting(grace_period=0)
         consumer = None
 
         try:
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 782ca59344c4..ab9d226b3ff4 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -203,7 +203,7 @@ def is_lost(self, ref_time=None):
             return True
         if ref_time is None:
             ref_time = now()
-        grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
+        grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * settings.CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE
         if self.node_type in ('execution', 'hop'):
             grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
         return self.last_seen < ref_time - timedelta(seconds=grace_period)
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 954ced13fd92..55ab1de4775c 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -608,9 +608,12 @@ def run(self, pk, **kwargs):
                 status = 'failed'
             elif status == 'canceled':
                 self.instance = self.update_model(pk)
-                if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback():
-                    self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
+                if getattr(self.instance, 'cancel_flag', False) is False:
                     status = 'failed'
+                    if signal_callback():
+                        self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
+                    else:
+                        self.runner_callback.delay_update(job_explanation="The running ansible process received a shutdown signal.")
         except ReceptorNodeNotFound as exc:
             self.runner_callback.delay_update(job_explanation=str(exc))
         except Exception:
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 1534460bd2af..cc2dea7c1c34 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -15,7 +15,7 @@
 from django.conf import settings
 from django.db import transaction, DatabaseError, IntegrityError
 from django.db.models.fields.related import ForeignKey
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
 from django.contrib.auth.models import User
 from django.utils.translation import gettext_lazy as _
@@ -103,6 +103,8 @@ def dispatch_startup():
     #
     apply_cluster_membership_policies()
     cluster_node_heartbeat()
+    reaper.startup_reaping()
+    reaper.reap_waiting(grace_period=0)
     m = Metrics()
     m.reset_values()
 
@@ -503,12 +505,23 @@ def cluster_node_heartbeat():
 
     if this_inst:
         startup_event = this_inst.is_lost(ref_time=nowtime)
+        last_last_seen = this_inst.last_seen
         this_inst.local_health_check()
         if startup_event and this_inst.capacity != 0:
-            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
+            logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. Prior last_seen {last_last_seen}')
             return
+        elif not last_last_seen:
+            logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}')
+        elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2):
+            logger.warning(f'Heartbeat skew - interval={(nowtime - last_last_seen).total_seconds():.4f}, expected={settings.CLUSTER_NODE_HEARTBEAT_PERIOD}')
     else:
-        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
+        if settings.AWX_AUTO_DEPROVISION_INSTANCES:
+            (changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID)
+            if changed:
+                logger.warning(f'Recreated instance record {this_inst.hostname} after unexpected removal')
+            this_inst.local_health_check()
+        else:
+            raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
     # IFF any node has a greater version than we do, then we'll shutdown services
     for other_inst in instance_list:
         if other_inst.node_type in ('execution', 'hop'):
@@ -528,8 +541,9 @@ def cluster_node_heartbeat():
 
     for other_inst in lost_instances:
         try:
-            reaper.reap(other_inst)
-            reaper.reap_waiting(this_inst, grace_period=0)
+            explanation = "Job reaped due to instance shutdown"
+            reaper.reap(other_inst, job_explanation=explanation)
+            reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation)
         except Exception:
             logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
         try:
@@ -594,7 +608,8 @@ def awx_k8s_reaper():
     for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
         logger.debug("Checking for orphaned k8s pods for {}.".format(group))
         pods = PodManager.list_active_jobs(group)
-        for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES):
+        time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
+        for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
             logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
             try:
                 pm = PodManager(job)
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index 19394247b35d..8011216b54a3 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -6,6 +6,7 @@
 import json
 import yaml
 import logging
+import time
 import os
 import subprocess
 import re
@@ -1153,3 +1154,19 @@ def wrapper_cleanup_new_process(*args, **kwargs):
             return func(*args, **kwargs)
 
         return wrapper_cleanup_new_process
+
+
+def log_excess_runtime(func_logger, cutoff=5.0):
+    def log_excess_runtime_decorator(func):
+        @wraps(func)
+        def _new_func(*args, **kwargs):
+            start_time = time.time()
+            return_value = func(*args, **kwargs)
+            delta = time.time() - start_time
+            if delta > cutoff:
+                func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s')
+            return return_value
+
+        return _new_func
+
+    return log_excess_runtime_decorator
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index ef389e515167..8acd7f7352d3 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -427,6 +427,10 @@ def IS_TESTING(argv=None):
 
 # heartbeat period can factor into some forms of logic, so it is maintained as a setting here
 CLUSTER_NODE_HEARTBEAT_PERIOD = 60
+
+# Number of missed heartbeats until a node gets marked as lost
+CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE = 2
+
 RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60  # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
 
 EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 30  # once every 30 minutes check if an execution node errors have been resolved
@@ -1018,3 +1022,14 @@ def IS_TESTING(argv=None):
 
 # Mount exposed paths as hostPath resource in k8s/ocp
 AWX_MOUNT_ISOLATED_PATHS_ON_K8S = False
+
+# Time out task managers if they take longer than this many seconds
+TASK_MANAGER_TIMEOUT = 300
+
+# Number of seconds _in addition to_ the task manager timeout a job can stay
+# in waiting without being reaped
+JOB_WAITING_GRACE_PERIOD = 60
+
+# Number of seconds after a container group job finished time to wait
+# before the awx_k8s_reaper task will tear down the pods
+K8S_POD_REAPER_GRACE_PERIOD = 60
diff --git a/awx/sso/views.py b/awx/sso/views.py
index 67921b2fa47a..00a392f5b3e5 100644
--- a/awx/sso/views.py
+++ b/awx/sso/views.py
@@ -45,7 +45,6 @@ def dispatch(self, request, *args, **kwargs):
 
         current_user = UserSerializer(self.request.user)
         current_user = smart_str(JSONRenderer().render(current_user.data))
         current_user = urllib.parse.quote('%s' % current_user, '')
-        response.set_cookie('current_user', current_user, secure=settings.SESSION_COOKIE_SECURE or None)
         response.setdefault('X-API-Session-Cookie-Name', getattr(settings, 'SESSION_COOKIE_NAME', 'awx_sessionid'))
         return response
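
Usage sketch (illustrative, not part of the patch): the log_excess_runtime decorator added
to awx/main/utils/common.py above is applied to AutoscalePool.cleanup and record_statistics
so that unusually slow passes get logged. The standalone example below mirrors that decorator
under the same semantics; the example_logger name, the 0.1s cutoff, and slow_cleanup() are
hypothetical stand-ins, not AWX code.

    # Standalone sketch of the log_excess_runtime pattern introduced above.
    # The decorator body mirrors awx/main/utils/common.py; the logger name,
    # cutoff value, and slow_cleanup() are illustrative assumptions.
    import logging
    import time
    from functools import wraps

    example_logger = logging.getLogger('example.dispatch')


    def log_excess_runtime(func_logger, cutoff=5.0):
        def log_excess_runtime_decorator(func):
            @wraps(func)
            def _new_func(*args, **kwargs):
                start_time = time.time()
                return_value = func(*args, **kwargs)
                delta = time.time() - start_time
                if delta > cutoff:
                    func_logger.info(f'Running {func.__name__!r} took {delta:.2f}s')
                return return_value

            return _new_func

        return log_excess_runtime_decorator


    @log_excess_runtime(example_logger, cutoff=0.1)
    def slow_cleanup():
        time.sleep(0.2)  # simulate a cleanup pass that overruns the cutoff


    if __name__ == '__main__':
        logging.basicConfig(level=logging.INFO)
        slow_cleanup()  # logs: Running 'slow_cleanup' took 0.20s

For the reaper changes, note that with the new defaults reap_waiting() falls back to a grace
period of JOB_WAITING_GRACE_PERIOD + TASK_MANAGER_TIMEOUT = 60 + 300 = 360 seconds.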