Skip to content

Commit

Permalink
Multi-Callback Support [#48]
Browse files Browse the repository at this point in the history
  • Loading branch information
eandersson committed Dec 9, 2017
1 parent 8c32060 commit 8ae78ec
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 308 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
Changelog
=========

Version 3.0.0a1
---------------
- basic.consume now allows for multiple callbacks [#48].

- to_tuple has been removed.
This is a breaking change that affects the following function:

- channel.process_data_events
- channel.start_consuming

Version 2.3.0
-------------
- Added delivery_tag property to message.
Expand Down
10 changes: 10 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ Additional documentation is available on `amqpstorm.io <https://www.amqpstorm.io
Changelog
=========

Version 3.0.0a1
---------------
- basic.consume now allows for multiple callbacks [#48].

- to_tuple has been removed.
This is a breaking change that affects the following function:

- channel.process_data_events
- channel.start_consuming

Version 2.3.0
-------------
- Added delivery_tag property to message.
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""AMQPStorm."""
__version__ = '2.3.0' # noqa
__version__ = '3.0.0a1' # noqa
__author__ = 'eandersson' # noqa

import logging
Expand Down
10 changes: 7 additions & 3 deletions amqpstorm/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ def consume(self, callback=None, queue='', consumer_tag='',
raise AMQPInvalidArgument('no_local should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
self._channel.consumer_callback = callback
consume_rpc_result = self._consume_rpc_request(arguments, consumer_tag,
exclusive, no_ack,
no_local, queue)
return self._consume_add_and_get_tag(consume_rpc_result)
tag = self._consume_add_and_get_tag(consume_rpc_result)
self._channel.consumer_callback[tag] = callback
return tag

def cancel(self, consumer_tag=''):
"""Cancel a queue consumer.
Expand Down Expand Up @@ -267,7 +268,10 @@ def _consume_add_and_get_tag(self, consume_rpc_result):
:rtype: str
"""
consumer_tag = consume_rpc_result['consumer_tag']

consumer_tag = compatibility.try_utf8_decode(
consume_rpc_result['consumer_tag']
)
self._channel.add_consumer_tag(consumer_tag)
return consumer_tag

Expand Down
32 changes: 9 additions & 23 deletions amqpstorm/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Channel(BaseChannel):

def __init__(self, channel_id, connection, rpc_timeout):
super(Channel, self).__init__(channel_id)
self.consumer_callback = None
self.consumer_callback = {}
self.rpc = Rpc(self, timeout=rpc_timeout)
self._confirming_deliveries = False
self._connection = connection
Expand Down Expand Up @@ -92,14 +92,11 @@ def queue(self):
"""
return self._queue

def build_inbound_messages(self, break_on_empty=False, to_tuple=False,
auto_decode=True):
def build_inbound_messages(self, break_on_empty=False, auto_decode=True):
"""Build messages in the inbound queue.
:param bool break_on_empty: Should we break the loop when there are
no more messages to consume.
:param bool to_tuple: Should incoming messages be converted to a
tuple before delivery.
:param bool auto_decode: Auto-decode strings when possible.
:raises AMQPChannelError: Raises if the channel encountered an error.
Expand All @@ -117,9 +114,6 @@ def build_inbound_messages(self, break_on_empty=False, to_tuple=False,
self.check_for_errors()
sleep(IDLE_WAIT)
continue
if to_tuple:
yield message.to_tuple()
continue
yield message

def close(self, reply_code=200, reply_text=''):
Expand Down Expand Up @@ -246,11 +240,9 @@ def open(self):
self.rpc_request(specification.Channel.Open())
self.set_state(self.OPEN)

def process_data_events(self, to_tuple=False, auto_decode=True):
def process_data_events(self, auto_decode=True):
"""Consume inbound messages.
:param bool to_tuple: Should incoming messages be converted to a
tuple before delivery.
:param bool auto_decode: Auto-decode strings when possible.
:raises AMQPChannelError: Raises if the channel encountered an error.
Expand All @@ -262,15 +254,12 @@ def process_data_events(self, to_tuple=False, auto_decode=True):
if not self.consumer_callback:
raise AMQPChannelError('no consumer_callback defined')
for message in self.build_inbound_messages(break_on_empty=True,
to_tuple=to_tuple,
auto_decode=auto_decode):
if to_tuple:
# noinspection PyCallingNonCallable
self.consumer_callback(*message)
continue
consumer_tag = compatibility.try_utf8_decode(
message.method.get('consumer_tag')
)
# noinspection PyCallingNonCallable
self.consumer_callback(message)
sleep(IDLE_WAIT)
self.consumer_callback[consumer_tag](message)

def rpc_request(self, frame_out, adapter=None):
"""Perform a RPC Request.
Expand All @@ -283,11 +272,9 @@ def rpc_request(self, frame_out, adapter=None):
self._connection.write_frame(self.channel_id, frame_out)
return self.rpc.get_request(uuid, adapter=adapter)

def start_consuming(self, to_tuple=False, auto_decode=True):
def start_consuming(self, auto_decode=True):
"""Start consuming messages.
:param bool to_tuple: Should incoming messages be converted to a
tuple before delivery.
:param bool auto_decode: Auto-decode strings when possible.
:raises AMQPChannelError: Raises if the channel encountered an error.
Expand All @@ -297,8 +284,7 @@ def start_consuming(self, to_tuple=False, auto_decode=True):
:return:
"""
while not self.is_closed:
self.process_data_events(to_tuple=to_tuple,
auto_decode=auto_decode)
self.process_data_events(auto_decode=auto_decode)
if not self.consumer_tags:
break

Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/tests/functional/basic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def test_functional_basic_cancel(self):
consumer_tag = self.channel.basic.consume(None, self.queue_name)

result = self.channel.basic.cancel(consumer_tag)
self.assertEqual(result['consumer_tag'], consumer_tag)
self.assertEqual(result['consumer_tag'].decode('utf-8'), consumer_tag)

@setup(queue=True)
def test_functional_basic_recover(self):
Expand Down
18 changes: 11 additions & 7 deletions amqpstorm/tests/functional/generic_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ def test_functional_consume_and_redeliver(self):
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name)

def on_message(message):
def on_message_first(message):
message.reject()

self.channel.basic.consume(callback=on_message,
self.channel.basic.consume(callback=on_message_first,
queue=self.queue_name,
no_ack=False)
self.channel.process_data_events()
Expand All @@ -291,15 +291,19 @@ def on_message(message):
# Store and inbound messages.
inbound_messages = []

def on_message(message):
# Close current channel and open a new one.
self.channel.close()
channel = self.connection.channel()

def on_message_second(message):
inbound_messages.append(message)
self.assertEqual(message.body, self.message)
message.ack()

self.channel.basic.consume(callback=on_message,
queue=self.queue_name,
no_ack=False)
self.channel.process_data_events()
channel.basic.consume(callback=on_message_second,
queue=self.queue_name,
no_ack=False)
channel.process_data_events()
self.assertEqual(len(inbound_messages), 1)

@setup(queue=True)
Expand Down
93 changes: 0 additions & 93 deletions amqpstorm/tests/functional/legacy_tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time

from amqpstorm import Channel
from amqpstorm.tests.utility import TestFunctionalFramework
from amqpstorm.tests.utility import setup

Expand All @@ -9,98 +8,6 @@ class LegacyFunctionalTests(TestFunctionalFramework):
def configure(self):
self.disable_logging_validation()

@setup(queue=True)
def test_functional_start_stop_consumer_tuple(self):
self.channel.queue.declare(self.queue_name)
self.channel.confirm_deliveries()

for _ in range(5):
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name)

# Store and inbound messages.
inbound_messages = []

def on_message(body, channel, method, properties):
self.assertIsInstance(body, (bytes, str))
self.assertIsInstance(channel, Channel)
self.assertIsInstance(properties, dict)
self.assertIsInstance(method, dict)
inbound_messages.append(body)
if len(inbound_messages) >= 5:
channel.stop_consuming()

self.channel.basic.consume(callback=on_message,
queue=self.queue_name,
no_ack=True)

# Sleep for 0.01s to make sure RabbitMQ has time to catch up.
time.sleep(0.01)

self.channel.start_consuming(to_tuple=True)

# Make sure all five messages were downloaded.
self.assertEqual(len(inbound_messages), 5)

@setup(queue=True)
def test_functional_publish_and_consume_five_messages_tuple(self):
self.channel.queue.declare(self.queue_name)
self.channel.confirm_deliveries()

for _ in range(5):
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name)

# Store and inbound messages.
inbound_messages = []

def on_message(body, channel, method, properties):
self.assertEqual(body, self.message.encode('utf-8'))
self.assertIsInstance(body, (bytes, str))
self.assertIsInstance(channel, Channel)
self.assertIsInstance(properties, dict)
self.assertIsInstance(method, dict)
inbound_messages.append(body)

self.channel.basic.consume(callback=on_message,
queue=self.queue_name,
no_ack=True)

# Sleep for 0.01s to make sure RabbitMQ has time to catch up.
time.sleep(0.01)

self.channel.process_data_events(to_tuple=True)

# Make sure all five messages were downloaded.
self.assertEqual(len(inbound_messages), 5)

@setup(queue=True)
def test_functional_generator_consume(self):
self.channel.queue.declare(self.queue_name)
self.channel.confirm_deliveries()
for _ in range(5):
self.channel.basic.publish(body=self.message,
routing_key=self.queue_name)
self.channel.basic.consume(queue=self.queue_name,
no_ack=True)
# Sleep for 0.01s to make sure RabbitMQ has time to catch up.
time.sleep(0.01)

# Store and inbound messages.
inbound_messages = []
for message in self.channel.build_inbound_messages(
break_on_empty=True,
to_tuple=True):
self.assertIsInstance(message, tuple)
self.assertIsInstance(message[0], bytes)
self.assertIsInstance(message[1], Channel)
self.assertIsInstance(message[2], dict)
self.assertIsInstance(message[3], dict)
inbound_messages.append(message)

# Make sure all five messages were downloaded.
self.assertEqual(len(inbound_messages), 5)

@setup(queue=True)
def test_functional_publish_and_get_five_messages(self):
self.channel.queue.declare(self.queue_name)
Expand Down
2 changes: 1 addition & 1 deletion amqpstorm/tests/functional/reliability_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def test_functional_publish_and_consume_5k_messages(self):

start_time = time.time()
while (self.messages_consumed != self.messages_to_send and
time.time() - start_time < 60):
time.time() - start_time < 60):
time.sleep(0.1)

self.assertEqual(self.messages_consumed, self.messages_to_send,
Expand Down
Loading

0 comments on commit 8ae78ec

Please sign in to comment.