Skip to content

Commit

Permalink
Merge pull request #9 from Captricity/yori-prioritized-scheduling
Browse files Browse the repository at this point in the history
Prioritized Scheduling
  • Loading branch information
yorinasub17 committed May 8, 2018
2 parents a24d63e + 26e9e5b commit 43c48e3
Show file tree
Hide file tree
Showing 16 changed files with 473 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.1
current_version = 0.2.0
parse = (?P<major>\d+)\.(?P<minor>.*)\.(?P<patch>.*)
serialize = {major}.{minor}.{patch}

Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
@@ -1,7 +1,7 @@
language: python
python:
- "2.7"
- "3.4"
- "3.6"
before_script:
- pip install python-coveralls coverage
# command to run tests
Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Expand Up @@ -54,9 +54,9 @@
# built documents.
#
# The short X.Y version.
version = u'0.1.1'
version = u'0.2.0'
# The full version, including alpha/beta/rc tags.
release = u'0.1.1'
release = u'0.2.0'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Expand Up @@ -27,6 +27,7 @@ messages into the sortedset.

installation
usage
queue_scheduling
testing


Expand Down
46 changes: 46 additions & 0 deletions docs/source/queue_scheduling.rst
@@ -0,0 +1,46 @@
Queue Scheduling
================

By default the `redis_priority` transport will consume from multiple queues in
a round robin fashion, like all the other transports for `kombu`. Currently,
this transport does not support the other strategies specified in `kombu`
(`priority` and `sorted`). Instead, this transport provides the
`prioritized_levels` strategy described below.

Prioritized Scheduling
----------------------

Given a configuration of queues encoded as::

{
LEVEL: [QUEUE]
}

where `LEVEL` is a numeric value indicating priority (smaller
first) and `[QUEUE]` indicates a list of queue names, the
scheduler will walk from smallest level to highest level,
only advancing levels if the smaller levels are empty.
Within levels, the queues are rotated in round robin
fashion.

To honor the prefernece for the lower levels, we will move
back to the lowest level when we do a full rotation of the
current level.

You can configure the `redis_priority` transport to use this method by using
the `queue_order_strategy` and `prioritized_levels_queue_config` transport
options, configured with `BROKER_TRANSPORT_OPTIONS`.

Example::

BROKER_TRANSPORT_OPTIONS = {
'queue_order_strategy': 'prioritized_levels',
'prioritized_levels_queue_config': {
0: ['TimeMachine', 'FluxCapacitor'],
1: ['1985', '1955', '2015']
}
}

Note that any queue that the worker is specified to consume which is not in the
`prioritized_levels_queue_config` is automatically specified at the highest
level (max int).
2 changes: 1 addition & 1 deletion kombu_redis_priority/__init__.py
@@ -1 +1 @@
VERSION = __version__ = "0.1.1"
VERSION = __version__ = "0.2.0"
8 changes: 8 additions & 0 deletions kombu_redis_priority/scheduling/__init__.py
@@ -0,0 +1,8 @@
"""
This module contains various queue scheduling algorithms that can be used to
modify the behavior of redis priority transport in the face of multiple
queues.
This is an extension of kombu.utils.scheduling, to support schedulers that
require interactions with redis.
"""
25 changes: 25 additions & 0 deletions kombu_redis_priority/scheduling/base.py
@@ -0,0 +1,25 @@
class QueueScheduler(object):
def next(self):
"""Return the next queue to consume from."""
raise NotImplementedError

def rotate(self, last_queue, was_empty):
"""
Rotate queue list based on what queue was popped
last and whether or not it was empty.
Args:
last_queue : Queue that was returned by scheduler and consumed from.
was_empty (bool) : Whether or not the last_queue was empty when consumed.
Returns:
True when a full rotation was made and all the queues were empty.
"""
raise NotImplementedError

def update(self, queue_list):
"""
Update internal queue list with the list of queues
to consume from.
"""
raise NotImplementedError
120 changes: 120 additions & 0 deletions kombu_redis_priority/scheduling/prioritized_levels.py
@@ -0,0 +1,120 @@
"""
Prioritized Levels scheduler for the queues.
Given a configuration of queues encoded as:
{
LEVEL: [QUEUE]
}
where LEVEL is a numeric value indicating priority (smaller
first) and [QUEUE] indicates a list of queue names, the
scheduler will walk from smallest level to highest level,
only advancing levels if the smaller levels are empty.
Within levels, the queues are rotated in round robin
fashion.
To honor the prefernece for the lower levels, we will move
back to the lowest level when we do a full rotation of the
current level.
This is done to support the asynchronous nature in which
kombu pulls tasks.
"""
from collections import defaultdict
from kombu.utils.scheduling import cycle_by_name
from .base import QueueScheduler

