Skip to content

Commit

Permalink
KAFKA-14639: A single partition may be revoked and assign during a si…
Browse files Browse the repository at this point in the history
…ngle round of rebalance (#13550)

This is a really long story, but the incident started in KAFKA-13419 when we observed a member sending out a topic partition owned from the previous generation when a member missed a rebalance cycle due to REBALANCE_IN_PROGRESS.

This patch changes the AbstractStickyAssignor.AllSubscriptionsEqual method.  In short, it should no long check and validate only the highest generation.  Instead, we consider 3 cases:
1. Member will continue to hold on to its partition if there are no other owners
2. If there are 1+ owners to the same partition. One with the highest generation will win.
3. If two members of the same generation hold on to the same partition.  We will log an error but remove both from the assignment. (Same with the current logic)

Here are some important notes that lead to the patch:
- If a member is kicked out of the group, and `UNKNOWN_MEMBER_ID` will be thrown.
- It seems to be a common situation that members are late to joinGroup and therefore get `REBALANCE_IN_PROGRESS` error.  This is why we don't want to reset generation because it might cause lots of revocations and can be disruptive

To summarize the current behavior of different errors:
`REBALANCE_IN_PROGRESS`
- heartbeat: requestRejoin if member state is stable
- joinGroup: rejoin immediately
- syncGroup: rejoin immediately
- commit: requestRejoin and fail the commit. Raise this exception if the generation is staled, i.e. another rebalance is already in progress.

`UNKNOWN_MEMBER_ID`
- heartbeat: resetStateAndRejoinif generation hasn't changed. otherwise, ignore
- joinGroup: resetStateAndRejoin if generation unchanged, otherwise rejoin immediately
- syncGroup:  resetStateAndRejoin if generation unchanged, otherwise rejoin immediately

`ILLEGAL_GENERATION`
- heartbeat: resetStateAndRejoinif generation hasn't changed. otherwise, ignore
- syncGroup: raised the exception if generation has been resetted or the member hasn't completed rebalancing.  then resetStateAndRejoin if generation unchanged, otherwise rejoin immediately

Reviewers: David Jacot <djacot@confluent.io>
  • Loading branch information
philipnee committed Apr 28, 2023
1 parent 10b3e66 commit c6ad151
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
Map<String, Subscription> subscriptions,
Map<String, List<TopicPartition>> consumerToOwnedPartitions,
Set<TopicPartition> partitionsWithMultiplePreviousOwners) {
Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
boolean isAllSubscriptionsEqual = true;

Set<String> subscribedTopics = new HashSet<>();
Expand All @@ -137,8 +136,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
Map<TopicPartition, String> allPreviousPartitionsToOwner = new HashMap<>();

for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
String consumer = subscriptionEntry.getKey();
Subscription subscription = subscriptionEntry.getValue();
final String consumer = subscriptionEntry.getKey();
final Subscription subscription = subscriptionEntry.getValue();

// initialize the subscribed topics set if this is the first subscription
if (subscribedTopics.isEmpty()) {
Expand All @@ -149,47 +148,57 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
}

MemberData memberData = memberData(subscription);
final int memberGeneration = memberData.generation.orElse(DEFAULT_GENERATION);
maxGeneration = Math.max(maxGeneration, memberGeneration);

List<TopicPartition> ownedPartitions = new ArrayList<>();
consumerToOwnedPartitions.put(consumer, ownedPartitions);

// Only consider this consumer's owned partitions as valid if it is a member of the current highest
// generation, or it's generation is not present but we have not seen any known generation so far
if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {

// If the current member's generation is higher, all the previously owned partitions are invalid
if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
allPreviousPartitionsToOwner.clear();
partitionsWithMultiplePreviousOwners.clear();
for (String droppedOutConsumer : membersOfCurrentHighestGeneration) {
consumerToOwnedPartitions.get(droppedOutConsumer).clear();
}

membersOfCurrentHighestGeneration.clear();
maxGeneration = memberData.generation.get();
}
// the member has a valid generation, so we can consider its owned partitions if it has the highest
// generation amongst
for (final TopicPartition tp : memberData.partitions) {
if (allTopics.contains(tp.topic())) {
String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
if (otherConsumer == null) {
// this partition is not owned by other consumer in the same generation
ownedPartitions.add(tp);
} else {
final int otherMemberGeneration = subscriptions.get(otherConsumer).generationId().orElse(DEFAULT_GENERATION);

membersOfCurrentHighestGeneration.add(consumer);
for (final TopicPartition tp : memberData.partitions) {
// filter out any topics that no longer exist or aren't part of the current subscription
if (allTopics.contains(tp.topic())) {
String otherConsumer = allPreviousPartitionsToOwner.put(tp, consumer);
if (otherConsumer == null) {
// this partition is not owned by other consumer in the same generation
ownedPartitions.add(tp);
} else {
if (memberGeneration == otherMemberGeneration) {
// if two members of the same generation own the same partition, revoke the partition
log.error("Found multiple consumers {} and {} claiming the same TopicPartition {} in the "
+ "same generation {}, this will be invalidated and removed from their previous assignment.",
consumer, otherConsumer, tp, maxGeneration);
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
+ "same generation {}, this will be invalidated and removed from their previous assignment.",
consumer, otherConsumer, tp, memberGeneration);
partitionsWithMultiplePreviousOwners.add(tp);
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
allPreviousPartitionsToOwner.put(tp, consumer);
} else if (memberGeneration > otherMemberGeneration) {
// move partition from the member with an older generation to the member with the newer generation
ownedPartitions.add(tp);
consumerToOwnedPartitions.get(otherConsumer).remove(tp);
allPreviousPartitionsToOwner.put(tp, consumer);
log.warn("Consumer {} in generation {} and consumer {} in generation {} claiming the same " +
"TopicPartition {} in different generations. The topic partition wil be " +
"assigned to the member with the higher generation {}.",
consumer, memberGeneration,
otherConsumer, otherMemberGeneration,
tp,
memberGeneration);
} else {
// let the other member continue to own the topic partition
log.warn("Consumer {} in generation {} and consumer {} in generation {} claiming the same " +
"TopicPartition {} in different generations. The topic partition wil be " +
"assigned to the member with the higher generation {}.",
consumer, memberGeneration,
otherConsumer, otherMemberGeneration,
tp,
otherMemberGeneration);
}
}
}
}
}

return isAllSubscriptionsEqual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,95 @@ public void testPartitionsTransferringOwnershipIncludeThePartitionClaimedByMulti
assertTrue(isFullyBalanced(assignment));
}

@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
@EnumSource(RackConfig.class)
public void testEnsurePartitionsAssignedToHighestGeneration(RackConfig rackConfig) {
initializeRacks(rackConfig);
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, partitionInfos(topic, 3));
partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));

