Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16554: Online downgrade triggering and group type conversion #15721

Merged
merged 98 commits into from Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from 89 commits
Commits
Show all changes
98 commits
Select commit Hold shift + click to select a range
50c655f
initial push
dongnuo123 Feb 21, 2024
92353f5
unit test
dongnuo123 Feb 21, 2024
5a1a557
unit test
dongnuo123 Feb 21, 2024
67dc1f3
Merge branch 'trunk' into group-protocol-migration-config
dongnuo123 Mar 18, 2024
980a535
rename GroupProtocolMigrationPolicy
dongnuo123 Mar 18, 2024
2e91a40
Merge branch 'trunk' into group-protocol-migration-config
dongnuo123 Mar 20, 2024
124ba9e
upgrade conversion
dongnuo123 Mar 22, 2024
3d7371d
upgrade conversion
dongnuo123 Mar 22, 2024
0ace3d8
upgrade conversion
dongnuo123 Mar 24, 2024
9075ea1
draft
dongnuo123 Mar 24, 2024
b333674
upgrade
dongnuo123 Mar 25, 2024
75ff595
upgrade
dongnuo123 Mar 25, 2024
1b72303
upgrade
dongnuo123 Mar 25, 2024
36aa59c
unit tests
dongnuo123 Mar 27, 2024
b9db1f8
unit tests
dongnuo123 Mar 27, 2024
905cb0c
add useLegacyProtocol to ConsumerGroupMember
dongnuo123 Mar 27, 2024
52963db
addressing comments
dongnuo123 Mar 27, 2024
38d7328
Merge branch 'apache:trunk' into group-protocol-migration-config
dongnuo123 Mar 27, 2024
f356896
Merge branch 'group-protocol-migration-config' into online-migration-…
dongnuo123 Mar 27, 2024
eb97afb
merge config adding branch
dongnuo123 Mar 27, 2024
f1250c6
useLegacyProtocol
dongnuo123 Mar 28, 2024
e0ff833
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Mar 28, 2024
0062975
blocked by protocol check
dongnuo123 Mar 29, 2024
c04cff5
replace useLegacyProtocol with supportedProtocols
dongnuo123 Apr 2, 2024
48ca1a8
initial downgrade conversion
dongnuo123 Apr 3, 2024
b532242
triggering
dongnuo123 Apr 3, 2024
b33d2d3
fix unit tests
dongnuo123 Apr 3, 2024
4072e63
the first unit test
dongnuo123 Apr 4, 2024
9fb3061
rename
dongnuo123 Apr 4, 2024
a1edf2f
upgrade conversion
dongnuo123 Mar 22, 2024
35edfec
upgrade conversion
dongnuo123 Mar 22, 2024
01d135c
upgrade conversion
dongnuo123 Mar 24, 2024
147320e
draft
dongnuo123 Mar 24, 2024
83e7568
upgrade
dongnuo123 Mar 25, 2024
3d8197e
upgrade
dongnuo123 Mar 25, 2024
f53dc40
upgrade
dongnuo123 Mar 25, 2024
7aa4250
unit tests
dongnuo123 Mar 27, 2024
3d6f801
unit tests
dongnuo123 Mar 27, 2024
6d0b726
merge config adding branch
dongnuo123 Mar 27, 2024
78d362f
Merge branch 'trunk' into group-protocol-migration-config
dongnuo123 Apr 4, 2024
96789ba
rename config
dongnuo123 Apr 4, 2024
54a8bc5
rename config
dongnuo123 Apr 4, 2024
8c6e91a
remove manually triggering a rebalance
dongnuo123 Apr 4, 2024
2098c58
rename withConsumerGroupMigrationPolicy
dongnuo123 Apr 4, 2024
a3bc367
unit test
dongnuo123 Apr 4, 2024
144bd1b
rename
dongnuo123 Apr 4, 2024
c666ee4
Merge branch 'trunk' into online-migration-triggering
dongnuo123 Apr 4, 2024
74f16bf
Merge remote-tracking branch 'origin/online-migration-triggering' int…
dongnuo123 Apr 4, 2024
0bad2d2
testLastNewProtocolConsumerLeavingUpgradingConsumerGroup
dongnuo123 Apr 5, 2024
bcf5b86
address comments
dongnuo123 Apr 6, 2024
0f86874
change unit test name
dongnuo123 Apr 6, 2024
56876be
Merge branch 'group-protocol-migration-config' into online-migration-…
dongnuo123 Apr 6, 2024
d1bbab7
comments
dongnuo123 Apr 6, 2024
88112ca
useLegacyProtocol
dongnuo123 Mar 28, 2024
9d1e0b8
bring in legacyProtocolMembersSupportedProtocols
dongnuo123 Apr 2, 2024
3dd2547
update supproted protocols count in consumer group
dongnuo123 Apr 7, 2024
fb729e2
todo: only comparing reference?
dongnuo123 Apr 7, 2024
d68d8f5
debug log
dongnuo123 Apr 7, 2024
7024405
completeAllSyncFutures
dongnuo123 Apr 7, 2024
fedaa64
make the policy case insensitive
dongnuo123 Apr 9, 2024
2842684
Merge branch 'group-protocol-migration-config' into online-migration-…
dongnuo123 Apr 9, 2024
47a8149
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 9, 2024
2a1e1a3
fix unit tests
dongnuo123 Apr 9, 2024
1be837b
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 9, 2024
76dc222
Merge branch 'trunk' into online-migration-triggering
dongnuo123 Apr 10, 2024
448597f
address comments
dongnuo123 Apr 11, 2024
4cae8bd
Merge branch 'trunk' into online-migration-triggering-downgrade
dongnuo123 Apr 11, 2024
3e697a6
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 11, 2024
384b7aa
rename ClassicProtocolCollection
dongnuo123 Apr 11, 2024
56482ff
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 11, 2024
6ede663
rename ClassicProtocolCollection
dongnuo123 Apr 11, 2024
469dc68
resolve comments
dongnuo123 Apr 14, 2024
c317550
resolve comments
dongnuo123 Apr 14, 2024
4309361
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 14, 2024
11cee40
resolve comments
dongnuo123 Apr 14, 2024
aec2205
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 14, 2024
2bddbcd
rename numClassicProtocolMembers
dongnuo123 Apr 14, 2024
0dc8f8d
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 14, 2024
9b19011
unit test
dongnuo123 Apr 14, 2024
34397df
serialize version
dongnuo123 Apr 14, 2024
62105d9
resolve comments
dongnuo123 Apr 15, 2024
5981368
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 15, 2024
aa7f24e
unit tests
dongnuo123 Apr 15, 2024
5a4cf8a
comments
dongnuo123 Apr 16, 2024
a03a377
comments
dongnuo123 Apr 16, 2024
7bbf833
comments
dongnuo123 Apr 16, 2024
057e078
Merge branch 'online-migration-triggering' into online-migration-trig…
dongnuo123 Apr 16, 2024
ac04e85
Merge branch 'trunk' into online-migration-triggering-downgrade
dongnuo123 Apr 17, 2024
2a554e9
merge trunk
dongnuo123 Apr 17, 2024
cf2c2c5
resolve comments
dongnuo123 Apr 17, 2024
19dbf7f
resolve comments
dongnuo123 Apr 17, 2024
2c7d903
resolve comments
dongnuo123 Apr 17, 2024
901d8bf
resolve comments
dongnuo123 Apr 17, 2024
bdb208e
comments
dongnuo123 Apr 18, 2024
509acc9
addressing comments
dongnuo123 Apr 22, 2024
6e2f824
completely replace ConsumerGroupFenceMember records
dongnuo123 Apr 22, 2024
2b73350
move validation to ConsumerGroupFenceMember
dongnuo123 Apr 23, 2024
9116e43
fail the test on a schema exception
dongnuo123 Apr 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
Expand Down Expand Up @@ -777,6 +778,59 @@ public ClassicGroup classicGroup(
}
}

public boolean validateOnlineDowngrade(ConsumerGroup consumerGroup, String memberId) {
if (!consumerGroupMigrationPolicy.isDowngradeEnabled()) {
log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.",
consumerGroup.groupId());
return false;
} else if (!consumerGroup.allMembersUseClassicProtocolExcept(memberId)) {
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
log.debug("Cannot downgrade consumer group {} to classic group because not all its members use the classic protocol.",
consumerGroup.groupId());
return false;
} else if (consumerGroup.numMembers() <= 1) {
log.info("Skip downgrading the consumer group {} to classic group because it's empty.",
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
consumerGroup.groupId());
return false;
} else if (consumerGroup.numMembers() - 1 > classicGroupMaxSize) {
log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.",
consumerGroup.groupId());
}
return true;
}

public CompletableFuture<Void> convertToClassicGroup(ConsumerGroup consumerGroup, String leavingMemberId, List<Record> records) {
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
consumerGroup.createGroupTombstoneRecords(records);
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
ClassicGroup classicGroup;
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
try {
classicGroup = consumerGroup.toClassicGroup(
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
leavingMemberId,
logContext,
time,
consumerGroupSessionTimeoutMs,
metadataImage,
records
);
} catch (SchemaException e) {
log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse " +
"the Consumer Protocol " + ConsumerProtocol.PROTOCOL_TYPE + ".", e);

throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.",
consumerGroup.groupId(), e.getMessage()));
}

