Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/batched_gather_return_exceptions #68

Merged
merged 3 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added clubbi_utils/common/__init__.py
Empty file.
31 changes: 27 additions & 4 deletions clubbi_utils/common/batched_gather.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,38 @@
import asyncio
from clubbi_utils.common.iter_as_chunks import iter_as_chunks
from typing import AsyncGenerator, Awaitable, Callable, Iterable, List, TypeVar
from typing import AsyncIterator, Awaitable, Callable, Iterable, List, TypeVar, overload, Literal, Union

OUTPUT = TypeVar("OUTPUT")
INPUT = TypeVar("INPUT")


async def batched_gather(
@overload
def batched_gather(
f: Callable[[INPUT], Awaitable[OUTPUT]],
collection: Iterable[INPUT],
number_of_workers: int,
) -> AsyncGenerator[List[OUTPUT], None]:
return_exceptions: Literal[True],
) -> AsyncIterator[List[Union[Exception, OUTPUT]]]:
pass


@overload
def batched_gather(
f: Callable[[INPUT], Awaitable[OUTPUT]],
collection: Iterable[INPUT],
number_of_workers: int,
return_exceptions: bool = False,
) -> AsyncIterator[List[OUTPUT]]:
pass


async def batched_gather( # type: ignore[misc]
Copy link
Contributor Author

@giuliano-macedo giuliano-macedo Apr 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this type: ignore because i was getting: Overloaded function implementation cannot produce return type of signature 2 and it doesn't make a lot of sense to me, it seems to be related to this bug, but I'm not sure.

f: Callable[[INPUT], Awaitable[OUTPUT]],
collection: Iterable[INPUT],
number_of_workers: int,
return_exceptions: bool = False,
) -> AsyncIterator[List[Union[Exception, OUTPUT]]]:
for batch in iter_as_chunks(collection, number_of_workers):
yield list(await asyncio.gather(*(f(input_) for input_ in batch)))
coros = (f(input_) for input_ in batch)
chunk = list(await asyncio.gather(*coros, return_exceptions=return_exceptions))
yield chunk
27 changes: 23 additions & 4 deletions tests/common/test_batched_gather.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,33 @@
from typing import List
from typing import List, Optional, Union
from clubbi_utils.common.batched_gather import batched_gather
import asyncio
import time
from unittest import IsolatedAsyncioTestCase
from itertools import zip_longest


async def sleep_and_return(delay: float) -> float:
async def sleep_and_return(delay: Optional[float]) -> float:
if delay is None:
raise RuntimeError()
await asyncio.sleep(delay)
return delay


async def run_and_join_results(collection: List[float], number_of_workesrs: int) -> List[float]:
return sum([b async for b in batched_gather(sleep_and_return, collection, number_of_workesrs)], [])
async def run_and_join_results(collection: List[float], number_of_workers: int) -> List[float]:
return sum([b async for b in batched_gather(sleep_and_return, collection, number_of_workers)], [])


async def run_and_join_results_with_exception(
collection: List[Optional[float]],
number_of_workers: int,
) -> List[Union[float, Exception]]:
return sum([b async for b in batched_gather(sleep_and_return, collection, number_of_workers, True)], [])


class TestBatchedGather(IsolatedAsyncioTestCase):
def setUp(self) -> None:
self._collection = [0.125] * 6
self._err_collection = [0.1 if i % 2 else None for i in range(5)]
return super().setUp()

async def test_batched_gather(self):
Expand All @@ -33,3 +44,11 @@ async def test_batched_gather_speed(self):
time_took_full = time.time() - start

self.assertTrue(time_took_half >= time_took_full)

async def test_batched_gather_return_exception(self):
output = await run_and_join_results_with_exception(self._err_collection, len(self._err_collection) // 2)
for f, o in zip_longest(self._err_collection, output):
if f is not None:
self.assertEqual(f, o)
else:
self.assertTrue(isinstance(o, RuntimeError))