Skip to content

Commit

Permalink
Stop channel based async gen streams on exit
Browse files Browse the repository at this point in the history
I'm not sure how this ever worked but when a "fake" async gen
(i.e. function with special `chan`, `cid` kwargs) is completed
we need to signal the end of the stream just like with normal
async gens. Also don't fail when trying to remove tasks that were
never tracked.

Fixes #46
  • Loading branch information
Tyler Goodlet committed Nov 30, 2018
1 parent 58ebacf commit a588047
Showing 1 changed file with 11 additions and 2 deletions.
13 changes: 11 additions & 2 deletions tractor/_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from itertools import chain
import importlib
import inspect
import traceback
import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional, Union
Expand Down Expand Up @@ -115,6 +114,10 @@ async def _invoke(
with trio.open_cancel_scope() as cs:
task_status.started(cs)
await coro
if not cs.cancelled_caught:
# task was not cancelled so we can instruct the
# far end async gen to tear down
await chan.send({'stop': None, 'cid': cid})
else:
await chan.send({'functype': 'asyncfunction', 'cid': cid})
with trio.open_cancel_scope() as cs:
Expand All @@ -137,7 +140,13 @@ async def _invoke(
# RPC task bookeeping
tasks = actor._rpc_tasks.get(chan, None)
if tasks:
tasks.remove((cs, func))
try:
tasks.remove((cs, func))
except ValueError:
# If we're cancelled before the task returns then the
# cancel scope will not have been inserted yet
log.warn(
f"Task {func} was likely cancelled before it was started")

if not tasks:
actor._rpc_tasks.pop(chan, None)
Expand Down

0 comments on commit a588047

Please sign in to comment.