diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index a669f1d..f765064 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.1.1
+current_version = 0.2.0
 parse = (?P<major>\d+)\.(?P<minor>.*)\.(?P<patch>.*)
 serialize = {major}.{minor}.{patch}
diff --git a/.travis.yml b/.travis.yml
index cf3e5c5..d90f369 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,7 +1,7 @@
 language: python
 python:
   - "2.7"
-  - "3.4"
+  - "3.6"
 before_script:
   - pip install python-coveralls coverage
 # command to run tests
diff --git a/docs/source/conf.py b/docs/source/conf.py
index ef0be50..0c4ea9a 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -54,9 +54,9 @@
 # built documents.
 #
 # The short X.Y version.
-version = u'0.1.1'
+version = u'0.2.0'
 # The full version, including alpha/beta/rc tags.
-release = u'0.1.1'
+release = u'0.2.0'
 
 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 5d04f4b..bebf546 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -27,6 +27,7 @@ messages into the sortedset.
    installation
    usage
+   queue_scheduling
    testing
diff --git a/docs/source/queue_scheduling.rst b/docs/source/queue_scheduling.rst
new file mode 100644
index 0000000..23d85e4
--- /dev/null
+++ b/docs/source/queue_scheduling.rst
@@ -0,0 +1,46 @@
+Queue Scheduling
+================
+
+By default the `redis_priority` transport will consume from multiple queues in
+a round robin fashion, like all the other transports for `kombu`. Currently,
+this transport does not support the other strategies specified in `kombu`
+(`priority` and `sorted`). Instead, this transport provides the
+`prioritized_levels` strategy described below.
+
+Prioritized Scheduling
+----------------------
+
+Given a configuration of queues encoded as::
+
+    {
+        LEVEL: [QUEUE]
+    }
+
+where `LEVEL` is a numeric value indicating priority (smaller
+first) and `[QUEUE]` indicates a list of queue names, the
+scheduler will walk from the smallest level to the highest level,
+only advancing a level once the lower levels are empty.
+Within levels, the queues are rotated in round robin
+fashion.
+
+To honor the preference for the lower levels, the scheduler moves
+back to the lowest level whenever it completes a full rotation of
+the current level.
+
+You can configure the `redis_priority` transport to use this strategy with
+the `queue_order_strategy` and `prioritized_levels_queue_config` transport
+options, set through `BROKER_TRANSPORT_OPTIONS`.
+
+Example::
+
+    BROKER_TRANSPORT_OPTIONS = {
+        'queue_order_strategy': 'prioritized_levels',
+        'prioritized_levels_queue_config': {
+            0: ['TimeMachine', 'FluxCapacitor'],
+            1: ['1985', '1955', '2015']
+        }
+    }
+
+Note that any queue the worker is told to consume from that is not in the
+`prioritized_levels_queue_config` is automatically placed at the highest
+level (`float('inf')`, i.e. the lowest priority).
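The walkthrough below is an illustrative sketch (queue names and emptiness
flags are hypothetical) of how the `prioritized_levels` strategy documented
above behaves, exercising the `PrioritizedLevelsQueueScheduler` class
introduced later in this diff::

    from kombu_redis_priority.scheduling.prioritized_levels import \
        PrioritizedLevelsQueueScheduler

    config = {
        0: ['TimeMachine', 'FluxCapacitor'],
        1: ['1985', '1955', '2015'],
    }
    scheduler = PrioritizedLevelsQueueScheduler(config)

    # Draining level 0: once every queue in the level reports empty for a
    # full rotation, the scheduler advances to level 1.
    for queue in config[0]:
        assert scheduler.next() == queue
        scheduler.rotate(queue, was_empty=True)
    assert scheduler.current_level == 1

    # Once a full rotation of level 1 has seen messages (was_empty=False),
    # the scheduler drops back to level 0 to honor the lower level first.
    for queue in config[1]:
        assert scheduler.next() == queue
        scheduler.rotate(queue, was_empty=False)
    assert scheduler.current_level == 0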
diff --git a/kombu_redis_priority/__init__.py b/kombu_redis_priority/__init__.py
index 2a99b0c..a7c7b90 100644
--- a/kombu_redis_priority/__init__.py
+++ b/kombu_redis_priority/__init__.py
@@ -1 +1 @@
-VERSION = __version__ = "0.1.1"
+VERSION = __version__ = "0.2.0"
diff --git a/kombu_redis_priority/scheduling/__init__.py b/kombu_redis_priority/scheduling/__init__.py
new file mode 100644
index 0000000..1fc0860
--- /dev/null
+++ b/kombu_redis_priority/scheduling/__init__.py
@@ -0,0 +1,8 @@
+"""
+This module contains various queue scheduling algorithms that can be used to
+modify the behavior of the redis priority transport in the face of multiple
+queues.
+
+This is an extension of kombu.utils.scheduling, to support schedulers that
+require interactions with redis.
+"""
diff --git a/kombu_redis_priority/scheduling/base.py b/kombu_redis_priority/scheduling/base.py
new file mode 100644
index 0000000..227b72d
--- /dev/null
+++ b/kombu_redis_priority/scheduling/base.py
@@ -0,0 +1,25 @@
+class QueueScheduler(object):
+    def next(self):
+        """Return the next queue to consume from."""
+        raise NotImplementedError
+
+    def rotate(self, last_queue, was_empty):
+        """
+        Rotate the queue list based on which queue was popped
+        last and whether or not it was empty.
+
+        Args:
+            last_queue : Queue that was returned by the scheduler and consumed from.
+            was_empty (bool) : Whether or not last_queue was empty when consumed.
+
+        Returns:
+            True when a full rotation was made and all the queues were empty.
+        """
+        raise NotImplementedError
+
+    def update(self, queue_list):
+        """
+        Update the internal queue list with the list of queues
+        to consume from.
+        """
+        raise NotImplementedError
diff --git a/kombu_redis_priority/scheduling/prioritized_levels.py b/kombu_redis_priority/scheduling/prioritized_levels.py
new file mode 100644
index 0000000..9d7a577
--- /dev/null
+++ b/kombu_redis_priority/scheduling/prioritized_levels.py
@@ -0,0 +1,120 @@
+"""
+Prioritized Levels scheduler for the queues.
+
+Given a configuration of queues encoded as:
+{
+    LEVEL: [QUEUE]
+}
+where LEVEL is a numeric value indicating priority (smaller
+first) and [QUEUE] indicates a list of queue names, the
+scheduler will walk from the smallest level to the highest level,
+only advancing a level once the lower levels are empty.
+Within levels, the queues are rotated in round robin
+fashion.
+
+To honor the preference for the lower levels, we will move
+back to the lowest level when we do a full rotation of the
+current level.
+
+This is done to support the asynchronous nature in which
+kombu pulls tasks.
+"""
+from collections import defaultdict
+from kombu.utils.scheduling import cycle_by_name
+from .base import QueueScheduler
+
+HIGHEST_LEVEL = float('inf')
+
+
+class PrioritizedLevelsQueueScheduler(QueueScheduler):
+    """
+    Instance vars:
+        queue_cycle : kombu round_robin scheduler. Used to
+                      implement round robin fashion within
+                      levels.
+        queue_config : Current preference list. Should only
+                       contain queues that the worker should
+                       pull from.
+        current_level : Current level that the worker is
+                        scheduling queues from.
+        start_of_rotation : The first queue in the rotation
+                            for the current level. Used to
+                            detect cycles.
+        level_lookup_table : Lookup table mapping queues to
+                             levels.
+ """ + def __init__(self, config): + self.queue_cycle = cycle_by_name('round_robin')() + self.queue_config = config + self._set_initial_state() + + self.level_lookup_table = {} + for level, queues in config.items(): + for q in queues: + self.level_lookup_table[q] = level + + def next(self): + queues = self.queue_cycle.consume(1) + if queues: + return queues[0] + return None + + def rotate(self, last_queue, was_empty): + # This is first rotation for the level and queue was + # empty, so start tracking that rotation was empty + if last_queue == self.start_of_rotation and was_empty: + self.rotation_empty = True + # If at any time in rotation the queue was not + # empty, then rotation is not empty + elif not was_empty: + self.rotation_empty = False + + # Rotate within the level and check if we fully + # rotated the level. + self.queue_cycle.rotate(last_queue) + next_queue = self.queue_cycle.consume(1)[0] + is_full_rotation = next_queue == self.start_of_rotation + + # On a full cycle and if full rotation was empty, + # jump to next level + if is_full_rotation and self.rotation_empty: + next_index = (self.levels.index(self.current_level) + 1) % len(self.levels) + self._set_level(self.levels[next_index]) + # In this situation, all queues are empty if + # we were at the highest level + return next_index == 0 + elif is_full_rotation: + # otherwise, go back to lowest level + self._set_level(min(self.levels)) + return False + + def update(self, queue_list): + # Starting from base config, only include queues in + # the provided list. For any queues not in the list, + # set at the highest level (= lowest priority) + config = defaultdict(list) + for q in queue_list: + if q in self.level_lookup_table: + level = self.level_lookup_table[q] + else: + level = HIGHEST_LEVEL + config[level].append(q) + self.queue_config = config + self._set_initial_state() + + def _set_initial_state(self): + self.levels = sorted(self.queue_config.keys()) + if self.levels: + self._set_level(min(self.levels)) + else: + self._set_level(HIGHEST_LEVEL) + + def _set_level(self, level): + self.current_level = level + self.queue_cycle.update(self.queue_config[self.current_level]) + queues = self.queue_cycle.consume(1) + if queues: + self.start_of_rotation = queues[0] + else: + self.start_of_rotation = None + self.rotation_empty = False diff --git a/kombu_redis_priority/scheduling/round_robin.py b/kombu_redis_priority/scheduling/round_robin.py new file mode 100644 index 0000000..f170675 --- /dev/null +++ b/kombu_redis_priority/scheduling/round_robin.py @@ -0,0 +1,36 @@ +""" +RoundRobin scheduler for the queues, evenly cycling through the list of +queues to consume from. 
+""" +from kombu.utils.scheduling import cycle_by_name +from .base import QueueScheduler + + +class RoundRobinQueueScheduler(QueueScheduler): + def __init__(self): + self.cycle = cycle_by_name('round_robin')() + + def next(self): + queues = self.cycle.consume(1) + if queues: + return queues[0] + return None + + def rotate(self, last_queue, was_empty): + # This is first rotation and queue was empty, so + # start tracking that rotation was empty + if last_queue == self.start_queue and was_empty: + self.rotation_empty = True + # If at any time in rotation the queue was not + # empty, then rotation is not empty + elif not was_empty: + self.rotation_empty = False + + self.cycle.rotate(last_queue) + next_queue = self.next() + is_full_rotation = next_queue == self.start_queue + return is_full_rotation and self.rotation_empty + + def update(self, queue_list): + self.cycle.update(queue_list) + self.start_queue = self.next() diff --git a/kombu_redis_priority/transport/redis_priority_async.py b/kombu_redis_priority/transport/redis_priority_async.py index 5cec9c0..a44950f 100644 --- a/kombu_redis_priority/transport/redis_priority_async.py +++ b/kombu_redis_priority/transport/redis_priority_async.py @@ -1,4 +1,4 @@ -"""Redis transport.""" +"""Redis transport backed by sortedset data structure.""" from __future__ import absolute_import, unicode_literals import numbers @@ -26,12 +26,16 @@ import kombu.transport.virtual as virtual +from ..scheduling.round_robin import RoundRobinQueueScheduler +from ..scheduling.prioritized_levels import PrioritizedLevelsQueueScheduler + try: import redis except ImportError: # pragma: no cover redis = None # noqa +# Register priority transport into available transports under kombu transport.TRANSPORT_ALIASES['redispriorityasync'] = 'kombu_redis_priority.transport.redis_priority_async:Transport' @@ -42,6 +46,10 @@ DEFAULT_DB = 0 +# Copied from kombu.transport.redis, with list operations replaced with +# sortedset operations. Below notes are copied verbatim from +# kombu.transport.redis as the same comments apply here. +# ----------------- # This implementation may seem overly complex, but I assure you there is # a good reason for doing it this way. # @@ -54,7 +62,8 @@ # # Also it means we can easily use PUBLISH/SUBSCRIBE to do fanout # exchanges (broadcast), as an alternative to pushing messages to fanout-bound -# queues manually. +# queues manually. Note that we must support the fanout exchanges to support +# the celery events system. class MultiChannelPoller(object): @@ -257,26 +266,18 @@ class Channel(virtual.Channel): #: Can be either string alias, or a cycle strategy class #: #: - ``round_robin`` - #: (:class:`~kombu.utils.scheduling.round_robin_cycle`). + #: (:class:`~kombu_redis_priority.scheduling.round_robin`). #: #: Make sure each queue has an equal opportunity to be consumed from. #: - #: - ``sorted`` - #: (:class:`~kombu.utils.scheduling.sorted_cycle`). - #: - #: Consume from queues in alphabetical order. - #: If the first queue in the sorted list always contains messages, - #: then the rest of the queues will never be consumed from. - #: - #: - ``priority`` - #: (:class:`~kombu.utils.scheduling.priority_cycle`). + #: - ``prioritized_levels`` + #: (:class:`~kombu_redis_priority.scheduling.prioritized_levels`). #: - #: Consume from queues in original order, so that if the first - #: queue always contains messages, the rest of the queues - #: in the list will never be consumed from. + #: Requires setting prioritized_levels_queue_config. 
#: #: The default is to consume from queues in round robin. queue_order_strategy = 'round_robin' + prioritized_levels_queue_config = {} empty = False _async_pool = None @@ -299,7 +300,8 @@ class Channel(virtual.Channel): 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', - 'priority_steps') # <-- do not add comma here! + 'priority_steps', + 'prioritized_levels_queue_config') # <-- do not add comma here! ) default_zpriority = '+inf' @@ -313,7 +315,13 @@ def __init__(self, *args, **kwargs): if not self.ack_emulation: # disable visibility timeout self.QoS = virtual.QoS - self._queue_cycle = cycle_by_name(self.queue_order_strategy)() + if self.queue_order_strategy == 'round_robin': + self._queue_scheduler = RoundRobinQueueScheduler() + elif self.queue_order_strategy == 'prioritized_levels': + self._queue_scheduler = \ + PrioritizedLevelsQueueScheduler(self.prioritized_levels_queue_config) + else: + raise NotImplementedError self.Client = self._get_client() self.ResponseError = self._get_response_error() self.active_fanout_queues = set() @@ -404,6 +412,9 @@ def basic_consume(self, queue, *args, **kwargs): self._fanout_to_queue[exchange] = queue ret = super(Channel, self).basic_consume(queue, *args, **kwargs) + # TODO: update documentation to reflect sortedset implementation + # (we are not using BRPOP) + # # Update fair cycle between queues. # # We cycle between queues fairly to make sure that @@ -414,7 +425,7 @@ def basic_consume(self, queue, *args, **kwargs): # by rotating the most recently used queue to the # and of the list. See Kombu github issue #166 for # more discussion of this method. - self._update_queue_cycle() + self._update_queue_schedule() return ret def basic_cancel(self, consumer_tag): @@ -447,7 +458,7 @@ def _basic_cancel(self, consumer_tag): except KeyError: pass ret = super(Channel, self).basic_cancel(consumer_tag) - self._update_queue_cycle() + self._update_queue_schedule() return ret def _get_publish_topic(self, exchange, routing_key): @@ -530,20 +541,23 @@ def _receive_one(self, c): return True def _zrem_start(self, timeout=1): - queues = self._queue_cycle.consume(1) - if not queues: + queue = self._queue_scheduler.next() + if not queue: return + self.queue = queue self._in_poll = self.client.connection + # In one atomic pipe, grab the top element in the queue by + # score (ZRANGE) and remove it (ZREMRANGEBYRANK) pipe = self.client.pipeline() - queue = queues[0] pipe.zrange(queue, 0, 0) pipe.zremrangebyrank(queue, 0, 0) - self.queue = queue # Hack to make call asynchronous connection = self.client.connection + # Wrap pipeline commands in MULTI/EXEC so that they are executed + # atomically by redis. cmds = chain([(('MULTI', ), {})], pipe.command_stack, [(('EXEC', ), {})]) all_cmds = connection.pack_commands([args for args, _ in cmds]) connection.send_packed_command(all_cmds) @@ -551,7 +565,7 @@ def _zrem_start(self, timeout=1): def _zrem_read(self, **options): try: try: - # We only care about the last output + # The last response contains the response of ZRANGE. 
output = None for i in range(0, 4): output = self.client.parse_response(self.client.connection, '_', **options) @@ -569,19 +583,14 @@ def _zrem_read(self, **options): # Rotate the queue dest = self.queue - self._queue_cycle.rotate(dest) + all_queues_empty = self._queue_scheduler.rotate(dest, not bool(item)) if item: - self.empty = False self.connection._deliver(loads(bytes_to_str(item)), dest) return True - else: - if dest == self.start_queue: - if self.empty: - sleep(1) - else: - self.empty = True - raise Empty() + elif all_queues_empty: + sleep(1) + raise Empty() finally: self._in_poll = None @@ -598,7 +607,7 @@ def _get(self, queue): def _size(self, queue): with self.conn_or_acquire() as client: - return client.zcount(queue, '-inf', '+inf') + return client.zcard(queue) def _put(self, queue, message, **kwargs): """Deliver message.""" @@ -655,7 +664,7 @@ def get_table(self, exchange): def _purge(self, queue): with self.conn_or_acquire() as client: with client.pipeline() as pipe: - pipe = pipe.zcount(queue, '-inf', '+inf').delete(queue) + pipe = pipe.zcard(queue).delete(queue) size = pipe.execute() return size[0] @@ -807,13 +816,8 @@ def subclient(self): client = self._create_client(async=True) return client.pubsub() - def _update_queue_cycle(self): - self._queue_cycle.update(self.active_queues) - queue = self._queue_cycle.consume(1) - if queue: - self.start_queue = queue[0] - else: - self.start_queue = None + def _update_queue_schedule(self): + self._queue_scheduler.update(self.active_queues) def _get_response_error(self): from redis import exceptions diff --git a/setup.py b/setup.py index efe8dd2..796f6f2 100644 --- a/setup.py +++ b/setup.py @@ -3,13 +3,13 @@ setup( name='kombu-redis-priority', packages=find_packages(), - version='0.1.1', + version='0.2.0', description='Celery backend using redis SortedSets for priority', include_package_data=True, author='Captricity', author_email='webmaster@captricity.com', url='https://github.com/Captricity/kombu-redis-priority', - download_url='https://github.com/Captricity/kombu-redis-priority/tarball/0.1.1', + download_url='https://github.com/Captricity/kombu-redis-priority/tarball/0.2.0', keywords=['redis', 'sorted-set', 'kombu'], classifiers=[], install_requires=[ @@ -17,9 +17,10 @@ ], tests_require=[ 'six', - 'mock==1.0.1', + 'mock==2.0.0', 'freezegun', - 'fakeredis' + 'fakeredis', + 'ddt' ], test_suite='tests' ) diff --git a/tests/scheduler/__init__.py b/tests/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/scheduler/test_prioritized_levels.py b/tests/scheduler/test_prioritized_levels.py new file mode 100644 index 0000000..5c769d8 --- /dev/null +++ b/tests/scheduler/test_prioritized_levels.py @@ -0,0 +1,69 @@ +import unittest + +from kombu_redis_priority.scheduling.prioritized_levels import \ + HIGHEST_LEVEL, PrioritizedLevelsQueueScheduler + + +class TestPrioritizedLevelsQueueScheduler(unittest.TestCase): + BASE_CONFIG = { + 0: ['TimeMachine', 'FluxCapacitor'], + 1: ['1985', '1955', '2015'] + } + + def test_prioritized_levels_scheduler_gets_queue_at_top_of_lowest_level(self): + scheduler = PrioritizedLevelsQueueScheduler(self.BASE_CONFIG) + self.assertEqual(scheduler.next(), 'TimeMachine') + + def test_prioritized_levels_scheduler_next_with_empty(self): + scheduler = PrioritizedLevelsQueueScheduler(self.BASE_CONFIG) + scheduler.update([]) + self.assertEqual(scheduler.next(), None) + + def test_prioritized_levels_scheduler_update_filters_out_queues_not_in_list(self): + scheduler = 
PrioritizedLevelsQueueScheduler(self.BASE_CONFIG)
+        scheduler.update(['TimeMachine'])
+        self.assertEqual(scheduler.queue_config, {0: ['TimeMachine']})
+        self.assertEqual(scheduler.current_level, 0)
+        self.assertEqual(scheduler.queue_cycle.items, ['TimeMachine'])
+
+    def test_prioritized_levels_scheduler_rotate_full_rotation_empty(self):
+        scheduler = PrioritizedLevelsQueueScheduler(self.BASE_CONFIG)
+        queues = ['TimeMachine', 'FluxCapacitor', '1985', '1955']
+        for q in queues:
+            self.assertEqual(scheduler.next(), q)
+            self.assertFalse(scheduler.rotate(q, True))
+        self.assertEqual(scheduler.next(), '2015')
+        self.assertTrue(scheduler.rotate('2015', True))
+
+    def test_prioritized_levels_scheduler_jumps_on_empty_full_rotation(self):
+        scheduler = PrioritizedLevelsQueueScheduler(self.BASE_CONFIG)
+        # full empty rotation on level 0 causes scheduler to jump to next level
+        self.assertEqual(scheduler.current_level, 0)
+        for q in self.BASE_CONFIG[0]:
+            self.assertEqual(scheduler.next(), q)
+            self.assertFalse(scheduler.rotate(q, True))
+        self.assertEqual(scheduler.current_level, 1)
+        self.assertEqual(scheduler.next(), '1985')
+
+    def test_prioritized_levels_scheduler_fully_rotates_level(self):
+        scheduler = PrioritizedLevelsQueueScheduler(self.BASE_CONFIG)
+        scheduler._set_level(1)
+        self.assertEqual(scheduler.next(), '1985')
+        self.assertFalse(scheduler.rotate('1985', False))
+        self.assertEqual(scheduler.next(), '1955')
+
+    def test_prioritized_levels_scheduler_moves_to_lowest_level_when_consuming_higher_level_nonempty(self):
+        config = self.BASE_CONFIG.copy()
+        config[2] = ['Marty', 'Doc']
+        scheduler = PrioritizedLevelsQueueScheduler(config)
+        scheduler._set_level(1)
+        for q in config[1]:
+            self.assertEqual(scheduler.next(), q)
+            self.assertFalse(scheduler.rotate(q, False))
+        self.assertEqual(scheduler.current_level, 0)
+        self.assertEqual(scheduler.next(), 'TimeMachine')
+
+    def test_prioritized_levels_scheduler_update_non_existent_queue(self):
+        scheduler = PrioritizedLevelsQueueScheduler(self.BASE_CONFIG)
+        scheduler.update(['Marty'])
+        self.assertEqual(scheduler.queue_config, {HIGHEST_LEVEL: ['Marty']})
diff --git a/tests/scheduler/test_roundrobin.py b/tests/scheduler/test_roundrobin.py
new file mode 100644
index 0000000..88ab101
--- /dev/null
+++ b/tests/scheduler/test_roundrobin.py
@@ -0,0 +1,46 @@
+import unittest
+from ddt import ddt, data
+
+from kombu_redis_priority.scheduling.round_robin import RoundRobinQueueScheduler
+
+
+@ddt
+class TestRoundRobinQueueScheduler(unittest.TestCase):
+    def test_round_robin_scheduler_gets_queue_at_top_of_list(self):
+        scheduler = RoundRobinQueueScheduler()
+        scheduler.update(['TimeMachine', 'FluxCapacitor'])
+        self.assertEqual(scheduler.next(), 'TimeMachine')
+
+    def test_round_robin_scheduler_next_with_empty(self):
+        scheduler = RoundRobinQueueScheduler()
+        scheduler.update([])
+        self.assertEqual(scheduler.next(), None)
+
+    def test_round_robin_scheduler_update_sets_internal_list(self):
+        scheduler = RoundRobinQueueScheduler()
+        scheduler.update(['TimeMachine', 'FluxCapacitor'])
+        self.assertEqual(scheduler.cycle.items, ['TimeMachine', 'FluxCapacitor'])
+
+    @data(True, False)
+    def test_round_robin_scheduler_rotate_rotates_queue_regardless_of_emptiness(self, was_empty):
+        scheduler = RoundRobinQueueScheduler()
+        scheduler.update(['TimeMachine', 'FluxCapacitor'])
+        scheduler.rotate('TimeMachine', was_empty)
+        self.assertEqual(scheduler.cycle.items, ['FluxCapacitor', 'TimeMachine'])
+
+    def test_round_robin_scheduler_rotate_full_rotation_empty(self):
+
scheduler = RoundRobinQueueScheduler() + scheduler.update(['TimeMachine', 'FluxCapacitor']) + # Have not made a full rotation, not fully empty yet + self.assertFalse(scheduler.rotate('TimeMachine', True)) + # Made a full round trip and both queues were empty + self.assertTrue(scheduler.rotate('FluxCapacitor', True)) + + def test_round_robin_scheduler_rotate_full_rotation_state_tracking(self): + scheduler = RoundRobinQueueScheduler() + scheduler.update(['TimeMachine', 'FluxCapacitor', 'Delorean']) + # Have not made a full rotation, not fully empty yet + self.assertFalse(scheduler.rotate('TimeMachine', True)) + self.assertFalse(scheduler.rotate('FluxCapacitor', True)) + # Made a full rotation, but the last queue was not empty + self.assertFalse(scheduler.rotate('Delorean', False)) diff --git a/tests/test_sortedset_transport.py b/tests/test_sortedset_transport.py index d3ed950..7c45f0e 100644 --- a/tests/test_sortedset_transport.py +++ b/tests/test_sortedset_transport.py @@ -9,6 +9,7 @@ from .utils.fakeredis_ext import FakeStrictRedisWithConnection from kombu import Connection +from kombu.five import Empty from kombu_redis_priority.transport.redis_priority_async import redis, Transport @@ -88,7 +89,7 @@ def test_zrem_read(self): # Make the channel pull off the foo queue self.channel._active_queues.append('foo') - self.channel._update_queue_cycle() + self.channel._update_queue_schedule() # And then try the zrem pipeline self.channel._zrem_start() @@ -131,3 +132,67 @@ def test_has_queue(self): self.assertTrue(self.channel._has_queue('foo')) self.assertFalse(self.channel._has_queue('bar')) + + def test_round_robin_multiple_queues(self): + # Create 2 queues with 2 messages + msg = { + 'properties': {'delivery_tag': 'abcd'} + } + for i in range(2): + self.faker.zadd('foo', i, self._prefixed_message(time.time() + i, msg)) + self.faker.zadd('bar', i, self._prefixed_message(time.time() + i, msg)) + + self.channel._queue_scheduler.update(['foo', 'bar']) + + # And then check zrem pipeline rotates + def check_zrem_pipeline(queue): + self.channel._zrem_start() + with mock.patch.object(self.channel.connection, '_deliver') as mock_deliver: + self.channel._zrem_read() + mock_deliver.assert_called_once_with(msg, queue) + + # Check two rotations + check_zrem_pipeline('foo') + check_zrem_pipeline('bar') + check_zrem_pipeline('foo') + check_zrem_pipeline('bar') + + def test_prioritized_levels_queue_scheduling_usage(self): + # Setup channel with prioritized levels scheduler and queue preference + queue_preference = { + 0: ['TimeMachine', 'FluxCapacitor'], + 1: ['1985', '1955', '2015'], + 2: ['Marty'] + } + with mock.patch.object(redis, 'StrictRedis', FakeStrictRedisWithConnection): + connection = Connection( + transport=Transport, + transport_options={ + 'queue_order_strategy': 'prioritized_levels', + 'prioritized_levels_queue_config': queue_preference + }) + channel = connection.default_channel + + # Setup so that only one queue in level 2 has a message + msg = { + 'properties': {'delivery_tag': 'abcd'} + } + self.faker.zadd('1955', 1, self._prefixed_message(time.time(), msg)) + + # Then check to make sure that scheduler will fully rotate levels 0 and 1, but not 2 + def check_zrem_pipeline(queue, empty): + channel._zrem_start() + with mock.patch.object(channel.connection, '_deliver') as mock_deliver: + if empty: + with self.assertRaises(Empty): + channel._zrem_read() + else: + channel._zrem_read() + mock_deliver.assert_called_once_with(msg, queue) + + check_zrem_pipeline('TimeMachine', True) + 
check_zrem_pipeline('FluxCapacitor', True) + check_zrem_pipeline('1985', True) + check_zrem_pipeline('1955', False) + check_zrem_pipeline('2015', True) + check_zrem_pipeline('TimeMachine', True)
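For completeness, a minimal sketch of wiring the new strategy into a plain
kombu connection, mirroring `test_prioritized_levels_queue_scheduling_usage`
above; it assumes a Redis server reachable with the transport's default
connection settings, and the queue names are illustrative::

    from kombu import Connection
    from kombu_redis_priority.transport.redis_priority_async import Transport

    # Queues a worker consumes that are not listed here are scheduled at the
    # highest level (lowest priority).
    options = {
        'queue_order_strategy': 'prioritized_levels',
        'prioritized_levels_queue_config': {
            0: ['TimeMachine', 'FluxCapacitor'],
            1: ['1985', '1955', '2015'],
        },
    }

    # A Celery app would place the same options in BROKER_TRANSPORT_OPTIONS.
    connection = Connection(transport=Transport, transport_options=options)
    channel = connection.default_channel  # the Channel picks the scheduler here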