Skip to content

Commit

Permalink
Got 93% coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
Ask Solem committed Jun 29, 2010
1 parent 8791382 commit 3149a84
Show file tree
Hide file tree
Showing 19 changed files with 969 additions and 143 deletions.
6 changes: 6 additions & 0 deletions kombu/__init__.py
Expand Up @@ -5,3 +5,9 @@
__contact__ = "askh@opera.com"
__homepage__ = "http://github.com/ask/carrot/"
__docformat__ = "restructuredtext"

import os
if not os.environ.get("KOMBU_NO_EVAL", False):
from kombu.connection import BrokerConnection
from kombu.entity import Exchange, Binding
from kombu.messaging import Consumer, Producer
8 changes: 7 additions & 1 deletion kombu/backends/base.py
Expand Up @@ -19,8 +19,14 @@ class BaseMessage(object):
MessageStateError = MessageStateError

def __init__(self, channel, body=None, delivery_tag=None,
content_type=None, content_encoding=None, delivery_info={}, **kwargs):
content_type=None, content_encoding=None, delivery_info={},
**kwargs):
self.channel = channel
self.body = body
self.delivery_tag = delivery_tag
self.content_type = content_type
self.content_encoding = content_encoding
self.delivery_info = delivery_info
self._decoded_cache = None
self._state = "RECEIVED"

Expand Down
7 changes: 3 additions & 4 deletions kombu/backends/emulation.py
@@ -1,8 +1,7 @@
from carrot.backends.base import BaseBackend, BaseMessage
from anyjson import serialize, deserialize
from anyjson import deserialize
from itertools import count
from django.utils.datastructures import SortedDict
from carrot.utils import gen_unique_id
import sys
import time
import atexit
Expand Down Expand Up @@ -30,7 +29,7 @@ def get(self):
:returns: The message and the name of the queue it came from as
a tuple.
:raises Empty: If there are no more items in any of the queues.
:raises Queue.Empty: If there are no more items in any of the queues.
"""

Expand All @@ -42,7 +41,7 @@ def get(self):
queue = self.cycle.next()
try:
item = self.backend._get(queue)
except Empty:
except QueueEmpty:
# raises Empty when we've tried all of them.
tried.add(queue)
if tried == self.all:
Expand Down
2 changes: 1 addition & 1 deletion kombu/backends/pikachu.py
@@ -1,4 +1,3 @@
import asyncore
import weakref
import functools
import itertools
Expand Down Expand Up @@ -197,6 +196,7 @@ def publish(self, message, exchange, routing_key, mandatory=None,
immediate=immediate)
if mandatory or immediate:
self.close()
return ret

def qos(self, prefetch_size, prefetch_count, apply_global=False):
"""Request specific Quality of Service."""
Expand Down
17 changes: 6 additions & 11 deletions kombu/backends/pyamqplib.py
Expand Up @@ -5,13 +5,7 @@
.. _`amqplib`: http://barryp.org/software/py-amqplib/
"""
import warnings
import weakref

from itertools import count

from amqplib import client_0_8 as amqp
from amqplib.client_0_8.exceptions import AMQPChannelException
from amqplib.client_0_8.channel import Channel

from kombu.backends.base import BaseMessage, BaseBackend
Expand Down Expand Up @@ -146,7 +140,7 @@ def __init__(self, channel, amqp_message, **kwargs):
"delivery_info"):
kwargs[attr_name] = getattr(amqp_message, attr_name, None)

super(Message, self).__init__(backend, **kwargs)
super(Message, self).__init__(channel, **kwargs)


class Channel(Channel):
Expand All @@ -157,9 +151,10 @@ def prepare_message(self, message_data, priority=None,
properties=None):
"""Encapsulate data into a AMQP message."""
return amqp.Message(message_data, priority=priority,
content_type=content_type,
content_encoding=content_encoding,
properties=properties)
content_type=content_type,
content_encoding=content_encoding,
properties=properties,
application_headers=headers)

