New connection management.

Connections can now be set explicitly on Queues, Workers, and Jobs.
Jobs that are implicitly created by Queue or Worker API calls now
inherit their creator's connection.
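
For example (a sketch; `do_something` stands in for any importable
function), a job enqueued on a queue picks up that queue's connection:

    >>> my_conn = Redis()
    >>> q = Queue(connection=my_conn)
    >>> job = q.enqueue(do_something)
    >>> job.connection == my_conn
    True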

For every RQ object instance that is created, the "current" connection
is used if none is passed in explicitly.  The "current" connection is
thus held on to at creation time and won't change for the lifetime of
the object.

Effectively, this means that, given a default Redis connection, if you
create a queue Q1, then push another Redis connection onto the
connection stack, then create Q2, Q1 is a queue on the first connection
and Q2 a queue on the second.
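
In doctest form (a sketch; the second host is hypothetical):

    >>> push_connection(Redis())                    # the default connection
    >>> q1 = Queue()                                # on the first connection
    >>> push_connection(Redis('example.org', 1234))
    >>> q2 = Queue()                                # on the second connection
    >>> q1.connection == q2.connection
    False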

This is much clearer than it used to be.

Also, I've removed the `use_redis()` call, which had an ugly name.
Instead, there are a few new alternatives for connection management.

You can push/pop connections now:

    >>> my_conn = Redis()
    >>> push_connection(my_conn)
    >>> q = Queue()
    >>> q.connection == my_conn
    True
    >>> pop_connection() == my_conn
    True

Also, you can stack them syntactically, using the `Connection` context
manager:

    >>> conn1 = Redis()
    >>> conn2 = Redis('example.org', 1234)
    >>> with Connection(conn1):
    ...     q = Queue()
    ...     with Connection(conn2):
    ...         q2 = Queue()
    ...     q3 = Queue()
    >>> q.connection == conn1
    True
    >>> q2.connection == conn2
    True
    >>> q3.connection == conn1
    True
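
On exit, each `Connection` context asserts that the connection it pops
off the stack is the one it pushed, so unbalanced `push_connection()`
and `pop_connection()` calls inside the block are caught early.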

Or, if you only require a single connection to Redis (for most uses):

    >>> use_connection(Redis())
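
Note that `use_connection()` clears the connection stack before pushing,
and it asserts that it isn't mixed with stacked `Connection` contexts.
The active connection can be retrieved at any point with
`get_current_connection()`:

    >>> my_conn = Redis()
    >>> use_connection(my_conn)
    >>> get_current_connection() == my_conn
    True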

commit 2982486448412b336f6712defde5e73cf489f3bc (1 parent: a662180)
Authored by @nvie

rq/__init__.py (20 lines changed)
@@ -1,5 +1,6 @@
-from redis import Redis
-from .proxy import conn
+from .connections import get_current_connection
+from .connections import use_connection, push_connection, pop_connection
+from .connections import Connection
from .queue import Queue, get_failed_queue
from .job import cancel_job, requeue_job
from .worker import Worker
@@ -7,21 +8,12 @@
def use_redis(redis=None):
- """Pushes the given Redis connection (a redis.Redis instance) onto the
- connection stack. This is merely a helper function to easily start
- using RQ without having to know or understand the RQ connection stack.
+ use_connection(redis)
- When given None as an argument, a (default) Redis connection to
- redis://localhost:6379 is set up.
- """
- if redis is None:
- redis = Redis()
- elif not isinstance(redis, Redis):
- raise TypeError('Argument redis should be a Redis instance.')
- conn.push(redis)
__all__ = [
- 'conn', 'use_redis',
+ 'use_connection', 'get_current_connection',
+ 'push_connection', 'pop_connection', 'Connection',
'Queue', 'get_failed_queue', 'Worker',
'cancel_job', 'requeue_job']
__version__ = VERSION

rq/connections.py (82 lines changed)
@@ -0,0 +1,82 @@
+from contextlib import contextmanager
+from redis import Redis
+
+
+class NoRedisConnectionException(Exception):
+ pass
+
+
+class _RedisConnectionStack(object):
+ def __init__(self):
+ self.stack = []
+
+ def _get_current_object(self):
+ try:
+ return self.stack[-1]
+ except IndexError:
+ msg = 'No Redis connection configured.'
+ raise NoRedisConnectionException(msg)
+
+ def pop(self):
+ return self.stack.pop()
+
+ def push(self, connection):
+ self.stack.append(connection)
+
+ def empty(self):
+ del self.stack[:]
+
+ def depth(self):
+ return len(self.stack)
+
+ def __getattr__(self, name):
+ return getattr(self._get_current_object(), name)
+
+
+_connection_stack = _RedisConnectionStack()
+
+
+@contextmanager
+def Connection(connection=None):
+ if connection is None:
+ connection = Redis()
+ _connection_stack.push(connection)
+ try:
+ yield
+ finally:
+ popped = _connection_stack.pop()
+ assert popped == connection, \
+ 'Unexpected Redis connection was popped off the stack. ' \
+ 'Check your Redis connection setup.'
+
+
+def push_connection(redis):
+ """Pushes the given connection on the stack."""
+ _connection_stack.push(redis)
+
+
+def pop_connection():
+ """Pops the topmost connection from the stack."""
+ return _connection_stack.pop()
+
+
+def use_connection(redis):
+ """Clears the stack and uses the given connection. Protects against mixed
+ use of use_connection() and stacked connection contexts.
+ """
+ assert _connection_stack.depth() <= 1, \
+ 'You should not mix Connection contexts with use_connection().'
+ _connection_stack.empty()
+ push_connection(redis)
+
+
+def get_current_connection():
+ """Returns the current Redis connection (i.e. the topmost on the
+ connection stack).
+ """
+ return _connection_stack._get_current_object()
+
+
+__all__ = ['Connection',
+ 'get_current_connection', 'push_connection', 'pop_connection',
+ 'use_connection']
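
A quick illustration of the new module's failure mode (a sketch,
assuming an empty connection stack): `get_current_connection()` raises
`NoRedisConnectionException` when no connection has been configured.

    >>> from rq.connections import (get_current_connection,
    ...                             NoRedisConnectionException)
    >>> try:
    ...     get_current_connection()
    ... except NoRedisConnectionException as e:
    ...     print(e)
    No Redis connection configured.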

rq/job.py (31 lines changed)
@@ -2,7 +2,7 @@
import times
from uuid import uuid4
from cPickle import loads, dumps, UnpicklingError
-from .proxy import conn
+from .connections import get_current_connection
from .exceptions import UnpickleError, NoSuchJobError
@@ -21,20 +21,20 @@ def unpickle(pickled_string):
return obj
-def cancel_job(job_id):
+def cancel_job(job_id, connection=None):
"""Cancels the job with the given job ID, preventing execution. Discards
any job info (i.e. it can't be requeued later).
"""
- Job(job_id).cancel()
+ Job(job_id, connection=connection).cancel()
-def requeue_job(job_id):
+def requeue_job(job_id, connection=None):
"""Requeues the job with the given job ID. The job ID should refer to
a failed job (i.e. it should be on the failed queue). If no such (failed)
job exists, a NoSuchJobError is raised.
"""
from .queue import get_failed_queue
- fq = get_failed_queue()
+ fq = get_failed_queue(connection=connection)
fq.requeue(job_id)
@@ -48,7 +48,8 @@ def create(cls, func, *args, **kwargs):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
- job = Job()
+ connection = kwargs.pop('connection', None)
+ job = Job(connection=connection)
job._func_name = '%s.%s' % (func.__module__, func.__name__)
job._args = args
job._kwargs = kwargs
@@ -80,18 +81,22 @@ def kwargs(self):
@classmethod
def exists(cls, job_id):
"""Returns whether a job hash exists for the given job ID."""
+ conn = get_current_connection()
return conn.exists(cls.key_for(job_id))
@classmethod
- def fetch(cls, id):
+ def fetch(cls, id, connection=None):
"""Fetches a persisted job from its corresponding Redis key and
instantiates it.
"""
- job = Job(id)
+ job = Job(id, connection=connection)
job.refresh()
return job
- def __init__(self, id=None):
+ def __init__(self, id=None, connection=None):
+ if connection is None:
+ connection = get_current_connection()
+ self.connection = connection
self._id = id
self.created_at = times.now()
self._func_name = None
@@ -156,7 +161,7 @@ def return_value(self):
seconds by default).
"""
if self._result is None:
- rv = conn.hget(self.key, 'result')
+ rv = self.connection.hget(self.key, 'result')
if rv is not None:
# cache the result
self._result = loads(rv)
@@ -175,7 +180,7 @@ def refresh(self): # noqa
'enqueued_at', 'ended_at', 'result', 'exc_info', 'timeout']
data, created_at, origin, description, \
enqueued_at, ended_at, result, \
- exc_info, timeout = conn.hmget(key, properties)
+ exc_info, timeout = self.connection.hmget(key, properties)
if data is None:
raise NoSuchJobError('No such job: %s' % (key,))
@@ -222,7 +227,7 @@ def save(self):
if self.timeout is not None:
obj['timeout'] = self.timeout
- conn.hmset(key, obj)
+ self.connection.hmset(key, obj)
def cancel(self):
"""Cancels the given job, which will prevent the job from ever being
@@ -237,7 +242,7 @@ def cancel(self):
def delete(self):
"""Deletes the job hash from Redis."""
- conn.delete(self.key)
+ self.connection.delete(self.key)
# Job execution
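
With the new `connection` kwargs, a job can round-trip through Redis on
an explicit connection (a sketch; `some_func` stands in for any
importable function):

    >>> from redis import Redis
    >>> from rq.job import Job
    >>> my_conn = Redis()
    >>> job = Job.create(some_func, connection=my_conn)
    >>> job.save()
    >>> Job.fetch(job.id, connection=my_conn).id == job.id
    True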

rq/proxy.py (28 lines changed)
@@ -1,28 +0,0 @@
-class NoRedisConnectionException(Exception):
- pass
-
-
-class RedisConnectionProxy(object):
- def __init__(self):
- self.stack = []
-
- def _get_current_object(self):
- try:
- return self.stack[-1]
- except IndexError:
- msg = 'No Redis connection configured.'
- raise NoRedisConnectionException(msg)
-
- def pop(self):
- return self.stack.pop()
-
- def push(self, db):
- self.stack.append(db)
-
- def __getattr__(self, name):
- return getattr(self._get_current_object(), name)
-
-
-conn = RedisConnectionProxy()
-
-__all__ = ['conn']

rq/queue.py (67 lines changed)
@@ -1,13 +1,13 @@
import times
from functools import total_ordering
-from .proxy import conn
+from .connections import get_current_connection
from .job import Job
from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError
-def get_failed_queue():
+def get_failed_queue(connection=None):
"""Returns a handle to the special failed queue."""
- return FailedQueue()
+ return FailedQueue(connection=connection)
def compact(lst):
@@ -19,14 +19,19 @@ class Queue(object):
redis_queue_namespace_prefix = 'rq:queue:'
@classmethod
- def all(cls):
+ def all(cls, connection=None):
"""Returns an iterable of all Queues.
"""
prefix = cls.redis_queue_namespace_prefix
- return map(cls.from_queue_key, conn.keys('%s*' % prefix))
+ if connection is None:
+ connection = get_current_connection()
+
+ def to_queue(queue_key):
+ return cls.from_queue_key(queue_key, connection=connection)
+ return map(to_queue, connection.keys('%s*' % prefix))
@classmethod
- def from_queue_key(cls, queue_key):
+ def from_queue_key(cls, queue_key, connection=None):
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
Redis keys.
@@ -35,9 +40,12 @@ def from_queue_key(cls, queue_key):
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: %s' % (queue_key,))
name = queue_key[len(prefix):]
- return Queue(name)
+ return Queue(name, connection=connection)
- def __init__(self, name='default', default_timeout=None):
+ def __init__(self, name='default', default_timeout=None, connection=None):
+ if connection is None:
+ connection = get_current_connection()
+ self.connection = connection
prefix = self.redis_queue_namespace_prefix
self.name = name
self._key = '%s%s' % (prefix, name)
@@ -50,7 +58,7 @@ def key(self):
def empty(self):
"""Removes all messages on the queue."""
- conn.delete(self.key)
+ self.connection.delete(self.key)
def is_empty(self):
"""Returns whether the current queue is empty."""
@@ -59,7 +67,7 @@ def is_empty(self):
@property
def job_ids(self):
"""Returns a list of all job IDS in the queue."""
- return conn.lrange(self.key, 0, -1)
+ return self.connection.lrange(self.key, 0, -1)
@property
def jobs(self):
@@ -78,7 +86,7 @@ def safe_fetch(job_id):
@property
def count(self):
"""Returns a count of all messages in the queue."""
- return conn.llen(self.key)
+ return self.connection.llen(self.key)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
@@ -86,18 +94,18 @@ def compact(self):
"""
COMPACT_QUEUE = 'rq:queue:_compact'
- conn.rename(self.key, COMPACT_QUEUE)
+ self.connection.rename(self.key, COMPACT_QUEUE)
while True:
- job_id = conn.lpop(COMPACT_QUEUE)
+ job_id = self.connection.lpop(COMPACT_QUEUE)
if job_id is None:
break
if Job.exists(job_id):
- conn.rpush(self.key, job_id)
+ self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id): # noqa
"""Pushes a job ID on the corresponding Redis queue."""
- conn.rpush(self.key, job_id)
+ self.connection.rpush(self.key, job_id)
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues
@@ -115,7 +123,7 @@ def enqueue(self, f, *args, **kwargs):
'by workers.')
timeout = kwargs.pop('timeout', None)
- job = Job.create(f, *args, **kwargs)
+ job = Job.create(f, *args, connection=self.connection, **kwargs)
return self.enqueue_job(job, timeout=timeout)
def enqueue_job(self, job, timeout=None, set_meta_data=True):
@@ -143,7 +151,7 @@ def enqueue_job(self, job, timeout=None, set_meta_data=True):
def pop_job_id(self):
"""Pops a given job ID from this Redis queue."""
- return conn.lpop(self.key)
+ return self.connection.lpop(self.key)
@classmethod
def lpop(cls, queue_keys, blocking):
@@ -155,6 +163,7 @@ def lpop(cls, queue_keys, blocking):
Until Redis receives a specific method for this, we'll have to wrap it
this way.
"""
+ conn = get_current_connection()
if blocking:
queue_key, job_id = conn.blpop(queue_keys)
return queue_key, job_id
@@ -174,7 +183,7 @@ def dequeue(self):
if job_id is None:
return None
try:
- job = Job.fetch(job_id)
+ job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError as e:
# Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking itself recursively
@@ -187,7 +196,7 @@ def dequeue(self):
return job
@classmethod
- def dequeue_any(cls, queues, blocking):
+ def dequeue_any(cls, queues, blocking, connection=None):
"""Class method returning the Job instance at the front of the given
set of Queues, where the order of the queues is important.
@@ -200,13 +209,13 @@ def dequeue_any(cls, queues, blocking):
if result is None:
return None
queue_key, job_id = result
- queue = Queue.from_queue_key(queue_key)
+ queue = Queue.from_queue_key(queue_key, connection=connection)
try:
- job = Job.fetch(job_id)
+ job = Job.fetch(job_id, connection=connection)
except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking the same function recursively
- return cls.dequeue_any(queues, blocking)
+ return cls.dequeue_any(queues, blocking, connection=connection)
except UnpickleError as e:
# Attach queue information on the exception for improved error
# reporting
@@ -240,8 +249,8 @@ def __str__(self):
class FailedQueue(Queue):
- def __init__(self):
- super(FailedQueue, self).__init__('failed')
+ def __init__(self, connection=None):
+ super(FailedQueue, self).__init__('failed', connection=connection)
def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
@@ -258,16 +267,16 @@ def quarantine(self, job, exc_info):
def requeue(self, job_id):
"""Requeues the job with the given job ID."""
try:
- job = Job.fetch(job_id)
+ job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing)
- conn.lrem(self.key, job_id)
+ self.connection.lrem(self.key, job_id)
return
- # Delete it from the FailedQueue (raise an error if that failed)
- if conn.lrem(self.key, job.id) == 0:
+ # Delete it from the failed queue (raise an error if that failed)
+ if self.connection.lrem(self.key, job.id) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
job.exc_info = None
- q = Queue(job.origin)
+ q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job)
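
`Queue.dequeue_any()` now takes an explicit connection for constructing
the resulting Queue and Job objects (a sketch; note that the underlying
`lpop` still consults the connection stack, so a current connection must
also be set up):

    >>> from redis import Redis
    >>> from rq import Queue, use_connection
    >>> conn = Redis()
    >>> use_connection(conn)   # lpop() still reads the current connection
    >>> queues = [Queue('high', connection=conn), Queue('low', connection=conn)]
    >>> job = Queue.dequeue_any(queues, False, connection=conn)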

rq/worker.py (27 lines changed)
@@ -12,8 +12,8 @@
Logger = Logger # Does nothing except it shuts up pyflakes annoying error
except ImportError:
from logging import Logger
-from .queue import Queue, FailedQueue
-from .proxy import conn
+from .queue import Queue, get_failed_queue
+from .connections import get_current_connection
from .utils import make_colorizer
from .exceptions import NoQueueError, UnpickleError
from .timeouts import death_pentalty_after
@@ -53,6 +53,7 @@ class Worker(object):
def all(cls):
"""Returns an iterable of all Workers.
"""
+ conn = get_current_connection()
reported_working = conn.smembers(cls.redis_workers_keys)
return compact(map(cls.find_by_key, reported_working))
@@ -67,6 +68,7 @@ def find_by_key(cls, worker_key):
if not worker_key.startswith(prefix):
raise ValueError('Not a valid RQ worker key: %s' % (worker_key,))
+ conn = get_current_connection()
if not conn.exists(worker_key):
return None
@@ -79,7 +81,10 @@ def find_by_key(cls, worker_key):
return worker
- def __init__(self, queues, name=None, rv_ttl=500): # noqa
+ def __init__(self, queues, name=None, rv_ttl=500, connection=None): # noqa
+ if connection is None:
+ connection = get_current_connection()
+ self.connection = connection
if isinstance(queues, Queue):
queues = [queues]
self._name = name
@@ -91,7 +96,7 @@ def __init__(self, queues, name=None, rv_ttl=500): # noqa
self._horse_pid = 0
self._stopped = False
self.log = Logger('worker')
- self.failed_queue = FailedQueue()
+ self.failed_queue = get_failed_queue(connection=self.connection)
def validate_queues(self): # noqa
@@ -158,14 +163,15 @@ def procline(self, message):
def register_birth(self): # noqa
"""Registers its own birth."""
self.log.debug('Registering birth of worker %s' % (self.name,))
- if conn.exists(self.key) and not conn.hexists(self.key, 'death'):
+ if self.connection.exists(self.key) and \
+ not self.connection.hexists(self.key, 'death'):
raise ValueError(
'There exists an active worker named \'%s\' '
'already.' % (self.name,))
key = self.key
now = time.time()
queues = ','.join(self.queue_names())
- with conn.pipeline() as p:
+ with self.connection.pipeline() as p:
p.delete(key)
p.hset(key, 'birth', now)
p.hset(key, 'queues', queues)
@@ -175,7 +181,7 @@ def register_birth(self): # noqa
def register_death(self):
"""Registers its own death."""
self.log.debug('Registering death')
- with conn.pipeline() as p:
+ with self.connection.pipeline() as p:
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
p.srem(self.redis_workers_keys, self.key)
@@ -185,7 +191,7 @@ def register_death(self):
def set_state(self, new_state):
self._state = new_state
- conn.hset(self.key, 'state', new_state)
+ self.connection.hset(self.key, 'state', new_state)
def get_state(self):
return self._state
@@ -268,7 +274,8 @@ def work(self, burst=False): # noqa
green(', '.join(qnames)))
wait_for_job = not burst
try:
- result = Queue.dequeue_any(self.queues, wait_for_job)
+ result = Queue.dequeue_any(self.queues, wait_for_job, \
+ connection=self.connection)
if result is None:
break
except UnpickleError as e:
@@ -359,7 +366,7 @@ def perform_job(self, job):
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
if rv is not None:
- p = conn.pipeline()
+ p = self.connection.pipeline()
p.hset(job.key, 'result', dumps(rv))
p.expire(job.key, self.rv_ttl)
p.execute()
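
Workers are bound to a connection the same way (a sketch; burst mode
makes the worker return once the queues are drained):

    >>> from redis import Redis
    >>> from rq import Queue, Worker, use_connection
    >>> conn = Redis()
    >>> use_connection(conn)
    >>> w = Worker([Queue(connection=conn)], connection=conn)
    >>> w.work(burst=True)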

tests/__init__.py (10 lines changed)
@@ -1,7 +1,7 @@
import unittest
from redis import Redis
from logbook import NullHandler
-from rq import conn
+from rq import push_connection, pop_connection
def find_empty_redis_database():
@@ -42,7 +42,7 @@ class RQTestCase(unittest.TestCase):
def setUpClass(cls):
# Set up connection to Redis
testconn = find_empty_redis_database()
- conn.push(testconn)
+ push_connection(testconn)
# Store the connection (for sanity checking)
cls.testconn = testconn
@@ -53,17 +53,17 @@ def setUpClass(cls):
def setUp(self):
# Flush beforewards (we like our hygiene)
- conn.flushdb()
+ self.testconn.flushdb()
def tearDown(self):
# Flush afterwards
- conn.flushdb()
+ self.testconn.flushdb()
@classmethod
def tearDownClass(cls):
cls.log_handler.pop_thread()
# Pop the connection to Redis
- testconn = conn.pop()
+ testconn = pop_connection()
assert testconn == cls.testconn, 'Wow, something really nasty ' \
'happened to the Redis connection stack. Check your setup.'

tests/test_connection.py (36 lines changed)
@@ -0,0 +1,36 @@
+from tests import RQTestCase, find_empty_redis_database
+from tests.fixtures import do_nothing
+from rq import Queue
+from rq import Connection
+
+
+def new_connection():
+ return find_empty_redis_database()
+
+
+class TestConnectionInheritance(RQTestCase):
+ def test_connection_detection(self):
+ """Automatic detection of the connection."""
+ q = Queue()
+ self.assertEquals(q.connection, self.testconn)
+
+ def test_connection_stacking(self):
+ """Connection stacking."""
+ conn1 = new_connection()
+ conn2 = new_connection()
+
+ with Connection(conn1):
+ q1 = Queue()
+ with Connection(conn2):
+ q2 = Queue()
+ self.assertNotEquals(q1.connection, q2.connection)
+
+ def test_connection_pass_thru(self):
+ """Connection passed through from queues to jobs."""
+ q1 = Queue()
+ with Connection(new_connection()):
+ q2 = Queue()
+ job1 = q1.enqueue(do_nothing)
+ job2 = q2.enqueue(do_nothing)
+ self.assertEquals(q1.connection, job1.connection)
+ self.assertEquals(q2.connection, job2.connection)

tests/test_queue.py (6 lines changed)
@@ -188,7 +188,7 @@ def test_dequeue_any_ignores_nonexisting_jobs(self):
# Dequeue simply ignores the missing job and returns None
self.assertEquals(q.count, 1)
- self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False),
+ self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), # noqa
None)
self.assertEquals(q.count, 0)
@@ -199,9 +199,9 @@ def test_requeue_job(self):
job = Job.create(div_by_zero, 1, 2, 3)
job.origin = 'fake'
job.save()
- get_failed_queue().quarantine(job, Exception('Some fake error'))
+ get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
- self.assertItemsEqual(Queue.all(), [get_failed_queue()])
+ self.assertItemsEqual(Queue.all(), [get_failed_queue()]) # noqa
self.assertEquals(get_failed_queue().count, 1)
get_failed_queue().requeue(job.id)

tests/test_worker.py (6 lines changed)
@@ -3,7 +3,7 @@
from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \
create_file_after_timeout
from tests.helpers import strip_milliseconds
-from rq import Queue, Worker
+from rq import Queue, Worker, get_failed_queue
from rq.job import Job
@@ -28,7 +28,7 @@ def test_work_and_quit(self):
def test_work_is_unreadable(self):
"""Unreadable jobs are put on the failed queue."""
q = Queue()
- failed_q = Queue('failed')
+ failed_q = get_failed_queue()
self.assertEquals(failed_q.count, 0)
self.assertEquals(q.count, 0)
@@ -58,7 +58,7 @@ def test_work_is_unreadable(self):
def test_work_fails(self):
"""Failing jobs are put on the failed queue."""
q = Queue()
- failed_q = Queue('failed')
+ failed_q = get_failed_queue()
# Preconditions
self.assertEquals(failed_q.count, 0)