Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multihomed transport (server) addrs 🕶️ #367

Draft
wants to merge 65 commits into
base: asyncio_debugger_support
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
ee151b0
Mk `gather_contexts()` support `@acm`s yielding `None`
goodboy Sep 27, 2023
3d0e955
Init-support for "multi homed" transports
goodboy Sep 27, 2023
fa9a9cf
Kick off `.devx` subpkg for our dev tools B)
goodboy Sep 28, 2023
7bed470
Start `.devx.cli` extensions for pop CLI frameworks
goodboy Sep 28, 2023
de89e3a
Add libp2p style "multi-address" parser from `piker`
goodboy Sep 29, 2023
86da79a
Rename to `parse_maddr()` and fill out doc strings
goodboy Sep 29, 2023
e94f126
Move `maybe_open_crash_handler()` CLI `--pdb`-driven wrapper to debug…
goodboy Oct 2, 2023
4314a59
Add post-mortem catch around failed transport addr binds to aid with …
goodboy Oct 3, 2023
78c0d2b
Start inter-peer cancellation test mod
goodboy Oct 5, 2023
18a1634
Add shielding support to `.pause()`
goodboy Oct 6, 2023
d24a9e1
Msg-ified `ContextCancelled`s sub-error type should always be just, i…
goodboy Oct 7, 2023
c4cd573
Drop pause line from ctx cancel handler block in test
goodboy Oct 7, 2023
a09b856
Oof, default reg addrs needs to be in `list[tuple]` form..
goodboy Oct 7, 2023
919e462
Write more comprehensive `Portal.cancel_actor()` doc str
goodboy Oct 8, 2023
575a24a
Always raise remote (cancelled) error if set
goodboy Oct 10, 2023
6d951c5
Comment all `.pause(shield=True)` attempts again, need to solve cance…
goodboy Oct 10, 2023
2fdb8fc
Factor non-yield stream msg processing into helper
goodboy Oct 16, 2023
07cec02
Add comments around diff between `C/context` refs
goodboy Oct 16, 2023
ae326cb
Ignore kbis in `open_crash_handler()` by default
goodboy Oct 16, 2023
ab2664d
Runtime level log on debug REPL exits
goodboy Oct 16, 2023
e4a6223
`._exceptions`: typing and error unpacking updates
goodboy Oct 16, 2023
534e5d1
Drop `msg` kwarg from `Context.cancel()`
goodboy Oct 17, 2023
7eb31f3
Runtime import `.get_root()` in stdin hijacker to avoid import cycle
goodboy Oct 17, 2023
63b1488
Get mega-pedantic in `Portal.open_context()`
goodboy Oct 17, 2023
43b659d
Tidy/clarify another `._runtime` comment
goodboy Oct 18, 2023
87c1113
Always set default reg addr in `find_actor()` if not defined
goodboy Oct 18, 2023
ca3f7a1
Add a first serious inter-peer remote cancel suite
goodboy Oct 18, 2023
fcc8cee
._root: set a `_default_lo_addrs` and apply it when not provided by c…
goodboy Oct 18, 2023
215fec1
Change old `._debug._pause()` name, cherry to #362 re `greenback`
goodboy Oct 18, 2023
0c74b04
Facepalm, `wait_for_actor()` dun take an addr `list`..
goodboy Oct 18, 2023
190845c
Add masked super timeout line to `do_hard_kill()` for would-be runtim…
goodboy Oct 18, 2023
1e689ee
Rename fixture `arb_addr` -> `reg_addr` and set the session value glo…
goodboy Oct 18, 2023
6b1ceee
Type out the full-fledged streaming ex.
goodboy Oct 18, 2023
0e94572
Port all tests to new `reg_addr` fixture name
goodboy Oct 18, 2023
022bf8c
Ensure `registry_addrs` is always set to something
goodboy Oct 18, 2023
2e81ccf
Dump `.msgdata` in `RemoteActorError.__repr__()`
goodboy Oct 18, 2023
42d621b
Always dynamically re-read the `._root._default_lo_addrs` value in `f…
goodboy Oct 18, 2023
a3ed30e
Get remaining suites passing..
goodboy Oct 19, 2023
1d6f555
Expose per-actor registry addrs via `.reg_addrs`
goodboy Oct 19, 2023
9da3b63
Change remaining internals to use `Actor.reg_addrs`
goodboy Oct 19, 2023
2f0bed3
Ignore `greenback` import error if not installed
goodboy Oct 19, 2023
0518b3a
Move `MessagingError` into `._exceptions` set
goodboy Oct 23, 2023
5a94e8f
Raise a `MessagingError` from the src error on msging edge cases
goodboy Oct 23, 2023
131674e
Be mega-pedantic with `ContextCancelled` semantics
goodboy Oct 23, 2023
df31047
Be ultra-correct in `Portal.open_context()`
goodboy Oct 23, 2023
f4e6346
Tweak `Channel._cancel_called` comment
goodboy Oct 23, 2023
b77d123
Fix `Context.result()` call to be in runtime scope
goodboy Oct 23, 2023
ecb525a
Adjust test details where `Context.cancel()` is called
goodboy Oct 23, 2023
ef0cfc4
Get inter-peer suite passing with all `Context` state checks!
goodboy Oct 23, 2023
d651f3d
Tons of interpeer test cleanup
goodboy Oct 25, 2023
227c9ea
Test with `any(portals)` since `gather_contexts()` will return `list[…
goodboy Nov 6, 2023
48accbd
Fix doc string "its" typo..
goodboy Nov 6, 2023
87cd725
Add `open_root_actor(ensure_registry: bool)`
goodboy Nov 7, 2023
3f15923
More thurough hard kill doc strings
goodboy Dec 11, 2023
f415fc4
`.discovery.get_arbiter()`: add warning around this now deprecated usage
goodboy Dec 12, 2023
250275d
Guarding for IPC failures in `._runtime._invoke()`
goodboy Jan 2, 2024
bea31f6
._child: remove some unused imports..
goodboy Jan 2, 2024
8143848
Use `import <name> as <name>,` style over `__all__` in pkg mod
goodboy Jan 2, 2024
0002418
`StackLevelAdapter._log(stacklevel: int)` for custom levels..
goodboy Jan 2, 2024
ce7b8a5
Drop unused walrus assign of `re`
goodboy Jan 2, 2024
fdf3a1b
Only use `greenback` if actor-runtime is up..
goodboy Jan 2, 2024
0bcdea2
Fmt repr as multi-line style call
goodboy Jan 2, 2024
734bc09
Move missing-key-in-msg raiser to `._exceptions`
goodboy Jan 2, 2024
0294455
`_root`: drop unused `typing` import
goodboy Jan 2, 2024
28ea8e7
Bump timeout on resource cache test a bitty bit.
goodboy Jan 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 14 additions & 7 deletions examples/full_fledged_streaming_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,28 @@ async def push_to_chan(portal, send_chan):
print("AGGREGATOR COMPLETE!")


# this is the main actor and *arbiter*
async def main():
# a nursery which spawns "actors"
async with tractor.open_nursery(
arbiter_addr=('127.0.0.1', 1616)
) as nursery:
async def main() -> list[int]:
'''
This is the "root" actor's main task's entrypoint.

By default (and if not otherwise specified) that root process
also acts as a "registry actor" / "registrar" on the localhost
for the purposes of multi-actor "service discovery".

'''
# yes, a nursery which spawns `trio`-"actors" B)
nursery: tractor.ActorNursery
async with tractor.open_nursery() as nursery:

seed = int(1e3)
pre_start = time.time()

portal = await nursery.start_actor(
portal: tractor.Portal = await nursery.start_actor(
name='aggregator',
enable_modules=[__name__],
)

stream: tractor.MsgStream
async with portal.open_stream_from(
aggregate,
seed=seed,
Expand Down
62 changes: 39 additions & 23 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def test_whatever():

If fixtures:

- ``arb_addr`` (a socket addr tuple where arbiter is listening)
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)

Expand All @@ -40,16 +40,16 @@ async def test_whatever():
def wrapper(
*args,
loglevel=None,
arb_addr=None,
reg_addr=None,
start_method=None,
**kwargs
):
# __tracebackhide__ = True

if 'arb_addr' in inspect.signature(fn).parameters:
if 'reg_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['arb_addr'] = arb_addr
kwargs['reg_addr'] = reg_addr

if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
Expand All @@ -71,7 +71,7 @@ def wrapper(
async def _main():
async with tractor.open_root_actor(
# **kwargs,
arbiter_addr=arb_addr,
registry_addrs=[reg_addr] if reg_addr else None,
loglevel=loglevel,
start_method=start_method,

Expand All @@ -92,9 +92,6 @@ async def _main():
return wrapper


_arb_addr = '127.0.0.1', random.randint(1000, 9999)


# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT
Expand Down Expand Up @@ -173,9 +170,23 @@ def ci_env() -> bool:
return _ci_env


# choose randomly at import time
_reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)


@pytest.fixture(scope='session')
def arb_addr():
return _arb_addr
def reg_addr() -> tuple[str, int]:

# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import _root
_root._default_lo_addrs = [_reg_addr]

return _reg_addr


def pytest_generate_tests(metafunc):
Expand Down Expand Up @@ -216,30 +227,35 @@ def sig_prog(proc, sig):
def daemon(
loglevel: str,
testdir,
arb_addr: tuple[str, int],
reg_addr: tuple[str, int],
):
'''
Run a daemon actor as a "remote arbiter".
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.

'''
if loglevel in ('trace', 'debug'):
# too much logging will lock up the subproc (smh)
loglevel = 'info'

cmdargs = [
sys.executable, '-c',
"import tractor; tractor.run_daemon([], registry_addr={}, loglevel={})"
.format(
arb_addr,
"'{}'".format(loglevel) if loglevel else None)
# XXX: too much logging will lock up the subproc (smh)
loglevel: str = 'info'

code: str = (
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
kwargs = dict()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP

proc = testdir.popen(
cmdargs,
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
Expand Down
29 changes: 17 additions & 12 deletions tests/test_cancellation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def do_nuthin():
],
ids=['no_args', 'unexpected_args'],
)
def test_remote_error(arb_addr, args_err):
def test_remote_error(reg_addr, args_err):
"""Verify an error raised in a subactor that is propagated
to the parent nursery, contains the underlying boxed builtin
error type info and causes cancellation and reraising all the
Expand All @@ -57,7 +57,7 @@ def test_remote_error(arb_addr, args_err):

async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:

# on a remote type error caused by bad input args
Expand Down Expand Up @@ -97,15 +97,15 @@ async def main():
assert exc.type == errtype


def test_multierror(arb_addr):
def test_multierror(reg_addr):
'''
Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors.

'''
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:

await nursery.run_in_actor(assert_err, name='errorer1')
Expand All @@ -130,14 +130,14 @@ async def main():
@pytest.mark.parametrize(
'num_subactors', range(25, 26),
)
def test_multierror_fast_nursery(arb_addr, start_method, num_subactors, delay):
def test_multierror_fast_nursery(reg_addr, start_method, num_subactors, delay):
"""Verify we raise a ``BaseExceptionGroup`` out of a nursery where
more then one actor errors and also with a delay before failure
to test failure during an ongoing spawning.
"""
async def main():
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:

for i in range(num_subactors):
Expand Down Expand Up @@ -175,15 +175,20 @@ async def do_nothing():


@pytest.mark.parametrize('mechanism', ['nursery_cancel', KeyboardInterrupt])
def test_cancel_single_subactor(arb_addr, mechanism):
"""Ensure a ``ActorNursery.start_actor()`` spawned subactor
def test_cancel_single_subactor(reg_addr, mechanism):
'''
Ensure a ``ActorNursery.start_actor()`` spawned subactor
cancels when the nursery is cancelled.
"""

'''
async def spawn_actor():
"""Spawn an actor that blocks indefinitely.
"""
'''
Spawn an actor that blocks indefinitely then cancel via
either `ActorNursery.cancel()` or an exception raise.

'''
async with tractor.open_nursery(
arbiter_addr=arb_addr,
registry_addrs=[reg_addr],
) as nursery:

portal = await nursery.start_actor(
Expand Down
2 changes: 1 addition & 1 deletion tests/test_child_manages_service_nursery.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def open_actor_local_nursery(
)
def test_actor_managed_trio_nursery_task_error_cancels_aio(
asyncio_mode: bool,
arb_addr
reg_addr: tuple,
):
'''
Verify that a ``trio`` nursery created managed in a child actor
Expand Down