Skip to content

Commit

Permalink
add supervisor healthchecks
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Lachish committed Nov 21, 2019
1 parent 22065a7 commit 092eb6c
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 41 deletions.
15 changes: 5 additions & 10 deletions bin/docker-entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,16 @@ dev_scheduler() {
worker() {
echo "Starting RQ worker..."

start_worker
export WORKERS_COUNT=${WORKERS_COUNT:-2}
export QUEUES=${QUEUES:-}

supervisord -c worker.conf
}

dev_worker() {
echo "Starting dev RQ worker..."

start_worker "watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- "
}

start_worker() {
export EXECUTION_PREFIX=$1
export WORKERS_COUNT=${WORKERS_COUNT:-2}
export QUEUES=${QUEUES:-}

supervisord -c worker.conf
exec watchmedo auto-restart --directory=./redash/ --pattern=*.py --recursive -- ./manage.py rq worker $QUEUES
}

dev_celery_worker() {
Expand Down
58 changes: 29 additions & 29 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

from click import argument
from flask.cli import AppGroup
from rq import Connection, Worker
from rq import Connection
from rq.worker import Worker, WorkerStatus
from sqlalchemy.orm import configure_mappers
from supervisor_checks import check_runner
from supervisor_checks.check_modules import base

from redash import rq_redis_connection
from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
Expand Down Expand Up @@ -37,38 +40,35 @@ def worker(queues):
w.work()


@manager.command()
def healthcheck():
hostname = socket.gethostname()
with Connection(rq_redis_connection):
all_workers = Worker.all()
class WorkerHealthcheck(base.BaseCheck):
NAME = 'RQ Worker Healthcheck'

def __call__(self, process_spec):
all_workers = Worker.all(connection=rq_redis_connection)
worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and
w.pid == process_spec['pid']].pop()

local_workers = [w for w in all_workers if w.hostname == hostname]
row_format ="{:>10}" * (len(local_workers) + 1)
is_busy = worker.get_state() == WorkerStatus.BUSY

print("Local worker PIDs:")
local_worker_pids = set([w.pid for w in local_workers])
print(row_format.format("", *local_worker_pids))
time_since_seen = datetime.datetime.utcnow() - worker.last_heartbeat
seen_lately = time_since_seen.seconds < 60

print("Time since seen:")
heartbeats = [w.last_heartbeat for w in local_workers]
time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats]
print(row_format.format("", *[t.seconds for t in time_since_seen]))
seen_lately = [t.seconds < 60 for t in time_since_seen]
total_jobs_in_watched_queues = sum([len(q.jobs) for q in worker.queues])
has_nothing_to_do = total_jobs_in_watched_queues == 0

print("State:")
states = [w.state for w in local_workers]
print(row_format.format("", *states))
busy = [s == "busy" for s in states]
is_healthy = is_busy or seen_lately or has_nothing_to_do

print("Jobs in queues:")
jobs_in_queues = [sum([len(q.jobs) for q in w.queues]) for w in local_workers]
print(row_format.format("", *jobs_in_queues))
has_nothing_to_do = [j == 0 for j in jobs_in_queues]
self._log("Worker %s healthcheck: Is busy? %s. "
"Seen lately? %s (%d seconds ago). "
"Has nothing to do? %s (%d jobs in watched queues). "
"==> Is healthy? %s",
worker.key, is_busy, seen_lately, time_since_seen.seconds,
has_nothing_to_do, total_jobs_in_watched_queues, is_healthy)

print("Healty:")
# a healthy worker is either busy, has been seen lately or has nothing to do
healthy = [any(w) for w in zip(busy, seen_lately, has_nothing_to_do)]
print(row_format.format("", *healthy))
return is_healthy

sys.exit(int(not all(healthy)))

@manager.command()
def healthcheck():
return check_runner.CheckRunner(
'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run()
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ maxminddb-geolite2==2018.703
pypd==1.1.0
disposable-email-domains>=0.0.52
gevent==1.4.0
supervisor==4.1.0
supervisor_checks==0.8.1
# Install the dependencies of the bin/bundle-extensions script here.
# It has its own requirements file to simplify the frontend client build process
-r requirements_bundles.txt
Expand Down
19 changes: 17 additions & 2 deletions worker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@ logfile=/dev/null
pidfile=/tmp/supervisord.pid
nodaemon=true

[unix_http_server]
file = /tmp/supervisor.sock

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[program:worker]
command=%(ENV_EXECUTION_PREFIX)s ./manage.py rq worker %(ENV_QUEUES)s
command=./manage.py rq worker %(ENV_QUEUES)s
process_name=%(program_name)s-%(process_num)s
numprocs=%(ENV_WORKERS_COUNT)s
directory=/app
Expand All @@ -14,4 +20,13 @@ autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
stderr_logfile_maxbytes=0

[eventlistener:worker_healthcheck]
serverurl=AUTO
command=./manage.py rq healthcheck
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
events=TICK_60

0 comments on commit 092eb6c

Please sign in to comment.