feat: add workflow chaining#636
Conversation
d8327e0 to
4950ed5
Compare
Review: PR #636 — feat: add workflow chainingSummaryThis PR introduces a substantial new feature: composite workflows for chaining multi-stage Data Designer runs with disk handoffs, plus several supporting changes. Concretely:
Changes are scoped within the documented dependency direction (interface → engine → config). No reverse imports introduced. FindingsCode correctness
Tests
Project conventions
Performance
Security / risk
Backward compatibility
VerdictApprove with minor follow-ups. The implementation is well-structured, tests are extensive and exercise both the mocked-fast and real-engine paths, and the architecture respects the project's layering invariants. The biggest opportunities for follow-up are:
None of these block the merge — they'd be reasonable cleanup commits or a follow-up PR. |
Greptile SummaryThis PR introduces experimental workflow chaining for Data Designer, allowing multi-stage generation pipelines where each stage hands off its output on disk to the next. The implementation spans a new
|
| Filename | Overview |
|---|---|
| packages/data-designer/src/data_designer/interface/composite_workflow.py | New file implementing the core workflow chaining API; logic for stage execution, output_processors, output selection, and fingerprinting is sound, but _validate_distinct_output_processors does not guard against duplicate names within the output_processors list itself. |
| packages/data-designer/src/data_designer/interface/data_designer.py | Adds acreate() (asyncio.to_thread wrapper), compose_workflow(), shared throttle_manager propagation into create_resource_provider, and set_run_config throttle rebuild — all straightforward and correct. |
| packages/data-designer-engine/src/data_designer/engine/validation.py | validate_columns_not_all_dropped now counts non-dropped seed columns toward remaining_cols when processor_configs are present, enabling seeded processor-only configs; the logic correctly handles the empty-processor-list case via falsy check. |
| packages/data-designer-engine/src/data_designer/engine/resources/seed_reader.py | Adds shallow copy of the seed reader before attach() to prevent concurrent creates from sharing mutable top-level state; constraint that subclasses must not hold nested mutable state is documented in the comment. |
| packages/data-designer-config/src/data_designer/config/data_designer_config.py | Removes min_length=1 from columns field, allowing empty-column configs for seeded processor-only stages; boundary validation in the engine enforces the invariant instead. |
| packages/data-designer/tests/interface/test_composite_workflow.py | Comprehensive test coverage for stage chaining, callbacks, empty upstream, output_processors, processor output selection, async create, and validation rejections; no test for duplicates within the output_processors list. |
Sequence Diagram
sequenceDiagram
participant User
participant CW as CompositeWorkflow
participant DD as DataDesigner
User->>CW: workflow.add_stage(name, builder, output_processors, output)
Note over CW: Validate output, distinct names, dir names
User->>CW: workflow.run()
loop For each stage
CW->>CW: Clone config_builder, inject upstream seed path
CW->>DD: "create(stage_builder, artifact_path=workflow_path)"
DD-->>CW: DatasetCreationResults (result)
CW->>CW: "actual_records = result.count_records()"
alt output_processors present
CW->>DD: "create(output_processor_builder, num_records=actual_records)"
DD-->>CW: DatasetCreationResults (output_result)
CW->>CW: _select_output_result() to get output_source_result
end
alt on_success callback set
CW->>CW: "output_seed_path = on_success(result.base_dataset_path)"
else
CW->>CW: "output_seed_path = _resolve_stage_output_path(output_source_result, output)"
end
CW->>CW: "stage_results[name] = output_result"
CW->>CW: "stage_output_paths[name] = output_seed_path"
CW->>CW: _write_workflow_metadata(workflow_path, metadata)
end
CW-->>User: CompositeWorkflowResults
User->>CW: results.load_stage_output(name) reads from stage_output_paths
User->>CW: results[name].load_dataset() reads from output_result.artifact_storage
Prompt To Fix All With AI
Fix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
packages/data-designer/src/data_designer/interface/composite_workflow.py:557-565
`_validate_distinct_output_processors` checks for name collisions between the stage's inline processors and `output_processors`, but it does not check for duplicate names *within* `output_processors` itself. If two entries share the same name (e.g., two `DropColumnsProcessorConfig(name="drop")` instances), both would write to the same `processors_outputs_path / "drop"` directory, with the second run silently overwriting the first. Any call to `load_stage_output()` or `load_processor_dataset()` referencing that name would then return the output of whichever processor ran last, not the one the author intended.
```suggestion
def _validate_distinct_output_processors(
config_builder: DataDesignerConfigBuilder,
output_processors: list[ProcessorConfig],
) -> None:
stage_processor_names = {processor.name for processor in config_builder.get_processor_configs()}
duplicate_names = stage_processor_names.intersection(processor.name for processor in output_processors)
if duplicate_names:
names = ", ".join(sorted(duplicate_names))
raise DataDesignerWorkflowError(f"Output processor names must be distinct from stage processor names: {names}.")
output_processor_names = [processor.name for processor in output_processors]
seen: set[str] = set()
duplicates_within = {name for name in output_processor_names if name in seen or seen.add(name)} # type: ignore[func-returns-value]
if duplicates_within:
names = ", ".join(sorted(duplicates_within))
raise DataDesignerWorkflowError(f"Output processor names must be unique within output_processors: {names}.")
```
Reviews (7): Last reviewed commit: "fix: address workflow review nits" | Re-trigger Greptile
Signed-off-by: Andre Manoel <amanoel@nvidia.com>
Signed-off-by: Andre Manoel <amanoel@nvidia.com>
|
Thanks for putting this together, @andreatgretel — a lot of moving parts here (compose API, async, throttle reuse, processor-only configs, output selection) and the design coheres pretty well. SummaryAdds FindingsWarnings — Worth addressing
Suggestions — Take it or leave it
What Looks Good
VerdictNeeds changes — no Critical issues, and the two prior review points are resolved, but the Warnings are worth addressing before merge:
Suggestions are take-it-or-leave-it, though the This review was generated by an AI assistant. |
Signed-off-by: Andre Manoel <amanoel@nvidia.com>
|
Thanks Nabin, this was super helpful. I pushed
I also updated the PR description and reran the targeted suites plus smoke tests, including the real NVIDIA provider smoke. Latest smoke rerun: review smoke |
|
Followup on New observations on
|
|
Thanks for putting this together, @andreatgretel — this is a substantial, well-tested addition and I really enjoyed the read. SummaryAdds FindingsWarnings — Worth addressing
Suggestions — Take it or leave it
What Looks Good
VerdictShip it (with nits) — only the missing docstrings, the experimental-status surfacing, and the private-attribute access rise above optional polish, and all of them are quick follow-ups. None of them block the feature behavior, which already has strong test coverage and clean error handling. This review was generated by an AI assistant. |
nabinchha
left a comment
There was a problem hiding this comment.
Approving — see #636 (comment) for the full review. Nits called out there are non-blocking.
…kflow-chaining-greptile # Conflicts: # fern/versions/latest.yml
|
MkDocs preview: https://837c4c0f.dd-docs-preview.pages.dev Fern preview: https://nvidia-preview-pr-636.docs.buildwithfern.com/nemo/datadesigner
|
| def _validate_distinct_output_processors( | ||
| config_builder: DataDesignerConfigBuilder, | ||
| output_processors: list[ProcessorConfig], | ||
| ) -> None: | ||
| stage_processor_names = {processor.name for processor in config_builder.get_processor_configs()} | ||
| duplicate_names = stage_processor_names.intersection(processor.name for processor in output_processors) | ||
| if duplicate_names: | ||
| names = ", ".join(sorted(duplicate_names)) | ||
| raise DataDesignerWorkflowError(f"Output processor names must be distinct from stage processor names: {names}.") |
There was a problem hiding this comment.
_validate_distinct_output_processors checks for name collisions between the stage's inline processors and output_processors, but it does not check for duplicate names within output_processors itself. If two entries share the same name (e.g., two DropColumnsProcessorConfig(name="drop") instances), both would write to the same processors_outputs_path / "drop" directory, with the second run silently overwriting the first. Any call to load_stage_output() or load_processor_dataset() referencing that name would then return the output of whichever processor ran last, not the one the author intended.
| def _validate_distinct_output_processors( | |
| config_builder: DataDesignerConfigBuilder, | |
| output_processors: list[ProcessorConfig], | |
| ) -> None: | |
| stage_processor_names = {processor.name for processor in config_builder.get_processor_configs()} | |
| duplicate_names = stage_processor_names.intersection(processor.name for processor in output_processors) | |
| if duplicate_names: | |
| names = ", ".join(sorted(duplicate_names)) | |
| raise DataDesignerWorkflowError(f"Output processor names must be distinct from stage processor names: {names}.") | |
| def _validate_distinct_output_processors( | |
| config_builder: DataDesignerConfigBuilder, | |
| output_processors: list[ProcessorConfig], | |
| ) -> None: | |
| stage_processor_names = {processor.name for processor in config_builder.get_processor_configs()} | |
| duplicate_names = stage_processor_names.intersection(processor.name for processor in output_processors) | |
| if duplicate_names: | |
| names = ", ".join(sorted(duplicate_names)) | |
| raise DataDesignerWorkflowError(f"Output processor names must be distinct from stage processor names: {names}.") | |
| output_processor_names = [processor.name for processor in output_processors] | |
| seen: set[str] = set() | |
| duplicates_within = {name for name in output_processor_names if name in seen or seen.add(name)} # type: ignore[func-returns-value] | |
| if duplicates_within: | |
| names = ", ".join(sorted(duplicates_within)) | |
| raise DataDesignerWorkflowError(f"Output processor names must be unique within output_processors: {names}.") |
Prompt To Fix With AI
This is a comment left during a code review.
Path: packages/data-designer/src/data_designer/interface/composite_workflow.py
Line: 557-565
Comment:
`_validate_distinct_output_processors` checks for name collisions between the stage's inline processors and `output_processors`, but it does not check for duplicate names *within* `output_processors` itself. If two entries share the same name (e.g., two `DropColumnsProcessorConfig(name="drop")` instances), both would write to the same `processors_outputs_path / "drop"` directory, with the second run silently overwriting the first. Any call to `load_stage_output()` or `load_processor_dataset()` referencing that name would then return the output of whichever processor ran last, not the one the author intended.
```suggestion
def _validate_distinct_output_processors(
config_builder: DataDesignerConfigBuilder,
output_processors: list[ProcessorConfig],
) -> None:
stage_processor_names = {processor.name for processor in config_builder.get_processor_configs()}
duplicate_names = stage_processor_names.intersection(processor.name for processor in output_processors)
if duplicate_names:
names = ", ".join(sorted(duplicate_names))
raise DataDesignerWorkflowError(f"Output processor names must be distinct from stage processor names: {names}.")
output_processor_names = [processor.name for processor in output_processors]
seen: set[str] = set()
duplicates_within = {name for name in output_processor_names if name in seen or seen.add(name)} # type: ignore[func-returns-value]
if duplicates_within:
names = ", ".join(sorted(duplicates_within))
raise DataDesignerWorkflowError(f"Output processor names must be unique within output_processors: {names}.")
```
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Thanks, agreed this is worth tracking as a follow-up. Created #675 for the duplicate output_processors validation gap.
📋 Summary
Adds workflow chaining so users can split a generation pipeline into explicit Data Designer stages, hand results between stages on disk, and keep shared runtime state such as throttling on one parent
DataDesigner. The latest slice also makes processor-expressible stage transforms first class through seeded processor-only configs, stageoutput_processors,output="processor:<name>", and selected-output export.🔗 Related Issue
Related to #552
🗺️ Plan Progress
Tracking against
plans/workflow-chaining/workflow-chaining.md:✅ Done in this PR
CompositeWorkflow,DataDesigner.compose_workflow(), linear disk handoff throughLocalFileSeedSource, deterministic stage artifact directories, workflow metadata,on_successcallbacks, empty-output handling, and stage result inspection.DatasetCreationResults.to_config_builder(),PreviewResults.to_config_builder(),DataDesigner.acreate(), shared throttle-manager reuse, and concurrent-safe seed/resource-provider handling.output_processors=[...],output="processor:<name>", selected-outputload_dataset()/count_records()/export(), and validation for invalid/no-output cases.⏳ Still Missing
allow_resizeremoval, docs updates, migration examples, and fail-fast row-count-change guards.ResumeMode, metadata validation, and callback output path checks.acreate(), and an explicit join/merge contract.🔄 Changes
✨ Added
DataDesigner.compose_workflow()andCompositeWorkflowfor linear stage chaining with deterministicartifacts/<workflow>/stage-N-namedirectories.DatasetCreationResults.to_config_builder()andPreviewResults.to_config_builder()for lightweight notebook chaining.DataDesigner.acreate()for awaitable creates, including shared-instanceasyncio.gather(...)coverage.output_processors=[ProcessorConfig, ...]toCompositeWorkflow.add_stage()for inline stage-boundary transforms.output="processor:<name>"so downstream stages and finalload_dataset()/count_records()/export()can read named processor artifacts.load_stage_output(),count_stage_output_records(), andget_stage_output_path()so callers can inspect the actual selected output handed downstream.🔧 Changed
DataDesignerthrottle manager across stage runs and gathered async creates.output="processor:<name>"atadd_stage()so typos fail before the workflow runs.set_run_config()throttle overrides effective by rebuilding the shared throttle manager when runtime config changes.🐛 Fixed
DataDesignerartifact root during workflow runs.push_to_hub()for selected processor/callback outputs instead of silently pushing the raw final-stage artifact.📚 Docs
plans/workflow-chaining/workflow-chaining.mdwith implemented scope, remaining phases, and the output-processor/output-selection direction.🧪 Tests
Demo
This displays the new composition path:
output_processorsremove bulky generation context before the handoff.output="processor:chatml"makes the schema-transform artifact the final workflow output forload_dataset(),count_records(), andexport().load_stage_output("sft_export")reads what the stage exposed downstream, whileresults["sft_export"]still exposes the stage's normal result and processor artifacts.🔍 Attention Areas
composite_workflow.py- New public workflow API, stage metadata/fingerprints, output-processor execution, and selected-output semantics.data_designer_config.pyandvalidation.py- Config schema and boundary validation now allow seeded processor-only configs while rejecting invalid/no-output cases.data_designer.py-acreate(), shared throttle-manager wiring, explicit artifact-root handling, and processor-only profiling fallback.throttle_manager.py- Shared throttle behavior across sequential workflow stages and concurrent async creates.🧪 Testing
make testpasses/tmp/dd-workflow-smokesValidation run:
make test- 3320 passed, 158 warnings.venv/bin/ruff check --fix ..venv/bin/ruff format ..venv/bin/pytest packages/data-designer/tests/interface -q- 137 passed, 30 warnings.venv/bin/pytest packages/data-designer/tests/interface/test_composite_workflow.py -q- 43 passed.venv/bin/pytest packages/data-designer-engine/tests/engine/resources/test_seed_reader.py -q- 43 passed.venv/bin/pytest packages/data-designer/tests/interface packages/data-designer-engine/tests/engine/test_validation.py packages/data-designer-engine/tests/engine/resources/test_seed_reader.py -q- 190 passed, 30 warnings.venv/bin/pytest /home/ubuntu/Code/reviews/DataDesigner-pr-636/smoke_test.py -q -v- 2 passed.venv/bin/pytest /tmp/dd-workflow-smokes/test_workflow_chaining_smokes.py -q -s- 10 passed.venv/bin/pytest packages/data-designer-engine/tests/engine/test_validation.py packages/data-designer-config/tests/config/test_config_builder.py packages/data-designer-config/tests/config/test_fingerprint.py packages/data-designer-engine/tests/engine/dataset_builders/test_dataset_builder.py -q- 206 passed, 39 warningsmake check-fern-docs- passed, Fern check 0 errors and 2 warnings.venv/bin/mkdocs build- passedLatest smoke rerun after the
output_processorsrename:/home/ubuntu/Code/reviews/DataDesigner-pr-636/smoke_test.py- 2 passed/tmp/dd-workflow-smokes/test_workflow_chaining_smokes.py- 10 passed, including the real NVIDIA provider smoke with 2 successful requests and no failures✅ Checklist
Description updated with AI