Skip to content

Commit

Permalink
Make LinkedTaskChannel trio-task-broadcastable with .subscribe()
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Apr 11, 2022
1 parent 1fb0b29 commit 7205702
Showing 1 changed file with 43 additions and 3 deletions.
46 changes: 43 additions & 3 deletions tractor/to_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
from .log import get_logger
from ._state import current_actor
from ._exceptions import AsyncioCancelled
from .trionics._broadcast import (
broadcast_receiver,
BroadcastReceiver,
)

log = get_logger(__name__)

Expand All @@ -63,6 +67,7 @@ class LinkedTaskChannel(trio.abc.Channel):
# set after ``asyncio.create_task()``
_aio_task: Optional[asyncio.Task] = None
_aio_err: Optional[BaseException] = None
_broadcaster: Optional[BroadcastReceiver] = None

async def aclose(self) -> None:
await self._from_aio.aclose()
Expand All @@ -79,7 +84,7 @@ async def receive(self) -> Any:

return await self._from_aio.receive()

async def wait_ayncio_complete(self) -> None:
async def wait_asyncio_complete(self) -> None:
await self._aio_task_complete.wait()

# def cancel_asyncio_task(self) -> None:
Expand All @@ -94,6 +99,43 @@ async def send(self, item: Any) -> None:
'''
self._to_aio.put_nowait(item)

def closed(self) -> bool:
return self._from_aio._closed

# TODO: shoud we consider some kind of "decorator" system
# that checks for structural-typing compatibliity and then
# automatically adds this ctx-mngr-as-method machinery?
@acm
async def subscribe(
self,

) -> AsyncIterator[BroadcastReceiver]:
'''
Allocate and return a ``BroadcastReceiver`` which delegates
to this inter-task channel.
This allows multiple local tasks to receive each their own copy
of this message stream.
See ``tractor._streaming.MsgStream.subscribe()`` for further
similar details.
'''
if self._broadcaster is None:

bcast = self._broadcaster = broadcast_receiver(
self,
# use memory channel size by default
self._from_aio._state.max_buffer_size, # type: ignore
receive_afunc=self.receive,
)

self.receive = bcast.receive # type: ignore

async with self._broadcaster.subscribe() as bstream:
assert bstream.key != self._broadcaster.key
assert bstream._recv == self._broadcaster._recv
yield bstream


def _run_asyncio_task(

Expand Down Expand Up @@ -334,7 +376,6 @@ def maybe_raise_aio_err(
maybe_raise_aio_err()



async def run_task(
func: Callable,
*,
Expand Down Expand Up @@ -425,7 +466,6 @@ async def aio_main(trio_main):
trio_done_fut = asyncio.Future()

def trio_done_callback(main_outcome):
actor = current_actor()

if isinstance(main_outcome, Error):
error = main_outcome.error
Expand Down

0 comments on commit 7205702

Please sign in to comment.