Skip to content

Commit

Permalink
Drop stream shielding; it was from a legacy design
Browse files Browse the repository at this point in the history
The whole origin was not having an explicit open/close semantic for
streams. We have that now so this internal mechanic isn't needed and
further our streams become more correct by having `.aclose()` be
independent of cancellation.
  • Loading branch information
goodboy committed Sep 1, 2021
1 parent a105e32 commit 8a57cb4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 39 deletions.
10 changes: 5 additions & 5 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ async def consume(task_status=trio.TASK_STATUS_IGNORED):
task_status.started(cs)

# shield stream's underlying channel from cancellation
with stream.shield():
# with stream.shield():

async for v in stream:
print(f'from stream: {v}')
expect.remove(v)
received.append(v)
async for v in stream:
print(f'from stream: {v}')
expect.remove(v)
received.append(v)

print('exited consume')

Expand Down
38 changes: 4 additions & 34 deletions tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,9 @@ def __init__(
self,
ctx: 'Context', # typing: ignore # noqa
rx_chan: trio.abc.ReceiveChannel,
shield: bool = False,
) -> None:
self._ctx = ctx
self._rx_chan = rx_chan
self._shielded = shield

# flag to denote end of stream
self._eoc: bool = False
Expand Down Expand Up @@ -103,7 +101,10 @@ async def receive(self):
except (
trio.ClosedResourceError, # by self._rx_chan
trio.EndOfChannel, # by self._rx_chan or `stop` msg from far end
trio.Cancelled, # by local cancellation

# Wait why would we do an implicit close on cancel? THAT'S
# NOT HOW MEM CHANS WORK!!?!?!?!?
# trio.Cancelled, # by local cancellation
):
# XXX: we close the stream on any of these error conditions:

Expand Down Expand Up @@ -135,23 +136,6 @@ async def receive(self):

raise # propagate

@contextmanager
def shield(
self
) -> Iterator['ReceiveMsgStream']: # noqa
"""Shield this stream's underlying channel such that a local consumer task
can be cancelled (and possibly restarted) using ``trio.Cancelled``.
Note that here, "shielding" here guards against relaying
a ``'stop'`` message to the far end of the stream thus keeping
the stream machinery active and ready for further use, it does
not have anything to do with an internal ``trio.CancelScope``.
"""
self._shielded = True
yield self
self._shielded = False

async def aclose(self):
"""Cancel associated remote actor task and local memory channel
on close.
Expand All @@ -169,18 +153,6 @@ async def aclose(self):
# https://trio.readthedocs.io/en/stable/reference-io.html#trio.abc.AsyncResource.aclose
return

# TODO: broadcasting to multiple consumers
# stats = rx_chan.statistics()
# if stats.open_receive_channels > 1:
# # if we've been cloned don't kill the stream
# log.debug(
# "there are still consumers running keeping stream alive")
# return

if self._shielded:
log.warning(f"{self} is shielded, portal channel being kept alive")
return

# XXX: This must be set **AFTER** the shielded test above!
self._eoc = True

Expand Down Expand Up @@ -397,7 +369,6 @@ async def cancel(self) -> None:
async def open_stream(

self,
shield: bool = False,

) -> AsyncGenerator[MsgStream, None]:
'''Open a ``MsgStream``, a bi-directional stream connected to the
Expand Down Expand Up @@ -455,7 +426,6 @@ async def open_stream(
async with MsgStream(
ctx=self,
rx_chan=recv_chan,
shield=shield,
) as rchan:

if self._portal:
Expand Down

0 comments on commit 8a57cb4

Please sign in to comment.