Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Should closing receive stream remove waiting senders? #440

Closed
mbrancato opened this issue May 28, 2022 · 1 comment
Closed

Should closing receive stream remove waiting senders? #440

mbrancato opened this issue May 28, 2022 · 1 comment

Comments

@mbrancato
Copy link

Currently, when MemoryObjectReceiveStream.close() is called, it completes the asyncio.Event for the senders, this should allow any senders awaiting the Event to continue.

if self._state.open_receive_channels == 0:
send_events = list(self._state.waiting_senders.keys())
for event in send_events:
event.set()

But it does not remove the sender Event objects from the MemoryObjectStreamState. This appears to lead to BrokenResourceErrors in MemoryObjectSendStream.send() as they have not been removed from the dictionary, but they were completed by calling event.set().

if self._state.waiting_senders.pop(send_event, None): # type: ignore[arg-type]
raise BrokenResourceError

This appears to at least happen when using async with on an MemoryObjectReceiveStream object.

Is this expected behavior when MemoryObjectReceiveStream.close() is called? To prevent raising an exception, what better way would there be beside using an async context manager on the MemoryObjectReceiveStream object?

Just to note, I'm tracking this down from its use in Starlette where we periodically see this exception raised.

@agronholm
Copy link
Owner

Sorry for not noticing this before!

Is this expected behavior when MemoryObjectReceiveStream.close() is called?

Yes, as is documented here. The docstring for the send() method was inherited from the superclass and should probably be overridden here.

To prevent raising an exception, what better way would there be beside using an async context manager on the MemoryObjectReceiveStream object?

The producer end should be closed first. It's an error to send an object when nobody is ever going to receive it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants