Skip to content

Commit

Permalink
drop aiometer._concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Jul 5, 2021
1 parent 2e2257b commit c5f8f76
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 41 deletions.
1 change: 0 additions & 1 deletion src/aiometer/_concurrency/__init__.py

This file was deleted.

25 changes: 0 additions & 25 deletions src/aiometer/_concurrency/channels.py

This file was deleted.

23 changes: 11 additions & 12 deletions src/aiometer/_impl/amap.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import anyio

from .._concurrency import open_memory_channel
from .run_on_each import run_on_each
from .types import T, U

Expand Down Expand Up @@ -60,24 +59,24 @@ def amap(
) -> AsyncContextManager[AsyncIterable]:
@asynccontextmanager
async def _amap() -> AsyncIterator[AsyncIterable]:
receive_channel, send_channel = open_memory_channel[Any](
send_channel, receive_channel = anyio.create_memory_object_stream(
max_buffer_size=len(args)
)

async with receive_channel, send_channel:
with send_channel, receive_channel:
async with anyio.create_task_group() as task_group:

async def sender() -> None:
await run_on_each(
async_fn,
args,
max_at_once=max_at_once,
max_per_second=max_per_second,
_include_index=_include_index,
_send_to=send_channel,
)
# Make any `async for ... in results: ...` terminate.
await send_channel.aclose()
with send_channel:
await run_on_each(
async_fn,
args,
max_at_once=max_at_once,
max_per_second=max_per_second,
_include_index=_include_index,
_send_to=send_channel,
)

task_group.start_soon(sender)

Expand Down
6 changes: 3 additions & 3 deletions src/aiometer/_impl/run_on_each.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from typing import Awaitable, Callable, List, NamedTuple, Optional, Sequence

import anyio
from anyio.streams.memory import MemoryObjectSendStream

from .._concurrency import MemorySendChannel
from .meters import HardLimitMeter, Meter, MeterState, RateLimitMeter
from .types import T


class _Config(NamedTuple):
include_index: bool
send_to: Optional[MemorySendChannel]
send_to: Optional[MemoryObjectSendStream]
meter_states: List[MeterState]


Expand All @@ -34,7 +34,7 @@ async def run_on_each(
max_at_once: int = None,
max_per_second: float = None,
_include_index: bool = False,
_send_to: MemorySendChannel = None,
_send_to: MemoryObjectSendStream = None,
) -> None:
meters: List[Meter] = []

Expand Down

0 comments on commit c5f8f76

Please sign in to comment.