
fix: inline retry for rate-limited tasks in async scheduler #578

Closed
andreatgretel wants to merge 8 commits into main from andreatgretel/fix/async-rate-limit-stall

Conversation

@andreatgretel
Contributor

@andreatgretel andreatgretel commented Apr 27, 2026

📋 Summary

Fixes the async engine producing 0 records under sustained API rate limiting. Rate-limited tasks were deferred to salvage, which retried them in bursts through the same congested throttle, climbing the error rate, tripping early shutdown, and dropping all records before checkpointing. This adds a per-task cooldown queue that releases scheduler permits while a task waits, paces retries through the AIMD signal, and falls through to salvage only as a last resort. It also addresses related throttle-manager edge cases surfaced during review (Retry-After clipping, mixed-failure streak handling).

🔗 Related Issue

Fixes #575

🔄 Changes

🐛 Fixed

  • Cooldown queue for rate-limited tasks (async_scheduler.py#L315-L351, #L820-L863): On ModelRateLimitError, the worker pushes (Task, wakeup_at) onto _cooldown_pending and exits cleanly, releasing both submission and LLM-wait permits. The dispatch loop drains entries whose wakeup has passed back into the frontier; they re-acquire permits via the standard path and naturally pace through the throttle's acquire_async. Per-task budget is DEFAULT_RATE_LIMIT_RETRIES = 5; exhaustion falls through to the existing salvage path.
  • Retry-After honored as a floor (throttle_manager.py#L266-L289): release_rate_limited no longer caps an explicit Retry-After header at max_cooldown_seconds. The cap now only constrains synthetic exponential growth; when the server provides a hint, we never retry earlier than it asked.
  • ModelRateLimitError.retry_after (errors.py#L67-L80): _raise_from_provider_error now propagates the provider's Retry-After value through the exception, so the cooldown queue can wait the server-mandated duration on the throttled-client path.
  • consecutive_429s resets on non-rate-limit failure (throttle_manager.py#L341-L356): a sequence like 429 → 500 → 429 is treated as two fresh cascades instead of sustained throttling.
  • Stale cooldown entries are pruned: drain drops entries whose row group has been checkpointed or whose row was dropped by a sibling column's terminal failure, so the all_done gate isn't held hostage by a cooldown nothing is waiting on.
  • from_scratch tasks route to salvage: seed tasks are dispatched by _dispatch_seeds, not the frontier, so the cooldown queue's drain wouldn't bring them back. They skip the cooldown queue and go straight to salvage, which already knows how to re-dispatch seeds.
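The push/drain mechanics above can be sketched as follows. This is a simplified, synchronous illustration of the shape of the change: the real _cooldown_pending lives in the asyncio scheduler and tracks per-task retry budgets, and every name here other than _cooldown_pending and wakeup_at is hypothetical.

```python
import heapq
import time

DEFAULT_RATE_LIMIT_RETRIES = 5  # per-task budget stated above


class CooldownQueue:
    """Hypothetical stand-in for the scheduler's cooldown state."""

    def __init__(self):
        # min-heap of (wakeup_at, task_id): soonest wakeup at the top
        self._cooldown_pending = []

    def push(self, task_id, cooldown_seconds, now=None):
        # Worker calls this on ModelRateLimitError, then exits cleanly,
        # releasing its submission and LLM-wait permits.
        now = time.monotonic() if now is None else now
        heapq.heappush(self._cooldown_pending, (now + cooldown_seconds, task_id))

    def drain_ready(self, now=None):
        # Dispatch loop drains entries whose wakeup has passed back into
        # the frontier; they re-acquire permits via the standard path.
        now = time.monotonic() if now is None else now
        ready = []
        while self._cooldown_pending and self._cooldown_pending[0][0] <= now:
            _, task_id = heapq.heappop(self._cooldown_pending)
            ready.append(task_id)
        return ready

    def next_wakeup(self):
        # Bounds the dispatch loop's wait so it wakes no later than the
        # soonest cooldown expiry; None when nothing is cooling down.
        return self._cooldown_pending[0][0] if self._cooldown_pending else None
```

Because entries wake individually rather than in a batch, re-dispatch is naturally staggered, which is the pacing property the salvage path lacked.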

🔧 Changed

  • Exponential cooldown backoff at floor (throttle_manager.py): When concurrency is at minimum (1) and consecutive 429s keep arriving, cooldown duration grows exponentially (base * backoff_factor ^ (consecutive_429s - 1), capped at max_cooldown_seconds).
  • Saturation detection (telemetry-only): After saturation_threshold consecutive 429s at minimum concurrency, the domain is marked as saturated with a warning log. is_saturated() is a public hook reserved for future scoping; no current behavioral effect on the client.
  • New ThrottleConfig fields (run_config.py): cooldown_backoff_factor (default 2.0), max_cooldown_seconds (default 30.0), saturation_threshold (default 5). Sensible defaults; no user action needed.
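The backoff-at-floor formula above, with the stated defaults (cooldown_backoff_factor=2.0, max_cooldown_seconds=30.0), reduces to a one-liner. This only reproduces the math; the real logic lives in throttle_manager.py.

```python
def cooldown_seconds(base: float, consecutive_429s: int,
                     backoff_factor: float = 2.0,
                     max_cooldown_seconds: float = 30.0) -> float:
    """base * backoff_factor ** (consecutive_429s - 1), capped at the max.

    The cap applies only to this synthetic growth; an explicit server
    Retry-After is honored as a floor and is never clipped by it.
    """
    return min(base * backoff_factor ** (consecutive_429s - 1),
               max_cooldown_seconds)
```

With base=2.0, the sequence is 2, 4, 8, 16, 30, 30, … — the cap is reached on the fifth consecutive 429.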

Why a cooldown queue, not inline retry?

The first iteration of this PR put the retry loop inline in the scheduler worker. Adversarial review surfaced two real regressions: (1) a worker holding the LLM-wait permit across every cooldown attempt could starve unrelated row groups under multi-model load, and (2) a ModelRateLimitError raised by a custom generator outside ThrottledModelClient (no acquire_async to enforce cooldown) became a tight retry loop with no real backoff. Both trace to the same architectural choice: retry-while-holding-permits, with cooldown enforcement implicit.

Replacing it with a per-task cooldown queue:

  • Workers exit immediately on 429, releasing all permits — no cross-model starvation.
  • Cooldown wait is explicit (a wakeup time on the queue), so unthrottled 429s are paced just like throttled ones.
  • The structural difference from "salvage but earlier" is that salvage retries deferred tasks in bursts (the original failure mode), while the cooldown queue dispatches tasks as their own wakeup expires — natural pacing without re-saturating the throttle.
  • Salvage is preserved as the third-line backstop for non-rate-limit retryables and for tasks that exhaust the cooldown budget.

Behavior comparison

Before (stalls): Rate limit → task deferred to salvage → salvage retries through congested throttle → more rate limits → cooldowns compound → stalls 15+ min → early shutdown → 0 records.

After (cooldown queue): Rate limit → worker exits, releases permits → task waits in cooldown queue → drain re-dispatches when wakeup expires → throttle's acquire_async paces remainder → task usually succeeds within 1-4 retries → no salvage needed → no error rate increase → no early shutdown.

A/B test results (Anonymizer biographies pipeline, gpt-oss-120b at max_parallel=4, early shutdown enabled)

                        With fix                  Without fix
Detection (225 tasks)   225 ok, 0 failed          219 ok, 1 failed, 5 skipped
Salvage rounds          0                         3 (13 tasks deferred)
Rewrite (325 tasks)     325 ok, 0 failed          240 ok, early shutdown triggered
Final result            ✅ All stages completed   ❌ 0 records produced

The A/B comparison was run on the original inline-retry version. The cooldown queue is equivalent in the common case (1-4 retries succeed) and strictly better under sustained 429s and on multi-model pipelines. A user-reported reproduction of the same #575 failure mode (Anonymizer notebook 04, rewrite phase: early shutdown at 345s, 156 ok, scheduler exited with 1 unfinished row group → 0 records → DataDesignerGenerationError) confirms the original failure mode independent of our test setup.

🧪 Testing

  • make test passes (1909 engine tests; 9 new tests for the cooldown queue + throttle regressions)
  • Unit tests added covering: cooldown-queue recovery, exhaustion fallback to salvage, permit-release-during-cooldown (P1), unthrottled-429 pacing (P2), stale-entry pruning, from_scratch routing, Retry-After honored
  • Throttle manager regression tests: Retry-After floor, default cooldown still capped, streak reset on release_failure, saturation cleared on non-rate-limit failure
  • A/B tested against live build.nvidia.com endpoint with rate limiting active
  • E2E tests — N/A, covered by live A/B testing

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated — N/A, no user-facing API changes

🔍 Attention Areas

⚠️ Reviewers: Please pay special attention to the following:

  • Cooldown queue + drain logic: core architectural change. The drain handles three states (ready-to-redispatch, stale, still-waiting), and the _main_dispatch_loop / _drain_frontier exit conditions and wake bounds are timeout-aware via _wait_for_wake_or_cooldown.
  • from_scratch carve-out: seed/from-scratch tasks bypass the cooldown queue because the frontier wouldn't re-emit them. Worth confirming this is the right tradeoff for plugin-provided from-scratch generators that hit rate limits.
  • ModelRateLimitError constructor: now accepts a kw-only retry_after argument. Existing call sites that construct it positionally are unaffected, but worth a sanity check.
  • Architectural journey: this PR was rewritten mid-review. Original commits shipped inline retry; subsequent commits addressed adversarial review findings (Retry-After, mixed-failure streaks) and then refactored to the cooldown queue. Git history has the full progression: f7a17cb6 (adversarial fixes) → f390e4ad (cooldown queue refactor) → beed73d0 (Codex follow-up).
  • The second rate-limit failure path (custom generators like chunked_validate in Anonymizer with internal retry pools that convert rate limits to non-retryable errors before they reach the scheduler) is out of scope for this PR. That path is unchanged.


Rate-limited tasks were deferred to salvage rounds (max 2), which
retried through the same congested throttle. Under sustained rate
limiting this caused salvage to stall, error rate to climb, early
shutdown to trigger, and all records to be dropped.

Replace the salvage-deferral path with an inline retry loop (up to
10 attempts) that keeps tasks in their execution context and retries
through the ThrottleManager's AIMD cooldown. Rate limits never count
toward the early shutdown error rate.

Also adds exponential cooldown backoff when the throttle is stuck at
minimum concurrency, and saturation detection for observability.

Replace 6 separate test functions with 2 parametrized tests
(3 cases each) for cooldown backoff and saturation detection.
Follows project convention: flat functions, no classes, parametrize
over duplicate.
    else:
        raise ValueError(f"Unknown task type: {task.task_type}")
    break
except ModelRateLimitError:
Contributor Author


Worth considering: this currently catches every ModelRateLimitError regardless of AIMD state. We could scope it to only fire when the throttle is at the concurrency floor (is_saturated() or current_limit == DEFAULT_MIN_LIMIT), and let salvage handle above-floor cases.

Argument for scoping it down: Above floor, AIMD has room to reduce concurrency. Tasks deferred to salvage will retry with lower concurrency and should recover naturally - that path isn't actually broken. The catastrophic stall (#575) only manifests at the floor where AIMD can't reduce further.

Argument for current behavior: Even above floor, salvage retries deferred tasks in a burst, which can re-saturate the throttle and force AIMD to reduce again from scratch. Inline retry naturally paces tasks through cooldowns at any concurrency.

Our A/B test (without fix, disable_early_shutdown=True) showed salvage actually recovered all 13 deferred tasks across 3 rounds — so above-floor salvage isn't catastrophically broken. The 0-records failure required floor + early shutdown enabled.

Scoped version would look like:

except ModelRateLimitError:
    if self._early_shutdown or _rl_attempt == DEFAULT_RATE_LIMIT_RETRIES:
        raise
    # Only retry inline when AIMD is stuck at floor; let salvage handle
    # above-floor cases where AIMD can still reduce concurrency.
    if not self._tm.is_saturated(provider, model_id, domain):
        raise
    ...

- Honor explicit Retry-After above max_cooldown_seconds (server value
  is now a floor; max_cooldown only caps synthetic exponential growth).
- Reset consecutive_429s and saturated on non-rate-limit failure so a
  429 -> 500 -> 429 sequence isn't treated as sustained throttling.
- Correct saturation_threshold field description (telemetry-only).
- Document inline-retry permit tradeoff and the implicit cooldown
  coupling with ThrottleManager.acquire_async.
- Add unit tests for the inline retry loop, Retry-After handling,
  and streak resets.

Inline retry held LLM-wait permits across every retry attempt, which under
sustained 429s could starve unrelated row groups or models. It also relied
on ThrottledModelClient.acquire_async being the source of cooldown, so a
ModelRateLimitError raised by a custom generator outside that path turned
into a tight-loop retry with no real backoff.

Move retry from inline-in-worker to a scheduler-level cooldown queue:

- On ModelRateLimitError the worker pushes (Task, wakeup_at) into
  _cooldown_pending and exits cleanly, releasing all permits.
- _main_dispatch_loop and _drain_frontier drain ready cooldown entries
  back into the frontier each iteration; the wake_event wait is now
  bounded by the soonest cooldown wakeup.
- Per-task retry budget tracked in _rate_limit_retries; once exhausted
  the task falls through to the existing salvage path.
- Retry counter persists across salvage so a follow-up 429 in salvage
  short-circuits straight to deferral instead of re-entering the queue.
- Cooldown wait honors the exception's retry_after when present, falling
  back to the configurable rate_limit_cooldown_seconds (default 2.0).

Tests updated to use a small cooldown and added coverage for the new
guarantees: permits free during cooldown wait, no hot-loop on
unthrottled 429s, recovery without falling through to salvage.

Three findings from the post-refactor review:

P1 - Preserve provider Retry-After: catch_llm_exceptions translated
ProviderError to a bare ModelRateLimitError with no retry_after, so the
cooldown queue always fell back to the configured default. Give
ModelRateLimitError a retry_after attribute and propagate it through
_raise_from_provider_error so the queue can wait the server-mandated
duration on the throttled-client path.

P2 - Drop stale cooldown entries: a sibling column terminal failure can
drop the row (or checkpoint the row group) while a task is waiting in
the cooldown queue. Without pruning, the all_done gate kept the
scheduler alive for the full cooldown waiting on nothing. Drain now
discards entries whose row_group is gone or whose row_index is dropped.

P3 - Route from_scratch through salvage: the cooldown drain relies on
get_ready_tasks to re-emit a freed task, but seed/from_scratch tasks
aren't in the frontier - they're dispatched only by _dispatch_seeds.
A 429 from a from_scratch generator would silently never retry. Skip
the cooldown queue for from_scratch tasks and route them straight to
salvage, which already knows how to re-dispatch seeds.
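The pruning behavior from P2 can be illustrated with a small drain that sorts entries into the three states (ready, stale, still-waiting). live_row_groups and dropped_rows are stand-ins for the real scheduler state; the entry shape is hypothetical.

```python
def drain_with_pruning(cooldown_pending, now, live_row_groups, dropped_rows):
    """Return ready tasks; discard stale entries; keep the rest waiting.

    Stale means the entry's row group was checkpointed (gone from
    live_row_groups) or its row was dropped by a sibling column's
    terminal failure -- without pruning, the all_done gate would wait
    out the full cooldown for a task nobody needs anymore.
    """
    ready, still_waiting = [], []
    for entry in cooldown_pending:
        wakeup_at, row_group, row_index, task = entry
        if row_group not in live_row_groups or (row_group, row_index) in dropped_rows:
            continue  # stale: silently discard
        if wakeup_at <= now:
            ready.append(task)  # wakeup passed: back into the frontier
        else:
            still_waiting.append(entry)
    cooldown_pending[:] = still_waiting
    return ready
```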

A single request that gets a 429 was just gone before this change - AIMD
only paced *future* requests, and the failed call itself was never
retried. Custom generators with cross-alias failover (Anonymizer's
chunked_validate is the canonical example) had to count this as the
alias being exhausted, even when the next attempt against the same
alias would have succeeded after the cooldown.

Add a per-call retry loop in ThrottledModelClient on
ProviderError(kind=RATE_LIMIT). Because release_rate_limited sets
state.blocked_until synchronously with the failure, the next iteration's
acquire_async waits through the AIMD cooldown - so retries pace through
the throttle's signal rather than re-saturating it. After exhausting
max_rate_limit_retries (default 3), the original error propagates so
the async scheduler's cooldown queue can take the next layer.

This fixes the "second failure path" the PR description originally
flagged as out of scope for #575: custom generators that surface 429s
without retrying same-alias. They get cooldown-aware retries
transparently, no caller-side changes needed.

Tests:
- New retry tests for sync and async call paths (success after retry,
  exhaustion, non-rate-limit not retried)
- Existing rate-limit-shape tests pinned to max_rate_limit_retries=0 so
  they continue exercising single-attempt semantics
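The per-call retry loop this commit describes has roughly the following asyncio shape, under the stated default (max_rate_limit_retries=3). The acquire / release_rate_limited callables are stand-ins for the throttle; in the real client, release_rate_limited sets state.blocked_until and the next acquire_async waits through it.

```python
import asyncio


class RateLimitError(Exception):
    """Stand-in for ProviderError(kind=RATE_LIMIT)."""


async def call_with_retry(call, acquire, release_rate_limited,
                          max_retries: int = 3):
    for attempt in range(max_retries + 1):
        await acquire()  # waits through any cooldown set by a prior 429
        try:
            return await call()
        except RateLimitError:
            release_rate_limited()  # starts the AIMD cooldown synchronously
            if attempt == max_retries:
                raise  # exhausted: propagate to the scheduler's cooldown queue
```

Retries pace through the throttle's own signal (the acquire wait) rather than hammering the endpoint, and exhaustion re-raises so the next layer can take over.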

Consolidates and removes tests added across the review iterations:

- Fold the dedicated retry-after recovery test into a parametrize on
  test_rate_limit_recovers_via_cooldown_queue. The throttle manager
  also has its own retry-after unit test, so the scheduler test can
  carry just enough timing assertion to confirm the value is honored
  end-to-end.
- Drop test_rate_limited_task_releases_permits_during_cooldown: the
  worker exits before pushing to _cooldown_pending, so permits being
  free during the wait is structural - not a behavior worth a
  dedicated test with an asyncio.create_task watcher.
- Drop test_unthrottled_rate_limit_does_not_hot_loop: timing-based
  check; the cooldown_pending wakeup_at value is the spec, and other
  recovery tests already exercise that path.
- Drop the bespoke _PerRowRateLimitWithRetryAfter mock in favor of an
  optional retry_after_seconds attribute on _PerRowRateLimitGenerator.
- Combine the two retry-after / synthetic-cap throttle tests into one
  parametrized test.
- Combine the streak-reset and saturation-cleared throttle tests into
  one (single release_failure call asserts both behaviors).
@andreatgretel
Contributor Author

Closing in favor of a focused fix for #575 (see issue comment #575 (comment)). Diagnostic A/Bs showed rate-limit handling doesn't change the user-visible failure — the actual trigger is the async early-shutdown gate firing on clustered ModelTimeoutErrors. New PR will scope down to: (1) exclude retryable errors from the gate, (2) WARN log on degraded provider performance, (3) partial-RG checkpoint as safety net. ~150 prod lines vs. ~921 here.



Development

Successfully merging this pull request may close these issues.

Async engine stalls under sustained rate limiting - AIMD has no circuit breaker
