Skip to content

Commit

Permalink
lifecycle: don't use celery ping for worker healthcheck
Browse files Browse the repository at this point in the history
Signed-off-by: Jens Langhammer <jens@goauthentik.io>
  • Loading branch information
BeryJu committed Apr 3, 2023
1 parent a92786e commit 97fb68a
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
33 changes: 32 additions & 1 deletion authentik/root/celery.py
Expand Up @@ -2,9 +2,12 @@
import os
from contextvars import ContextVar
from logging.config import dictConfig
from pathlib import Path
from tempfile import gettempdir
from typing import Callable

from celery import Celery
from celery import Celery, bootsteps
from celery.apps.worker import Worker
from celery.signals import (
after_task_publish,
setup_logging,
Expand All @@ -28,6 +31,7 @@
LOGGER = get_logger()
CELERY_APP = Celery("authentik")
CTX_TASK_ID = ContextVar(STRUCTLOG_KEY_PREFIX + "task_id", default=Ellipsis)
HEARTBEAT_FILE = Path(gettempdir() + "/authentik-worker")


@setup_logging.connect
Expand Down Expand Up @@ -99,6 +103,32 @@ def worker_ready_hook(*args, **kwargs):
start_blueprint_watcher()


class LivenessProbe(bootsteps.StartStopStep):
"""Add a timed task to touch a temporary file for healthchecking reasons"""

requires = {"celery.worker.components:Timer"}

def __init__(self, parent, **kwargs):
super().__init__(parent, **kwargs)
self.requests = []
self.tref = None

Check warning on line 114 in authentik/root/celery.py

View check run for this annotation

Codecov / codecov/patch

authentik/root/celery.py#L112-L114

Added lines #L112 - L114 were not covered by tests

def start(self, worker: Worker):
self.tref = worker.timer.call_repeatedly(

Check warning on line 117 in authentik/root/celery.py

View check run for this annotation

Codecov / codecov/patch

authentik/root/celery.py#L117

Added line #L117 was not covered by tests
10.0,
self.update_heartbeat_file,
(worker,),
priority=10,
)
self.update_heartbeat_file(worker)

Check warning on line 123 in authentik/root/celery.py

View check run for this annotation

Codecov / codecov/patch

authentik/root/celery.py#L123

Added line #L123 was not covered by tests

def stop(self, worker: Worker):
HEARTBEAT_FILE.unlink(missing_ok=True)

Check warning on line 126 in authentik/root/celery.py

View check run for this annotation

Codecov / codecov/patch

authentik/root/celery.py#L126

Added line #L126 was not covered by tests

def update_heartbeat_file(self, worker: Worker):
HEARTBEAT_FILE.touch()

Check warning on line 129 in authentik/root/celery.py

View check run for this annotation

Codecov / codecov/patch

authentik/root/celery.py#L129

Added line #L129 was not covered by tests


# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
Expand All @@ -107,3 +137,4 @@ def worker_ready_hook(*args, **kwargs):

# Load task modules from all registered Django app configs.
CELERY_APP.autodiscover_tasks()
CELERY_APP.steps["worker"].add(LivenessProbe)
9 changes: 8 additions & 1 deletion lifecycle/ak
@@ -1,5 +1,6 @@
#!/bin/bash -e
MODE_FILE="${TMPDIR}/authentik-mode"
WORKER_HEARTBEAT="${TMPDIR}/authentik-worker"

function log {
printf '{"event": "%s", "level": "info", "logger": "bootstrap"}\n' "$@" > /dev/stderr
Expand Down Expand Up @@ -80,7 +81,13 @@ elif [[ "$1" == "healthcheck" ]]; then
if [[ $mode == "server" ]]; then
exec curl --user-agent "goauthentik.io lifecycle Healthcheck" -I http://localhost:9000/-/health/ready/
elif [[ $mode == "worker" ]]; then
exec celery -A authentik.root.celery inspect ping -d celery@$HOSTNAME --timeout 5 -j
mtime=$(stat -f %m $WORKER_HEARTBEAT)
time=$(date +"%s")
if [ "$(( $time - $mtime ))" -gt "30" ]; then
log "Worker hasn't updated heartbeat in 30 seconds"
exit 1
fi
exit 0
fi
elif [[ "$1" == "dump_config" ]]; then
exec python -m authentik.lib.config
Expand Down

0 comments on commit 97fb68a

Please sign in to comment.