
Async engine stalls under sustained rate limiting - AIMD has no circuit breaker #575

@andreatgretel

Description


Priority Level

High (Major functionality broken)

Describe the bug

When DATA_DESIGNER_ASYNC_ENGINE=1 is enabled and the LLM endpoint returns sustained 429 responses (e.g., build.nvidia.com free tier), the async engine stalls for minutes with zero progress and ultimately produces zero records. The same workload completes in sync mode (with some dropped records).

The root cause is that ModelRateLimitError is intentionally excluded from the early shutdown error-rate check (async_scheduler.py:742), because 429s are meant to be handled by the AIMD backoff loop. However, AIMD has no circuit breaker for the case where the endpoint is genuinely saturated - it reduces concurrency to 1, keeps hitting 429s, and cycles through cooldown waits + salvage rounds indefinitely until enough non-rate-limit errors accumulate to trigger early shutdown.

The failure cascade:

  1. Burst of concurrent requests triggers 429s from the endpoint
  2. AIMD reduces concurrency: 4 → 3 → 2 → 1 (each 429 multiplies the limit by reduce_factor=0.75; see the sketch after this list)
  3. At concurrency 1, each 429 still applies the cooldown_seconds block, so effective throughput is ~1 attempt per cooldown period
  4. With high 429 rates (e.g., 90%), most attempts fail, extending the block
  5. _check_error_rate skips ModelRateLimitError, so early shutdown never triggers from rate limits alone
  6. Deferred tasks enter salvage rounds (salvage_max_rounds=2), retrying through the same congested throttle
  7. Pipeline appears "stuck" with zero log output for 8+ minutes
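
For intuition, a minimal sketch of the multiplicative decrease in step 2, using the reduce_factor=0.75 and floor of 1 from above (the rounding rule here is an assumption, not the actual ThrottleManager code):
# aimd_decay.py - illustrative arithmetic only, not DD internals
limit = 4
decay = []
while limit > 1:
    limit = max(1, int(limit * 0.75))  # multiplicative decrease on each 429
    decay.append(limit)
print(decay)  # [3, 2, 1] - three consecutive 429s pin concurrency at the floor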

Steps/Code to reproduce bug

Mock server (returns 429 on 90% of requests)
# mock_server.py
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
import time, uuid, uvicorn

counter = {"n": 0}

async def chat_completions(request: Request) -> JSONResponse:
    counter["n"] += 1
    if (counter["n"] % 10) < 9:  # 90% rate limited
        return JSONResponse(
            status_code=429,
            content={"error": {"message": "Rate limit exceeded"}},
            headers={"Retry-After": "5"},
        )
    body = await request.json()
    return JSONResponse(status_code=200, content={
        "id": f"chatcmpl-{uuid.uuid4().hex[:12]}", "object": "chat.completion",
        "created": int(time.time()), "model": body.get("model", "mock"),
        "choices": [{"index": 0, "message": {"role": "assistant",
            "content": "Mock response."}, "finish_reason": "stop"}],
        "usage": {"prompt_tokens": 10, "completion_tokens": 10, "total_tokens": 20},
    })

app = Starlette(routes=[
    Route("/v1/chat/completions", chat_completions, methods=["POST"]),
    Route("/v1/models", lambda r: JSONResponse({"object": "list", "data": []}), methods=["GET"]),
])
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8899, log_level="warning")
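Save as mock_server.py (per the header comment) and launch it; the port must match the provider endpoint in the reproduction script:
python mock_server.py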
Reproduction script
# repro.py
import data_designer.config as dd
from data_designer.config.models import ModelConfig, ModelProvider
from data_designer.config.run_config import RunConfig
from data_designer.interface import DataDesigner

dd_instance = DataDesigner(
    artifact_path="/tmp/dd_rate_limit_repro",
    model_providers=[ModelProvider(
        name="mock", endpoint="http://localhost:8899/v1",
        provider_type="openai", api_key="fake",
    )],
)
dd_instance.set_run_config(RunConfig(buffer_size=15))

config = dd.DataDesignerConfigBuilder()
config.add_column(dd.SamplerColumnConfig(
    name="topic", sampler_type=dd.SamplerType.CATEGORY,
    params=dd.CategorySamplerParams(values=["a", "b", "c"]),
))
for col in ("summary", "analysis", "rewrite"):
    dep = {"summary": "topic", "analysis": "summary", "rewrite": "analysis"}[col]
    config.add_column(dd.LLMTextColumnConfig(
        name=col, model_alias="m",
        prompt=f"Write about {{{{{{{dep}}}}}}}",
    ))
config.add_model_config(ModelConfig(
    alias="m", model="mock", provider="mock", skip_health_check=True,
))

result = dd_instance.create(config, num_records=15, dataset_name="test")
Run (start the mock server first):
DATA_DESIGNER_ASYNC_ENGINE=1 python repro.py
# Result: 170s, 0/15 records produced

DATA_DESIGNER_ASYNC_ENGINE=0 python repro.py
# Result: 8s, early shutdown (fast failure)
Live reproduction against build.nvidia.com (Anonymizer rewrite pipeline)

Ran 04_rewriting_biographies.ipynb (25 records) via the Anonymizer rewrite pipeline:

  • Sync: ~32 min, 25/25 records produced
  • Async: 15.4 min of stalling, then early shutdown with 0/25 records; the rewrite phase produced zero log output for 8.5 minutes before shutdown.

Requires DD >= 0.5.7 and latest Anonymizer main (side-effect column fix merged).

Expected behavior

The async engine should detect when AIMD is stuck at minimum concurrency under sustained rate limiting and either:

  • Trigger an early shutdown (or a rate-limit-specific shutdown threshold)
  • Fall back to sync-style drop-and-continue behavior
  • At minimum, log what's happening so users can distinguish "stuck" from "slow"

The current behavior (silent stall for 8+ minutes, then 0 records) is strictly worse than sync mode.

Agent Diagnostic / Prior Investigation

Key code paths
  • 429 exclusion from error rate: async_scheduler.py:742 - ModelRateLimitError skipped in _check_error_rate
  • AIMD floor: throttle_manager.py:268 - concurrency floor is DEFAULT_MIN_LIMIT=1, no detection of "stuck at floor"
  • Acquire timeout: throttle_manager.py:373 - DEFAULT_ACQUIRE_TIMEOUT=300s, tasks can wait 5 min per attempt
  • Salvage rounds: async_scheduler.py:360-420 - deferred tasks retried up to salvage_max_rounds=2 through the same congested throttle
  • Transport layer difference: retry.py:43,53-56 - sync keeps 429 in its retryable codes (the transport handles it), async strips 429 so it propagates to the ThrottleManager (sketched below)
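
To make the asymmetry concrete, a sketch of the two retryable-code sets (the constant names are illustrative assumptions, not the actual retry.py identifiers):
# Illustrative only - these names are assumptions, not retry.py code
SYNC_RETRYABLE_CODES = {429, 500, 502, 503, 504}      # transport retries 429 itself
ASYNC_RETRYABLE_CODES = SYNC_RETRYABLE_CODES - {429}  # 429 escapes to the ThrottleManager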
Proposed mitigations
  1. ThrottleManager circuit breaker: If current_limit == DEFAULT_MIN_LIMIT and consecutive_429s > threshold, raise a dedicated "endpoint saturated" error instead of continuing to poll. Keeps detection close to the signal (see the sketch after this list).
  2. Rate-limit-aware shutdown in scheduler: Either count 429s toward error rate at reduced weight, or add a separate "sustained rate limit" shutdown condition.
  3. Progress logging during throttle waits: Periodic log messages during cooldown blocks so the pipeline doesn't appear completely stuck.
  4. Throttle state persistence across DD workflows: Anonymizer runs ~5 sequential DD workflows; each rediscovers the rate limit from scratch.
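
A minimal sketch of mitigation 1. Every name here (SaturationBreaker, EndpointSaturatedError, the threshold of 10) is hypothetical, not an existing DD API:
# saturation_breaker.py - hypothetical sketch, not DD code
class EndpointSaturatedError(RuntimeError):
    """Endpoint keeps returning 429 even at minimum concurrency."""

class SaturationBreaker:
    def __init__(self, threshold: int = 10):
        self.threshold = threshold       # consecutive 429s tolerated at the floor
        self.consecutive_429s = 0

    def record_429(self, current_limit: int, min_limit: int) -> None:
        self.consecutive_429s += 1
        if current_limit <= min_limit and self.consecutive_429s >= self.threshold:
            raise EndpointSaturatedError(
                f"{self.consecutive_429s} consecutive 429s at concurrency "
                f"{min_limit}; endpoint appears saturated"
            )

    def record_success(self) -> None:
        self.consecutive_429s = 0        # any success resets the breaker

Raising from inside the throttle keeps detection next to the signal, and the scheduler can convert the error into a normal early shutdown instead of the current silent cooldown loop.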

Additional context

Anonymizer compatibility note: running the rewrite pipeline with async also required DD >= 0.5.7 (side-effect column fix) and latest Anonymizer main (andreatgretel/bugfix/side-effect-column-collisions branch, now merged). Earlier versions fail at DAG construction before reaching the rate limit issue.

Checklist

  • I reproduced this issue or provided a minimal example
  • I searched the docs/issues myself, or had my agent do so
  • If I used an agent, I included its diagnostics above
