From dc7810ff87f845200a3d2ced59d45c76fcdfd2d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=ADa=20Adriana?= Date: Fri, 22 May 2026 11:01:32 +0200 Subject: [PATCH] fix(substrait): dedupe names of aggregate measures, not just groupings (#22453) ## Which issue does this PR close? - Closes #. ## Rationale for this change When the substrait consumer hits an `Aggregate` with two identical measures (e.g. `sum(a)` present twice), planning fails with `Schema contains duplicate unqualified field name`. Substrait carries column names at the plan root rather than on the measures themselves, so the measures arrive at `Aggregate` schema construction without aliases -- and two identical exprs produce two identical field names. PR #20539 fixed the `NameTracker` to dedupe duplicate names in the consumer, but it was only applied to grouping expressions, not to the measures. The planner sees: ``` field 1: (qualifier: None, name: "sum(data.a)") field 2: (qualifier: None, name: "sum(data.a)") ``` which is rejected when constructing the Aggregate's output schema. ## What changes are included in this PR? Run aggregate measures through the same `NameTracker` like the grouping expressions in `from_aggregate_rel` ## Are these changes tested? Yes -- added a roundtrip test `aggregate_identical_measures`. Without the fix it produces `Error: SchemaError(DuplicateUnqualifiedField { name: "sum(data.a)" }, Some(""))` ## Are there any user-facing changes? No. (cherry picked from commit 097efae26c7d5ca8e6f124f27545a79eb227636f) --- .../consumer/rel/aggregate_rel.rs | 12 +- .../tests/cases/roundtrip_logical_plan.rs | 21 ++++ ...ggregate_identical_measures.substrait.json | 103 ++++++++++++++++++ 3 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs index da57751f6ad84..342e0998ce047 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs @@ -109,11 +109,17 @@ pub async fn from_aggregate_rel( aggr_exprs.push(agg_func?.as_ref().clone()); } - // Ensure that all expressions have a unique name + // Ensure that all expressions have a unique name. Both grouping and + // aggregate expressions become fields in the aggregate's output schema, + // so they share a single namespace. let mut name_tracker = NameTracker::new(); let group_exprs = group_exprs - .iter() - .map(|e| name_tracker.get_uniquely_named_expr(e.clone())) + .into_iter() + .map(|e| name_tracker.get_uniquely_named_expr(e)) + .collect::, _>>()?; + let aggr_exprs = aggr_exprs + .into_iter() + .map(|e| name_tracker.get_uniquely_named_expr(e)) .collect::, _>>()?; input.aggregate(group_exprs, aggr_exprs)?.build() diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 5dd4aa4e2be91..f562be66b8e31 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1116,6 +1116,27 @@ async fn aggregate_identical_grouping_expressions() -> Result<()> { Ok(()) } +#[tokio::test] +async fn aggregate_identical_measures() -> Result<()> { + // Two identical aggregate measures share the same schema_name; without + // NameTracker dedup over measures, building the Aggregate's output + // DFSchema fails with "Schema contains duplicate unqualified field name". + let proto_plan = read_json( + "tests/testdata/test_plans/aggregate_identical_measures.substrait.json", + ); + + let plan = generate_plan_from_substrait(proto_plan).await?; + assert_snapshot!( + plan, + @r" + Projection: __common_expr_1 AS sum_a_1, __common_expr_1 AS sum(data.a)__temp__0 AS sum_a_2 + Aggregate: groupBy=[[]], aggr=[[sum(data.a) AS __common_expr_1]] + TableScan: data projection=[a] + " + ); + Ok(()) +} + #[tokio::test] async fn simple_intersect_consume() -> Result<()> { let proto_plan = read_json("tests/testdata/test_plans/intersect.substrait.json"); diff --git a/datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json b/datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json new file mode 100644 index 0000000000000..620d55e93ee1e --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json @@ -0,0 +1,103 @@ +{ + "extensionUris": [{ + "extensionUriAnchor": 1, + "uri": "/functions_arithmetic.yaml" + }], + "extensions": [{ + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "sum:i64" + } + }], + "relations": [{ + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": ["a"], + "struct": { + "types": [{ + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }], + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["data"] + } + } + }, + "groupings": [{ + "groupingExpressions": [] + }], + "measures": [ + { + "measure": { + "functionReference": 0, + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + } + }] + } + }, + { + "measure": { + "functionReference": 0, + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + } + }] + } + } + ] + } + }, + "names": ["sum_a_1", "sum_a_2"] + } + }], + "version": { + "minorNumber": 54, + "producer": "manual" + } +}