diff --git a/README.md b/README.md index c6c5c17..fc790f5 100644 --- a/README.md +++ b/README.md @@ -7,3 +7,11 @@ Kombu Transport using Redis SortedSets ## Running tests python setup.py test + +## Using with celery tasks - zpriority + + task.apply_async(zpriority=10) + +zpriority is used not avoid confusion with other celery priority implementations + +Note - tasks created using this backend will have the lowest priority, +inf diff --git a/kombu_redis_priority/transport/redis_priority_async.py b/kombu_redis_priority/transport/redis_priority_async.py index c6edb68..f83bc31 100644 --- a/kombu_redis_priority/transport/redis_priority_async.py +++ b/kombu_redis_priority/transport/redis_priority_async.py @@ -302,6 +302,8 @@ class Channel(virtual.Channel): 'priority_steps') # <-- do not add comma here! ) + default_priority = '+inf' + connection_class = redis.Connection if redis else None def __init__(self, *args, **kwargs): @@ -852,7 +854,10 @@ def active_queues(self): if queue not in self.active_fanout_queues} def _get_message_priority(self, message, reverse=False): - """Get priority from message. + """Get priority from message key zpriority + + Uses zpriority instead of priority to differentiate + from other celery priority implementations The value is not limited! @@ -860,7 +865,7 @@ def _get_message_priority(self, message, reverse=False): Lower value has more priority. """ try: - priority = int(message['properties']['priority']) + priority = int(message['properties']['zpriority']) except (TypeError, ValueError, KeyError): priority = self.default_priority diff --git a/tests/test_sortedset_transport.py b/tests/test_sortedset_transport.py index 4888986..bcc312d 100644 --- a/tests/test_sortedset_transport.py +++ b/tests/test_sortedset_transport.py @@ -48,14 +48,14 @@ def test_default_message_add(self): # verify message: # - a time prefix is appended to the message - # - has default priority (0) + # - has default priority of +inf enqueued_msg, priority = next(six.iteritems(raw_queue)) self.assertEqual(enqueued_msg, self._prefixed_message(faketime, {})) - self.assertEqual(priority, 0.0) + self.assertEqual(priority, float('+inf')) def test_prioritized_message_add(self): raw_db = self.faker._db - msg = {'properties': {'priority': 5}} + msg = {'properties': {'zpriority': 5}} # assert no queues exist self.assertEqual(len(raw_db), 0)