[FLINK-15961][table-planner][table-planner-blink] Introduce Python Physical Correlate RelNodes which are containers for Python TableFunction #11051
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community review your pull request.
Automated Checks
Last check on commit 68b0714 (Mon Feb 10 12:54:15 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
Details
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
68b0714 to
9c17e49
hequn8128
left a comment
@HuangXingBo Thanks a lot for the PR. Some comments below.
| for (i <- 0 until y) { | ||
| val row = new Row(2) | ||
| row.setField(0, x) | ||
| row.setField(1, i * i) | ||
| collect(row) | ||
| } |
How about removing these lines? The code will never be called.
| for (i <- 0 until y) { | ||
| val row = new Row(2) | ||
| row.setField(0, x) | ||
| row.setField(1, i * i) | ||
| collect(row) |
How about removing these lines?
| right match { | ||
| // right node is a table function | ||
| case scan: FlinkLogicalTableFunctionScan => true | ||
| // right node is a java table function |
I don't think the right node must be a Java table function. The original comment is OK.
| // a filter is pushed above the table function | ||
| case calc: FlinkLogicalCalc if CorrelateUtil.getTableFunctionScan(calc).isDefined => true | ||
| case calc: FlinkLogicalCalc => | ||
| PythonUtil.isNonPythonCall(CorrelateUtil.getTableFunctionScan(calc).get.getCall) |
The method getTableFunctionScan may return None.
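One way to guard against the empty case is sketched below in Java, with `java.util.Optional` standing in for Scala's `Option`. The classes `Scan` and `Calc` and the helpers `getTableFunctionScan`, `isNonPythonCall` and `matchesJavaCorrelate` are simplified, hypothetical stand-ins that only borrow names from the snippet above; they are not the planner's real classes or signatures.

```java
import java.util.Optional;

/** Hypothetical stand-ins for the planner types discussed above; not Flink's real classes. */
final class CorrelateMatchSketch {

    /** Simplified table function scan that only records whether its call is a Python call. */
    static final class Scan {
        final boolean pythonCall;

        Scan(boolean pythonCall) {
            this.pythonCall = pythonCall;
        }
    }

    /** Simplified calc node; a null input models "no table function scan below this calc". */
    static final class Calc {
        final Scan input;

        Calc(Scan input) {
            this.input = input;
        }
    }

    /** A lookup that can come back empty, like the Option-returning method in the review. */
    static Optional<Scan> getTableFunctionScan(Calc calc) {
        return Optional.ofNullable(calc.input);
    }

    static boolean isNonPythonCall(Scan scan) {
        return !scan.pythonCall;
    }

    /** Matches only when a scan exists and its call is not a Python call. */
    static boolean matchesJavaCorrelate(Calc calc) {
        // map/orElse never dereferences an empty result, unlike calling get() directly.
        return getTableFunctionScan(calc)
                .map(CorrelateMatchSketch::isNonPythonCall)
                .orElse(false);
    }
}
```

In Scala, the same guard can be expressed with `Option.exists`, which also returns false for `None` instead of throwing.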
| scan: FlinkLogicalTableFunctionScan, | ||
| condition: Option[RexNode], | ||
| schema: RowSchema, | ||
| joinSchema: RowSchema, |
Remove this member. It has never been used.
|
|
||
| /** | ||
| * Flink RelNode which matches along with join a user defined table function. | ||
| * Flink RelNode which matches along with join a java user defined table function. |
java => Java/Scala
| // a filter is pushed above the table function | ||
| FlinkLogicalCalc calc = (FlinkLogicalCalc) right; | ||
| RelSubset input = (RelSubset) calc.getInput(); | ||
| FlinkLogicalTableFunctionScan scan = (FlinkLogicalTableFunctionScan) input.getOriginal(); |
We can't cast the original node to FlinkLogicalTableFunctionScan.
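A minimal sketch of a lookup that checks the runtime type before every cast follows. It assumes, which the comment does not state explicitly, that the node behind a subset could be something other than a table function scan, for example another calc. The types `Node`, `TableFunctionScan`, `Calc` and `Subset` and the method `findScan` are hypothetical simplifications, not Calcite's or Flink's real classes.

```java
import java.util.Optional;

/** Hypothetical, simplified node hierarchy; these are not Calcite's or Flink's real classes. */
final class ScanLookupSketch {

    interface Node {}

    /** Stands in for a table function scan. */
    static final class TableFunctionScan implements Node {}

    /** Stands in for a calc node with a single input. */
    static final class Calc implements Node {
        final Node input;

        Calc(Node input) {
            this.input = input;
        }
    }

    /** Stands in for a planner subset that wraps an original node. */
    static final class Subset implements Node {
        final Node original;

        Subset(Node original) {
            this.original = original;
        }
    }

    /** Walks down through subsets and calcs, checking the runtime type before each cast. */
    static Optional<TableFunctionScan> findScan(Node node) {
        Node current = node;
        while (current != null) {
            if (current instanceof TableFunctionScan) {
                return Optional.of((TableFunctionScan) current);
            } else if (current instanceof Subset) {
                current = ((Subset) current).original;
            } else if (current instanceof Calc) {
                current = ((Calc) current).input;
            } else {
                // Any other node type: report "not found" instead of casting blindly.
                return Optional.empty();
            }
        }
        return Optional.empty();
    }
}
```

Returning an empty `Optional` lets the calling rule decline to match rather than fail with a `ClassCastException`.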
| private final RelNode right; | ||
|
|
||
| BatchExecPythonCorrelateFactory(RelNode rel) { | ||
| this.correlateRel = rel; |
Remove this member?
| * The factory is responsible to creating {@link StreamExecPythonCorrelate}. | ||
| */ | ||
| private static class StreamExecPythonCorrelateFactory { | ||
| private final RelNode correlateRel; |
Remove this member.
| } | ||
|
|
||
| /** | ||
| * The factory is responsible to creating {@link StreamExecPythonCorrelate}. |
responsible to => responsible for
… Correlate Rule And RelNode
… Correlate Rule And RelNode-fix-1
9c17e49 to
67f8b75
Thanks a lot for the review, @hequn8128. I have addressed the comments in the latest commit.
@HuangXingBo Thanks a lot for the update. The test failed due to the problems in BatchExecCorrelateBase and StreamExecCorrelateBase. See the detailed comments below.
As for the last commit, which adds implementations for the Python Correlate RelNode, maybe it's better to add it later, because in this PR there is no way to write tests that cover the implementation.
What do you think?
|
|
||
| override def explainTerms(pw: RelWriter): RelWriter = { | ||
| val rexCall = scan.getCall.asInstanceOf[RexCall] | ||
| val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] |
The test failed due to this line. We can remove this line as it has never been used.
Remember to remove the useless import after removing this line.
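The point that an unused line can still break a test can be illustrated with a small Java sketch. The comment does not say why the cast fails; the sketch assumes the operator is not always of the expected type. `Operator`, `TableSqlFunctionLike` and `OtherFunctionLike` are hypothetical types invented for illustration, not Flink's real hierarchy.

```java
/** Hypothetical operator types; they only illustrate the failure mode, not Flink's real hierarchy. */
final class UnusedCastSketch {

    static class Operator {}

    static final class TableSqlFunctionLike extends Operator {}

    static final class OtherFunctionLike extends Operator {}

    /** The local variable is never read, yet the cast still runs and can throw. */
    static String explainWithUnusedCast(Operator operator) {
        TableSqlFunctionLike unused = (TableSqlFunctionLike) operator;
        return "explained";
    }

    /** Same result without the cast, so it works for every operator type. */
    static String explainWithoutCast(Operator operator) {
        return "explained";
    }

    /** Reports whether the unused cast fails for the given operator. */
    static boolean castFails(Operator operator) {
        try {
            explainWithUnusedCast(operator);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

Dropping the cast, as suggested, removes the failure without changing the result, because the cast value was never used.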
|
|
||
| override def explainTerms(pw: RelWriter): RelWriter = { | ||
| val rexCall = scan.getCall.asInstanceOf[RexCall] | ||
| val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction] |
ditto
Thanks a lot, @hequn8128. I will move the last commit to FLINK-15972.
67f8b75 to
30afe69
… Correlate Rule And RelNode-fix-2
30afe69 to
f0fbdc6
@flinkbot run travis
… Correlate rules and RelNodes This closes apache#11051.
@HuangXingBo Hi, the JIRA ID in the commit log is wrong. It would be great if you could check this next time.
What is the purpose of the change
This pull request introduces Python Physical Correlate RelNodes, which are containers for Python TableFunction.
Brief change log
BatchExecPythonCorrelateRule, StreamExecPythonCorrelateRule and DataStreamPythonCorrelateRule
DataStreamPythonCorrelate, StreamExecPythonCorrelate and BatchExecPythonCorrelate
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation