New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image #13901
Conversation
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java
Outdated
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Show resolved
Hide resolved
...dinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
@@ -568,5 +568,20 @@ public void testMetadataRefreshDeadline() { | |||
assertTrue(group.hasMetadataExpired(time.milliseconds())); | |||
assertEquals(time.milliseconds() + 1000, group.metadataRefreshDeadline().deadlineMs); | |||
assertEquals(group.groupEpoch() + 1, group.metadataRefreshDeadline().epoch); | |||
|
|||
// Set the refresh deadline. | |||
group.setMetadataRefreshDeadline(time.milliseconds() + 1000, group.groupEpoch()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can just go back an epoch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i don't understand your comment. could you elaborate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
btw, i just fixed the previous test case at L566.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when we set the refresh deadline, there is no check on the group epoch. Maybe it's fine that we went backwards an epoch, but I'm not sure which scenario this would be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could for instance if the write is lost so the new epoch would not be known anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, you’re right. I actually use the current group epoch here because hasMetadataExpired would be true if epoch + 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok -- so this is behavior we want then? I guess I was just having trouble seeing when we would update to the lower epoch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think so. The epoch value is not so important here. The important part is that it should be reset regardless.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm -- reset meaning we should refresh? I guess my point was that if we lower the epoch we may delay the reset. I guess worst case, we just have the wait the refresh interval though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have reworked the test to reduce the confusion. Let me know what you think.
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...p-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
Show resolved
Hide resolved
...coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java
Outdated
Show resolved
Hide resolved
This reverts commit a3d7e76.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm. thanks David!
This patch adds (1) the logic to propagate a new MetadataImage to the running coordinators; and (2) the logic to ensure that all the consumer groups subscribed to topics with changes will refresh their subscriptions metadata on the next heartbeat. In the mean time, it ensures that freshly loaded consumer groups also refresh their subscriptions metadata on the next heartbeat.
Committer Checklist (excluded from commit message)