From 98956465ee4a18645b409243d627512aa6775d48 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Grasb=C3=B6ck?= Date: Wed, 27 Mar 2024 11:22:43 +0100 Subject: [PATCH 1/3] Added support for custom response exchange --- pjrpc/server/integration/aio_pika.py | 49 +++++++++++++++++++--------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/pjrpc/server/integration/aio_pika.py b/pjrpc/server/integration/aio_pika.py index ca3fa97..683ba8e 100644 --- a/pjrpc/server/integration/aio_pika.py +++ b/pjrpc/server/integration/aio_pika.py @@ -14,20 +14,33 @@ class Executor: `aio_pika `_ based JSON-RPC server. :param broker_url: broker connection url - :param queue_name: requests queue name + :param rx_queue_name: requests queue name + :param tx_exchange_name: response exchange name + :param tx_routing_key: response routing key :param prefetch_count: worker prefetch count :param kwargs: dispatcher additional arguments """ - def __init__(self, broker_url: URL, queue_name: str, prefetch_count: int = 0, **kwargs: Any): + def __init__( + self, + broker_url: URL, + rx_queue_name: str, + tx_exchange_name: str = None, + tx_routing_key: str = None, + prefetch_count: int = 0, + **kwargs: Any + ): self._broker_url = broker_url - self._queue_name = queue_name + self._rx_queue_name = rx_queue_name + self._tx_exchange_name = tx_exchange_name + self._tx_routing_key = tx_routing_key self._prefetch_count = prefetch_count self._connection = aio_pika.connection.Connection(broker_url) self._channel: Optional[aio_pika.abc.AbstractChannel] = None self._queue: Optional[aio_pika.abc.AbstractQueue] = None + self._exchange: Optional[aio_pika.abc.AbstractExchange] = None self._consumer_tag: Optional[str] = None self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs) @@ -40,17 +53,20 @@ def dispatcher(self) -> pjrpc.server.AsyncDispatcher: return self._dispatcher - async def start(self, queue_args: Optional[Dict[str, Any]] = None) -> None: + async def start(self, queue_args: Optional[Dict[str, Any]] = None, exchange_args: Optional[Dict[str, Any]] = None) -> None: """ Starts executor. :param queue_args: queue arguments + :param exchange_args: exchange arguments """ await self._connection.connect() self._channel = channel = await self._connection.channel() - self._queue = queue = await channel.declare_queue(self._queue_name, **(queue_args or {})) + self._queue = queue = await channel.declare_queue(self._rx_queue_name, **(queue_args or {})) + if self._tx_exchange_name: + self._exchange = await channel.declare_exchange(self._tx_exchange_name, **(exchange_args or {})) await channel.set_qos(prefetch_count=self._prefetch_count) self._consumer_tag = await queue.consume(self._rpc_handle) @@ -80,17 +96,18 @@ async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> No if response_text is not None: if reply_to is None: logger.warning("property 'reply_to' is missing") - else: - async with self._connection.channel() as channel: - await channel.default_exchange.publish( - aio_pika.Message( - body=response_text.encode(), - reply_to=reply_to, - correlation_id=message.correlation_id, - content_type=pjrpc.common.DEFAULT_CONTENT_TYPE, - ), - routing_key=reply_to, - ) + + async with self._connection.channel() as channel: + exchange = self._exchange if self._exchange else channel.default_exchange + await exchange.publish( + aio_pika.Message( + body=response_text.encode(), + reply_to=reply_to, + correlation_id=message.correlation_id, + content_type=pjrpc.common.DEFAULT_CONTENT_TYPE, + ), + routing_key=self._tx_routing_key if self._tx_routing_key else reply_to, + ) await message.ack() From a8f6a663afbdbea659dd8e68cb0d85ecdfbe382a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20Grasb=C3=B6ck?= Date: Mon, 8 Apr 2024 15:10:25 +0200 Subject: [PATCH 2/3] Pull request fixes --- pjrpc/server/integration/aio_pika.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pjrpc/server/integration/aio_pika.py b/pjrpc/server/integration/aio_pika.py index 683ba8e..7e6e1c0 100644 --- a/pjrpc/server/integration/aio_pika.py +++ b/pjrpc/server/integration/aio_pika.py @@ -53,7 +53,11 @@ def dispatcher(self) -> pjrpc.server.AsyncDispatcher: return self._dispatcher - async def start(self, queue_args: Optional[Dict[str, Any]] = None, exchange_args: Optional[Dict[str, Any]] = None) -> None: + async def start( + self, + queue_args: Optional[Dict[str, Any]] = None, + exchange_args: Optional[Dict[str, Any]] = None + ) -> None: """ Starts executor. @@ -94,9 +98,13 @@ async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> No response_text = await self._dispatcher.dispatch(message.body.decode(), context=message) if response_text is not None: - if reply_to is None: - logger.warning("property 'reply_to' is missing") - + if self._tx_routing_key: + routing_key = self._tx_routing_key + elif reply_to: + routing_key = reply_to + else: + routing_key = "" + logger.warning("property 'reply_to' or 'tx_routing_key' missing") async with self._connection.channel() as channel: exchange = self._exchange if self._exchange else channel.default_exchange await exchange.publish( @@ -106,7 +114,7 @@ async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> No correlation_id=message.correlation_id, content_type=pjrpc.common.DEFAULT_CONTENT_TYPE, ), - routing_key=self._tx_routing_key if self._tx_routing_key else reply_to, + routing_key=routing_key, ) await message.ack() From 33b6416a847ead2380a9edec1a12508930126d17 Mon Sep 17 00:00:00 2001 From: Dmitry Pershin Date: Mon, 22 Apr 2024 00:28:55 +0500 Subject: [PATCH 3/3] bump version 1.9.0 --- CHANGELOG.rst | 6 ++++++ pjrpc/__about__.py | 2 +- pyproject.toml | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 827af0f..d72a168 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -1,6 +1,12 @@ Changelog ========= +1.9.0 (2024-04-22) +------------------ + +- aio-pika custom response exchange support added. + + 1.8.3 (2023-12-15) ------------------ diff --git a/pjrpc/__about__.py b/pjrpc/__about__.py index 90909f8..99f6ad7 100644 --- a/pjrpc/__about__.py +++ b/pjrpc/__about__.py @@ -2,7 +2,7 @@ __description__ = 'Extensible JSON-RPC library' __url__ = 'https://github.com/dapper91/pjrpc' -__version__ = '1.8.3' +__version__ = '1.9.0' __author__ = 'Dmitry Pershin' __email__ = 'dapper1291@gmail.com' diff --git a/pyproject.toml b/pyproject.toml index e1c751c..9fe673e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pjrpc" -version = "1.8.3" +version = "1.9.0" description = "Extensible JSON-RPC library" authors = ["Dmitry Pershin "] license = "Unlicense"