From b76f0ed1d077045f1653e6f4a64b3eb9bf65522f Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Sat, 26 May 2018 11:43:05 +0300 Subject: [PATCH 1/2] Added passing a custom event loop for RPC client --- sage_utils/amqp/clients.py | 5 +++-- tests/amqp/test_rpc_client.py | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/sage_utils/amqp/clients.py b/sage_utils/amqp/clients.py index 148347b..ce51552 100644 --- a/sage_utils/amqp/clients.py +++ b/sage_utils/amqp/clients.py @@ -12,8 +12,9 @@ class RpcAmqpClient(object): } def __init__(self, app, routing_key, request_exchange='', - response_queue=None, response_exchange=''): + response_queue=None, response_exchange='', loop=None): self.app = app + self.loop = loop or getattr(self.app, 'loop', None) or asyncio.get_event_loop() self.routing_key = routing_key self.request_exchange = request_exchange self.response_queue = response_queue @@ -59,7 +60,7 @@ async def connect(self, consume_timeout=None): queue_name=self._response_queue_name, ), timeout=consume_timeout, - loop=self.app.loop + loop=self.loop ) async def on_response(self, _channel, body, _envelope, _properties): diff --git a/tests/amqp/test_rpc_client.py b/tests/amqp/test_rpc_client.py index c7b66eb..4a1d8a2 100644 --- a/tests/amqp/test_rpc_client.py +++ b/tests/amqp/test_rpc_client.py @@ -40,6 +40,34 @@ async def test_rpc_amqp_client_returns_ok(event_loop): await extension.deinit(event_loop) +@pytest.mark.asyncio +async def test_rpc_amqp_client_returns_ok_with_custom_event_loop(event_loop): + app = Application(config=FakeConfig(), loop=event_loop) + register_worker = FakeRegisterMicroserviceWorker(app) + extension = AmqpExtension(app) + extension.register_worker(register_worker) + + await extension.init(event_loop) + + client = RpcAmqpClient( + app=app, + routing_key=REQUEST_QUEUE, + request_exchange=REQUEST_EXCHANGE, + response_queue='', + response_exchange=RESPONSE_EXCHANGE_NAME, + loop=event_loop + ) + response = await client.send(payload={'name': 'microservice', 'version': '1.0.0'}) + + assert Response.CONTENT_FIELD_NAME in response.keys() + assert response[Response.CONTENT_FIELD_NAME] == 'OK' + + assert Response.EVENT_FIELD_NAME in response.keys() + assert response[Response.EVENT_FIELD_NAME] is None + + await extension.deinit(event_loop) + + @pytest.mark.asyncio async def test_rpc_amqp_client_returns_an_error(event_loop): app = Application(config=FakeConfig(), loop=event_loop) From 013236df83b168eadc837acdf230cd0f9c555d36 Mon Sep 17 00:00:00 2001 From: Valeryi Savich Date: Sat, 26 May 2018 11:43:28 +0300 Subject: [PATCH 2/2] Updated version of the package --- sage_utils/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sage_utils/__init__.py b/sage_utils/__init__.py index 8b58586..38dbc45 100644 --- a/sage_utils/__init__.py +++ b/sage_utils/__init__.py @@ -1,4 +1,4 @@ __title__ = 'sage-utils' -__version__ = '0.5.2' +__version__ = '0.5.3' __license__ = 'BSD' VERSION = __version__