From 399f9798149800f9c67b723a3a237ee7f8a3c4e1 Mon Sep 17 00:00:00 2001 From: Ask Solem Date: Sat, 14 Jan 2012 00:02:59 +0000 Subject: [PATCH] 99% overall coverage --- .coveragerc | 8 ++ kombu/compat.py | 4 +- kombu/connection.py | 4 +- kombu/tests/test_compat.py | 42 +++++++++++ kombu/tests/test_connection.py | 95 ++++++++++++++++++++++++ kombu/tests/test_transport_virtual.py | 101 +++++++++++++++++++++++++- kombu/tests/test_utils_functional.py | 50 +++++++++++++ kombu/transport/base.py | 6 ++ kombu/transport/virtual/__init__.py | 4 +- kombu/transport/virtual/scheduling.py | 2 +- kombu/utils/__init__.py | 2 +- requirements/default.txt | 2 +- setup.cfg | 3 +- 13 files changed, 311 insertions(+), 12 deletions(-) create mode 100644 .coveragerc create mode 100644 kombu/tests/test_utils_functional.py diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 000000000..825a2bc72 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,8 @@ +[run] +branch = True + +[report] +exclude_lines = + pragma: no cover + + for infinity diff --git a/kombu/compat.py b/kombu/compat.py index d86cb7a3b..ffc45e1af 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -23,7 +23,7 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None): consumer.consume(no_ack=no_ack) - for iteration in count(0): + for iteration in count(0): # for infinity if limit and iteration >= limit: raise StopIteration yield connection.drain_events() @@ -163,7 +163,7 @@ def wait(self, limit=None): return list(it) def iterqueue(self, limit=None, infinite=False): - for items_since_start in count(): + for items_since_start in count(): # for infinity item = self.fetch() if (not infinite and item is None) or \ (limit and items_since_start >= limit): diff --git a/kombu/connection.py b/kombu/connection.py index 311e2879e..630f01c33 100644 --- a/kombu/connection.py +++ b/kombu/connection.py @@ -23,7 +23,7 @@ from urlparse import urlparse try: from urlparse import parse_qsl -except ImportError: +except ImportError: # pragma: no cover from cgi import parse_qsl # noqa from . import exceptions @@ -289,7 +289,7 @@ def ensure(self, obj, fun, errback=None, max_retries=None, @wraps(fun) def _ensured(*args, **kwargs): got_connection = 0 - for retries in count(0): + for retries in count(0): # for infinity try: return fun(*args, **kwargs) except self.connection_errors + self.channel_errors, exc: diff --git a/kombu/tests/test_compat.py b/kombu/tests/test_compat.py index de9dcdef7..3fd09b045 100644 --- a/kombu/tests/test_compat.py +++ b/kombu/tests/test_compat.py @@ -1,11 +1,14 @@ from __future__ import absolute_import from __future__ import with_statement +from mock import patch + from .. import BrokerConnection, Exchange from .. import compat from .mocks import Transport, Channel from .utils import unittest +from .utils import Mock class test_misc(unittest.TestCase): @@ -104,6 +107,10 @@ def test_constructor(self): exchange=explicit) self.assertEqual(pub3.exchange, explicit) + compat.Publisher(self.connection, + exchange="test_Publisher_constructor3", + channel=self.connection.default_channel) + def test_send(self): pub = compat.Publisher(self.connection, exchange="test_Publisher_send", @@ -127,6 +134,12 @@ class test_Consumer(unittest.TestCase): def setUp(self): self.connection = BrokerConnection(transport=Transport) + @patch("kombu.compat._iterconsume") + def test_iterconsume_calls__iterconsume(self, it, n="test_iterconsume"): + c = compat.Consumer(self.connection, queue=n, exchange=n) + c.iterconsume(limit=10, no_ack=True) + it.assert_called_with(c.connection, c, True, 10) + def test_constructor(self, n="test_Consumer_constructor"): c = compat.Consumer(self.connection, queue=n, exchange=n, routing_key="rkey") @@ -158,6 +171,20 @@ def test__enter__exit__(self, n="test__enter__exit__"): self.assertIn("close", c.backend) self.assertTrue(c._closed) + def test_revive(self, n="test_revive"): + c = compat.Consumer(self.connection, queue=n, exchange=n) + + with self.connection.channel() as c2: + c.revive(c2) + self.assertIs(c.backend, c2) + + def test__iter__(self, n="test__iter__"): + c = compat.Consumer(self.connection, queue=n, exchange=n) + c.iterqueue = Mock() + + c.__iter__() + c.iterqueue.assert_called_with(infinite=True) + def test_iter(self, n="test_iterqueue"): c = compat.Consumer(self.connection, queue=n, exchange=n, routing_key="rkey") @@ -241,6 +268,21 @@ class test_ConsumerSet(unittest.TestCase): def setUp(self): self.connection = BrokerConnection(transport=Transport) + @patch("kombu.compat._iterconsume") + def test_iterconsume(self, _iterconsume, n="test_iterconsume"): + c = compat.Consumer(self.connection, queue=n, exchange=n) + cs = compat.ConsumerSet(self.connection, consumers=[c]) + cs.iterconsume(limit=10, no_ack=True) + _iterconsume.assert_called_with(c.connection, cs, True, 10) + + def test_revive(self, n="test_revive"): + c = compat.Consumer(self.connection, queue=n, exchange=n) + cs = compat.ConsumerSet(self.connection, consumers=[c]) + + with self.connection.channel() as c2: + cs.revive(c2) + self.assertIs(cs.backend, c2) + def test_constructor(self, prefix="0daf8h21"): dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix, "routing_key": "xyx"}, diff --git a/kombu/tests/test_connection.py b/kombu/tests/test_connection.py index 9bfc1fb31..db7e618aa 100644 --- a/kombu/tests/test_connection.py +++ b/kombu/tests/test_connection.py @@ -42,6 +42,10 @@ def test_parse_generated_as_uri(self): self.assertEqual(conn.as_uri(), self.nopass) self.assertEqual(conn.as_uri(include_password=True), self.url) + def test_as_uri_when_mongodb(self): + x = BrokerConnection("mongodb://localhost") + self.assertTrue(x.as_uri()) + def test_bogus_scheme(self): with self.assertRaises(KeyError): BrokerConnection("bogus://localhost:7421").transport @@ -153,6 +157,22 @@ def publish(): with self.assertRaises(_ConnectionError): ensured() + def test_autoretry(self): + myfun = Mock() + myfun.__name__ = "test_autoretry" + + self.conn.transport.connection_errors = (KeyError, ) + + def on_call(*args, **kwargs): + myfun.side_effect = None + raise KeyError("foo") + + myfun.side_effect = on_call + insured = self.conn.autoretry(myfun) + insured() + + self.assertTrue(myfun.called) + def test_SimpleQueue(self): conn = self.conn q = conn.SimpleQueue("foo") @@ -221,6 +241,12 @@ def test_establish_connection(self): self.assertEqual(conn.transport_options, self.transport_options) +class xResource(Resource): + + def setup(self): + pass + + class ResourceCase(unittest.TestCase): abstract = True @@ -250,6 +276,75 @@ def test_acquire__release(self): [chan.release() for chan in chans] self.assertState(P, 10, 0) + def test_acquire_no_limit(self): + if self.abstract: + return + P = self.create_resource(None, 0) + P.acquire().release() + + def test_replace_when_limit(self): + if self.abstract: + return + P = self.create_resource(10, 0) + r = P.acquire() + P._dirty = Mock() + P.close_resource = Mock() + + P.replace(r) + P._dirty.discard.assert_called_with(r) + P.close_resource.assert_called_with(r) + + def test_replace_no_limit(self): + if self.abstract: + return + P = self.create_resource(None, 0) + r = P.acquire() + P._dirty = Mock() + P.close_resource = Mock() + + P.replace(r) + self.assertFalse(P._dirty.discard.called) + P.close_resource.assert_called_with(r) + + def test_interface_prepare(self): + if not self.abstract: + return + x = xResource() + self.assertEqual(x.prepare(10), 10) + + def test_force_close_all_handles_AttributeError(self): + if self.abstract: + return + P = self.create_resource(10, 10) + cr = P.close_resource = Mock() + cr.side_effect = AttributeError("x") + + P.acquire() + self.assertTrue(P._dirty) + + P.force_close_all() + + def test_force_close_all_no_mutex(self): + if self.abstract: + return + P = self.create_resource(10, 10) + P.close_resource = Mock() + + m = P._resource = Mock() + m.mutex = None + m.queue.pop.side_effect = IndexError + + P.force_close_all() + + def test_add_when_empty(self): + if self.abstract: + return + P = self.create_resource(None, None) + P._resource.queue[:] = [] + self.assertFalse(P._resource.queue) + P._add_when_empty() + self.assertTrue(P._resource.queue) + class test_ConnectionPool(ResourceCase): abstract = False diff --git a/kombu/tests/test_transport_virtual.py b/kombu/tests/test_transport_virtual.py index d0efc093d..dd8c9d5c0 100644 --- a/kombu/tests/test_transport_virtual.py +++ b/kombu/tests/test_transport_virtual.py @@ -3,13 +3,16 @@ import warnings +from mock import patch + from ..connection import BrokerConnection +from ..exceptions import StdChannelError from ..transport import virtual from ..utils import uuid from .compat import catch_warnings from .utils import unittest -from .utils import redirect_stdouts +from .utils import Mock, redirect_stdouts def client(): @@ -109,6 +112,11 @@ def test_create(self): "the quick brown fox...".encode("utf-8")) self.assertTrue(message.delivery_tag, tag) + def test_create_no_body(self): + virtual.Message(Mock(), { + "body": None, + "properties": {"delivery_tag": 1}}) + def test_serializable(self): c = client().channel() data = c.prepare_message("the quick brown fox...") @@ -314,6 +322,39 @@ def restore_unacked(self): self.channel.basic_recover(requeue=True) self.assertTrue(self.channel._qos.was_restored) + def test_restore_unacked_raises_BaseException(self): + q = self.channel.qos + q._flush = Mock() + q._delivered = {1: 1} + + q.channel._restore = Mock() + q.channel._restore.side_effect = SystemExit + + errors = q.restore_unacked() + self.assertIsInstance(errors[0][0], SystemExit) + self.assertEqual(errors[0][1], 1) + self.assertFalse(q._delivered) + + @patch("kombu.transport.virtual.emergency_dump_state") + @patch("kombu.transport.virtual.say") + def test_restore_unacked_once_when_unrestored(self, say, + emergency_dump_state): + q = self.channel.qos + q._flush = Mock() + + class State(dict): + restored = False + + q._delivered = State({1: 1}) + ru = q.restore_unacked = Mock() + exc = KeyError() + ru.return_value = [(exc, 1)] + + self.channel.do_restore = True + q.restore_unacked_once() + self.assertTrue(say.called) + self.assertTrue(emergency_dump_state.called) + def test_basic_recover(self): with self.assertRaises(NotImplementedError): self.channel.basic_recover(requeue=False) @@ -355,6 +396,64 @@ def test_flow(self): with self.assertRaises(NotImplementedError): self.channel.flow(False) + def test_close_when_no_connection(self): + self.channel.connection = None + self.channel.close() + self.assertTrue(self.channel.closed) + + def test_drain_events_has_get_many(self): + c = self.channel + c._get_many = Mock() + c._poll = Mock() + c._consumers = [1] + c._qos = Mock() + c._qos.can_consume.return_value = True + + c.drain_events(timeout=10.0) + c._get_many.assert_called_with(c._active_queues, timeout=10.0) + + def test_get_exchanges(self): + self.channel.exchange_declare(exchange="foo") + self.assertTrue(self.channel.get_exchanges()) + + def test_basic_cancel_not_in_active_queues(self): + c = self.channel + c._consumers.add("x") + c._tag_to_queue["x"] = "foo" + c._active_queues = Mock() + c._active_queues.remove.side_effect = ValueError() + c.auto_delete_queues["foo"] = 3 + + c.basic_cancel("x") + self.assertEqual(c.auto_delete_queues["foo"], 2) + c._active_queues.remove.assert_called_with("foo") + + def test_basic_cancel_unknown_ctag(self): + self.assertIsNone(self.channel.basic_cancel("unknown-tag")) + + def test_list_bindings(self): + c = self.channel + c.exchange_declare(exchange="foo") + c.queue_declare(queue="q") + c.queue_bind(queue="q", exchange="foo", routing_key="rk") + + self.assertIn(("q", "foo", "rk"), list(c.list_bindings())) + + def test_after_reply_message_received(self): + c = self.channel + c.queue_delete = Mock() + c.after_reply_message_received("foo") + c.queue_delete.assert_called_with("foo") + + def test_queue_delete_unknown_queue(self): + self.assertIsNone(self.channel.queue_delete("xiwjqjwel")) + + def test_queue_declare_passive(self): + has_queue = self.channel._has_queue = Mock() + has_queue.return_value = False + with self.assertRaises(StdChannelError): + self.channel.queue_declare(queue="21wisdjwqe", passive=True) + class test_Transport(unittest.TestCase): diff --git a/kombu/tests/test_utils_functional.py b/kombu/tests/test_utils_functional.py new file mode 100644 index 000000000..cf44bfff7 --- /dev/null +++ b/kombu/tests/test_utils_functional.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import + +import pickle + +from ..utils.functional import promise, maybe_promise +from .utils import unittest + + +def double(x): + return x * 2 + + +class test_promise(unittest.TestCase): + + def test__str__(self): + self.assertEqual(str(promise(lambda: "the quick brown fox")), + "the quick brown fox") + + def test__repr__(self): + self.assertEqual(repr(promise(lambda: "fi fa fo")), + "'fi fa fo'") + + def test_evaluate(self): + self.assertEqual(promise(lambda: 2 + 2)(), 4) + self.assertEqual(promise(lambda x: x * 4, 2), 8) + self.assertEqual(promise(lambda x: x * 8, 2)(), 16) + + def test_cmp(self): + self.assertEqual(promise(lambda: 10), promise(lambda: 10)) + self.assertNotEqual(promise(lambda: 10), promise(lambda: 20)) + + def test__reduce__(self): + x = promise(double, 4) + y = pickle.loads(pickle.dumps(x)) + self.assertEqual(x(), y()) + + def test__deepcopy__(self): + from copy import deepcopy + x = promise(double, 4) + y = deepcopy(x) + self.assertEqual(x._fun, y._fun) + self.assertEqual(x._args, y._args) + self.assertEqual(x(), y()) + + +class test_maybe_promise(unittest.TestCase): + + def test_evaluates(self): + self.assertEqual(maybe_promise(promise(lambda: 10)), 10) + self.assertEqual(maybe_promise(20), 20) diff --git a/kombu/transport/base.py b/kombu/transport/base.py index 59ef2d6a9..3c6eee02c 100644 --- a/kombu/transport/base.py +++ b/kombu/transport/base.py @@ -37,6 +37,12 @@ def after_reply_message_received(self, queue): after transient reply message received.""" pass + def __enter__(self): + return self + + def __exit__(self, *exc_info): + self.close() + class Message(object): """Base class for received messages.""" diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index 5ec1bb278..12c25626c 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -160,7 +160,7 @@ def restore_unacked(self): try: self.channel._restore(message) - except (KeyboardInterrupt, SystemExit, Exception), exc: + except BaseException, exc: errors.append((exc, message)) delivered.clear() return errors @@ -175,7 +175,7 @@ def restore_unacked_once(self): self._flush() state = self._delivered - if not self.channel.do_restore or getattr(state, "restored"): + if not self.channel.do_restore or getattr(state, "restored", None): assert not state return diff --git a/kombu/transport/virtual/scheduling.py b/kombu/transport/virtual/scheduling.py index 22ade3e32..13fcb4691 100644 --- a/kombu/transport/virtual/scheduling.py +++ b/kombu/transport/virtual/scheduling.py @@ -35,7 +35,7 @@ def _next(self): raise self.predicate() def get(self, **kwargs): - for tried in count(0): + for tried in count(0): # for infinity resource = self._next() try: diff --git a/kombu/utils/__init__.py b/kombu/utils/__init__.py index 9593781b5..debc6975a 100644 --- a/kombu/utils/__init__.py +++ b/kombu/utils/__init__.py @@ -153,7 +153,7 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None, interval_max + interval_start, interval_step, repeatlast=True) - for retries, interval in enumerate(interval_range): + for retries, interval in enumerate(interval_range): # for infinity try: return fun(*args, **kwargs) except catch, exc: diff --git a/requirements/default.txt b/requirements/default.txt index 0b65f76f2..9d7aebdb1 100644 --- a/requirements/default.txt +++ b/requirements/default.txt @@ -1,2 +1,2 @@ anyjson>=0.3.1 -kamqp +amqplib>=1.0 diff --git a/setup.cfg b/setup.cfg index f97c61418..2d7475886 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,7 +8,6 @@ cover3-package = kombu cover3-exclude = kombu kombu.utils.compat kombu.utils.eventio - kombu.utils.functional kombu.utils.finalize kombu.transport.pika kombu.transport.couchdb @@ -27,4 +26,4 @@ upload-dir = docs/.build/html [bdist_rpm] requires = anyjson >= 0.3.1 - kamqp + amqplib >= 1.0.0