Skip to content

Commit

Permalink
Support redis-py v2 and v3 (#948)
Browse files Browse the repository at this point in the history
Further to #946 this fixes the underlying issue in a easy-to-upgrade way
for end users, many of whom will have Redis installed via other means.
By having this check here and supporting both versions concurrently it
makes it easier for end users, and to use celery/kombu in projects that
use Redis elsewhere.

With this change it is possibly worth reverting #946
  • Loading branch information
ashb authored and thedrow committed Nov 19, 2018
1 parent d9de66b commit 05152da
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
9 changes: 8 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,15 @@ def __init__(self, *args, **kwargs):
def append(self, message, delivery_tag):
delivery = message.delivery_info
EX, RK = delivery['exchange'], delivery['routing_key']
# TODO: Remove this once we soley on Redis-py 3.0.0+
if redis.VERSION[0] >= 3:
# Redis-py changed the format of zadd args in v3.0.0
zadd_args = [{time(): delivery_tag}]
else:
zadd_args = [time(), delivery_tag]

with self.pipe_or_acquire() as pipe:
pipe.zadd(self.unacked_index_key, time(), delivery_tag) \
pipe.zadd(self.unacked_index_key, *zadd_args) \
.hset(self.unacked_key, delivery_tag,
dumps([message._raw, EX, RK])) \
.execute()
Expand Down
27 changes: 24 additions & 3 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,15 @@ def hdel(self, key, k):
def sadd(self, key, member, *args):
self.sets[key].add(member)

def zadd(self, key, score1, member1, *args):
self.sets[key].add(member1)
def zadd(self, key, *args):
if redis.redis.VERSION[0] >= 3:
(mapping,) = args
for item in mapping:
self.sets[key].add(item)
else:
# TODO: remove me when we drop support for Redis-py v2
(score1, member1) = args
self.sets[key].add(member1)

def smembers(self, key):
return self.sets.get(key, set())
Expand Down Expand Up @@ -840,7 +847,21 @@ def setup(self):
def teardown(self):
self.connection.close()

def test_publish__get(self):
@mock.replace_module_value(redis.redis, 'VERSION', [3, 0, 0])
def test_publish__get_redispyv3(self):
channel = self.connection.channel()
producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()

producer.publish({'hello': 'world'})

assert self.queue(channel).get().payload == {'hello': 'world'}
assert self.queue(channel).get() is None
assert self.queue(channel).get() is None
assert self.queue(channel).get() is None

@mock.replace_module_value(redis.redis, 'VERSION', [2, 5, 10])
def test_publish__get_redispyv2(self):
channel = self.connection.channel()
producer = Producer(channel, self.exchange, routing_key='test_Redis')
self.queue(channel).declare()
Expand Down

0 comments on commit 05152da

Please sign in to comment.