Skip to content

Commit

Permalink
Merge pull request #304 from goodboy/aio_explicit_task_cancels
Browse files Browse the repository at this point in the history
`LinkedTaskChannel.subscribe()`, explicit `asyncio` task cancel logging, `test_trioisms.py`
  • Loading branch information
goodboy committed Apr 12, 2022
2 parents 1109d96 + 9c27858 commit bfe99f2
Show file tree
Hide file tree
Showing 7 changed files with 448 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
- name: Setup python
uses: actions/setup-python@v2
with:
python-version: '3.9'
python-version: '3.10'

- name: Install dependencies
run: pip install -U . --upgrade-strategy eager -r requirements-test.txt
Expand Down
12 changes: 12 additions & 0 deletions nooz/304.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Add a new ``to_asyncio.LinkedTaskChannel.subscribe()`` which gives
task-oriented broadcast functionality semantically equivalent to
``tractor.MsgStream.subscribe()`` this makes it possible for multiple
``trio``-side tasks to consume ``asyncio``-side task msgs in tandem.

Further Improvements to the test suite were added in this patch set
including a new scenario test for a sub-actor managed "service nursery"
(implementing the basics of a "service manager") including use of
*infected asyncio* mode. Further we added a lower level
``test_trioisms.py`` to start to track issues we need to work around in
``trio`` itself which in this case included a bug we were trying to
solve related to https://github.com/python-trio/trio/issues/2258.
17 changes: 9 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
#!/usr/bin/env python
#
# tractor: a trionic actor model built on `multiprocessing` and `trio`
# tractor: structured concurrent "actors".
#
# Copyright (C) 2018-2020 Tyler Goodlet
# Copyright 2018-eternity Tyler Goodlet.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from setuptools import setup

with open('docs/README.rst', encoding='utf-8') as f:
Expand All @@ -27,7 +28,7 @@
version='0.1.0a5.dev', # alpha zone
description='structured concurrrent "actors"',
long_description=readme,
license='GPLv3',
license='AGPLv3',
author='Tyler Goodlet',
maintainer='Tyler Goodlet',
maintainer_email='jgbt@protonmail.com',
Expand Down Expand Up @@ -80,7 +81,7 @@
"Operating System :: POSIX :: Linux",
"Operating System :: Microsoft :: Windows",
"Framework :: Trio",
"License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)",
"License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.8",
Expand Down
173 changes: 173 additions & 0 deletions tests/test_child_manages_service_nursery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
'''
Test a service style daemon that maintains a nursery for spawning
"remote async tasks" including both spawning other long living
sub-sub-actor daemons.
'''
from typing import Optional
import asyncio
from contextlib import asynccontextmanager as acm

import pytest
import trio
from trio_typing import TaskStatus
import tractor
from tractor import RemoteActorError
from async_generator import aclosing


async def aio_streamer(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> trio.abc.ReceiveChannel:

# required first msg to sync caller
to_trio.send_nowait(None)

from itertools import cycle
for i in cycle(range(10)):
to_trio.send_nowait(i)
await asyncio.sleep(0.01)


async def trio_streamer():
from itertools import cycle
for i in cycle(range(10)):
yield i
await trio.sleep(0.01)


async def trio_sleep_and_err(delay: float = 0.5):
await trio.sleep(delay)
# name error
doggy() # noqa


_cached_stream: Optional[
trio.abc.ReceiveChannel
] = None


@acm
async def wrapper_mngr(
):
from tractor.trionics import broadcast_receiver
global _cached_stream
in_aio = tractor.current_actor().is_infected_aio()

if in_aio:
if _cached_stream:

from_aio = _cached_stream

# if we already have a cached feed deliver a rx side clone
# to consumer
async with broadcast_receiver(from_aio, 6) as from_aio:
yield from_aio
return
else:
async with tractor.to_asyncio.open_channel_from(
aio_streamer,
) as (first, from_aio):
assert not first

# cache it so next task uses broadcast receiver
_cached_stream = from_aio

yield from_aio
else:
async with aclosing(trio_streamer()) as stream:
# cache it so next task uses broadcast receiver
_cached_stream = stream
yield stream


_nursery: trio.Nursery = None


@tractor.context
async def trio_main(
ctx: tractor.Context,
):
# sync
await ctx.started()

# stash a "service nursery" as "actor local" (aka a Python global)
global _nursery
n = _nursery
assert n

async def consume_stream():
async with wrapper_mngr() as stream:
async for msg in stream:
print(msg)

# run 2 tasks to ensure broadcaster chan use
n.start_soon(consume_stream)
n.start_soon(consume_stream)

n.start_soon(trio_sleep_and_err)

await trio.sleep_forever()


@tractor.context
async def open_actor_local_nursery(
ctx: tractor.Context,
):
global _nursery
async with trio.open_nursery() as n:
_nursery = n
await ctx.started()
await trio.sleep(10)
# await trio.sleep(1)

# XXX: this causes the hang since
# the caller does not unblock from its own
# ``trio.sleep_forever()``.

# TODO: we need to test a simple ctx task starting remote tasks
# that error and then blocking on a ``Nursery.start()`` which
# never yields back.. aka a scenario where the
# ``tractor.context`` task IS NOT in the service n's cancel
# scope.
n.cancel_scope.cancel()


@pytest.mark.parametrize(
'asyncio_mode',
[True, False],
ids='asyncio_mode={}'.format,
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
arb_addr
):
'''
Verify that a ``trio`` nursery created managed in a child actor
correctly relays errors to the parent actor when one of its spawned
tasks errors even when running in infected asyncio mode and using
broadcast receivers for multi-task-per-actor subscription.
'''
async def main():

# cancel the nursery shortly after boot
async with tractor.open_nursery() as n:
p = await n.start_actor(
'nursery_mngr',
infect_asyncio=asyncio_mode,
enable_modules=[__name__],
)
async with (
p.open_context(open_actor_local_nursery) as (ctx, first),
p.open_context(trio_main) as (ctx, first),
):
await trio.sleep_forever()

with pytest.raises(RemoteActorError) as excinfo:
trio.run(main)

# verify boxed error
err = excinfo.value
assert isinstance(err.type(), NameError)
73 changes: 59 additions & 14 deletions tests/test_infected_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@
The hipster way to force SC onto the stdlib's "async": 'infection mode'.
'''
from typing import Optional, Iterable
from typing import Optional, Iterable, Union
import asyncio
import builtins
import itertools
import importlib

import pytest
import trio
import tractor
from tractor import to_asyncio
from tractor import RemoteActorError
from tractor.trionics import BroadcastReceiver


async def sleep_and_err():
await asyncio.sleep(0.1)
async def sleep_and_err(sleep_for: float = 0.1):
await asyncio.sleep(sleep_for)
assert 0


Expand Down Expand Up @@ -217,6 +219,7 @@ async def stream_from_aio(
exit_early: bool = False,
raise_err: bool = False,
aio_raise_err: bool = False,
fan_out: bool = False,

) -> None:
seq = range(100)
Expand All @@ -234,35 +237,77 @@ async def stream_from_aio(

assert first is True

async for value in chan:
print(f'trio received {value}')
pulled.append(value)

if value == 50:
if raise_err:
raise Exception
elif exit_early:
break
async def consume(
chan: Union[
to_asyncio.LinkedTaskChannel,
BroadcastReceiver,
],
):
async for value in chan:
print(f'trio received {value}')
pulled.append(value)

if value == 50:
if raise_err:
raise Exception
elif exit_early:
break

if fan_out:
# start second task that get's the same stream value set.
async with (

# NOTE: this has to come first to avoid
# the channel being closed before the nursery
# tasks are joined..
chan.subscribe() as br,

trio.open_nursery() as n,
):
n.start_soon(consume, br)
await consume(chan)

else:
await consume(chan)
finally:

if (
not raise_err and
not exit_early and
not aio_raise_err
):
assert pulled == expect
if fan_out:
# we get double the pulled values in the
# ``.subscribe()`` fan out case.
doubled = list(itertools.chain(*zip(expect, expect)))
expect = doubled[:len(pulled)]
if pulled != expect:
print(
f'uhhh pulled is {pulled}\n',
f'uhhh expect is {expect}\n',
)
assert pulled == expect

else:
assert pulled == expect
else:
assert not fan_out
assert pulled == expect[:51]

print('trio guest mode task completed!')


def test_basic_interloop_channel_stream(arb_addr):
@pytest.mark.parametrize(
'fan_out', [False, True],
ids='fan_out_w_chan_subscribe={}'.format
)
def test_basic_interloop_channel_stream(arb_addr, fan_out):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
stream_from_aio,
infect_asyncio=True,
fan_out=fan_out,
)
await portal.result()

Expand Down

0 comments on commit bfe99f2

Please sign in to comment.