Skip to content

Commit

Permalink
Merge pull request #105 from dapper91/dev
Browse files Browse the repository at this point in the history
- aio-pika custom response exchange support added.
  • Loading branch information
dapper91 committed Apr 21, 2024
2 parents badd7eb + 33b6416 commit d4ad929
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changelog
=========

1.9.0 (2024-04-22)
------------------

- aio-pika custom response exchange support added.


1.8.3 (2023-12-15)
------------------

Expand Down
2 changes: 1 addition & 1 deletion pjrpc/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
__description__ = 'Extensible JSON-RPC library'
__url__ = 'https://github.com/dapper91/pjrpc'

__version__ = '1.8.3'
__version__ = '1.9.0'

__author__ = 'Dmitry Pershin'
__email__ = 'dapper1291@gmail.com'
Expand Down
59 changes: 42 additions & 17 deletions pjrpc/server/integration/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,33 @@ class Executor:
`aio_pika <https://aio-pika.readthedocs.io/en/latest/>`_ based JSON-RPC server.
:param broker_url: broker connection url
:param queue_name: requests queue name
:param rx_queue_name: requests queue name
:param tx_exchange_name: response exchange name
:param tx_routing_key: response routing key
:param prefetch_count: worker prefetch count
:param kwargs: dispatcher additional arguments
"""

def __init__(self, broker_url: URL, queue_name: str, prefetch_count: int = 0, **kwargs: Any):
def __init__(
self,
broker_url: URL,
rx_queue_name: str,
tx_exchange_name: str = None,
tx_routing_key: str = None,
prefetch_count: int = 0,
**kwargs: Any
):
self._broker_url = broker_url
self._queue_name = queue_name
self._rx_queue_name = rx_queue_name
self._tx_exchange_name = tx_exchange_name
self._tx_routing_key = tx_routing_key
self._prefetch_count = prefetch_count

self._connection = aio_pika.connection.Connection(broker_url)
self._channel: Optional[aio_pika.abc.AbstractChannel] = None

self._queue: Optional[aio_pika.abc.AbstractQueue] = None
self._exchange: Optional[aio_pika.abc.AbstractExchange] = None
self._consumer_tag: Optional[str] = None

self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs)
Expand All @@ -40,17 +53,24 @@ def dispatcher(self) -> pjrpc.server.AsyncDispatcher:

return self._dispatcher

async def start(self, queue_args: Optional[Dict[str, Any]] = None) -> None:
async def start(
self,
queue_args: Optional[Dict[str, Any]] = None,
exchange_args: Optional[Dict[str, Any]] = None
) -> None:
"""
Starts executor.
:param queue_args: queue arguments
:param exchange_args: exchange arguments
"""

await self._connection.connect()
self._channel = channel = await self._connection.channel()

self._queue = queue = await channel.declare_queue(self._queue_name, **(queue_args or {}))
self._queue = queue = await channel.declare_queue(self._rx_queue_name, **(queue_args or {}))
if self._tx_exchange_name:
self._exchange = await channel.declare_exchange(self._tx_exchange_name, **(exchange_args or {}))
await channel.set_qos(prefetch_count=self._prefetch_count)
self._consumer_tag = await queue.consume(self._rpc_handle)

Expand Down Expand Up @@ -78,19 +98,24 @@ async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> No
response_text = await self._dispatcher.dispatch(message.body.decode(), context=message)

if response_text is not None:
if reply_to is None:
logger.warning("property 'reply_to' is missing")
if self._tx_routing_key:
routing_key = self._tx_routing_key
elif reply_to:
routing_key = reply_to
else:
async with self._connection.channel() as channel:
await channel.default_exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=reply_to,
)
routing_key = ""
logger.warning("property 'reply_to' or 'tx_routing_key' missing")
async with self._connection.channel() as channel:
exchange = self._exchange if self._exchange else channel.default_exchange
await exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=routing_key,
)

await message.ack()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pjrpc"
version = "1.8.3"
version = "1.9.0"
description = "Extensible JSON-RPC library"
authors = ["Dmitry Pershin <dapper1291@gmail.com>"]
license = "Unlicense"
Expand Down

0 comments on commit d4ad929

Please sign in to comment.