-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-20738][table-planner-blink] Separate the implementation of batch group aggregate nodes #14562
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit b882bd3 (Fri May 28 07:02:00 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
662db1d to
f08790b
Compare
wenlong88
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The overall change LGTM, just left some minor comments.
|
|
||
| /** Return bytes size for given option in {@link TableConfig}. */ | ||
| public static long getMemorySize(TableConfig tableConfig, ConfigOption<String> option) { | ||
| return MemorySize.parse(tableConfig.getConfiguration().getString(option)).getBytes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConfigOption supports type of MemorySize, can we just use it?,ref:FRAMEWORK_HEAP_MEMORY
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will broke the current API, because the compile error will occur when user's before code is configuration.set( ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY, "128 mb")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I create an issue to do the improvement, see https://issues.apache.org/jira/browse/FLINK-20879
| // if group by an update field or group by a field mono is null, just return null | ||
| if (inputMonotonicity == null || | ||
| grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { | ||
| grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old format is better
| // if group by a update field or group by a field mono is null, just return null | ||
| if (inputMonotonicity == null || | ||
| grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { | ||
| grouping.exists(e => inputMonotonicity.fieldMonotonicities(e) != CONSTANT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
| val currentMono = fieldMonotonicities(index) | ||
| if (childMono != currentMono && | ||
| !aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { | ||
| !aggCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
… to BatchPhysicalGroupAggregateBase and do some refactoring
…te, and make BatchExecHashAggregate only extended from ExecNode
…gregate, and make BatchPhysicalLocalHashAggregate only extended from FlinkPhysicalRel
…te, and make BatchExecSortAggregate only extended from ExecNode
…gregate, and make BatchPhysicalLocalSortAggregate only extended from FlinkPhysicalRel
…Aggregate, and make BatchExecPythonGroupAggregate only extended from ExecNode
wenlong88
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…te, and make BatchExecHashAggregate only extended from ExecNode This closes apache#14562
…gregate, and make BatchPhysicalLocalHashAggregate only extended from FlinkPhysicalRel This closes apache#14562
…te, and make BatchExecSortAggregate only extended from ExecNode This closes apache#14562
…gregate, and make BatchPhysicalLocalSortAggregate only extended from FlinkPhysicalRel This closes apache#14562
…Aggregate, and make BatchExecPythonGroupAggregate only extended from ExecNode This closes apache#14562
What is the purpose of the change
Separate the implementation of batch group aggregate nodes, including BatchExecHashAggregate, BatchExecLocalHashAggregate, BatchExecSortAggregate, BatchExecLocalSortAggregate, BatchExecPythonGroupAggregate
Brief change log
Verifying this change
This change is a refactoring rework covered by existing tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation