Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14500; [3/N] add GroupMetadataKey/Value record helpers #13704

Merged
merged 9 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@
files="(ConsumerGroupMember).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest).java"/>

<!-- storage -->
<suppress checks="CyclomaticComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -339,7 +343,7 @@ public static Record newCurrentAssignmentRecord(
* Creates a ConsumerGroupCurrentMemberAssignment tombstone.
*
* @param groupId The consumer group id.
* @param memberId The consumer group member id.
* @param memberId The consumer group member id.
* @return The record.
*/
public static Record newCurrentAssignmentTombstoneRecord(
Expand All @@ -357,6 +361,81 @@ public static Record newCurrentAssignmentTombstoneRecord(
);
}

/**
* Creates a GroupMetadata record.
*
* @param group The generic group.
* @param metadataVersion The metadata version.
* @return The record.
*/
public static Record newGroupMetadataRecord(
GenericGroup group,
MetadataVersion metadataVersion
) {
List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>();
dajac marked this conversation as resolved.
Show resolved Hide resolved
group.allMembers().forEach(member -> {
byte[] subscription = group.protocolName().map(member::metadata).orElse(null);
if (subscription == null) {
throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.");
}

byte[] assignment = member.assignment();
if (assignment == null) {
throw new IllegalStateException("Attempted to write member " + member.memberId() +
" of group + " + group.groupId() + " with no assignment.");
}

members.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId(member.memberId())
.setClientId(member.clientId())
.setClientHost(member.clientHost())
.setRebalanceTimeout(member.rebalanceTimeoutMs())
.setSessionTimeout(member.sessionTimeoutMs())
.setGroupInstanceId(member.groupInstanceId().orElse(null))
.setSubscription(subscription)
.setAssignment(assignment)
);
});

return new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup(group.groupId()),
(short) 2
),
new ApiMessageAndVersion(
new GroupMetadataValue()
.setProtocol(group.protocolName().orElse(null))
.setProtocolType(group.protocolType().orElse(""))
.setGeneration(group.generationId())
.setLeader(group.leaderOrNull())
.setCurrentStateTimestamp(group.currentStateTimestampOrDefault())
.setMembers(members),
metadataVersion.groupMetadataValueVersion()
)
);
}

/**
* Creates a GroupMetadata tombstone.
*
* @param groupId The group id.
* @return The record.
*/
public static Record newGroupMetadataTombstoneRecord(
String groupId
) {
return new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup(groupId),
(short) 2
),
null // Tombstone
);
}

private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.consumer.ClientAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
Expand All @@ -33,16 +36,30 @@
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.generic.Protocol;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
Expand All @@ -60,6 +77,7 @@
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.RecordHelpers.newTargetAssignmentTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class RecordHelpersTest {

Expand Down Expand Up @@ -383,4 +401,203 @@ public void testNewCurrentAssignmentTombstoneRecord() {
"member-id"
));
}

private static Stream<Arguments> metadataToExpectedGroupMetadataValue() {
return Stream.of(
Arguments.arguments(MetadataVersion.IBP_0_10_0_IV0, (short) 0),
Arguments.arguments(MetadataVersion.IBP_1_1_IV0, (short) 1),
Arguments.arguments(MetadataVersion.IBP_2_2_IV0, (short) 2),
Arguments.arguments(MetadataVersion.IBP_3_5_IV0, (short) 3)
);
}

@ParameterizedTest
@MethodSource("metadataToExpectedGroupMetadataValue")
dajac marked this conversation as resolved.
Show resolved Hide resolved
public void testNewGroupMetadataRecord(
MetadataVersion metadataVersion,
short expectedGroupMetadataValueVersion
) {
Time time = new MockTime();

List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-1")
.setClientId("client-1")
.setClientHost("host-1")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-1")
.setSubscription(new byte[]{0, 1})
.setAssignment(new byte[]{1, 2})
);

expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-2")
.setClientId("client-2")
.setClientHost("host-2")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-2")
.setSubscription(new byte[]{1, 2})
.setAssignment(new byte[]{2, 3})
);

Record expectedRecord = new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup("group-id"),
(short) 2),
new ApiMessageAndVersion(
new GroupMetadataValue()
.setProtocol("range")
.setProtocolType("consumer")
.setLeader("member-1")
.setGeneration(1)
.setCurrentStateTimestamp(time.milliseconds())
.setMembers(expectedMembers),
expectedGroupMetadataValueVersion));
dajac marked this conversation as resolved.
Show resolved Hide resolved

GenericGroup group = new GenericGroup(
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
);

expectedMembers.forEach(member -> {
group.add(new GenericGroupMember(
member.memberId(),
Optional.of(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeout(),
member.sessionTimeout(),
"consumer",
Collections.singletonList(new Protocol(
"range",
member.subscription()
)),
member.assignment()
));
});

group.initNextGeneration();
Record groupMetadataRecord = RecordHelpers.newGroupMetadataRecord(
group,
metadataVersion
);

assertEquals(expectedRecord, groupMetadataRecord);
}

@Test
public void testNewGroupMetadataTombstoneRecord() {
Record expectedRecord = new Record(
new ApiMessageAndVersion(
new GroupMetadataKey()
.setGroup("group-id"),
(short) 2),
null);

Record groupMetadataRecord = RecordHelpers.newGroupMetadataTombstoneRecord("group-id");
assertEquals(expectedRecord, groupMetadataRecord);
}

@Test
public void testNewGroupMetadataRecordThrowsWhenNullSubscription() {
Time time = new MockTime();

List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-1")
.setClientId("client-1")
.setClientHost("host-1")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-1")
.setSubscription(new byte[]{0, 1})
.setAssignment(new byte[]{1, 2})
);

GenericGroup group = new GenericGroup(
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
);

expectedMembers.forEach(member -> {
group.add(new GenericGroupMember(
member.memberId(),
Optional.of(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeout(),
member.sessionTimeout(),
"consumer",
Collections.singletonList(new Protocol(
"range",
null
)),
member.assignment()
));
});

assertThrows(IllegalStateException.class, () ->
RecordHelpers.newGroupMetadataRecord(
group,
MetadataVersion.IBP_3_5_IV2
));
}

@Test
public void testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
Time time = new MockTime();

List<GroupMetadataValue.MemberMetadata> expectedMembers = new ArrayList<>();
expectedMembers.add(
new GroupMetadataValue.MemberMetadata()
.setMemberId("member-1")
.setClientId("client-1")
.setClientHost("host-1")
.setRebalanceTimeout(1000)
.setSessionTimeout(1500)
.setGroupInstanceId("group-instance-1")
.setSubscription(new byte[]{0, 1})
.setAssignment(null)
);

GenericGroup group = new GenericGroup(
new LogContext(),
"group-id",
GenericGroupState.PREPARING_REBALANCE,
time
);

expectedMembers.forEach(member ->
group.add(new GenericGroupMember(
member.memberId(),
Optional.of(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
member.rebalanceTimeout(),
member.sessionTimeout(),
"consumer",
Collections.singletonList(new Protocol(
"range",
member.subscription()
)),
member.assignment()
))
);

assertThrows(IllegalStateException.class, () ->
RecordHelpers.newGroupMetadataRecord(
group,
MetadataVersion.IBP_3_5_IV2
));
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add the following test:

  • A unit test for the tombstone method;
  • A unit test which triggers the IllegalStateException exceptions;
  • Unit tests which verifies the various record versions? It could be a parameterised one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i updated testNewGroupMetadataRecord for 3). can you take a look?

Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,20 @@ public short listOffsetRequestVersion() {
}
}

public short groupMetadataValueVersion() {
dajac marked this conversation as resolved.
Show resolved Hide resolved
if (this.isLessThan(IBP_0_10_1_IV0)) {
return 0;
} else if (this.isLessThan(IBP_2_1_IV0)) {
return 1;
} else if (this.isLessThan(IBP_2_3_IV0)) {
return 2;
} else {
// Serialize with the highest supported non-flexible version
// until a tagged field is introduced or the version is bumped.
return 3;
}
}

private static final Map<String, MetadataVersion> IBP_VERSIONS;
static {
{
Expand Down