Skip to content

Commit

Permalink
Implemented support for RabbitMQ broker-initiated Basic.Cancel
Browse files Browse the repository at this point in the history
Implemented unit and integration tests for RabbitMQ broker-initiated Basic.Cancel.

Created integration/channel_basic_test.py for Channel.basic integration tests and implemented test_unroutable_message_is_returned.

Moved rabbit_extensions.py from scripts to integration/rabbit_extensions_test.py for RabbitMQ-specific integration tests, removed option parsing logic, and implemented additional test test_unroutable_message_is_returned_with_puback.
  • Loading branch information
Vitaly Kruglikov committed Jul 8, 2015
1 parent 27510db commit 3955646
Show file tree
Hide file tree
Showing 8 changed files with 838 additions and 96 deletions.
50 changes: 36 additions & 14 deletions haigha/classes/basic_class.py
Expand Up @@ -87,6 +87,8 @@ def _generate_consumer_tag(self):
The consumer tag is local to a channel, so two clients can use the
same consumer tags.
NOTE: this protected method may be called by derived classes
'''
self._consumer_tag_id += 1
return "channel-%d-%d" % (self.channel_id, self._consumer_tag_id)
Expand Down Expand Up @@ -148,10 +150,9 @@ def cancel(self, consumer_tag='', nowait=True, consumer=None, cb=None):
to only use a consumer once per channel.
'''
if consumer:
for (tag, func) in self._consumer_cb.iteritems():
if func == consumer:
consumer_tag = tag
break
tag = self._lookup_consumer_tag_by_consumer(consumer)
if tag:
consumer_tag = tag

nowait = nowait and self.allow_nowait() and not cb

Expand All @@ -164,24 +165,45 @@ def cancel(self, consumer_tag='', nowait=True, consumer=None, cb=None):
self._cancel_cb.append(cb)
self.channel.add_synchronous_cb(self._recv_cancel_ok)
else:
try:
del self._consumer_cb[consumer_tag]
except KeyError:
self.logger.warning(
'no callback registered for consumer tag " %s "',
consumer_tag)
self._purge_consumer_by_tag(consumer_tag)


def _recv_cancel_ok(self, method_frame):
consumer_tag = method_frame.args.read_shortstr()
self._purge_consumer_by_tag(consumer_tag)

cb = self._cancel_cb.popleft()
if cb:
cb()

def _lookup_consumer_tag_by_consumer(self, consumer):
'''Look up consumer tag given its consumer function
NOTE: this protected method may be called by derived classes
:param callable consumer: consumer function
:returns: matching consumer tag or None
:rtype: str or None
'''
for (tag, func) in self._consumer_cb.iteritems():
if func == consumer:
return tag

def _purge_consumer_by_tag(self, consumer_tag):
'''Purge consumer entry from this basic instance
NOTE: this protected method may be called by derived classes
:param str consumer_tag:
'''
try:
del self._consumer_cb[consumer_tag]
except KeyError:
self.logger.warning(
'no callback registered for consumer tag " %s "', consumer_tag)

cb = self._cancel_cb.popleft()
if cb:
cb()
else:
self.logger.info('purged consumer with tag " %s "', consumer_tag)