HIGHEST_LEVEL = float('inf')


class PrioritizedLevelsQueueScheduler(QueueScheduler):
"""
Instance vars:
queue_cycle : kombu round_robin scheduler. Used to
implement round robin fashion within
levels.
queue_config : Current preference list. Should only
contain queues that the worker should
pull from.
current_level : Current level that worker is
scheduling queues from.
start_of_rotation : The first queue in the rotation
for the current level. Used to
detect cycles.
level_lookup_table : Lookup table mapping queues to
levels.
"""
def __init__(self, config):
self.queue_cycle = cycle_by_name('round_robin')()
self.queue_config = config
self._set_initial_state()

self.level_lookup_table = {}
for level, queues in config.items():
for q in queues:
self.level_lookup_table[q] = level

def next(self):
queues = self.queue_cycle.consume(1)
if queues:
return queues[0]
return None

def rotate(self, last_queue, was_empty):
# This is first rotation for the level and queue was
# empty, so start tracking that rotation was empty
if last_queue == self.start_of_rotation and was_empty:
self.rotation_empty = True
# If at any time in rotation the queue was not
# empty, then rotation is not empty
elif not was_empty:
self.rotation_empty = False

# Rotate within the level and check if we fully
# rotated the level.
self.queue_cycle.rotate(last_queue)
next_queue = self.queue_cycle.consume(1)[0]
is_full_rotation = next_queue == self.start_of_rotation

# On a full cycle and if full rotation was empty,
# jump to next level
if is_full_rotation and self.rotation_empty:
next_index = (self.levels.index(self.current_level) + 1) % len(self.levels)
self._set_level(self.levels[next_index])
# In this situation, all queues are empty if
# we were at the highest level
return next_index == 0
elif is_full_rotation:
# otherwise, go back to lowest level
self._set_level(min(self.levels))
return False

def update(self, queue_list):
# Starting from base config, only include queues in
# the provided list. For any queues not in the list,
# set at the highest level (= lowest priority)
config = defaultdict(list)
for q in queue_list:
if q in self.level_lookup_table:
level = self.level_lookup_table[q]
else:
level = HIGHEST_LEVEL
config[level].append(q)
self.queue_config = config
self._set_initial_state()

def _set_initial_state(self):
self.levels = sorted(self.queue_config.keys())
if self.levels:
self._set_level(min(self.levels))
else:
self._set_level(HIGHEST_LEVEL)

def _set_level(self, level):
self.current_level = level
self.queue_cycle.update(self.queue_config[self.current_level])
queues = self.queue_cycle.consume(1)
if queues:
self.start_of_rotation = queues[0]
else:
self.start_of_rotation = None
self.rotation_empty = False
36 changes: 36 additions & 0 deletions kombu_redis_priority/scheduling/round_robin.py
@@ -0,0 +1,36 @@
"""
RoundRobin scheduler for the queues, evenly cycling through the list of
queues to consume from.
"""
from kombu.utils.scheduling import cycle_by_name
from .base import QueueScheduler


class RoundRobinQueueScheduler(QueueScheduler):
def __init__(self):
self.cycle = cycle_by_name('round_robin')()

def next(self):
queues = self.cycle.consume(1)
if queues:
return queues[0]
return None

def rotate(self, last_queue, was_empty):
# This is first rotation and queue was empty, so
# start tracking that rotation was empty
if last_queue == self.start_queue and was_empty:
self.rotation_empty = True
# If at any time in rotation the queue was not
# empty, then rotation is not empty
elif not was_empty:
self.rotation_empty = False

self.cycle.rotate(last_queue)
next_queue = self.next()
is_full_rotation = next_queue == self.start_queue
return is_full_rotation and self.rotation_empty

def update(self, queue_list):
self.cycle.update(queue_list)
self.start_queue = self.next()

0 comments on commit 43c48e3

Please sign in to comment.