Skip to content

Commit

Permalink
add get method
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Jun 12, 2022
1 parent 907477b commit e21dcd2
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
7 changes: 6 additions & 1 deletion aiomisc/iterator_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def __exit__(
self.close()

def put(self, item: Any) -> None:
if self.is_closed:
raise ChannelClosed
self.queue.append(item)

def __await__(self) -> Any:
Expand All @@ -70,6 +72,9 @@ def __await__(self) -> Any:

return self.queue.popleft()

async def get(self) -> Any:
return await self


class IteratorWrapperStatistic(Statistic):
started: int
Expand Down Expand Up @@ -167,7 +172,7 @@ def __aiter__(self) -> AsyncIterator[Any]:

async def __anext__(self) -> Awaitable[T]:
try:
item, is_exc = await self.__channel
item, is_exc = await self.__channel.get()
except ChannelClosed:
await self.wait_closed()
raise StopAsyncIteration
Expand Down
10 changes: 5 additions & 5 deletions tests/test_thread_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ def executor(loop: asyncio.AbstractEventLoop):
thread_pool.shutdown(wait=True)


async def test_from_thread_channel(loop, threaded_decorator):
channel = FromThreadChannel(maxsize=2, loop=loop)
async def test_from_thread_channel(threaded_decorator):
channel = FromThreadChannel(maxsize=2)

@threaded_decorator
def in_thread():
Expand All @@ -60,7 +60,7 @@ def in_thread():


async def test_from_thread_channel_wait_before(loop, threaded_decorator):
channel = FromThreadChannel(maxsize=1, loop=loop)
channel = FromThreadChannel(maxsize=1)

@threaded_decorator
def in_thread():
Expand All @@ -79,14 +79,14 @@ def in_thread():


async def test_from_thread_channel_close(loop):
channel = FromThreadChannel(maxsize=1, loop=loop)
channel = FromThreadChannel(maxsize=1)
with channel:
channel.put(1)

with pytest.raises(ChannelClosed):
channel.put(2)

channel = FromThreadChannel(maxsize=1, loop=loop)
channel = FromThreadChannel(maxsize=1)
task = loop.create_task(channel.get())

with pytest.raises(asyncio.TimeoutError):
Expand Down

0 comments on commit e21dcd2

Please sign in to comment.