Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,17 @@ pub async fn from_aggregate_rel(
aggr_exprs.push(agg_func?.as_ref().clone());
}

// Ensure that all expressions have a unique name
// Ensure that all expressions have a unique name. Both grouping and
// aggregate expressions become fields in the aggregate's output schema,
// so they share a single namespace.
let mut name_tracker = NameTracker::new();
let group_exprs = group_exprs
.iter()
.map(|e| name_tracker.get_uniquely_named_expr(e.clone()))
.into_iter()
.map(|e| name_tracker.get_uniquely_named_expr(e))
.collect::<Result<Vec<Expr>, _>>()?;
let aggr_exprs = aggr_exprs
.into_iter()
.map(|e| name_tracker.get_uniquely_named_expr(e))
.collect::<Result<Vec<Expr>, _>>()?;

input.aggregate(group_exprs, aggr_exprs)?.build()
Expand Down
21 changes: 21 additions & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,27 @@ async fn aggregate_identical_grouping_expressions() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn aggregate_identical_measures() -> Result<()> {
// Two identical aggregate measures share the same schema_name; without
// NameTracker dedup over measures, building the Aggregate's output
// DFSchema fails with "Schema contains duplicate unqualified field name".
let proto_plan = read_json(
"tests/testdata/test_plans/aggregate_identical_measures.substrait.json",
);

let plan = generate_plan_from_substrait(proto_plan).await?;
assert_snapshot!(
plan,
@r"
Projection: __common_expr_1 AS sum_a_1, __common_expr_1 AS sum(data.a)__temp__0 AS sum_a_2
Aggregate: groupBy=[[]], aggr=[[sum(data.a) AS __common_expr_1]]
TableScan: data projection=[a]
"
);
Ok(())
}

#[tokio::test]
async fn simple_intersect_consume() -> Result<()> {
let proto_plan = read_json("tests/testdata/test_plans/intersect.substrait.json");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
{
"extensionUris": [{
"extensionUriAnchor": 1,
"uri": "/functions_arithmetic.yaml"
}],
"extensions": [{
"extensionFunction": {
"extensionUriReference": 1,
"functionAnchor": 0,
"name": "sum:i64"
}
}],
"relations": [{
"root": {
"input": {
"aggregate": {
"common": {
"direct": {}
},
"input": {
"read": {
"common": {
"direct": {}
},
"baseSchema": {
"names": ["a"],
"struct": {
"types": [{
"i64": {
"nullability": "NULLABILITY_NULLABLE"
}
}],
"nullability": "NULLABILITY_REQUIRED"
}
},
"namedTable": {
"names": ["data"]
}
}
},
"groupings": [{
"groupingExpressions": []
}],
"measures": [
{
"measure": {
"functionReference": 0,
"phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
"outputType": {
"i64": {
"nullability": "NULLABILITY_NULLABLE"
}
},
"invocation": "AGGREGATION_INVOCATION_ALL",
"arguments": [{
"value": {
"selection": {
"directReference": {
"structField": {
"field": 0
}
},
"rootReference": {}
}
}
}]
}
},
{
"measure": {
"functionReference": 0,
"phase": "AGGREGATION_PHASE_INITIAL_TO_RESULT",
"outputType": {
"i64": {
"nullability": "NULLABILITY_NULLABLE"
}
},
"invocation": "AGGREGATION_INVOCATION_ALL",
"arguments": [{
"value": {
"selection": {
"directReference": {
"structField": {
"field": 0
}
},
"rootReference": {}
}
}
}]
}
}
]
}
},
"names": ["sum_a_1", "sum_a_2"]
}
}],
"version": {
"minorNumber": 54,
"producer": "manual"
}
}
Loading