Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev #105

Merged
merged 4 commits into from
Apr 21, 2024
Merged

Dev #105

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
Changelog
=========

1.9.0 (2024-04-22)
------------------

- aio-pika custom response exchange support added.


1.8.3 (2023-12-15)
------------------

Expand Down
2 changes: 1 addition & 1 deletion pjrpc/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
__description__ = 'Extensible JSON-RPC library'
__url__ = 'https://github.com/dapper91/pjrpc'

__version__ = '1.8.3'
__version__ = '1.9.0'

__author__ = 'Dmitry Pershin'
__email__ = 'dapper1291@gmail.com'
Expand Down
59 changes: 42 additions & 17 deletions pjrpc/server/integration/aio_pika.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,33 @@ class Executor:
`aio_pika <https://aio-pika.readthedocs.io/en/latest/>`_ based JSON-RPC server.

:param broker_url: broker connection url
:param queue_name: requests queue name
:param rx_queue_name: requests queue name
:param tx_exchange_name: response exchange name
:param tx_routing_key: response routing key
:param prefetch_count: worker prefetch count
:param kwargs: dispatcher additional arguments
"""

def __init__(self, broker_url: URL, queue_name: str, prefetch_count: int = 0, **kwargs: Any):
def __init__(
self,
broker_url: URL,
rx_queue_name: str,
tx_exchange_name: str = None,
tx_routing_key: str = None,
prefetch_count: int = 0,
**kwargs: Any
):
self._broker_url = broker_url
self._queue_name = queue_name
self._rx_queue_name = rx_queue_name
self._tx_exchange_name = tx_exchange_name
self._tx_routing_key = tx_routing_key
self._prefetch_count = prefetch_count

self._connection = aio_pika.connection.Connection(broker_url)
self._channel: Optional[aio_pika.abc.AbstractChannel] = None

self._queue: Optional[aio_pika.abc.AbstractQueue] = None
self._exchange: Optional[aio_pika.abc.AbstractExchange] = None
self._consumer_tag: Optional[str] = None

self._dispatcher = pjrpc.server.AsyncDispatcher(**kwargs)
Expand All @@ -40,17 +53,24 @@ def dispatcher(self) -> pjrpc.server.AsyncDispatcher:

return self._dispatcher

async def start(self, queue_args: Optional[Dict[str, Any]] = None) -> None:
async def start(
self,
queue_args: Optional[Dict[str, Any]] = None,
exchange_args: Optional[Dict[str, Any]] = None
) -> None:
"""
Starts executor.

:param queue_args: queue arguments
:param exchange_args: exchange arguments
"""

await self._connection.connect()
self._channel = channel = await self._connection.channel()

self._queue = queue = await channel.declare_queue(self._queue_name, **(queue_args or {}))
self._queue = queue = await channel.declare_queue(self._rx_queue_name, **(queue_args or {}))
if self._tx_exchange_name:
self._exchange = await channel.declare_exchange(self._tx_exchange_name, **(exchange_args or {}))
await channel.set_qos(prefetch_count=self._prefetch_count)
self._consumer_tag = await queue.consume(self._rpc_handle)

Expand Down Expand Up @@ -78,19 +98,24 @@ async def _rpc_handle(self, message: aio_pika.abc.AbstractIncomingMessage) -> No
response_text = await self._dispatcher.dispatch(message.body.decode(), context=message)

if response_text is not None:
if reply_to is None:
logger.warning("property 'reply_to' is missing")
if self._tx_routing_key:
routing_key = self._tx_routing_key
elif reply_to:
routing_key = reply_to
else:
async with self._connection.channel() as channel:
await channel.default_exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=reply_to,
)
routing_key = ""
logger.warning("property 'reply_to' or 'tx_routing_key' missing")
async with self._connection.channel() as channel:
exchange = self._exchange if self._exchange else channel.default_exchange
await exchange.publish(
aio_pika.Message(
body=response_text.encode(),
reply_to=reply_to,
correlation_id=message.correlation_id,
content_type=pjrpc.common.DEFAULT_CONTENT_TYPE,
),
routing_key=routing_key,
)

await message.ack()

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pjrpc"
version = "1.8.3"
version = "1.9.0"
description = "Extensible JSON-RPC library"
authors = ["Dmitry Pershin <dapper1291@gmail.com>"]
license = "Unlicense"
Expand Down