Merge branch 'wait-for-connection'
Solves issues rq#1153 and rq#998:
RQ workers did not automatically reconnect to the Redis server when it went down or was restarted.
Asrst committed Dec 3, 2020
2 parents d3b07fb + 7d780c6 commit 2c40de2
Showing 2 changed files with 35 additions and 8 deletions.
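
In outline, the change wraps the worker's dequeue loop in a retry that backs off exponentially whenever Redis is unreachable and resets once a call succeeds. A minimal standalone sketch of that pattern (names such as do_work, BACKOFF_FACTOR and MAX_WAIT are illustrative, not identifiers from this commit):

    import time

    import redis.exceptions

    BACKOFF_FACTOR = 2.0   # grow the wait after each consecutive failure
    MAX_WAIT = 60.0        # cap on the wait, in seconds

    def run_forever(do_work):
        """Call do_work repeatedly, backing off exponentially while Redis is down."""
        wait = 1.0
        while True:
            try:
                do_work()
            except redis.exceptions.ConnectionError as err:
                print('Could not connect to Redis: %s. Retrying in %s seconds...' % (err, wait))
                time.sleep(wait)
                wait = min(wait * BACKOFF_FACTOR, MAX_WAIT)
            else:
                wait = 1.0  # a successful iteration resets the backoff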
29 changes: 21 additions & 8 deletions rq/worker.py
@@ -22,7 +22,7 @@
 except ImportError:
     from signal import SIGTERM as SIGKILL
 
-from redis import WatchError
+import redis.exceptions
 
 from . import worker_registration
 from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
@@ -106,6 +106,10 @@ class Worker(object):
     log_result_lifespan = True
     # `log_job_description` is used to toggle logging an entire jobs description.
     log_job_description = True
+    # Factor by which connection_wait_time is increased after each consecutive connection failure.
+    exponential_backoff_factor = 2.0
+    # Max wait time (in seconds) beyond which exponential_backoff_factor is no longer applied.
+    max_connection_wait_time = 60.0
 
     @classmethod
     def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None):
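
With these defaults the delay between reconnection attempts doubles each time until it reaches the 60-second cap. A quick, purely illustrative way to see the resulting schedule:

    factor, cap = 2.0, 60.0   # mirrors exponential_backoff_factor / max_connection_wait_time
    wait, schedule = 1.0, []
    for _ in range(8):
        schedule.append(wait)
        wait = min(wait * factor, cap)
    print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]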
@@ -469,7 +473,6 @@ def handle_warm_shutdown_request(self):
 
     def check_for_suspension(self, burst):
         """Check to see if workers have been suspended by `rq suspend`"""
-
         before_state = None
         notified = False
 
@@ -628,14 +631,15 @@ def dequeue_job_and_maintain_ttl(self, timeout):
         self.set_state(WorkerStatus.IDLE)
         self.procline('Listening on ' + qnames)
         self.log.debug('*** Listening on %s...', green(qnames))
 
+        connection_wait_time = 1.0
         while True:
-            self.heartbeat()
-
-            if self.should_run_maintenance_tasks:
-                self.run_maintenance_tasks()
-
             try:
+                self.heartbeat()
+
+                if self.should_run_maintenance_tasks:
+                    self.run_maintenance_tasks()
+
                 result = self.queue_class.dequeue_any(self.queues, timeout,
                                                       connection=self.connection,
                                                       job_class=self.job_class,
@@ -654,6 +658,15 @@ def dequeue_job_and_maintain_ttl(self, timeout):
                     break
             except DequeueTimeout:
                 pass
+            except redis.exceptions.ConnectionError as conn_err:
+                self.log.error('Could not connect to Redis instance: %s '
+                               'Retrying in %s seconds...', conn_err, connection_wait_time)
+                time.sleep(connection_wait_time)
+                connection_wait_time *= self.exponential_backoff_factor
+                if connection_wait_time > self.max_connection_wait_time:
+                    connection_wait_time = self.max_connection_wait_time
+            else:
+                connection_wait_time = 1.0
 
         self.heartbeat()
         return result
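
The two-line cap above could equally be written with min(); a tiny standalone illustration of why the result is the same (variable names are mine, not the diff's):

    wait, factor, cap = 32.0, 2.0, 60.0
    wait *= factor         # 64.0, exceeds the cap
    wait = min(wait, cap)  # 60.0, same outcome as the if-statement in the diff

The else: branch resets connection_wait_time to 1.0 after any iteration that completes without a connection error, so a single blip does not leave the worker on a long delay.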
@@ -955,7 +968,7 @@ def handle_job_success(self, job, queue, started_job_registry):
 
                     pipeline.execute()
                     break
-                except WatchError:
+                except redis.exceptions.WatchError:
                     continue
 
     def perform_job(self, job, queue, heartbeat_ttl=None):
14 changes: 14 additions & 0 deletions tests/test_worker.py
@@ -18,6 +18,7 @@
 
 from unittest import skipIf
 
+import redis.exceptions
 import pytest
 import mock
 from mock import Mock
@@ -283,6 +284,19 @@ def test_heartbeat(self):
         self.testconn.hdel(w.key, 'birth')
         w.refresh()
 
+    @slow
+    def test_heartbeat_survives_lost_connection(self):
+        with mock.patch.object(Worker, 'heartbeat') as mocked:
+            # None -> Heartbeat is first called before the job loop
+            mocked.side_effect = [None, redis.exceptions.ConnectionError()]
+            q = Queue()
+            w = Worker([q])
+            w.work(burst=True)
+            # First call is prior to job loop, second raises the error,
+            # third is successful, after "recovery"
+            assert mocked.call_count == 3
+
+
     @slow
     def test_heartbeat_busy(self):
         """Periodic heartbeats while horse is busy with long jobs"""
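For readers unfamiliar with mock.side_effect, the new test relies on a list side effect being consumed one item per call, with exception instances raised rather than returned. A small standalone demonstration (using unittest.mock rather than the repo's mock import):

    from unittest import mock

    import redis.exceptions

    beat = mock.Mock(side_effect=[None, redis.exceptions.ConnectionError()])
    beat()                     # first call returns None
    try:
        beat()                 # second call raises the queued exception
    except redis.exceptions.ConnectionError:
        print('connection error raised, as the test expects')
    print(beat.call_count)     # 2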
