Further resiliency changes focused on offline database
Make logs from database outage more manageable

Raise exception if update_model never recovers from problem

Remove unused current_user cookie

Register system again if deleted by another pod

Avoid cases where a missing instance
  would throw an error on startup;
  this gives the heartbeat time to register it

Give specific messages if job was killed due to SIGTERM or SIGKILL (ansible#12435)

* Reap jobs on dispatcher startup to increase clarity, replacing the existing reaping logic

* Exit jobs if receiving SIGTERM signal

* Fix unwanted reaping on shutdown, let subprocess close out

* Add some sanity tests for signal module

* Add a log for an unhandled dispatcher error

* Refine wording of error messages

Co-authored-by: Elijah DeLee <kdelee@redhat.com>

Split reaper for running and waiting jobs

Avoid running jobs that have already been reaped

Co-authored-by: Elijah DeLee <kdelee@redhat.com>
Co-authored-by: Shane McDonald <me@shanemcd.com>

Remove unnecessary extra actions

Fix waiting jobs in other cases of reaping

Add logs to debug bottlenecks in waiting jobs

Add logs about heartbeat skew

Co-authored-by: Shane McDonald <me@shanemcd.com>

Replace git shallow clone with shutil.copytree
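A minimal sketch of the copy-based approach named above; the directory layout and helper name here are hypothetical, and the actual AWX implementation is not part of the hunks shown below.

    import os
    import shutil

    def populate_job_project_dir(cached_project_path, private_data_dir):
        # Hypothetical sketch: copy the already-synced project checkout into the
        # job's private working directory instead of doing a shallow git clone.
        project_dir = os.path.join(private_data_dir, 'project')
        shutil.copytree(cached_project_path, project_dir, symlinks=True)
        return project_dir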

Introduce build_project_dir method
  the base method will create an empty project dir for workdir

Share code between job and inventory tasks with a new mixin
  combine the rest of the pre_run_hook logic
  structure it to hold the lock for the entire sync process

Force sync to run for inventory updates due to UI issues

Remove reference to removed scm_last_revision field

Fix consuming control capacity on container groups

Somehow we lost the critical line where we consume control impact on container groups

This needs to be forward-ported to devel

Remove debug method that calls cleanup

- It's unclear why this was here.
- Removing it doesn't appear to cause any problems.
- It still gets called during heartbeats.

Log chosen control node for container group tasks

Add grace period settings for task manager timeout, and pod / job waiting reapers

Co-authored-by: Alan Rominger <arominge@redhat.com>

Add setting for missed heartbeats before marking node offline

Allow for passing custom job_explanation to reaper methods

Co-authored-by: Alan Rominger <arominge@redhat.com>

Add extra workers if computing based on memory

Co-authored-by: Elijah DeLee <kdelee@redhat.com>
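A rough worked example of the memory-based sizing described above, using made-up numbers rather than real capacity results:

    # Hypothetical: memory-based capacity works out to 10 forks for this node
    mem_based_forks = 10
    min_workers = 4                       # assumed pool floor, not the real default
    max_workers = mem_based_forks + 7     # extra headroom so heartbeats still get a worker
    max_workers = max(min_workers, max_workers)
    print(max_workers)                    # -> 17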

Apply a failed status if cancel_flag is not set
AlanCoding committed Jul 25, 2022
1 parent c905c9a commit 9d52ebe
Showing 15 changed files with 138 additions and 49 deletions.
1 change: 0 additions & 1 deletion awx/api/generics.py
@@ -98,7 +98,6 @@ def post(self, request, *args, **kwargs):
current_user = UserSerializer(self.request.user)
current_user = smart_str(JSONRenderer().render(current_user.data))
current_user = urllib.parse.quote('%s' % current_user, '')
ret.set_cookie('current_user', current_user, secure=settings.SESSION_COOKIE_SECURE or None)
ret.setdefault('X-API-Session-Cookie-Name', getattr(settings, 'SESSION_COOKIE_NAME', 'awx_sessionid'))

return ret
6 changes: 5 additions & 1 deletion awx/main/analytics/subsystem_metrics.py
@@ -166,7 +166,11 @@ def __init__(self, auto_pipe_execute=False, instance_name=None):
elif settings.IS_TESTING():
self.instance_name = "awx_testing"
else:
self.instance_name = Instance.objects.me().hostname
try:
self.instance_name = Instance.objects.me().hostname
except Exception as e:
self.instance_name = settings.CLUSTER_HOST_ID
logger.info(f'Instance {self.instance_name} seems to be unregistered, error: {e}')

# metric name, help_text
METRICSLIST = [
28 changes: 18 additions & 10 deletions awx/main/dispatch/pool.py
@@ -22,7 +22,7 @@

from awx.main.models import UnifiedJob
from awx.main.dispatch import reaper
from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity
from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity, log_excess_runtime

if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -328,12 +328,14 @@ def __init__(self, *args, **kwargs):
# Get same number as max forks based on memory, this function takes memory as bytes
self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30)

# add magic prime number of extra workers to ensure
# we have a few extra workers to run the heartbeat
self.max_workers += 7

# max workers can't be less than min_workers
self.max_workers = max(self.min_workers, self.max_workers)

def debug(self, *args, **kwargs):
self.cleanup()
return super(AutoscalePool, self).debug(*args, **kwargs)
self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT

@property
def should_grow(self):
@@ -351,6 +353,7 @@ def full(self):
def debug_meta(self):
return 'min={} max={}'.format(self.min_workers, self.max_workers)

@log_excess_runtime(logger)
def cleanup(self):
"""
Perform some internal account and cleanup. This is run on
Expand Down Expand Up @@ -406,7 +409,7 @@ def cleanup(self):
w.managed_tasks[current_task['uuid']]['started'] = time.time()
age = time.time() - current_task['started']
w.managed_tasks[current_task['uuid']]['age'] = age
if age > (60 * 5):
if age > self.task_manager_timeout:
logger.error(f'run_task_manager has held the advisory lock for >5m, sending SIGTERM to {w.pid}') # noqa
os.kill(w.pid, signal.SIGTERM)

@@ -417,16 +420,17 @@ def cleanup(self):
idx = random.choice(range(len(self.workers)))
self.write(idx, m)

# if we are not in the dangerous situation of queue backup then clear old waiting jobs
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting()

# if the database says a job is running on this node, but it's *not*,
# if the database says a job is running or queued on this node, but it's *not*,
# then reap it
running_uuids = []
for worker in self.workers:
worker.calculate_managed_tasks()
running_uuids.extend(list(worker.managed_tasks.keys()))

# if we are not in the dangerous situation of queue backup then clear old waiting jobs
if self.workers and max(len(w.managed_tasks) for w in self.workers) <= 1:
reaper.reap_waiting(excluded_uuids=running_uuids)

reaper.reap(excluded_uuids=running_uuids)

def up(self):
@@ -456,6 +460,10 @@ def write(self, preferred_queue, body):
w.put(body)
break
else:
task_name = 'unknown'
if isinstance(body, dict):
task_name = body.get('task')
logger.warn(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
return super(AutoscalePool, self).write(preferred_queue, body)
except Exception:
for conn in connections.all():
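The pool.py hunk above decorates cleanup() with log_excess_runtime from awx.main.utils.common, whose implementation is not part of this diff. A plausible sketch of what such a decorator factory could look like, assuming it simply logs calls that run longer than some cutoff:

    import functools
    import time

    def log_excess_runtime(logger, cutoff=5.0):
        # Assumed behavior: time the wrapped call and log if it exceeded the cutoff
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                start = time.time()
                try:
                    return func(*args, **kwargs)
                finally:
                    elapsed = time.time() - start
                    if elapsed > cutoff:
                        logger.info(f'{func.__name__} took {elapsed:.2f}s')
            return wrapper
        return decorator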
3 changes: 2 additions & 1 deletion awx/main/dispatch/publish.py
@@ -2,6 +2,7 @@
import logging
import sys
import json
import time
from uuid import uuid4

from django.conf import settings
@@ -75,7 +76,7 @@ def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
msg = f'{cls.name}: Queue value required and may not be None'
logger.error(msg)
raise ValueError(msg)
obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name}
obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
guid = get_guid()
if guid:
obj['guid'] = guid
53 changes: 33 additions & 20 deletions awx/main/dispatch/reaper.py
@@ -2,6 +2,7 @@
import logging

from django.db.models import Q
from django.conf import settings
from django.utils.timezone import now as tz_now
from django.contrib.contenttypes.models import ContentType

@@ -20,40 +21,48 @@ def startup_reaping():
job_ids = []
for j in jobs:
job_ids.append(j.id)
j.status = 'failed'
j.start_args = ''
j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
j.save(update_fields=['status', 'start_args', 'job_explanation'])
if hasattr(j, 'send_notification_templates'):
j.send_notification_templates('failed')
j.websocket_emit_status('failed')
reap_job(
j,
'failed',
job_explanation='Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.',
)
if job_ids:
logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')


def reap_job(j, status):
if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
def reap_job(j, status, job_explanation=None):
j.refresh_from_db(fields=['status', 'job_explanation'])
status_before = j.status
if status_before not in ('running', 'waiting'):
# just in case, don't reap jobs that aren't running
return
j.status = status
j.start_args = '' # blank field to remove encrypted passwords
j.job_explanation += ' '.join(
(
'Task was marked as running but was not present in',
'the job queue, so it has been marked as failed.',
if j.job_explanation:
j.job_explanation += ' ' # Separate messages for readability
if job_explanation is None:
j.job_explanation += ' '.join(
(
'Task was marked as running but was not present in',
'the job queue, so it has been marked as failed.',
)
)
)
else:
j.job_explanation += job_explanation
j.save(update_fields=['status', 'start_args', 'job_explanation'])
if hasattr(j, 'send_notification_templates'):
j.send_notification_templates('failed')
j.websocket_emit_status(status)
logger.error('{} is no longer running; reaping'.format(j.log_format))
logger.error(f'{j.log_format} is no longer {status_before}; reaping')


def reap_waiting(instance=None, status='failed', grace_period=60):
def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None):
"""
Reap all jobs in waiting for this instance.
"""
if grace_period is None:
grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT

me = instance
if me is None:
try:
@@ -63,11 +72,13 @@ def reap_waiting(instance=None, status='failed', grace_period=60):
return
now = tz_now()
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=now - timedelta(seconds=grace_period), controller_node=me.hostname)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs:
reap_job(j, status)
reap_job(j, status, job_explanation=job_explanation)


def reap(instance=None, status='failed', excluded_uuids=[]):
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None):
"""
Reap all jobs in running for this instance.
"""
@@ -81,6 +92,8 @@ def reap(instance=None, status='failed', excluded_uuids=[]):
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
jobs = UnifiedJob.objects.filter(
Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
).exclude(celery_task_id__in=excluded_uuids)
)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs:
reap_job(j, status)
reap_job(j, status, job_explanation=job_explanation)
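With the change above, the default grace period for reaping waiting jobs is derived from two settings instead of a fixed 60 seconds. A small worked example, using hypothetical setting values rather than the shipped defaults:

    # Hypothetical values, for illustration only
    JOB_WAITING_GRACE_PERIOD = 60    # seconds
    TASK_MANAGER_TIMEOUT = 300       # seconds

    # reap_waiting() skips any job modified more recently than this window,
    # giving a slow task manager cycle time to pick the job up before it is reaped
    grace_period = JOB_WAITING_GRACE_PERIOD + TASK_MANAGER_TIMEOUT
    print(grace_period)              # -> 360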
5 changes: 5 additions & 0 deletions awx/main/dispatch/worker/base.py
@@ -17,6 +17,7 @@

from awx.main.dispatch.pool import WorkerPool
from awx.main.dispatch import pg_bus_conn
from awx.main.utils.common import log_excess_runtime

if 'run_callback_receiver' in sys.argv:
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -81,6 +82,9 @@ def control(self, body):
logger.error('unrecognized control message: {}'.format(control))

def process_task(self, body):
if isinstance(body, dict):
body['time_ack'] = time.time()

if 'control' in body:
try:
return self.control(body)
@@ -101,6 +105,7 @@ def process_task(self, body):
self.total_messages += 1
self.record_statistics()

@log_excess_runtime(logger)
def record_statistics(self):
if time.time() - self.last_stats > 1: # buffer stat recording to once per second
try:
4 changes: 2 additions & 2 deletions awx/main/dispatch/worker/callback.py
@@ -182,9 +182,9 @@ def flush(self, force=False):
consecutive_errors = 0
except Exception as exc_indv:
consecutive_errors += 1
if consecutive_errors >= 5:
raise
logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}')
if consecutive_errors >= 5:
raise
metrics_singular_events_saved += events_saved
if events_saved == 0:
raise
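The callback.py reorder above ensures every individual save failure is logged before the flush gives up after five consecutive errors. A generic, self-contained sketch of that pattern (not AWX's actual flush loop):

    import logging

    logger = logging.getLogger(__name__)

    def save_individually(events, save_one):
        consecutive_errors = 0
        for event in events:
            try:
                save_one(event)
                consecutive_errors = 0
            except Exception as exc:
                consecutive_errors += 1
                # Log first so the final failure is visible even when we re-raise
                logger.info(f'Database error saving individual event, error {exc}')
                if consecutive_errors >= 5:
                    raise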
14 changes: 13 additions & 1 deletion awx/main/dispatch/worker/task.py
@@ -3,6 +3,7 @@
import importlib
import sys
import traceback
import time

from kubernetes.config import kube_config

@@ -60,8 +61,19 @@ def run_callable(self, body):
# the callable is a class, e.g., RunJob; instantiate and
# return its `run()` method
_call = _call().run

log_extra = ''
logger_method = logger.debug
if ('time_ack' in body) and ('time_pub' in body):
time_publish = body['time_ack'] - body['time_pub']
time_waiting = time.time() - body['time_ack']
if time_waiting > 5.0 or time_publish > 5.0:
                # If the task took a very long time to process, add this information to the log
log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher'
logger_method = logger.info
# don't print kwargs, they often contain launch-time secrets
logger.debug('task {} starting {}(*{})'.format(uuid, task, args))
logger_method(f'task {uuid} starting {task}(*{args}){log_extra}')

return _call(*args, **kwargs)

def perform_work(self, body):
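Putting the three hunks together: publish.py stamps time_pub when a task is published, the consumer stamps time_ack when the message is taken off the bus, and run_callable compares both against the current time before starting the task. A self-contained illustration with made-up values:

    import time

    # Shape of a published message, with a made-up task name
    body = {'uuid': 'abc123', 'task': 'awx.main.tasks.system.cluster_node_heartbeat',
            'args': [], 'kwargs': {}, 'time_pub': time.time()}

    # ...when the dispatcher main process receives it
    body['time_ack'] = time.time()

    # ...when a pool worker finally starts running it
    time_publish = body['time_ack'] - body['time_pub']   # time spent on the message bus
    time_waiting = time.time() - body['time_ack']        # time queued in the local dispatcher
    if time_publish > 5.0 or time_waiting > 5.0:
        print(f'slow dispatch: {time_publish:.4f}s to ack, {time_waiting:.4f}s waiting locally')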
4 changes: 1 addition & 3 deletions awx/main/management/commands/run_dispatcher.py
@@ -7,7 +7,7 @@
from django.core.management.base import BaseCommand
from django.db import connection as django_connection

from awx.main.dispatch import get_local_queuename, reaper
from awx.main.dispatch import get_local_queuename
from awx.main.dispatch.control import Control
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
@@ -53,8 +53,6 @@ def handle(self, *arg, **options):
# (like the node heartbeat)
periodic.run_continuously()

reaper.startup_reaping()
reaper.reap_waiting(grace_period=0)
consumer = None

try:
2 changes: 1 addition & 1 deletion awx/main/models/ha.py
@@ -203,7 +203,7 @@ def is_lost(self, ref_time=None):
return True
if ref_time is None:
ref_time = now()
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * settings.CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE
if self.node_type in ('execution', 'hop'):
grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
return self.last_seen < ref_time - timedelta(seconds=grace_period)
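A worked example of the new tolerance setting, with hypothetical values rather than the shipped defaults: a node is only marked lost after missing several heartbeats, instead of the previous hard-coded two.

    # Hypothetical values, for illustration only
    CLUSTER_NODE_HEARTBEAT_PERIOD = 60             # seconds between heartbeats
    CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE = 4    # heartbeats that may be missed

    grace_period = CLUSTER_NODE_HEARTBEAT_PERIOD * CLUSTER_NODE_MISSED_HEARTBEAT_TOLERANCE
    print(grace_period)   # -> 240 seconds, versus 120 with the old `* 2`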
7 changes: 5 additions & 2 deletions awx/main/tasks/jobs.py
@@ -608,9 +608,12 @@ def run(self, pk, **kwargs):
status = 'failed'
elif status == 'canceled':
self.instance = self.update_model(pk)
if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback():
self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
if getattr(self.instance, 'cancel_flag', False) is False:
status = 'failed'
if signal_callback():
self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
else:
self.runner_callback.delay_update(job_explanation="The running ansible process received a shutdown signal.")
except ReceptorNodeNotFound as exc:
self.runner_callback.delay_update(job_explanation=str(exc))
except Exception:
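The jobs.py change above distinguishes a user-requested cancel (cancel_flag) from a dispatcher shutdown reported by signal_callback(). A minimal sketch of the kind of SIGTERM flag such a callback could consult; this is an assumption for illustration, not AWX's actual signal module:

    import signal

    _shutdown_requested = False

    def _handle_sigterm(signum, frame):
        # Remember that the dispatcher asked us to exit; job code polls this flag
        global _shutdown_requested
        _shutdown_requested = True

    signal.signal(signal.SIGTERM, _handle_sigterm)

    def signal_callback():
        # Return True once a shutdown signal has been received
        return _shutdown_requested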
27 changes: 21 additions & 6 deletions awx/main/tasks/system.py
@@ -15,7 +15,7 @@
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now
from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy as _
@@ -103,6 +103,8 @@ def dispatch_startup():
#
apply_cluster_membership_policies()
cluster_node_heartbeat()
reaper.startup_reaping()
reaper.reap_waiting(grace_period=0)
m = Metrics()
m.reset_values()

@@ -503,12 +505,23 @@ def cluster_node_heartbeat():

if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
last_last_seen = this_inst.last_seen
this_inst.local_health_check()
if startup_event and this_inst.capacity != 0:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. Prior last_seen {last_last_seen}')
return
elif not last_last_seen:
logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}')
elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2):
logger.warning(f'Heartbeat skew - interval={(nowtime - last_last_seen).total_seconds():.4f}, expected={settings.CLUSTER_NODE_HEARTBEAT_PERIOD}')
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
(changed, this_inst) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID)
if changed:
logger.warning(f'Recreated instance record {this_inst.hostname} after unexpected removal')
this_inst.local_health_check()
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.node_type in ('execution', 'hop'):
@@ -528,8 +541,9 @@

for other_inst in lost_instances:
try:
reaper.reap(other_inst)
reaper.reap_waiting(this_inst, grace_period=0)
explanation = "Job reaped due to instance shutdown"
reaper.reap(other_inst, job_explanation=explanation)
reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation)
except Exception:
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
try:
@@ -594,7 +608,8 @@ def awx_k8s_reaper():
for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
logger.debug("Checking for orphaned k8s pods for {}.".format(group))
pods = PodManager.list_active_jobs(group)
for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES):
time_cutoff = now() - timedelta(seconds=settings.K8S_POD_REAPER_GRACE_PERIOD)
for job in UnifiedJob.objects.filter(pk__in=pods.keys(), finished__lte=time_cutoff).exclude(status__in=ACTIVE_STATES):
logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
try:
pm = PodManager(job)
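The awx_k8s_reaper hunk above stops deleting pods for jobs that finished only moments ago. A small illustration of the cutoff, with a hypothetical grace period value:

    from datetime import datetime, timedelta, timezone

    K8S_POD_REAPER_GRACE_PERIOD = 60   # hypothetical value, in seconds

    now = datetime.now(timezone.utc)
    time_cutoff = now - timedelta(seconds=K8S_POD_REAPER_GRACE_PERIOD)

    # Only jobs finished before the cutoff are eligible for pod cleanup, which
    # avoids racing against result collection for a pod that just completed.
    job_finished = now - timedelta(seconds=10)
    print(job_finished <= time_cutoff)   # -> False, too recent to reap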
