From b9212e0141d969c68e6c7f60d9ef4b412aea75b8 Mon Sep 17 00:00:00 2001 From: LiaCastaneda Date: Fri, 22 May 2026 10:23:20 +0200 Subject: [PATCH] Fix: dedupe aggregate measures, not just groupings --- .../consumer/rel/aggregate_rel.rs | 12 +- .../tests/cases/roundtrip_logical_plan.rs | 21 ++++ ...ggregate_identical_measures.substrait.json | 103 ++++++++++++++++++ 3 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs index ac7d2479c397a..413ee4b537c29 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/aggregate_rel.rs @@ -109,11 +109,17 @@ pub async fn from_aggregate_rel( aggr_exprs.push(std::sync::Arc::unwrap_or_clone(agg_func?)); } - // Ensure that all expressions have a unique name + // Ensure that all expressions have a unique name. Both grouping and + // aggregate expressions become fields in the aggregate's output schema, + // so they share a single namespace. let mut name_tracker = NameTracker::new(); let group_exprs = group_exprs - .iter() - .map(|e| name_tracker.get_uniquely_named_expr(e.clone())) + .into_iter() + .map(|e| name_tracker.get_uniquely_named_expr(e)) + .collect::, _>>()?; + let aggr_exprs = aggr_exprs + .into_iter() + .map(|e| name_tracker.get_uniquely_named_expr(e)) .collect::, _>>()?; input.aggregate(group_exprs, aggr_exprs)?.build() diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs index 1d65256d76420..872c2d0cd2a81 100644 --- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs @@ -1116,6 +1116,27 @@ async fn aggregate_identical_grouping_expressions() -> Result<()> { Ok(()) } +#[tokio::test] +async fn aggregate_identical_measures() -> Result<()> { + // Two identical aggregate measures share the same schema_name; without + // NameTracker dedup over measures, building the Aggregate's output + // DFSchema fails with "Schema contains duplicate unqualified field name". + let proto_plan = read_json( + "tests/testdata/test_plans/aggregate_identical_measures.substrait.json", + ); + + let plan = generate_plan_from_substrait(proto_plan).await?; + assert_snapshot!( + plan, + @r" + Projection: __common_expr_1 AS sum_a_1, __common_expr_1 AS sum(data.a)__temp__0 AS sum_a_2 + Aggregate: groupBy=[[]], aggr=[[sum(data.a) AS __common_expr_1]] + TableScan: data projection=[a] + " + ); + Ok(()) +} + #[tokio::test] async fn simple_intersect_consume() -> Result<()> { let proto_plan = read_json("tests/testdata/test_plans/intersect.substrait.json"); diff --git a/datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json b/datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json new file mode 100644 index 0000000000000..620d55e93ee1e --- /dev/null +++ b/datafusion/substrait/tests/testdata/test_plans/aggregate_identical_measures.substrait.json @@ -0,0 +1,103 @@ +{ + "extensionUris": [{ + "extensionUriAnchor": 1, + "uri": "/functions_arithmetic.yaml" + }], + "extensions": [{ + "extensionFunction": { + "extensionUriReference": 1, + "functionAnchor": 0, + "name": "sum:i64" + } + }], + "relations": [{ + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": ["a"], + "struct": { + "types": [{ + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }], + "nullability": "NULLABILITY_REQUIRED" + } + }, + "namedTable": { + "names": ["data"] + } + } + }, + "groupings": [{ + "groupingExpressions": [] + }], + "measures": [ + { + "measure": { + "functionReference": 0, + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + } + }] + } + }, + { + "measure": { + "functionReference": 0, + "phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT", + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "invocation": "AGGREGATION_INVOCATION_ALL", + "arguments": [{ + "value": { + "selection": { + "directReference": { + "structField": { + "field": 0 + } + }, + "rootReference": {} + } + } + }] + } + } + ] + } + }, + "names": ["sum_a_1", "sum_a_2"] + } + }], + "version": { + "minorNumber": 54, + "producer": "manual" + } +}