Skip to content

fix: rebind RecursiveQueryExec batches to the declared output schema#21770

Merged
adriangb merged 1 commit intoapache:mainfrom
pydantic:fix-recursive-cte-schema-leak
Apr 22, 2026
Merged

fix: rebind RecursiveQueryExec batches to the declared output schema#21770
adriangb merged 1 commit intoapache:mainfrom
pydantic:fix-recursive-cte-schema-leak

Conversation

@adriangb
Copy link
Copy Markdown
Contributor

Which issue does this PR close?

  • Closes #.

Rationale for this change

A recursive CTE whose anchor aliases a computed column (e.g. upper(val) AS val) and whose recursive term leaves the same expression un-aliased (upper(r.val)) currently returns the wrong column name — but only when the outer query has both ORDER BY and LIMIT. The plan-level schema is correct (taken from the anchor), but RecursiveQueryExec forwards recursive-term RecordBatches with their native schemas intact. Downstream consumers that key on batch.schema().field(i).name()SortExec's TopK path, CSV/JSON writers, user-code collectors — then observe the leaked recursive-branch name instead of the anchor's.

MRE (fails on datafusion-cli pre-fix):

CREATE TABLE records (id VARCHAR NOT NULL, parent_id VARCHAR,
                     ts TIMESTAMP NOT NULL, val VARCHAR);
INSERT INTO records VALUES
  ('a00', NULL,  TIMESTAMP '2025-01-01 00:00:00', 'v_span'),
  ('a01', 'a00', TIMESTAMP '2025-01-01 00:00:01', 'v_log_1'),
  ('a02', 'a01', TIMESTAMP '2025-01-01 00:00:02', 'v_log_2'),
  ('a03', 'a02', TIMESTAMP '2025-01-01 00:00:03', 'v_log_3');

WITH RECURSIVE descendants AS (
  SELECT id, parent_id, ts, upper(val) AS val
    FROM records WHERE id = 'a00'
  UNION ALL
  SELECT r.id, r.parent_id, r.ts, upper(r.val)
    FROM records r INNER JOIN descendants d ON r.parent_id = d.id
)
SELECT id, parent_id, ts, val
  FROM descendants ORDER BY ts ASC LIMIT 10;

Pre-fix header column reads upper(r.val); expected val.

Only ORDER BY + LIMIT triggers it because:

  • SortExec without fetch re-materialises batches via ExternalSorter (stable schema).
  • LimitExec without sort sits above RecursiveQueryExec, never mixing branches.
  • SortExec with fetch uses the TopK path, which emits interleave_record_batch output that carries whichever input batch's schema was used last.

What changes are included in this PR?

In RecursiveQueryStream::push_batch, rebind each emitted batch to the declared output schema (taken from the anchor term). Logical-plan coercion in LogicalPlanBuilder::to_recursive_query already guarantees matching column types, so this is a zero-copy field rebind. 14 lines of production code + comment.

Are these changes tested?

Yes.

  • datafusion/core/tests/sql/select.rs::test_recursive_cte_batch_schema_stable_with_order_by_limit — runs the MRE and asserts every collected RecordBatch's schema field names equal ["id", "parent_id", "ts", "val"]. Fails pre-fix with left: ["id", "parent_id", "ts", "upper(r.val)"].
  • datafusion/sqllogictest/test_files/cte.slt — round-trips the buggy query through a headered CSV file (whose header is written from each batch's schema) and re-reads it as headerless CSV so the header row is compared as a data row. SLT otherwise cannot assert column names directly, so this is the only way to surface the leak inside SLT.

Both regression tests were verified to fail on the base branch before the fix was applied and pass after.

Are there any user-facing changes?

Recursive CTEs with mismatched anchor/recursive column names will now emit batches with the anchor-declared names consistently, regardless of downstream operators. No API changes.

@github-actions github-actions Bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) physical-plan Changes to the physical-plan crate labels Apr 21, 2026
@adriangb adriangb marked this pull request as ready for review April 22, 2026 00:43
@adriangb adriangb requested a review from kosiew April 22, 2026 00:43
Copy link
Copy Markdown
Contributor

@kosiew kosiew left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adriangb

Looks good overall.

When a recursive CTE's anchor term aliases a computed column (e.g.
`upper(val) AS val`) and the recursive term leaves the same expression
un-aliased (`upper(r.val)`), `RecursiveQueryExec` declared its output
schema from the anchor but forwarded batches from both branches with
their native schemas intact. Downstream consumers that key on
`batch.schema().field(i).name()` — TopK (ORDER BY + LIMIT), CSV/JSON
writers, custom collectors — then observed the recursive branch's
leaked field name instead of the anchor's.

Rebind each emitted batch to the declared output schema in
`RecursiveQueryStream::push_batch`. Logical-plan coercion in
`LogicalPlanBuilder::to_recursive_query` already guarantees matching
column types, so this is a zero-copy field rebind.

Regression coverage:
- Rust test in `datafusion/core/tests/sql/select.rs` asserts every
  collected `RecordBatch` carries the anchor's field names.
- sqllogictest in `cte.slt` round-trips the result through a headered
  CSV file (whose header row is written from each batch's own schema)
  and re-reads it to surface the leaked name inside SLT.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@adriangb adriangb force-pushed the fix-recursive-cte-schema-leak branch from 3d4b807 to 47d75a6 Compare April 22, 2026 12:10
@adriangb adriangb added this pull request to the merge queue Apr 22, 2026
Merged via the queue into apache:main with commit 83c2c01 Apr 22, 2026
35 checks passed
@adriangb adriangb deleted the fix-recursive-cte-schema-leak branch April 22, 2026 16:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants