Skip to content

Commit

Permalink
Merge pull request #8 from Captricity/sb-purge
Browse files Browse the repository at this point in the history
Fixed up ability to purge celery queues which use redispriorityasync
  • Loading branch information
sbarman committed Mar 2, 2017
2 parents 47b3b74 + 70e1312 commit a24d63e
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.1.0
current_version = 0.1.1
parse = (?P<major>\d+)\.(?P<minor>.*)\.(?P<patch>.*)
serialize = {major}.{minor}.{patch}

Expand Down
4 changes: 2 additions & 2 deletions docs/source/conf.py
Expand Up @@ -54,9 +54,9 @@
# built documents.
#
# The short X.Y version.
version = u'0.1.0'
version = u'0.1.1'
# The full version, including alpha/beta/rc tags.
release = u'0.1.0'
release = u'0.1.1'

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
Expand Down
2 changes: 1 addition & 1 deletion kombu_redis_priority/__init__.py
@@ -1 +1 @@
VERSION = __version__ = "0.1.0"
VERSION = __version__ = "0.1.1"
46 changes: 9 additions & 37 deletions kombu_redis_priority/transport/redis_priority_async.py
Expand Up @@ -376,10 +376,6 @@ def _do_restore_message(self, payload, exchange, routing_key,
except KeyError:
pass
for queue in self._lookup(exchange, routing_key):
# (client.lpush if leftmost else client.rpush)(
# queue, dumps(payload),
# )

# Add with priority 0 so it jumps ahead in the queue
client.zadd(queue, '-inf', self._add_time_prefix(dumps(payload)))
except Exception:
Expand Down Expand Up @@ -596,29 +592,13 @@ def _poll_error(self, type, **options):
self.client.parse_response(self.client.connection, type)

def _get(self, queue):
with self.conn_or_acquire() as client:
for pri in self.priority_steps:
item = client.rpop(self._q_for_pri(queue, pri))
if item:
return loads(bytes_to_str(item))
raise Empty()
# This method is used with a synchronous Kombu channel, which is not currently supported.
# Look at commit 4f956903cecc9d575193b7b0819ebe4a386328c9 to the view the old list backend code.
raise NotImplementedError

def _size(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.zcount(self._q_for_pri(queue, pri), '-inf', '+inf')
sizes = pipe.execute()
return sum(size for size in sizes
if isinstance(size, numbers.Integral))

def _q_for_pri(self, queue, pri):
pri = self.priority(pri)
return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))

def priority(self, n):
steps = self.priority_steps
return steps[bisect(steps, n) - 1]
return client.zcount(queue, '-inf', '+inf')

def _put(self, queue, message, **kwargs):
"""Deliver message."""
Expand Down Expand Up @@ -658,17 +638,11 @@ def _delete(self, queue, exchange, routing_key, pattern, *args, **kwargs):
self.sep.join([routing_key or '',
pattern or '',
queue or '']))
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.delete(self._q_for_pri(queue, pri))
pipe.execute()
client.delete(queue)

def _has_queue(self, queue, **kwargs):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
pipe = pipe.exists(self._q_for_pri(queue, pri))
return any(pipe.execute())
return client.exists(queue)

def get_table(self, exchange):
key = self.keyprefix_queue % exchange
Expand All @@ -681,11 +655,9 @@ def get_table(self, exchange):
def _purge(self, queue):
with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for pri in self.priority_steps:
priq = self._q_for_pri(queue, pri)
pipe = pipe.llen(priq).delete(priq)
sizes = pipe.execute()
return sum(sizes[::2])
pipe = pipe.zcount(queue, '-inf', '+inf').delete(queue)
size = pipe.execute()
return size[0]

def close(self):
self._closing = True
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Expand Up @@ -3,13 +3,13 @@
setup(
name='kombu-redis-priority',
packages=find_packages(),
version='0.1.0',
version='0.1.1',
description='Celery backend using redis SortedSets for priority',
include_package_data=True,
author='Captricity',
author_email='webmaster@captricity.com',
url='https://github.com/Captricity/kombu-redis-priority',
download_url='https://github.com/Captricity/kombu-redis-priority/tarball/0.1.0',
download_url='https://github.com/Captricity/kombu-redis-priority/tarball/0.1.1',
keywords=['redis', 'sorted-set', 'kombu'],
classifiers=[],
install_requires=[
Expand Down
36 changes: 36 additions & 0 deletions tests/test_sortedset_transport.py
Expand Up @@ -95,3 +95,39 @@ def test_zrem_read(self):
with mock.patch.object(self.channel.connection, '_deliver') as mock_deliver:
self.channel._zrem_read()
mock_deliver.assert_called_once_with(msg, 'foo')

def test_purge(self):
raw_db = self.faker._db

# assert no queues exist
self.assertEqual(len(raw_db), 0)

# put a blank message
self.channel._put('foo', {})
# verify queue is created
self.assertEqual(len(raw_db), 1)

num_msg = self.channel._purge('foo')
# verify that we removed one message and the key in Redis does not exist
self.assertEqual(num_msg, 1)
self.assertEqual(len(raw_db), 0)

def test_size(self):
size = self.channel._size('foo')
# verify that there are no messages
self.assertEqual(size, 0)

# put two blank messages
self.channel._put('foo', {'bar': 1})
self.channel._put('foo', {'bar': 2})

size = self.channel._size('foo')
# verify that there are two messages
self.assertEqual(size, 2)

def test_has_queue(self):
# put two blank messages
self.channel._put('foo', {})

self.assertTrue(self.channel._has_queue('foo'))
self.assertFalse(self.channel._has_queue('bar'))

0 comments on commit a24d63e

Please sign in to comment.