Skip to content

Commit

Permalink
Make Queue Cycling in Redis More Fair
Browse files Browse the repository at this point in the history
The previous method of cycling queues with the Redis transport was
not fair if there were a lot of empty queues.  Queues waiting behind
lots of empty queues and at least one very full queue will be forced
to wait until the queue in front of it is rotated, and then will
only get the chance to be consumed once.  This changes the behavior
to rotate the most recently used queue to the back of the list,
elmininating the problem.  Closes #166
  • Loading branch information
kevin1024 authored and ask committed Nov 2, 2012
1 parent 734317c commit 10319aa
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions kombu/transport/redis.py
Expand Up @@ -13,7 +13,6 @@

from bisect import bisect
from contextlib import contextmanager
from itertools import cycle, islice
from time import time
from Queue import Empty

Expand Down Expand Up @@ -321,7 +320,7 @@ def __init__(self, *args, **kwargs):
super_ = super(Channel, self)
super_.__init__(*args, **kwargs)

self._queue_cycle = cycle([])
self._queue_cycle = []
self.Client = self._get_client()
self.ResponseError = self._get_response_error()
self.active_fanout_queues = set()
Expand Down Expand Up @@ -450,6 +449,7 @@ def _brpop_read(self, **options):
if dest__item:
dest, item = dest__item
dest = dest.rsplit(self.sep, 1)[0]
self._rotate_cycle(dest)
return loads(item), dest
else:
raise Empty()
Expand Down Expand Up @@ -654,13 +654,25 @@ def _update_cycle(self):
each queue is equally likely to be consumed from,
so that a very busy queue will not block others.
This works by using Redis's `BRPOP` command and
by rotating the most recently used queue to the
and of the list. See Kombu github issue #166 for
more discussion of this method.
"""
self._queue_cycle = cycle(self.active_queues)
self._queue_cycle = list(self.active_queues)

def _consume_cycle(self):
"""Get a fresh list of queues from the queue cycle."""
active = len(self.active_queues)
return list(islice(self._queue_cycle, 0, active + 1))[:active]
return self._queue_cycle[0:active]

def _rotate_cycle(self, used):
"""
Move most recently used queue to end of list
"""
index = self._queue_cycle.index(used)
self._queue_cycle.append(self._queue_cycle.pop(index))

def _get_response_error(self):
from redis import exceptions
Expand Down

0 comments on commit 10319aa

Please sign in to comment.