Skip to content

Commit

Permalink
Copy the now deprecated trio.Process.aclose()
Browse files Browse the repository at this point in the history
Move it into our `_spawn.do_hard_kill()` since we do indeed rely on
the particular process killing sequence on "soft kill" failure cases.
  • Loading branch information
goodboy committed May 14, 2023
1 parent 24a0623 commit f667d16
Showing 1 changed file with 32 additions and 11 deletions.
43 changes: 32 additions & 11 deletions tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@
import platform
from typing import (
Any,
Awaitable,
Literal,
Optional,
Callable,
TypeVar,
TYPE_CHECKING,
)
from collections.abc import Awaitable

from exceptiongroup import BaseExceptionGroup
import trio
Expand Down Expand Up @@ -60,7 +59,7 @@
log = get_logger('tractor')

# placeholder for an mp start context if so using that backend
_ctx: Optional[mp.context.BaseContext] = None
_ctx: mp.context.BaseContext | None = None
SpawnMethodKey = Literal[
'trio', # supported on all platforms
'mp_spawn',
Expand All @@ -86,7 +85,7 @@ async def proc_waiter(proc: mp.Process) -> None:
def try_set_start_method(
key: SpawnMethodKey

) -> Optional[mp.context.BaseContext]:
) -> mp.context.BaseContext | None:
'''
Attempt to set the method for process starting, aka the "actor
spawning backend".
Expand Down Expand Up @@ -200,14 +199,37 @@ async def cancel_on_completion(
async def do_hard_kill(
proc: trio.Process,
terminate_after: int = 3,

) -> None:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
log.debug(f"Terminating {proc}")
with trio.move_on_after(terminate_after) as cs:
log.debug(f"Terminating {proc}")
await proc.aclose()

# NOTE: code below was copied verbatim from the now deprecated
# (in 0.20.0) ``trio._subrocess.Process.aclose()``, orig doc
# string:
#
# Close any pipes we have to the process (both input and output)
# and wait for it to exit. If cancelled, kills the process and
# waits for it to finish exiting before propagating the
# cancellation.
with trio.CancelScope(shield=True):
if proc.stdin is not None:
await proc.stdin.aclose()
if proc.stdout is not None:
await proc.stdout.aclose()
if proc.stderr is not None:
await proc.stderr.aclose()
try:
await proc.wait()
finally:
if proc.returncode is None:
proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()

if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have
Expand Down Expand Up @@ -353,12 +375,11 @@ async def trio_proc(
spawn_cmd.append("--asyncio")

cancelled_during_spawn: bool = False
proc: Optional[trio.Process] = None
proc: trio.Process | None = None
try:
try:
# TODO: needs ``trio_typing`` patch?
proc = await trio.lowlevel.open_process( # type: ignore
spawn_cmd)
proc = await trio.lowlevel.open_process(spawn_cmd)

log.runtime(f"Started {proc}")

Expand Down Expand Up @@ -442,8 +463,8 @@ async def trio_proc(
nursery.cancel_scope.cancel()

finally:
# The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid
# XXX NOTE XXX: The "hard" reap since no actor zombies are
# allowed! Do this **after** cancellation/teardown to avoid
# killing the process too early.
if proc:
log.cancel(f'Hard reap sequence starting for {subactor.uid}')
Expand Down

0 comments on commit f667d16

Please sign in to comment.