def message_to_python(self, raw_message):
"""Convert encoded message body back to a Python value."""
Expand All @@ -173,7 +168,7 @@ def __init__(self, connection, **kwargs):
self.connection = connection
self.default_port = kwargs.get("default_port") or self.default_port

def get_channel(self, connection):
def create_channel(self, connection):
return connection.channel()

def drain_events(self, connection, **kwargs):
Expand Down
5 changes: 4 additions & 1 deletion kombu/connection.py
@@ -1,3 +1,5 @@
import socket

from kombu.backends import get_backend_cls


Expand All @@ -22,7 +24,7 @@ def __init__(self, hostname="localhost", userid="guest",
def __enter__(self):
return self

def __exit__(self, e_type, e_value, e_trace):
def __exit__(self, *args):
self.close()

def _establish_connection(self):
Expand Down Expand Up @@ -66,6 +68,7 @@ def close(self):
try:
if self._connection:
self.backend.close_connection(self._connection)
self._connection = None
except socket.error:
pass
self._closed = True
Expand Down
51 changes: 36 additions & 15 deletions kombu/entity.py
Expand Up @@ -18,6 +18,11 @@ def __init__(self, *args, **kwargs):
value = kwargs.get(name)
if value is not None:
setattr(self, name, (type_ or any)(value))
else:
try:
getattr(self, name)
except AttributeError:
setattr(self, name, None)

def __copy__(self):
return self.__class__(**dict((name, getattr(self, name))
Expand All @@ -26,23 +31,30 @@ def __copy__(self):

def assert_is_bound(fun):

def only_if_bound(self, *args, **kwargs):
def if_bound(self, *args, **kwargs):
if self.is_bound:
return fun(self, *args, **kwargs)
raise NotBoundError(
"Can't call %s on %s not bound to a channel" % (
fun.__name__,
self.__class__.__name__))
only_if_bound.__name__ = fun.__name__
if_bound.__name__ = fun.__name__
if_bound.__doc__ = fun.__doc__
if_bound.__module__ = fun.__module__
if_bound.__dict__.update(fun.__dict__)
if_bound.func_name = fun.__name__

return only_if_bound
return if_bound


class MaybeChannelBound(Object):
"""Mixin for classes that can be bound to an AMQP channel."""
channel = None
_is_bound = False

def __call__(self, channel):
return self.bind(channel)

def bind(self, channel):
"""Create copy of the instance that is bound to a channel."""
return copy(self).maybe_bind(channel)
Expand Down Expand Up @@ -85,6 +97,7 @@ class Exchange(MaybeChannelBound):
("type", None),
("routing_key", None),
("channel", None),
("arguments", None),
("durable", bool),
("auto_delete", bool),
("delivery_mode", lambda m: DELIVERY_MODES.get(m) or m))
Expand All @@ -106,19 +119,21 @@ def declare(self):
return self.channel.exchange_declare(exchange=self.name,
type=self.type,
durable=self.durable,
auto_delete=self.auto_delete)
auto_delete=self.auto_delete,
arguments=self.arguments)

@assert_is_bound
def create_message(self, message_data, delivery_mode=None,
priority=None, content_type=None, content_encoding=None,
properties=None):
properties=None, headers=None):
properties = properties or {}
properties["delivery_mode"] = delivery_mode or self.delivery_mode
return self.channel.prepare_message(message_data,
properties=properties,
priority=priority,
content_type=content_type,
content_encoding=content_encoding)
content_encoding=content_encoding,
headers=headers)

@assert_is_bound
def publish(self, message, routing_key=None, mandatory=False,
Expand All @@ -129,8 +144,7 @@ def publish(self, message, routing_key=None, mandatory=False,
exchange=self.name,
routing_key=routing_key,
mandatory=mandatory,
immediate=immediate,
headers=headers)
immediate=immediate)

@assert_is_bound
def delete(self, if_unused=False):
Expand All @@ -154,6 +168,8 @@ class Binding(MaybeChannelBound):
("exchange", None),
("routing_key", None),
("channel", None),
("queue_arguments", None),
("binding_arguments", None),
("durable", bool),
("exclusive", bool),
("auto_delete", bool))
Expand All @@ -169,7 +185,7 @@ def __init__(self, name="", exchange=None, routing_key="", **kwargs):
self.maybe_bind(self.channel)

