Skip to content

Commit 82332fb

Browse files
committed
Lift fork prims into _subint_forkserver mod
The smoketest (prior commit) empirically validated the "fork-from-main-interp-worker-thread" arch on py3.14. Promote the validated primitives out of the `ai/conc-anal/` smoketest into `tractor.spawn._subint_forkserver` so they can eventually be wired into a real "subint forkserver" spawn backend. Deats, - new module `tractor/spawn/_subint_forkserver.py` (337 LOC): - `fork_from_worker_thread(child_target, thread_name)` — spawn a main-interp `threading.Thread`, call `os.fork()` from it, shuttle the child pid back to main via a pipe - `run_trio_in_subint(bootstrap, ...)` — post-fork helper: create a fresh subint + drive `_interpreters.exec()` on a dedicated worker thread running the `bootstrap` str (typically imports `trio`, defines an async entry, calls `trio.run()`) - `wait_child(pid, expect_exit_ok)` — `os.waitpid()` + pass/fail classification reusable from harness AND the eventual real spawn path - feature-gated py3.14+ via the public `concurrent.interpreters` presence check; matches the gate in `tractor.spawn._subint` - module docstring doc's the CPython-block context (cross-refs `_subint_fork` stub + the two `conc-anal/` docs) and status: EXPERIMENTAL, not yet registered in `_spawn._methods` Also, refactor the smoketest `ai/conc-anal/subint_fork_from_main_thread_smoketest.py` to import the primitives from the new module rather than inline its own copies. Keeps the smoketest and the tractor-side impl in sync as the forkserver design evolves; the smoketest remains a zero-`tractor`-runtime CPython-level check (imports ONLY the three primitives, no runtime bring-up). Status: next step is to drive these from a parent-side `trio.run()` and hook the returned child pid into the normal actor-nursery/IPC flow — then register `subint_forkserver` as a `SpawnMethodKey` in `_spawn.py`. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code
1 parent de4f470 commit 82332fb

2 files changed

Lines changed: 411 additions & 141 deletions

File tree

ai/conc-anal/subint_fork_from_main_thread_smoketest.py

Lines changed: 74 additions & 141 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,9 @@
6262
from __future__ import annotations
6363
import argparse
6464
import os
65-
import signal
6665
import sys
6766
import threading
6867
import time
69-
from typing import Callable
7068

7169

7270
# Hard-require py3.14 for the public `concurrent.interpreters`
@@ -84,8 +82,22 @@
8482
sys.exit(2)
8583

8684

85+
# The actual primitives this script exercises live in
86+
# `tractor.spawn._subint_forkserver` — we re-import them here
87+
# rather than inlining so the module and the validation stay
88+
# in sync. (Early versions of this file had them inline for
89+
# the "zero tractor imports" isolation guarantee; now that
90+
# CPython-level feasibility is confirmed, the validated
91+
# primitives have moved into tractor proper.)
92+
from tractor.spawn._subint_forkserver import (
93+
fork_from_worker_thread,
94+
run_trio_in_subint,
95+
wait_child,
96+
)
97+
98+
8799
# ----------------------------------------------------------------
88-
# small observability helpers
100+
# small observability helpers (test-harness only)
89101
# ----------------------------------------------------------------
90102

91103

@@ -94,49 +106,24 @@ def _banner(title: str) -> None:
94106
print(f'\n{line}\n{title}\n{line}', flush=True)
95107

96108

97-
def _wait_child(
98-
pid: int,
99-
*,
109+
def _report(
100110
label: str,
111+
*,
112+
ok: bool,
113+
status_str: str,
101114
expect_exit_ok: bool,
102-
) -> bool:
103-
'''
104-
Await a forked child's exit status and render pass/fail.
105-
106-
`expect_exit_ok=True` means we expect a normal exit (code
107-
0 via WEXITSTATUS). `expect_exit_ok=False` means we expect
108-
an abnormal death (WIFSIGNALED or nonzero WEXITSTATUS) —
109-
used for the `control_*` scenario where CPython is
110-
supposed to abort the child.
111-
112-
'''
113-
_, status = os.waitpid(pid, 0)
114-
exited_normally = os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
115-
signaled = os.WIFSIGNALED(status)
116-
sig = os.WTERMSIG(status) if signaled else None
117-
rc = os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
118-
119-
if expect_exit_ok:
120-
ok = exited_normally
121-
expected_str = 'normal exit (rc=0)'
122-
else:
123-
ok = not exited_normally
124-
expected_str = (
125-
'abnormal death (signal or nonzero exit)'
126-
)
127-
128-
verdict = 'PASS' if ok else 'FAIL'
129-
status_str = (
130-
f'signal={signal.Signals(sig).name}'
131-
if signaled
132-
else f'rc={rc}'
115+
) -> None:
116+
verdict: str = 'PASS' if ok else 'FAIL'
117+
expected_str: str = (
118+
'normal exit (rc=0)'
119+
if expect_exit_ok
120+
else 'abnormal death (signal or nonzero exit)'
133121
)
134122
print(
135123
f'[{verdict}] {label}: '
136124
f'expected {expected_str}; observed {status_str}',
137125
flush=True,
138126
)
139-
return ok
140127

141128

142129
# ----------------------------------------------------------------
@@ -256,74 +243,34 @@ def scenario_main_thread_fork() -> int:
256243
# ----------------------------------------------------------------
257244

258245

259-
def _fork_from_worker_thread(
260-
child_target: Callable[[], int] | None = None,
261-
label: str = 'worker_thread_fork',
246+
def _run_worker_thread_fork_scenario(
247+
label: str,
248+
*,
249+
child_target=None,
262250
) -> int:
263251
'''
264-
Fork from a main-interp worker thread (not a subint).
265-
Returns the child's exit code observed by the parent.
266-
267-
`child_target` is called IN THE CHILD before `os._exit`.
268-
If omitted, the child just `_exit(0)`s immediately.
269-
270-
`label` is used in the pass/fail banner so reuse of this
271-
helper across scenarios reports the scenario name, not
272-
just the underlying fork-mechanism name.
252+
Thin wrapper: delegate the actual fork to the
253+
`tractor.spawn._subint_forkserver` primitive, then wait
254+
on the child and render a pass/fail banner.
273255
274256
'''
275-
# Use a simple pipe to shuttle the child PID back to main.
276-
rfd, wfd = os.pipe()
277-
278-
def _worker() -> None:
279-
pid = os.fork()
280-
if pid == 0:
281-
# CHILD: close parent's pipe ends, do work, exit.
282-
os.close(rfd)
283-
os.close(wfd)
284-
rc = 0
285-
if child_target is not None:
286-
try:
287-
rc = child_target() or 0
288-
except BaseException as err:
289-
print(
290-
f' CHILD: child_target raised: '
291-
f'{type(err).__name__}: {err}',
292-
file=sys.stderr, flush=True,
293-
)
294-
rc = 2
295-
os._exit(rc)
296-
else:
297-
# PARENT (still in worker thread): send pid to
298-
# main thread via the pipe.
299-
os.write(wfd, pid.to_bytes(8, 'little'))
300-
301-
t = threading.Thread(
302-
target=_worker,
303-
name=f'worker-fork-thread[{label}]',
304-
daemon=False,
305-
)
306-
t.start()
307-
t.join(timeout=10.0)
308-
if t.is_alive():
309-
print(
310-
f'[FAIL] {label}: worker-thread fork driver '
311-
f'did not return in 10s',
312-
flush=True,
257+
try:
258+
pid: int = fork_from_worker_thread(
259+
child_target=child_target,
260+
thread_name=f'worker-fork-thread[{label}]',
313261
)
262+
except RuntimeError as err:
263+
print(f'[FAIL] {label}: {err}', flush=True)
314264
return 1
315-
316-
pid_bytes = os.read(rfd, 8)
317-
os.close(rfd)
318-
os.close(wfd)
319-
pid = int.from_bytes(pid_bytes, 'little')
320265
print(f' forked child pid={pid}', flush=True)
321-
322-
return 0 if _wait_child(
323-
pid,
324-
label=label,
266+
ok, status_str = wait_child(pid, expect_exit_ok=True)
267+
_report(
268+
label,
269+
ok=ok,
270+
status_str=status_str,
325271
expect_exit_ok=True,
326-
) else 1
272+
)
273+
return 0 if ok else 1
327274

328275

329276
def scenario_worker_thread_fork() -> int:
@@ -332,9 +279,8 @@ def scenario_worker_thread_fork() -> int:
332279
'(expected: child exits normally — this is the one '
333280
'that matters)'
334281
)
335-
return _fork_from_worker_thread(
336-
child_target=None,
337-
label='worker_thread_fork',
282+
return _run_worker_thread_fork_scenario(
283+
'worker_thread_fork',
338284
)
339285

340286

@@ -343,52 +289,39 @@ def scenario_worker_thread_fork() -> int:
343289
# ----------------------------------------------------------------
344290

345291

346-
def _child_trio_in_subint() -> int:
347-
'''
348-
CHILD-side: from fork-thread (main-interp), create a fresh
349-
subint and run `trio.run()` in it on a dedicated worker
350-
thread. Returns 0 on success.
351-
'''
352-
child_interp = _interpreters.create('legacy')
353-
subint_bootstrap = (
354-
'import trio\n'
355-
'async def _main():\n'
356-
' await trio.sleep(0.05)\n'
357-
' return 42\n'
358-
'result = trio.run(_main)\n'
359-
'assert result == 42, f"trio.run returned {result}"\n'
360-
'print(" CHILD subint: trio.run OK, result=42", '
361-
'flush=True)\n'
362-
)
363-
err = None
292+
_CHILD_TRIO_BOOTSTRAP: str = (
293+
'import trio\n'
294+
'async def _main():\n'
295+
' await trio.sleep(0.05)\n'
296+
' return 42\n'
297+
'result = trio.run(_main)\n'
298+
'assert result == 42, f"trio.run returned {result}"\n'
299+
'print(" CHILD subint: trio.run OK, result=42", '
300+
'flush=True)\n'
301+
)
364302

365-
def _drive() -> None:
366-
nonlocal err
367-
try:
368-
_interpreters.exec(child_interp, subint_bootstrap)
369-
except BaseException as e:
370-
err = e
371303

372-
t = threading.Thread(
373-
target=_drive,
374-
name='child-subint-trio-thread',
375-
daemon=False,
376-
)
377-
t.start()
378-
t.join(timeout=10.0)
304+
def _child_trio_in_subint() -> int:
305+
'''
306+
CHILD-side `child_target`: drive a trivial `trio.run()`
307+
inside a fresh legacy-config subint on a worker thread,
308+
using the `tractor.spawn._subint_forkserver.run_trio_in_subint`
309+
primitive. Returns 0 on success.
379310
311+
'''
380312
try:
381-
_interpreters.destroy(child_interp)
382-
except _interpreters.InterpreterError:
383-
pass
384-
385-
if t.is_alive():
313+
run_trio_in_subint(
314+
_CHILD_TRIO_BOOTSTRAP,
315+
thread_name='child-subint-trio-thread',
316+
)
317+
except RuntimeError as err:
386318
print(
387-
' CHILD: subint trio thread did not return in 10s',
319+
f' CHILD: run_trio_in_subint timed out / thread '
320+
f'never returned: {err}',
388321
flush=True,
389322
)
390323
return 3
391-
if err is not None:
324+
except BaseException as err:
392325
print(
393326
f' CHILD: subint bootstrap raised: '
394327
f'{type(err).__name__}: {err}',
@@ -403,9 +336,9 @@ def scenario_full_architecture() -> int:
403336
'[arch-full] worker-thread fork + child runs trio in a '
404337
'subint (end-to-end proposed arch)'
405338
)
406-
return _fork_from_worker_thread(
339+
return _run_worker_thread_fork_scenario(
340+
'full_architecture',
407341
child_target=_child_trio_in_subint,
408-
label='full_architecture',
409342
)
410343

411344

0 commit comments

Comments
 (0)