-
Notifications
You must be signed in to change notification settings - Fork 13.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-13817 Always sync nextTimeToEmit with wall clock #12166
Conversation
We should sync nextTimeToEmit with wall clock on each method call to ensure throttling works correctly in case of clock drift. If we dont, then in the event of significant clock drift, throttling might not happen for a long time, this can hurt performance.
|
||
// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime, | ||
// they can get out of sync during a clock drift | ||
sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok to have comments here? it wasn't obvious to me what this piece of code was doing initially, I thought having comments might help, but I don't feel strongly, please let me know if you'd like it removed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with the comment
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() { | |||
runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows); | |||
} | |||
|
|||
@Test | |||
public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is quite convoluted because it relies on low-level API, this appears to be the 1st instance in test (other test relies on higher level API), is this acceptable?
I resort to this approach because we need to manipulate TimeTracker which isn't available in high level API. And I don't feel comfortable to make larger change in the codebase.
Please let me know if you think there's a better way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'a a minor improvement to the actual code, so it might be ok to not add a test for it? -- Otherwise, I don't have a proposal for better code... It's in the guts so it's messy (and thus maybe not worth?) to test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is KStreamWindowAggregateTest#shouldEmitWithLargeInterval()
that tests a similar thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am happy to defer to you or fellow contributors/maintainers.
My personal view is that the behavior change is quite subtle, so having test to codify it is useful, but if we are happy to merge it without unit test I am happy too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @qingwei91 , thanks for fixing and great test coverage! Regarding test complexity, can you do something similar as https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java#L768 to test time drift. Instead of mocking low level stores, can you check the final results?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the advice, I will try to mimick that
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can also take a look by end of this week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @qingwei91 ! The fix looks good. Just one comment about the test to see if we can make it simpler.
|
||
// Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime, | ||
// they can get out of sync during a clock drift | ||
sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with the comment
@qingwei91 -- What is the status of this PR? Seems there is open comments that would need to be addressed? Would be great if we could push this over the finish line. |
@mjsax sorry, I will try to pick this back up this weekend |
637b1f0
to
26f6fa6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @qingwei91 ! LGTM. Failed test doesn't seem related.
@mjsax can help approve and merge as a committer. |
Thanks for the PR! Merged to |
Reviewers: Matthias J. Sax <matthias@confluent.io>, Hao Li <hli@confluent.io>
We should sync nextTimeToEmit with wall clock on each method call to ensure throttling works correctly in case of clock drift.
If we dont, then in the event of significant clock drift, throttling might not happen for a long time, this can hurt performance.
I've added a unit test to simulate clock drift and verify my change works.
Committer Checklist (excluded from commit message)