Skip to content

Commit

Permalink
Fix memory streams incorrectly raising cancelled when caling *_nowait…
Browse files Browse the repository at this point in the history
…() immediately after cancelling send()/receive()

This partially reverts commit 6b0a1f3.

Co-authored-by: Alex Grönholm <alex.gronholm@nextday.fi>
  • Loading branch information
gschaffner and agronholm committed May 11, 2024
1 parent 685d35b commit 06e38da
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 0 deletions.
14 changes: 14 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.
- Emit a ``ResourceWarning`` for ``MemoryObjectReceiveStream`` and
``MemoryObjectSendStream`` that were garbage collected without being closed (PR by
Andrey Kazantcev)
- Fixed memory object stream operations incorrectly raising cancelled under certain
conditions where it is too late to do so:

- Fixed memory object streams dropping items when
``MemoryObjectSendStream.send_nowait()`` was called immediately after cancelling the
scope of an ``await MemoryObjectReceiveStream.receive()`` call (`#728
<https://github.com/agronholm/anyio/issues/728>`_)

- Fixed ``MemoryObjectSendStream.send()`` raising cancelled despite succeeding when
``MemoryObjectReceiveStream.receive_nowait()`` is called immediately after
cancelling the scope of the ``MemoryObjectSendStream.send()`` call (`#729
<https://github.com/agronholm/anyio/issues/729>`_)

(PR by Ganden Schaffner)

**4.3.0**

Expand Down
12 changes: 12 additions & 0 deletions src/anyio/streams/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ClosedResourceError,
EndOfStream,
WouldBlock,
get_cancelled_exc_class,
)
from ..abc import Event, ObjectReceiveStream, ObjectSendStream
from ..lowlevel import checkpoint
Expand Down Expand Up @@ -104,6 +105,11 @@ async def receive(self) -> T_co:

try:
await receive_event.wait()
except get_cancelled_exc_class():
# Ignore the immediate cancellation if we already received an item, so
# as not to lose it
if not container:
raise
finally:
self._state.waiting_receivers.pop(receive_event, None)

Expand Down Expand Up @@ -230,6 +236,12 @@ async def send(self, item: T_contra) -> None:
self._state.waiting_senders[send_event] = item
try:
await send_event.wait()
except get_cancelled_exc_class():
# Ignore the immediate cancellation if we already sent the item, so as
# to not indicate failure despite success
if send_event in self._state.waiting_senders:
del self._state.waiting_senders[send_event]
raise
except BaseException:
self._state.waiting_senders.pop(send_event, None)
raise
Expand Down

0 comments on commit 06e38da

Please sign in to comment.