-
Notifications
You must be signed in to change notification settings - Fork 210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Maximum possible offset to be committed for partition #1053
Conversation
long maxCommitted = maxCommittedOffsets.getOrDefault(partition, Long.MAX_VALUE); | ||
|
||
long offsetToBeCommitted = Math.min(minInflight, maxCommitted); | ||
if (offsetToBeCommitted >= 0 && offsetToBeCommitted < Long.MAX_VALUE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we log case when this if
is false? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a sanity check. :)
Negative offset would mean we tried to commit such offset somewhere.
Inflight and committed offsets for a specific partition cannot be equal to Long.MAX_VALUE
at the same time, so the other check would mean somebody wanted to commit such offset (hard to say if it's possible to achieve such offset in real 😄).
Personally I would leave it as is, such offsets should just be ignored. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Umh... ok, I'll just add that log. And add a test case, that we should skip such offsets.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done :)
maxDrainedCommittedOffsets.forEach((partition, drainedOffset) -> | ||
maxCommittedOffsets.compute(partition, (p, storedOffset) -> | ||
storedOffset == null || storedOffset < drainedOffset ? drainedOffset : storedOffset) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not 100% sure what is going on here, can you clarify a bit? 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, so there's a map(partition->offset)
. We want this map to contain the largest offset per partition. If we have a new offset (drainedOffset
) to be added, we need to check if the partition contained a larger offset already (storedOffset == null || storedOffset < drainedOffset
-> it means it didn't, so we can override it). We're inserting/updating items using compute
method here.
In our current approach the only state of
OffsetCommitter
that has been persisted for the use of upcoming algorithm runs wereinflightOffsets
that were used for preventing from skipping some messages that were currently inflight.Turns out we should store maximum possible offsets as well.
Imagine such scenario (just as in added test case):
OffsetCommitter
algorithm runs and loses the information about message with offset=2 being delivered (it remembers only offset=1 as the one that is still inflight at the time)This fix adds another collections which is persisted between algorithm runs:
maxCommittedOffsets
which stores maximum offsets that could not yet be committed because ofinflightOffsets
.In above case we would store offset=2 for later use.