From 7da6c7a158ecd499e301dab78d7b5efe684d9b77 Mon Sep 17 00:00:00 2001 From: dylanhz Date: Thu, 4 Dec 2025 19:11:02 +0800 Subject: [PATCH] [FLINK-38776][table] Fix incorrect auxiliary group field names --- .../batch/BatchPhysicalWindowAggregateRule.scala | 2 +- .../table/planner/plan/utils/RelExplainUtil.scala | 2 +- .../plan/batch/sql/agg/AggregateReduceGroupingTest.xml | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala index 4b23f840431d0..cbecfa93b0611 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalWindowAggregateRule.scala @@ -397,7 +397,7 @@ class BatchPhysicalWindowAggregateRule case (udf, aggIndex) => aggBufferFieldNames(aggIndex) = udf match { case _: AggregateFunction[_, _] => - Array(aggNames(aggIndex)) + Array(aggNames(aggIndex + auxGroupSet.length)) case agf: DeclarativeAggregateFunction => agf.aggBufferAttributes.map { attr => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala index 970ed8fae507d..90aed57acce22 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala @@ -825,7 +825,7 @@ object RelExplainUtil { val inNames = grouping.map(inFields(_)) ++ auxGrouping.map(inFields(_)) ++ aggStrings val outNames = grouping.indices.map(outFields(_)) ++ - (grouping.length + 1 until grouping.length + 1 + auxGrouping.length).map(outFields(_)) ++ + (grouping.length until grouping.length + auxGrouping.length).map(outFields(_)) ++ outFieldNames inNames .zip(outNames) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml index 8cc7192a86d69..88b90a54667be 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml @@ -381,7 +381,7 @@ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3]) @@ -401,7 +401,7 @@ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4]) @@ -428,7 +428,7 @@ Calc(select=[a4, c4, s, EXPR$3]) +- Exchange(distribution=[hash[a4, s]]) +- LocalHashAggregate(groupBy=[a4, s], auxGrouping=[c4], select=[a4, s, c4, Partial_COUNT(b4) AS count$0]) +- Calc(select=[a4, c4, w$start AS s, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) AS b4]) - +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4]) + +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4]) +- Exchange(distribution=[keep_input_as_is[hash[a4]]]) +- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4]) +- Exchange(distribution=[hash[a4]]) @@ -457,7 +457,7 @@ Calc(select=[a4, c4, e, EXPR$3]) +- Exchange(distribution=[hash[a4, e]]) +- LocalHashAggregate(groupBy=[a4, e], auxGrouping=[c4], select=[a4, e, c4, Partial_COUNT(b4) AS count$0]) +- Calc(select=[a4, c4, w$end AS e, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) AS b4]) - +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4]) + +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4]) +- Exchange(distribution=[keep_input_as_is[hash[a4]]]) +- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4]) +- Exchange(distribution=[hash[a4]]) @@ -485,7 +485,7 @@ HashAggregate(isMerge=[true], groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4 +- Exchange(distribution=[hash[a4, b4]]) +- LocalHashAggregate(groupBy=[a4, b4], auxGrouping=[c4], select=[a4, b4, c4, Partial_COUNT(*) AS count1$0]) +- Calc(select=[a4, CAST((($f2 - (($f3 * $f3) / $f4)) / $f4) AS INTEGER) AS b4, c4]) - +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4 AS $f2, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4]) + +- HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow('w$, d4, 900000)], properties=[w$start, w$end, w$rowtime], select=[a4, c4, SUM($f4) AS $f2, SUM(b4) AS $f3, COUNT(b4) AS $f4]) +- Exchange(distribution=[keep_input_as_is[hash[a4]]]) +- Calc(select=[a4, c4, d4, b4, (b4 * b4) AS $f4]) +- Exchange(distribution=[hash[a4]])