Skip to content

Commit

Permalink
_AsyncConcurrentMappingIterable: get loop inside __iter__
Browse files Browse the repository at this point in the history
  • Loading branch information
ebonnal committed Jun 12, 2024
1 parent 232719a commit ca5908b
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions streamable/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,6 @@ def __iter__(self) -> Iterator[Union[U, _RaisingIterator.ExceptionContainer]]:
class _AsyncConcurrentMappingIterable(
Iterable[Union[U, _RaisingIterator.ExceptionContainer]]
):
_LOOP = asyncio.get_event_loop()

def __init__(
self,
iterator: Iterator[T],
Expand All @@ -330,24 +328,23 @@ async def _safe_func(
return _RaisingIterator.ExceptionContainer(e)

def __iter__(self) -> Iterator[Union[U, _RaisingIterator.ExceptionContainer]]:
loop = asyncio.get_event_loop()
awaitables: Deque[
asyncio.Task[Union[U, _RaisingIterator.ExceptionContainer]]
] = deque()
element_to_yield: List[Union[U, _RaisingIterator.ExceptionContainer]] = []
# wait, queue, yield (FIFO)
while True:
if awaitables:
element_to_yield.append(
self._LOOP.run_until_complete(awaitables.popleft())
)
element_to_yield.append(loop.run_until_complete(awaitables.popleft()))
# queue tasks up to buffer_size
while len(awaitables) < self.buffer_size:
try:
elem = next(self.iterator)
except StopIteration:
# the upstream iterator is exhausted
break
awaitables.append(self._LOOP.create_task(self._safe_func(elem)))
awaitables.append(loop.create_task(self._safe_func(elem)))
if element_to_yield:
yield element_to_yield.pop()
if not awaitables:
Expand Down

0 comments on commit ca5908b

Please sign in to comment.