[FLINK-12062][table-runtime-blink] Introduce bundle operator to streaming table runtime #8086
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community track the review progress.
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands: the @flinkbot bot supports the following commands:
Thanks @KurtYoung , left some comments.
while (isInFinishingBundle) {
    checkpointingLock.wait();
}
isInFinishingBundle = true;
Why do we need `isInFinishingBundle` to wait on `checkpointingLock`? Which other thread would invoke `finishBundle`? Shouldn't Flink guarantee thread safety for `processElement`, `processWatermark`, and `prepareSnapshotPreBarrier`?
Yes, I will add more comments to explain why we need this.
We don't need this as long as we only have the count bundle trigger. After we introduce a time-based trigger, it will be needed. I will delete it for now.
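To illustrate the scenario being discussed, here is a minimal, hypothetical sketch (plain Java, no Flink classes; all names are illustrative): if a second thread, such as a processing-time timer, could also call `finishBundle`, the operator would need a flag like `isInFinishingBundle` so that other callers wait on the checkpoint lock until the in-flight flush completes.

```java
// Hypothetical sketch of the guard discussed above, not Flink code.
class BundleGuard {
    private final Object checkpointingLock = new Object();
    private boolean isInFinishingBundle = false;

    // Input processing must not interleave with an in-flight flush.
    public void processElement(Runnable work) throws InterruptedException {
        synchronized (checkpointingLock) {
            while (isInFinishingBundle) {
                checkpointingLock.wait();
            }
            work.run();
        }
    }

    // Could be called by the main thread or a timer thread.
    public void finishBundle(Runnable flush) throws InterruptedException {
        synchronized (checkpointingLock) {
            while (isInFinishingBundle) {
                checkpointingLock.wait();
            }
            isInFinishingBundle = true;
            try {
                flush.run();
            } finally {
                // Always release the guard and wake up waiting threads,
                // even if the flush throws.
                isInFinishingBundle = false;
                checkpointingLock.notifyAll();
            }
        }
    }
}
```

With only a count-based trigger, `finishBundle` is invoked solely from the single task thread, so the flag is redundant, which matches the conclusion above.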
 * Called when a bundle is finished. Transform a bundle to zero, one, or more output elements.
 */
public abstract void finishBundle(Map<K, V> buffer, Collector<OUT> out) throws Exception;
Do we need to add a method `endInput` to handle group aggregate without keys in bounded streaming?
This is done by `close` now.
    checkpointingLock.wait();
}
try {
    finishBundle();
I remember Flink didn't allow data to be sent in `close`, and that this would only become possible after introducing `endInput`?
Ignore it. Sending data in `close` is OK.
According to the API doc of `StreamOperator::close()`:

> The method is expected to flush all remaining buffered data. Exceptions during this flushing of buffered data should be propagated, in order to cause the operation to be recognized as failed, because the last data items are not processed properly.

I think it's OK to emit data during `close`.
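The contract quoted above can be illustrated with a minimal, hypothetical sketch (not the actual Flink operator; class and method names are made up): on `close`, everything still buffered is emitted downstream before the operator shuts down.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the close() contract: flush all
// remaining buffered data when the operator is closed.
class FlushingOperator implements AutoCloseable {
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    // Buffer records instead of emitting them immediately.
    public void processElement(String value) {
        buffer.add(value);
    }

    @Override
    public void close() {
        // Emit everything still buffered. Exceptions thrown here should
        // propagate so the job is recognized as failed.
        emitted.addAll(buffer);
        buffer.clear();
    }

    public List<String> getEmitted() {
        return emitted;
    }
}
```

This is why handling the end-of-input flush in `close` works for the bundle operator even without a dedicated `endInput` method.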
Although we can send data in
+1 LGTM
…ming table runtime. This closes apache#8086
What is the purpose of the change
Introducing a bundle operator to the streaming table runtime, which saves incoming records in a key-value map. Once the bundle triggers, the bundle function is invoked. All buffered data is passed in, and one can apply optimizations based on it.
One useful scenario for the bundle operator is "Group Aggregate". We can organize the bundle data by grouping key. Once the bundle triggers, we can first pre-aggregate all data belonging to the same key in memory, so we only have to access state once per key. This saves a lot of cost and gives better performance.
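The pre-aggregation idea can be sketched in a few lines of plain Java (a simplified, hypothetical model, not the actual operator; the record type and method names are illustrative): records are merged into an in-memory map keyed by the grouping key, and when the bundle fires, only one aggregated record per distinct key is emitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch of a count-aggregating bundle: buffer incoming
// (key, count) records in a map, merging in memory, so downstream
// state is touched only once per distinct key per bundle.
class CountBundle {
    private final Map<String, Long> buffer = new HashMap<>();

    // Called per incoming record: merge into the in-memory bundle.
    public void processElement(String key, long count) {
        buffer.merge(key, count, Long::sum);
    }

    // Called when the bundle trigger fires: emit one aggregated
    // record per key, then reset the buffer for the next bundle.
    public void finishBundle(BiConsumer<String, Long> out) {
        buffer.forEach(out);
        buffer.clear();
    }
}
```

With N incoming records spread over K distinct keys, this turns N state accesses into at most K per bundle, which is the cost saving described above.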
Brief change log
Verifying this change
unit tests
Does this pull request potentially affect one of the following parts:
- `@Public(Evolving)`: (no)
- Documentation