def publish(self, msg, exchange, routing_key, mandatory=False,
immediate=False, ticket=None):
Expand Down
107 changes: 107 additions & 0 deletions haigha/connections/rabbit_connection.py
Expand Up @@ -5,6 +5,7 @@
'''

from collections import deque
import copy

from haigha.connection import Connection
from haigha.classes.basic_class import BasicClass
Expand All @@ -30,6 +31,21 @@ def __init__(self, **kwargs):
class_map.setdefault(85, RabbitConfirmClass)
kwargs['class_map'] = class_map

# Set RabbitMQ-specific capabilities to inform broker

# Avoid modifying caller's client_properties dict, if passed in
if "client_properties" in kwargs:
client_properties = copy.deepcopy(kwargs["client_properties"])
else:
client_properties = dict()

# Indicate consumer_cancel_notify capability per
# www.rabbitmq.com/consumer-cancel.html
client_properties['capabilities'] = dict(consumer_cancel_notify=True)

kwargs['client_properties'] = client_properties


super(RabbitConnection, self).__init__(**kwargs)


Expand Down Expand Up @@ -133,6 +149,7 @@ class RabbitBasicClass(BasicClass):

def __init__(self, *args, **kwargs):
super(RabbitBasicClass, self).__init__(*args, **kwargs)
self.dispatch_map[30] = self._recv_cancel
self.dispatch_map[80] = self._recv_ack
self.dispatch_map[120] = self._recv_nack

Expand All @@ -142,6 +159,18 @@ def __init__(self, *args, **kwargs):
self._msg_id = 0
self._last_ack_id = 0

# Mapping of active consumer tags to user's consumer cancel callbacks
self._broker_cancel_cb_map = dict()

def _cleanup(self):
'''
Cleanup all the local data.
'''
self._ack_listener = None
self._nack_listener = None
self._broker_cancel_cb_map = None
super(RabbitBasicClass, self)._cleanup()

def set_ack_listener(self, cb):
'''
Set a callback for ack listening, to be used when the channel is
Expand Down Expand Up @@ -208,6 +237,84 @@ def _recv_nack(self, method_frame):
self._last_ack_id = delivery_tag
self._nack_listener(self._last_ack_id, requeue)

def consume(self, queue, consumer, consumer_tag='', no_local=False,
no_ack=True, exclusive=False, nowait=True, ticket=None,
cb=None, cancel_cb=None):
'''Start a queue consumer.
Accepts the following optional arg in addition to those of
`BasicClass.consume()`:
:param cancel_cb: a callable to be called when the broker cancels the
consumer; e.g., when the consumer's queue is deleted. See
www.rabbitmq.com/consumer-cancel.html.
:type cancel_cb: None or callable with signature cancel_cb(consumer_tag)
'''
# Register the consumer's broker-cancel callback entry
if cancel_cb is not None:
if not callable(cancel_cb):
raise ValueError('cancel_cb is not callable: %r' % (cancel_cb,))

if not consumer_tag:
consumer_tag = self._generate_consumer_tag()

self._broker_cancel_cb_map[consumer_tag] = cancel_cb

# Start consumer
super(RabbitBasicClass, self).consume(queue, consumer, consumer_tag,
no_local, no_ack, exclusive,
nowait, ticket, cb)

def cancel(self, consumer_tag='', nowait=True, consumer=None, cb=None):
'''
Cancel a consumer. Can choose to delete based on a consumer tag or
the function which is consuming. If deleting by function, take care
to only use a consumer once per channel.
'''
# Remove the consumer's broker-cancel callback entry
if consumer:
tag = self._lookup_consumer_tag_by_consumer(consumer)
if tag:
consumer_tag = tag

try:
del self._broker_cancel_cb_map[consumer_tag]
except KeyError:
self.logger.warning(
'cancel: no broker-cancel-cb entry for consumer tag %r '
'(consumer %r)', consumer_tag, consumer)

# Cancel consumer
super(RabbitBasicClass, self).cancel(consumer_tag, nowait, consumer, cb)

def _recv_cancel(self, method_frame):
'''Handle Basic.Cancel from broker
:param MethodFrame method_frame: Basic.Cancel method frame from broker
'''
self.logger.warning("consumer cancelled by broker: %r", method_frame)

consumer_tag = method_frame.args.read_shortstr()

# NOTE: per RabbitMQ spec, no-wait is always true in Basic.Cancel from
# broker

# Remove consumer from this basic instance
try:
cancel_cb = self._broker_cancel_cb_map.pop(consumer_tag)
except KeyError:
# Must be a race condition between user's cancel and broker's cancel
self.logger.warning(
'_recv_cancel: no broker-cancel-cb entry for consumer tag %r',
consumer_tag)
else:
if callable(cancel_cb):
# Purge from base class only when user supplies cancel_cb
self._purge_consumer_by_tag(consumer_tag)

# Notify user
cancel_cb(consumer_tag)


class RabbitConfirmClass(ProtocolClass):

