[FLINK-12665] [table-planner-blink] Introduce MiniBatchIntervalInferRule and watermark assigner operators #8562
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community track review progress.
Please see the Pull Request Review Guide for a full explanation of the review process. The bot tracks review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: The @flinkbot bot supports the following commands:
Force-pushed from f70b276 to 6c15cdc (Compare)
Force-pushed from bcb020a to 560eab3 (Compare)
Looks good overall. I've left some minor comments.
```java
private final long watermarkDelay;
// timezone watermarkDelay.
```
Using "offset" here may be more precise.
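To illustrate the reviewer's point, here is a minimal, hypothetical sketch of how such a delay (or "offset") is typically applied when generating a watermark; the class and method names are invented for illustration and are not the planner code under review.

```java
// Hypothetical sketch: a watermark is commonly derived by subtracting a fixed
// delay (the "offset" discussed above) from the maximum event timestamp seen.
final class DelayedWatermarkGenerator {
    private final long watermarkDelay;            // illustrative; mirrors the field under review
    private long currentMaxTimestamp = Long.MIN_VALUE;

    DelayedWatermarkGenerator(long watermarkDelay) {
        this.watermarkDelay = watermarkDelay;
    }

    // Track the largest event timestamp observed so far.
    void onEvent(long eventTimestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
    }

    // The watermark trails the max timestamp by the configured delay.
    long currentWatermark() {
        return currentMaxTimestamp - watermarkDelay;
    }
}
```

With a delay of 5, events at timestamps 10 and 7 yield a watermark of 5, which matches the usual bounded-out-of-orderness behavior.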
```scala
  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'rowtime)

val t = failingDataSource(TestData.tupleData5.map {
  case (a, b, c, d, e) => (b, a, c, d, e)
}).assignTimestampsAndWatermarks(
```
It's OK here as the mini-batch window is supported now. But it's necessary to construct a StreamExecWatermarkAssigner node to fully verify the correctness of the mini-batch watermark.
```scala
val tableName = scan.getTable.getQualifiedName.mkString(".")
val inputBlocks = block.children.filter(b => tableName.equals(b.getOutputTableName))
Preconditions.checkArgument(inputBlocks.size <= 1)
if (inputBlocks.size == 1) {
  val mergedInterval = FlinkRelOptUtil.mergeMiniBatchInterval(
```
The traits of the sink block have already been initialized before the first round of optimization, so miniBatchInterval can be ignored if the block is the sink block.
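The merge step above can be sketched in simplified form. This is a hypothetical reduction (assuming 0 means "no interval inferred yet" and that the tighter positive interval wins); the real `FlinkRelOptUtil.mergeMiniBatchInterval` also distinguishes rowtime and proctime modes.

```java
final class MiniBatchIntervalMerge {
    // Hypothetical simplification of merging the mini-batch interval of a
    // block with that of its input block; 0 stands for "no interval inferred".
    static long merge(long a, long b) {
        if (a == 0) return b;
        if (b == 0) return a;
        return Math.min(a, b); // assumption: the tighter interval dominates
    }
}
```

This captures only the interval-length component of the trait; mode handling is where the real rule does most of its work.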
```scala
val updatedTrait = rel match {
  case _: StreamExecGroupWindowAggregate =>
    // TODO introduce mini-batch window aggregate later
    MiniBatchIntervalTrait.NONE
```
It's not proper to use MiniBatchIntervalTrait.NONE here, since we cannot distinguish a window without mini-batch from the other cases that yield MiniBatchIntervalTrait.NONE. A specific value may be needed.
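The suggestion can be illustrated with a hypothetical marker value that keeps "window aggregate where mini-batch must be disabled" distinguishable from the plain NONE case; the names below are invented for illustration and are not the trait values the PR ends up with.

```java
// Hypothetical enum: NO_MINIBATCH marks operators (such as group window
// aggregates) where mini-batch must be switched off, so they are not confused
// with NONE, which merely means "no interval inferred yet".
enum MiniBatchMode { NONE, NO_MINIBATCH, PROCTIME, ROWTIME }

final class TraitCheck {
    // With a dedicated marker, downstream logic can tell the two apart.
    static boolean allowsMiniBatch(MiniBatchMode mode) {
        return mode != MiniBatchMode.NO_MINIBATCH;
    }
}
```

A NONE trait can still be overwritten later by interval inference, while the dedicated marker permanently opts the subtree out.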
```
+- LogicalTableScan(table=[[_DataStreamTable_3]])

== Optimized Logical Plan ==
Calc(select=[id1, rowtime AS ts, text], updateAsRetraction=[true], accMode=[Acc], reuse_id=[1]): rowcount = , cumulative cost = {rows, cpu, io, network, memory}
```
It's necessary to add cases with multi-layer blocks to validate the correctness of trait propagation among blocks.
Looks good. 👍
+1, verifying this locally and will merge.
What is the purpose of the change
Introduce MiniBatchIntervalInferRule and watermark assigner operators
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)

Documentation