Skip to content

Commit

Permalink
Multiprocess RQ workers (using supervisor) (#4371)
Browse files Browse the repository at this point in the history
* launch and monitor multiple workers using supervisor

* run supervisord in non-daemon mode

* redirect all output to stdout/stderr

* no need to log supervisord's output because it is redirected to stdout anyway

* updated and less brittle healthcheck

* add supervisor healthchecks

* remove redundant supervisor installation as it is installed by pip

* add a 5 minute check gate
  • Loading branch information
Omer Lachish authored and arikfr committed Jan 1, 2020
1 parent f85490c commit 260bfca
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 12 deletions.
5 changes: 4 additions & 1 deletion bin/docker-entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ dev_scheduler() {
worker() {
echo "Starting RQ worker..."

exec /app/manage.py rq worker $QUEUES
export WORKERS_COUNT=${WORKERS_COUNT:-2}
export QUEUES=${QUEUES:-}

supervisord -c worker.conf
}

dev_worker() {
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ services:
target: /app
depends_on:
- server
tty: true
environment:
PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO"
Expand Down
61 changes: 51 additions & 10 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from click import argument
from flask.cli import AppGroup
from rq import Connection
from rq.worker import WorkerStatus
from sqlalchemy.orm import configure_mappers
from supervisor_checks import check_runner
from supervisor_checks.check_modules import base

from redash import rq_redis_connection
from redash.tasks import Worker
Expand Down Expand Up @@ -42,15 +45,53 @@ def worker(queues):
w.work()


@manager.command()
def healthcheck():
hostname = socket.gethostname()
with Connection(rq_redis_connection):
all_workers = Worker.all()
class WorkerHealthcheck(base.BaseCheck):
NAME = 'RQ Worker Healthcheck'
INTERVAL = datetime.timedelta(minutes=5)
_last_check_time = {}

def time_to_check(self, pid):
now = datetime.datetime.utcnow()

if pid not in self._last_check_time:
self._last_check_time[pid] = now

if now - self._last_check_time[pid] >= self.INTERVAL:
self._last_check_time[pid] = now
return True

return False

def __call__(self, process_spec):
pid = process_spec['pid']
if not self.time_to_check(pid):
return True

all_workers = Worker.all(connection=rq_redis_connection)
worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and
w.pid == pid].pop()

local_workers = [w for w in all_workers if w.hostname == hostname]
heartbeats = [w.last_heartbeat for w in local_workers]
time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats]
active = [t.seconds < 60 for t in time_since_seen]
is_busy = worker.get_state() == WorkerStatus.BUSY

sys.exit(int(not all(active)))
time_since_seen = datetime.datetime.utcnow() - worker.last_heartbeat
seen_lately = time_since_seen.seconds < 60

total_jobs_in_watched_queues = sum([len(q.jobs) for q in worker.queues])
has_nothing_to_do = total_jobs_in_watched_queues == 0

is_healthy = is_busy or seen_lately or has_nothing_to_do

self._log("Worker %s healthcheck: Is busy? %s. "
"Seen lately? %s (%d seconds ago). "
"Has nothing to do? %s (%d jobs in watched queues). "
"==> Is healthy? %s",
worker.key, is_busy, seen_lately, time_since_seen.seconds,
has_nothing_to_do, total_jobs_in_watched_queues, is_healthy)

return is_healthy


@manager.command()
def healthcheck():
return check_runner.CheckRunner(
'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run()
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ maxminddb-geolite2==2018.703
pypd==1.1.0
disposable-email-domains>=0.0.52
gevent==1.4.0
supervisor==4.1.0
supervisor_checks==0.8.1
# Install the dependencies of the bin/bundle-extensions script here.
# It has its own requirements file to simplify the frontend client build process
-r requirements_bundles.txt
Expand Down
32 changes: 32 additions & 0 deletions worker.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[supervisord]
logfile=/dev/null
pidfile=/tmp/supervisord.pid
nodaemon=true

[unix_http_server]
file = /tmp/supervisor.sock

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[program:worker]
command=./manage.py rq worker %(ENV_QUEUES)s
process_name=%(program_name)s-%(process_num)s
numprocs=%(ENV_WORKERS_COUNT)s
directory=/app
stopsignal=TERM
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[eventlistener:worker_healthcheck]
serverurl=AUTO
command=./manage.py rq healthcheck
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
events=TICK_60

0 comments on commit 260bfca

Please sign in to comment.