Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop stream shielding; it was from a legacy api design #230

Merged
merged 3 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions newsfragments/230.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Drop stream "shielding" support which was originally added to sidestep
a cancelled call to ``.receive()``

In the original api design a stream instance was returned directly from
a call to ``Portal.run()`` and thus there was no "exit phase" to handle
cancellations and errors which would trigger implicit closure. Now that
we have said enter/exit semantics with ``Portal.open_stream_from()`` and
``Context.open_stream()`` we can drop this implicit (and arguably
confusing) behavior.
10 changes: 5 additions & 5 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ async def consume(task_status=trio.TASK_STATUS_IGNORED):
task_status.started(cs)

# shield stream's underlying channel from cancellation
with stream.shield():
# with stream.shield():

async for v in stream:
print(f'from stream: {v}')
expect.remove(v)
received.append(v)
async for v in stream:
print(f'from stream: {v}')
expect.remove(v)
received.append(v)

print('exited consume')

Expand Down
74 changes: 25 additions & 49 deletions tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,29 @@


class ReceiveMsgStream(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an
inter-actor ``Channel``. This is the type returned to a local task
which invoked a remote streaming function using `Portal.run()`.
'''A IPC message stream for receiving logically sequenced values
over an inter-actor ``Channel``. This is the type returned to
a local task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``.

Termination rules:

- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise
a ``StopAsyncIteration`` to terminate the local ``async for``
- on cancellation the stream is **not** implicitly closed and the
surrounding ``Context`` is expected to handle how that cancel
is relayed to any task on the remote side.
- if the remote task signals the end of a stream the
``ReceiveChannel`` semantics dictate that a ``StopAsyncIteration``
to terminate the local ``async for``.

"""
'''
def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel,
shield: bool = False,

) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._shielded = shield

# flag to denote end of stream
self._eoc: bool = False
Expand All @@ -61,13 +62,16 @@ def receive_nowait(self):
return msg['yield']

async def receive(self):
'''Async receive a single msg from the IPC transport, the next
in sequence for this stream.

'''
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise trio.EndOfChannel

try:

msg = await self._rx_chan.receive()
return msg['yield']

Expand Down Expand Up @@ -103,7 +107,6 @@ async def receive(self):
except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
trio.Cancelled, # by local cancellation
):
# XXX: we close the stream on any of these error conditions:

Expand Down Expand Up @@ -135,23 +138,6 @@ async def receive(self):

raise # propagate

@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.

Note that here, "shielding" here guards against relaying
a ``'stop'`` message to the far end of the stream thus keeping
the stream machinery active and ready for further use, it does
not have anything to do with an internal ``trio.CancelScope``.

"""
self._shielded = True
yield self
self._shielded = False

async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
Expand All @@ -169,19 +155,6 @@ async def aclose(self):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return

# TODO: broadcasting to multiple consumers
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return

if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return

# XXX: This must be set **AFTER** the shielded test above!
self._eoc = True

# NOTE: this is super subtle IPC messaging stuff:
Expand All @@ -203,9 +176,14 @@ async def aclose(self):
# ``__aexit__()`` on teardown so it **does not** need to be
# called here.
if not self._ctx._portal:
# Only for 2 way streams can we can send stop from the
# caller side.
try:
# only for 2 way streams can we can send
# stop from the caller side
# NOTE: if this call is cancelled we expect this end to
# handle as though the stop was never sent (though if it
# was it shouldn't matter since it's unlikely a user
# will try to re-use a stream after attemping to close
# it).
await self._ctx.send_stop()

except (
Expand All @@ -217,9 +195,9 @@ async def aclose(self):
# it can't traverse the transport.
log.debug(f'Channel for {self} was already closed')

# close the local mem chan ``self._rx_chan`` ??!?
# Do we close the local mem chan ``self._rx_chan`` ??!?

# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has
Expand Down Expand Up @@ -397,7 +375,6 @@ async def cancel(self) -> None:
async def open_stream(

self,
shield: bool = False,

) -> AsyncGenerator[MsgStream, None]:
'''Open a ``MsgStream``, a bi-directional stream connected to the
Expand Down Expand Up @@ -455,7 +432,6 @@ async def open_stream(
async with MsgStream(
ctx=self,
rx_chan=recv_chan,
shield=shield,
) as rchan:

if self._portal:
Expand Down