diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 9dd2159..a669f1d 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.1.0 +current_version = 0.1.1 parse = (?P\d+)\.(?P.*)\.(?P.*) serialize = {major}.{minor}.{patch} diff --git a/docs/source/conf.py b/docs/source/conf.py index 4bba12c..ef0be50 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -54,9 +54,9 @@ # built documents. # # The short X.Y version. -version = u'0.1.0' +version = u'0.1.1' # The full version, including alpha/beta/rc tags. -release = u'0.1.0' +release = u'0.1.1' # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/kombu_redis_priority/__init__.py b/kombu_redis_priority/__init__.py index 428345e..2a99b0c 100644 --- a/kombu_redis_priority/__init__.py +++ b/kombu_redis_priority/__init__.py @@ -1 +1 @@ -VERSION = __version__ = "0.1.0" +VERSION = __version__ = "0.1.1" diff --git a/kombu_redis_priority/transport/redis_priority_async.py b/kombu_redis_priority/transport/redis_priority_async.py index 5995ae1..5cec9c0 100644 --- a/kombu_redis_priority/transport/redis_priority_async.py +++ b/kombu_redis_priority/transport/redis_priority_async.py @@ -376,10 +376,6 @@ def _do_restore_message(self, payload, exchange, routing_key, except KeyError: pass for queue in self._lookup(exchange, routing_key): - # (client.lpush if leftmost else client.rpush)( - # queue, dumps(payload), - # ) - # Add with priority 0 so it jumps ahead in the queue client.zadd(queue, '-inf', self._add_time_prefix(dumps(payload))) except Exception: @@ -596,29 +592,13 @@ def _poll_error(self, type, **options): self.client.parse_response(self.client.connection, type) def _get(self, queue): - with self.conn_or_acquire() as client: - for pri in self.priority_steps: - item = client.rpop(self._q_for_pri(queue, pri)) - if item: - return loads(bytes_to_str(item)) - raise Empty() + # This method is used with a synchronous Kombu channel, which is not currently supported. + # Look at commit 4f956903cecc9d575193b7b0819ebe4a386328c9 to the view the old list backend code. + raise NotImplementedError def _size(self, queue): with self.conn_or_acquire() as client: - with client.pipeline() as pipe: - for pri in self.priority_steps: - pipe = pipe.zcount(self._q_for_pri(queue, pri), '-inf', '+inf') - sizes = pipe.execute() - return sum(size for size in sizes - if isinstance(size, numbers.Integral)) - - def _q_for_pri(self, queue, pri): - pri = self.priority(pri) - return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', '')) - - def priority(self, n): - steps = self.priority_steps - return steps[bisect(steps, n) - 1] + return client.zcount(queue, '-inf', '+inf') def _put(self, queue, message, **kwargs): """Deliver message.""" @@ -658,17 +638,11 @@ def _delete(self, queue, exchange, routing_key, pattern, *args, **kwargs): self.sep.join([routing_key or '', pattern or '', queue or ''])) - with client.pipeline() as pipe: - for pri in self.priority_steps: - pipe = pipe.delete(self._q_for_pri(queue, pri)) - pipe.execute() + client.delete(queue) def _has_queue(self, queue, **kwargs): with self.conn_or_acquire() as client: - with client.pipeline() as pipe: - for pri in self.priority_steps: - pipe = pipe.exists(self._q_for_pri(queue, pri)) - return any(pipe.execute()) + return client.exists(queue) def get_table(self, exchange): key = self.keyprefix_queue % exchange @@ -681,11 +655,9 @@ def get_table(self, exchange): def _purge(self, queue): with self.conn_or_acquire() as client: with client.pipeline() as pipe: - for pri in self.priority_steps: - priq = self._q_for_pri(queue, pri) - pipe = pipe.llen(priq).delete(priq) - sizes = pipe.execute() - return sum(sizes[::2]) + pipe = pipe.zcount(queue, '-inf', '+inf').delete(queue) + size = pipe.execute() + return size[0] def close(self): self._closing = True diff --git a/setup.py b/setup.py index 06bcdbb..efe8dd2 100644 --- a/setup.py +++ b/setup.py @@ -3,13 +3,13 @@ setup( name='kombu-redis-priority', packages=find_packages(), - version='0.1.0', + version='0.1.1', description='Celery backend using redis SortedSets for priority', include_package_data=True, author='Captricity', author_email='webmaster@captricity.com', url='https://github.com/Captricity/kombu-redis-priority', - download_url='https://github.com/Captricity/kombu-redis-priority/tarball/0.1.0', + download_url='https://github.com/Captricity/kombu-redis-priority/tarball/0.1.1', keywords=['redis', 'sorted-set', 'kombu'], classifiers=[], install_requires=[ diff --git a/tests/test_sortedset_transport.py b/tests/test_sortedset_transport.py index bcc312d..d3ed950 100644 --- a/tests/test_sortedset_transport.py +++ b/tests/test_sortedset_transport.py @@ -95,3 +95,39 @@ def test_zrem_read(self): with mock.patch.object(self.channel.connection, '_deliver') as mock_deliver: self.channel._zrem_read() mock_deliver.assert_called_once_with(msg, 'foo') + + def test_purge(self): + raw_db = self.faker._db + + # assert no queues exist + self.assertEqual(len(raw_db), 0) + + # put a blank message + self.channel._put('foo', {}) + # verify queue is created + self.assertEqual(len(raw_db), 1) + + num_msg = self.channel._purge('foo') + # verify that we removed one message and the key in Redis does not exist + self.assertEqual(num_msg, 1) + self.assertEqual(len(raw_db), 0) + + def test_size(self): + size = self.channel._size('foo') + # verify that there are no messages + self.assertEqual(size, 0) + + # put two blank messages + self.channel._put('foo', {'bar': 1}) + self.channel._put('foo', {'bar': 2}) + + size = self.channel._size('foo') + # verify that there are two messages + self.assertEqual(size, 2) + + def test_has_queue(self): + # put two blank messages + self.channel._put('foo', {}) + + self.assertTrue(self.channel._has_queue('foo')) + self.assertFalse(self.channel._has_queue('bar'))