[FLINK-14552][table] Enable partition statistics in blink planner #10315
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot.
Automated checks: last check on commit 05b3061 (Mon Nov 25 15:39:32 UTC 2019).
Mention the bot in a comment to re-run the automated checks.
godfreyhe left a comment
Thanks for the PR @JingsongLi, I left some comments.
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
Outdated
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
Outdated
...org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
Outdated
...org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
Outdated
...org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.scala
Outdated
// Remove tableStats after predicates pushed down
FlinkStatistic.builder().statistic(statistic).tableStats(null).build()
val newStatistic = {
  val tableStats = catalogOption match {
If no partitions are pruned and the original TableStats is not UNKNOWN, we could use the original TableStats and avoid fetching all partitions' statistics.
- Getting the stats from the regular table stats first may not be correct.
- In the next step, I want to introduce listPartitionsByFilter, after which we can no longer tell whether any partitions were pruned.
...anner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
Outdated
}

@Test
public void testGetPartitionStatsFromCatalog() throws Exception {
Please add some tests for UNKNOWN stats.
I can add the others, but not unknown fields, because CatalogColumnStatisticsDataBase does not support unknown values yet.
See https://issues.apache.org/jira/browse/FLINK-14663; @zjuwangg is fixing it.
I think this only blocks the test, so we can wait for it until the 1.10 release.
BTW, the PR is: #10394
Thanks @godfreyhe for your review, updated.
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
Outdated
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
Outdated
godfreyhe left a comment
LGTM, +1 to merge
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/TableStats.java
val isStreamingMode: Boolean,
val catalogTable: CatalogTable)
val catalogTable: CatalogTable,
val tableIdentifier: Option[ObjectIdentifier])
I think names already plays this role.
Using names is quite a hack:
if (names.size() == 3) {
    // it has ObjectIdentifier
    return ObjectIdentifier.of(names.get(0), names.get(1), names.get(2));
} else {
    // there is no ObjectIdentifier.
}
So I think that in the future we should always use ObjectIdentifier instead of names.
Then I don't think it will be an Option.
Currently, only one code path reaches here without an ObjectIdentifier: a temporary table created through the Table API (TableSourceQueryOperation). But I can try to fix it.
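To make the trade-off in this thread concrete, here is a minimal Java sketch of the names-based lookup with the "no identifier" case made explicit through Optional. It is an illustration only: `TableNames`, `Identifier`, and `fromNames` are hypothetical names invented for this sketch and are not Flink classes; `Identifier` merely stands in for ObjectIdentifier.

```java
import java.util.List;
import java.util.Optional;

final class TableNames {

    // Hypothetical stand-in for ObjectIdentifier: catalog, database, table.
    static final class Identifier {
        final String catalog;
        final String database;
        final String table;

        Identifier(String catalog, String database, String table) {
            this.catalog = catalog;
            this.database = database;
            this.table = table;
        }

        @Override
        public String toString() {
            return catalog + "." + database + "." + table;
        }
    }

    // A three-part name path is treated as a full identifier; any other
    // length yields an empty Optional (for example, a table that was
    // registered without a catalog and database).
    static Optional<Identifier> fromNames(List<String> names) {
        if (names.size() == 3) {
            return Optional.of(new Identifier(names.get(0), names.get(1), names.get(2)));
        }
        return Optional.empty();
    }
}
```

Carrying the identifier explicitly, as the PR does, avoids having callers infer it from the length of the name list.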
val newStatistic = {
  val tableStats = catalogOption match {
    case Some(catalog) =>
      def mergePartitionStats(): TableStats = {
Why create an inner function?
Scala doesn't have a break statement to exit the loop early; the inner function lets me use return instead.
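The early-exit pattern behind this answer can be sketched in Java, where a helper method's return serves the same purpose as the inner function in the Scala code. This is only an illustration under stated assumptions: `EarlyExitMerge` and `mergeRowCounts` are hypothetical names, and an empty Optional stands in for unknown partition statistics; none of this is the PR's actual code.

```java
import java.util.List;
import java.util.Optional;

final class EarlyExitMerge {

    // Sums per-partition row counts. As soon as one partition has no
    // statistics, the merged result is unknown, so the helper returns
    // immediately instead of continuing the loop.
    static Optional<Long> mergeRowCounts(List<Optional<Long>> partitionRowCounts) {
        long total = 0L;
        for (Optional<Long> rowCount : partitionRowCounts) {
            if (!rowCount.isPresent()) {
                // Early exit: this return leaves the helper, and with it the loop.
                return Optional.empty();
            }
            total += rowCount.get();
        }
        return Optional.of(total);
    }
}
```

Wrapping the loop in its own function keeps the early return local, so the surrounding code only sees the merged result.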
override def onMatch(call: RelOptRuleCall): Unit = {
  val filter: Filter = call.rel(0)
  val scan: LogicalTableScan = call.rel(1)
  val context = call.getPlanner.getContext.unwrap(classOf[FlinkContext])
We need some tests for this rule.
We already have PushPartitionIntoTableSourceScanRuleTest.
But it doesn't cover your stats-handling logic.
I think CatalogStatisticsTest is a more suitable place to test stats.
So what covers the correctness of the stats after partition pruning?
CatalogStatisticsTest will trigger partition pruning.
OK, got it.
flink-table/flink-table-common/src/main/java/org/apache/flink/table/plan/stats/ColumnStats.java
Please rebase this
What is the purpose of the change
Currently, the table stats are lost after partition pruning.
However, the catalog holds partition stats.
We need to update the statistics after partition pruning in PushPartitionIntoTableSourceScanRule.
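As a rough illustration of the idea described above, the following Java sketch derives a table-level row count from catalog partition stats, using only the partitions that survive pruning. It is a simplified model, not the PR's implementation: `PrunedPartitionStats`, `rowCountAfterPruning`, the string partition keys, and the use of -1 as the "unknown" marker are all assumptions made for this example, and column statistics are ignored.

```java
import java.util.List;
import java.util.Map;

final class PrunedPartitionStats {

    // Assumed marker for "row count unknown" in this sketch.
    static final long UNKNOWN_ROW_COUNT = -1L;

    // catalogPartitionRowCounts: row count per partition, as stored in the catalog.
    // remainingPartitions: partitions that are still scanned after pruning.
    static long rowCountAfterPruning(
            Map<String, Long> catalogPartitionRowCounts,
            List<String> remainingPartitions) {
        long total = 0L;
        for (String partition : remainingPartitions) {
            Long rowCount = catalogPartitionRowCounts.get(partition);
            if (rowCount == null || rowCount < 0) {
                // One partition without usable stats makes the total unknown.
                return UNKNOWN_ROW_COUNT;
            }
            total += rowCount;
        }
        return total;
    }
}
```

For instance, with catalog row counts of 100, 250, and 50 for three daily partitions and a filter that keeps only the last two, the method returns 300; if any remaining partition has no entry in the map, it returns the unknown marker instead.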
Brief change log
- TableStats
- TableSourceTable
- PushPartitionIntoTableSourceScanRule
Verifying this change
CatalogStatisticsTest.testGetPartitionStatsFromCatalog
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no
Documentation