Skip to content

Commit

Permalink
Optional TTL support for MongoDB transport. AMQP TTL headers: x-messa…
Browse files Browse the repository at this point in the history
…ge-ttl and x-expires are used to produce MongoDB's TTL indexes.
  • Loading branch information
daevaorn authored and ask committed Dec 9, 2015
1 parent 94d31ca commit f2e8b97
Show file tree
Hide file tree
Showing 3 changed files with 273 additions and 59 deletions.
56 changes: 29 additions & 27 deletions README.rst
Expand Up @@ -84,33 +84,33 @@ and the `Wikipedia article about AMQP`_.
Transport Comparison
====================

+---------------+----------+------------+------------+---------------+--------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
+---------------+----------+------------+------------+---------------+--------------+
| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
+---------------+----------+------------+------------+---------------+--------------+
| *qpid* | Native | Yes | Yes | Yes | No |
+---------------+----------+------------+------------+---------------+--------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
+---------------+----------+------------+------------+---------------+--------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** | **TTL** |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ | Yes [#f4]_ |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *qpid* | Native | Yes | Yes | Yes | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *mongodb* | Virtual | Yes | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No | No |
+---------------+----------+------------+------------+---------------+--------------+-----------------------+


.. [#f1] Declarations only kept in memory, so exchanges/queues
Expand All @@ -122,6 +122,8 @@ Transport Comparison
.. [#f3] AMQP Message priority support depends on broker implementation.
.. [#f4] AMQP Message/Queue TTL support depends on broker implementation.
Documentation
-------------

Expand Down
137 changes: 130 additions & 7 deletions kombu/tests/transport/test_mongodb.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import

import datetime

from kombu import Connection
from kombu.five import Empty
from kombu.tests.case import Case, MagicMock, call, patch, skip_if_not_module
Expand All @@ -13,6 +15,7 @@ class _Channel(mongodb.Channel):
_fanout_queues = {}

collections = {}
now = datetime.datetime.utcnow()

def _create_client(self):
mock = MagicMock(name='client')
Expand All @@ -32,6 +35,9 @@ def get_collection(name):

return mock

def get_now(self):
return self.now

class Transport(mongodb.Transport):
Channel = _Channel

Expand Down Expand Up @@ -86,12 +92,7 @@ def test_options(self):
self.assertEqual(options['tz_aware'], True)


class test_mongodb_channel(Case):
@skip_if_not_module('pymongo')
def setup(self):
self.connection = _create_mock_connection()
self.channel = self.connection.default_channel

class BaseMongoDBChannelCase(Case):
def _get_method(self, cname, mname):
collection = getattr(self.channel, 'get_%s' % cname)()
method = getattr(collection, mname.split('.', 1)[0])
Expand Down Expand Up @@ -138,6 +139,13 @@ def assert_operation_has_calls(self, cname, mname, calls, any_order=False):
def assert_operation_called_with(self, cname, mname, *args, **kwargs):
self.assert_operation_has_calls(cname, mname, [call(*args, **kwargs)])


class test_mongodb_channel(BaseMongoDBChannelCase):
@skip_if_not_module('pymongo')
def setup(self):
self.connection = _create_mock_connection()
self.channel = self.connection.default_channel

# Tests for "public" channel interface

def test_new_queue(self):
Expand Down Expand Up @@ -282,7 +290,7 @@ def test_queue_delete_fanout(self):
# Tests for channel internals

def test_create_broadcast(self):
self.channel._create_broadcast(self.channel.client, {})
self.channel._create_broadcast(self.channel.client)

self.channel.client.create_collection.assert_called_with('messages.broadcast',
capped=True,
Expand Down Expand Up @@ -319,3 +327,118 @@ def test_create_broadcast_cursor(self):
cursor_type=pymongo.CursorType.TAILABLE,
filter={'queue': 'fanout_exchange1'},
sort=[('$natural', pymongo.ASCENDING)])


class test_mongodb_channel_ttl(BaseMongoDBChannelCase):
@skip_if_not_module('pymongo')
def setup(self):
self.connection = _create_mock_connection(transport_options={'ttl': True})
self.channel = self.connection.default_channel

self.expire_at = self.channel.get_now() + datetime.timedelta(milliseconds=777)

# Tests

def test_new_queue(self):
self.channel._new_queue('foobar')

self.assert_operation_called_with('queues', 'update',
{'_id': 'foobar'},
{'_id': 'foobar', 'options': {}, 'expire_at': None},
upsert=True)

def test_get(self):
import pymongo

self.set_operation_return_value('queues', 'find_one',
{'_id': 'docId',
'options': {'arguments': {'x-expires': 777}}})

self.set_operation_return_value('messages', 'find_and_modify',
{'_id': 'docId', 'payload': '{"some": "data"}'})

self.channel._get('foobar')
self.assert_collection_accessed('messages', 'messages.queues')
self.assert_operation_called_with('messages', 'find_and_modify',
query={'queue': 'foobar'},
remove=True,
sort=[('priority', pymongo.ASCENDING),
('_id', pymongo.ASCENDING)])
self.assert_operation_called_with('routing', 'update',
{'queue': 'foobar'},
{'$set': {'expire_at': self.expire_at}},
multiple=True)

def test_put(self):
self.set_operation_return_value('queues', 'find_one',
{'_id': 'docId',
'options': {'arguments': {'x-message-ttl': 777}}})

self.channel._put('foobar', {'some': 'data'})

self.assert_collection_accessed('messages')
self.assert_operation_called_with('messages', 'insert', {'queue': 'foobar',
'priority': 9,
'payload': '{"some": "data"}',
'expire_at': self.expire_at})

def test_queue_bind(self):
self.set_operation_return_value('queues', 'find_one',
{'_id': 'docId',
'options': {'arguments': {'x-expires': 777}}})

self.channel._queue_bind('test_exchange', 'foo', '*', 'foo')
self.assert_collection_accessed('messages.routing')
self.assert_operation_called_with('routing', 'update',
{'queue': 'foo', 'pattern': '*',
'routing_key': 'foo', 'exchange': 'test_exchange'},
{'queue': 'foo', 'pattern': '*',
'routing_key': 'foo', 'exchange': 'test_exchange',
'expire_at': self.expire_at},
upsert=True)

def test_queue_delete(self):
self.channel.queue_delete('foobar')
self.assert_collection_accessed('messages.queues')
self.assert_operation_called_with('queues', 'remove', {'_id': 'foobar'})

def test_ensure_indexes(self):
self.channel._ensure_indexes()

self.assert_operation_called_with('messages', 'ensure_index',
[('expire_at', 1)], expireAfterSeconds=0)
self.assert_operation_called_with('routing', 'ensure_index',
[('expire_at', 1)], expireAfterSeconds=0)
self.assert_operation_called_with('queues', 'ensure_index',
[('expire_at', 1)], expireAfterSeconds=0)

def test_get_expire(self):
result = self.channel.get_expire({'arguments': {'x-expires': 777}}, 'x-expires')

self.assertFalse(self.channel.client.called)

self.assertEqual(result, self.expire_at)

self.set_operation_return_value('queues', 'find_one',
{'_id': 'docId',
'options': {'arguments': {'x-expires': 777}}})

result = self.channel.get_expire('foobar', 'x-expires')
self.assertEqual(result, self.expire_at)


def test_update_queues_expire(self):
self.set_operation_return_value('queues', 'find_one',
{'_id': 'docId',
'options': {'arguments': {'x-expires': 777}}})
self.channel.update_queues_expire('foobar')

self.assert_collection_accessed('messages.routing', 'messages.queues')
self.assert_operation_called_with('routing', 'update',
{'queue': 'foobar'},
{'$set': {'expire_at': self.expire_at}},
multiple=True)
self.assert_operation_called_with('queues', 'update',
{'_id': 'foobar'},
{'$set': {'expire_at': self.expire_at}},
multiple=True)

1 comment on commit f2e8b97

@jonnycrunch
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TTL approach for mongodb seems to be the most reasonable, however, I don't follow all of the dependencies for which versions of amqp or celery to test it out. I tried a few branches haven't been able to test it.

Please sign in to comment.