New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-30542][table-runtime] Introduce adaptive local hash aggregate to adaptively determine whether local hash aggregate is required at runtime #21586
Conversation
78fb096
to
968c4cb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@swuferhong Thanks for contribution, I left some comments.
...link-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
Outdated
Show resolved
Hide resolved
...la/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalHashAggregate.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...link-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
Outdated
Show resolved
Hide resolved
968c4cb
to
fe3ddc2
Compare
...link-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala
Outdated
Show resolved
Hide resolved
...-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
@flinkbot run azure |
aa0d21b
to
45efc3e
Compare
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@swuferhong Thanks for contribution, I left some comments.
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala
Show resolved
Hide resolved
aggInfo => | ||
aggInfo.function match { | ||
case _: SumAggFunction => | ||
fieldExprs += GenerateUtils.generateFieldAccess( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assuming the sum field type is bigint, if some element of the field value is null, we emit the result to downstream global agg is default value -1L, whether the final is corrected?
...nner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution, I left some comments
...c/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecHashAggregate.java
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...r/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala
Outdated
Show resolved
Hide resolved
@@ -69,7 +69,7 @@ class EnforceLocalHashAggRule | |||
val agg: BatchPhysicalHashAggregate = call.rel(0) | |||
val expand: BatchPhysicalExpand = call.rel(2) | |||
|
|||
val localAgg = createLocalAgg(agg, expand) | |||
val localAgg = createLocalAgg(agg, expand, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why supportAdaptiveLocalHashAgg
is always true
...scala/org/apache/flink/table/planner/plan/rules/physical/batch/EnforceLocalSortAggRule.scala
Outdated
Show resolved
Hide resolved
...nner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
Outdated
Show resolved
Hide resolved
...nner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
Outdated
Show resolved
Hide resolved
...nner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/HashAggITCase.scala
Outdated
Show resolved
Hide resolved
…to adaptively determine whether local hash aggregate is required at runtime
45efc3e
to
60b3cfe
Compare
a7eb550
to
8d3265c
Compare
224964e
to
ab48c32
Compare
ab48c32
to
50ad6e1
Compare
@flinkbot run azure |
1 similar comment
@flinkbot run azure |
ff0095b
to
7c472c7
Compare
@flinkbot run azure |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…to adaptively determine whether local hash aggregate is required at runtime This closes apache#21586
What is the purpose of the change
This pr is aims to introduce adaptive local hash aggregate in
batch
mode to adaptively determine whether local hash aggregate is required according to the aggregation degree of local hash aggregate.adaptive local hash agg
will first sampling the data, and then decide whetherlocal hash agg
is required according to the degree of aggregation of the sampled data. If requiringlocal hash agg
, it will not take any action. If not required, we will transfer the output rowType of data to same with aggregated results and pass to downstream.Brief change log
HashAggCodeGenerator
,HashAggITCase
.Verifying this change
HashAggITCase
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation