Fixed #32798 -- Fixed ASGIHandler to run response iterators in sync context #14526
Conversation
```diff
@@ -239,7 +268,7 @@ async def send_response(self, response, send):
         if response.streaming:
             # Access `__iter__` and not `streaming_content` directly in case
             # it has been overridden in a subclass.
-            for part in response:
+            async for part in _sync_to_async_iterator(response):
```
Curious - what was your thought process behind writing an iterator converter rather than just wrapping this whole set of code inside `sync_to_async`?
To be honest, my original thought process happened when I was trying to get this working without any code changes to Django itself, so I was trying to make the smallest possible change to existing code. Not exactly a relevant frame of mind anymore. I also thought it would be simpler, but it doesn't turn out that way, because the usual tools like `sync_to_async` don't handle async generators.

In my testing of other solutions, it seems that an `async_to_sync` call inside `sync_to_async` is a lot slower. I just tried your suggestion like this:
```python
def process_sync_response():
    # Access `__iter__` and not `streaming_content` directly in
    # case it has been overridden in a subclass.
    for part in response:
        for chunk, _ in self.chunk_bytes(part):
            async_to_sync(send)({
                'type': 'http.response.body',
                'body': chunk,
                # Ignore "more" as there may be more parts; instead,
                # use an empty final closing message with False.
                'more_body': True,
            })

await sync_to_async(process_sync_response)()
```
Using the same profiling as before, it runs in 16.2 msec per loop, compared to 3.64 msec per loop on the main branch. I don't have a good sense of how much those numbers matter, so no arguments if reducing the code complexity is worth the performance hit.
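For context, the kind of iterator converter under discussion can also be sketched without a queue, by pushing each blocking `next()` call onto the event loop's default thread pool. This is a minimal illustration only, not the PR's implementation, and the function name is made up:

```python
import asyncio


async def sync_to_async_iterator(sync_iterable):
    """Yield items from a blocking iterable without blocking the event loop.

    A minimal sketch: each next() call runs in the default executor. A
    sentinel default is used because a StopIteration raised inside the
    executor cannot propagate cleanly across the await boundary.
    """
    loop = asyncio.get_running_loop()
    iterator = iter(sync_iterable)
    sentinel = object()
    while True:
        item = await loop.run_in_executor(None, next, iterator, sentinel)
        if item is sentinel:
            return
        yield item


async def main():
    return [part async for part in sync_to_async_iterator(["a", "b", "c"])]


print(asyncio.run(main()))  # → ['a', 'b', 'c']
```

The downside of this shape is one thread-pool hop per item, which is part of why a queue-based design can be faster for many small chunks.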
Your solution honestly isn't that much more complex, so I'm not saying we shouldn't do it - I was just curious how you ended up here! I think the resulting patch is pretty nice. I will need to take more time to properly review it, but I like it on first glance.
django/core/handlers/asgi.py (outdated)

```python
def sync_iterator():
    for item in iterator:
        q.put_nowait(item)
```
Am I correct that this approach can result in loading the entire response into memory before starting to send the chunks? If so, it seems like that's a possible downside because it appears to go against `StreamingHttpResponse`'s purpose, e.g.:

> `StreamingHttpResponse` should only be used in situations where it is absolutely required that the whole content isn't iterated before transferring the data to the client.

Without this change, it looks like `send_response()` only has one bytestring of the streaming response's iterator in memory at a time.
It won't necessarily - the queue means it'll be unloaded as it's loaded - but I do wonder if giving the queue an explicit length and using normal `put()`, or some similar way of limiting the amount of memory used, would be sensible.
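As a toy illustration of the bounded-queue idea (with both ends async here, unlike the PR, where the producer is sync), `await q.put()` suspends the producer whenever the queue is full, which caps memory use:

```python
import asyncio


async def main():
    # maxsize=2 means at most two pending parts are buffered at once.
    q = asyncio.Queue(maxsize=2)
    done = object()  # end-of-stream marker

    async def produce():
        for part in (b"a", b"b", b"c", b"d"):
            await q.put(part)  # suspends (without blocking the loop) when full
        await q.put(done)

    producer = asyncio.create_task(produce())
    received = []
    while (item := await q.get()) is not done:
        received.append(item)
    await producer
    return received


print(asyncio.run(main()))  # → [b'a', b'b', b'c', b'd']
```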
It wouldn't necessarily, but it could in theory. In practice, I'm not sure.

It's definitely a tradeoff, since calling `put()` would have to use `async_to_sync(put)()`, so it would be slower. Although it occurs to me this could still use `put_nowait()` even if there is a `maxsize`, but I'm not sure what to do if `QueueFull` is raised. Sleep for a short amount of time and try again?
Hm, yes, I forgot this is an async queue, so `put()` won't just sleep synchronously. I think catching `QueueFull` and sleeping would accomplish the same thing, though, and give you a queue with one sync "end" and one async "end".
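The catch-`QueueFull`-and-sleep pattern being discussed can be sketched like this. It is a simplified illustration, not the PR's code: an arbitrary retry delay is used, and since `asyncio.Queue` is not thread-safe, the sketch sticks to the non-blocking methods from each side and has the consumer poll rather than await `get()` across threads:

```python
import asyncio
import threading
import time


def put_with_retry(q, item, retry_delay=0.1):
    # The sync producer keeps retrying put_nowait() and sleeps briefly
    # (in its own thread) whenever the bounded queue is full.
    while True:
        try:
            q.put_nowait(item)
            return
        except asyncio.QueueFull:
            time.sleep(retry_delay)


async def main():
    q = asyncio.Queue(maxsize=1)
    producer = threading.Thread(
        target=lambda: [put_with_retry(q, i) for i in range(5)]
    )
    producer.start()
    received = []
    while len(received) < 5:
        try:
            received.append(q.get_nowait())
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.01)  # poll; avoids cross-thread wakeups
    producer.join()
    return received


print(asyncio.run(main()))  # → [0, 1, 2, 3, 4]
```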
Thoughts on the amount of time to sleep? Magic numbers are weird. I'll make it 0.1s when I update the PR to use a sentinel value but we can change it later.
Also... thoughts on the size of the queue? I feel like setting it to 1 could raise a lot of `QueueFull` errors depending on how the view is written. Would a small queue size like 5 be against the spirit of a streaming response?
Well, the thing is that what StreamingResponse returns isn't guaranteed to be single bytes - if I remember right, it can be chunks of any size - so the queue size isn't going to directly dictate the number of bytes stored in memory. 5 would be a good first start, but I wonder if there's a way to cap the number of bytes here too, along with the sentinel value idea.
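The byte-capping idea could look something like the following hypothetical wrapper (not part of the PR or Django): bound the queue by total buffered bytes rather than item count, since item sizes vary:

```python
import asyncio


class ByteBoundedQueue:
    """Hypothetical sketch: a queue bounded by total buffered bytes."""

    def __init__(self, max_bytes=2 ** 16):
        self._q = asyncio.Queue()
        self._buffered = 0
        self.max_bytes = max_bytes

    def try_put(self, item):
        # Refuse the item if it would exceed the byte budget; always
        # accept into an empty queue so oversized items still pass.
        if self._buffered and self._buffered + len(item) > self.max_bytes:
            return False
        self._q.put_nowait(item)
        self._buffered += len(item)
        return True

    def get_nowait(self):
        item = self._q.get_nowait()
        self._buffered -= len(item)
        return item
```

A producer would combine `try_put()` with the same retry-and-sleep handling discussed for `QueueFull`.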
django/core/handlers/asgi.py (outdated)

```python
task = asyncio.create_task(sync_to_async(sync_iterator)())
try:
    while True:
        next_item = asyncio.create_task(q.get())
```
It seems like you should be able to do this without creating a new task for each call to `get()` and instead awaiting `get()` directly. For example, you could add a sentinel value to the queue when the sync iterator has been exhausted and check whether the retrieved value is the sentinel to know whether any items are left.
Sentinel value is a good idea! I think it will make the logic simpler. I'll work on making this change and post a comment when I've updated the PR.
- Changed queue to use sentinel value
- Bound response queue size
- Limit bytes in queue
I pushed a commit with sentinel value implementation, bounded queue size, and limited bytes in the queue.
Sorry for all the back and forth on this PR. I guess I need more guidance on this than I thought... getting async code right is hard.
As long as the queue is limited to a small size (say 5 or less), it doesn't seem like counting and limiting the bytes should be necessary. The reason is that the streaming response object already needs to / has the ability to limit the size of the items it yields, so the total bytes would already be limited to the size of the queue times that, which is the same order of magnitude. I would actually be more worried about the items yielded by the iterator being too small. To address that, I think making the size customizable, e.g. by a class attribute, would be good enough. That way people will have two options: (1) changing the class attribute, and (2) altering the streaming response object to yield bigger items. Note that currently, the handler chunks responses into pieces of a fixed size (django/core/handlers/asgi.py, lines 129 to 130 at 225d965).
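The chunking behaviour being referenced can be sketched as follows. This is an illustration of the idea only, not Django's exact code, and the 2\*\*16 default size is an assumption for the example:

```python
def chunk_bytes(data, chunk_size=2 ** 16):
    # Split a bytestring into pieces of at most chunk_size bytes,
    # flagging whether each piece is the last one.
    position = 0
    if not data:
        yield data, True
        return
    while position < len(data):
        yield (
            data[position:position + chunk_size],
            (position + chunk_size) >= len(data),
        )
        position += chunk_size
```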
(The handler may have a separate issue worth addressing of not combining items together for sending when the items are small. For example, the Django docs have an example of streaming a large CSV file, where the response items are the individual lines. In that case, the lines are likely much smaller than the chunk size, so sending via the async handler might be inefficient -- but I can't say for sure because I didn't look closely.)
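Combining small items would amount to something like this hypothetical helper (not in Django), which buffers tiny iterator items, such as individual CSV lines, into fewer, larger sends:

```python
def coalesce(parts, min_size=4096):
    # Buffer small bytestring items until at least min_size bytes are
    # accumulated, then yield them as one combined piece.
    buffer = bytearray()
    for part in parts:
        buffer += part
        if len(buffer) >= min_size:
            yield bytes(buffer)
            buffer.clear()
    if buffer:
        yield bytes(buffer)  # flush whatever is left at the end
```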
I've been working with Michael to find a better way to use sync iterators in an async context. The problem with the above implementation is that making a sync-to-async iterator that uses bounded memory and supports backpressure (blocking the sync side if the async side is slow) turns out to be pretty tricky. I put some thought into a more general approach and came up with this (imports added here for clarity):

```python
import asyncio
import collections
import threading

from asgiref.sync import sync_to_async


class SyncIteratorToAsync:
    def __init__(self, iterator, queue_size=50, thread_sensitive=True):
        self.iterator = iterator
        self.q = collections.deque()
        self.loop = asyncio.get_running_loop()
        self.full_event = threading.Event()
        self.empty_event = asyncio.Event()
        self.sentinel = object()
        self.task = asyncio.create_task(
            sync_to_async(self._sync_iterator, thread_sensitive=thread_sensitive)()
        )
        self.queue_size = queue_size
        self.should_exit = False

    def _sync_iterator(self):
        try:
            for item in self.iterator:
                if len(self.q) >= self.queue_size:
                    # Queue is full, block waiting for an item to be removed
                    self.full_event.wait()
                    # Queue has at least one free slot
                    self.full_event.clear()
                    # Check if we should exit after clearing the Event, to
                    # avoid a race condition if this runs after the exit
                    # flag was set but before the Event was set.
                    if self.should_exit:
                        break
                was_empty = len(self.q) == 0
                self.q.append(item)
                if was_empty:
                    # Signal that the queue is no longer empty
                    self.loop.call_soon_threadsafe(self.empty_event.set)
        finally:
            self.q.append(self.sentinel)
            self.loop.call_soon_threadsafe(self.empty_event.set)

    async def __aiter__(self):
        try:
            while True:
                if len(self.q) == 0:
                    # Queue is empty, await an item being added
                    await self.empty_event.wait()
                    # Queue has at least one item
                    self.empty_event.clear()
                was_full = len(self.q) >= self.queue_size
                item = self.q.popleft()
                if was_full and len(self.q) < self.queue_size:
                    # Signal that the queue is no longer full
                    self.full_event.set()
                if item is self.sentinel:
                    break
                yield item
            # Raise any errors that may have occurred in the sync thread
            await self.task
        finally:
            self.should_exit = True
            # Wake up the thread if it was waiting
            self.full_event.set()
            self.task.cancel()
```

While this adds quite a bit more complexity, it's also quite a bit faster.
If this approach sounds good, we can update the PR. This may also be a handy utility to include in asgiref.
Gosh, that's a lot of code. The idea seems better - I would want to see it in an actual PR with the current tests passing and more tests, though, as well as finding a weekend to review it in the excruciating detail any async bridge code ends up needing!
I understand, and having had a chance to think this over further, I'm not sure I like my solution due to its complexity. Michael and I ended up side-stepping this issue by wrapping the whole streaming loop in `sync_to_async`:

```python
def process_sync_response():
    # Access `__iter__` and not `streaming_content` directly in
    # case it has been overridden in a subclass.
    for part in response:
        for chunk, _ in self.chunk_bytes(part):
            async_to_sync(send)({
                'type': 'http.response.body',
                'body': chunk,
                # Ignore "more" as there may be more parts; instead,
                # use an empty final closing message with False.
                'more_body': True,
            })

await sync_to_async(process_sync_response)()
```

For us, that ends up being okay, since it's only for development convenience to run a single dev server. To be clear, this means I probably won't have time to work the above into an actual PR with tests, sorry.
That's fine - as you can tell, I am not blessed with a lot of time at the moment either, hence my slow reply. This is not the sort of thing that's an immediate awful bug, so it's not super pressing.
What do you think about me opening a new PR with the simple solution of wrapping the streaming response code in `sync_to_async`?
A simpler but slower solution beats one that doesn't work, if you ask me, so I think that's a decent idea. |
Closing in favour of #14652, as a simpler, albeit slower, fix seems to be the consensus. (It was also mentioned that providing a more performant general helper in asgiref remains an option.)
ticket-32798
I did some quick performance tests to make sure this isn't significantly slower. I set up a view with a streaming response yielding 100 items. timeit reports "3.04 msec per loop", compared to "3.64 msec per loop" on the main branch, so pretty much the same. The alternative implementations I tried are much slower (using `async_to_sync(Queue.put)()` instead of `Queue.put_nowait()` is 21 msec per loop; not using a queue is 15.5 msec per loop).