CC-4318 Fix off-by-one error for offset reporting to Kafka topic #425
Conversation
Thanks, @cyrusv. I have a question below; I'm not convinced this is the right approach.
   */
-  public Long committedOffset() {
-    return committedOffset;
+  public long offset() {
@cyrusv, I'm not sure I understand why we're returning 1 past what was last committed to HDFS. We need to verify that what is committed to the consumer offsets exactly matches what is committed to HDFS; I'm not convinced this really identified/fixed the bug.
Second, `offset` is initialized to `-1L`, so could this method ever return that? Before this change, the offset getter might return null, but the code in DataWriter looked for this case and never wrote it into the offsets.
Finally, we lost the JavaDoc; can we put it back to explain what offset this actually returns?
So I wonder if this is the source of the confusion: the consumer offset should point to the offset of the next record to be consumed, whereas the prior change recorded the last record committed to HDFS as the consumer offset.
It'd be good to document this. The `offset()` method should explain that it returns 1 past the last committed offset and why (to match the consumer offsets). Also, we still need to handle the -1 case mentioned above.
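To make the convention above concrete, here is a minimal sketch (all names hypothetical, not the connector's actual code): Kafka's committed consumer offset is the position of the *next* record to consume, so it must be one past the last record durably written to HDFS.

```java
public class OffsetConvention {
    // Sentinel meaning "nothing committed yet", mirroring the -1 discussed above.
    private long lastWrittenOffset = -1L;

    void recordWritten(long recordOffset) {
        lastWrittenOffset = recordOffset;
    }

    // Offset to hand back for the consumer-offset commit: one past the last
    // record written to HDFS. Note the sentinel leaks through as -1 + 1 = 0,
    // so callers would still need to skip the commit when nothing has been
    // written yet, which is the -1 handling the review asks about.
    long offsetToCommit() {
        return lastWrittenOffset + 1;
    }

    public static void main(String[] args) {
        OffsetConvention w = new OffsetConvention();
        System.out.println(w.offsetToCommit()); // 0: nothing written yet
        w.recordWritten(41);
        System.out.println(w.offsetToCommit()); // 42: next record to consume
    }
}
```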
Thanks for the feedback, @rhauch -- I've renamed some variables and added some documentation to clarify the work done here. For reference, the work is most closely related to 284fbc2.
I decided to stick with a primitive with a `-1` initial value, since a lot of logic in the TopicPartitionWriter tests against `-1`, and I think keeping this code as-is (relative to pre-284fbc2) with a clearer name will provide the most stability. I could be convinced that a `null` initial value would be preferable if we'd rather have NPEs than misleading long values, but I decided it wasn't worth the tradeoff, since we've seen this structure work successfully in many production environments.
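The tradeoff weighed here can be sketched in a few lines (hypothetical names, not the connector's fields): a primitive `-1` sentinel fails silently when a check is missed, while a boxed `Long` with a `null` sentinel fails fast with an NPE.

```java
public class SentinelStyles {
    // Style 1: primitive with a -1 sentinel. Callers must remember to test it.
    static long primitiveOffset = -1L;

    // Style 2: boxed Long with a null sentinel. A forgotten check fails fast
    // with an NPE instead of silently propagating a misleading long value.
    static Long boxedOffset = null;

    public static void main(String[] args) {
        // Primitive: a missed check quietly does arithmetic with -1.
        long wrong = primitiveOffset + 1; // 0 -- looks like a valid offset
        System.out.println(wrong);

        // Boxed: unboxing null throws immediately at the point of misuse.
        try {
            long alsoWrong = boxedOffset + 1;
            System.out.println(alsoWrong);
        } catch (NullPointerException e) {
            System.out.println("NPE on missed null check");
        }
    }
}
```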
Thanks, @cyrusv. I think it's worth trying to minimize the changes and risk, so I have a few suggestions below. Perhaps the biggest is to realize that the TopicPartitionWriter has no notion of "consumer", and so "consumer" in fields and method names doesn't really make sense.
-  offset = -1L;
-  committedOffset = null;
+  // The next offset to consume after the last commit (one more than the last offset written to HDFS)
+  committedConsumerOffset = -1;
I'm not sure that using "Consumer" in the variable name makes sense, because this offset really is all about the record offset that this writer has committed to HDFS.
I'm also not a fan of removing or renaming the old `offset` variable, mostly because that incurs changes on more lines, potentially affects the logic (especially on line 337), and makes this PR larger than it needs to be. Because the old `committedOffset` field would now hold the same value, why not remove `committedOffset` instead and add a comment around line 568 explaining why the `+ 1` is used? Then the JavaDoc for `committedOffset()` can be changed as you have it in this PR.
   */
-  public Long committedOffset() {
-    return committedOffset;
+  public long committedConsumerOffset() {
Again, this method doesn't know about the consumer; this offset literally is 1 past the offset of the last record committed to HDFS. Perhaps rename it to `nextOffset()` instead (as suggested by your test).
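A minimal sketch of the rename suggested above (the class body and JavaDoc wording here are hypothetical, not the actual `TopicPartitionWriter` code): name the method for what it returns and document the `+ 1` once.

```java
public class TopicPartitionWriterSketch {
    // Offset of the last record written to HDFS; -1 means nothing written yet.
    private long offset = -1L;

    /**
     * Returns the offset of the next record to be processed: one past the
     * offset of the last record committed to HDFS. This matches Kafka's
     * consumer-offset convention, where the committed offset is the position
     * of the next record to consume.
     */
    public long nextOffset() {
        return offset + 1;
    }

    public static void main(String[] args) {
        TopicPartitionWriterSketch w = new TopicPartitionWriterSketch();
        w.offset = 99; // pretend record 99 was just committed to HDFS
        System.out.println(w.nextOffset()); // 100
    }
}
```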
src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
Consumer groups were reporting current-offset as one less than log-end-offset in the fully caught-up state. This fixes the topic's perception of the current offset to match the offset we are storing in HDFS.
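The symptom in the commit message reduces to simple arithmetic; here is an illustrative sketch (the concrete offset values are made up): if the connector commits the last written offset instead of last + 1, reported lag (log-end-offset minus current-offset) never reaches 0 even when fully caught up.

```java
public class LagSketch {
    public static void main(String[] args) {
        long lastWritten = 41;  // offset of the last record written to HDFS
        long logEndOffset = 42; // next offset the broker will assign

        // Committing the last written offset leaves lag stuck at 1.
        long lagIfCommittingLast = logEndOffset - lastWritten;
        // Committing last + 1 (the next record to consume) shows 0: caught up.
        long lagIfCommittingNext = logEndOffset - (lastWritten + 1);

        System.out.println(lagIfCommittingLast); // 1
        System.out.println(lagIfCommittingNext); // 0
    }
}
```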
One more request to fix JavaDoc.
One more request to fix JavaDoc. Otherwise this looks great.
LGTM. Thanks, @cyrusv!