Skip to content

Commit

Permalink
Add subscription support to message streams
Browse files Browse the repository at this point in the history
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery which for now can not be undone. Move `.clone()` to
the receive stream type.

Resolves #204
  • Loading branch information
goodboy committed Aug 15, 2021
1 parent 47c637e commit a1a7cb2
Showing 1 changed file with 53 additions and 11 deletions.
64 changes: 53 additions & 11 deletions tractor/_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Message stream types and APIs.
"""
from __future__ import annotations
import inspect
from contextlib import contextmanager, asynccontextmanager
from dataclasses import dataclass
Expand All @@ -17,6 +18,7 @@
from ._ipc import Channel
from ._exceptions import unpack_error, ContextCancelled
from ._state import current_actor
from ._broadcast import broadcast_receiver, BroadcastReceiver
from .log import get_logger


Expand Down Expand Up @@ -51,6 +53,7 @@ def __init__(
self._ctx = ctx
self._rx_chan = rx_chan
self._shielded = shield
self._broadcaster: Optional[BroadcastReceiver] = None

# flag to denote end of stream
self._eoc: bool = False
Expand Down Expand Up @@ -253,6 +256,56 @@ async def aclose(self):
# still need to consume msgs that are "in transit" from the far
# end (eg. for ``Context.result()``).

def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return type(self)(
self._ctx,
self._rx_chan.clone(),
)

@asynccontextmanager
async def subscribe(
self,

) -> BroadcastReceiver:
'''Allocate and return a ``BroadcastReceiver`` which delegates
to this message stream.
This allows multiple local tasks to receive each their own copy
of this message stream.
This operation is indempotent and and mutates this stream's
receive machinery to copy and window-length-store each received
value from the far end via the internally created broudcast
receiver wrapper.
'''
if self._broadcaster is None:
self._broadcaster = broadcast_receiver(
self,
self._rx_chan._state.max_buffer_size,
)
# override the original stream instance's receive to
# delegate to the broadcaster receive such that
# new subscribers will be copied received values
# XXX: this operation is indempotent and non-reversible,
# so be sure you can deal with any (theoretical) overhead
# of the the ``BroadcastReceiver`` before calling
# this method for the first time.

# XXX: why does this work without a recursion issue?!
self.receive = self._broadcaster.receive

async with self._broadcaster.subscribe() as bstream:
# a ``MsgStream`` clone is allocated for the
# broadcaster to track this entry's subscription
stream_clone = bstream._rx
assert stream_clone is not self
yield bstream


class MsgStream(ReceiveMsgStream, trio.abc.Channel):
"""
Expand All @@ -269,17 +322,6 @@ async def send(
'''
await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid})

# TODO: but make it broadcasting to consumers
def clone(self):
"""Clone this receive channel allowing for multi-task
consumption from the same channel.
"""
return MsgStream(
self._ctx,
self._rx_chan.clone(),
)


@dataclass
class Context:
Expand Down

0 comments on commit a1a7cb2

Please sign in to comment.