Skip to content

Commit

Permalink
99% overall coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jan 14, 2012
1 parent d348cb4 commit 399f979
Show file tree
Hide file tree
Showing 13 changed files with 311 additions and 12 deletions.
8 changes: 8 additions & 0 deletions .coveragerc
@@ -0,0 +1,8 @@
[run]
branch = True

[report]
exclude_lines =
pragma: no cover

for infinity
4 changes: 2 additions & 2 deletions kombu/compat.py
Expand Up @@ -23,7 +23,7 @@


def _iterconsume(connection, consumer, no_ack=False, limit=None): def _iterconsume(connection, consumer, no_ack=False, limit=None):
consumer.consume(no_ack=no_ack) consumer.consume(no_ack=no_ack)
for iteration in count(0): for iteration in count(0): # for infinity
if limit and iteration >= limit: if limit and iteration >= limit:
raise StopIteration raise StopIteration
yield connection.drain_events() yield connection.drain_events()
Expand Down Expand Up @@ -163,7 +163,7 @@ def wait(self, limit=None):
return list(it) return list(it)


def iterqueue(self, limit=None, infinite=False): def iterqueue(self, limit=None, infinite=False):
for items_since_start in count(): for items_since_start in count(): # for infinity
item = self.fetch() item = self.fetch()
if (not infinite and item is None) or \ if (not infinite and item is None) or \
(limit and items_since_start >= limit): (limit and items_since_start >= limit):
Expand Down
4 changes: 2 additions & 2 deletions kombu/connection.py
Expand Up @@ -23,7 +23,7 @@
from urlparse import urlparse from urlparse import urlparse
try: try:
from urlparse import parse_qsl from urlparse import parse_qsl
except ImportError: except ImportError: # pragma: no cover
from cgi import parse_qsl # noqa from cgi import parse_qsl # noqa


from . import exceptions from . import exceptions
Expand Down Expand Up @@ -289,7 +289,7 @@ def ensure(self, obj, fun, errback=None, max_retries=None,
@wraps(fun) @wraps(fun)
def _ensured(*args, **kwargs): def _ensured(*args, **kwargs):
got_connection = 0 got_connection = 0
for retries in count(0): for retries in count(0): # for infinity
try: try:
return fun(*args, **kwargs) return fun(*args, **kwargs)
except self.connection_errors + self.channel_errors, exc: except self.connection_errors + self.channel_errors, exc:
Expand Down
42 changes: 42 additions & 0 deletions kombu/tests/test_compat.py
@@ -1,11 +1,14 @@
from __future__ import absolute_import from __future__ import absolute_import
from __future__ import with_statement from __future__ import with_statement


from mock import patch

from .. import BrokerConnection, Exchange from .. import BrokerConnection, Exchange
from .. import compat from .. import compat


from .mocks import Transport, Channel from .mocks import Transport, Channel
from .utils import unittest from .utils import unittest
from .utils import Mock




class test_misc(unittest.TestCase): class test_misc(unittest.TestCase):
Expand Down Expand Up @@ -104,6 +107,10 @@ def test_constructor(self):
exchange=explicit) exchange=explicit)
self.assertEqual(pub3.exchange, explicit) self.assertEqual(pub3.exchange, explicit)


compat.Publisher(self.connection,
exchange="test_Publisher_constructor3",
channel=self.connection.default_channel)

def test_send(self): def test_send(self):
pub = compat.Publisher(self.connection, pub = compat.Publisher(self.connection,
exchange="test_Publisher_send", exchange="test_Publisher_send",
Expand All @@ -127,6 +134,12 @@ class test_Consumer(unittest.TestCase):
def setUp(self): def setUp(self):
self.connection = BrokerConnection(transport=Transport) self.connection = BrokerConnection(transport=Transport)


@patch("kombu.compat._iterconsume")
def test_iterconsume_calls__iterconsume(self, it, n="test_iterconsume"):
c = compat.Consumer(self.connection, queue=n, exchange=n)
c.iterconsume(limit=10, no_ack=True)
it.assert_called_with(c.connection, c, True, 10)

def test_constructor(self, n="test_Consumer_constructor"): def test_constructor(self, n="test_Consumer_constructor"):
c = compat.Consumer(self.connection, queue=n, exchange=n, c = compat.Consumer(self.connection, queue=n, exchange=n,
routing_key="rkey") routing_key="rkey")
Expand Down Expand Up @@ -158,6 +171,20 @@ def test__enter__exit__(self, n="test__enter__exit__"):
self.assertIn("close", c.backend) self.assertIn("close", c.backend)
self.assertTrue(c._closed) self.assertTrue(c._closed)


def test_revive(self, n="test_revive"):
c = compat.Consumer(self.connection, queue=n, exchange=n)

with self.connection.channel() as c2:
c.revive(c2)
self.assertIs(c.backend, c2)

def test__iter__(self, n="test__iter__"):
c = compat.Consumer(self.connection, queue=n, exchange=n)
c.iterqueue = Mock()

c.__iter__()
c.iterqueue.assert_called_with(infinite=True)

def test_iter(self, n="test_iterqueue"): def test_iter(self, n="test_iterqueue"):
c = compat.Consumer(self.connection, queue=n, exchange=n, c = compat.Consumer(self.connection, queue=n, exchange=n,
routing_key="rkey") routing_key="rkey")
Expand Down Expand Up @@ -241,6 +268,21 @@ class test_ConsumerSet(unittest.TestCase):
def setUp(self): def setUp(self):
self.connection = BrokerConnection(transport=Transport) self.connection = BrokerConnection(transport=Transport)


@patch("kombu.compat._iterconsume")
def test_iterconsume(self, _iterconsume, n="test_iterconsume"):
c = compat.Consumer(self.connection, queue=n, exchange=n)
cs = compat.ConsumerSet(self.connection, consumers=[c])
cs.iterconsume(limit=10, no_ack=True)
_iterconsume.assert_called_with(c.connection, cs, True, 10)

def test_revive(self, n="test_revive"):
c = compat.Consumer(self.connection, queue=n, exchange=n)
cs = compat.ConsumerSet(self.connection, consumers=[c])

with self.connection.channel() as c2:
cs.revive(c2)
self.assertIs(cs.backend, c2)

def test_constructor(self, prefix="0daf8h21"): def test_constructor(self, prefix="0daf8h21"):
dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix, dcon = {"%s.xyx" % prefix: {"exchange": "%s.xyx" % prefix,
"routing_key": "xyx"}, "routing_key": "xyx"},
Expand Down
95 changes: 95 additions & 0 deletions kombu/tests/test_connection.py
Expand Up @@ -42,6 +42,10 @@ def test_parse_generated_as_uri(self):
self.assertEqual(conn.as_uri(), self.nopass) self.assertEqual(conn.as_uri(), self.nopass)
self.assertEqual(conn.as_uri(include_password=True), self.url) self.assertEqual(conn.as_uri(include_password=True), self.url)


def test_as_uri_when_mongodb(self):
x = BrokerConnection("mongodb://localhost")
self.assertTrue(x.as_uri())

def test_bogus_scheme(self): def test_bogus_scheme(self):
with self.assertRaises(KeyError): with self.assertRaises(KeyError):
BrokerConnection("bogus://localhost:7421").transport BrokerConnection("bogus://localhost:7421").transport
Expand Down Expand Up @@ -153,6 +157,22 @@ def publish():
with self.assertRaises(_ConnectionError): with self.assertRaises(_ConnectionError):
ensured() ensured()


def test_autoretry(self):
myfun = Mock()
myfun.__name__ = "test_autoretry"

self.conn.transport.connection_errors = (KeyError, )

def on_call(*args, **kwargs):
myfun.side_effect = None
raise KeyError("foo")

myfun.side_effect = on_call
insured = self.conn.autoretry(myfun)
insured()

self.assertTrue(myfun.called)

def test_SimpleQueue(self): def test_SimpleQueue(self):
conn = self.conn conn = self.conn
q = conn.SimpleQueue("foo") q = conn.SimpleQueue("foo")
Expand Down Expand Up @@ -221,6 +241,12 @@ def test_establish_connection(self):
self.assertEqual(conn.transport_options, self.transport_options) self.assertEqual(conn.transport_options, self.transport_options)




class xResource(Resource):

def setup(self):
pass


class ResourceCase(unittest.TestCase): class ResourceCase(unittest.TestCase):
abstract = True abstract = True


Expand Down Expand Up @@ -250,6 +276,75 @@ def test_acquire__release(self):
[chan.release() for chan in chans] [chan.release() for chan in chans]
self.assertState(P, 10, 0) self.assertState(P, 10, 0)


def test_acquire_no_limit(self):
if self.abstract:
return
P = self.create_resource(None, 0)
P.acquire().release()

def test_replace_when_limit(self):
if self.abstract:
return
P = self.create_resource(10, 0)
r = P.acquire()
P._dirty = Mock()
P.close_resource = Mock()

P.replace(r)
P._dirty.discard.assert_called_with(r)
P.close_resource.assert_called_with(r)

def test_replace_no_limit(self):
if self.abstract:
return
P = self.create_resource(None, 0)
r = P.acquire()
P._dirty = Mock()
P.close_resource = Mock()

P.replace(r)
self.assertFalse(P._dirty.discard.called)
P.close_resource.assert_called_with(r)

def test_interface_prepare(self):
if not self.abstract:
return
x = xResource()
self.assertEqual(x.prepare(10), 10)

def test_force_close_all_handles_AttributeError(self):
if self.abstract:
return
P = self.create_resource(10, 10)
cr = P.close_resource = Mock()
cr.side_effect = AttributeError("x")

P.acquire()
self.assertTrue(P._dirty)

P.force_close_all()

def test_force_close_all_no_mutex(self):
if self.abstract:
return
P = self.create_resource(10, 10)
P.close_resource = Mock()

m = P._resource = Mock()
m.mutex = None
m.queue.pop.side_effect = IndexError

P.force_close_all()

def test_add_when_empty(self):
if self.abstract:
return
P = self.create_resource(None, None)
P._resource.queue[:] = []
self.assertFalse(P._resource.queue)
P._add_when_empty()
self.assertTrue(P._resource.queue)



class test_ConnectionPool(ResourceCase): class test_ConnectionPool(ResourceCase):
abstract = False abstract = False
Expand Down
101 changes: 100 additions & 1 deletion kombu/tests/test_transport_virtual.py
Expand Up @@ -3,13 +3,16 @@


import warnings import warnings


from mock import patch

from ..connection import BrokerConnection from ..connection import BrokerConnection
from ..exceptions import StdChannelError
from ..transport import virtual from ..transport import virtual
from ..utils import uuid from ..utils import uuid


from .compat import catch_warnings from .compat import catch_warnings
from .utils import unittest from .utils import unittest
from .utils import redirect_stdouts from .utils import Mock, redirect_stdouts




def client(): def client():
Expand Down Expand Up @@ -109,6 +112,11 @@ def test_create(self):
"the quick brown fox...".encode("utf-8")) "the quick brown fox...".encode("utf-8"))
self.assertTrue(message.delivery_tag, tag) self.assertTrue(message.delivery_tag, tag)


def test_create_no_body(self):
virtual.Message(Mock(), {
"body": None,
"properties": {"delivery_tag": 1}})

def test_serializable(self): def test_serializable(self):
c = client().channel() c = client().channel()
data = c.prepare_message("the quick brown fox...") data = c.prepare_message("the quick brown fox...")
Expand Down Expand Up @@ -314,6 +322,39 @@ def restore_unacked(self):
self.channel.basic_recover(requeue=True) self.channel.basic_recover(requeue=True)
self.assertTrue(self.channel._qos.was_restored) self.assertTrue(self.channel._qos.was_restored)


def test_restore_unacked_raises_BaseException(self):
q = self.channel.qos
q._flush = Mock()
q._delivered = {1: 1}

q.channel._restore = Mock()
q.channel._restore.side_effect = SystemExit

errors = q.restore_unacked()
self.assertIsInstance(errors[0][0], SystemExit)
self.assertEqual(errors[0][1], 1)
self.assertFalse(q._delivered)

@patch("kombu.transport.virtual.emergency_dump_state")
@patch("kombu.transport.virtual.say")
def test_restore_unacked_once_when_unrestored(self, say,
emergency_dump_state):
q = self.channel.qos
q._flush = Mock()

class State(dict):
restored = False

q._delivered = State({1: 1})
ru = q.restore_unacked = Mock()
exc = KeyError()
ru.return_value = [(exc, 1)]

self.channel.do_restore = True
q.restore_unacked_once()
self.assertTrue(say.called)
self.assertTrue(emergency_dump_state.called)

def test_basic_recover(self): def test_basic_recover(self):
with self.assertRaises(NotImplementedError): with self.assertRaises(NotImplementedError):
self.channel.basic_recover(requeue=False) self.channel.basic_recover(requeue=False)
Expand Down Expand Up @@ -355,6 +396,64 @@ def test_flow(self):
with self.assertRaises(NotImplementedError): with self.assertRaises(NotImplementedError):
self.channel.flow(False) self.channel.flow(False)


def test_close_when_no_connection(self):
self.channel.connection = None
self.channel.close()
self.assertTrue(self.channel.closed)

def test_drain_events_has_get_many(self):
c = self.channel
c._get_many = Mock()
c._poll = Mock()
c._consumers = [1]
c._qos = Mock()
c._qos.can_consume.return_value = True

c.drain_events(timeout=10.0)
c._get_many.assert_called_with(c._active_queues, timeout=10.0)

def test_get_exchanges(self):
self.channel.exchange_declare(exchange="foo")
self.assertTrue(self.channel.get_exchanges())

def test_basic_cancel_not_in_active_queues(self):
c = self.channel
c._consumers.add("x")
c._tag_to_queue["x"] = "foo"
c._active_queues = Mock()
c._active_queues.remove.side_effect = ValueError()
c.auto_delete_queues["foo"] = 3

c.basic_cancel("x")
self.assertEqual(c.auto_delete_queues["foo"], 2)
c._active_queues.remove.assert_called_with("foo")

def test_basic_cancel_unknown_ctag(self):
self.assertIsNone(self.channel.basic_cancel("unknown-tag"))

def test_list_bindings(self):
c = self.channel
c.exchange_declare(exchange="foo")
c.queue_declare(queue="q")
c.queue_bind(queue="q", exchange="foo", routing_key="rk")

self.assertIn(("q", "foo", "rk"), list(c.list_bindings()))

def test_after_reply_message_received(self):
c = self.channel
c.queue_delete = Mock()
c.after_reply_message_received("foo")
c.queue_delete.assert_called_with("foo")

def test_queue_delete_unknown_queue(self):
self.assertIsNone(self.channel.queue_delete("xiwjqjwel"))

def test_queue_declare_passive(self):
has_queue = self.channel._has_queue = Mock()
has_queue.return_value = False
with self.assertRaises(StdChannelError):
self.channel.queue_declare(queue="21wisdjwqe", passive=True)



class test_Transport(unittest.TestCase): class test_Transport(unittest.TestCase):


Expand Down

0 comments on commit 399f979

Please sign in to comment.