KAFKA-7747 Check for truncation after leader changes #6371
Conversation
Thanks for the patch. Left a few comments.
Node leader = fetch().leaderFor(tp);
if (leader == null)
    leader = Node.noNode();
// TODO: is there a race here between reading the leader node and reading the epoch? Does it matter?
The other methods are synchronized. Any reason not to do that here?
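For illustration, a minimal self-contained sketch of what synchronizing the read would buy; the class and names here are invented for the example and are not the actual client Metadata code. The leader node and its epoch are read under the same lock, so the race flagged in the TODO cannot occur.

import java.util.Optional;
import org.apache.kafka.common.Node;

// Illustrative only; not the real client Metadata class.
public class CachedLeaderView {

    public static final class LeaderAndEpoch {
        public final Node leader;
        public final Optional<Integer> epoch;

        public LeaderAndEpoch(Node leader, Optional<Integer> epoch) {
            this.leader = leader;
            this.epoch = epoch;
        }
    }

    private Node leader = Node.noNode();
    private Optional<Integer> leaderEpoch = Optional.empty();

    // Writers update both fields while holding the object lock.
    public synchronized void update(Node newLeader, Optional<Integer> newEpoch) {
        this.leader = newLeader;
        this.leaderEpoch = newEpoch;
    }

    // Readers take the same lock, so they can never pair the leader from one
    // metadata update with the epoch from a different one.
    public synchronized LeaderAndEpoch leaderAndEpoch() {
        return new LeaderAndEpoch(leader, leaderEpoch);
    }
}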
…e in prepareFetchRequest
…n check is run in updateFetchPositions
This ensures that the position has as up-to-date a leader as possible. It doesn't submit the async validation request; it just transitions the subscription state.
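Roughly, the idea looks like the following sketch; the names below are illustrative and not the actual SubscriptionState API. The call records the new leader epoch and flips the partition into an await-validation state, while the OffsetsForLeaderEpoch request itself is sent later by the fetcher.

import java.util.Optional;

// Sketch only; not the real SubscriptionState/FetchState implementation.
class PartitionFetchState {
    enum State { FETCHING, AWAIT_VALIDATION }

    private State state = State.FETCHING;
    private Optional<Integer> currentLeaderEpoch = Optional.empty();

    // Transition to AWAIT_VALIDATION when the leader epoch changes; no request
    // is sent here, the fetcher submits the async validation later.
    boolean maybeValidatePosition(Optional<Integer> newLeaderEpoch) {
        if (!newLeaderEpoch.equals(currentLeaderEpoch)) {
            currentLeaderEpoch = newLeaderEpoch;
            state = State.AWAIT_VALIDATION;
        }
        return state == State.AWAIT_VALIDATION;
    }
}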
retest this please
Just a few more comments. I think there were still some minor previous comments which haven't been addressed.
@@ -2206,6 +2226,7 @@ private boolean updateFetchPositions(final Timer timer) {
    // by always ensuring that assigned partitions have an initial position.
    if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;

nit: unneeded newline
});

// Collect positions needing validation, with backoff
Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = subscriptions
Can we save the second pass over the partitions by doing this collection in the loop above? I'm just thinking about MirrorMaker-like use cases where the number of partitions could be quite large. A possible optimization is to cache the metadata update version so that we only bother redoing this check if there has actually been a metadata update.
The first pass through all the partitions covers the case of the metadata changing, but the second pass is also used to resubmit the async request with backoff. We could remember the last metadata version seen and avoid unnecessary calls to the first loop.
How about we leave this for a follow-up?
Works for me 👍
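A possible shape for that follow-up, sketched under assumptions: metadataVersion(), checkAllPositionsAgainstMetadata() and resendValidationRequestsWithBackoff() are placeholder names standing in for whatever counter and passes the real code uses.

// Sketch only: skip the full pass over assigned partitions unless the cluster
// metadata has actually changed since the last check.
private int lastSeenMetadataVersion = -1;

void validateOffsetsIfNeeded() {
    // Placeholder for a monotonically increasing metadata update counter.
    int currentVersion = metadata.metadataVersion();
    if (currentVersion != lastSeenMetadataVersion) {
        // Metadata changed: re-check every assigned partition for a new leader
        // epoch and move the affected ones into AWAIT_VALIDATION.
        checkAllPositionsAgainstMetadata();
        lastSeenMetadataVersion = currentVersion;
    }
    // The second pass still runs every time, since resending validation
    // requests with backoff is driven by timing, not by metadata changes.
    resendValidationRequestsWithBackoff();
}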
Long offset = this.subscriptions.position(partition);
if (offset != null)
    return offset;
SubscriptionState.FetchPosition position = this.subscriptions.validPosition(partition);
This is interesting. There is always a race with the next leader election for a valid position. Do you think we need to be strict about the timing? I guess if you provide an epoch in seek(), this would be a good way to force validation before fetching.
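For reference, forcing that validation from the application side would look roughly like this, assuming the KIP-320 seek(TopicPartition, OffsetAndMetadata) overload and an existing consumer instance; the topic, offset, and epoch values are placeholders.

import java.util.Optional;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("my-topic", 0);   // placeholder partition
long offset = 42L;                                        // position to resume from (example value)
int lastSeenLeaderEpoch = 5;                              // epoch observed when the offset was read (example value)

// Attaching the leader epoch to the seek gives the consumer enough information
// to validate the position against the leader before it resumes fetching.
consumer.seek(tp, new OffsetAndMetadata(offset, Optional.of(lastSeenLeaderEpoch), ""));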
@@ -2196,6 +2213,9 @@ private void close(long timeoutMs, boolean swallowException) {
 * @return true iff the operation completed without timing out
 */
private boolean updateFetchPositions(final Timer timer) {
    // If any partitions have been truncated due to a leader change, we need to validate the offsets
    fetcher.validateOffsetsIfNeeded();
Already mentioned, but I think we can be smarter about caching some state to avoid unnecessary work here. Validation is only needed if we do an unprotected seek or a metadata update arrives. Probably this can be left for a follow-up. It's only a concern when the number of partitions and the poll frequency are high.
entry.getValue().leaderEpoch().ifPresent(epoch -> this.metadata.updateLastSeenEpochIfNewer(entry.getKey(), epoch));
this.subscriptions.seek(tp, offset);
this.subscriptions.seek(tp, position);
this.subscriptions.maybeValidatePosition(tp, leaderAndEpoch);
Hmm.. Doesn't this mean we'd only validate the committed offset if there is a change to the current leader and epoch?
Also, could leaderAndEpoch be updated by the call to updateLastSeenEpochIfNewer above?
In the position I'm creating above, the leader epoch is empty, which will cause it to enter validation. This relies on the FetchPosition#safeToFetchFrom behavior. If this seems too convoluted, we could add a seekAndValidate method or something similar.
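Something like the following, as a rough sketch of that hypothetical seekAndValidate; it is not part of the actual SubscriptionState API and just combines the two calls already shown in this thread.

// Hypothetical helper, not existing API: adopt the new position and
// immediately check whether it needs validation against the current leader,
// instead of relying on an empty epoch to trigger validation later.
public synchronized void seekAndValidate(TopicPartition tp,
                                         FetchPosition position,
                                         Metadata.LeaderAndEpoch currentLeader) {
    assignedState(tp).position(position);     // take the new position
    maybeValidatePosition(tp, currentLeader); // transition to AWAIT_VALIDATION if needed
}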
offsetAndMetadata.offset(),
offsetAndMetadata.leaderEpoch(),
this.metadata.leaderAndEpoch(partition));
this.subscriptions.seek(partition, newPosition);
Don't we need to do validation on this new position? Also, did we lose the call to update the last seen epoch?
Also, did we lose the call to update the last seen epoch?

I'll add that back.

Don't we need to do validation on this new position?

I suppose we do, yeah. Perhaps another use case of the seekAndValidate method proposed above?
if (subscriptions.awaitingValidation(respTopicPartition)) {
    SubscriptionState.FetchPosition currentPosition = subscriptions.position(respTopicPartition);
    Metadata.LeaderAndEpoch currentLeader = currentPosition.currentLeader;
    if (!currentLeader.equals(cachedLeaderAndEpochs.get(respTopicPartition))) {
The checks above ensure that we are still in the validating phase and that the current leader epoch hasn't changed. I guess it is still possible that both of these are true, but the user has seeked to a different position. Perhaps we can add position to the cached data above?
I think it's still okay as long as the position's epoch hasn't changed. What's the side effect if you seek to offset 10 (FETCHING), do validation (AWAIT_VALIDATION), seek to offset 30 (FETCHING), do validation again (AWAIT_VALIDATION), and then get back the OffsetsForLeader response from the first async validation? I think as long as the position's epoch is the same, there isn't a problem. When the second response comes back it will get ignored since we won't be in the right state. WDYT?
I agree that the important thing is that the position's epoch hasn't changed. That and the current leader epoch are the only input to the OffsetsForLeaderEpoch API.
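Put together, the guard being discussed would look roughly like the sketch below, building on the snippet above; cachedPositionEpochs and completeValidation are hypothetical names, and offsetEpoch is assumed to be the epoch stored in the fetch position.

// Only apply a (possibly late) OffsetsForLeaderEpoch response if we are still
// awaiting validation, the current leader epoch is unchanged, and the
// position's own epoch matches the one the request was built from.
if (subscriptions.awaitingValidation(respTopicPartition)) {
    SubscriptionState.FetchPosition currentPosition = subscriptions.position(respTopicPartition);
    boolean leaderUnchanged = currentPosition.currentLeader.equals(cachedLeaderAndEpochs.get(respTopicPartition));
    boolean positionEpochUnchanged = currentPosition.offsetEpoch.equals(cachedPositionEpochs.get(respTopicPartition));
    if (leaderUnchanged && positionEpochUnchanged) {
        // Safe to compare the returned end offset against the position and
        // detect truncation; otherwise the stale response is simply ignored.
        completeValidation(respTopicPartition, currentPosition);
    }
}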
assignedState(tp).position(position);
}

public boolean maybeValidatePosition(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) {
Should this be named currentLeaderAndEpoch?
LGTM. Thanks for the patch!
…he#6371)

After the client detects a leader change, we need to check the offset of the current leader for truncation. These changes were part of KIP-320: https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation.

Reviewers: Jason Gustafson <jason@confluent.io>
After the client detects a leader change, we need to check the offset of the current leader for truncation.
TODO expand on this.
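Conceptually, the truncation check works as follows (a simplified sketch, not the exact Fetcher code): the consumer asks the leader, via the OffsetsForLeaderEpoch API, for the end offset of the epoch its position was fetched under; if the leader's log for that epoch ends before the position, the position points into truncated data.

// Simplified sketch of the check; the real code also has to handle unknown
// epochs, retries, and resetting or raising an error once truncation is found.
static boolean truncationDetected(long consumerPosition, long leaderEndOffsetForEpoch) {
    // The leader's log for our epoch ends before the offset we planned to
    // fetch next, so records we relied on no longer exist on the leader.
    return leaderEndOffsetForEpoch < consumerPosition;
}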