
feat: add AsyncTaskScheduler and RowGroupBufferManager for async engine#404

Open
andreatgretel wants to merge 8 commits into main from
andreatgretel/feat/async-scheduler-buffer

Conversation


@andreatgretel andreatgretel commented Mar 12, 2026

Summary

PR 3 of 4 in the async engine migration plan. Adds the AsyncTaskScheduler and RowGroupBufferManager - the core orchestration layer that replaces sequential column-by-column processing with parallel, dependency-aware task dispatch.

Changes

Added

  • AsyncTaskScheduler - Dependency-aware async task scheduler with:
    • Row-group admission via semaphore-based concurrency control
    • Multi-column dedup for generators that produce multiple output columns
    • Stateful generator serialization via per-instance asyncio locks
    • Retryable failure deferral with configurable salvage rounds
    • Post-salvage error logging for unfinished row groups
    • Optional task tracing for debugging/profiling
  • RowGroupBufferManager - Per-row-group buffer with cell-level writes, batch updates, row dropping, checkpoint-to-parquet with full memory cleanup, and size-mismatch validation on update_batch
  • 13 async scheduler tests covering: seed dispatch ordering, buffer integration, multiple row groups, non-retryable failure row drops, stateful serialization, bounded submission, tracing, three-column pipelines, retryable salvage recovery, and eager row-drop propagation to downstream columns
  • 9 buffer manager tests covering: init, cell/batch writes, DataFrame exclusion of dropped rows, concurrent row groups, checkpoint with memory cleanup, and on_complete callbacks
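The semaphore-based row-group admission described above can be sketched as follows. This is a minimal, self-contained illustration, not the PR's actual API: `admit_row_groups` and the `on_admit` callback are hypothetical stand-ins for the scheduler's buffer init plus seed dispatch, and the real scheduler releases the slot at checkpoint time rather than immediately.

```python
import asyncio

async def admit_row_groups(row_groups, max_concurrent, on_admit):
    # Bound how many row groups are active at once. In the real scheduler
    # the slot is released when the row group is checkpointed; here we
    # release immediately so the sketch runs to completion on its own.
    sem = asyncio.Semaphore(max_concurrent)
    for rg_id, rg_size in row_groups:
        await sem.acquire()        # blocks once max_concurrent groups are live
        on_admit(rg_id, rg_size)   # stand-in for buffer init + seed dispatch
        sem.release()

admitted: list[int] = []
asyncio.run(admit_row_groups([(0, 4), (1, 4), (2, 4)], 2,
                             lambda rg_id, rg_size: admitted.append(rg_id)))
```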

Changed

  • CompletionTracker.get_ready_tasks - Added admitted_rgs parameter to filter tasks by admitted row groups
  • CompletionTracker._seed_frontier renamed to public seed_frontier() - no longer auto-called from with_graph; root dispatch moved to the scheduler's _dispatch_seeds (handles stateful locks and multi-column dedup). seed_frontier() remains available for static introspection (capacity planning, task enumeration)
  • Updated completion tracker tests to cover both empty-frontier default and explicit seed_frontier() behavior
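The `admitted_rgs` filter can be pictured with a rough sketch (illustrative only; `Task` here is a stand-in dataclass, and the real `CompletionTracker.get_ready_tasks` also resolves dependencies, not just admission and dedup):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Task:
    column: str
    row_group: int

def get_ready_tasks(frontier, dispatched, admitted_rgs):
    # Only hand back tasks whose row group has been admitted and that
    # have not already been dispatched.
    return [t for t in frontier
            if t.row_group in admitted_rgs and t not in dispatched]

frontier = [Task("name", 0), Task("name", 1), Task("name", 2)]
ready = get_ready_tasks(frontier, dispatched={Task("name", 0)}, admitted_rgs={0, 1})
```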

Fixed (from Greptile review)

  • Non-retryable from_scratch/batch failure now drops all rows in the row group (previously left the row group stuck)
  • Salvage rounds re-dispatch from_scratch tasks directly instead of relying on the frontier (which never contains them)
  • _row_group_sizes freed on checkpoint (minor memory leak)
  • Row-count mismatch warning in _run_batch writeback

Lines breakdown

| Category | Added | Removed | Net |
| --- | --- | --- | --- |
| Core libraries | 560 | 27 | +533 |
| Tests | 611 | 6 | +605 |
| Plan updates | 56 | 31 | +25 |
| Total | 1,212 | 46 | +1,166 |

Attention Areas

Reviewers: Please pay special attention to the following:

  • async_scheduler.py - Core scheduling logic: admission loop, dispatch loop, salvage rounds (including from_scratch re-dispatch), and task execution paths (from_scratch, cell, batch)
  • completion_tracker.py - seed_frontier() is now public and opt-in; get_ready_tasks has new admitted_rgs parameter
  • row_group_buffer.py - update_batch size validation, checkpoint memory cleanup


@andreatgretel andreatgretel requested a review from a team as a code owner March 12, 2026 21:25

greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR adds the AsyncTaskScheduler and RowGroupBufferManager — the core orchestration layer for parallel, dependency-aware column generation — along with comprehensive test suites for both components and an updated CompletionTracker (seed_frontier() made public/opt-in, get_ready_tasks gains admitted_rgs filter).

All issues flagged in the previous Greptile review round (non-retryable batch drops, salvage from_scratch re-dispatch, stateful lock acquisition in salvage, _drain_frontier downstream propagation, post-salvage checkpoint sweep, early-exit logger.error, update_batch size validation, _row_group_sizes memory leak, unconditional on_complete call) are confirmed resolved.

New findings:

  • Missing batch_alias re-registration in salvage from_scratch dispatch (async_scheduler.py:171): _dispatch_seeds adds both the from_scratch task and its batch alias to _dispatched to prevent the frontier from generating a duplicate. The salvage retry re-adds only the from_scratch task but omits the batch_alias. Seeds are not in the frontier during normal scheduler operation (since seed_frontier() is not auto-called), so this does not produce a bug today — but the omission is an inconsistency that could silently cause double-dispatch if the usage pattern changes.
  • _dispatched set grows unboundedly (async_scheduler.py:333): Completed tasks are never pruned from _dispatched. For long runs with many row groups, this accumulates all historical task objects. Since completed tasks are already removed from the frontier, their entries in _dispatched are redundant and could be discarded from _in_flight's finally block.
  • Missing buffer test: on_complete(None) for all-dropped row group (test_row_group_buffer.py:108): The fix to call on_complete unconditionally is implemented correctly, but there is no test covering the case where all rows are dropped and the callback receives None.

Confidence Score: 4/5

  • Safe to merge with the minor batch_alias inconsistency understood and accepted as a latent risk.
  • All previously-identified bugs are confirmed fixed. The remaining findings are a latent inconsistency in the salvage dispatch path (not triggerable under current usage), a minor memory growth concern, and a missing test case. The core scheduling logic — admission, dispatch, stateful serialization, salvage, and checkpoint — is structurally sound and well-tested across 22 new tests.
  • Pay closest attention to async_scheduler.py — specifically the salvage dispatch block (lines 150–174) for the missing batch_alias re-registration.

Important Files Changed

  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py — New core scheduler: admission loop, seed dispatch, main dispatch/drain loop, salvage rounds, and checkpoint logic are all well-structured. All previously-flagged bugs (non-retryable batch drop, salvage from_scratch re-dispatch, stateful lock in salvage, _drain_frontier loop, post-salvage checkpoint sweep, early-exit logger.error) confirmed fixed. Two new findings: missing batch_alias re-registration in salvage dispatch (logic inconsistency) and unbounded growth of _dispatched set (style/memory).
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/completion_tracker.py — seed_frontier() is now correctly public and opt-in; get_ready_tasks admitted_rgs filter is clean; _enqueue_downstream and _reevaluate_batch_tasks logic look correct. No new issues found.
  • packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py — update_batch size validation (ValueError on mismatch), checkpoint memory cleanup (_row_group_sizes.pop), and on_complete called unconditionally (with None for empty df) are all correctly implemented. No new issues found.
  • packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py — 9 tests covering init, cell/batch writes, DataFrame exclusion, concurrent row groups, checkpoint memory cleanup, and on_complete callback. Missing a test for on_complete(None) when all rows are dropped.

Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant Scheduler as AsyncTaskScheduler
    participant Admission as _admit_row_groups
    participant Tracker as CompletionTracker
    participant Buffer as RowGroupBufferManager

    Caller->>Scheduler: run()
    Scheduler->>Admission: create_task(_admit_row_groups)

    loop For each row group
        Admission->>Admission: _rg_semaphore.acquire()
        Admission->>Buffer: init_row_group(rg_id, rg_size)
        Admission->>Scheduler: _dispatch_seeds(rg_id)
        Scheduler->>Scheduler: acquire stateful_lock + submission_semaphore
        Scheduler->>Scheduler: create_task(_execute_seed_task)
        Scheduler->>Admission: wake_event.set()
    end
    Admission->>Scheduler: _all_rgs_admitted = True

    loop Main dispatch loop
        Scheduler->>Tracker: get_ready_tasks(dispatched, admitted_rgs)
        Tracker-->>Scheduler: [ready tasks]
        Scheduler->>Scheduler: acquire submission_semaphore per task
        Scheduler->>Scheduler: create_task(_execute_task)
        Scheduler->>Scheduler: _checkpoint_completed_row_groups()
        Scheduler->>Buffer: checkpoint_row_group(rg_id)
        Scheduler->>Scheduler: _rg_semaphore.release()
        Scheduler->>Caller: on_row_group_complete(rg_id)
    end

    alt Retryable failures exist
        loop Salvage rounds (max salvage_max_rounds)
            Scheduler->>Scheduler: re-dispatch from_scratch tasks directly
            Scheduler->>Scheduler: _drain_frontier() [loop until quiescent]
            Scheduler->>Tracker: get_ready_tasks(dispatched, admitted_rgs)
            Tracker-->>Scheduler: [ready tasks]
            Scheduler->>Scheduler: _checkpoint_completed_row_groups()
        end
    end

    alt Unfinished row groups remain
        Scheduler->>Scheduler: logger.error(incomplete row groups)
    end
```
This is a comment left during a code review.
Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 171-174

Comment:
**Missing `batch_alias` re-registration in salvage dispatch**

In `_dispatch_seeds`, both the primary `from_scratch` task and its `batch_alias` are added to `_dispatched` together to prevent `get_ready_tasks` from generating a duplicate for the seed column:

```python
self._dispatched.add(task)
self._dispatched.add(batch_alias)   # prevents frontier from re-dispatching this column
```

In the salvage loop, after clearing the aliases, only the `from_scratch` task is re-added — the `batch_alias` is not:

```python
self._dispatched.add(task)          # re-added
# batch_alias is NOT re-added here
self._in_flight.add(task)
asyncio.create_task(self._execute_seed_task(task, gid))
```

Because `seed_frontier()` is not auto-called, seed columns do not appear in the frontier under normal scheduler operation, so this omission does not cause an observable bug today. However, if `_drain_frontier` ever encounters a stale `batch` task for this column in the frontier (e.g., from a caller that also invokes `seed_frontier()` for capacity planning), the batch alias guard would be absent and the task could be double-dispatched — bypassing the stateful lock.

Mirroring `_dispatch_seeds` defensively:
```suggestion
                    await self._submission_semaphore.acquire()
                    self._dispatched.add(task)
                    self._dispatched.add(
                        Task(column=task.column, row_group=task.row_group, row_index=None, task_type="batch")
                    )
                    self._in_flight.add(task)
                    asyncio.create_task(self._execute_seed_task(task, gid))
```


---

Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py
Line: 333-334

Comment:
**`_dispatched` set grows unboundedly**

Completed tasks are never pruned from `_dispatched`. The set accumulates every task dispatched across the entire run — seeds, cells, and batch tasks for every row group and column — and is only read (never cleared) after tasks finish. For a large job with many row groups and columns, this can become a non-trivial memory footprint.

Because completed tasks are already removed from `_frontier` by `mark_cell_complete` / `mark_row_range_complete`, the `t not in dispatched` guard in `get_ready_tasks` is redundant for them. The set could be pruned by discarding a task once it transitions from `_in_flight` to done, keeping it sized to the active working set rather than the full historical set:

```python
# In _execute_task_inner finally block, after discarding from _in_flight:
self._in_flight.discard(task)
self._dispatched.discard(task)   # reclaim memory for completed tasks
```

Note: do not discard inside the retryable error branch (the task must stay in `_dispatched` until the salvage loop explicitly clears it).


---

Path: packages/data-designer-engine/tests/engine/dataset_builders/utils/test_row_group_buffer.py
Line: 108-121

Comment:
**Missing test: `on_complete` called with `None` when all rows are dropped**

`checkpoint_row_group` now calls `on_complete(final_path)` unconditionally, where `final_path` is `None` when the row group has no records (all rows dropped). The existing `test_checkpoint_calls_on_complete` only covers the non-empty case, leaving the all-dropped code path untested.

A minimal addition would be:

```python
def test_checkpoint_calls_on_complete_when_all_rows_dropped() -> None:
    storage = _mock_artifact_storage()
    callback = Mock()

    mgr = RowGroupBufferManager(storage)
    mgr.init_row_group(0, 2)
    mgr.drop_row(0, 0)
    mgr.drop_row(0, 1)

    mgr.checkpoint_row_group(0, on_complete=callback)

    # on_complete must still fire, with None path since nothing was written
    callback.assert_called_once_with(None)
    storage.write_batch_to_parquet_file.assert_not_called()
```

This is the code path exercised when a non-retryable `from_scratch` failure drops every row in a row group, and callers using `on_complete` to gate downstream work need to know it fires correctly even for empty row groups.


Last reviewed commit: 2b65c45

andreatgretel added a commit that referenced this pull request Mar 12, 2026
requests 2.32.5 asserts chardet<6 at import time, but sqlfluff and
diff_cover pull in chardet without an upper bound, resolving to 7.1.0
on fresh installs. Add a workspace constraint to cap chardet until a
new requests release ships the fix from psf/requests#7220.

Closes #404
@andreatgretel andreatgretel reopened this Mar 12, 2026
Add two missing test cases for AsyncTaskScheduler:
- Transient 503 failure deferred to salvage round and recovered
- Downstream column never dispatched for rows dropped by upstream failure

Update plan checkboxes to reflect PR 3 completion status.
- Non-retryable batch/from_scratch failure now drops all rows in the
  row group so it reaches a terminal state and gets checkpointed
- Salvage rounds re-dispatch from_scratch tasks directly instead of
  relying on the frontier (which never contains them)
- Log error if scheduler exits with unfinished row groups
- update_batch raises ValueError on size mismatch
- Free _row_group_sizes entry on checkpoint
- Add row-count mismatch warning in _run_batch writeback
- Document intentional stateful lock ordering in _dispatch_seeds
andreatgretel and others added 3 commits March 12, 2026 19:13
…Tracker

Keep the tracker self-contained for static introspection (capacity
planning, task enumeration) without requiring a scheduler. The scheduler
does not call it - it manages root dispatch directly for stateful locks
and multi-column dedup.
…nt sweep

- Acquire stateful lock before re-dispatching from_scratch tasks in
  salvage (prevents RuntimeError on lock release)
- Replace single-pass salvage drain with _drain_frontier() that
  re-checks the frontier after each task completes (ensures downstream
  tasks enqueued during retry are dispatched)
- Add post-salvage checkpoint sweep via _checkpoint_completed_row_groups
  (row groups completed during salvage are now written to parquet)
- Extract _checkpoint_completed_row_groups helper, reused by both the
  main loop and post-salvage sweep
…mplete for empty groups

- Move _checkpoint_completed_row_groups inside the salvage loop so
  completed row groups are flushed promptly between rounds
- Simplify _deferred from list[tuple[Task, int]] to list[Task] since
  the attempt count was unused (salvage_max_rounds bounds retries)
- Fire on_complete(None) when all rows in a row group are dropped so
  callers always receive a completion signal
@nabinchha nabinchha left a comment


Findings

Critical — Must fix before merge

async_scheduler.py:425-432 — _is_retryable re-implements classification the model layer already provides

  • What: Retryability is determined by substring-matching str(exc) against "429", "500", "502", "503", "504". Each of these codes is vulnerable to false positives: "429" matches "4290 records", "502" matches "port 5023", "503" matches "5030 bytes", "504" matches "50400 tokens", etc.
  • Why: The model layer already converts raw litellm exceptions into typed ModelXxxError subclasses (via @acatch_llm_exceptions on ModelFacade). By the time exceptions reach the scheduler, they're already ModelRateLimitError, ModelTimeoutError, ModelInternalServerError, ModelAPIConnectionError, etc. The string parsing is re-doing classification that was already done upstream, and doing it less accurately.
  • Suggestion: Replace string matching with isinstance checks against the existing typed exceptions:

```python
from data_designer.engine.models.errors import (
    ModelAPIConnectionError,
    ModelInternalServerError,
    ModelRateLimitError,
    ModelTimeoutError,
)

_RETRYABLE_MODEL_ERRORS = (
    ModelRateLimitError,      # 429
    ModelTimeoutError,        # timeout
    ModelInternalServerError, # 500
    ModelAPIConnectionError,  # network failures
)

@staticmethod
def _is_retryable(exc: Exception) -> bool:
    return isinstance(exc, _RETRYABLE_MODEL_ERRORS)
```

This is precise, has no false positives, and is explicit about which errors are retried. Non-LLM generators (samplers, expressions) throw errors that aren't ModelXxxError — those correctly fall through as non-retryable since sampler/expression failures are almost always logic bugs.

Warnings — Strongly recommend fixing

async_scheduler.py:210-216 — Missing try/finally around checkpoint + semaphore release

  • What: In _checkpoint_completed_row_groups, if checkpoint_row_group() or on_row_group_complete() raises, the _rg_semaphore.release() on line 216 is never reached.
  • Why: An un-released semaphore slot means _admit_row_groups blocks forever on the next acquire(), deadlocking the scheduler with no visible error.
  • Suggestion: Wrap in try/finally:

```python
for rg_id, rg_size in completed:
    self._active_rgs.remove((rg_id, rg_size))
    try:
        if self._buffer_manager is not None:
            self._buffer_manager.checkpoint_row_group(rg_id)
        if self._on_row_group_complete:
            self._on_row_group_complete(rg_id)
    finally:
        self._rg_semaphore.release()
```

async_scheduler.py:359 — assert used for runtime validation

  • What: assert task.row_index is not None in _run_cell can be silently stripped by python -O.
  • Why: If this ever fires in production (e.g. a scheduler bug dispatches a batch task to _run_cell), it would proceed with row_index=None and crash with a confusing TypeError deeper in the stack.
  • Suggestion: Replace with an explicit check:

```python
if task.row_index is None:
    raise ValueError(f"Cell task requires a row_index, got None for column '{task.column}'")
```

async_scheduler.py:340 and row_group_buffer.py:86 — Imports inside method bodies

  • What: FromScratchColumnGenerator is imported inside _run_from_scratch, and BatchStage is imported inside checkpoint_row_group.
  • Why: AGENTS.md mandates module-level imports. If these cause circular imports at module level, they should go in a TYPE_CHECKING block (for type-only use) or the architecture should be adjusted.
  • Suggestion: For async_scheduler.py, FromScratchColumnGenerator is already in the TYPE_CHECKING block for type hints — but it's also used in an isinstance check at runtime, so it genuinely needs a runtime import. This is a valid exception; add a comment explaining why (e.g. # Runtime import: needed for isinstance check; module-level would cause circular import). Same pattern for BatchStage.

row_group_buffer.py:80 — Unparameterized Callable type

  • What: on_complete: Callable | None = None doesn't specify the callback's signature.
  • Why: Callers and static analysis can't verify they're passing the right callback shape. The callback receives str | None (line 100 passes final_path which is None when the df is empty).
  • Suggestion: on_complete: Callable[[str | None], None] | None = None

async_scheduler.py:131-132 — Early exit condition may skip deferred tasks

  • What: The second break condition (all_rgs_admitted and not ready and not self._in_flight) fires if no ready tasks exist and nothing is in-flight, but deferred tasks may still be waiting for salvage.
  • Why: This is intentional (salvage runs after the main loop), but the two break conditions on lines 127-132 are partially redundant and make the exit semantics unclear. The first condition (all_done) already covers not self._in_flight via not self._active_rgs.
  • Suggestion: Add a brief comment explaining the two exit paths, or simplify to a single condition. The second break seems to cover the case where all active RGs had their non-deferred tasks finish but the RGs themselves aren't "complete" yet (because deferred tasks remain). Clarify this with a comment.
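One way to make the two exit paths explicit is to pull the conditions into a small predicate with the comments the review asks for. This is a hypothetical restatement under assumed names, not the PR's code:

```python
def should_exit(all_rgs_admitted, active_rgs, ready, in_flight):
    # Exit path 1: every admitted row group has reached a terminal state.
    if all_rgs_admitted and not active_rgs:
        return True
    # Exit path 2: nothing dispatchable and nothing in flight; whatever
    # remains is deferred work, which the salvage loop handles next.
    if all_rgs_admitted and not ready and not in_flight:
        return True
    return False
```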

Suggestions — Consider improving

async_scheduler.py:211 — O(n) list removal in _checkpoint_completed_row_groups

  • What: self._active_rgs.remove((rg_id, rg_size)) is a linear scan per removal.
  • Why: With many row groups this becomes O(n^2). Unlikely to matter in practice (row groups are bounded by max_concurrent_row_groups), but could use a set or dict for O(1) removal.
  • Suggestion: Low priority given the small concurrent RG count, but worth noting for future scaling.
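A minimal sketch of the dict-based alternative (hypothetical names; the PR's actual field is a list of `(rg_id, rg_size)` tuples):

```python
active_rgs: dict[int, int] = {}   # rg_id -> rg_size

def admit(rg_id: int, rg_size: int) -> None:
    active_rgs[rg_id] = rg_size

def checkpoint(rg_id: int) -> int:
    # dict.pop is O(1), versus the O(n) scan of list.remove((rg_id, rg_size))
    return active_rgs.pop(rg_id)

admit(0, 100)
admit(1, 200)
freed = checkpoint(0)
```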

test_row_group_buffer.py:104 — Test asserts on private _buffers

  • What: assert 0 not in mgr._buffers reaches into the private buffer dict.
  • Why: Per AGENTS.md, tests should exercise public APIs. A KeyError from get_row(0, 0) after checkpoint would verify the same behavior through the public interface.
  • Suggestion: Replace with:

```python
with pytest.raises(KeyError):
    mgr.get_row(0, 0)
```

async_scheduler.py:61-63 — id(gen) for instance identity may collide after GC

  • What: Generator identity is tracked via id(gen), but Python can reuse id() values for objects that have been garbage collected.
  • Why: In this case all generators are held in self._generators for the scheduler's lifetime, so they can't be GC'd. Safe as-is, but fragile if the pattern is copied elsewhere.
  • Suggestion: No code change needed — just a note for future maintainers. A comment on line 63 like # id() is stable because generators are held in self._generators would help.

row_group_buffer.py:99-100 — on_complete called with None when df is empty

  • What: When all rows are dropped and len(df) == 0, final_path stays None and on_complete(None) is called.
  • Why: Callers may not expect a None argument. This case isn't covered by tests.
  • Suggestion: Either skip the callback when there's no file to report, or add a test for the empty-df case.

What Looks Good

  • Thorough test coverage: 13 async scheduler tests + 9 buffer tests + expanded completion tracker tests cover the key scenarios: multi-RG, salvage recovery, eager row drops, stateful serialization, bounded submission, and tracing. The test_scheduler_eager_row_drop_skips_downstream_of_failed_column test is particularly well designed.
  • Clean separation of concerns: The scheduler orchestrates, the buffer manages memory, and the completion tracker handles dependency resolution. Each has a clear single responsibility with well-defined interfaces.
  • Defensive salvage design: The fix for non-retryable from_scratch/batch failures (dropping all rows in the RG) and the direct re-dispatch of from_scratch tasks in salvage rounds are both solid. The Greptile-review fixes are well addressed.

Verdict

Needs changes — _is_retryable should use the typed exception hierarchy (ModelRateLimitError, etc.) that the model layer already provides, rather than string-matching on exception messages. The missing try/finally around semaphore release could cause deadlocks under failure conditions. The other warnings are lower risk but worth addressing in this PR since the code is new.

Next steps

  • Replace _is_retryable string matching with isinstance checks against ModelXxxError types
  • Add try/finally guard around checkpoint/callback to protect semaphore release
  • Replace assert with explicit ValueError
  • Add tests for _is_retryable covering both retryable model errors and non-retryable generator errors
