[FLINK-19282][table sql/planner]Supports watermark push down with Wat… #13449
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request.
Automated Checks
Last check on commit b042779 (Tue Sep 22 04:43:42 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
3925844 to 28d2598
@flinkbot run azure
Thanks for the PR @fsk119. I would like to take a look at this PR soon. Some code changes might conflict with FLIP-107. We need to make sure that both efforts are aligned.
Thanks for your reply. Looking forward to your suggestions!
Thanks for the contribution @fsk119, I left some comments.
    tableConfig: TableConfig,
    contextTerm: String = "parameters")
indent
@@ -18,14 +18,16 @@

 package org.apache.flink.table.planner.codegen

 import org.apache.calcite.rex.RexNode
nit: reorder imports
@@ -35,7 +37,8 @@ object WatermarkGeneratorCodeGenerator {
   def generateWatermarkGenerator(
       config: TableConfig,
       inputType: RowType,
-      watermarkExpr: RexNode): GeneratedWatermarkGenerator = {
+      watermarkExpr: RexNode,
+      contextTerm: String = null): GeneratedWatermarkGenerator = {
Please use None instead of null in Scala.
@@ -44,11 +47,33 @@ object WatermarkGeneratorCodeGenerator {
         " but is " + watermarkOutputType)
     }
     val funcName = newName("WatermarkGenerator")
-    val ctx = CodeGeneratorContext(config)
+    val ctx = if (contextTerm != null) {
+      new ConstantCodeGeneratorContext(config, contextTerm)
Why do we use ConstantCodeGeneratorContext here? ConstantCodeGeneratorContext is used for the constant reducer; we should create a new, dedicated CodeGeneratorContext for the watermark generator.
import java.util.Collections
import java.util.function.{Function => JFunction, Supplier => JSupplier}

import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier
reorder imports
/*
 * Used by apply method to deal with.
 * */
/**
*
*/
util.buildStreamProgram(FlinkStreamProgram.DEFAULT_REWRITE());
CalciteConfig calciteConfig = TableConfigUtils.getCalciteConfig(util.tableEnv().getConfig());
calciteConfig.getStreamProgram().get().addLast(
        "PushWatermarkIntoTableSourceScanRule",
        FlinkHepRuleSetProgramBuilder.<StreamOptimizeContext>newBuilder()
                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE())
                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                .add(RuleSets.ofList(
                        WatermarkAssignerProjectTransposeRule.INSTANCE,
                        PushWatermarkIntoTableSourceScanRule.INSTANCE))
                .build()
);
It would be better to build a specific program that contains only the rules needed for the current test; this avoids interference from other rules.
        " 'bounded' = 'false'\n" +
        ")";
util.tableEnv().executeSql(ddl);
util.verifyPlan("select * from MyTable");
Add some tests for:
- projection/filter in select
- a watermark expression that contains two field references
Please refer to testWatermarkOnComputedColumnWithQuery and testWatermarkOnComputedColumnWithMultipleInputs.
import java.time.ZoneOffset;

/**
 * Test rule PushWatermarkIntoTableSourceScanRule.
Test for [[PushWatermarkIntoTableSourceScanRule]] and [[WatermarkAssignerProjectTransposeRule]]
@Before
override def before(): Unit = {
  super.before()
  env.setParallelism(1)
Why is the parallelism set to 1?
Thanks for the update, I left some comments
@Before
public void setup() {
    util.buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());
ditto
</Resource>
<Resource name="planAfter">
  <![CDATA[
WatermarkAssigner(rowtime=[c], watermark=[-(c, 5000:INTERVAL SECOND)])
Why can't the watermark node be pushed down?
Oh! I added the wrong rule to the rule set. It works now.
        " b bigint,\n" +
        " c timestamp(3) not null,\n" +
        " d as c + interval '5' second,\n" +
        " watermark for d as d - interval '5' second\n" +
Add a test for a watermark such as: watermark for d as d - interval '5' second + other timestamp field
A watermark expression allows only one input reference, so the expression mentioned is illegal.
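For readers following this thread, here is a minimal sketch of what the DDL under discussion evaluates to for a single row. It is plain Java with no Flink dependency: the class and method names (ComputedColumnWatermarkSketch, computeD, watermarkFor) are hypothetical, and modelling timestamps as epoch milliseconds is an assumption made only for illustration.

```java
import java.time.Duration;

/** Hypothetical stand-in, not Flink code: evaluates the DDL's expressions for one row. */
final class ComputedColumnWatermarkSketch {
    private static final long FIVE_SECONDS_MS = Duration.ofSeconds(5).toMillis();

    /** Models the computed column: d AS c + INTERVAL '5' SECOND. */
    static long computeD(long c) {
        return c + FIVE_SECONDS_MS;
    }

    /** Models: WATERMARK FOR d AS d - INTERVAL '5' SECOND, with d inlined in terms of c. */
    static long watermarkFor(long c) {
        return computeD(c) - FIVE_SECONDS_MS;
    }

    public static void main(String[] args) {
        long c = 10_000L; // assumed rowtime of one row, in epoch milliseconds
        System.out.println("d = " + computeD(c));
        System.out.println("watermark = " + watermarkFor(c));
    }
}
```

Once the computed column is inlined, the expression refers only to the physical column c, which is presumably what lets the watermark be evaluated on the rows the source produces.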
Thanks for the update @fsk119, LGTM
Thanks for your review! Many thanks!
…ermarkStrategy
What is the purpose of the change
Allow the planner to push the watermark strategy into the DynamicTableSource.
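A rough sketch of the idea, assuming a much simplified contract: the planner hands the watermark expression to the source, and the source then tracks the watermark itself as it emits rows. Everything below (WatermarkPushDownSketch, SketchSource, rows as long[] with the rowtime at index 0) is a hypothetical stand-in written for illustration. It is not the interface this PR touches, which per the PR title works with a WatermarkStrategy rather than a plain function.

```java
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for a "supports watermark push-down" ability; not Flink's interface. */
interface WatermarkPushDownSketch<T> {
    /** Called by the (hypothetical) planner to hand the watermark expression to the source. */
    void applyWatermark(ToLongFunction<T> watermarkExpression);
}

/** Toy source whose rows are long[] with the rowtime (epoch milliseconds) at index 0. */
final class SketchSource implements WatermarkPushDownSketch<long[]> {
    private ToLongFunction<long[]> watermarkExpression; // stays null if nothing was pushed down
    private long currentWatermark = Long.MIN_VALUE;

    @Override
    public void applyWatermark(ToLongFunction<long[]> watermarkExpression) {
        this.watermarkExpression = watermarkExpression;
    }

    /** Observes one row and returns the source's watermark afterwards (never decreasing). */
    long emit(long[] row) {
        if (watermarkExpression != null) {
            long candidate = watermarkExpression.applyAsLong(row);
            currentWatermark = Math.max(currentWatermark, candidate);
        }
        return currentWatermark;
    }

    public static void main(String[] args) {
        SketchSource source = new SketchSource();
        // Roughly "WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND".
        source.applyWatermark(row -> row[0] - 5_000L);
        System.out.println(source.emit(new long[] {10_000L}));
        System.out.println(source.emit(new long[] {7_000L}));
    }
}
```

In this sketch a late row (7000 after 10000) does not move the watermark backwards, because the source keeps the maximum value seen so far.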
Brief change log
- FunctionContext to interface, which allows us to convert WatermarkStrategy.Context to FunctionContext;
- bindConstructorTerm in ExprCodeGenerator;
- WatermarkGeneratorCodeGenerator will generate a WatermarkGenerator with a parameterized constructor and use the term to open the UDF;
Verifying this change
This change added tests and can be verified as follows:
WatermarkGeneratorCodeGenTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation