Skip to content

Commit

Permalink
KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (#8777)
Browse files Browse the repository at this point in the history
Fix the failed testMultiConsumerStickyAssignment by modifying the logic error in allSubscriptionsEqual method.

We will create the consumerToOwnedPartitions to keep the set of previously owned partitions encoded in the Subscription. It's our basis to do the reassignment. In the allSubscriptionsEqual, we'll get the member generation of the subscription, and remove all previously owned partitions as invalid if the current generation is higher. However, the logic before my fix, will remove the current highest member out of the consumerToOwnedPartitions, which should be kept because it's the current higher generation member. Fix this logic error.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
showuon authored and guozhangwang committed Jun 2, 2020
1 parent e9c7f85 commit 2c30619
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,20 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {

// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}

membersOfCurrentHighestGeneration.add(consumer);
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
if (allTopics.contains(tp.topic())) {
ownedPartitions.add(tp);
}
}

// If the current member's generation is higher, all the previous owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}
}
}

Expand Down

0 comments on commit 2c30619

Please sign in to comment.