KAFKA-7703: position() may return a wrong offset after seekToEnd #6407
ed52b04
to
10acb07
Compare
Thanks, good find. Left a couple comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
fecccf5
to
45ed81e
Compare
retest this please
@viktorsomogyi Thanks for the update. I left one suggestion to consider. Let me know what you think.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
808ab74
to
31eecaa
Compare
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
boolean match = this.subscription.contains(topicPartition.topic());
if (!match) {
log.info("Assigned partition {} for non-subscribed topic; subscription is {}", topicPartition, this.subscription);
synchronized (SubscriptionState.this) {
SpotBugs throws an error for this, but I don't think it's accurate since we synchronize the whole method and this predicate will be executed there. Should I add this to the excludes?
Actually I'm not sure why we need this additional block if we are already holding the lock. The predicate wouldn't be used after this method returns.
Thanks for the updates. Left a few more comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  return !hasValidPosition() && !awaitingReset();
  }

- private boolean isPaused() {
+ private synchronized boolean isPaused() {
Do we still need synchronization here if SubscriptionState is synchronized?
Not all methods in SubscriptionState are synchronized. Basically, when I don't need to modify assignment, I only synchronize until I get the TopicPartitionState and then lock that object, to avoid locking the whole collection when it's not necessary.
If you look at isPaused there:
public boolean isPaused(TopicPartition tp) {
    TopicPartitionState assignedOrNull = assignedStateOrNull(tp);
    return assignedOrNull != null && assignedOrNull.isPaused();
}
the first line only gets the state from the collection and then releases the lock on SubscriptionState; the second line locks only the given TopicPartitionState.
When I write to the collection, for instance in assignFromUser, I lock the whole method, as I think we should keep writes consistent.
If you think this is unnecessary or complicated, I can just lock SubscriptionState every time.
Yeah, I have a slight preference to just lock SubscriptionState every time since it is the simplest option. I don't think contention is a major problem since there's only the heartbeat thread, which is sleeping most of the time. Unless there's some reason to think the cost of lock acquisition itself is a concern.
Ok. I don't think lock acquisition would be expensive in this case.
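For readers following this thread, the two locking options discussed above can be sketched roughly as follows. This is a simplified illustration only: LockingSketch, PartitionState, FineGrained and Coarse are hypothetical names, partitions are plain strings, and none of this is the actual SubscriptionState code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not Kafka's actual SubscriptionState.
final class LockingSketch {

    /** Per-partition state guarded by its own monitor. */
    static final class PartitionState {
        private boolean paused;

        synchronized boolean isPaused() { return paused; }
        synchronized void pause() { paused = true; }
    }

    /** Option 1: hold the outer lock only for the map lookup, then lock the entry. */
    static final class FineGrained {
        private final Map<String, PartitionState> assignment = new HashMap<>();

        // Writes to the collection lock the whole object.
        synchronized void assign(String partition) {
            assignment.put(partition, new PartitionState());
        }

        private synchronized PartitionState stateOrNull(String partition) {
            return assignment.get(partition);
        }

        boolean isPaused(String partition) {
            PartitionState state = stateOrNull(partition); // outer lock is released after this call
            return state != null && state.isPaused();      // only the entry is locked here
        }

        void pause(String partition) {
            PartitionState state = stateOrNull(partition);
            if (state != null) {
                state.pause();
            }
        }
    }

    /** Option 2: every method takes the single outer lock. */
    static final class Coarse {
        private final Map<String, Boolean> paused = new HashMap<>();

        synchronized void assign(String partition) { paused.put(partition, false); }

        synchronized void pause(String partition) {
            paused.computeIfPresent(partition, (key, old) -> true);
        }

        synchronized boolean isPaused(String partition) {
            return paused.getOrDefault(partition, false);
        }
    }
}
```

Option 2 has one lock to reason about, which is the simplicity argument made in the review. Option 1 shortens the time the outer lock is held, at the cost of two monitors per read.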
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
3aeaed9
to
9705b17
Compare
9705b17
to
e1568af
Compare
Sorry for the delay. This is looking good. I had just a few additional comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
retest this please
Compilation failing:
9e6e24a
to
818389b
Compare
Fixed the compilation error (it was due to a conflict with another commit) but the test failures seem suspicious. Please hold on for now.
Thanks, just a couple more comments.
- public void resetGroupSubscription() {
-     this.groupSubscription.retainAll(subscription);
+ synchronized void resetGroupSubscription() {
+     groupSubscription = new HashSet<>(groupSubscription);
Since we also treat subscription as immutable once created, could we change this to groupSubscription = subscription?
Makes sense, will do.
- this.subscription = topicsToSubscribe;
- this.groupSubscription.addAll(topicsToSubscribe);
+ subscription = topicsToSubscribe;
+ groupSubscription = topicsToSubscribe;
This logic looks a little different. The "group subscription" is a little hard to understand. It's intended to be the union of the subscriptions of all consumers in the group. When we change the local subscription, we should still keep the subscription from the rest of the group. Perhaps separately we can consider how to simplify this bookkeeping.
Thanks for pointing this out. I'll fix this.
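As an illustration of the bookkeeping described in this thread, here is a minimal sketch in which changing the local subscription keeps the topics contributed by the rest of the group, and both sets are replaced rather than mutated. GroupSubscriptionSketch and its union helper are hypothetical; groupSubscribe is assumed here as the way other members' topics get added, and the class is not the actual SubscriptionState.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the bookkeeping; not Kafka's actual SubscriptionState.
final class GroupSubscriptionSketch {
    // Both sets are treated as immutable once created and are swapped, never mutated.
    private Set<String> subscription = Collections.emptySet();
    private Set<String> groupSubscription = Collections.emptySet();

    /** Changes the local subscription but keeps topics contributed by other members. */
    synchronized void subscribe(Set<String> topics) {
        subscription = Collections.unmodifiableSet(new HashSet<>(topics));
        groupSubscription = union(groupSubscription, subscription);
    }

    /** Adds topics that other group members subscribe to (assumed entry point). */
    synchronized void groupSubscribe(Set<String> topics) {
        groupSubscription = union(groupSubscription, topics);
    }

    /** Shrinks the group subscription back to the local one. */
    synchronized void resetGroupSubscription() {
        groupSubscription = subscription; // safe to share because neither set is mutated
    }

    synchronized Set<String> subscription() { return subscription; }

    synchronized Set<String> groupSubscription() { return groupSubscription; }

    private static Set<String> union(Set<String> left, Set<String> right) {
        Set<String> merged = new HashSet<>(left);
        merged.addAll(right);
        return Collections.unmodifiableSet(merged);
    }
}
```

Assigning groupSubscription = topicsToSubscribe inside subscribe would drop the other members' topics, which is the difference the review comment points out.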
LGTM. Thanks @viktorsomogyi .
I will go ahead and merge. The failing test is a known flake: https://issues.apache.org/jira/browse/KAFKA-8122.
When poll is called which resets the offsets to the beginning, followed by a seekToEnd and a position, it could happen that the "reset to earliest" call in poll overrides the "reset to latest" initiated by seekToEnd in a very delicate way:
1. both request has been issued and returned to the client side (listOffsetResponse has happened)
2. in Fetcher.resetOffsetIfNeeded(TopicPartition, Long, OffsetData) the thread scheduler could prefer the heartbeat thread with the "reset to earliest" call, overriding the offset to the earliest and setting the SubscriptionState with that position.
3. The thread scheduler continues execution of the thread (application thread) with the "reset to latest" call and discards it as the "reset to earliest" already set the position - the wrong one.
4. The blocking position call returns with the earliest offset instead of the latest, despite it wasn't expected.
The fix makes SubscriptionState synchronized so that we can verify that the reset is expected while holding the lock.
Reviewers: Jason Gustafson <jason@confluent.io>
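The idea of verifying that a reset is expected while holding the lock can be sketched as below. This is a simplified model under stated assumptions: ResetSketch, Strategy, requestReset and maybeCompleteReset are hypothetical names for a single partition, not the Fetcher or SubscriptionState methods touched by this PR.

```java
// Hypothetical, simplified model of the fix; names do not mirror Kafka's API.
final class ResetSketch {
    enum Strategy { EARLIEST, LATEST }

    private Strategy pendingReset; // null when no reset is awaited
    private Long position;         // null until a position has been set

    /** Records the most recent reset request, replacing any earlier one. */
    synchronized void requestReset(Strategy strategy) {
        pendingReset = strategy;
        position = null;
    }

    /**
     * Applies an offset lookup result only if it answers the reset that is
     * currently pending. The check and the update happen under one lock, so
     * a stale "reset to earliest" result cannot overwrite a newer
     * "reset to latest" request.
     */
    synchronized boolean maybeCompleteReset(Strategy requested, long offset) {
        if (pendingReset != requested) {
            return false; // stale or unexpected result: ignore it
        }
        position = offset;
        pendingReset = null;
        return true;
    }

    synchronized Long position() { return position; }
}
```

In this model, a poll-style reset to EARLIEST followed by a seekToEnd-style reset to LATEST leaves LATEST pending, so a late EARLIEST result is rejected regardless of which thread delivers it first.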
@hachikuji thanks a lot for reviewing this!
When poll is called which resets the offsets to the beginning, followed by a seekToEnd and a position, it could happen that the "reset to earliest" call in poll overrides the "reset to latest" initiated by seekToEnd in a very delicate way:
The fix makes the TopicPartitionState in SubscriptionState synchronized and starts tracking the requested reset timestamp. With this we can decide precisely whether an incoming offset reset is really the one we want, by comparing the timestamp set when the partition was marked for reset with the one actually used on seek. As a result, only the most recently initiated offset reset takes effect. Synchronization furthermore ensures that this is done atomically, to avoid similar bugs in the future.