Skip to content

Commit

Permalink
97% total coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Jan 13, 2012
1 parent b304f93 commit d348cb4
Show file tree
Hide file tree
Showing 11 changed files with 287 additions and 21 deletions.
10 changes: 6 additions & 4 deletions kombu/messaging.py
Expand Up @@ -123,6 +123,7 @@ def publish(self, body, routing_key=None, delivery_mode=None,
:keyword properties: Additional properties, see the AMQP spec.
"""
retry_policy = {} if retry_policy is None else retry_policy
headers = headers or {}
if routing_key is None:
routing_key = self.routing_key
Expand All @@ -134,7 +135,7 @@ def publish(self, body, routing_key=None, delivery_mode=None,

# Additional entities to declare before publishing the message.
for entity in declare:
self.maybe_declare(entity, retry, **retry_policy or {})
self.maybe_declare(entity, retry, **retry_policy)

body, content_type, content_encoding = self._prepare(
body, serializer, content_type, content_encoding,
Expand Down Expand Up @@ -252,14 +253,14 @@ class Consumer(object):

_next_tag = count(1).next # global

def __init__(self, channel, queues, no_ack=None, auto_declare=None,
def __init__(self, channel, queues=None, no_ack=None, auto_declare=None,
callbacks=None, on_decode_error=None):
from .connection import BrokerConnection
if isinstance(channel, BrokerConnection):
channel = channel.default_channel
self.channel = channel

self.queues = queues
self.queues = [] if queues is None else queues
if no_ack is not None:
self.no_ack = no_ack
if auto_declare is not None:
Expand Down Expand Up @@ -465,7 +466,8 @@ def _add_tag(self, queue, consumer_tag=None):
def _receive_callback(self, message):
channel = self.channel
try:
if hasattr(channel, "message_to_python"):
m2p = getattr(channel, "message_to_python", None)
if m2p:
message = channel.message_to_python(message)
decoded = message.decode()
except Exception, exc:
Expand Down
24 changes: 10 additions & 14 deletions kombu/serialization.py
Expand Up @@ -16,17 +16,17 @@
import pickle as pypickle
try:
import cPickle as cpickle
except ImportError:
except ImportError: # pragma: no cover
cpickle = None # noqa

from .exceptions import SerializerNotInstalled
from .utils.encoding import str_to_bytes
from .utils.encoding import str_to_bytes, bytes_t

__all__ = ["pickle", "bytes_type", "encode", "decode",
__all__ = ["pickle", "encode", "decode",
"register", "unregister"]
SKIP_DECODE = frozenset(["binary", "ascii-8bit"])

if sys.platform.startswith("java"):
if sys.platform.startswith("java"): # pragma: no cover

def _decode(t, coding):
return codecs.getdecoder(coding)(t)[0]
Expand All @@ -49,10 +49,6 @@ def _decode(t, coding):
else:
pickle = cpickle or pypickle

bytes_type = str
if sys.version_info >= (3, 0):
bytes_type = bytes


class SerializerRegistry(object):
"""The registry keeps track of serialization methods."""
Expand All @@ -76,7 +72,7 @@ def register(self, name, encoder, decoder, content_type,

def disable(self, name):
if '/' not in name:
name = self.type_to_name[name]
name, _, _ = self._encoders[name]
self._disabled_content_types.add(name)

def unregister(self, name):
Expand Down Expand Up @@ -117,7 +113,7 @@ def encode(self, data, serializer=None):
# If a raw string was sent, assume binary encoding
# (it's likely either ASCII or a raw binary file, and a character
# set of 'binary' will encompass both, even if not ideal.
if not serializer and isinstance(data, bytes_type):
if not serializer and isinstance(data, bytes_t):
# In Python 3+, this would be "bytes"; allow binary data to be
# sent as a message without getting encoder errors
return "application/data", "binary", data
Expand All @@ -139,7 +135,7 @@ def encode(self, data, serializer=None):
return content_type, content_encoding, payload

def decode(self, data, content_type, content_encoding, force=False):
if content_type in self._disabled_content_types:
if content_type in self._disabled_content_types and not force:
raise SerializerNotInstalled(
"Content-type %r has been disabled." % (content_type, ))
content_type = content_type or 'application/data'
Expand All @@ -149,9 +145,9 @@ def decode(self, data, content_type, content_encoding, force=False):
decode = self._decoders.get(content_type)
if decode:
return decode(data)
if content_encoding not in SKIP_DECODE and \
not isinstance(data, unicode):
return _decode(data, content_encoding)
if content_encoding not in SKIP_DECODE and \
not isinstance(data, unicode):
return _decode(data, content_encoding)
return data


Expand Down
13 changes: 13 additions & 0 deletions kombu/tests/test_compression.py
@@ -1,9 +1,12 @@
from __future__ import absolute_import

import sys

from nose import SkipTest

from .. import compression
from .utils import unittest
from .utils import mask_modules


class test_compression(unittest.TestCase):
Expand All @@ -16,6 +19,16 @@ def setUp(self):
else:
self.has_bzip2 = True

@mask_modules("bz2")
def test_no_bz2(self):
c = sys.modules.pop("kombu.compression")
try:
import kombu.compression
self.assertFalse(hasattr(kombu.compression, "bz2"))
finally:
if c is not None:
sys.modules["kombu.compression"] = c

def test_encoders(self):
encoders = compression.encoders()
self.assertIn("application/x-gzip", encoders)
Expand Down
37 changes: 37 additions & 0 deletions kombu/tests/test_entities.py
Expand Up @@ -7,6 +7,7 @@

from .mocks import Transport
from .utils import unittest
from .utils import Mock


def get_conn():
Expand All @@ -26,6 +27,14 @@ def test_bound(self):
self.assertIs(bound.channel, chan)
self.assertIn("<bound", repr(bound))

def test_hash(self):
self.assertEqual(hash(Exchange("a")), hash(Exchange("a")))
self.assertNotEqual(hash(Exchange("a")), hash(Exchange("b")))

def test_can_cache_declaration(self):
self.assertTrue(Exchange("a", durable=True).can_cache_declaration)
self.assertFalse(Exchange("a", durable=False).can_cache_declaration)

def test_eq(self):
e1 = Exchange("foo", "direct")
e2 = Exchange("foo", "direct")
Expand Down Expand Up @@ -102,6 +111,29 @@ class test_Queue(unittest.TestCase):
def setUp(self):
self.exchange = Exchange("foo", "direct")

def test_hash(self):
self.assertEqual(hash(Queue("a")), hash(Queue("a")))
self.assertNotEqual(hash(Queue("a")), hash(Queue("b")))

def test_when_bound_but_no_exchange(self):
q = Queue("a")
q.exchange = None
self.assertIsNone(q.when_bound())

def test_declare_but_no_exchange(self):
q = Queue("a")
q.queue_declare = Mock()
q.queue_bind = Mock()
q.exchange = None

q.declare()
q.queue_declare.assert_called_with(False, passive=False)
q.queue_bind.assert_called_with(False)

def test_can_cache_declaration(self):
self.assertTrue(Queue("a", durable=True).can_cache_declaration)
self.assertFalse(Queue("a", durable=False).can_cache_declaration)

def test_eq(self):
q1 = Queue("xxx", Exchange("xxx", "direct"), "xxx")
q2 = Queue("xxx", Exchange("xxx", "direct"), "xxx")
Expand Down Expand Up @@ -169,6 +201,11 @@ def test_unbind(self):
b.unbind()
self.assertIn("queue_unbind", b.channel)

def test_as_dict(self):
q = Queue("foo", self.exchange, "rk")
d = q.as_dict(recurse=True)
self.assertEqual(d["exchange"]["name"], self.exchange.name)

def test__repr__(self):
b = Queue("foo", self.exchange, "foo")
self.assertIn("foo", repr(b))
Expand Down
104 changes: 104 additions & 0 deletions kombu/tests/test_messaging.py
Expand Up @@ -3,13 +3,16 @@

import anyjson

from mock import patch

from ..connection import BrokerConnection
from ..exceptions import MessageStateError
from ..messaging import Consumer, Producer
from ..entity import Exchange, Queue

from .mocks import Transport
from .utils import unittest
from .utils import Mock


class test_Producer(unittest.TestCase):
Expand All @@ -21,6 +24,19 @@ def setUp(self):
self.assertTrue(self.connection.connection.connected)
self.assertFalse(self.exchange.is_bound)

@patch("kombu.common.maybe_declare")
def test_maybe_declare(self, maybe_declare):
p = self.connection.Producer()
q = Queue("foo")
p.maybe_declare(q)
maybe_declare.assert_called_with(q, p.channel, False)

@patch("kombu.common.maybe_declare")
def test_maybe_declare_when_entity_false(self, maybe_declare):
p = self.connection.Producer()
p.maybe_declare(None)
self.assertFalse(maybe_declare.called)

def test_auto_declare(self):
channel = self.connection.channel()
p = Producer(channel, self.exchange, auto_declare=True)
Expand Down Expand Up @@ -91,6 +107,45 @@ def test_prepare_is_already_unicode(self):
self.assertEqual(ctype, "text/plain")
self.assertEqual(cencoding, "utf-8")

def test_publish_with_Exchange_instance(self):
p = self.connection.Producer()
p._publish = Mock()
p.publish("hello", exchange=Exchange("foo"))
self.assertEqual(p._publish.call_args[0][4], "foo")

def test_publish_retry_with_declare(self):
p = self.connection.Producer()
p.maybe_declare = Mock()
ensure = p.connection.ensure = Mock()
ex = Exchange("foo")
p.publish("hello", exchange=ex, declare=[ex], retry=True,
retry_policy={"step": 4})
p.maybe_declare.assert_called_with(ex, True, step=4)
ensure.assert_called_with(p, p._publish, step=4)

def test_revive_when_channel_is_connection(self):
p = self.connection.Producer()
p.exchange = Mock()
new_conn = BrokerConnection("memory://")
defchan = new_conn.default_channel
p.revive(new_conn)

self.assertIs(p.channel, defchan)
p.exchange.revive.assert_called_with(defchan)

def test_enter_exit(self):
p = self.connection.Producer()
p.release = Mock()

self.assertIs(p.__enter__(), p)
p.__exit__()
p.release.assert_called_with()

def test_connection_property_handles_AttributeError(self):
p = self.connection.Producer()
p.channel = object()
self.assertIsNone(p.connection)

def test_publish(self):
channel = self.connection.channel()
p = Producer(channel, self.exchange, serializer="json")
Expand Down Expand Up @@ -146,6 +201,50 @@ def test_set_no_ack(self):
consumer = Consumer(channel, queue, auto_declare=True, no_ack=True)
self.assertTrue(consumer.no_ack)

def test_add_queue_when_auto_declare(self):
consumer = self.connection.Consumer(auto_declare=True)
q = Mock()
q.return_value = q
consumer.add_queue(q)
self.assertIn(q, consumer.queues)
q.declare.assert_called_with()

def test_add_queue_when_not_auto_declare(self):
consumer = self.connection.Consumer(auto_declare=False)
q = Mock()
q.return_value = q
consumer.add_queue(q)
self.assertIn(q, consumer.queues)
self.assertFalse(q.declare.call_count)

def test_consume_without_queues_returns(self):
consumer = self.connection.Consumer()
consumer.queues[:] = []
self.assertIsNone(consumer.consume())

def test_consuming_from(self):
consumer = self.connection.Consumer()
consumer.queues[:] = [Queue("a"), Queue("b")]
self.assertFalse(consumer.consuming_from(Queue("c")))
self.assertFalse(consumer.consuming_from("c"))
self.assertTrue(consumer.consuming_from(Queue("a")))
self.assertTrue(consumer.consuming_from(Queue("b")))
self.assertTrue(consumer.consuming_from("b"))

def test_receive_callback_without_m2p(self):
channel = self.connection.channel()
c = channel.Consumer()
m2p = getattr(channel, "message_to_python")
channel.message_to_python = None
try:
message = Mock()
message.decode.return_value = "Hello"
recv = c.receive = Mock()
c._receive_callback(message)
recv.assert_called_with("Hello", message)
finally:
channel.message_to_python = m2p

def test_set_callbacks(self):
channel = self.connection.channel()
queue = Queue("qname", self.exchange, "rkey")
Expand Down Expand Up @@ -401,3 +500,8 @@ def test__repr__(self):
channel = self.connection.channel()
b1 = Queue("qname1", self.exchange, "rkey")
self.assertTrue(repr(Consumer(channel, [b1])))

def test_connection_property_handles_AttributeError(self):
p = self.connection.Consumer()
p.channel = object()
self.assertIsNone(p.connection)
7 changes: 7 additions & 0 deletions kombu/tests/test_pidbox.py
@@ -1,11 +1,14 @@
from __future__ import absolute_import
from __future__ import with_statement

import socket

from .. import pidbox
from ..connection import BrokerConnection
from ..utils import uuid

from .utils import unittest
from .utils import Mock


class test_Mailbox(unittest.TestCase):
Expand Down Expand Up @@ -54,6 +57,10 @@ def callback(body):
reply = mailbox._collect(ticket, limit=1, channel=channel)
self.assertEqual(reply, [{"biz": "boz"}])

de = mailbox.connection.drain_events = Mock()
de.side_effect = socket.timeout
mailbox._collect(ticket, limit=1, channel=channel)

def test_constructor(self):
self.assertIsNone(self.mailbox.connection)
self.assertTrue(self.mailbox.exchange.name)
Expand Down

0 comments on commit d348cb4

Please sign in to comment.