Skip to content

Commit

Permalink
KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinat…
Browse files Browse the repository at this point in the history
…or (#14017)

This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests.

Reviewers: David Jacot <djacot@confluent.io>
  • Loading branch information
jeffkbkim committed Jul 27, 2023
1 parent ed44bcd commit d2fc907
Show file tree
Hide file tree
Showing 9 changed files with 3,339 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,30 @@ public CompletableFuture<SyncGroupResponseData> syncGroup(
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}

return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new SyncGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code()));
}

CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>();

runtime.scheduleWriteOperation("generic-group-sync",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupSync(context, request, responseFuture)
).exceptionally(exception -> {
if (!(exception instanceof KafkaException)) {
log.error("SyncGroup request {} hit an unexpected exception: {}",
request, exception.getMessage());
}

if (!responseFuture.isDone()) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.forException(exception).code()));
}
return null;
});

return responseFuture;
}

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,13 @@ public static Record newCurrentAssignmentTombstoneRecord(
* Creates a GroupMetadata record.
*
* @param group The generic group.
* @param assignment The generic group assignment.
* @param metadataVersion The metadata version.
* @return The record.
*/
public static Record newGroupMetadataRecord(
GenericGroup group,
Map<String, byte[]> assignment,
MetadataVersion metadataVersion
) {
List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>(group.allMembers().size());
Expand All @@ -384,10 +386,10 @@ public static Record newGroupMetadataRecord(
throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.");
}

byte[] assignment = member.assignment();
if (assignment == null) {
byte[] memberAssignment = assignment.get(member.memberId());
if (memberAssignment == null) {
throw new IllegalStateException("Attempted to write member " + member.memberId() +
" of group + " + group.groupId() + " with no assignment.");
" of group " + group.groupId() + " with no assignment.");
}

members.add(
Expand All @@ -399,7 +401,7 @@ public static Record newGroupMetadataRecord(
.setSessionTimeout(member.sessionTimeoutMs())
.setGroupInstanceId(member.groupInstanceId().orElse(null))
.setSubscription(subscription)
.setAssignment(assignment)
.setAssignment(memberAssignment)
);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -181,6 +183,15 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGro
return groupMetadataManager.consumerGroupHeartbeat(context, request);
}

/**
* Handles a JoinGroup request.
*
* @param context The request context.
* @param request The actual JoinGroup request.
*
* @return A Result containing the JoinGroup response and
* a list of records to update the state machine.
*/
public CoordinatorResult<Void, Record> genericGroupJoin(
RequestContext context,
JoinGroupRequestData request,
Expand All @@ -193,6 +204,27 @@ public CoordinatorResult<Void, Record> genericGroupJoin(
);
}

/**
* Handles a SyncGroup request.
*
* @param context The request context.
* @param request The actual SyncGroup request.
*
* @return A Result containing the SyncGroup response and
* a list of records to update the state machine.
*/
public CoordinatorResult<Void, Record> genericGroupSync(
RequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) {
return groupMetadataManager.genericGroupSync(
context,
request,
responseFuture
);
}

/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ public Set<String> allStaticMemberIds() {
}

// For testing only.
Set<String> allDynamicMemberIds() {
public Set<String> allDynamicMemberIds() {
Set<String> dynamicMemberSet = new HashSet<>(allMemberIds());
staticMembers.values().forEach(dynamicMemberSet::remove);
return dynamicMemberSet;
Expand Down Expand Up @@ -1075,6 +1075,15 @@ public ListGroupsResponseData.ListedGroup asListedGroup() {
.setGroupState(state.toString());
}

/**
* @return All member assignments keyed by member id.
*/
public Map<String, byte[]> groupAssignment() {
return allMembers().stream().collect(Collectors.toMap(
GenericGroupMember::memberId, GenericGroupMember::assignment
));
}

/**
* Checks whether the transition to the target state is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.utils.Bytes;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
* This class encapsulates a generic group member's metadata.
Expand Down Expand Up @@ -54,7 +56,7 @@ public class GenericGroupMember {
/**
* An empty assignment.
*/
public static final byte[] EMPTY_ASSIGNMENT = new byte[0];
public static final byte[] EMPTY_ASSIGNMENT = Bytes.EMPTY;

/**
* The member id.
Expand Down Expand Up @@ -400,13 +402,15 @@ public void setIsNew(boolean value) {
public String toString() {
return "GenericGroupMember(" +
"memberId='" + memberId + '\'' +
", groupInstanceId='" + groupInstanceId + '\'' +
", groupInstanceId='" + groupInstanceId.orElse("") + '\'' +
", clientId='" + clientId + '\'' +
", clientHost='" + clientHost + '\'' +
", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
", sessionTimeoutMs=" + sessionTimeoutMs +
", protocolType='" + protocolType + '\'' +
", supportedProtocols=" + supportedProtocols +
", supportedProtocols=" + supportedProtocols.stream()
.map(JoinGroupRequestProtocol::name)
.collect(Collectors.toList()) +
')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -398,4 +400,98 @@ public void testJoinGroupInvalidGroupId() throws Exception {

assertEquals(expectedResponse, response.get());
}

@Test
public void testSyncGroup() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);

SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");

service.startup(() -> 1);

when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("generic-group-sync"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new SyncGroupResponseData()
));

CompletableFuture<SyncGroupResponseData> responseFuture = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
BufferSupplier.NO_CACHING
);

assertFalse(responseFuture.isDone());
}

@Test
public void testSyncGroupWithException() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);

SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");

service.startup(() -> 1);

when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("generic-group-sync"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));

CompletableFuture<SyncGroupResponseData> future = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
BufferSupplier.NO_CACHING
);

assertTrue(future.isDone());
assertEquals(
new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()),
future.get()
);
}

@Test
public void testSyncGroupInvalidGroupId() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);

service.startup(() -> 1);

SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId(null)
.setMemberId(UNKNOWN_MEMBER_ID);


CompletableFuture<SyncGroupResponseData> response = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
BufferSupplier.NO_CACHING
);

assertTrue(response.isDone());
SyncGroupResponseData expectedResponse = new SyncGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code());

assertEquals(expectedResponse, response.get());
}
}

0 comments on commit d2fc907

Please sign in to comment.