[FLINK-18988][table] Continuous query with LATERAL and LIMIT produces… #13291
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community.
Automated Checks
Last check on commit 8406cd5 (Tue Sep 01 03:33:51 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
 *
 * <p>This rule can only be used in HepPlanner.
 */
class CorrelateSortToRankRule extends RelOptRule(
side comment: Our long-term goal is to get rid of Scala. This class could have been implemented easily in Java. Please keep that in mind for future contributions.
Thanks for the reminder. When I made this contribution, I saw that most of the rules are implemented in Scala. Do you mean we prefer Java rules in the future?
See FLIP-32 Appendix: Porting Guidelines.
A new planner rule or node that only depends on Calcite and runtime classes should be implemented in Java.
I see, thanks for sharing.
We rework so many classes all the time that the Scala code will hopefully be gone at some point.
Sure, it would be exciting if all the code could switch to Java.
Force-pushed from 8406cd5 to 3dff129.
I'm trying to base an example on this PR. But the results differ between batch and streaming mode. It seems that the batch mode now outputs the global maximum.
The patch is intended for streaming mode only; let me check whether batch mode is supported.
… wrong result
The batch mode rank only supports RANK function, so we only rewrite the stream mode query.
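The commit message above restricts the rewrite to streaming mode because batch rank only supports RANK. A minimal sketch of why a RANK-only operator may not be a safe stand-in for the ROW_NUMBER-based rewrite, assuming standard SQL semantics for the two functions: with ties, `RANK() <= 3` can keep more than three rows, while `LIMIT 3` and `ROW_NUMBER() <= 3` keep at most three. The class name and the population values are made up for illustration and are not part of the PR.

```java
import java.util.Arrays;

// Illustrative only: plain Java, no Flink or Calcite classes involved.
final class RankVsRowNumber {

    /** ROW_NUMBER over an already sorted partition: 1, 2, 3, ... regardless of ties. */
    static int[] rowNumbers(long[] sortedDesc) {
        int[] result = new int[sortedDesc.length];
        for (int i = 0; i < result.length; i++) {
            result[i] = i + 1;
        }
        return result;
    }

    /** RANK over an already sorted partition: tied values share a rank, then a gap follows. */
    static int[] ranks(long[] sortedDesc) {
        int[] result = new int[sortedDesc.length];
        for (int i = 0; i < result.length; i++) {
            boolean tiedWithPrevious = i > 0 && sortedDesc[i] == sortedDesc[i - 1];
            result[i] = tiedWithPrevious ? result[i - 1] : i + 1;
        }
        return result;
    }

    /** Number of rows that survive a "value <= n" filter. */
    static int countAtMost(int[] values, int n) {
        int kept = 0;
        for (int v : values) {
            if (v <= n) {
                kept++;
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Made-up populations for one state, sorted descending, with a three-way tie.
        long[] pops = {90, 80, 80, 80, 10};
        System.out.println("ROW_NUMBER: " + Arrays.toString(rowNumbers(pops)));
        System.out.println("RANK:       " + Arrays.toString(ranks(pops)));
        System.out.println("rows kept by ROW_NUMBER <= 3: " + countAtMost(rowNumbers(pops), 3));
        System.out.println("rows kept by RANK <= 3:       " + countAtMost(ranks(pops), 3));
    }
}
```

Whether this is the exact reason batch mode was excluded is not stated in the thread; the sketch only shows that the two functions diverge on ties.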
Force-pushed from 3dff129 to 73fdc9a.
Thanks for the fix @danny0405, I left some comments.
By the way, I found that the Blink batch planner does not support the given query, and I get errors in sql-client such as "org.apache.flink.table.api.TableException: unexpected correlate variable $cor1 in the plan".
// rewrite before decorrelation
chainedProgram.addLast(
  PRE_DECORRELATE_REWRITE,
  FlinkHepRuleSetProgramBuilder.newBuilder
    .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
    .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
    .add(FlinkStreamRuleSets.PRE_DECORRELATION_RULES)
    .build())
We should merge this program into DECORRELATE
 * {{{
 * LogicalProject(state=[$0], name=[$1])
 * +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0}])
 *    :- LogicalAggregate(group=[{0}])
 *    :  +- LogicalProject(state=[$1])
 *    :     +- LogicalTableScan(table=[[default_catalog, default_database, cities]])
 *    +- LogicalSort(sort0=[$1], dir0=[DESC-nulls-last], fetch=[3])
 *       +- LogicalProject(name=[$0], pop=[$2])
 *          +- LogicalFilter(condition=[=($1, $cor0.state)])
 *             +- LogicalTableScan(table=[[default_catalog, default_database, cities]])
 * }}}
 *
 * <p>would be transformed to
 *
 * {{{
 * LogicalProject(state=[$0], name=[$1])
 * +- LogicalProject(state=[$1], name=[$0], pop=[$2])
 *    +- LogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3],
 *         partitionBy=[$1], orderBy=[$2 DESC], select=[name=$0, state=$1, pop=$2])
 *       +- LogicalTableScan(table=[[default_catalog, default_database, cities]])
 * }}}
it seems the rewrite is not correct, consider the following example:
SELECT state, name
FROM
  (SELECT DISTINCT state FROM cities1) states,
  LATERAL (
    SELECT name, pop
    FROM cities2
    WHERE state = states.state
    ORDER BY pop DESC
    LIMIT 3
  );
The outer table (cities1) and the inner table (cities2) are different.
Yes, that is why I added the match condition aggInput.getInput.getDigest.equals(filter.getInput.getDigest). If the outer and inner tables are different, the rule does not match.
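To make the concern and the digest check concrete, here is a minimal sketch in plain Java (no Flink or Calcite classes) that evaluates both forms of the query over in-memory rows. `lateralTopN` mimics the correlated LATERAL query, and `rowNumberTopN` mimics the rewritten single-table plan with ROW_NUMBER partitioned by state. The class, the method names, and the sample rows are hypothetical and exist only for illustration. The two forms agree when the outer and inner inputs are the same table and can disagree when they differ, which is the case the match condition is meant to exclude.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Illustrative only: simulates the two query shapes over in-memory rows.
final class LateralTopN {

    /** Hypothetical row type standing in for the cities table. */
    static final class City {
        final String name;
        final String state;
        final long pop;

        City(String name, String state, long pop) {
            this.name = name;
            this.state = state;
            this.pop = pop;
        }
    }

    /** Correlated form: for each distinct outer state, the top n inner rows of that state by pop. */
    static List<String> lateralTopN(List<City> outer, List<City> inner, int n) {
        TreeSet<String> states = new TreeSet<>();
        for (City c : outer) {
            states.add(c.state);
        }
        List<String> out = new ArrayList<>();
        for (String s : states) {
            List<City> matches = new ArrayList<>();
            for (City c : inner) {
                if (c.state.equals(s)) {
                    matches.add(c);
                }
            }
            matches.sort((a, b) -> Long.compare(b.pop, a.pop));
            for (City c : matches.subList(0, Math.min(n, matches.size()))) {
                out.add(s + ":" + c.name);
            }
        }
        return out;
    }

    /** Rewritten form: ROW_NUMBER() OVER (PARTITION BY state ORDER BY pop DESC) <= n on one table. */
    static List<String> rowNumberTopN(List<City> table, int n) {
        List<City> sorted = new ArrayList<>(table);
        sorted.sort((a, b) -> {
            int byState = a.state.compareTo(b.state);
            return byState != 0 ? byState : Long.compare(b.pop, a.pop);
        });
        List<String> out = new ArrayList<>();
        String current = null;
        int rowNumber = 0;
        for (City c : sorted) {
            if (!c.state.equals(current)) {
                current = c.state; // new partition: restart numbering
                rowNumber = 0;
            }
            rowNumber++;
            if (rowNumber <= n) {
                out.add(c.state + ":" + c.name);
            }
        }
        return out;
    }

    /** Made-up sample rows; populations are distinct so the ordering is deterministic. */
    static List<City> sampleCities() {
        return Arrays.asList(
            new City("A", "CA", 50), new City("B", "CA", 40), new City("C", "CA", 30),
            new City("D", "CA", 20), new City("E", "TX", 70), new City("F", "TX", 10));
    }

    public static void main(String[] args) {
        List<City> cities = sampleCities();
        List<City> otherOuter = Arrays.asList(new City("X", "CA", 1));
        System.out.println("same table, lateral:    " + lateralTopN(cities, cities, 3));
        System.out.println("same table, row_number: " + rowNumberTopN(cities, 3));
        System.out.println("different outer table:  " + lateralTopN(otherOuter, cities, 3));
    }
}
```

With the different outer table, the correlated form only returns rows for states present in that outer table, whereas the rank-based form over the inner table returns rows for every state it contains.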
Thanks for the explanation
Yes, because our batch rank only supports rank type as
We should add CorrelateSortToRankRuleTest, which only involves the minimal rule set, to verify the logic of CorrelateSortToRankRule, including the supported cases and unsupported cases.
val agg: Aggregate = call.rel(1)
if (agg.getAggCallList.size() > 0
  || agg.getGroupSets.size() > 1
  || agg.getGroupSet.cardinality() != 1) {
agg.getGroupSet.cardinality() != 1
We should support multiple equality conditions, such as: state = states.state AND name = states.name
Yes, but that would complicate the code a lot; we can support it in the future. In general, one equality condition is enough: the outer query projects distinct values in order to avoid an unnecessary Cartesian product.
val oriBuilder = call.builder()
val builder = FlinkRelBuilder.of(oriBuilder.getCluster, oriBuilder.getRelOptSchema)
nit: one simple way is to use FlinkRelFactories.FLINK_REL_BUILDER to construct the RelOptRule; then we can cast call.builder() to FlinkRelBuilder. One example is FlinkSubQueryRemoveRule.
RankType.ROW_NUMBER,
new ConstantRankRange(
  1,
  sort.fetch.asInstanceOf[RexLiteral].getValueAs(classOf[java.lang.Long])),
we can use SortUtil.getLimitEnd
  val INSTANCE = new CorrelateSortToRankRule
}

nit: redundant line
Thanks for the update. LGTM overall; there is only one minor comment. @twalthr do you have any other concerns?
val newCollation = RelCollations.of(newFieldCollations)

val newRel = builder
  .push(filter.getInput()).asInstanceOf[FlinkRelBuilder]
cast is redundant
Updated, thanks ~
Thanks @godfreyhe, I have addressed the review comments. Can you take another look? Thanks so much in advance.
LGTM
Thanks @danny0405 and @godfreyhe. I will merge this now...
… wrong result
This closes apache#13291.
The batch mode rank only supports RANK function, so we only rewrite the stream mode query.
What is the purpose of the change
Fix the query of pattern:
Before the patch, the query generates a wrong plan and therefore wrong results.
Brief change log
CorrelateSortToRankRule for the rewrite
Verifying this change
Added tests.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation