fix(async): pack of fixes for async engine under degraded providers#585
andreatgretel merged 10 commits into main
Conversation
The gate previously only excluded `ModelRateLimitError`, leaving `ModelTimeoutError`, `ModelInternalServerError`, and `ModelAPIConnectionError` to count toward the sliding-window error rate. Under provider degradation these errors cluster in time (concurrent in-flight requests time out together), so 5/10 in a row is easy and trips the gate even when salvage could recover the rows. Refs #575.
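The gate behavior described above can be sketched as a sliding-window counter that ignores retryable failures entirely. This is a minimal illustrative re-creation, not the actual DataDesigner code; the class name, window size, and threshold are assumptions.

```python
from collections import deque


class ErrorRateGate:
    """Sketch of a sliding-window early-shutdown gate (hypothetical names).

    Retryable failures (rate-limit, timeout, 5xx, connection) neither fill
    nor trip the window, so clustered transient errors under provider
    degradation can no longer cause an early shutdown on their own.
    """

    def __init__(self, window: int = 10, threshold: float = 0.5) -> None:
        self._outcomes: deque[bool] = deque(maxlen=window)  # True = failure
        self._threshold = threshold
        self.tripped = False

    def record(self, *, failed: bool, retryable: bool) -> None:
        # Retryable failures are excluded from the gate entirely.
        if failed and retryable:
            return
        self._outcomes.append(failed)
        if (
            len(self._outcomes) == self._outcomes.maxlen
            and sum(self._outcomes) / len(self._outcomes) >= self._threshold
        ):
            self.tripped = True


gate = ErrorRateGate()
for _ in range(10):
    gate.record(failed=True, retryable=True)   # clustered timeouts: ignored
assert not gate.tripped
for _ in range(10):
    gate.record(failed=True, retryable=False)  # genuine failures still trip
assert gate.tripped
```

The key design point is that retryable errors are excluded rather than counted as successes: counting them as successes would dilute the window and delay a legitimate trip on genuine structural failures.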
Diagnostic A/Bs against build.nvidia.com showed runs failing silently under provider degradation - no log indication that retryable errors were piling up until the early-shutdown gate fired (or, post-fix, until salvage exhaustion). Surfacing this earlier helps users distinguish "DataDesigner is broken" from "the upstream provider is slow today." Tracks a separate sliding window over retryable-vs-not for every task outcome (independent of the early-shutdown gate's window) and emits a throttled WARN when the rolling fraction crosses the threshold. Refs #575.
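The separate retryable-vs-not window and throttled WARN might look like the following sketch. Class name, window size, threshold, and interval are illustrative assumptions, not the real implementation.

```python
import logging
import time
from collections import deque

logger = logging.getLogger("data_designer.sketch")


class DegradedProviderMonitor:
    """Sketch (hypothetical names/defaults): a rolling retryable-vs-not
    window per task outcome, independent of the early-shutdown gate,
    with a throttled WARN when the retryable fraction crosses a threshold."""

    def __init__(self, window: int = 20, threshold: float = 0.5,
                 warn_interval_s: float = 60.0) -> None:
        self._window: deque[bool] = deque(maxlen=window)  # True = retryable failure
        self._threshold = threshold
        self._interval = warn_interval_s
        self._last_warn_at = float("-inf")  # -inf: first WARN is never throttled

    def record(self, *, retryable_failure: bool) -> None:
        self._window.append(retryable_failure)
        if len(self._window) < self._window.maxlen:
            return  # wait for a full window before judging the provider
        rate = sum(self._window) / len(self._window)
        now = time.monotonic()
        if rate >= self._threshold and now - self._last_warn_at >= self._interval:
            self._last_warn_at = now
            logger.warning(
                "Provider showing degraded performance: %.0f%% of recent "
                "model calls failed with retryable errors.", rate * 100,
            )
```

Keeping this window separate from the gate's window is deliberate: the gate must stay blind to retryable errors, while this monitor exists precisely to surface them.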
Before: when the early-shutdown gate fired, any row group still in flight stayed in `_rg_states` un-checkpointed. The buffer manager later raised `FileNotFoundError` when the builder tried to read the finalized parquet. User-visible result: `0 records produced`. After: a new `_finalize_after_shutdown` step runs in `run()`'s finally block, after `_cancel_workers` has drained in-flight tasks (Codex caveat: in-flight `from_scratch`/`batch` tasks must not be allowed to write into a buffer that's being finalized). For each remaining row group it drops rows that aren't fully complete, then delegates to the existing `_checkpoint_completed_row_groups` so the buffer manager's zero-survivor handling (skip empty parquet, free buffer) kicks in unchanged. Also surfaces partial completion as a structured signal: scheduler exposes `early_shutdown: bool` and `partial_row_groups: tuple[int, ...]` properties so callers can detect partial completion programmatically rather than parsing log lines. Builder uses this to emit a more specific WARN distinguishing early shutdown from non-shutdown drops. Refs #575.
In `release_failure`, the cascade counter wasn't reset, so a sequence like 429 → 500 → 429 was treated as 2 consecutive 429s. The cascade counter feeds AIMD's reduce-once-per-cascade logic; the second 429 should start a fresh cascade and trigger another concurrency reduction, but currently doesn't. Standalone bug surfaced during #575 investigation; not on the failure path that drives the gate-trip outcome but worth fixing while we're in this code.
A real-workload run of #575 showed the early-shutdown gate still trips even with the gate-exclusion fix in place: the trigger is 10 timeouts inside Anonymizer's QA-repair custom columns, all wrapped in CustomColumnGenerationError (non-retryable) by the catch-all in CustomColumnGenerator. Two fixes here: 1. Re-raise RETRYABLE_MODEL_ERRORS unchanged before the wrap so the scheduler's _is_retryable correctly classifies them. 2. Surface _AsyncBridgedModelFacade timeouts as ModelTimeoutError instead of stdlib TimeoutError. Without this the sync bridge times out as the wrong exception type and is still classified non-retryable even after fix #1. Also moves _RETRYABLE_MODEL_ERRORS from async_scheduler to models/errors as the public RETRYABLE_MODEL_ERRORS tuple - both the scheduler and the wrap site need it, and models/errors is the appropriate home alongside the error class definitions. Refs #575.
…runs
When the async scheduler hits early shutdown and produces zero records, the buffer manager skips writing parquet (correctly), so ArtifactStorage.load_dataset_with_dropped_columns() raises FileNotFoundError. Previously this surfaced as a generic DataDesignerGenerationError wrapping the FileNotFoundError, which is ambiguous (could be missing files for any reason). This commit:
- Adds DataDesignerEarlyShutdownError as a subclass of DataDesignerGenerationError so existing handlers still match, while callers that want to react programmatically (retry on a different alias, surface a degraded-provider message, etc.) can catch the specific type.
- Plumbs the scheduler's structured signals (early_shutdown, partial_row_groups) up through the builder so they're available at data_designer.create() time without re-introspecting the scheduler.
- create() raises the typed error in both failure modes (load fails or empty DataFrame returned) when builder.early_shutdown is True.
Refs #575.
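The subclassing trick above is what keeps this change backward compatible. A minimal sketch (stand-in class definitions; the real ones live in the data_designer package):

```python
class DataDesignerGenerationError(Exception):
    """Stand-in for the existing generic generation error."""


class DataDesignerEarlyShutdownError(DataDesignerGenerationError):
    """Raised when early shutdown produced zero records. Subclassing means
    existing `except DataDesignerGenerationError` handlers still match."""


# Callers that want to react programmatically catch the specific type first:
try:
    raise DataDesignerEarlyShutdownError("zero records after early shutdown")
except DataDesignerEarlyShutdownError:
    handled = "retry on a different alias / surface degraded-provider message"
except DataDesignerGenerationError:
    handled = "generic failure handling"
```

Ordering matters in the caller: the subclass clause must precede the base-class clause, or the generic handler swallows the typed error.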
Review: PR #585 —
Initialize _last_degraded_warn_at to -inf so the first WARN is always emitted. The previous initialization to 0.0 suppressed the first WARN on fresh CI runners where time.monotonic() returns a small value (system boot uptime), making the throttle interval check (now - 0.0 < interval) true on the first attempt.
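The arithmetic behind the fix is simple enough to demonstrate directly. A sketch of the throttle check (the interval value is illustrative; `time.monotonic()` is the clock the text refers to):

```python
# Sketch of the throttled-WARN interval check.
WARN_INTERVAL_S = 60.0
_last_degraded_warn_at = float("-inf")


def should_warn(now: float) -> bool:
    # With a -inf initial value, now - (-inf) is +inf, so the very first
    # check always passes, even on a fresh runner where time.monotonic()
    # returns a small uptime like 3.2 seconds.
    return now - _last_degraded_warn_at >= WARN_INTERVAL_S


assert should_warn(3.2) is True                  # -inf init: first WARN fires
assert (3.2 - 0.0 >= WARN_INTERVAL_S) is False   # 0.0 init would suppress it
```

After the first WARN, `_last_degraded_warn_at` is set to the current monotonic time and the interval check behaves normally.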
Five real correctness issues caught in review of the original PR, plus a few smaller cleanups and test simplifications.
- Throttle - cascade reset (regression of an existing AIMD invariant): release_failure() now resets consecutive_429s only when in_flight == 0. Resetting unconditionally broke "reduce once per cascade" when 429/500/429 arrived interleaved within a single in-flight burst: the second 429 was treated as a new cascade and the limit was halved twice for what was effectively one rate-limit event.
- Interface - typed-error gating: DataDesignerEarlyShutdownError now fires only when early_shutdown is true AND actual_num_records == 0. Without this, a partial-salvage run that failed to load for unrelated reasons (corrupt parquet, schema drift, disk hiccup) was misdiagnosed as "zero records produced," hiding the real cause.
- Async - WARN window scope: the degraded-provider warning was fed by every task outcome, including samplers and non-LLM customs. In realistic pipelines (one model column, several non-model columns) the rate stayed under threshold even when every model call was failing, silencing the WARN exactly when it mattered. Now gated on is_llm.
- Async/builder - signal preservation across raises: scheduler.early_shutdown and partial_row_groups are captured in a try/finally around future.result(), so a processor failure during the salvage path doesn't drop the structured signal. Both build() and build_preview() now reset per-run state at the start so reused builders don't leak prior-run flags.
- Async - dead code: the dispatch_error capture in run() was unread (the post-finally check is unreachable on the exception path). Removed.
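The corrected cascade-reset invariant can be sketched as follows. Names and mechanics are assumptions modeled on the description above, not the real throttle manager; the point is that `consecutive_429s` resets only when the in-flight burst fully drains.

```python
class ThrottleSketch:
    """Sketch of the AIMD reduce-once-per-cascade invariant (hypothetical).

    consecutive_429s drives at most one multiplicative decrease per
    cascade; it resets only when in_flight drains to zero, so an
    interleaved 429/500/429 within one burst halves the limit once."""

    def __init__(self, limit: int = 32) -> None:
        self.limit = limit
        self.in_flight = 0
        self.consecutive_429s = 0

    def acquire(self) -> None:
        self.in_flight += 1

    def release_failure(self, *, rate_limited: bool) -> None:
        self.in_flight -= 1
        if rate_limited:
            self.consecutive_429s += 1
            if self.consecutive_429s == 1:  # reduce once per cascade
                self.limit = max(1, self.limit // 2)
        elif self.in_flight == 0:
            self.consecutive_429s = 0  # cascade ends only when the burst drains


t = ThrottleSketch()
for _ in range(3):
    t.acquire()
t.release_failure(rate_limited=True)   # 429: halve 32 -> 16
t.release_failure(rate_limited=False)  # 500 mid-burst: no reset
t.release_failure(rate_limited=True)   # second 429, same cascade: no halve
assert t.limit == 16
```

An unconditional reset on the 500 would have made the second 429 look like a fresh cascade and halved the limit again, which is exactly the regression the review caught.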
Smaller cleanups:
- early-shutdown WARN says "non-retryable error rate exceeded threshold"
- bridge timeout WARN demoted to debug (ModelTimeoutError already surfaces it; the throttled degraded-provider WARN is the user-facing signal)
- TODO note for threading degraded_warn_* through RunConfig
- doc note in _finalize_after_shutdown clarifying that the pre-batch processor isn't re-run on partial-salvage row groups

Tests:
- new regression tests for the cascade burst case, partial-salvage error gating, and the LLM-only WARN window
- direct unit test for _reset_run_state
- dedup via _make_storage / _seed_plus_cell_setup helpers
- WARN emission cases parametrized into a single test
- shared parametrize lists hoisted to module-level constants
- redundant cascade test dropped in favor of the more thorough drain variant; redundant healthy-baseline test folded into the zero-survivor test
Greptile Summary
This PR fixes the async engine's early-shutdown gate misfiring under transient provider degradation by excluding retryable errors.
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Core behavioral change: retryable errors excluded from the early-shutdown gate, new _finalize_after_shutdown salvages in-flight row groups post-cancellation, degraded-provider WARN machinery added — logic is correct and ordering constraints are respected. |
| packages/data-designer-engine/src/data_designer/engine/column_generators/generators/custom.py | RETRYABLE_MODEL_ERRORS re-raise guard added in both _generate and agenerate; bridge now raises ModelTimeoutError instead of stdlib TimeoutError so the scheduler classifies it retryable. |
| packages/data-designer-engine/src/data_designer/engine/models/clients/throttle_manager.py | release_failure now resets consecutive_429s only when in_flight == 0, correctly guarding against double-reduction within a concurrent burst. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | Captures early_shutdown, partial_row_groups, and actual_num_records from the scheduler in finally blocks; adds _reset_run_state to prevent state leakage on builder reuse. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Raises DataDesignerEarlyShutdownError only when early_shutdown=True AND actual_num_records == 0; partial-salvage runs fall through to the generic error, preventing misdiagnosis. |
Flowchart
%%{init: {'theme': 'neutral'}}%%
flowchart TD
A[Task completes or fails] --> B{Success?}
B -- Yes --> C[_check_error_rate success=True]
B -- No --> D{is_retryable?}
D -- Yes --> E[Skip _check_error_rate\nRetryable errors excluded\nfrom early-shutdown gate]
D -- No --> F[_check_error_rate success=False]
C --> G{is_llm?}
E --> G
F --> G
G -- Yes --> H[_record_retryable_outcome\nUpdate degraded-provider window]
H --> I{Window full & rate >= threshold\n& interval elapsed?}
I -- Yes --> J[WARN: Provider degraded]
I -- No --> K[Continue]
F --> L{Error rate >= threshold?}
L -- Yes --> M[_early_shutdown = True]
L -- No --> K
M --> N[_main_dispatch_loop returns]
N --> O[finally: _cancel_workers]
O --> P{_early_shutdown & _rg_states non-empty?}
P -- Yes --> Q[_finalize_after_shutdown]
Q --> R[Drop incomplete rows per RG]
R --> S{survivors > 0?}
S -- Yes --> T[Add to partial_row_groups\n_checkpoint_completed_row_groups]
S -- No --> U[Log zero-survivor\n_checkpoint_completed_row_groups]
P -- No --> V[Clean exit path]
T --> V
U --> V
V --> W{early_shutdown & actual_num_records == 0?}
W -- Yes --> X[Raise DataDesignerEarlyShutdownError]
W -- No --> Y[Normal completion or DataDesignerGenerationError]
Reviews (2): Last reviewed commit: "test(interface): consolidate create() er..."
    if builder.early_shutdown and builder.actual_num_records == 0:
        raise DataDesignerEarlyShutdownError(
            "🛑 Generation produced zero records — early shutdown was triggered. "
            "The non-retryable error rate exceeded the configured threshold; check the "
            "warnings above (and any 'Provider showing degraded performance' logs) for "
            "the contributing failures."
        ) from e
Do we need a similar check for preview?
    def _reset_run_state(self) -> None:
        """Clear per-run signals so reused builder instances don't leak state across runs."""
        self._early_shutdown = False
        self._partial_row_groups = ()
        self._actual_num_records = -1
        self._task_traces = []
Per the style guide, this needs to get pushed down to maintain the public-before-private paradigm.
    RETRYABLE_EXCEPTION_FACTORIES = [
        pytest.param(lambda: ModelRateLimitError("429"), id="rate_limit"),
        pytest.param(lambda: ModelTimeoutError("timeout"), id="timeout"),
        pytest.param(lambda: ModelInternalServerError("503"), id="internal_server"),
        pytest.param(lambda: ModelAPIConnectionError("conn reset"), id="api_connection"),
    ]
Could we loop over what's already available in errors (RETRYABLE_MODEL_ERRORS)?
    with pytest.raises(RuntimeError, match="connection timed out"):
        proxy.generate(prompt="hello")

    def test_bridge_timeout_raises_model_timeout_error(self) -> None:
I guess the class-based testing pattern somehow slipped in... could we take this opportunity to flatten it out?
Tests in this suite also import inline...
    assert tracker.is_row_group_complete(0, 10, ["seed", "col"])

    RETRYABLE_ERROR_FACTORIES = [
Same comment as above...
Thanks for the careful diagnosis and write-up here, @andreatgretel — the live A/B against …

Summary
The PR keeps the early-shutdown gate as a circuit-breaker for genuine non-retryable failures while excluding transient errors (rate-limit, timeout, 5xx, connection) that cluster under provider degradation. It also salvages partially-completed row groups instead of leaving them un-checkpointed, fixes a …

Findings
Suggestions — Take it or leave it
    if len(dataset_for_profiler) == 0:
        if builder.early_shutdown and builder.actual_num_records == 0:
            raise DataDesignerEarlyShutdownError(...)
        raise DataDesignerGenerationError(...)
What Looks Good
Verdict
Ship it (with nits). Only Suggestions; nothing blocking. The follow-ups already filed in the PR description (…).

This review was generated by an AI assistant.
Style cleanups, parametrization, docstring polish, and one consistency
fix in the typed-error path. All non-blocking ("Ship it (with nits)").
interface/data_designer.py:
- preview() now raises DataDesignerEarlyShutdownError when shutdown
produced zero records (parity with create()), and also gates on
actual_num_records == 0 so partial-salvage runs that fail to load
don't get misdiagnosed
- create()'s defensive empty-DF guard mirrors the load-failure guard
with the same actual_num_records == 0 check
async_scheduler.py:
- _record_retryable_outcome docstring clarifies that the call site
filters by is_llm; the function alone reads as if every outcome feeds
the window
dataset_builder.py:
- moved _reset_run_state() down past the public methods to match the
project's public-before-private convention
test_custom.py:
- flattened TestAsyncBridgedModelFacade class into module-level test
functions (matches the rest of the file)
- hoisted inline imports (asyncio, threading, patch, _AsyncBridgedModelFacade,
SyncClientUnavailableError) to top of file
- drove the retryable-error parametrize off RETRYABLE_MODEL_ERRORS directly
instead of the hand-rolled factory list, so new retryable types pick
up coverage automatically
- dropped the redundant "Sanity" block in
  test_async_bridge_timeout_raises_model_timeout_error; pytest.raises
  already enforces the type, and the duplicate block was running the same
  slow scenario twice
test_async_scheduler.py:
- parametrize over RETRYABLE_MODEL_ERRORS directly (same as above)
test_data_designer.py:
- added preview-path tests for the typed-error and partial-salvage
fall-through cases
- updated the existing empty-DF test to also patch actual_num_records=0
(otherwise the new gating in the empty-DF guard skips the typed error)
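Driving the parametrize off the shared tuple looks roughly like this. The exception classes here are dummy stand-ins (the real tuple lives in data_designer's models/errors module), so the sketch is self-contained:

```python
import pytest


# Dummy stand-ins; the real classes and tuple live in models/errors.
class ModelRateLimitError(Exception): ...
class ModelTimeoutError(Exception): ...
class ModelInternalServerError(Exception): ...
class ModelAPIConnectionError(Exception): ...


RETRYABLE_MODEL_ERRORS = (
    ModelRateLimitError,
    ModelTimeoutError,
    ModelInternalServerError,
    ModelAPIConnectionError,
)


# Parametrizing over the tuple itself means any newly added retryable
# error type picks up test coverage automatically, with readable ids.
@pytest.mark.parametrize(
    "error_cls", RETRYABLE_MODEL_ERRORS, ids=lambda cls: cls.__name__
)
def test_error_is_retryable(error_cls):
    assert issubclass(error_cls, Exception)
    assert error_cls in RETRYABLE_MODEL_ERRORS
```

This replaces the hand-rolled factory lists, which had to be updated by hand whenever a retryable type was added.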
Five separate tests (two existing, three new from earlier in this PR) all probed the same dispatch logic in create(): "given a load outcome and a builder state, which error type should fire?" Pulled them into a single parametrized matrix indexed by (load_side_effect, early_shutdown, actual_num_records). Net result: 5 named tests → 1 parametrized test with 6 cells, and the previously-missing empty_df + shutdown + partial salvage cell is now covered. Test names retain readable IDs (load_fails_shutdown_zero_records etc.) so failures still pinpoint the exact case in pytest output.
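The consolidated matrix might look like the sketch below. The dispatch function and the case ids are illustrative reconstructions of the logic described in this PR, not the actual test file:

```python
import pytest


class DataDesignerGenerationError(Exception): ...
class DataDesignerEarlyShutdownError(DataDesignerGenerationError): ...


def dispatch_error(load_fails, early_shutdown, actual_num_records):
    """Simplified stand-in for create()'s error-dispatch logic under test."""
    if load_fails or actual_num_records == 0:
        if early_shutdown and actual_num_records == 0:
            return DataDesignerEarlyShutdownError
        return DataDesignerGenerationError
    return None


# One parametrized matrix replaces five near-identical tests; explicit ids
# keep pytest failure output pointing at the exact case.
@pytest.mark.parametrize(
    ("load_fails", "early_shutdown", "actual_num_records", "expected"),
    [
        pytest.param(True, True, 0, DataDesignerEarlyShutdownError,
                     id="load_fails_shutdown_zero_records"),
        pytest.param(True, True, 7, DataDesignerGenerationError,
                     id="load_fails_shutdown_partial_salvage"),
        pytest.param(True, False, 0, DataDesignerGenerationError,
                     id="load_fails_no_shutdown"),
        pytest.param(False, True, 0, DataDesignerEarlyShutdownError,
                     id="empty_df_shutdown_zero_records"),
        pytest.param(False, False, 5, None, id="healthy_run"),
    ],
)
def test_create_error_dispatch(load_fails, early_shutdown,
                               actual_num_records, expected):
    assert dispatch_error(load_fails, early_shutdown,
                          actual_num_records) is expected
```

Each cell of the matrix is one (load outcome, builder state) combination, so the previously uncovered empty-DF + shutdown + partial-salvage cell is just one more row.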
📋 Summary
Fixes the async engine's tendency to abandon entire runs (returning 0 records) when the provider is degraded. The root cause was the early-shutdown gate counting retryable errors (timeouts, transient 5xx, connection blips) toward its trip threshold, then leaving partially-completed row groups un-checkpointed. This PR keeps the gate as a circuit-breaker for genuine structural failures (auth, schema, code bugs) but stops it from firing under transient provider stress, where salvage rounds can recover.
The set of changes also closes a few smaller correctness gaps along the way:
`CustomColumnGenerator` was stripping retryability from wrapped exceptions, the sync-to-async bridge raised a non-retryable stdlib `TimeoutError`, and the throttle's `consecutive_429s` counter wasn't being reset on non-rate-limit failures.

Replaces #578 (~921 lines, cooldown-queue rabbit hole) with a focused fix (~830 lines including tests) that addresses the actual failure mode observed in production runs.
🔗 Related Issue
Closes #575
🔄 Changes
🐛 Fixed
✨ Added
🧪 Testing
✅ Checklist
🔍 Attention Areas
📎 Follow-ups (out of scope, worth filing)