Expand Down
Empty file added scripts/integration/__init__.py
Empty file.
151 changes: 151 additions & 0 deletions scripts/integration/channel_basic_test.py
@@ -0,0 +1,151 @@
'''
Copyright (c) 2011-2015, Agora Games, LLC All rights reserved.
https://github.com/agoragames/haigha/blob/master/LICENSE.txt
'''

#
# Integration tests for Channel.basic
#


# Disable "no member" pylint error since Haigha's channel class members get
# added at runtime.
#
# e.g., "E1103: Instance of 'Channel' has no 'exchange' member (but some types
# could not be inferred)"
#
# pylint: disable=E1103


import logging
import socket
import unittest


from haigha.channel import Channel
from haigha.connection import Connection
from haigha.message import Message


class TestOptions(object): # pylint: disable=R0903
'''Configuration settings'''
user = 'guest'
password = 'guest'
vhost = '/'
host = 'localhost'
debug = False


_OPTIONS = TestOptions()
_LOG = None


def setUpModule(): # pylint: disable=C0103
'''Unittest fixture for module-level initialization'''

global _LOG # pylint: disable=W0603


# Setup logging
log_level = logging.DEBUG if _OPTIONS.debug else logging.INFO
logging.basicConfig(level=log_level,
format="[%(levelname)s %(asctime)s] %(message)s")
_LOG = logging.getLogger('haigha')


class _CallbackSink(object):
'''Callback sink; an instance of this class may be passed as a callback
and it will store the callback args in the values instance attribute
'''

__slots__ = ('values',)

def __init__(self):
self.values = None
self.reset()

def reset(self):
'''Reset the args buffer'''
self.values = []

def __repr__(self):
return "%s(ready=%s, values=%.255r)" % (self.__class__.__name__,
self.ready,
self.values)

def __call__(self, *args):
self.values.append(args)

@property
def ready(self):
'''True if called; False if not called'''
return bool(self.values)


class ChannelBasicTests(unittest.TestCase):
'''Integration tests for Channel.basic'''

def _connect_to_broker(self):
''' Connect to broker and regisiter cleanup action to disconnect
:returns: connection instance
:rtype: `haigha.connection.Connection`
'''
sock_opts = {
(socket.IPPROTO_TCP, socket.TCP_NODELAY) : 1,
}
connection = Connection(
logger=_LOG,
debug=_OPTIONS.debug,
user=_OPTIONS.user,
password=_OPTIONS.password,
vhost=_OPTIONS.vhost,
host=_OPTIONS.host,
heartbeat=None,
sock_opts=sock_opts,
transport='socket')
self.addCleanup(lambda: connection.close(disconnect=True)
if not connection.closed else None)

return connection

def test_unroutable_message_is_returned(self):
connection = self._connect_to_broker()

ch = connection.channel()
self.addCleanup(ch.close)

_LOG.info('Declaring exchange "foo"')
ch.exchange.declare('foo', 'direct')
self.addCleanup(ch.exchange.delete, 'foo')

callback_sink = _CallbackSink()
ch.basic.set_return_listener(callback_sink)

_LOG.info(
'Publishing to exchange "foo" on route "nullroute" mandatory=True')
mid = ch.basic.publish(Message('hello world'), 'foo', 'nullroute',
mandatory=True)
_LOG.info('Published message mid %s to foo/nullroute', mid)

# Wait for return of unroutable message
while not callback_sink.ready:
connection.read_frames()


((msg,),) = callback_sink.values
self.assertEqual(msg.body, 'hello world')

self.assertIsNone(msg.delivery_info)
self.assertIsNotNone(msg.return_info)

return_info = msg.return_info
self.assertItemsEqual(
['channel', 'reply_code', 'reply_text', 'exchange', 'routing_key'],
return_info.keys())
self.assertIs(return_info['channel'], ch)
self.assertEqual(return_info['reply_code'], 312)
self.assertEqual(return_info['reply_text'], 'NO_ROUTE')
self.assertEqual(return_info['exchange'], 'foo')
self.assertEqual(return_info['routing_key'], 'nullroute')

0 comments on commit 3955646

Please sign in to comment.