
chore: async engine readiness - blockers and polish before default #553

Open
andreatgretel wants to merge 2 commits into main from andreatgretel/chore/async-engine-readiness

Conversation

@andreatgretel
Contributor

📋 Summary

Hardens the async engine for production readiness before it becomes the default execution path. Processor callback failures now propagate as DatasetGenerationError (fail-fast) instead of silently dropping row groups, allow_resize=True triggers a graceful sync fallback with DeprecationWarning, and try/finally cleanup in run() ensures workers are always drained.

🔗 Related Issue

Closes #462

🔄 Changes

🔧 Changed

  • Processor callback failures (pre-batch and post-batch) now propagate as DatasetGenerationError instead of silently dropping row groups
  • allow_resize=True triggers graceful sync fallback with DeprecationWarning instead of raising
  • try/finally refactor in AsyncTaskScheduler.run() eliminates duplicated cleanup logic and ensures workers are always cancelled via asyncio.shield()
  • _run_cell_by_cell_generator now branches on instance flag _use_async instead of module-level DATA_DESIGNER_ASYNC_ENGINE, so the sync fallback decision propagates correctly
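
A minimal sketch of the fallback described above, assuming a resolver that returns a bool and a per-run instance flag. `BuilderSketch` and its method bodies are hypothetical stand-ins written for illustration; only the names `_resolve_async_compatibility`, `_use_async`, and `_run_cell_by_cell_generator` come from this PR, and the real signatures may differ.

```python
import warnings


class BuilderSketch:
    """Hypothetical stand-in for the dataset builder; not the real class."""

    def __init__(self, async_engine_enabled: bool) -> None:
        # Per-run flag, decided once per build instead of re-reading a module constant.
        self._use_async = async_engine_enabled

    def _resolve_async_compatibility(self, allow_resize: bool) -> bool:
        """Return True if the async engine can run this config, False to fall back."""
        if allow_resize:
            warnings.warn(
                "allow_resize=True is not supported by the async engine; "
                "falling back to the sync engine.",
                DeprecationWarning,
                stacklevel=2,  # illustrative value for this two-frame sketch
            )
            return False
        return True

    def build(self, allow_resize: bool = False) -> str:
        if self._use_async:
            self._use_async = self._resolve_async_compatibility(allow_resize)
        return self._run_cell_by_cell_generator()

    def _run_cell_by_cell_generator(self) -> str:
        # Branch on the instance flag so the fallback decision is honored here.
        return "async" if self._use_async else "sync"
```

Because the generator reads `self._use_async` rather than a module-level constant, a fallback decided in `build()` cannot be silently overridden further down the call chain.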

✨ Added

  • Row-count guards in pre-batch (strict_row_count=True) and post-batch callbacks reject unsupported row-count changes
  • Partial-completion warning when actual records fall short of target
  • task_traces field on PreviewResults for async scheduler trace data
  • Tests: post-batch failure propagation, early shutdown worker drain, checkpoint correctness, execution graph integration
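
The row-count guard and the partial-completion warning can be sketched roughly as follows. Everything here is an assumption made for illustration: `RowCountChangedError`, `guard_row_count`, and `warn_if_partial` are hypothetical names, and the PR itself raises the library's own exception types.

```python
import logging

logger = logging.getLogger(__name__)


class RowCountChangedError(Exception):
    """Hypothetical error type; the PR raises the library's own exceptions."""


def guard_row_count(rows_before: list, rows_after: list, stage: str) -> list:
    """Reject a processor result whose row count differs from its input."""
    if len(rows_after) != len(rows_before):
        raise RowCountChangedError(
            f"{stage} processor changed row count from {len(rows_before)} "
            f"to {len(rows_after)}, which the async engine does not support"
        )
    return rows_after


def warn_if_partial(actual: int, target: int) -> bool:
    """Log a warning and return True when fewer records than requested were produced."""
    if actual < target:
        logger.warning("Generated %d of %d requested records", actual, target)
        return True
    return False
```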

🐛 Fixed

  • DatasetGenerationError from callbacks no longer gets double-wrapped by the scheduler's generic except Exception handler
  • stacklevel in allow_resize DeprecationWarning now points at user code instead of library internals

🔍 Attention Areas

⚠️ Reviewers: Please pay special attention to the following:

  • async_scheduler.py: fail-fast semantics change; pre/post-batch failures now propagate instead of being swallowed
  • dataset_builder.py: _resolve_async_compatibility replaces _validate_async_compatibility; row-count guards in callbacks

🧪 Testing

  • make test passes
  • Unit tests added/updated
  • E2E tests added/updated (if applicable)

All 48 async-related tests pass. 18 smoke tests pass. Ruff check and format clean on all changed files.

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)
  • Architecture docs updated (if applicable)

- Processor callback failures (pre-batch and post-batch) now raise
  DatasetGenerationError instead of silently dropping row groups
- Early shutdown and all error paths drain in-flight workers via a
  finally block in AsyncTaskScheduler.run()
- Pre-batch and post-batch processors that change row count in async
  mode raise immediately (strict_row_count guard)
- Partial completion logs a warning when actual < target records
- allow_resize=True auto-falls back to sync engine with a deprecation
  warning instead of raising, using a per-run _use_async flag
- Preview path mirrors the trace check from the full build path;
  PreviewResults exposes task_traces

Closes #462
- Prevent double-wrapping of DatasetGenerationError in scheduler callbacks
- Fix stacklevel in allow_resize DeprecationWarning to point at user code
- Update stale comment to reflect fail-fast behavior
- Rename misleading test and remove unused caplog fixture
- Add zero-warnings assertion for happy-path case
- Move warnings import to module level
@greptile-apps
Contributor

greptile-apps bot commented Apr 16, 2026

Greptile Summary

This PR hardens the async engine before it becomes the default execution path: pre- and post-batch processor failures now propagate as DatasetGenerationError (fail-fast) instead of silently dropping row groups, allow_resize=True auto-falls back to the sync engine with a DeprecationWarning, and a try/finally refactor in AsyncTaskScheduler.run() ensures workers are always drained even when DatasetGenerationError propagates (fixing a latent bug where only CancelledError triggered cleanup). The instance flag _use_async correctly threads the fallback decision into _run_cell_by_cell_generator, and new row-count guards in both pre- and post-batch callbacks reject unsupported transformations early.

Confidence Score: 5/5

Safe to merge — no P0/P1 issues found; all changes are logically correct and well-tested.

The try/finally refactor fixes a real latent bug (workers left dangling on DatasetGenerationError), fail-fast semantics are correctly propagated through all call sites, the instance flag correctly replaces the module-level constant in _run_cell_by_cell_generator, and all 8 changed files have appropriate test coverage. All remaining observations are P2 style/observability notes.

No files require special attention.

Important Files Changed

| Filename | Overview |
| --- | --- |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py | Core change: try/finally replaces duplicated cleanup logic; pre/post-batch failures now propagate as DatasetGenerationError. Semaphore release in the finally block is correct even on error path since the scheduler terminates immediately. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py | _resolve_async_compatibility correctly returns bool for sync fallback; _use_async instance flag properly overrides module-level constant in _run_cell_by_cell_generator; row-count guard and partial-completion warning are well-placed. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | strict_row_count=True raises DatasetProcessingError (a sibling of DatasetGenerationError, not a subclass); the scheduler's except-chain correctly wraps it into DatasetGenerationError via the generic except clause. |
| packages/data-designer-config/src/data_designer/config/preview_results.py | Clean addition of optional task_traces field; typed as list[Any]. |
| packages/data-designer/src/data_designer/interface/data_designer.py | task_traces forwarded to PreviewResults using or None to normalize an empty list to None; correct and intentional. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py | New tests cover post-batch failure propagation and early-shutdown worker drain; pre-batch failure tests correctly updated from drop semantics to raise semantics. |
| packages/data-designer-engine/tests/engine/dataset_builders/test_async_builder_integration.py | test_resolve_async_compatibility correctly validates fallback+warning on allow_resize; metadata write test is a reasonable proxy for partial-completion coverage. |
| packages/data-designer-engine/tests/engine/models/test_async_engine_switch.py | Patch-based test correctly replaced with direct instance-flag manipulation; simpler and more accurate after the _use_async refactor. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[build / build_preview] --> B{_resolve_async_compatibility}
    B -->|allow_resize detected| C[DeprecationWarning + logger.warning\nreturn False]
    B -->|no allow_resize| D[return True]
    C --> E[_use_async = False\nSync path]
    D --> F[_use_async = True\nAsync path]

    F --> G[AsyncTaskScheduler.run]
    G --> H[try: _main_dispatch_loop]

    H --> I{pre-batch callback\n_run_seeds_complete_check}
    I -->|DatasetGenerationError| J[re-raise]
    I -->|other Exception| K[wrap → DatasetGenerationError\nre-raise]
    J --> L[finally: cancel admission\n+ asyncio.shield cancel_workers]
    K --> L

    H --> M{post-batch callback\n_checkpoint_completed_row_groups}
    M -->|DatasetGenerationError| N[re-raise]
    M -->|other Exception| O[wrap → DatasetGenerationError\nre-raise]
    N --> L
    O --> L

    H --> P[Normal exit]
    P --> Q[finally: cancel admission\n+ drain workers]
    Q --> R[reporter.log_final\n_rg_states diagnostic]

    L --> S[DatasetGenerationError\npropagates to caller]

    E --> T[_run_cell_by_cell_generator\nuses self._use_async]
    F --> T

Reviews (1): Last reviewed commit: "fix: address review findings for async e..."

@github-actions
Contributor

Code Review: PR #553 — chore: async engine readiness - blockers and polish before default

Summary

This PR hardens the async engine for production readiness before it becomes the default execution path. The changes fall into three categories:

  1. Fail-fast error semantics: Processor callback failures (pre-batch and post-batch) now propagate as DatasetGenerationError instead of silently dropping row groups.
  2. Graceful allow_resize fallback: allow_resize=True triggers a sync fallback with DeprecationWarning instead of raising an error.
  3. Cleanup reliability: try/finally refactor in AsyncTaskScheduler.run() ensures workers are always drained, eliminating duplicated cleanup logic.

The PR also adds row-count guards, partial-completion warnings, task_traces on PreviewResults, and comprehensive tests for the new behaviors.

Files changed: 8 (245 additions, 103 deletions)

Findings

High Severity

None found.

Medium Severity

1. DatasetProcessingError from strict_row_count gets wrapped into DatasetGenerationError — intentional but may lose specificity

In processor_runner.py:87, the strict row-count check raises DatasetProcessingError. Since DatasetProcessingError and DatasetGenerationError are sibling classes (both extend DataDesignerError), the scheduler's except Exception as exc handler in _run_seeds_complete_check will wrap it into a DatasetGenerationError. This means callers cannot distinguish a row-count violation from other generation failures by exception type.

The original DatasetProcessingError is preserved via __cause__, so it can be introspected, but the top-level type changes. If this is intentional (normalizing errors at the scheduler boundary), a brief comment explaining this wrapping policy would help future maintainers.

  • async_scheduler.py:541-543 — wrapping site
  • processor_runner.py:87-90 — originating raise
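
To illustrate the point about `__cause__`, here is a small sketch of how a caller could still tell a row-count violation apart after the wrap. The class hierarchy mirrors the sibling relationship described above, but the classes are local stand-ins, and `run_pre_batch`, `strict_row_count_violation`, and `classify_failure` are hypothetical helpers, not library functions.

```python
class DataDesignerError(Exception):
    """Stand-in for the shared base class named in the review."""


class DatasetProcessingError(DataDesignerError):
    pass


class DatasetGenerationError(DataDesignerError):
    pass


def run_pre_batch(callback):
    """Normalize any callback failure to DatasetGenerationError, chaining the original."""
    try:
        return callback()
    except DatasetGenerationError:
        raise
    except Exception as exc:
        # "from exc" stores the original exception on __cause__.
        raise DatasetGenerationError(f"pre-batch processor failed: {exc}") from exc


def strict_row_count_violation():
    raise DatasetProcessingError("row count changed from 10 to 7")


def classify_failure(callback) -> str:
    """Recover the original error type from __cause__ after the wrap."""
    try:
        run_pre_batch(callback)
    except DatasetGenerationError as err:
        if isinstance(err.__cause__, DatasetProcessingError):
            return "row-count violation"
        return "other generation failure"
    return "ok"
```

The top-level type is the same in both failure cases, so a caller that needs the distinction has to inspect `__cause__` as shown; a plain `except DatasetProcessingError` would not match.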

Low Severity

2. task_traces typed as list[Any] in config package

In preview_results.py:26, task_traces uses list[Any] rather than a more specific type. This is understandable since importing TaskTrace from the engine package would violate the dependency direction (config cannot depend on engine). Consider adding a brief comment noting this is intentional (e.g., # Typed as Any to avoid config -> engine import), or define a protocol/TypeAlias in the config package if the trace shape stabilizes.

  • packages/data-designer-config/src/data_designer/config/preview_results.py:26
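
One possible shape for the protocol option, as a sketch only. The field names `task_id` and `duration_s` are invented for illustration and are not taken from the real TaskTrace; `TaskTraceLike`, `PreviewResultsSketch`, `EngineTaskTrace`, and `make_preview` are likewise hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class TaskTraceLike(Protocol):
    """Hypothetical structural type defined on the config side."""

    task_id: str
    duration_s: float


@dataclass
class PreviewResultsSketch:
    # Structural typing avoids a config -> engine import.
    task_traces: Optional[list[TaskTraceLike]] = None


@dataclass
class EngineTaskTrace:
    """Would live in the engine package; satisfies the protocol by shape alone."""

    task_id: str
    duration_s: float


def make_preview(traces: list[TaskTraceLike]) -> PreviewResultsSketch:
    # "or None" normalizes an empty list to None, as the interface layer does.
    return PreviewResultsSketch(task_traces=traces or None)
```

The engine class never imports or inherits from the protocol, so the dependency direction stays config <- engine while the field gets a more informative type than `list[Any]`.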

3. Semaphore release skipped for remaining row groups on fail-fast

In _checkpoint_completed_row_groups (lines 498-522), when DatasetGenerationError propagates out of the for rg_id, rg_size in completed: loop, the semaphore is released for the failing row group (via finally), but the remaining row groups in the completed list won't have their semaphores released. Since the scheduler is shutting down and no one reuses it, this has no practical impact. However, if the scheduler were ever made reusable, this could become a semaphore leak. No action is needed now; this is documented for awareness.

  • async_scheduler.py:498-522
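
The leak scenario can be reproduced in miniature. This sketch assumes one semaphore slot per admitted row group and a per-iteration `finally`, which is a simplification of the scheduler's actual loop; `checkpoint_all` and `held_slots_after_failure` are hypothetical names.

```python
import asyncio


def checkpoint_all(completed, checkpoint, semaphore, release_remaining):
    """Checkpoint each row group, releasing its admission slot afterwards.

    With release_remaining=False, a failure leaves the later slots held.
    With release_remaining=True, an outer finally frees them as well.
    """
    pending = list(completed)
    try:
        while pending:
            rg_id = pending.pop(0)
            try:
                checkpoint(rg_id)
            finally:
                semaphore.release()  # the failing row group's slot is always freed
    finally:
        if release_remaining:
            for _ in pending:
                semaphore.release()


async def held_slots_after_failure(release_remaining: bool) -> int:
    total = 3
    semaphore = asyncio.Semaphore(total)
    for _ in range(total):
        await semaphore.acquire()  # three row groups admitted

    def checkpoint(rg_id: int) -> None:
        if rg_id == 0:
            raise RuntimeError("post-batch processor failed")

    try:
        checkpoint_all([0, 1, 2], checkpoint, semaphore, release_remaining)
    except RuntimeError:
        pass

    # Count free slots by acquiring until the semaphore reports it is exhausted.
    free = 0
    while not semaphore.locked():
        await semaphore.acquire()
        free += 1
    return total - free
```

With `release_remaining=False`, two of the three slots stay held after the first row group fails, which is harmless for a one-shot scheduler but would starve admission in a reused one.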

4. stacklevel=4 in DeprecationWarning is fragile

The stacklevel=4 in _resolve_async_compatibility (line 279) correctly points at user code through the chain: _resolve_async_compatibility → build()/build_preview() → DataDesigner.build()/preview() → user code. However, if an intermediate layer is added (e.g., a decorator or wrapper method), the warning will point at the wrong frame. Consider adding a comment documenting the expected call chain so future editors know to update the stacklevel.

  • dataset_builder.py:279
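
A small sketch of why the value is sensitive to call depth. The four functions below are hypothetical placeholders for the chain named above; they only show that each increment of `stacklevel` moves the reported location one frame outward.

```python
import warnings


def resolve_compat(stacklevel: int) -> None:
    # stacklevel=1 attributes the warning to this frame.
    warnings.warn("allow_resize is deprecated", DeprecationWarning, stacklevel=stacklevel)


def builder_build(stacklevel: int) -> None:
    resolve_compat(stacklevel)  # stacklevel=2 points here


def interface_preview(stacklevel: int) -> None:
    builder_build(stacklevel)  # stacklevel=3 points here


def user_code(stacklevel: int) -> None:
    interface_preview(stacklevel)  # stacklevel=4 points here


def reported_line(stacklevel: int) -> int:
    """Return the line number the warning is attributed to."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        user_code(stacklevel)
    return caught[0].lineno
```

Inserting one more wrapper between `interface_preview` and `user_code` would make `stacklevel=4` point at the wrapper instead, which is the fragility noted in this finding.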

Positive Observations

5. Clean try/finally refactoring in run()

The consolidation of admission cancellation and worker drain into a single finally block (lines 272-280) is well done. It eliminates the duplicated cleanup between the happy path and CancelledError path, and using asyncio.shield to protect the worker cancellation is correct practice.
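
The pattern can be sketched in isolation as follows. `SchedulerSketch` is a hypothetical miniature, not AsyncTaskScheduler; it only shows a single `finally` that drains workers on both the normal and the error path, with `asyncio.shield` protecting the drain.

```python
import asyncio


class SchedulerSketch:
    """Hypothetical miniature of a scheduler; not AsyncTaskScheduler itself."""

    def __init__(self, fail: bool) -> None:
        self._fail = fail
        self._workers = []
        self.drained = False

    async def _worker(self) -> None:
        await asyncio.sleep(3600)  # stands in for a long-running in-flight task

    async def _cancel_workers(self) -> None:
        for task in self._workers:
            task.cancel()
        # return_exceptions=True collects each CancelledError instead of raising it.
        await asyncio.gather(*self._workers, return_exceptions=True)
        self.drained = True

    async def run(self) -> str:
        self._workers = [asyncio.create_task(self._worker()) for _ in range(3)]
        try:
            await asyncio.sleep(0)  # let the workers start
            if self._fail:
                raise RuntimeError("callback failed")
            return "done"
        finally:
            # shield keeps the drain running even if run() itself is cancelled.
            await asyncio.shield(self._cancel_workers())
```

The same cleanup runs whether `run()` returns, raises, or is cancelled, which is what removes the need for a separate CancelledError branch.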

6. Double-wrap prevention for DatasetGenerationError

The second commit adds except DatasetGenerationError: raise before the generic except Exception handler in both _checkpoint_completed_row_groups (line 503-504) and _run_seeds_complete_check (line 539-540). This ensures that when the on_before_checkpoint callback raises DatasetGenerationError directly (e.g., the row-count guard in dataset_builder.py:403-407), it isn't re-wrapped. Good defensive fix.
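
The effect of the extra clause is easy to see side by side. In this sketch `DatasetGenerationError` is a local stand-in that only shares the library's name, and `wrap_unguarded`, `wrap_guarded`, `row_count_guard`, and `caught` are hypothetical helpers.

```python
class DatasetGenerationError(Exception):
    """Stand-in with the same name as the library's error, for illustration only."""


def wrap_unguarded(callback):
    try:
        return callback()
    except Exception as exc:  # also catches DatasetGenerationError
        raise DatasetGenerationError(f"callback failed: {exc}") from exc


def wrap_guarded(callback):
    try:
        return callback()
    except DatasetGenerationError:
        raise  # already the right type: pass it through untouched
    except Exception as exc:
        raise DatasetGenerationError(f"callback failed: {exc}") from exc


def row_count_guard():
    raise DatasetGenerationError("post-batch processor changed row count")


def caught(wrapper, callback):
    """Return the DatasetGenerationError raised by wrapper(callback)."""
    try:
        wrapper(callback)
    except DatasetGenerationError as err:
        return err
```

Clause order matters here: Python tries `except` clauses top to bottom, so the specific handler has to come before `except Exception` for the pass-through to take effect.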

7. Instance flag _use_async instead of re-reading module global

Replacing the module-level DATA_DESIGNER_ASYNC_ENGINE check in _run_cell_by_cell_generator with the instance flag _use_async (line 537) ensures the sync fallback decision from _resolve_async_compatibility propagates correctly to all downstream code paths. The tests were also cleanly updated to test the instance flag directly rather than patching the module global.

8. Solid test coverage

The new tests (test_scheduler_post_batch_failure_raises, test_early_shutdown_drains_workers, test_scheduler_pre_batch_failure_propagates_across_row_groups) cover the critical behavioral changes well. The existing tests were correctly updated from the old "drop and continue" assertions to "fail-fast" assertions.

Verdict

APPROVE — This is a well-structured hardening PR with clear fail-fast semantics, proper cleanup guarantees, and good test coverage. The medium finding (error type wrapping) is a design-level note worth discussing but not blocking. The low-severity items are minor housekeeping suggestions.
