New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-12017][table-runtime-blink] Introduce Rank and Deduplicate operators for blink streaming runtime #8109
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
60252e4
to
ea882fc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The indent config for your IDE seems not configured well, there are lots of indent issues.
I still need some more time to go through all rank function related classes.
...ble-planner-blink/src/main/scala/org/apache/flink/table/codegen/EqualiserCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...ble-planner-blink/src/main/scala/org/apache/flink/table/codegen/EqualiserCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
Outdated
Show resolved
Hide resolved
|
||
val rowTypeInfo = inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo] | ||
|
||
val generateRetraction = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think keep first row will not generate retraction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whether generate Retraction could be inferred based on Retraction rules, which is done in FLINK-12098. I could add a TODO message here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update here.
...nk/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala
Outdated
Show resolved
Hide resolved
...-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionBase.java
Outdated
Show resolved
Hide resolved
...-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionBase.java
Outdated
Show resolved
Hide resolved
BTW, i think we need some dedicated tests for all rank functions. |
...e/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/SortedMap.java
Outdated
Show resolved
Hide resolved
...time-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractUpdateRankFunction.java
Outdated
Show resolved
Hide resolved
615ae4b
to
8e25cfd
Compare
...nk-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
Outdated
Show resolved
Hide resolved
...ble/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java
Outdated
Show resolved
Hide resolved
.../flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java
Show resolved
Hide resolved
...k-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java
Show resolved
Hide resolved
...time-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
Outdated
Show resolved
Hide resolved
protected long hitCount = 0L; | ||
protected long requestCount = 0L; | ||
|
||
AbstractRankFunction(long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo inputRowType, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use inputArity outputArity instead of inputRowType and outputRowType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inputRowType is still useful in the other case, outputRowType could be removed.
private transient Map<BaseRow, TopNBuffer> kvSortedMap; | ||
|
||
public AppendRankFunction( | ||
long minRetentionTime, long maxRetentionTime, BaseRowTypeInfo inputRowType, BaseRowTypeInfo outputRowType, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can use BaseRowSerializer
instead of inputRowType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inputRowType need to used when create StateDescriptor.
Such as :
ListTypeInfo<BaseRow> valueTypeInfo = new ListTypeInfo<>(inputRowType); MapStateDescriptor<BaseRow, List<BaseRow>> mapStateDescriptor = new MapStateDescriptor( "data-state-with-append", sortKeyType, valueTypeInfo);
4f9f70c
to
f88533d
Compare
...src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
Outdated
Show resolved
Hide resolved
...src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
Outdated
Show resolved
Hide resolved
with StreamPhysicalRel | ||
with StreamExecNode[BaseRow] { | ||
|
||
private val DEFAULT_MAX_PARALLELISM = 1 << 7 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
too small?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The variable's name is confusing, it's not max parallelism of operators, but max number of key-groups. Maybe it's better to use StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
or KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 . I'm in favor of the later one which is also used in KeyedStream.java#134
and has a clear javadoc description.
...time-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
Outdated
Show resolved
Hide resolved
super.open(configure); | ||
String stateName = keepLastRow ? "DeduplicateFunctionCleanupTime" : "DeduplicateFunctionCleanupTime"; | ||
initCleanupTimeState(stateName); | ||
ValueStateDescriptor rowStateDesc = new ValueStateDescriptor("rowState", rowTypeInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in case of firstRow
, only pk is needed for state, we don't have to store the whole row.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I think we should only store the PK in state here.
If we only store PK, the ideal state schema should be ValueState<Boolean>
, but this can't share the same state with LastRow mode. Maybe we need to separate the implementation for firstRow and lastRow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly, only keepLastRow and generate retract is true, we need to store complete row, else store pk is ok. Thanks.
...link/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionHelper.java
Outdated
Show resolved
Hide resolved
2c6eb76
to
b049244
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @beyond1920 for the great work!
I think this PR is close to ready. Except we need to polish DeduplicateFunction
a bit more, I only some other minor comments.
Cheers,
Jark
...src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
Outdated
Show resolved
Hide resolved
...link-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/TableUtil.scala
Outdated
Show resolved
Hide resolved
...le-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
Outdated
Show resolved
Hide resolved
...r-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingWithStateTestBase.scala
Outdated
Show resolved
Hide resolved
...time-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunction.java
Outdated
Show resolved
Hide resolved
...le-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java
Outdated
Show resolved
Hide resolved
...le-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractRankFunction.java
Outdated
Show resolved
Hide resolved
...able-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AppendRankFunction.java
Outdated
Show resolved
Hide resolved
...able-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/UpdateRankFunction.java
Outdated
Show resolved
Hide resolved
...le-runtime-blink/src/test/java/org/apache/flink/table/runtime/window/WindowOperatorTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wuchong thanks for your suggestions. I split keepFirstRow and keepLastRow into different functions. And Update the API of BaseRowKeySelector.
bce8cf6
to
8cd47d6
Compare
@flinkbot approve all. +1 to merge. Wait until Travis turns green. |
1d79a64
to
288fa78
Compare
…plicate to StreamTransformation
2. Introduce SortedMapSerializerSnapshot to do snapshot for SortedMapTypeInfo.
2.Other minor update.
… DeduplicateKeepLastRowFunction 2. other minor update.
Merging... |
…rators for blink streaming runtime This closes apache#8109
…rators for blink streaming runtime This closes apache#8109
What is the purpose of the change
Introduce Rank and Deduplicate operators for blink streaming runtime
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation