Skip to content

Commit d7ca68c

Browse files
committed
Move trio_proc/mp_proc to per-backend submods
Split the monolithic `spawn._spawn` into a slim "core" + per-backend submodules so a future `._subint` backend (per issue #379) can drop in without piling more onto `_spawn.py`. `._spawn` retains the cross-backend supervisor machinery: `SpawnMethodKey`, `_methods` registry, `_spawn_method`/`_ctx` state, `try_set_start_method()`, the `new_proc()` dispatcher, and the shared helpers `exhaust_portal()`, `cancel_on_completion()`, `hard_kill()`, `soft_kill()`, `proc_waiter()`. Details: - mv `trio_proc()` → new `spawn._trio` - mv `mp_proc()` → new `spawn._mp`, reads `_ctx` and `_spawn_method` via `from . import _spawn` for late binding because both get mutated by `try_set_start_method()` - `_methods` wires up the new submods via late bottom-of-module imports to side-step circular dep (both backend mods pull shared helpers from `._spawn`) - prune now-unused imports from `_spawn.py` — `sys`, `is_root_process`, `current_actor`, `is_main_process`, `_mp_main`, `ActorFailure`, `pretty_struct`, `_pformat` Also, - `_testing.pytest.pytest_generate_tests()` now drives the valid-backend set from `typing.get_args(SpawnMethodKey)` so adding a new backend (e.g. `'subint'`) doesn't require touching the harness - refresh `spawn/__init__.py` docstring for the new layout (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
1 parent b5b0504 commit d7ca68c

5 files changed

Lines changed: 565 additions & 418 deletions

File tree

tractor/_testing/pytest.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import platform
2828
from typing import (
2929
Callable,
30+
get_args,
3031
)
3132

3233
import pytest
@@ -341,12 +342,11 @@ def pytest_generate_tests(
341342
# XXX some weird windows bug with `pytest`?
342343
spawn_backend = 'trio'
343344

344-
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
345-
assert spawn_backend in (
346-
'mp_spawn',
347-
'mp_forkserver',
348-
'trio',
349-
)
345+
# drive the valid-backend set from the canonical `Literal` so
346+
# adding a new spawn backend (e.g. `'subint'`) doesn't require
347+
# touching the harness.
348+
from tractor.spawn._spawn import SpawnMethodKey
349+
assert spawn_backend in get_args(SpawnMethodKey)
350350

351351
# NOTE: used-to-be-used-to dynamically parametrize tests for when
352352
# you just passed --spawn-backend=`mp` on the cli, but now we expect

tractor/spawn/__init__.py

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,31 @@
1515
# along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

1717
'''
18-
Actor process spawning machinery using
19-
multiple backends (trio, multiprocessing).
18+
Actor process spawning machinery using multiple backends.
2019
21-
NOTE: to avoid circular imports, this ``__init__``
22-
does NOT eagerly import submodules. Use direct
23-
module paths like ``tractor.spawn._spawn`` or
24-
``tractor.spawn._entry`` instead.
20+
Layout
21+
------
22+
- `._spawn`: the "core" supervisor machinery — spawn-method
23+
registry (`SpawnMethodKey`, `_methods`, `_spawn_method`,
24+
`_ctx`, `try_set_start_method`), the `new_proc` dispatcher,
25+
and the cross-backend helpers `exhaust_portal`,
26+
`cancel_on_completion`, `hard_kill`, `soft_kill`,
27+
`proc_waiter`.
28+
29+
Per-backend submodules (each exposes a single `*_proc()`
30+
coroutine registered in `_spawn._methods`):
31+
32+
- `._trio`: the `trio`-native subprocess backend (default,
33+
all platforms), spawns via `trio.lowlevel.open_process()`.
34+
- `._mp`: the stdlib `multiprocessing` backends —
35+
`'mp_spawn'` and `'mp_forkserver'` variants — driven by
36+
the `mp.context` bound to `_spawn._ctx`.
37+
38+
Entry-point helpers live in `._entry`/`._mp_fixup_main`/
39+
`._forkserver_override`.
40+
41+
NOTE: to avoid circular imports, this ``__init__`` does NOT
42+
eagerly import submodules. Use direct module paths like
43+
``tractor.spawn._spawn`` or ``tractor.spawn._trio`` instead.
2544
2645
'''

tractor/spawn/_mp.py

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
# tractor: structured concurrent "actors".
2+
# Copyright 2018-eternity Tyler Goodlet.
3+
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU Affero General Public License as published by
6+
# the Free Software Foundation, either version 3 of the License, or
7+
# (at your option) any later version.
8+
9+
# This program is distributed in the hope that it will be useful,
10+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
# GNU Affero General Public License for more details.
13+
14+
# You should have received a copy of the GNU Affero General Public License
15+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
17+
'''
18+
The `multiprocessing` subprocess spawning backends (`spawn`
19+
and `forkserver` variants).
20+
21+
Driven by the stdlib `multiprocessing` context selected via
22+
`try_set_start_method()` in the `_spawn` core module, which
23+
sets the module-global `_ctx` and `_spawn_method` read here.
24+
25+
'''
26+
from __future__ import annotations
27+
import multiprocessing as mp
28+
from typing import (
29+
Any,
30+
TYPE_CHECKING,
31+
)
32+
33+
import trio
34+
from trio import TaskStatus
35+
36+
from tractor.runtime._state import (
37+
current_actor,
38+
is_main_process,
39+
)
40+
from tractor.log import get_logger
41+
from tractor.discovery._addr import UnwrappedAddress
42+
from tractor.runtime._portal import Portal
43+
from tractor.runtime._runtime import Actor
44+
from tractor._exceptions import ActorFailure
45+
from ._entry import _mp_main
46+
# NOTE: module-import (not from-import) so we dynamically see
47+
# the *current* `_ctx` / `_spawn_method` values, which are mutated
48+
# by `try_set_start_method()` after module load time.
49+
from . import _spawn
50+
from ._spawn import (
51+
cancel_on_completion,
52+
proc_waiter,
53+
soft_kill,
54+
)
55+
56+
57+
if TYPE_CHECKING:
58+
from tractor.ipc import (
59+
_server,
60+
Channel,
61+
)
62+
from tractor.runtime._supervise import ActorNursery
63+
64+
65+
log = get_logger('tractor')
66+
67+
68+
async def mp_proc(
69+
name: str,
70+
actor_nursery: ActorNursery, # type: ignore # noqa
71+
subactor: Actor,
72+
errors: dict[tuple[str, str], Exception],
73+
# passed through to actor main
74+
bind_addrs: list[UnwrappedAddress],
75+
parent_addr: UnwrappedAddress,
76+
_runtime_vars: dict[str, Any], # serialized and sent to _child
77+
*,
78+
infect_asyncio: bool = False,
79+
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
80+
proc_kwargs: dict[str, any] = {}
81+
82+
) -> None:
83+
84+
# uggh zone
85+
try:
86+
from multiprocessing import semaphore_tracker # type: ignore
87+
resource_tracker = semaphore_tracker
88+
resource_tracker._resource_tracker = resource_tracker._semaphore_tracker # noqa
89+
except ImportError:
90+
# 3.8 introduces a more general version that also tracks shared mems
91+
from multiprocessing import resource_tracker # type: ignore
92+
93+
assert _spawn._ctx
94+
start_method = _spawn._ctx.get_start_method()
95+
if start_method == 'forkserver':
96+
97+
from multiprocessing import forkserver # type: ignore
98+
# XXX do our hackery on the stdlib to avoid multiple
99+
# forkservers (one at each subproc layer).
100+
fs = forkserver._forkserver
101+
curr_actor = current_actor()
102+
if is_main_process() and not curr_actor._forkserver_info:
103+
# if we're the "main" process start the forkserver
104+
# only once and pass its ipc info to downstream
105+
# children
106+
# forkserver.set_forkserver_preload(enable_modules)
107+
forkserver.ensure_running()
108+
fs_info = (
109+
fs._forkserver_address, # type: ignore # noqa
110+
fs._forkserver_alive_fd, # type: ignore # noqa
111+
getattr(fs, '_forkserver_pid', None),
112+
getattr(
113+
resource_tracker._resource_tracker, '_pid', None),
114+
resource_tracker._resource_tracker._fd,
115+
)
116+
else: # request to forkerserver to fork a new child
117+
assert curr_actor._forkserver_info
118+
fs_info = (
119+
fs._forkserver_address, # type: ignore # noqa
120+
fs._forkserver_alive_fd, # type: ignore # noqa
121+
fs._forkserver_pid, # type: ignore # noqa
122+
resource_tracker._resource_tracker._pid,
123+
resource_tracker._resource_tracker._fd,
124+
) = curr_actor._forkserver_info
125+
else:
126+
# spawn method
127+
fs_info = (None, None, None, None, None)
128+
129+
proc: mp.Process = _spawn._ctx.Process( # type: ignore
130+
target=_mp_main,
131+
args=(
132+
subactor,
133+
bind_addrs,
134+
fs_info,
135+
_spawn._spawn_method,
136+
parent_addr,
137+
infect_asyncio,
138+
),
139+
# daemon=True,
140+
name=name,
141+
)
142+
143+
# `multiprocessing` only (since no async interface):
144+
# register the process before start in case we get a cancel
145+
# request before the actor has fully spawned - then we can wait
146+
# for it to fully come up before sending a cancel request
147+
actor_nursery._children[subactor.aid.uid] = (subactor, proc, None)
148+
149+
proc.start()
150+
if not proc.is_alive():
151+
raise ActorFailure("Couldn't start sub-actor?")
152+
153+
log.runtime(f"Started {proc}")
154+
155+
ipc_server: _server.Server = actor_nursery._actor.ipc_server
156+
try:
157+
# wait for actor to spawn and connect back to us
158+
# channel should have handshake completed by the
159+
# local actor by the time we get a ref to it
160+
event, chan = await ipc_server.wait_for_peer(
161+
subactor.aid.uid,
162+
)
163+
164+
# XXX: monkey patch poll API to match the ``subprocess`` API..
165+
# not sure why they don't expose this but kk.
166+
proc.poll = lambda: proc.exitcode # type: ignore
167+
168+
# except:
169+
# TODO: in the case we were cancelled before the sub-proc
170+
# registered itself back we must be sure to try and clean
171+
# any process we may have started.
172+
173+
portal = Portal(chan)
174+
actor_nursery._children[subactor.aid.uid] = (subactor, proc, portal)
175+
176+
# unblock parent task
177+
task_status.started(portal)
178+
179+
# wait for ``ActorNursery`` block to signal that
180+
# subprocesses can be waited upon.
181+
# This is required to ensure synchronization
182+
# with user code that may want to manually await results
183+
# from nursery spawned sub-actors. We don't want the
184+
# containing nurseries here to collect results or error
185+
# while user code is still doing it's thing. Only after the
186+
# nursery block closes do we allow subactor results to be
187+
# awaited and reported upwards to the supervisor.
188+
with trio.CancelScope(shield=True):
189+
await actor_nursery._join_procs.wait()
190+
191+
async with trio.open_nursery() as nursery:
192+
if portal in actor_nursery._cancel_after_result_on_exit:
193+
nursery.start_soon(
194+
cancel_on_completion,
195+
portal,
196+
subactor,
197+
errors
198+
)
199+
200+
# This is a "soft" (cancellable) join/reap which
201+
# will remote cancel the actor on a ``trio.Cancelled``
202+
# condition.
203+
await soft_kill(
204+
proc,
205+
proc_waiter,
206+
portal
207+
)
208+
209+
# cancel result waiter that may have been spawned in
210+
# tandem if not done already
211+
log.warning(
212+
"Cancelling existing result waiter task for "
213+
f"{subactor.aid.uid}")
214+
nursery.cancel_scope.cancel()
215+
216+
finally:
217+
# hard reap sequence
218+
if proc.is_alive():
219+
log.cancel(f"Attempting to hard kill {proc}")
220+
with trio.move_on_after(0.1) as cs:
221+
cs.shield = True
222+
await proc_waiter(proc)
223+
224+
if cs.cancelled_caught:
225+
proc.terminate()
226+
227+
proc.join()
228+
log.debug(f"Joined {proc}")
229+
230+
# pop child entry to indicate we are no longer managing subactor
231+
actor_nursery._children.pop(subactor.aid.uid)
232+
233+
# TODO: prolly report to ``mypy`` how this causes all sorts of
234+
# false errors..
235+
# subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

0 commit comments

Comments
 (0)