def when_bound(self):
self.exchange = self.exchange.bind(self.channel)
self.exchange = self.exchange(self.channel)

@assert_is_bound
def declare(self):
Expand All @@ -178,13 +194,14 @@ def declare(self):
chan = self.channel
return (self.exchange and self.exchange.declare(),
self.name and chan.queue_declare(queue=self.name,
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete),
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete,
arguments=self.queue_arguments),
self.name and chan.queue_bind(queue=self.name,
exchange=self.exchange.name,
routing_key=self.routing_key))

exchange=self.exchange.name,
routing_key=self.routing_key,
arguments=self.binding_arguments))

@assert_is_bound
def get(self, no_ack=None):
Expand All @@ -208,6 +225,10 @@ def consume(self, consumer_tag, callback, no_ack=None, nowait=True):
def cancel(self, consumer_tag):
return self.channel.basic_cancel(consumer_tag)

@assert_is_bound
def delete(self, if_unused=False, if_empty=False):
return self.channel.queue_delete(self.name, if_unused, if_empty)

def __repr__(self):
return super(Binding, self).__repr__(
"Binding %s -> %s -> %s" % (self.name,
Expand Down
33 changes: 20 additions & 13 deletions kombu/messaging.py
Expand Up @@ -19,7 +19,7 @@ def __init__(self, channel, exchange=None, serializer=None,
self.auto_declare = auto_declare

if self.exchange:
self.exchange = self.exchange.bind(self.channel)
self.exchange = self.exchange(self.channel)
self.auto_declare and self.declare()

def declare(self):
Expand Down Expand Up @@ -50,15 +50,16 @@ def prepare(self, message_data, serializer=None,

def publish(self, message_data, routing_key=None, delivery_mode=None,
mandatory=False, immediate=False, priority=0, content_type=None,
content_encoding=None, serializer=None):
content_encoding=None, serializer=None, headers=None):

message_data, content_type, content_encoding = self.prepare(
message_data, content_type, content_encoding)
message = self.exchange.create_message(message_data,
delivery_mode,
priority,
content_type,
content_encoding)
content_encoding,
headers=headers)
return self.exchange.publish(message, routing_key, mandatory,
immediate)

Expand All @@ -84,25 +85,26 @@ def __init__(self, channel, bindings, no_ack=None, auto_declare=None,
self.callbacks = callbacks
if self.callbacks is None:
self.callbacks = []
self._active_tags = {}

self.bindings = [binding.bind(self.channel)
self.bindings = [binding(self.channel)
for binding in maybe_list(self.bindings)]
self.auto_declare and self.declare()

self._active_tags = {}
self._declare_consumer()
if self.auto_declare:
self.declare()
self.consume()

def __enter__(self):
return self

def __exit__(self):
def __exit__(self, *args):
self.cancel()

def declare(self):
for binding in self.bindings:
binding.declare(self.channel)
binding.declare()

def _consume(self):
def consume(self):
H, T = self.bindings[:-1], self.bindings[-1]
for binding in H:
binding.consume(self._add_tag(binding),
Expand All @@ -114,7 +116,6 @@ def _consume(self):
self.no_ack,
nowait=False)


def _add_tag(self, binding):
tag = self._active_tags[binding] = str(_next_tag())
return tag
Expand All @@ -141,14 +142,20 @@ def register_callback(self, callback):

def purge(self):
for binding in self.bindings:
self.binding.purge()
binding.purge()

def cancel(self):
for binding, tag in self._active_tags.items():
binding.cancel(tag)
self._active_tags = {}

def flow(self, active):
self.channel.flow(active)

def qos(self, prefetch_size=0, prefetch_count=0, apply_global=False):
self.channel.qos(prefetch_size, prefetch_count, apply_global)
return self.channel.basic_qos(prefetch_size,
prefetch_count,
apply_global)

def recover(self, requeue=False):
return self.channel.basic_recover(requeue)

0 comments on commit 3149a84

Please sign in to comment.