From 9efeaec2f75a3e7381c07dfdd4bcc67f13995dfd Mon Sep 17 00:00:00 2001 From: Erik Olof Gunnar Andersson Date: Thu, 4 Jan 2018 22:21:25 -0800 Subject: [PATCH] Multi-Callback Support [#48] (#49) * Multi-Callback Support [#48] * Added sanity check to api connection test and fixing broken test --- CHANGELOG.rst | 4 ++ README.rst | 4 ++ amqpstorm/__init__.py | 2 +- amqpstorm/basic.py | 5 +- amqpstorm/channel.py | 22 ++++---- amqpstorm/management/connection.py | 1 - amqpstorm/tests/functional/generic_tests.py | 52 +++++++++++++------ .../functional/management/connection_tests.py | 5 +- .../tests/functional/reliability_tests.py | 5 +- amqpstorm/tests/functional/web_based_tests.py | 15 ++++-- .../unit/channel/channel_exception_tests.py | 2 +- amqpstorm/tests/unit/channel/channel_tests.py | 36 ++++++++----- examples/classic_consumer.py | 51 ------------------ examples/classic_publisher.py | 28 ---------- examples/flask_threaded_rpc_client.py | 2 +- examples/robust_consumer.py | 2 +- examples/scalable_consumer.py | 7 +-- examples/scalable_rpc_server.py | 2 +- examples/simple_consumer.py | 8 ++- examples/simple_generator_consumer.py | 4 +- examples/simple_publisher.py | 4 +- examples/simple_rpc_client.py | 2 +- examples/simple_rpc_server.py | 10 ++-- examples/simple_transaction_publisher.py | 4 +- setup.py | 2 +- 25 files changed, 126 insertions(+), 153 deletions(-) delete mode 100644 examples/classic_consumer.py delete mode 100644 examples/classic_publisher.py diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 7a2742eb..7206a9cf 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,10 @@ Changelog ========= +Version 2.4.0 +------------- +- basic.consume now allows for multiple callbacks [#48]. + Version 2.3.0 ------------- - Added delivery_tag property to message. diff --git a/README.rst b/README.rst index 272f6f26..6d021417 100644 --- a/README.rst +++ b/README.rst @@ -21,6 +21,10 @@ Additional documentation is available on `amqpstorm.io = 30: + break + time.sleep(0.1) + self.assertEqual(len(inbound_messages), 1) @setup(queue=True) @@ -308,7 +324,8 @@ def test_functional_redelivered(self): self.channel.confirm_deliveries() self.channel.basic.publish(body=self.message, - routing_key=self.queue_name) + routing_key=self.queue_name, + mandatory=True) # Sleep for 0.1s to make sure RabbitMQ has time to catch up. time.sleep(0.1) @@ -323,14 +340,17 @@ def on_message(message): inbound_messages.append(message) self.assertTrue(message.redelivered) - # Sleep for 0.1s to make sure RabbitMQ has time to catch up. - time.sleep(0.1) - self.channel.basic.consume(callback=on_message, queue=self.queue_name, no_ack=True) - self.channel.process_data_events() + start_time = time.time() + while len(inbound_messages) == 0: + self.channel.process_data_events() + if time.time() - start_time >= 30: + break + time.sleep(0.1) + self.assertEqual(len(inbound_messages), 1) @setup(queue=True) diff --git a/amqpstorm/tests/functional/management/connection_tests.py b/amqpstorm/tests/functional/management/connection_tests.py index 78059b04..c555ead6 100644 --- a/amqpstorm/tests/functional/management/connection_tests.py +++ b/amqpstorm/tests/functional/management/connection_tests.py @@ -17,7 +17,10 @@ class ApiConnectionFunctionalTests(TestFunctionalFramework): def test_api_connection_get(self): api = ManagementApi(HTTP_URL, USERNAME, PASSWORD) - for conn in api.connection.list(): + connections = api.connection.list() + self.assertIsNotNone(connections) + + for conn in connections: self.assertIsInstance(api.connection.get(conn['name']), dict) @setup() diff --git a/amqpstorm/tests/functional/reliability_tests.py b/amqpstorm/tests/functional/reliability_tests.py index 55b72d9b..7c7c6fdb 100644 --- a/amqpstorm/tests/functional/reliability_tests.py +++ b/amqpstorm/tests/functional/reliability_tests.py @@ -198,8 +198,9 @@ def test_functional_publish_and_consume_5k_messages(self): consumer_thread.start() start_time = time.time() - while (self.messages_consumed != self.messages_to_send and - time.time() - start_time < 60): + while self.messages_consumed != self.messages_to_send: + if time.time() - start_time >= 60: + break time.sleep(0.1) self.assertEqual(self.messages_consumed, self.messages_to_send, diff --git a/amqpstorm/tests/functional/web_based_tests.py b/amqpstorm/tests/functional/web_based_tests.py index 5b54b2c5..e841c22e 100644 --- a/amqpstorm/tests/functional/web_based_tests.py +++ b/amqpstorm/tests/functional/web_based_tests.py @@ -61,22 +61,29 @@ def test_functional_remove_queue_while_consuming(self): self.assertFalse(self.channel._inbound) - @setup() + @setup(queue=True) def test_functional_connection_forcefully_closed(self): + self.channel.confirm_deliveries() + self.channel.queue.declare(self.queue_name) + connection_list = retry_function_wrapper(self.api.connection.list) self.assertIsNotNone(connection_list) for connection in connection_list: self.api.connection.close(connection['name']) - # Sleep for 1s to make sure RabbitMQ has time to catch up. - time.sleep(1) + start_time = time.time() + while len(self.api.connection.list()) > 0: + if time.time() - start_time >= 60: + break + time.sleep(1) self.assertRaisesRegexp( AMQPConnectionError, 'Connection was closed by remote server: ' 'CONNECTION_FORCED - Closed via management api', - self.channel.basic.publish, 'body', 'routing_key' + self.channel.basic.publish, 'body', self.queue_name, '', + None, True, False ) self.assertRaisesRegexp( diff --git a/amqpstorm/tests/unit/channel/channel_exception_tests.py b/amqpstorm/tests/unit/channel/channel_exception_tests.py index 8a713ab1..54eb7ec4 100644 --- a/amqpstorm/tests/unit/channel/channel_exception_tests.py +++ b/amqpstorm/tests/unit/channel/channel_exception_tests.py @@ -32,7 +32,7 @@ def test_chanel_callback_not_set(self): self.assertRaisesRegexp( AMQPChannelError, - 'no consumer_callback defined', + 'no consumer callback defined', channel.process_data_events ) diff --git a/amqpstorm/tests/unit/channel/channel_tests.py b/amqpstorm/tests/unit/channel/channel_tests.py index 5ab29227..f23f0a5d 100644 --- a/amqpstorm/tests/unit/channel/channel_tests.py +++ b/amqpstorm/tests/unit/channel/channel_tests.py @@ -112,7 +112,7 @@ def test_channel_process_data_events(self): message = self.message.encode('utf-8') message_len = len(message) - deliver = specification.Basic.Deliver() + deliver = specification.Basic.Deliver(consumer_tag='travis-ci') header = ContentHeader(body_size=message_len) body = ContentBody(value=message) @@ -122,7 +122,7 @@ def callback(msg): self.assertIsInstance(msg.body, str) self.assertEqual(msg.body.encode('utf-8'), message) - channel.consumer_callback = callback + channel._consumer_callbacks['travis-ci'] = callback channel.process_data_events() def test_channel_process_data_events_as_tuple(self): @@ -132,7 +132,7 @@ def test_channel_process_data_events_as_tuple(self): message = self.message.encode('utf-8') message_len = len(message) - deliver = specification.Basic.Deliver() + deliver = specification.Basic.Deliver(consumer_tag='travis-ci') header = ContentHeader(body_size=message_len) body = ContentBody(value=message) @@ -145,7 +145,7 @@ def callback(body, channel, method, properties): self.assertIsInstance(properties, dict) self.assertEqual(body, message) - channel.consumer_callback = callback + channel._consumer_callbacks['travis-ci'] = callback channel.process_data_events(to_tuple=True) def test_channel_start_consuming(self): @@ -155,7 +155,7 @@ def test_channel_start_consuming(self): message = self.message.encode('utf-8') message_len = len(message) - deliver = specification.Basic.Deliver() + deliver = specification.Basic.Deliver(consumer_tag='travis-ci') header = ContentHeader(body_size=message_len) body = ContentBody(value=message) @@ -166,28 +166,40 @@ def callback(msg): self.assertEqual(msg.body.encode('utf-8'), message) channel.set_state(channel.CLOSED) - channel.consumer_callback = callback - channel.add_consumer_tag('travis-ci') + channel._consumer_callbacks['travis-ci'] = callback channel.start_consuming() - def test_channel_start_consuming_no_consumer_tag(self): + def test_channel_start_consuming_multiple_callbacks(self): channel = Channel(0, FakeConnection(), 360) channel.set_state(channel.OPEN) message = self.message.encode('utf-8') message_len = len(message) - deliver = specification.Basic.Deliver() + deliver_one = specification.Basic.Deliver(consumer_tag='travis-ci-1') + deliver_two = specification.Basic.Deliver(consumer_tag='travis-ci-2') header = ContentHeader(body_size=message_len) body = ContentBody(value=message) - channel._inbound = [deliver, header, body] + channel._inbound = [ + deliver_one, header, body, + deliver_two, header, body + ] - def callback(msg): + def callback_one(msg): + self.assertEqual(msg.method.get('consumer_tag'), 'travis-ci-1') self.assertIsInstance(msg.body, str) self.assertEqual(msg.body.encode('utf-8'), message) - channel.consumer_callback = callback + def callback_two(msg): + self.assertEqual(msg.method.get('consumer_tag'), 'travis-ci-2') + self.assertIsInstance(msg.body, str) + self.assertEqual(msg.body.encode('utf-8'), message) + channel.set_state(channel.CLOSED) + + channel._consumer_callbacks['travis-ci-1'] = callback_one + channel._consumer_callbacks['travis-ci-2'] = callback_two + channel.start_consuming() def test_channel_open(self): diff --git a/examples/classic_consumer.py b/examples/classic_consumer.py deleted file mode 100644 index 915d7fe8..00000000 --- a/examples/classic_consumer.py +++ /dev/null @@ -1,51 +0,0 @@ -import logging - -from amqpstorm import Connection - - -logging.basicConfig(level=logging.DEBUG) - - -def on_message(body, channel, method, properties): - """This function is called when a message is received. - - :param bytes|str|unicode body: Message payload - :param Channel channel: AMQPStorm Channel - :param dict method: Message method - :param dict properties: Message properties - :return: - """ - print("Message:", body) - - # Acknowledge that we handled the message without any issues. - channel.basic.ack(delivery_tag=method['delivery_tag']) - - -def consumer(): - with Connection('127.0.0.1', 'guest', 'guest') as connection: - with connection.channel() as channel: - # Declare the Queue, 'simple_queue'. - channel.queue.declare('simple_queue') - - # Set QoS to 100. - # This will limit the consumer to only prefetch a 100 messages. - - # This is a recommended setting, as it prevents the - # consumer from keeping all of the messages in a queue to itself. - channel.basic.qos(100) - - # Start consuming the queue 'simple_queue' using the callback - # 'on_message' and last require the message to be acknowledged. - channel.basic.consume(on_message, 'simple_queue', no_ack=False) - - try: - # Start consuming messages. - # to_tuple equal to True means that messages consumed - # are returned as tuple, rather than a Message object. - channel.start_consuming(to_tuple=True) - except KeyboardInterrupt: - channel.close() - - -if __name__ == '__main__': - consumer() diff --git a/examples/classic_publisher.py b/examples/classic_publisher.py deleted file mode 100644 index a9ca9239..00000000 --- a/examples/classic_publisher.py +++ /dev/null @@ -1,28 +0,0 @@ -import logging - -from amqpstorm import Connection - - -logging.basicConfig(level=logging.DEBUG) - - -def publisher(): - with Connection('127.0.0.1', 'guest', 'guest') as connection: - with connection.channel() as channel: - # Declare the Queue, 'simple_queue'. - channel.queue.declare('simple_queue') - - # Message Properties. - properties = { - 'content_type': 'text/plain', - 'headers': {'key': 'value'} - } - - # Publish the message to a queue called, 'simple_queue'. - channel.basic.publish(body='Hello World!', - routing_key='simple_queue', - properties=properties) - - -if __name__ == '__main__': - publisher() diff --git a/examples/flask_threaded_rpc_client.py b/examples/flask_threaded_rpc_client.py index 6525f6a4..53af36d2 100644 --- a/examples/flask_threaded_rpc_client.py +++ b/examples/flask_threaded_rpc_client.py @@ -45,7 +45,7 @@ def _create_process_thread(self): def _process_data_events(self): """Process Data Events using the Process Thread.""" - self.channel.start_consuming(to_tuple=False) + self.channel.start_consuming() def _on_response(self, message): """On Response store the message with the correlation id in a local diff --git a/examples/robust_consumer.py b/examples/robust_consumer.py index 7329296f..0f0a20bb 100644 --- a/examples/robust_consumer.py +++ b/examples/robust_consumer.py @@ -48,7 +48,7 @@ def start(self): channel = self.connection.channel() channel.queue.declare('simple_queue') channel.basic.consume(self, 'simple_queue', no_ack=False) - channel.start_consuming(to_tuple=False) + channel.start_consuming() if not channel.consumer_tags: channel.close() except amqpstorm.AMQPError as why: diff --git a/examples/scalable_consumer.py b/examples/scalable_consumer.py index 187cfe9e..c94a3504 100644 --- a/examples/scalable_consumer.py +++ b/examples/scalable_consumer.py @@ -110,8 +110,9 @@ def _update_consumers(self): :return: """ # Do we need to start more consumers. - consumer_to_start = \ - min(max(self.number_of_consumers - len(self._consumers), 0), 2) + consumer_to_start = min( + max(self.number_of_consumers - len(self._consumers), 0), 2 + ) for _ in range(consumer_to_start): consumer = Consumer(self.queue) self._start_consumer(consumer) @@ -163,7 +164,7 @@ def start(self, connection): self.channel.basic.qos(1) self.channel.queue.declare(self.queue) self.channel.basic.consume(self, self.queue, no_ack=False) - self.channel.start_consuming(to_tuple=False) + self.channel.start_consuming() if not self.channel.consumer_tags: # Only close the channel if there is nothing consuming. # This is to allow messages that are still being processed diff --git a/examples/scalable_rpc_server.py b/examples/scalable_rpc_server.py index bd4e6505..b5862e32 100644 --- a/examples/scalable_rpc_server.py +++ b/examples/scalable_rpc_server.py @@ -172,7 +172,7 @@ def start(self, connection): self.channel.basic.qos(1) self.channel.queue.declare(self.rpc_queue) self.channel.basic.consume(self, self.rpc_queue, no_ack=False) - self.channel.start_consuming(to_tuple=False) + self.channel.start_consuming() if not self.channel.consumer_tags: # Only close the channel if there is nothing consuming. # This is to allow messages that are still being processed diff --git a/examples/simple_consumer.py b/examples/simple_consumer.py index 2722ecff..022f1f76 100644 --- a/examples/simple_consumer.py +++ b/examples/simple_consumer.py @@ -23,7 +23,7 @@ def on_message(message): # message.reject(requeue=True) -def consumer(): +def start_consumer(): with Connection('127.0.0.1', 'guest', 'guest') as connection: with connection.channel() as channel: # Declare the Queue, 'simple_queue'. @@ -42,12 +42,10 @@ def consumer(): try: # Start consuming messages. - # to_tuple equal to False means that messages consumed - # are returned as a Message object, rather than a tuple. - channel.start_consuming(to_tuple=False) + channel.start_consuming() except KeyboardInterrupt: channel.close() if __name__ == '__main__': - consumer() + start_consumer() diff --git a/examples/simple_generator_consumer.py b/examples/simple_generator_consumer.py index b97fd563..db51fb09 100644 --- a/examples/simple_generator_consumer.py +++ b/examples/simple_generator_consumer.py @@ -5,7 +5,7 @@ logging.basicConfig(level=logging.DEBUG) -def consumer(): +def start_consumer(): with Connection('127.0.0.1', 'guest', 'guest') as connection: with connection.channel() as channel: channel.queue.declare('simple_queue') @@ -16,4 +16,4 @@ def consumer(): if __name__ == '__main__': - consumer() + start_consumer() diff --git a/examples/simple_publisher.py b/examples/simple_publisher.py index 0beb2009..36952e4d 100644 --- a/examples/simple_publisher.py +++ b/examples/simple_publisher.py @@ -6,7 +6,7 @@ logging.basicConfig(level=logging.DEBUG) -def publisher(): +def publish_message(): with Connection('127.0.0.1', 'guest', 'guest') as connection: with connection.channel() as channel: # Declare the Queue, 'simple_queue'. @@ -26,4 +26,4 @@ def publisher(): if __name__ == '__main__': - publisher() + publish_message() diff --git a/examples/simple_rpc_client.py b/examples/simple_rpc_client.py index 2334b1c6..ef77adec 100644 --- a/examples/simple_rpc_client.py +++ b/examples/simple_rpc_client.py @@ -51,7 +51,7 @@ def call(self, number): message.publish(routing_key='rpc_queue') while not self.response: - self.channel.process_data_events(to_tuple=False) + self.channel.process_data_events() return int(self.response) def _on_response(self, message): diff --git a/examples/simple_rpc_server.py b/examples/simple_rpc_server.py index db4cdd58..9d6a9009 100644 --- a/examples/simple_rpc_server.py +++ b/examples/simple_rpc_server.py @@ -6,10 +6,6 @@ from amqpstorm import Message -CONNECTION = amqpstorm.Connection('127.0.0.1', 'guest', 'guest') -CHANNEL = CONNECTION.channel() -CHANNEL.queue.declare(queue='rpc_queue') - def fib(number): if number == 0: @@ -38,8 +34,12 @@ def on_request(message): if __name__ == '__main__': + CONNECTION = amqpstorm.Connection('127.0.0.1', 'guest', 'guest') + CHANNEL = CONNECTION.channel() + + CHANNEL.queue.declare(queue='rpc_queue') CHANNEL.basic.qos(prefetch_count=1) CHANNEL.basic.consume(on_request, queue='rpc_queue') print(" [x] Awaiting RPC requests") - CHANNEL.start_consuming(to_tuple=False) + CHANNEL.start_consuming() diff --git a/examples/simple_transaction_publisher.py b/examples/simple_transaction_publisher.py index d23dff68..ed6a24f4 100644 --- a/examples/simple_transaction_publisher.py +++ b/examples/simple_transaction_publisher.py @@ -6,7 +6,7 @@ logging.basicConfig(level=logging.DEBUG) -def publisher(): +def publish_messages(): with Connection('127.0.0.1', 'guest', 'guest') as connection: with connection.channel() as channel: # Declare the Queue, 'simple_queue'. @@ -39,4 +39,4 @@ def publisher(): if __name__ == '__main__': - publisher() + publish_messages() diff --git a/setup.py b/setup.py index 7c2615b2..523eba90 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ setup( name='AMQPStorm', - version='2.3.0', + version='2.4.0', description='Thread-safe Python RabbitMQ Client & Management library.', long_description=open('README.rst').read(), author='Erik Olof Gunnar Andersson',