Skip to content

Commit

Permalink
deterministic remote iterator cleanup
Browse files Browse the repository at this point in the history
* standard 'async for' doesn't allow us to close the remote iterator (need PEP 533)
  • Loading branch information
wemoloh committed Oct 4, 2019
1 parent 561ffd3 commit 711215a
Showing 1 changed file with 46 additions and 19 deletions.
65 changes: 46 additions & 19 deletions eider.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,22 +98,24 @@ class StopAsyncIteration(Exception):


@coroutine
def async_for(it, body, else_=None):
"""Simulate 'async for' in Python 3.4."""
iter = type(it).__aiter__(it)
anext = type(iter).__anext__
running = True
while running:
try:
x = yield from anext(iter)
except StopAsyncIteration:
running = False
else:
if (yield from body(x)):
def async_for(iterable, body):
"""Like 'async for', but properly cleans up remote resources. If PEP 533
is ever accepted, we can define RemoteIterator.__aiterclose__ and deprecate
this function in favor of plain 'async for'."""
iterator = type(iterable).__aiter__(iterable)
anext = type(iterator).__anext__
try: # In Python 3.5+, this could be 'async with iterator:'.
while True:
try:
x = yield from anext(iterator)
except StopAsyncIteration:
break
else:
if else_ is not None:
yield from else_()
else:
stop = yield from body(x)
if stop is not None:
return stop
finally:
yield from iterator.close()


class CoroutineContextManager(Coroutine):
Expand Down Expand Up @@ -788,7 +790,7 @@ class RemoteIterator:

def __init__(self, robj):
self.robj = robj
self.iter = robj.iter()
self.iter = -1

def __aiter__(self):
return self
Expand All @@ -799,10 +801,10 @@ def __anext__(self):
if iter is None:
raise StopAsyncIteration

if isinstance(iter, Future):
if iter == -1:
# first call
try:
iter = yield from iter
iter = yield from self.robj.iter()
except AttributeError:
# fallback to sequence protocol
iter = 0
Expand Down Expand Up @@ -832,10 +834,35 @@ def __anext__(self):
raise StopAsyncIteration
except Exception:
self.iter = None
iter._close()
yield from iter._close()
raise
return it['value']

def close(self):
iter = self.iter
self.iter = None
if isinstance(iter, RemoteObject):
return iter._close()
fut = Future(loop=self.robj._rsession.conn.loop)
fut.set_result(None)
return fut

@coroutine
def __aenter__(self):
return self

@coroutine
def __aexit__(self, exc_type, exc_value, traceback):
yield from self.close()

# Synchronous context manager for Python 3.4

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()


class RemoteCall(Future):

Expand Down

0 comments on commit 711215a

Please sign in to comment.