diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 245cb02d..9d5f9154 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -28,6 +28,20 @@ This library adheres to `Semantic Versioning 2.0 `_. - Emit a ``ResourceWarning`` for ``MemoryObjectReceiveStream`` and ``MemoryObjectSendStream`` that were garbage collected without being closed (PR by Andrey Kazantcev) +- Fixed memory object stream operations incorrectly raising cancelled under certain + conditions where it is too late to do so: + + - Fixed memory object streams dropping items when + ``MemoryObjectSendStream.send_nowait()`` was called immediately after cancelling the + scope of an ``await MemoryObjectReceiveStream.receive()`` call (`#728 + `_) + + - Fixed ``MemoryObjectSendStream.send()`` raising cancelled despite succeeding when + ``MemoryObjectReceiveStream.receive_nowait()`` is called immediately after + cancelling the scope of the ``MemoryObjectSendStream.send()`` call (`#729 + `_) + + (PR by Ganden Schaffner) **4.3.0** diff --git a/src/anyio/streams/memory.py b/src/anyio/streams/memory.py index b16a07a4..96f30388 100644 --- a/src/anyio/streams/memory.py +++ b/src/anyio/streams/memory.py @@ -11,6 +11,7 @@ ClosedResourceError, EndOfStream, WouldBlock, + get_cancelled_exc_class, ) from ..abc import Event, ObjectReceiveStream, ObjectSendStream from ..lowlevel import checkpoint @@ -104,6 +105,11 @@ async def receive(self) -> T_co: try: await receive_event.wait() + except get_cancelled_exc_class(): + # Ignore the immediate cancellation if we already received an item, so + # as not to lose it + if not container: + raise finally: self._state.waiting_receivers.pop(receive_event, None) @@ -230,6 +236,12 @@ async def send(self, item: T_contra) -> None: self._state.waiting_senders[send_event] = item try: await send_event.wait() + except get_cancelled_exc_class(): + # Ignore the immediate cancellation if we already sent the item, so as + # to not indicate failure despite success + if send_event in self._state.waiting_senders: + del self._state.waiting_senders[send_event] + raise except BaseException: self._state.waiting_senders.pop(send_event, None) raise