groups.put(consumerGroup.groupId(), classicGroup);
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
metrics.onClassicGroupStateTransition(null, classicGroup.currentState());

CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> {
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
if (t == null) {
classicGroup.allMembers().forEach(member -> rescheduleClassicGroupMemberHeartbeat(classicGroup, member));
prepareRebalance(classicGroup, String.format("Downgrade group %s.", classicGroup.groupId()));
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
}
});
return appendFuture;
}

/**
* Validates the online upgrade if the Classic Group receives a ConsumerGroupHeartbeat request.
*
Expand Down Expand Up @@ -1422,10 +1476,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
) throws ApiException {
ConsumerGroup group = consumerGroup(groupId);
List<Record> records;
CompletableFuture<Void> appendFuture = null;
if (instanceId == null) {
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
records = consumerGroupFenceMember(group, member);
if (validateOnlineDowngrade(group, memberId)) appendFuture = convertToClassicGroup(group, memberId, records);
} else {
ConsumerGroupMember member = group.staticMember(instanceId);
throwIfStaticMemberIsUnknown(member, instanceId);
Expand All @@ -1438,11 +1494,16 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
log.info("[GroupId {}] Static Member {} with instance id {} left the consumer group.",
group.groupId(), memberId, instanceId);
records = consumerGroupFenceMember(group, member);
if (validateOnlineDowngrade(group, memberId)) appendFuture = convertToClassicGroup(group, memberId, records);
}
}
return new CoordinatorResult<>(records, new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch));
return new CoordinatorResult<>(
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
records,
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
.setMemberEpoch(memberEpoch),
appendFuture
);
}

/**
Expand Down Expand Up @@ -1549,7 +1610,10 @@ private void scheduleConsumerGroupSessionTimeout(
ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
log.info("[GroupId {}] Member {} fenced from the group because its session expired.",
groupId, memberId);
return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
CompletableFuture<Void> appendFuture = null;
List<Record> records = consumerGroupFenceMember(group, member);
if (validateOnlineDowngrade(group, memberId)) appendFuture = convertToClassicGroup(group, memberId, records);
return new CoordinatorResult<>(records, appendFuture);
} catch (GroupIdNotFoundException ex) {
log.debug("[GroupId {}] Could not fence {} because the group does not exist.",
groupId, memberId);
Expand Down Expand Up @@ -1599,7 +1663,10 @@ private void scheduleConsumerGroupRebalanceTimeout(
log.info("[GroupId {}] Member {} fenced from the group because " +
"it failed to transition from epoch {} within {}ms.",
groupId, memberId, memberEpoch, rebalanceTimeoutMs);
return new CoordinatorResult<>(consumerGroupFenceMember(group, member));
CompletableFuture<Void> appendFuture = null;
List<Record> records = consumerGroupFenceMember(group, member);
if (validateOnlineDowngrade(group, memberId)) appendFuture = convertToClassicGroup(group, memberId, records);
return new CoordinatorResult<>(records, appendFuture);
} else {
log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member " +
"left the epoch {}.", groupId, memberId, memberEpoch);
Expand Down
Expand Up @@ -26,14 +26,20 @@
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
Expand All @@ -42,6 +48,7 @@
import org.apache.kafka.timeline.TimelineObject;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1190,6 +1197,101 @@ private static Map<Uuid, Set<Integer>> topicPartitionMapFromList(
return topicPartitionMap;
}

/**
* Create a corresponding ClassicGroup and append the record for the creation for group downgrade.
* The member with leavingMemberId will not be converted to the new ClassicGroup as it's the last
* member using new consumer protocol that left and triggered the downgrade.
*
* @param leavingMemberId The member that will not be converted in the ClassicGroup.
* @param logContext The logContext to create the ClassicGroup.
* @param time The time to create the ClassicGroup.
* @param consumerGroupSessionTimeoutMs The consumerGroupSessionTimeoutMs.
* @param metadataImage The metadataImage.
* @param records The record list.
* @return The created ClassicGroup.
*/
public ClassicGroup toClassicGroup(
String leavingMemberId,
LogContext logContext,
Time time,
int consumerGroupSessionTimeoutMs,
MetadataImage metadataImage,
List<Record> records
) {
ClassicGroup classicGroup = new ClassicGroup(
logContext,
groupId(),
ClassicGroupState.STABLE,
time,
metrics,
groupEpoch(),
Optional.ofNullable(ConsumerProtocol.PROTOCOL_TYPE),
Optional.empty(),
members().keySet().stream().filter(member -> !member.equals(leavingMemberId)).findAny(),
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
Optional.of(time.milliseconds())
);

members().forEach((memberId, member) -> {
if (!memberId.equals(leavingMemberId)) {
classicGroup.add(
new ClassicGroupMember(
memberId,
Optional.ofNullable(member.instanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeoutMs(),
consumerGroupSessionTimeoutMs,
ConsumerProtocol.PROTOCOL_TYPE,
member.supportedJoinGroupRequestProtocols(),
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
null
)
);
}
});

classicGroup.setProtocolName(Optional.of(classicGroup.selectProtocol()));
classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());

Map<String, byte[]> assignments = new HashMap<>();

classicGroup.allMembers().forEach(classicGroupMember -> {
byte[] assignment = Utils.toArray(ConsumerProtocol.serializeAssignment(
new ConsumerPartitionAssignor.Assignment(ConsumerGroup.topicPartitionListFromMap(
targetAssignment().get(classicGroupMember.memberId()).partitions(),
metadataImage.topics()
)),
ConsumerProtocol.deserializeVersion(
ByteBuffer.wrap(classicGroupMember.metadata(classicGroup.protocolName().orElse("")))
)
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
));

classicGroupMember.setAssignment(assignment);
assignments.put(classicGroupMember.memberId(), assignment);
});

records.add(RecordHelpers.newGroupMetadataRecord(
classicGroup, assignments, metadataImage.features().metadataVersion()));
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved

return classicGroup;
}

/**
* @return The list of TopicPartition converted from the map of topic id and partition set.
*/
private static List<TopicPartition> topicPartitionListFromMap(
Map<Uuid, Set<Integer>> topicPartitions,
TopicsImage topicsImage
) {
List<TopicPartition> topicPartitionList = new ArrayList<>();
topicPartitions.forEach((topicId, partitionSet) -> {
TopicImage topicImage = topicsImage.getTopic(topicId);
if (topicImage != null) {
partitionSet.forEach(partition -> topicPartitionList.add(new TopicPartition(topicImage.name(), partition)));
}
});
return topicPartitionList;
}

/**
* Checks whether at least one of the given protocols can be supported. A
* protocol can be supported if it is supported by all members that use the
Expand All @@ -1212,9 +1314,12 @@ public boolean supportsClassicProtocols(String memberProtocolType, Set<String> m
}

/**
* @param memberId The member to remove.
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
*
* @return A boolean indicating whether all the members use the classic protocol.
*/
public boolean allMembersUseClassicProtocol() {
return numClassicProtocolMembers() == members().size();
public boolean allMembersUseClassicProtocolExcept(String memberId) {
return numClassicProtocolMembers() == members().size() - 1 &&
!getOrMaybeCreateMember(memberId, false).useClassicProtocol();
}
}
Expand Up @@ -479,6 +479,22 @@ public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
return partitionsPendingRevocation;
}

/**
* @return The supported classic protocols converted to JoinGroupRequestProtocolCollection.
*/
public JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedJoinGroupRequestProtocols() {
JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
supportedClassicProtocols().ifPresent(classicProtocols -> classicProtocols.forEach(protocol ->
protocols.add(
new JoinGroupRequestData.JoinGroupRequestProtocol()
.setName(protocol.name())
.setMetadata(protocol.metadata())
)
));
return protocols;
}

/**
* @return The classicMemberMetadata if the consumer uses the classic protocol.
*/
Expand Down
Expand Up @@ -16,19 +16,25 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.opentest4j.AssertionFailedError;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.AssertionFailureBuilder.assertionFailure;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -198,6 +204,62 @@ private static void assertApiMessageAndVersionEquals(
}
}
}
} else if (actual.message() instanceof GroupMetadataValue) {
dongnuo123 marked this conversation as resolved.
Show resolved Hide resolved
GroupMetadataValue expectedValue = (GroupMetadataValue) expected.message();
GroupMetadataValue actualValue = (GroupMetadataValue) actual.message();

assertEquals(expectedValue.protocolType(), actualValue.protocolType());
assertEquals(expectedValue.generation(), actualValue.generation());
assertEquals(expectedValue.protocol(), actualValue.protocol());
assertEquals(expectedValue.leader(), actualValue.leader());
assertEquals(expectedValue.currentStateTimestamp(), actualValue.currentStateTimestamp());

Map<String, GroupMetadataValue.MemberMetadata> expectedMemberMap =
expectedValue.members().stream()
.collect(Collectors.toMap(GroupMetadataValue.MemberMetadata::memberId, Function.identity()));
Map<String, GroupMetadataValue.MemberMetadata> actualMemberMap =
actualValue.members().stream()
.collect(Collectors.toMap(GroupMetadataValue.MemberMetadata::memberId, Function.identity()));

if (expectedMemberMap.size() != actualMemberMap.size()) {
fail("Member metadata maps have different sizes");
} else {
expectedMemberMap.forEach((memberId, expectedMemberMetadata) -> {
GroupMetadataValue.MemberMetadata actualMemberMetadata = actualMemberMap.get(memberId);
if (actualMemberMetadata == null) {
fail("Member metadata maps have different values.");
}
assertEquals(expectedMemberMetadata.groupInstanceId(), actualMemberMetadata.groupInstanceId());
assertEquals(expectedMemberMetadata.clientId(), actualMemberMetadata.clientId());
assertEquals(expectedMemberMetadata.clientHost(), actualMemberMetadata.clientHost());
assertEquals(expectedMemberMetadata.rebalanceTimeout(), actualMemberMetadata.rebalanceTimeout());
assertEquals(expectedMemberMetadata.sessionTimeout(), actualMemberMetadata.sessionTimeout());

// If the subscriptions or the assignments don't match, deserialize them and do the comparison.
if (!expectedMemberMetadata.subscription().equals(actualMemberMetadata.subscription())) {
ConsumerPartitionAssignor.Subscription expectedSubscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(expectedMemberMetadata.subscription()));
ConsumerPartitionAssignor.Subscription actualSubscription =
ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(actualMemberMetadata.subscription()));

assertUnorderedListEquals(expectedSubscription.topics(), actualSubscription.topics());
assertEquals(expectedSubscription.userData(), actualSubscription.userData());
assertUnorderedListEquals(expectedSubscription.ownedPartitions(), actualSubscription.ownedPartitions());
assertEquals(expectedSubscription.rackId(), actualSubscription.rackId());
assertEquals(expectedSubscription.groupInstanceId(), actualSubscription.groupInstanceId());
assertEquals(expectedSubscription.generationId(), actualSubscription.generationId());
}
if (!expectedMemberMetadata.assignment().equals(actualMemberMetadata.assignment())) {
ConsumerPartitionAssignor.Assignment expectedAssignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(expectedMemberMetadata.assignment()));
ConsumerPartitionAssignor.Assignment actualAssignment =
ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(actualMemberMetadata.assignment()));

assertUnorderedListEquals(expectedAssignment.partitions(), actualAssignment.partitions());
assertEquals(expectedAssignment.userData(), actualAssignment.userData());
}
});
}
} else {
assertEquals(expected.message(), actual.message());
}
Expand Down