
Add support to AsyncClient for handling out of order responses (prototype attached) #154

Closed
embray opened this issue Aug 13, 2020 · 5 comments



embray commented Aug 13, 2020

Problem: Some RPC calls can take a long time to process on the server side. It would be nice if the client could dispatch multiple calls concurrently, without assuming that responses come back from the server in the order the requests were sent, and then match each incoming response to the correct request.
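For context, JSON-RPC 2.0 responses echo the id of the request they answer, which is what makes this matching possible. A made-up exchange (the method names and results below are illustrative only, not from any real server):

# Two requests sent back to back; the "id" member (JSON-RPC 2.0) is what lets
# a client match responses to requests regardless of arrival order.
request_1 = {"jsonrpc": "2.0", "method": "slow_call", "params": [], "id": 1}
request_2 = {"jsonrpc": "2.0", "method": "fast_call", "params": [], "id": 2}

# The server may finish the second call first...
response_to_2 = {"jsonrpc": "2.0", "result": "fast result", "id": 2}
# ...and the first call later, so "next message received" is not necessarily
# "the response to the request I just sent".
response_to_1 = {"jsonrpc": "2.0", "result": "slow result", "id": 1}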

Here's a prototype I came up with using websockets. It does a few things a bit hackishly in order to easily subclass WebSocketsClient, and it doesn't yet support batch requests, but I think this functionality could easily be implemented directly in AsyncClient:

import asyncio, websockets, json
from jsonrpcclient.clients.websockets_client import WebSocketsClient
from jsonrpcclient.response import Response

class WebSocketsMultiClient(WebSocketsClient):
    def __init__(self, socket, *args, **kwargs):
        super().__init__(socket, *args, **kwargs)
        self.pending_responses = {}
        # Holds the receive_responses task while inside the context manager
        self.receiving = None

    def __enter__(self):
        self.receiving = asyncio.ensure_future(self.receive_responses())
        return self

    def __exit__(self, *args):
        self.receiving.cancel()

    async def receive_responses(self):
        while True:
            response_text = await self.socket.recv()
            try:
                # NOTE: We could avoid having to re-parse the response if this
                # functionality were built directly into AsyncClient.send, for example.
                response = json.loads(response_text)
                response_id = response['id']
                queue = self.pending_responses[response_id]
            except (json.JSONDecodeError, KeyError):
                # Not a proper JSON-RPC response (or not one we're waiting on),
                # so just ignore it; in this mode all messages received from the
                # socket should be RPC responses.
                continue

            await queue.put(response_text)

    async def send(self, request, **kwargs):
        # Override AsyncClient.send to also pass through the request's ID to send_message
        kwargs['request_id'] = request.get('id', None)
        return await super().send(request, **kwargs)

    async def send_message(self, request, response_expected, request_id=None, **kwargs):
        if response_expected:
            queue = self.pending_responses[request_id] = asyncio.Queue()

        await self.socket.send(request)

        if response_expected:
            # As a sanity measure, wait on both the queue and the
            # receive_responses task.  If receive_responses finishes first that
            # typically means an error occurred (e.g. the websocket was closed);
            # calling result() on it will re-raise that exception.
            get_response = asyncio.ensure_future(queue.get())
            done, pending = await asyncio.wait(
                [get_response, self.receiving],
                return_when=asyncio.FIRST_COMPLETED)

            try:
                for task in done:
                    # Raises an exception if task is self.receiving and it failed
                    result = task.result()
                    if task is not self.receiving:
                        response = result
            finally:
                # Don't leak the queue.get() task or the pending-response entry
                if get_response not in done:
                    get_response.cancel()
                del self.pending_responses[request_id]

            return Response(response)
        else:
            return Response('')

This is used like:

with WebSocketsMultiClient(websocket) as client:
    # Make RPC requests here, e.g. await client.ping(...)

While inside the client's context manager, all incoming messages are handled by the receive_responses loop, which puts each one on the queue for the matching pending request. Messages that aren't expected RPC responses are ignored (TODO: one could easily register a fallback handler for this case as well; a rough sketch of that follows).
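As a rough sketch of that TODO (the on_other_message name and callback signature are hypothetical, not part of jsonrpcclient), the constructor could take a callback that receive_responses invokes for anything that isn't a pending response:

class WebSocketsMultiClient(WebSocketsClient):
    # Only the pieces that change are shown here.
    def __init__(self, socket, *args, on_other_message=None, **kwargs):
        super().__init__(socket, *args, **kwargs)
        self.pending_responses = {}
        self.receiving = None
        # Hypothetical hook: called with the raw text of any message that
        # isn't a response to a pending request.
        self.on_other_message = on_other_message

    async def receive_responses(self):
        while True:
            response_text = await self.socket.recv()
            try:
                response = json.loads(response_text)
                queue = self.pending_responses[response['id']]
            except (json.JSONDecodeError, KeyError, TypeError):
                if self.on_other_message is not None:
                    self.on_other_message(response_text)
                continue
            await queue.put(response_text)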

Here's a full client example:

async def main():
    async def ping(client, message):
        print(f'pinging with message: {message}')
        response = await client.ping(message)
        print(f'response from ping {message}: {response.data.result}')

    async with websockets.connect('ws://localhost:5000') as ws:
        client = WebSocketsMultiClient(ws)
        with client:
            await asyncio.gather(*[ping(client, n) for n in range(10)])

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

And a corresponding server implementation that can handle multiple ongoing RPC calls:

import asyncio, websockets, random
from jsonrpcserver import method, async_dispatch as dispatch

@method
async def ping(message):
    # Simulate some work
    wait = random.randint(0, 30)
    await asyncio.sleep(wait)
    return f'pong {message} after waiting {wait} seconds'

async def dispatch_request(queue, request):
    response = await dispatch(request)
    if response.wanted:
        await queue.put(response)

async def handle_requests(websocket, queue):
    while True:
        try:
            request = await websocket.recv()
        except websockets.ConnectionClosed:
            break
        print(f'received ws request {request}')
        asyncio.ensure_future(dispatch_request(queue, request))

async def handle_responses(websocket, queue):
    while True:
        response = await queue.get()
        try:
            await websocket.send(str(response))
        except websockets.ConnectionClosed:
            break

async def main(websocket, path):
    queue = asyncio.Queue()
    await asyncio.gather(
        handle_requests(websocket, queue),
        handle_responses(websocket, queue))

start_server = websockets.serve(main, 'localhost', 5000)
loop = asyncio.get_event_loop()
loop.run_until_complete(start_server)
loop.run_forever()

Some sample output from the client side:

pinging with message: 7
pinging with message: 1
pinging with message: 2
pinging with message: 6
pinging with message: 4
pinging with message: 3
pinging with message: 0
pinging with message: 5
pinging with message: 9
pinging with message: 8
response from ping 1: pong 1 after waiting 1 seconds
response from ping 5: pong 5 after waiting 1 seconds
response from ping 8: pong 8 after waiting 6 seconds
response from ping 0: pong 0 after waiting 8 seconds
response from ping 7: pong 7 after waiting 12 seconds
response from ping 4: pong 4 after waiting 18 seconds
response from ping 3: pong 3 after waiting 18 seconds
response from ping 9: pong 9 after waiting 23 seconds
response from ping 2: pong 2 after waiting 29 seconds
response from ping 6: pong 6 after waiting 29 seconds

I think this is a valuable use case, and I was surprised I couldn't easily find anything else like it already existing, at least for Python.

Update: Fixed an issue where, if an error occurred in receive_responses, send_message could block forever waiting for an item that never arrives on the queue. When awaiting queue.get(), the client now also checks for errors on the receive_responses task (e.g. the websocket connection closing before the response is received).
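For reference, here is the pattern behind that fix in isolation (a generic sketch, not tied to jsonrpcclient): wait on both the queue and the background task, so a failure in the background task surfaces immediately instead of deadlocking the waiter.

import asyncio

async def get_or_fail(queue, background_task):
    getter = asyncio.ensure_future(queue.get())
    done, _ = await asyncio.wait([getter, background_task],
                                 return_when=asyncio.FIRST_COMPLETED)
    if getter in done:
        return getter.result()
    # Only the background task finished: that usually means an error, so
    # surface it instead of blocking forever on an item that never arrives.
    getter.cancel()
    background_task.result()  # re-raises the task's exception, if any
    raise RuntimeError('background task exited without an error')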


delaaxe commented Mar 5, 2021

Great work. I'm surprised this is not included in jsonrpc by default. Websocket responses are not necessarily the next message recv'd.


embray commented Mar 8, 2021

@delaaxe Thanks! I've been meaning to open a PR to add this as a standard feature, but in the meantime you can use the code I posted above.


delaaxe commented Mar 8, 2021

Thanks, I actually used this successfully! I added a callback to the constructor to be notified of "non-response" messages.


embray commented Mar 8, 2021

@delaaxe Cool! Thanks for the feedback.


bcb commented Aug 19, 2021

Clients being removed from v4. Transport will be handled by the user. #171
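For a v4-style setup where the user owns the transport, a minimal sketch of the same out-of-order matching (plain websockets and json, with asyncio futures keyed by request id; the helper names and structure here are assumptions, not jsonrpcclient API) could look like:

import asyncio, itertools, json
import websockets

async def rpc_session(uri):
    # Futures keyed by request id; the reader task resolves each future as
    # its response arrives, in whatever order the server sends them.
    pending = {}
    ids = itertools.count(1)

    async with websockets.connect(uri) as ws:

        async def reader():
            async for message in ws:
                try:
                    response = json.loads(message)
                    fut = pending.pop(response['id'])
                except (json.JSONDecodeError, KeyError, TypeError):
                    continue  # not a response we're waiting for
                fut.set_result(response)

        async def call(method, *params):
            request_id = next(ids)
            fut = asyncio.get_event_loop().create_future()
            pending[request_id] = fut
            await ws.send(json.dumps({'jsonrpc': '2.0', 'method': method,
                                      'params': list(params), 'id': request_id}))
            return await fut

        reader_task = asyncio.ensure_future(reader())
        try:
            results = await asyncio.gather(*[call('ping', n) for n in range(10)])
            for response in results:
                print(response.get('result', response))
        finally:
            reader_task.cancel()

asyncio.get_event_loop().run_until_complete(rpc_session('ws://localhost:5000'))

This should interoperate with the example server above, since it just sends standard JSON-RPC 2.0 requests with incrementing ids.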

bcb closed this as completed Aug 19, 2021