Skip to content

Commit

Permalink
change priority to use zpriority and change default priority to +inf
Browse files Browse the repository at this point in the history
  • Loading branch information
AJ Renold committed Feb 10, 2017
1 parent dce2b36 commit a1595c4
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 5 deletions.
8 changes: 8 additions & 0 deletions README.md
Expand Up @@ -7,3 +7,11 @@ Kombu Transport using Redis SortedSets
## Running tests

python setup.py test

## Using with celery tasks - zpriority

task.apply_async(zpriority=10)

zpriority is used not avoid confusion with other celery priority implementations

Note - tasks created using this backend will have the lowest priority, +inf
9 changes: 7 additions & 2 deletions kombu_redis_priority/transport/redis_priority_async.py
Expand Up @@ -302,6 +302,8 @@ class Channel(virtual.Channel):
'priority_steps') # <-- do not add comma here!
)

default_priority = '+inf'

connection_class = redis.Connection if redis else None

def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -852,15 +854,18 @@ def active_queues(self):
if queue not in self.active_fanout_queues}

def _get_message_priority(self, message, reverse=False):
"""Get priority from message.
"""Get priority from message key zpriority
Uses zpriority instead of priority to differentiate
from other celery priority implementations
The value is not limited!
Note:
Lower value has more priority.
"""
try:
priority = int(message['properties']['priority'])
priority = int(message['properties']['zpriority'])
except (TypeError, ValueError, KeyError):
priority = self.default_priority

Expand Down
6 changes: 3 additions & 3 deletions tests/test_sortedset_transport.py
Expand Up @@ -48,14 +48,14 @@ def test_default_message_add(self):

# verify message:
# - a time prefix is appended to the message
# - has default priority (0)
# - has default priority of +inf
enqueued_msg, priority = next(six.iteritems(raw_queue))
self.assertEqual(enqueued_msg, self._prefixed_message(faketime, {}))
self.assertEqual(priority, 0.0)
self.assertEqual(priority, float('+inf'))

def test_prioritized_message_add(self):
raw_db = self.faker._db
msg = {'properties': {'priority': 5}}
msg = {'properties': {'zpriority': 5}}

# assert no queues exist
self.assertEqual(len(raw_db), 0)
Expand Down

0 comments on commit a1595c4

Please sign in to comment.