Skip to content

Commit

Permalink
Merge pull request #230 from goodboy/drop_stream_shielding
Browse files Browse the repository at this point in the history
Drop stream shielding; it was from a legacy api design
  • Loading branch information
goodboy committed Sep 2, 2021
2 parents 7e98afa + 558c44f commit e5845b5
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 54 deletions.
9 changes: 9 additions & 0 deletions newsfragments/230.removal.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Drop stream "shielding" support which was originally added to sidestep
a cancelled call to ``.receive()``

In the original api design a stream instance was returned directly from
a call to ``Portal.run()`` and thus there was no "exit phase" to handle
cancellations and errors which would trigger implicit closure. Now that
we have said enter/exit semantics with ``Portal.open_stream_from()`` and
``Context.open_stream()`` we can drop this implicit (and arguably
confusing) behavior.
10 changes: 5 additions & 5 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ async def consume(task_status=trio.TASK_STATUS_IGNORED):
task_status.started(cs)

# shield stream's underlying channel from cancellation
with stream.shield():
# with stream.shield():

async for v in stream:
print(f'from stream: {v}')
expect.remove(v)
received.append(v)
async for v in stream:
print(f'from stream: {v}')
expect.remove(v)
received.append(v)

print('exited consume')

Expand Down
74 changes: 25 additions & 49 deletions tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,29 @@


class ReceiveMsgStream(trio.abc.ReceiveChannel):
"""A wrapper around a ``trio._channel.MemoryReceiveChannel`` with
special behaviour for signalling stream termination across an
inter-actor ``Channel``. This is the type returned to a local task
which invoked a remote streaming function using `Portal.run()`.
'''A IPC message stream for receiving logically sequenced values
over an inter-actor ``Channel``. This is the type returned to
a local task which entered either ``Portal.open_stream_from()`` or
``Context.open_stream()``.
Termination rules:
- if the local task signals stop iteration a cancel signal is
relayed to the remote task indicating to stop streaming
- if the remote task signals the end of a stream, raise
a ``StopAsyncIteration`` to terminate the local ``async for``
- on cancellation the stream is **not** implicitly closed and the
surrounding ``Context`` is expected to handle how that cancel
is relayed to any task on the remote side.
- if the remote task signals the end of a stream the
``ReceiveChannel`` semantics dictate that a ``StopAsyncIteration``
to terminate the local ``async for``.
"""
'''
def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel,
shield: bool = False,

) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._shielded = shield

# flag to denote end of stream
self._eoc: bool = False
Expand All @@ -61,13 +62,16 @@ def receive_nowait(self):
return msg['yield']

async def receive(self):
'''Async receive a single msg from the IPC transport, the next
in sequence for this stream.
'''
# see ``.aclose()`` for notes on the old behaviour prior to
# introducing this
if self._eoc:
raise trio.EndOfChannel

try:

msg = await self._rx_chan.receive()
return msg['yield']

Expand Down Expand Up @@ -103,7 +107,6 @@ async def receive(self):
except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
trio.Cancelled, # by local cancellation
):
# XXX: we close the stream on any of these error conditions:

Expand Down Expand Up @@ -135,23 +138,6 @@ async def receive(self):

raise # propagate

@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
Note that here, "shielding" here guards against relaying
a ``'stop'`` message to the far end of the stream thus keeping
the stream machinery active and ready for further use, it does
not have anything to do with an internal ``trio.CancelScope``.
"""
self._shielded = True
yield self
self._shielded = False

async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
Expand All @@ -169,19 +155,6 @@ async def aclose(self):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return

# TODO: broadcasting to multiple consumers
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return

if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return

# XXX: This must be set **AFTER** the shielded test above!
self._eoc = True

# NOTE: this is super subtle IPC messaging stuff:
Expand All @@ -203,9 +176,14 @@ async def aclose(self):
# ``__aexit__()`` on teardown so it **does not** need to be
# called here.
if not self._ctx._portal:
# Only for 2 way streams can we can send stop from the
# caller side.
try:
# only for 2 way streams can we can send
# stop from the caller side
# NOTE: if this call is cancelled we expect this end to
# handle as though the stop was never sent (though if it
# was it shouldn't matter since it's unlikely a user
# will try to re-use a stream after attemping to close
# it).
await self._ctx.send_stop()

except (
Expand All @@ -217,9 +195,9 @@ async def aclose(self):
# it can't traverse the transport.
log.debug(f'Channel for {self} was already closed')

# close the local mem chan ``self._rx_chan`` ??!?
# Do we close the local mem chan ``self._rx_chan`` ??!?

# DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# NO, DEFINITELY NOT if we're a bi-dir ``MsgStream``!
# BECAUSE this same core-msg-loop mem recv-chan is used to deliver
# the potential final result from the surrounding inter-actor
# `Context` so we don't want to close it until that context has
Expand Down Expand Up @@ -397,7 +375,6 @@ async def cancel(self) -> None:
async def open_stream(

self,
shield: bool = False,

) -> AsyncGenerator[MsgStream, None]:
'''Open a ``MsgStream``, a bi-directional stream connected to the
Expand Down Expand Up @@ -455,7 +432,6 @@ async def open_stream(
async with MsgStream(
ctx=self,
rx_chan=recv_chan,
shield=shield,
) as rchan:

if self._portal:
Expand Down

0 comments on commit e5845b5

Please sign in to comment.