
AsyncConsumer runs tasks sequentially (should be in parallel?) #1924

Open

primal100 opened this issue Oct 5, 2022 · 6 comments · May be fixed by #1933
Comments

@primal100

primal100 commented Oct 5, 2022

The goal with async programming should be to run things in parallel where possible. However, the AsyncConsumer consumes messages and runs tasks sequentially. The key is this code in utils.py:

        while True:
            # Wait for any of them to complete
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            # Find the completed one(s), yield results, and replace them
            for i, task in enumerate(tasks):
                if task.done():
                    result = task.result()
                    await dispatch(result)
                    tasks[i] = asyncio.ensure_future(consumer_callables[i]())

So the loop is forced to wait for the handler to complete before getting the next message.

Using asyncio.create_task(dispatch(result)) instead of await dispatch(result) here would ensure the tasks run in parallel (it's a little more complicated than that, since we'd need to keep references to the tasks in order to report exceptions and avoid warnings). I have a subclass of AsyncConsumer for my own app which runs tasks in parallel and gets a speedup, so I could submit a PR based on that.
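
Roughly, something like this (a minimal sketch, not the actual Channels code; the function name and the exception handling here are just illustrative):

    import asyncio

    async def await_many_dispatch_concurrent(consumer_callables, dispatch):
        # Sketch of a concurrent variant of channels.utils.await_many_dispatch.
        tasks = [
            asyncio.ensure_future(consumer_callable())
            for consumer_callable in consumer_callables
        ]
        dispatch_tasks = set()  # hold references so tasks aren't GC'd mid-flight

        def reap(task):
            dispatch_tasks.discard(task)
            if not task.cancelled() and task.exception() is not None:
                # Retrieve the exception so asyncio doesn't warn that it was
                # "never retrieved"; a real implementation would log or re-raise.
                print(f"dispatch failed: {task.exception()!r}")

        while True:
            # Wait for any receive to complete
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for i, task in enumerate(tasks):
                if task.done():
                    # Schedule the handler without blocking the receive loop.
                    dispatch_task = asyncio.create_task(dispatch(task.result()))
                    dispatch_tasks.add(dispatch_task)
                    dispatch_task.add_done_callback(reap)
                    tasks[i] = asyncio.ensure_future(consumer_callables[i]())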

A better solution would be to use the new asyncio.TaskGroup coming in Python 3.11. There seems to be a backport here:

https://pypi.org/project/taskgroup/

There are other libraries implementing something similar to TaskGroup, or a simpler version could be implemented for Channels consumers to use.
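
With TaskGroup it could look something like this (a sketch assuming Python 3.11+; the function name is hypothetical):

    import asyncio

    async def await_many_dispatch_tg(consumer_callables, dispatch):
        # Sketch of a TaskGroup-based variant (requires Python 3.11+).
        async with asyncio.TaskGroup() as tg:
            receive_tasks = [
                tg.create_task(consumer_callable())
                for consumer_callable in consumer_callables
            ]
            while True:
                done, _ = await asyncio.wait(
                    receive_tasks, return_when=asyncio.FIRST_COMPLETED
                )
                for i, task in enumerate(receive_tasks):
                    if task in done:
                        # Handlers run as sibling tasks inside the group, so
                        # their exceptions aren't silently lost: a failure
                        # cancels the group and propagates.
                        tg.create_task(dispatch(task.result()))
                        receive_tasks[i] = tg.create_task(consumer_callables[i]())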

What do you think?

  • What you expected to happen vs. what actually happened:
    I expect that the async consumer can receive messages and process the resulting actions in parallel. Receive message, create task, receive message. Instead the consumer receives messages, creates a task, waits for the task to complete, then receives another message.
  • How you're running Channels (runserver? daphne/runworker? Nginx/Apache in front?):
    Runworker
@LucidDan

LucidDan commented Oct 9, 2022

I think this wouldn't be safe for ASGI events, right?
e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body if it is chunked).
If you create a task for each message, isn't there a chance of things being processed out of order? I realise in your case you are using it in runworker, but await_many_dispatch() gets passed receive() for both channel-layer and ASGI event messages, so a general implementation would need to make sure that isn't a problem.

I also wonder how this would perform under some types of workload, e.g. a light processing load in dispatch() with a large number of layer events. I have at least one app in mind that I've built which at times handles 1000s of messages per second, and each dispatch() call takes <1 second. I'm pretty sure adding a task per message would slow things down, or at least use much more memory and eventually slow things down via Python garbage collection.

@carltongibson
Member

Yes. Some profiling would be worthwhile to make sure we don't introduce an unnecessary performance regression.

@primal100
Author

primal100 commented Oct 9, 2022

Thanks for your feedback.

I think this wouldn't be safe for ASGI events, right? e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body if it is chunked). If you create a task for each message, isn't there a chance of things being processed out of order? I realise in your case you are using it in runworker, but await_many_dispatch() gets passed receive() for both channel-layer and ASGI event messages, so a general implementation would need to make sure that isn't a problem.

I can't imagine any modern network-based protocol depending on packets arriving in order. With websockets, for example, there is no expectation that messages are processed in order. Delays over the network can cause packets to arrive out of order, so it's expected that the protocol on top has some way of dealing with that (for example, JSON-RPC uses an id parameter to match requests and responses). I would have thought ASGI would define a way to rebuild a chunked HTTP request body without depending on it arriving in order. But I admit I am mostly used to working with Channels consumers in a worker context rather than ASGI and HTTP, so if someone with detailed knowledge of the protocol thinks we need to keep the existing behaviour then fair enough. In that case there could be a bool flag controlling whether requests are handled one-by-one or concurrently, or perhaps a separate class.

I also wonder how this would perform under some types of workload, e.g. a light processing load in dispatch() with a large number of layer events. I have at least one app in mind that I've built which at times handles 1000s of messages per second, and each dispatch() call takes <1 second. I'm pretty sure adding a task per message would slow things down, or at least use much more memory and eventually slow things down via Python garbage collection.

It really depends on whether your dispatch calls make I/O requests or run no-GIL C extension code. If your dispatch is purely CPU-bound Python code, then you should be using the SyncConsumer. The current AsyncConsumer already creates one future for every dispatch call (utils.py:51), and a task is just a subclass of a future (basically the same thing with a different API), so it shouldn't slow things down compared to the current implementation. If your app is pure-Python CPU-bound code running in the same process, then you are just adding asyncio overhead for nothing. The Channels docs make this clear: you should only use AsyncConsumers for tasks that can run in parallel.

"We recommend that you write SyncConsumers by default, and only use AsyncConsumers in cases where you know you are doing something that would be improved by async handling (long-running tasks that could be done in parallel) and you are only using async-native libraries."

If your dispatch runs I/O-bound or no-GIL C extension tasks, then I guarantee the app you've built would see a performance boost from concurrency. Yes, asyncio is designed to run 1000s of tasks concurrently. It's not designed for idle waiting on coroutines.

I will admit that when the current implementation is used for HTTP, each connection has its own consumer instance running tasks, so you do get concurrency across connections. Still, concurrency within a connection would be nice to have, and of course it's really important for a worker.

If you are worried about creating 1000s of tasks, or if the protocol requires tasks to be performed in order, then I think you should be using the SyncConsumer. There is no point in using asyncio "in name only", where you get the nice async/await API without the benefits, just the added overhead of an event loop for no reason. It's a feature of asynchronous programming that things are done out of order, and any modern app or protocol should be able to deal with that; legacy protocols that can't should use the SyncConsumer. I do acknowledge that if some developers are already using the AsyncConsumer and are sure they need tasks done in order, you wouldn't want to break backward compatibility; a separate AsyncConcurrentConsumer class may be needed in that case. But the docs have always made clear that the AsyncConsumer should only be used for tasks that can be run in parallel (and hence finish out of order).

This is even more relevant now that the Django ORM supports async queries as of version 4.1. The current AsyncConsumer would wait for one ORM request to complete before moving on to the next one, instead of running them concurrently. That defeats the purpose of asynchronous programming.
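
For example (a hypothetical handler; Message here is an illustrative model, and aget() is one of the async QuerySet methods added in Django 4.1):

    from channels.consumer import AsyncConsumer
    from myapp.models import Message  # hypothetical model

    class MyConsumer(AsyncConsumer):
        async def fetch_message(self, event):
            # Each handler awaits an async ORM query. With sequential
            # dispatch, the next event sits in the queue until this query
            # finishes; with concurrent dispatch, its handler could start
            # straight away.
            msg = await Message.objects.aget(pk=event["pk"])
            ...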

Anyway, I'll work on an exploratory PR using Task Groups with profiling and we can take it from there. It would be interesting, @LucidDan, to hear your feedback then on how it works with your existing app.

@carltongibson
Member

Hey @primal100, I've been digging into this a little. It would be good to see your PR (even if it's a draft) if you have something you can share? Thanks.

primal100 linked a pull request Oct 14, 2022 that will close this issue
carltongibson linked a pull request Oct 15, 2022 that will close this issue
@carltongibson
Member

e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body if it is chunked).

I think this is handled by AsyncHttpConsumer.http_request waiting for the whole body before handing off to the handle() implementation:

    async def http_request(self, message):
        """
        Async entrypoint - concatenates body fragments and hands off control
        to ``self.handle`` when the body has been completely received.
        """
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            try:
                await self.handle(b"".join(self.body))
            finally:
                await self.disconnect()
            raise StopConsumer()

@LucidDan

e.g. HTTP expects to process ASGI messages for a connection in order (e.g. receiving the HTTP request body if it is chunked).

I think this is handled by AsyncHttpConsumer.http_request waiting for the whole body before handing off to the handle() implementation:

    async def http_request(self, message):
        """
        Async entrypoint - concatenates body fragments and hands off control
        to ``self.handle`` when the body has been completely received.
        """
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            try:
                await self.handle(b"".join(self.body))
            finally:
                await self.disconnect()
            raise StopConsumer()

This might be a case of me not thinking async enough. I think this is only a potential issue if tasks created by create_task can execute out of order (i.e. does the event loop guarantee that tasks are started in the order they were created?).

I was thinking of the scenario where http_request receives chunks out of order, but if asyncio maintains the ordering of tasks, it's not possible for that to happen, I guess?
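
From a quick look at the asyncio source, create_task schedules a task's first step via loop.call_soon, and those callbacks run FIFO, so tasks do start in creation order; interleaving only begins once a task awaits something. A minimal sketch (worker is just an illustrative coroutine):

    import asyncio

    async def worker(n):
        print(f"start {n}")   # first steps run in creation order (FIFO call_soon)
        await asyncio.sleep(0)
        print(f"resume {n}")  # after the first await, ordering depends on
                              # what each task is waiting on

    async def main():
        tasks = [asyncio.create_task(worker(n)) for n in range(3)]
        await asyncio.gather(*tasks)

    asyncio.run(main())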

I'll try to get some performance testing in today with (and without) the PR. I've been wanting to set up some tests to compare v3 and v4 anyway, so it'll serve both purposes.
