-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-34053][table-planner] Support state ttl hint for group aggregate #24179
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @xuyangzhong, thanks for the contribution! I left some comments, PTAL
...-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHints.java
Outdated
Show resolved
Hide resolved
BiRel biRel = (BiRel) node; | ||
Optional<String> leftName = extractAliasOrTableName(biRel.getLeft()); | ||
Optional<String> rightName = extractAliasOrTableName(biRel.getRight()); | ||
newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think validateAndGetNewHints
is good enough, and it's ok to have an overloaded method here. Another point is I don't think passing Optional<T>
as arguments is a good practice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with you. Since the improvement work for the input type Optional is not a part of this pr, I have created a new jira for it. https://issues.apache.org/jira/browse/FLINK-34235
Assigned to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assigned to you
...e-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/QueryHintsResolver.java
Outdated
Show resolved
Hide resolved
...able/flink-table-planner/src/main/java/org/apache/flink/table/planner/hint/StateTtlHint.java
Outdated
Show resolved
Hide resolved
...ink-table-planner/src/main/java/org/apache/flink/table/planner/hint/FlinkHintStrategies.java
Show resolved
Hide resolved
@@ -123,6 +130,7 @@ class StreamPhysicalGlobalGroupAggregate( | |||
generateUpdateBefore, | |||
needRetraction, | |||
indexOfCountStar.map(Integer.valueOf).orNull, | |||
StateTtlHint.getStateTtlFromHintOnSingleRel(hints), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is commented on explainTerms
.
Before STATE_TTL
hint is introduced, the rest query hints(mainly join hints for batch and lookup join hints for stream) are converted and use itemIf
So, I rethink the explanation format. How about
// for agg
.itemIf(
"stateTtl",
StateTtlHint.getStateTtlFromHintOnSingleRel(hints),
hints.stream().anyMatch(hint => StateTtlHint.isStateTtlHint(hint.hintName)))
// which yields
// stateTtl=[1d]
// for regular join
val ttlHint = StateTtlHint.getStateTtlFromHintOnBiRel(getHints)
.itemIf("leftStateTtl", ttlHint.get(0), ttlHint.containsKey(0))
.itemIf("rightStateTtl", ttlHint.get(1), ttlHint.containsKey(1))
// which yields
// leftStateTtl=[1d], rightStateTtl=[3d]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the printing of state hint should be placed in RelTreeWriterImpl and continue to be managed uniformly by withQueryHint.
newHints = validateAndGetNewHintsFromBiRel(leftName, rightName, oldHints); | ||
} else if (node instanceof SingleRel) { | ||
SingleRel singleRel = (SingleRel) node; | ||
Optional<String> tableName = extractAliasOrTableName(singleRel.getInput()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get an empty table/alias name here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emm it seems not, I will modify it separately in this separate jira. WDYT? https://issues.apache.org/jira/browse/FLINK-34235
e0f0798
to
805c771
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update. LGTM
...ble-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala
Outdated
Show resolved
Hide resolved
ff6f253
to
f365926
Compare
f365926
to
3969d39
Compare
Hi, @LadyForest. After rebasing onto master, the test for GroupAggregateRestoreTest#AGG_WITH_STATE_TTL_HINT failed due to an unknown reason. It succeeded after the savepoint metadata was regenerated. Considering the feature freeze on January 30th, I will continue to monitor CI for any unstable tests after merging this feature and will promptly address any issues during the bugfix period. |
After debugging, I found that the |
What is the purpose of the change
Currently, state ttl hint can work only on join node. This pr aims to support state ttl hint on group agg node.
Brief change log
Verifying this change
Some tests are added for it.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation