Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/SMOODEV-969-py-shared-rate-limiter.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@smooai/fetch': patch
---

SMOODEV-969: Python — share the sliding-window rate-limiter state across `fetch()` calls made through a single `FetchBuilder`. Previously `_client.fetch()` reconstructed the limiter per call, defeating the cross-call rate limit. The builder now lazily constructs one `SlidingWindowRateLimiter`, hands the same instance to every `fetch()` it dispatches, and rebuilds it when the caller changes options via `with_rate_limit`. A new `SlidingWindowRateLimiter.acquire_wait()` method blocks until a slot is free (mirroring the Rust port's `acquire` loop) so successive builder-mediated calls naturally queue instead of raising `RateLimitError`. The low-level `fetch()` entrypoint retains its raise-on-full `acquire()` semantics for back-compat with `rate_limit_retry` plumbing.
47 changes: 46 additions & 1 deletion python/src/smooai_fetch/_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from smooai_fetch._client import fetch as _fetch
from smooai_fetch._defaults import DEFAULT_RETRY_OPTIONS
from smooai_fetch._rate_limit import SlidingWindowRateLimiter
from smooai_fetch._response import FetchResponse
from smooai_fetch._types import (
AuthTokenProvider,
Expand Down Expand Up @@ -57,6 +58,13 @@ def __init__(self) -> None:
self._hooks: LifecycleHooks = LifecycleHooks()
self._auth_token_provider: AuthTokenProvider | None = None
self._auth_scheme: str = "Bearer"
# Long-lived state for the rate limiter. Hoisted onto the builder so
# every `fetch()` call routed through this builder shares the same
# sliding-window state, matching the Go / Rust ports (SMOODEV-969).
# Lazily constructed the first time the builder fires a request and
# rebuilt if the caller swaps the options.
self._rate_limit_state: SlidingWindowRateLimiter | None = None
self._rate_limit_state_options: RateLimitOptions | None = None

def with_retry(self, options: RetryOptions | None = None) -> FetchBuilder:
"""Configure retry behavior.
Expand Down Expand Up @@ -85,15 +93,50 @@ def with_timeout(self, timeout_ms: float) -> FetchBuilder:
def with_rate_limit(self, options: RateLimitOptions) -> FetchBuilder:
"""Configure rate limiting.

The sliding-window state is shared across every `fetch()` call routed
through this builder, matching the Go / Rust ports. Calling
`with_rate_limit` again with new options resets the shared state.

Args:
options: Rate limit configuration.

Returns:
The builder instance for method chaining.
"""
self._rate_limit = options
# Invalidate the cached limiter so the next fetch() rebuilds with the
# new options.
self._rate_limit_state = None
self._rate_limit_state_options = None
return self

def _shared_rate_limiter(self) -> SlidingWindowRateLimiter | None:
"""Return the builder-owned rate limiter, constructing it on first use.

Returns ``None`` when rate limiting is not configured. The instance is
cached on the builder so successive `fetch()` calls share the
sliding-window state.
"""
if self._rate_limit is None:
self._rate_limit_state = None
self._rate_limit_state_options = None
return None

if self._rate_limit_state is None or self._rate_limit_state_options is not self._rate_limit:
self._rate_limit_state = SlidingWindowRateLimiter(self._rate_limit)
self._rate_limit_state_options = self._rate_limit

return self._rate_limit_state

def reset_rate_limit_state(self) -> None:
"""Drop the cached sliding-window state.

Mostly useful for tests; the next `fetch()` will lazily build a fresh
limiter.
"""
self._rate_limit_state = None
self._rate_limit_state_options = None

def with_rate_limit_retry(self, options: RateLimitRetryOptions | None) -> FetchBuilder:
"""Configure retry behavior specifically for rate-limit rejections.

Expand Down Expand Up @@ -278,4 +321,6 @@ async def fetch(
if body is not None:
opts.body = body

return await _fetch(url, opts)
# Pass the builder-owned rate limiter so successive fetch() calls
# share sliding-window state (SMOODEV-969).
return await _fetch(url, opts, rate_limiter=self._shared_rate_limiter())
37 changes: 29 additions & 8 deletions python/src/smooai_fetch/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,12 @@ def _should_retry_default(error: Exception, attempt: int, retry_options: RetryOp
return True


async def fetch(url: str, options: FetchOptions | None = None) -> FetchResponse[Any]:
async def fetch(
url: str,
options: FetchOptions | None = None,
*,
rate_limiter: SlidingWindowRateLimiter | None = None,
) -> FetchResponse[Any]:
"""Execute an HTTP request with retry, timeout, rate limiting, and circuit breaking.

Pipeline: hooks -> timeout -> rate_limit -> circuit_breaker -> retry -> execute ->
Expand All @@ -205,6 +210,11 @@ async def fetch(url: str, options: FetchOptions | None = None) -> FetchResponse[
Args:
url: The URL to request.
options: Configuration options for the request.
rate_limiter: Optional pre-built sliding-window rate limiter. When
supplied, this instance is used instead of constructing a fresh
one from ``options.container_options.rate_limit``. Allows the
caller (e.g. ``FetchBuilder``) to share sliding-window state
across multiple ``fetch()`` invocations.

Returns:
A FetchResponse containing the parsed response data.
Expand Down Expand Up @@ -247,10 +257,18 @@ async def fetch(url: str, options: FetchOptions | None = None) -> FetchResponse[
headers: dict[str, str] = request_kwargs.setdefault("headers", {})
headers["Authorization"] = f"{opts.auth_scheme} {token}"

# Build rate limiter
rate_limiter: SlidingWindowRateLimiter | None = None
if container_options and container_options.rate_limit:
rate_limiter = SlidingWindowRateLimiter(container_options.rate_limit)
# Build rate limiter: prefer the caller-supplied instance (so the builder
# can share sliding-window state across calls), falling back to a fresh
# per-call limiter if only `container_options.rate_limit` is provided.
# When the caller supplies the limiter (i.e. via FetchBuilder), we use
# the waiting `acquire_wait()` API so successive calls naturally queue
# for the next slot. When it's a fresh per-call limiter, we keep the
# raise-on-full `acquire()` API so existing `rate_limit_retry` plumbing
# remains unchanged.
effective_rate_limiter: SlidingWindowRateLimiter | None = rate_limiter
use_waiting_acquire = rate_limiter is not None
if effective_rate_limiter is None and container_options and container_options.rate_limit:
effective_rate_limiter = SlidingWindowRateLimiter(container_options.rate_limit)

# Build circuit breaker
circuit_breaker: CircuitBreaker | None = None
Expand All @@ -276,15 +294,18 @@ async def _do_request() -> FetchResponse[Any]:

async def _gated() -> FetchResponse[Any]:
# Rate limit check
if rate_limiter is not None:
await rate_limiter.acquire()
if effective_rate_limiter is not None:
if use_waiting_acquire:
await effective_rate_limiter.acquire_wait()
else:
await effective_rate_limiter.acquire()
return await _do_request()

# If a rate-limit-specific retry policy is configured AND a rate limiter
# is active, run the gated call inside its own retry loop. Only
# RateLimitError rejections drive that inner loop — everything else
# falls through to the main retry loop below.
if rate_limiter is not None and rate_limit_retry is not None and rate_limit_retry.attempts > 0:
if effective_rate_limiter is not None and rate_limit_retry is not None and rate_limit_retry.attempts > 0:
rl_retry_opts: RetryOptions = rate_limit_retry

def _should_retry_rate_limit(error: Exception, _attempt: int) -> bool | float:
Expand Down
45 changes: 45 additions & 0 deletions python/src/smooai_fetch/_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
from smooai_fetch._errors import RateLimitError
from smooai_fetch._types import RateLimitOptions

# Minimum time (in seconds) we sleep when waiting for a window slot to free up.
# Prevents pathological busy-loops if the remaining time computation lands at
# zero due to floating-point quirks.
_MIN_WAIT_SECONDS = 0.001


class SlidingWindowRateLimiter:
"""A sliding window rate limiter using asyncio.Lock.
Expand Down Expand Up @@ -50,6 +55,46 @@ async def acquire(self) -> None:

self._timestamps.append(now)

async def acquire_wait(self) -> None:
"""Acquire a token, sleeping until a slot becomes available.

Mirrors the Rust port's ``acquire`` loop: try to acquire, and if the
window is full sleep for the remaining time before retrying. Used by
``FetchBuilder`` so successive ``fetch()`` calls naturally wait for
the next slot instead of surfacing ``RateLimitError`` to the caller.

The raise-on-full ``acquire`` API remains so that callers using the
low-level ``fetch()`` entrypoint with ``container_options.rate_limit``
+ ``rate_limit_retry`` retain their existing behavior.
"""
while True:
try:
await self.acquire()
return
except RateLimitError as err:
# RateLimitError messages always include a millisecond hint —
# parse it back out rather than threading a second value out
# of acquire(). Fall back to a small sleep if parsing fails.
wait_seconds = _extract_remaining_seconds(err) or _MIN_WAIT_SECONDS
await asyncio.sleep(max(wait_seconds, _MIN_WAIT_SECONDS))

def reset(self) -> None:
"""Clear all recorded timestamps, resetting the rate limiter."""
self._timestamps.clear()


def _extract_remaining_seconds(err: RateLimitError) -> float | None:
"""Pull the ``Try again in N ms`` hint out of a RateLimitError message."""
msg = str(err)
marker = "Try again in "
idx = msg.find(marker)
if idx == -1:
return None
tail = msg[idx + len(marker) :]
end = tail.find("ms")
if end == -1:
return None
try:
return float(tail[:end].strip()) / 1000.0
except ValueError:
return None
148 changes: 148 additions & 0 deletions python/tests/test_rate_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
RateLimitError,
RateLimitOptions,
RateLimitRetryOptions,
RetryOptions,
fetch,
)
from smooai_fetch._rate_limit import SlidingWindowRateLimiter
Expand Down Expand Up @@ -191,3 +192,150 @@ async def test_concurrent_acquires():
# The 4th should fail
with pytest.raises(RateLimitError):
await limiter.acquire()


async def test_acquire_wait_blocks_until_slot_free():
"""`acquire_wait` should sleep instead of raising when the window is full."""
limiter = SlidingWindowRateLimiter(RateLimitOptions(max_requests=2, window_ms=200))

await limiter.acquire_wait()
await limiter.acquire_wait()

start = asyncio.get_event_loop().time()
await limiter.acquire_wait() # must wait until the oldest entry expires
elapsed_ms = (asyncio.get_event_loop().time() - start) * 1000

# Should wait roughly one window (200 ms). Allow some slack on slow CI.
assert elapsed_ms >= 150, f"expected >=150ms wait, got {elapsed_ms:.0f}ms"
assert elapsed_ms < 1_000, f"expected <1s wait, got {elapsed_ms:.0f}ms"


class TestSharedRateLimitState:
"""Tests for SMOODEV-969: rate-limiter state is shared across `fetch()`
calls made through a single `FetchBuilder` instance, matching the
Go / Rust ports."""

URL = "https://api.example.com/ping"

@respx.mock
async def test_sequential_fetches_share_state_and_wait(self):
"""Issue 5 sequential fetch() calls on a max=3/window=1s client.

The 4th and 5th must wait until the window slides before being
dispatched — proving the limiter state is held on the builder and
is not reset per call.
"""
route = respx.get(self.URL)
route.return_value = httpx.Response(
200,
json={"ok": True},
headers={"Content-Type": "application/json"},
)

builder = (
FetchBuilder()
.with_retry(RetryOptions(attempts=0))
.with_rate_limit(RateLimitOptions(max_requests=3, window_ms=1_000))
)

start = asyncio.get_event_loop().time()
for _ in range(5):
r = await builder.fetch(self.URL)
assert r.ok
elapsed_ms = (asyncio.get_event_loop().time() - start) * 1000

# Five sequential requests through a 3-per-1s limiter must consume
# at least one full window for the 4th request to acquire a slot.
assert elapsed_ms >= 900, f"expected >=900ms (one window), got {elapsed_ms:.0f}ms"
assert elapsed_ms < 3_000, f"expected <3s total, got {elapsed_ms:.0f}ms"
assert route.call_count == 5

@respx.mock
async def test_concurrent_fetches_share_state_and_serialize(self):
"""Five concurrent fetch() calls via asyncio.gather should all succeed.

The first 3 fire immediately; the 4th and 5th are delayed by the
shared sliding window.
"""
route = respx.get(self.URL)
route.return_value = httpx.Response(
200,
json={"ok": True},
headers={"Content-Type": "application/json"},
)

builder = (
FetchBuilder()
.with_retry(RetryOptions(attempts=0))
.with_rate_limit(RateLimitOptions(max_requests=3, window_ms=1_000))
)

start = asyncio.get_event_loop().time()
results = await asyncio.gather(*(builder.fetch(self.URL) for _ in range(5)))
elapsed_ms = (asyncio.get_event_loop().time() - start) * 1000

assert len(results) == 5
assert all(r.ok for r in results)
# Concurrent: 4th + 5th must still wait for the window to slide
# before being dispatched.
assert elapsed_ms >= 900, f"expected >=900ms (one window), got {elapsed_ms:.0f}ms"
assert elapsed_ms < 3_000, f"expected <3s total, got {elapsed_ms:.0f}ms"
assert route.call_count == 5

@respx.mock
async def test_with_rate_limit_invalidates_cached_state(self):
"""Calling `with_rate_limit` again with new options resets state."""
route = respx.get(self.URL)
route.return_value = httpx.Response(
200,
json={"ok": True},
headers={"Content-Type": "application/json"},
)

builder = (
FetchBuilder()
.with_retry(RetryOptions(attempts=0))
.with_rate_limit(RateLimitOptions(max_requests=1, window_ms=10_000))
)

await builder.fetch(self.URL)
# Window is full at this point; swap the options to a fresh limiter
# and verify the next call goes through without waiting.
builder.with_rate_limit(RateLimitOptions(max_requests=3, window_ms=10_000))

start = asyncio.get_event_loop().time()
await builder.fetch(self.URL)
elapsed_ms = (asyncio.get_event_loop().time() - start) * 1000

assert elapsed_ms < 500, f"expected fresh limiter to allow immediately, got {elapsed_ms:.0f}ms"

@respx.mock
async def test_state_isolated_between_builders(self):
"""Two separate builders each get their own sliding window."""
route = respx.get(self.URL)
route.return_value = httpx.Response(
200,
json={"ok": True},
headers={"Content-Type": "application/json"},
)

builder_a = (
FetchBuilder()
.with_retry(RetryOptions(attempts=0))
.with_rate_limit(RateLimitOptions(max_requests=2, window_ms=10_000))
)
builder_b = (
FetchBuilder()
.with_retry(RetryOptions(attempts=0))
.with_rate_limit(RateLimitOptions(max_requests=2, window_ms=10_000))
)

# Exhaust builder_a's window.
await builder_a.fetch(self.URL)
await builder_a.fetch(self.URL)

# builder_b's window is untouched — call should not wait.
start = asyncio.get_event_loop().time()
await builder_b.fetch(self.URL)
elapsed_ms = (asyncio.get_event_loop().time() - start) * 1000
assert elapsed_ms < 500, f"expected isolated builder to fire immediately, got {elapsed_ms:.0f}ms"
Loading