-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-13446][table-runtime-blink] Fix assign logic for row count sliding window #9248
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 70ee112 (Tue Aug 06 15:55:46 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @hequn8128 , I left some comments.
@@ -1045,7 +1045,7 @@ public void testSlidingCountWindow() throws Exception { | |||
|
|||
testHarness.processWatermark(new Watermark(12000)); | |||
testHarness.setProcessingTime(12000L); | |||
expectedOutput.add(record("key2", 15L, 5L, 0L)); | |||
expectedOutput.add(record("key2", 6L, 3L, 0L)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we miss "15L, 5L" here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have not missed outputs here.
For the slide window(size=5, slide=3), the first window is [-2,2], the second is [1, 5], so here, we only have one output for key2.
if (0 > windowStart) { | ||
windows.add(new CountWindow(windowId, windowSize + windowStart)); | ||
} else { | ||
windows.add(new CountWindow(windowId, windowSize)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about to encode CountWindow as "countStart, countEnd" like TimeWindow. Then the logic of assigning windows will be similar to SlidingWindowAssigner
which is much simpler IMO. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
What is the purpose of the change
For blink planner, the Row count sliding window outputs incorrectly. The window assigner assigns less window than what expected. This means the window outputs fewer data.
This pull request fixes the window assigner and trigger logic, i.e., correct the window start and end calculation.
Brief change log
CountWindow
, because in some cases, the count window fires with a trigger size less than the window size.CountSlidingWindowAssigner
.Verifying this change
This change is already covered by existing tests, such as GroupWindowITCase and WindowOperatorTest.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation