Skip to content

Commit

Permalink
Re-raise any sidestepped trio.Cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed May 6, 2021
1 parent 9f38406 commit 87971de
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions tractor/_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,14 +283,15 @@ async def new_proc(
await proc.wait()

log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing this subactor
# pop child entry to indicate we no longer managing this subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
f"Cancelling existing result waiter task for {subactor.uid}")
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel()
else:
# `multiprocessing`
Expand Down Expand Up @@ -409,8 +410,9 @@ async def mp_new_proc(
# registered itself back we must be sure to try and clean
# any process we may have started.

reaping_cancelled = False
cancel_scope = None
reaping_cancelled: bool = False
cancel_scope: Optional[trio.CancelScope] = None
cancel_exc: Optional[trio.Cancelled] = None

if portal in actor_nursery._cancel_after_result_on_exit:
try:
Expand All @@ -422,7 +424,9 @@ async def mp_new_proc(
subactor,
errors
)
except trio.Cancelled:
except trio.Cancelled as err:
cancel_exc = err

# if the reaping task was cancelled we may have hit
# a race where the subproc disconnected before we
# could send it a message to cancel (classic 2 generals)
Expand All @@ -437,20 +441,24 @@ async def mp_new_proc(
if cs.cancelled_caught:
proc.terminate()

if not reaping_cancelled:
if proc.is_alive():
await proc_waiter(proc)
if not reaping_cancelled and proc.is_alive():
await proc_waiter(proc)

# TODO: timeout block here?
proc.join()

log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing this subactor
# pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)

# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
f"Cancelling existing result waiter task for {subactor.uid}")
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel()

elif reaping_cancelled: # let the cancellation bubble up
assert cancel_exc
raise cancel_exc

0 comments on commit 87971de

Please sign in to comment.