Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiprocess RQ workers (using supervisor) #4371

Merged
merged 11 commits into from
Jan 1, 2020
5 changes: 4 additions & 1 deletion bin/docker-entrypoint
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ dev_scheduler() {
worker() {
echo "Starting RQ worker..."

exec /app/manage.py rq worker $QUEUES
export WORKERS_COUNT=${WORKERS_COUNT:-2}
export QUEUES=${QUEUES:-}

supervisord -c worker.conf
}

dev_worker() {
Expand Down
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ services:
target: /app
depends_on:
- server
tty: true
environment:
PYTHONUNBUFFERED: 0
REDASH_LOG_LEVEL: "INFO"
Expand Down
61 changes: 51 additions & 10 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from click import argument
from flask.cli import AppGroup
from rq import Connection
from rq.worker import WorkerStatus
from sqlalchemy.orm import configure_mappers
from supervisor_checks import check_runner
from supervisor_checks.check_modules import base

from redash import rq_redis_connection
from redash.tasks import Worker
Expand Down Expand Up @@ -42,15 +45,53 @@ def worker(queues):
w.work()


@manager.command()
def healthcheck():
hostname = socket.gethostname()
with Connection(rq_redis_connection):
all_workers = Worker.all()
class WorkerHealthcheck(base.BaseCheck):
NAME = 'RQ Worker Healthcheck'
INTERVAL = datetime.timedelta(minutes=5)
_last_check_time = {}

def time_to_check(self, pid):
now = datetime.datetime.utcnow()

if pid not in self._last_check_time:
self._last_check_time[pid] = now

if now - self._last_check_time[pid] >= self.INTERVAL:
self._last_check_time[pid] = now
return True

return False

def __call__(self, process_spec):
pid = process_spec['pid']
if not self.time_to_check(pid):
return True

all_workers = Worker.all(connection=rq_redis_connection)
worker = [w for w in all_workers if w.hostname == socket.gethostname().encode() and
w.pid == pid].pop()

local_workers = [w for w in all_workers if w.hostname == hostname]
heartbeats = [w.last_heartbeat for w in local_workers]
time_since_seen = [datetime.datetime.utcnow() - hb for hb in heartbeats]
active = [t.seconds < 60 for t in time_since_seen]
is_busy = worker.get_state() == WorkerStatus.BUSY

sys.exit(int(not all(active)))
time_since_seen = datetime.datetime.utcnow() - worker.last_heartbeat
seen_lately = time_since_seen.seconds < 60

total_jobs_in_watched_queues = sum([len(q.jobs) for q in worker.queues])
has_nothing_to_do = total_jobs_in_watched_queues == 0

is_healthy = is_busy or seen_lately or has_nothing_to_do

self._log("Worker %s healthcheck: Is busy? %s. "
"Seen lately? %s (%d seconds ago). "
"Has nothing to do? %s (%d jobs in watched queues). "
"==> Is healthy? %s",
worker.key, is_busy, seen_lately, time_since_seen.seconds,
has_nothing_to_do, total_jobs_in_watched_queues, is_healthy)

return is_healthy


@manager.command()
def healthcheck():
return check_runner.CheckRunner(
'worker_healthcheck', 'worker', None, [(WorkerHealthcheck, {})]).run()
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ maxminddb-geolite2==2018.703
pypd==1.1.0
disposable-email-domains>=0.0.52
gevent==1.4.0
supervisor==4.1.0
supervisor_checks==0.8.1
# Install the dependencies of the bin/bundle-extensions script here.
# It has its own requirements file to simplify the frontend client build process
-r requirements_bundles.txt
Expand Down
32 changes: 32 additions & 0 deletions worker.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
[supervisord]
logfile=/dev/null
pidfile=/tmp/supervisord.pid
nodaemon=true

[unix_http_server]
file = /tmp/supervisor.sock

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[program:worker]
command=./manage.py rq worker %(ENV_QUEUES)s
process_name=%(program_name)s-%(process_num)s
numprocs=%(ENV_WORKERS_COUNT)s
directory=/app
stopsignal=TERM
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[eventlistener:worker_healthcheck]
serverurl=AUTO
command=./manage.py rq healthcheck
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
events=TICK_60