Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/fides/api/service/connectors/limiter/rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,16 @@ def limit(

Expiration is set on any keys which are stored in the cluster.

timeout_seconds defaults to the longest period factor + 5s, giving the limiter at
least one full bucket rollover window before giving up.
timeout_seconds defaults to the longest period factor + 5s (capped at 120s),
giving the limiter at least one full bucket rollover window before giving up.
The cap prevents HOUR/DAY limits from blocking a worker for unreasonable
durations; connectors like SurveyMonkey configure both minute and day limits,
and a breached day limit should fail fast rather than sleep for 24 hours.
"""
if timeout_seconds is None:
timeout_seconds = (
max(r.period.factor for r in requests) + 5 if requests else 30
timeout_seconds = min(
max(r.period.factor for r in requests) + 5 if requests else 30,
120,
)
Comment on lines +145 to 148
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/fides/api/service/connectors/limiter/rate_limiter.py:145-148

The 120 magic number is used directly here. Consider extracting it as a class constant (e.g. MAX_DEFAULT_TIMEOUT_SECONDS = 120) alongside the existing EXPIRE_AFTER_PERIOD_SECONDS. This makes it easier to discover and adjust the cap without hunting through the method body, and ties the value to a self-documenting name.

MAX_DEFAULT_TIMEOUT_SECONDS: int = 120

...

timeout_seconds = min(
    max(r.period.factor for r in requests) + 5 if requests else 30,
    self.MAX_DEFAULT_TIMEOUT_SECONDS,
)


try:
Expand Down
214 changes: 121 additions & 93 deletions tests/ops/integration_tests/limiter/test_rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import unittest.mock as mock
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from typing import Any, Callable, Dict, Generator, List

import pytest
from freezegun import freeze_time
from requests import Session

from fides.api.db import session
Expand Down Expand Up @@ -218,6 +220,86 @@ def test_limiter_times_out_when_bucket_full() -> None:
time.sleep(0.002)


@pytest.mark.integration
def test_minute_period_breach_waits_for_rollover() -> None:
    """Verify a MINUTE-period breach waits for the bucket to roll over.

    On breach the limiter must sleep until the next minute boundary rather
    than raising RateLimiterTimeoutException, even when that boundary is more
    than half the timeout away. This guards the Okta client (period=MINUTE)
    and any SaaS connector with a per-minute rate limit (e.g. Zenoti,
    SurveyMonkey) against a too-short default timeout.

    Uses real Redis for bucket state; only mocks time to avoid 60s wall-clock
    waits.
    """
    # "2024-01-01 00:00:05" is 5s into a minute, so the next bucket boundary
    # is 55s away — far enough that a too-short timeout would fire before the
    # rollover is reached.
    with freeze_time("2024-01-01 00:00:05") as frozen:
        limiter = RateLimiter()
        key = f"test_minute_rollover_{random.randint(0, 10**12)}"
        request = RateLimiterRequest(
            key=key, rate_limit=1, period=RateLimiterPeriod.MINUTE
        )

        # Advance frozen time by the requested amount instead of sleeping.
        def advancing_sleep(seconds: float) -> None:
            frozen.tick(timedelta(seconds=seconds))

        with mock.patch(
            "fides.api.service.connectors.limiter.rate_limiter.time.sleep",
            side_effect=advancing_sleep,
        ):
            limiter.limit(requests=[request])  # fills the single slot
            limiter.limit(requests=[request])  # breach -> sleep to boundary -> succeed

        # If we reach here without RateLimiterTimeoutException, the fix works.

@pytest.mark.integration
def test_dynamic_timeout_capped_for_day_limits() -> None:
    """Mixed MINUTE + DAY limits must not block a worker for hours.

    SurveyMonkey configures ``rate: 120/minute`` and ``rate: 500/day``.
    Without a cap the dynamic timeout would be ``86400 + 5 = 86405s`` (~24h),
    leaving a Celery worker sleeping until the next day bucket rolls over.
    The 120s cap ensures the limiter fails fast and surfaces an error instead.

    Uses real Redis for bucket state; only mocks time to avoid real sleeping.
    """
    with freeze_time("2024-01-01 00:00:05") as frozen:
        limiter = RateLimiter()
        key = f"test_day_cap_{random.randint(0, 10**12)}"

        minute_request = RateLimiterRequest(
            key=f"{key}:min", rate_limit=1, period=RateLimiterPeriod.MINUTE
        )
        day_request = RateLimiterRequest(
            key=f"{key}:day", rate_limit=1, period=RateLimiterPeriod.DAY
        )
        both = [minute_request, day_request]

        # Accumulate total mocked sleep so we can assert on the timeout path.
        sleep_total = [0.0]

        def advancing_sleep(seconds: float) -> None:
            sleep_total[0] += seconds
            frozen.tick(timedelta(seconds=seconds))

        with mock.patch(
            "fides.api.service.connectors.limiter.rate_limiter.time.sleep",
            side_effect=advancing_sleep,
        ):
            # Fill both buckets.
            limiter.limit(requests=both)

            # Next call breaches both. Should timeout, not sleep for 24h.
            with pytest.raises(RateLimiterTimeoutException):
                limiter.limit(requests=both)

        # Slept roughly up to the 120s cap, not the uncapped 86405s value.
        # The lower bound confirms the cap path was actually exercised: without
        # it this assertion would pass even if the second limit() call
        # returned immediately (0s sleep), masking a regression where the
        # breach was never triggered.
        assert 110 <= sleep_total[0] < 130
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/ops/integration_tests/limiter/test_rate_limiter.py:300

The assertion only checks an upper bound. Since the intended behaviour is that the worker sleeps for the full 120 s timeout before raising, adding a lower bound would confirm the cap path was actually exercised rather than the call somehow succeeding early:

# Slept up to the 120 s cap, not the uncapped 86 405 s
assert 110 <= sleep_total[0] < 130

Without a lower bound, this assertion would pass even if the second limit() call returned immediately (0 s sleep), masking a regression where the breach was never triggered.



@pytest.mark.integration_saas
@pytest.mark.asyncio
async def test_rate_limiter_full_integration(
Expand Down Expand Up @@ -286,113 +368,59 @@ def _req(self, period: RateLimiterPeriod) -> RateLimiterRequest:

def test_second_at_boundary(self) -> None:
# At the start of a second (e.g. t=1000), 1s remains.
assert RateLimiter().seconds_until_next_bucket(1000, self._req(RateLimiterPeriod.SECOND)) == 1
assert (
RateLimiter().seconds_until_next_bucket(
1000, self._req(RateLimiterPeriod.SECOND)
)
== 1
)

def test_second_mid_bucket(self) -> None:
# 0.5s into a second → 0.5s remains (integer math gives 0 here)
result = RateLimiter().seconds_until_next_bucket(1000, self._req(RateLimiterPeriod.SECOND))
result = RateLimiter().seconds_until_next_bucket(
1000, self._req(RateLimiterPeriod.SECOND)
)
assert 0 < result <= 1

def test_minute_at_start(self) -> None:
# At the exact start of a minute (e.g. t=600), 60s remain.
assert RateLimiter().seconds_until_next_bucket(600, self._req(RateLimiterPeriod.MINUTE)) == 60
assert (
RateLimiter().seconds_until_next_bucket(
600, self._req(RateLimiterPeriod.MINUTE)
)
== 60
)

def test_minute_30s_in(self) -> None:
# 30 seconds into a minute → 30s remain.
assert RateLimiter().seconds_until_next_bucket(630, self._req(RateLimiterPeriod.MINUTE)) == 30
assert (
RateLimiter().seconds_until_next_bucket(
630, self._req(RateLimiterPeriod.MINUTE)
)
== 30
)

def test_minute_59s_in(self) -> None:
# 59 seconds into a minute → 1s remains.
assert RateLimiter().seconds_until_next_bucket(659, self._req(RateLimiterPeriod.MINUTE)) == 1
assert (
RateLimiter().seconds_until_next_bucket(
659, self._req(RateLimiterPeriod.MINUTE)
)
== 1
)

def test_hour_at_start(self) -> None:
assert RateLimiter().seconds_until_next_bucket(3600, self._req(RateLimiterPeriod.HOUR)) == 3600
assert (
RateLimiter().seconds_until_next_bucket(
3600, self._req(RateLimiterPeriod.HOUR)
)
== 3600
)

def test_hour_mid(self) -> None:
assert RateLimiter().seconds_until_next_bucket(5400, self._req(RateLimiterPeriod.HOUR)) == 1800


class TestLimitDynamicTimeout:
"""Unit tests for the dynamic timeout defaulting in RateLimiter.limit()."""

def test_default_timeout_uses_max_period_factor(self) -> None:
"""When timeout_seconds is not passed, the limiter derives it from max period factor."""
limiter = RateLimiter()
mock_redis = mock.MagicMock()
# Always return usage within limit so limit() returns immediately (success path).
limiter.increment_usage = mock.MagicMock(return_value=[1])

with mock.patch("fides.api.service.connectors.limiter.rate_limiter.get_cache", return_value=mock_redis):
with mock.patch.object(limiter, "increment_usage", return_value=[1]):
limiter.limit(
requests=[RateLimiterRequest(key="k", rate_limit=10, period=RateLimiterPeriod.MINUTE)]
)
# If we reach here, the call succeeded — the dynamic timeout didn't reject it.

def test_explicit_timeout_is_respected(self) -> None:
"""Explicit timeout_seconds overrides the dynamic default."""
limiter = RateLimiter()
mock_redis = mock.MagicMock()

# Always report the bucket as over-limit so the loop runs until timeout.
with mock.patch("fides.api.service.connectors.limiter.rate_limiter.get_cache", return_value=mock_redis):
with mock.patch.object(limiter, "increment_usage", return_value=[999]):
with mock.patch.object(limiter, "decrement_usage"):
with mock.patch("time.sleep"): # skip actual sleeping
start = time.time()
with pytest.raises(RateLimiterTimeoutException):
limiter.limit(
requests=[RateLimiterRequest(key="k", rate_limit=10, period=RateLimiterPeriod.MINUTE)],
timeout_seconds=1,
)
elapsed = time.time() - start
# Should have respected the 1s timeout, not the default 65s.
assert elapsed < 5


class TestLimitSleepsToBucketBoundary:
"""Verify that on breach the limiter sleeps to the next bucket boundary."""

def test_sleep_duration_targets_next_minute_bucket(self) -> None:
"""When a MINUTE-period limit is breached, sleep should be ~seconds until next minute."""
limiter = RateLimiter()

# Simulate being 30s into a minute → expect ~30s sleep.
frozen_seconds = 630 # 10m30s epoch — 30s into the current minute

call_count = 0

def fake_increment(redis, current_seconds, requests):
nonlocal call_count
call_count += 1
# Breach on first call, succeed on second.
return [999] if call_count == 1 else [1]

sleep_calls: List[float] = []

with mock.patch("fides.api.service.connectors.limiter.rate_limiter.get_cache"):
with mock.patch.object(limiter, "increment_usage", side_effect=fake_increment):
with mock.patch.object(limiter, "decrement_usage"):
with mock.patch("time.time", side_effect=[
# start_time
float(frozen_seconds),
# while check (1st iteration)
float(frozen_seconds),
# current_seconds inside loop
float(frozen_seconds),
# remaining calculation inside sleep block
float(frozen_seconds),
# while check (2nd iteration — after sleep)
float(frozen_seconds) + 30.05,
# current_seconds inside loop (2nd)
float(frozen_seconds) + 30.05,
]):
with mock.patch("time.sleep", side_effect=lambda s: sleep_calls.append(s)):
limiter.limit(
requests=[RateLimiterRequest(key="k", rate_limit=10, period=RateLimiterPeriod.MINUTE)],
timeout_seconds=65,
)

assert len(sleep_calls) == 1
# At t=630 (30s into minute), next boundary is t=660 → 30s + 0.05 buffer.
assert 29.9 < sleep_calls[0] <= 30.1
assert (
RateLimiter().seconds_until_next_bucket(
5400, self._req(RateLimiterPeriod.HOUR)
)
== 1800
)
Loading