int currentGeneration = 10;

// ensure partitions are always assigned to the member with the highest generation
subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3),
partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0)), currentGeneration, 0));
subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3),
partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1)), currentGeneration - 1, 1));
subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3),
partitions(tp(topic2, 1), tp(topic3, 0), tp(topic3, 2)), currentGeneration - 2, 1));

Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 0), tp(topic3, 0))),
new HashSet<>(assignment.get(consumer1)));
assertEquals(new HashSet<>(partitions(tp(topic, 1), tp(topic2, 1), tp(topic3, 1))),
new HashSet<>(assignment.get(consumer2)));
assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic3, 2))),
new HashSet<>(assignment.get(consumer3)));
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
}

@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
@EnumSource(RackConfig.class)
public void testNoReassignmentOnCurrentMembers(RackConfig rackConfig) {
initializeRacks(rackConfig);
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, partitionInfos(topic, 3));
partitionsPerTopic.put(topic1, partitionInfos(topic1, 3));
partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));
partitionsPerTopic.put(topic3, partitionInfos(topic3, 3));

int currentGeneration = 10;

subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
partitions(), DEFAULT_GENERATION, 0));
subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
partitions(tp(topic, 0), tp(topic2, 0), tp(topic1, 0)), currentGeneration - 1, 1));
subscriptions.put(consumer3, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
partitions(tp(topic3, 2), tp(topic2, 2), tp(topic1, 1)), currentGeneration - 2, 2));
subscriptions.put(consumer4, buildSubscriptionV2Above(topics(topic, topic2, topic3, topic1),
partitions(tp(topic3, 1), tp(topic, 1), tp(topic, 2)), currentGeneration - 3, 3));

Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
// ensure assigned partitions don't get reassigned
assertEquals(new HashSet<>(partitions(tp(topic1, 2), tp(topic2, 1), tp(topic3, 0))),
new HashSet<>(assignment.get(consumer1)));
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
}

@ParameterizedTest(name = TEST_NAME_WITH_RACK_CONFIG)
@EnumSource(RackConfig.class)
public void testOwnedPartitionsAreInvalidatedForConsumerWithMultipleGeneration(RackConfig rackConfig) {
initializeRacks(rackConfig);
Map<String, List<PartitionInfo>> partitionsPerTopic = new HashMap<>();
partitionsPerTopic.put(topic, partitionInfos(topic, 3));
partitionsPerTopic.put(topic2, partitionInfos(topic2, 3));

int currentGeneration = 10;

subscriptions.put(consumer1, buildSubscriptionV2Above(topics(topic, topic2),
partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1)), currentGeneration, 0));
subscriptions.put(consumer2, buildSubscriptionV2Above(topics(topic, topic2),
partitions(tp(topic, 0), tp(topic2, 1), tp(topic2, 2)), currentGeneration - 2, 1));

Map<String, List<TopicPartition>> assignment = assignor.assignPartitions(partitionsPerTopic, subscriptions);
assertEquals(new HashSet<>(partitions(tp(topic, 0), tp(topic2, 1), tp(topic, 1))),
new HashSet<>(assignment.get(consumer1)));
assertEquals(new HashSet<>(partitions(tp(topic, 2), tp(topic2, 2), tp(topic2, 0))),
new HashSet<>(assignment.get(consumer2)));
assertTrue(assignor.partitionsTransferringOwnership.isEmpty());

verifyValidityAndBalance(subscriptions, assignment, partitionsPerTopic);
assertTrue(isFullyBalanced(assignment));
}

@Test
public void testRackAwareAssignmentWithUniformSubscription() {
Map<String, Integer> topics = mkMap(mkEntry("t1", 6), mkEntry("t2", 7), mkEntry("t3", 2));
Expand Down

0 comments on commit c6ad151

Please sign in to comment.