
feat: add skip.when conditional column generation #502

Open
nabinchha wants to merge 42 commits into main from
nmulepati/feat/479-skip-conditional-gen-implementation

Conversation

@nabinchha
Contributor

@nabinchha nabinchha commented Apr 7, 2026

Summary

Adds per-row conditional column generation via SkipConfig(when="..."). Columns can now be skipped based on a Jinja2 expression evaluated against the current row, with automatic downstream propagation and an opt-out mechanism. This avoids expensive LLM calls for rows that don't need a particular column (e.g., complaint analysis only for low-rated reviews).
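A minimal usage sketch of the feature (SkipConfig and its fields are from this PR; the `dd` alias follows the tutorial notebook, and the column fields shown here are illustrative, not the exact API):

```python
import data_designer as dd  # alias as used in the tutorial notebook

# Only analyze complaints for low-rated reviews. For other rows, the cell
# gets the configured skip value (None by default) and no LLM call is made.
complaint_analysis = dd.SingleColumnConfig(  # illustrative column fields
    name="complaint_analysis",
    prompt="Analyze the complaint in this review: {{ review }}",
    skip=dd.SkipConfig(when="{{ rating > 2 }}"),
)
```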

Changes

Added

  • SkipConfig model on SingleColumnConfig — skip=SkipConfig(when="{{ expr }}") with configurable value and propagate_skip flag
  • skip_evaluator.py — NativeSandboxedEnvironment for safe expression evaluation with StrictUndefined, plus should_skip_by_propagation for upstream skip detection
  • skip_tracker.py — centralized __internal_skipped_columns metadata management on record dicts (apply, read, strip before DataFrame/parquet, round-trip preservation via restore IDs)
  • test_skip_config.py, test_skip_evaluator.py, test_skip_tracker.py — unit tests for config validation, expression evaluation, and tracker logic
  • Validation rules on SkipConfig: rejected on sampler/seed-dataset columns, rejected with allow_resize, and self-referencing when expressions are caught
  • validate_skip_references in validation.py — checks that skip.when column references exist; re-checks the sampler/seed and allow_resize rules as defense-in-depth
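The evaluator's core behavior can be sketched as follows (a simplification assuming only the details stated in this PR: SandboxedEnvironment combined with NativeEnvironment, StrictUndefined, and fail-safe skip on evaluation errors; the function signature is illustrative):

```python
from jinja2 import StrictUndefined
from jinja2.nativetypes import NativeEnvironment
from jinja2.sandbox import SandboxedEnvironment


class NativeSandboxedEnvironment(SandboxedEnvironment, NativeEnvironment):
    """Sandboxed evaluation that returns native Python values (real bools)."""


def evaluate_skip_when(expression: str, record: dict) -> bool:
    """Truthy expression -> skip. Any evaluation error -> fail-safe skip,
    per the PR's stated behavior (avoid LLM calls on ambiguous rows)."""
    env = NativeSandboxedEnvironment(undefined=StrictUndefined)
    try:
        return bool(env.from_string(expression).render(**record))
    except Exception:
        return True  # fail-safe: treat evaluation errors as "skip"


print(evaluate_skip_when("{{ rating <= 2 }}", {"rating": 1}))  # True: skip
print(evaluate_skip_when("{{ rating <= 2 }}", {"rating": 5}))  # False: generate
print(evaluate_skip_when("{{ rating <= 2 }}", {}))             # True: fail-safe
```

Note the fail-safe also covers upstream values that were skipped to None, since `None <= 2` raises at render time.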

Changed

  • Sequential engine (dataset_builder.py): _run_full_column_generator now splits into skip-aware and non-skip-aware paths; fan-out methods (_fan_out_with_threads, _fan_out_with_async) evaluate skip per-record, exclude skipped rows from generator input, and merge results back with skip metadata preserved. _column_can_skip fast-path short-circuits when no skip config or propagation applies. Skip metadata is preserved across replace_buffer via prepare_records_for_skip_metadata_round_trip / restore_skip_metadata restore-ID mechanism.
  • Async engine (async_scheduler.py): _run_cell returns (result, skipped) tuple; _run_batch pre-evaluates skip before building the batch DataFrame; skipped cells report as skipped in progress tracking
  • DAG (dag.py): skip.when column references added as dependency edges; refactored edge-building into shared _add_dependency_edges helper
  • ExecutionGraph (execution_graph.py): stores SkipConfig, propagate_skip, required_columns, and side_effects_by_producer per column with accessor methods (get_skip_config, should_propagate_skip, get_required_columns, get_side_effect_columns)
  • DatasetBatchManager and RowGroupBufferManager: strip __internal_skipped_columns metadata before DataFrame construction and parquet writes
  • Tutorial notebook (2-structured-outputs-and-jinja-expressions): added conditional generation section demonstrating all three skip patterns (expression gate, propagation, opt-out)
  • Architecture docs (dataset-builders.md): documented conditional generation subsystem
  • mkdocs.yml and notebook README: updated tutorial title to include "Conditional Generation"

Fixed

  • _run_full_column_generator now correctly preserves row ordering when merging skipped and generated records back together (8931b45)
  • _column_can_skip returns False for allow_resize=True columns to prevent row-count mismatches in the skip-aware merge path

Attention Areas

Reviewers: Please pay special attention to the following:

  • dataset_builder.py — The skip-aware split/merge in _run_full_column_generator_with_skip and the restore-ID mechanism in _run_full_column_generator_without_skip that preserves skip metadata across DataFrame round-trips
  • async_scheduler.py — _run_batch now pre-evaluates skip before building the batch DataFrame; _run_cell returns a (result, skipped) tuple
  • skip_evaluator.py — Fail-safe behavior (skip on evaluation error) is intentional to avoid wasting LLM calls on ambiguous rows
  • skip_tracker.py — The prepare_records_for_skip_metadata_round_trip / restore_skip_metadata pair uses hidden restore-ID columns to survive DataFrame round-trips where row order or count may change
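The restore-ID round-trip can be sketched as follows (the metadata key is from this PR; the helper names are shortened and the restore-ID column name is an assumption):

```python
import uuid

import pandas as pd

SKIP_KEY = "__internal_skipped_columns"  # metadata key from this PR
RESTORE_ID = "__restore_id"              # hypothetical hidden column name


def prepare_round_trip(records):
    """Tag each record with a restore ID and stash its skip metadata."""
    stash = {}
    for rec in records:
        rid = rec[RESTORE_ID] = uuid.uuid4().hex
        stash[rid] = rec.pop(SKIP_KEY, [])
    return stash


def restore_skip_metadata(records, stash):
    """Re-attach skip metadata after a DataFrame round-trip; keying by
    restore ID survives row reordering."""
    for rec in records:
        rid = rec.pop(RESTORE_ID, None)
        if stash.get(rid):
            rec[SKIP_KEY] = stash[rid]
    return records


records = [
    {"review": "terrible", SKIP_KEY: ["complaint_analysis"]},
    {"review": "great"},
]
stash = prepare_round_trip(records)
# A FULL_COLUMN generator may rebuild or reorder the DataFrame:
shuffled = pd.DataFrame(records).iloc[::-1].to_dict(orient="records")
out = restore_skip_metadata(shuffled, stash)
```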


nabinchha and others added 21 commits March 30, 2026 14:19
Adds implementation plan for a `skip_when` field on `SingleColumnConfig`
that enables conditional column generation. When the Jinja2 expression
evaluates truthy, the cell is set to None and the generator is skipped.
Skips auto-propagate through the DAG to downstream columns.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…e propagation

- Introduce SkipConfig(when, value) as nested model on SingleColumnConfig
- Move propagate_skip to SingleColumnConfig as independent field, fixing
  bug where columns with no SkipConfig couldn't participate in propagation
- Add full sync engine implementation (Steps 4a-4d) covering both
  _fan_out_with_threads and _run_full_column_generator dispatch paths
- Add serialization boundary stripping for both DatasetBatchManager (sync)
  and RowGroupBufferManager (async)
- Simplify architecture diagrams for readability
- Update all references, design decisions, verification plan

Made-with: Cursor
- Explain why propagation must not use get_upstream_columns() once
  skip.when adds DAG edges; add _required_columns and
  get_required_columns() to the execution graph plan
- Point async _run_cell at get_required_columns for parity with sync
- Clarify DropSkippedRowsProcessorConfig vs stripping __skipped__ for
  DataFrames; tighten resolved-questions wording
- Extend DAG/graph verification with gating_col regression case

Refs #479

Made-with: Cursor
- Document new skip_provenance.py (key constant, read/write/strip API)
- Point sync builder, async scheduler, and batch buffers at shared helpers
- Strip metadata before every DataFrame from buffer dicts, including
  FULL_COLUMN active subsets
- Split §3 into skip_evaluator vs skip_provenance; extend verification

Refs #479

Made-with: Cursor
Drop legacy skip_when naming in headings and #362 cross-reference.

Refs #479

Made-with: Cursor
…ng, caller-owns-deserialization

- SkipConfig._validate_when_syntax now checks find_undeclared_variables
  is non-empty, rejecting expressions without {{ }} delimiters that
  would silently skip every row
- evaluate_skip_when centralizes try/except so both sync and async
  engines get identical fail-safe behavior on eval errors
- evaluate_skip_when takes a single pre-deserialized record; caller
  runs deserialize_json_values once and passes to both skip eval and
  generator (no double deserialization, no redundant parameter)
- Update _should_skip_cell, async _run_cell, Files Modified table,
  and verification section accordingly

Refs #479

Made-with: Cursor
Document _side_effects_by_producer inverse map and
get_side_effect_columns() accessor on ExecutionGraph, needed by
_write_skip_to_record / apply_skip_to_record to clear __trace,
__reasoning_content, etc. on skip. Added to both Step 2b metadata
section and Files Modified table.

The __skipped__ leak into active_df (greptile's other P1) was already
fixed in 7046378 via strip_skip_metadata_from_records.

Refs #479

Made-with: Cursor
Introduce SkipConfig on SingleColumnConfig to gate column generation
with a Jinja2 expression. Columns can be skipped by expression or by
upstream propagation (propagate_skip flag).

- SkipConfig: Pydantic model with config-time syntax/delimiter/variable
  validation and cached column extraction from the Jinja2 AST
- skip_evaluator: runtime expression evaluation via NativeSandboxedEnvironment
  with fail-safe error handling (skip on expected failures)
- skip_provenance: centralized __skipped__ record tracking shared by
  sync builder, async scheduler, and buffer managers
- DAG/ExecutionGraph: skip.columns wired as dependency edges in both
  topological sort and static execution graph
- Validation: validate_skip_references checks reference existence,
  sampler/seed scope, and allow_resize conflicts
- Sync builder: cell-by-cell and full-column skip with merge-back
- Async scheduler: cell and batch skip with live-buffer provenance

Made-with: Cursor
- Add skip evaluation to _fan_out_with_async (was missing, causing
  skipped rows to still be sent to the LLM)
- Preserve __skipped__ provenance on non-skipped records after
  full-column generation so multi-hop propagation works
- Use single live-buffer reference in _run_batch skip loop for
  consistency with _run_cell
- Move Template import to TYPE_CHECKING and reorder import blocks
- Replace O(n²) sum() with itertools.chain in dag.py
- Add set_required_columns/set_propagate_skip/set_skip_config
  setters to ExecutionGraph for symmetry with existing API

Made-with: Cursor
Add a new recipe demonstrating skip.when patterns (expression gate,
propagation, opt-out) with a customer support ticket pipeline.

Also extract _should_skip_record in async_scheduler, remove the
redundant propagate_skip param from should_skip_by_propagation, and
pass a precomputed all_side_effects set through the DAG sort.

Made-with: Cursor
@nabinchha nabinchha requested a review from a team as a code owner April 7, 2026 17:36
@greptile-apps
Contributor

greptile-apps bot commented Apr 7, 2026

Greptile Summary

This PR adds per-row conditional column generation via SkipConfig(when="...") — a significant feature touching the config layer, both the sequential and async execution engines, DAG/execution-graph infrastructure, validation, and documentation. The implementation is well-structured: skip decisions live in skip_evaluator.py, skip metadata tracking in skip_tracker.py, and both the sync and async engines share the unified should_skip_column_for_record path. The previous reviewer concern about allow_resize=True columns incorrectly entering the skip-aware merge path has been addressed: _column_can_skip now returns False for any column with allow_resize=True, routing them through _run_full_column_generator_without_skip with restore-ID semantics.

Confidence Score: 5/5

Safe to merge — all remaining findings are P2 or lower and do not affect correctness.

No P0 or P1 issues found. The previous concern about allow_resize=True columns entering the skip-aware merge path has been addressed by _column_can_skip. Both the sync and async engines correctly handle skip evaluation, metadata propagation, and the restore-ID round-trip. Test coverage is comprehensive across config validation, evaluator, tracker, dataset_builder, and async_scheduler.

No files require special attention.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/dataset_builder.py Core sequential engine: adds skip-aware and non-skip-aware FULL_COLUMN paths, fan-out skip evaluation, and _column_can_skip fast-path; logic is correct, allow_resize guard addressed.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Async engine: _run_cell returns (result, skipped) tuple, _run_batch pre-evaluates skip before await boundary; result_idx increment correctly handles concurrent drops alongside pre_skipped rows.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_evaluator.py NativeSandboxedEnvironment (SandboxedEnvironment + NativeEnvironment) is a valid Jinja2 pattern; fail-safe on errors is intentional; lru_cache(64) is appropriate for skip template reuse.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/skip_tracker.py Restore-ID round-trip mechanism is correct; allow_resize=False identity check uses count+set comparison that catches duplicates/drops simultaneously; strip helpers are safe (shallow copy).
packages/data-designer-config/src/data_designer/config/base.py SkipConfig model with pydantic validators for syntax, variable presence, and self-reference; cached_property for columns is safe because ConfigBase is not frozen.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/execution_graph.py Adds skip_configs, propagate_skip, required_columns, and side_effect maps to ExecutionGraph; second pass resolves skip.when column edges with the same validation as required_columns.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dag.py Refactored edge-building into _add_dependency_edges helper; pre-computes all_side_effects as a set (O(1) lookup) and uses dict membership check for column lookup — both are performance improvements over the original O(n) list operations.
packages/data-designer-engine/src/data_designer/engine/validation.py validate_skip_references checks sampler/seed and allow_resize — redundant with pydantic validators but harmless defense-in-depth; reference existence check against allowed_references is correct.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py Correctly strips __internal_skipped_columns before DataFrame construction and parquet writes; existing get_current_batch and write methods updated consistently.
packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/row_group_buffer.py get_dataframe now strips skip metadata before DataFrame construction; get_row returns live dict reference which is relied upon by async scheduler's in-place skip writes.

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[_run_full_column_generator] --> B{column_can_skip?}
    B -- No: allow_resize=True or no skip config --> C[_run_full_column_generator_without_skip]
    B -- Yes --> D[_run_full_column_generator_with_skip]

    C --> C1[prepare_records_for_skip_metadata_round_trip: inject restore-ID column]
    C1 --> C2[generator.generate full batch DataFrame]
    C2 --> C3{restore_context?}
    C3 -- Yes --> C4[restore_skip_metadata: allow_resize aware]
    C3 -- No --> C5[replace_buffer]
    C4 --> C5

    D --> D1[Iterate records: _should_skip_cell per row]
    D1 --> D2{any skipped?}
    D2 -- No --> C
    D2 -- Yes --> D3[_merge_skipped_and_generated: active_df = strip_skip_metadata]
    D3 --> D4[generator.generate active rows only]
    D4 --> D5[Merge: skipped records + gen results: carry prior skip metadata forward]
    D5 --> D6[replace_buffer allow_resize=False]

    F[async _run_cell] --> F1{_should_skip_record?}
    F1 -- Yes --> F2[apply_skip_to_record: return value skipped=True]
    F1 -- No --> F3[agenerate dict copy: update buffer cells]

    G[async _run_batch] --> G1[Pre-evaluate skip before await boundary]
    G1 --> G2{batch_df empty?}
    G2 -- Yes --> G3[return early: skip values already written]
    G2 -- No --> G4[agenerate batch_df await]
    G4 --> G5[Merge back: skip pre_dropped + pre_skipped: result_idx tracks position]

Reviews (10): Last reviewed commit: "Merge branch 'main' into nmulepati/feat/..."

…esize from skip branch

Two bugs in the sequential engine's _run_full_column_generator:

1. replace_buffer(df.to_dict()) erased __internal_skipped_columns in
   three code paths (MultiColumnConfig, non-skip-aware, has_skipped=False
   fallthrough), breaking propagate_skip for downstream columns when an
   independent FULL_COLUMN generator ran between skip-setting and
   propagating columns.

2. _column_can_skip returned True for allow_resize=True columns via
   propagation, causing the skip-aware merge path to raise on the 1:1
   row-count check for 1:N generators.

- Add restore_skip_metadata helper to skip_tracker.py
- Guard _column_can_skip against allow_resize=True columns
- Refactor _run_full_column_generator into three focused methods
- Remove dead allow_resize / _log_resize_if_changed from skip path
- Remove redundant _require_graph() calls in skip helpers
- Add single_column_config_by_name cached property
- Add integration tests for both bugs and unit tests for the helper

Made-with: Cursor
@nabinchha
Contributor Author

Both P1s from Greptile's review are addressed in 1affce9:

P1: Skip metadata erased by non-skip-aware replace_buffer calls

Added restore_skip_metadata() helper in skip_tracker.py that re-attaches __internal_skipped_columns after df.to_dict(orient="records") strips it. Applied in all replace_buffer paths (MultiColumnConfig, non-skip-aware, has_skipped=False fallthrough). Refactored _run_full_column_generator into three focused methods to eliminate the duplication.

P1: allow_resize=True columns enter skip branch via propagation

_column_can_skip now returns False for allow_resize=True columns, so they never enter the skip-aware merge path (option (a) — the column runs the normal generation path regardless of upstream skips).
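In sketch form (a hypothetical simplification; the real method consults the execution graph rather than a plain dict):

```python
def column_can_skip(config: dict, upstream_can_skip: bool) -> bool:
    """Hypothetical simplification of _column_can_skip: allow_resize
    columns never enter the skip-aware merge path, since 1:N generators
    would fail its 1:1 row-count check."""
    if config.get("allow_resize", False):
        return False  # option (a): always the normal generation path
    if config.get("skip") is not None:
        return True   # column has its own skip.when gate
    # Otherwise skippable only via propagation from a skippable upstream.
    return config.get("propagate_skip", True) and upstream_can_skip
```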

Tests added:

  • test_skip_metadata_preserved_across_non_skip_aware_full_column — interleaved independent FULL_COLUMN between skip-setting and propagating columns
  • test_skip_metadata_preserved_when_no_rows_skipped_for_current_column — has_skipped=False fallthrough preserves sibling metadata
  • test_allow_resize_column_not_blocked_by_upstream_skip — 1:N generator with skippable upstream
  • Unit tests for restore_skip_metadata in test_skip_tracker.py

@github-actions
Contributor

github-actions bot commented Apr 9, 2026

Docs preview: https://4e4e1c7e.dd-docs-preview.pages.dev

Notebook tutorials are placeholder-only in previews.

@andreatgretel
Contributor

the skip integration tests in test_dataset_builder.py all run through the sync engine. might be good to add at least one that flips DATA_DESIGNER_ASYNC_ENGINE=1 to exercise the async _run_cell / _run_batch skip paths, since the buffer management model is different

- Extract shared skip decision logic (_should_skip_cell / _should_skip_record)
  into should_skip_column_for_record() in skip_evaluator.py so both sync and
  async engines call the same function (andreatgretel review comment)
- Extend SkipConfig self-reference validation to cover side-effect columns
  (e.g. review__trace on the review column) — previously only checked
  self.name, now checks self.name | self.side_effect_columns
- Add async engine integration tests for skip paths: cell-by-cell with
  propagation and full-column batch skip (exercises _run_cell / _run_batch)
- Fix test_allow_resize_column_not_blocked_by_upstream_skip to use default
  propagate_skip=True so it actually exercises the allow_resize guard
- Move get_skipped_column_names from skip_tracker to skip_evaluator (sole
  production consumer)

Made-with: Cursor
@nabinchha
Copy link
Copy Markdown
Contributor Author

Done in bf6332e — added two async engine skip tests in test_async_scheduler.py: test_scheduler_skip_cell_by_cell_with_propagation (exercises _run_cell skip + downstream propagation) and test_scheduler_skip_full_column_batch (exercises _run_batch skip with active/skipped row splitting). Both use a buffer manager so the skip metadata write-back path is covered.

@nabinchha nabinchha requested a review from andreatgretel April 10, 2026 16:44
# | Pattern | Configuration | Behavior |
# |---|---|---|
# | **Expression gate** | `skip=dd.SkipConfig(when="...")` | Skip this column when the Jinja2 expression is truthy |
# | **Skip propagation** (default) | Downstream column depends on a skipped column | Automatically skipped too (`propagate_skip=True` by default) |
# | **Propagation opt-out** | `propagate_skip=False` on the downstream column | Always generates, even if an upstream was skipped |
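The three patterns in the table reduce to one decision per cell. A sketch of that logic (the unified helper in this PR is should_skip_column_for_record in skip_evaluator.py; the signature here is illustrative):

```python
SKIP_KEY = "__internal_skipped_columns"  # metadata key from this PR


def should_skip_column_for_record(record, gate_truthy, required_columns,
                                  propagate_skip=True):
    """Illustrative signature only. Order: expression gate first, then
    propagation from skipped upstream columns, unless opted out."""
    if gate_truthy:                     # pattern 1: expression gate
        return True
    if not propagate_skip:              # pattern 3: propagation opt-out
        return False
    skipped = set(record.get(SKIP_KEY, []))
    return bool(skipped & set(required_columns))  # pattern 2: propagation
```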
Contributor


nice examples. small nit: "Always generates" might be a touch too strong - if the column's own skip.when references an upstream value that got skipped (now None), the evaluator's fail-safe kicks in and skips the row anyway. maybe "generates even when upstream columns were skipped" would be more accurate?

andreatgretel
andreatgretel previously approved these changes Apr 13, 2026
Contributor

@andreatgretel andreatgretel left a comment


looking good, approving with nits. I think greptile's comment about allow_resize columns entering the skip branch via propagation is still open and worth addressing before merge.

- test_skip_evaluator: parametrized should_skip_column_for_record covering
  propagation, expression gates, short-circuiting, and disabled propagation
- test_execution_graph: skip metadata accessors (get_skip_config,
  should_propagate_skip, get_required_columns, get_side_effect_columns,
  resolve_side_effect, skip.when DAG edges)
- test_dataset_builder: chained transitive propagation (4 levels),
  two independent skip gates, custom skip.value, row count preservation

Made-with: Cursor
