Skip to content

Commit

Permalink
Test that memory streams don't raise incorrectly (e.g. dropping items…
Browse files Browse the repository at this point in the history
…) when calling *_nowait() immediately after cancelling send()/receive()

* In the send_nowait() + receive() case, this bug could drop items.
* In the send() + receive_nowait() case, this bug could cause send() to
  raise even though it succeeded.
  • Loading branch information
gschaffner committed May 11, 2024
1 parent 8e8b7f5 commit 685d35b
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 16 deletions.
128 changes: 112 additions & 16 deletions tests/streams/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
fail_after,
wait_all_tasks_blocked,
)
from anyio.abc import ObjectReceiveStream, ObjectSendStream
from anyio.abc import ObjectReceiveStream, ObjectSendStream, TaskStatus
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

if sys.version_info < (3, 11):
Expand Down Expand Up @@ -298,34 +298,130 @@ async def receiver() -> None:
receive.close()


async def test_cancel_during_receive() -> None:
async def test_cancel_during_receive_after_send_nowait() -> None:
"""
Test that cancelling a pending receive() operation does not cause an item in the
stream to be lost.
Test that cancelling a pending receive() operation immediately after an item has
been sent to that receiver does not cause the item to be lost.
"""
receiver_scope = None

async def scoped_receiver() -> None:
nonlocal receiver_scope
async def scoped_receiver(task_status: TaskStatus[CancelScope]) -> None:
with CancelScope() as receiver_scope:
task_status.started(receiver_scope)
received.append(await receive.receive())

assert receiver_scope.cancel_called

received: list[str] = []
send, receive = create_memory_object_stream[str]()
async with create_task_group() as tg:
tg.start_soon(scoped_receiver)
await wait_all_tasks_blocked()
send.send_nowait("hello")
assert receiver_scope is not None
receiver_scope.cancel()
with send, receive:
async with create_task_group() as tg:
receiver_scope = await tg.start(scoped_receiver)
await wait_all_tasks_blocked()
send.send_nowait("hello")
receiver_scope.cancel()

assert received == ["hello"]
assert received == ["hello"]

send.close()
receive.close()

async def test_cancel_during_receive_before_send_nowait() -> None:
"""
Test that cancelling a pending receive() operation immediately before an item is
sent to that receiver does not cause the item to be lost.
Note: AnyIO's memory stream behavior here currently differs slightly from Trio's
memory channel behavior. Neither will lose items in this case, but Trio's memory
channels use abort_fn to have an extra stage during cancellation delivery, so with a
Trio memory channel send_nowait() will raise WouldBlock even if the receive()
operation has not raised Cancelled yet. This test is intended only as a regression
test for the bug where AnyIO dropped items in this situation; addressing the
(possible) issue where AnyIO behaves slightly differently from Trio in this
situation (in terms of when cancellation is delivered) will involve modifying this
test. See #728.
"""

async def scoped_receiver(task_status: TaskStatus[CancelScope]) -> None:
with CancelScope() as receiver_scope:
task_status.started(receiver_scope)
received.append(await receive.receive())

assert receiver_scope.cancel_called

received: list[str] = []
send, receive = create_memory_object_stream[str]()
with send, receive:
async with create_task_group() as tg:
receiver_scope = await tg.start(scoped_receiver)
await wait_all_tasks_blocked()
receiver_scope.cancel()
send.send_nowait("hello")

assert received == ["hello"]


async def test_cancel_during_send_after_receive_nowait() -> None:
"""
Test that cancelling a pending send() operation immediately after its item has been
received does not cause send() to raise cancelled after successfully sending the
item.
"""
sender_woke = False

async def scoped_sender(task_status: TaskStatus[CancelScope]) -> None:
nonlocal sender_woke
with CancelScope() as sender_scope:
task_status.started(sender_scope)
await send.send("hello")
sender_woke = True

send, receive = create_memory_object_stream[str]()
with send, receive:
async with create_task_group() as tg:
sender_scope = await tg.start(scoped_sender)
await wait_all_tasks_blocked()
assert receive.receive_nowait() == "hello"
sender_scope.cancel()

assert sender_woke


async def test_cancel_during_send_before_receive_nowait() -> None:
"""
Test that cancelling a pending send() operation immediately before its item is
received does not cause send() to raise cancelled after successfully sending the
item.
Note: AnyIO's memory stream behavior here currently differs slightly from Trio's
memory channel behavior. Neither will allow send() to successfully send an item but
still raise cancelled after, but Trio's memory channels use abort_fn to have an
extra stage during cancellation delivery, so with a Trio memory channel
receive_nowait() will raise WouldBlock even if the send() operation has not raised
Cancelled yet. This test is intended only as a regression test for the bug where
send() incorrectly raised cancelled in this situation; addressing the (possible)
issue where AnyIO behaves slightly differently from Trio in this situation (in terms
of when cancellation is delivered) will involve modifying this test. See #728.
"""
sender_woke = False

async def scoped_sender(task_status: TaskStatus[CancelScope]) -> None:
nonlocal sender_woke
with CancelScope() as sender_scope:
task_status.started(sender_scope)
await send.send("hello")
sender_woke = True

send, receive = create_memory_object_stream[str]()
with send, receive:
async with create_task_group() as tg:
sender_scope = await tg.start(scoped_sender)
await wait_all_tasks_blocked()
sender_scope.cancel()
assert receive.receive_nowait() == "hello"

assert sender_woke


async def test_close_receive_after_send() -> None:
Expand Down
19 changes: 19 additions & 0 deletions tests/test_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,25 @@ async def setter() -> None:
assert setter_started
assert waiter_woke

async def test_event_wait_before_cancel_before_set(self) -> None:
setter_started = waiter_woke = False

async def setter() -> None:
nonlocal setter_started
setter_started = True
assert not event.is_set()
tg.cancel_scope.cancel()
event.set()

event = Event()
async with create_task_group() as tg:
tg.start_soon(setter)
await event.wait()
waiter_woke = True

assert setter_started
assert not waiter_woke

async def test_statistics(self) -> None:
async def waiter() -> None:
await event.wait()
Expand Down

0 comments on commit 685d35b

Please sign in to comment.