Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator #14017

Merged
merged 11 commits into from
Jul 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,30 @@ public CompletableFuture<SyncGroupResponseData> syncGroup(
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}

return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
if (!isGroupIdNotEmpty(request.groupId())) {
return CompletableFuture.completedFuture(new SyncGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code()));
}

CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>();

runtime.scheduleWriteOperation("generic-group-sync",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupSync(context, request, responseFuture)
).exceptionally(exception -> {
if (!(exception instanceof KafkaException)) {
log.error("Request {} hit an unexpected exception: {}",
dajac marked this conversation as resolved.
Show resolved Hide resolved
request, exception.getMessage());
}

if (!responseFuture.isDone()) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.forException(exception).code()));
}
return null;
});

return responseFuture;
}

/**
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -366,11 +366,13 @@ public static Record newCurrentAssignmentTombstoneRecord(
* Creates a GroupMetadata record.
*
* @param group The generic group.
* @param assignment The generic group assignment.
* @param metadataVersion The metadata version.
* @return The record.
*/
public static Record newGroupMetadataRecord(
GenericGroup group,
Map<String, byte[]> assignment,
MetadataVersion metadataVersion
) {
List<GroupMetadataValue.MemberMetadata> members = new ArrayList<>(group.allMembers().size());
Expand All @@ -380,10 +382,10 @@ public static Record newGroupMetadataRecord(
throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol.");
}

byte[] assignment = member.assignment();
if (assignment == null) {
byte[] memberAssignment = assignment.get(member.memberId());
if (memberAssignment == null) {
throw new IllegalStateException("Attempted to write member " + member.memberId() +
" of group + " + group.groupId() + " with no assignment.");
" of group " + group.groupId() + " with no assignment.");
}

members.add(
Expand All @@ -395,7 +397,7 @@ public static Record newGroupMetadataRecord(
.setSessionTimeout(member.sessionTimeoutMs())
.setGroupInstanceId(member.groupInstanceId().orElse(null))
.setSubscription(subscription)
.setAssignment(assignment)
.setAssignment(memberAssignment)
);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -181,6 +183,15 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGro
return groupMetadataManager.consumerGroupHeartbeat(context, request);
}

/**
* Handles a JoinGroup request.
*
* @param context The request context.
* @param request The actual JoinGroup request.
*
* @return A Result containing the JoinGroup response and
* a list of records to update the state machine.
*/
public CoordinatorResult<Void, Record> genericGroupJoin(
RequestContext context,
JoinGroupRequestData request,
Expand All @@ -193,6 +204,27 @@ public CoordinatorResult<Void, Record> genericGroupJoin(
);
}

/**
* Handles a SyncGroup request.
*
* @param context The request context.
* @param request The actual SyncGroup request.
*
* @return A Result containing the SyncGroup response and
* a list of records to update the state machine.
*/
public CoordinatorResult<Void, Record> genericGroupSync(
RequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) {
return groupMetadataManager.genericGroupSync(
context,
request,
responseFuture
);
}

/**
* The coordinator has been loaded. This is used to apply any
* post loading operations (e.g. registering timers).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,7 @@ public Set<String> allStaticMemberIds() {
}

// For testing only.
Set<String> allDynamicMemberIds() {
public Set<String> allDynamicMemberIds() {
Set<String> dynamicMemberSet = new HashSet<>(allMemberIds());
staticMembers.values().forEach(dynamicMemberSet::remove);
return dynamicMemberSet;
Expand Down Expand Up @@ -1075,6 +1075,15 @@ public ListGroupsResponseData.ListedGroup asListedGroup() {
.setGroupState(state.toString());
}

/**
* @return All member assignments keyed by member id.
*/
public Map<String, byte[]> groupAssignment() {
return allMembers().stream().collect(Collectors.toMap(
GenericGroupMember::memberId, GenericGroupMember::assignment
));
}

/**
* Checks whether the transition to the target state is valid.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.utils.Bytes;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/**
* This class encapsulates a generic group member's metadata.
Expand Down Expand Up @@ -54,7 +56,7 @@ public class GenericGroupMember {
/**
* An empty assignment.
*/
public static final byte[] EMPTY_ASSIGNMENT = new byte[0];
public static final byte[] EMPTY_ASSIGNMENT = Bytes.EMPTY;

/**
* The member id.
Expand Down Expand Up @@ -400,13 +402,15 @@ public void setIsNew(boolean value) {
public String toString() {
return "GenericGroupMember(" +
"memberId='" + memberId + '\'' +
", groupInstanceId='" + groupInstanceId + '\'' +
", groupInstanceId='" + groupInstanceId.orElse("") + '\'' +
", clientId='" + clientId + '\'' +
", clientHost='" + clientHost + '\'' +
", rebalanceTimeoutMs=" + rebalanceTimeoutMs +
", sessionTimeoutMs=" + sessionTimeoutMs +
", protocolType='" + protocolType + '\'' +
", supportedProtocols=" + supportedProtocols +
", supportedProtocols=" + supportedProtocols.stream()
.map(JoinGroupRequestProtocol::name)
.collect(Collectors.toList()) +
')';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -398,4 +400,98 @@ public void testJoinGroupInvalidGroupId() throws Exception {

assertEquals(expectedResponse, response.get());
}

@Test
public void testSyncGroup() {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);

SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");

service.startup(() -> 1);

when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("generic-group-sync"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(CompletableFuture.completedFuture(
new SyncGroupResponseData()
));

CompletableFuture<SyncGroupResponseData> responseFuture = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
BufferSupplier.NO_CACHING
);

assertFalse(responseFuture.isDone());
}

@Test
public void testSyncGroupWithException() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);

SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId("foo");

service.startup(() -> 1);

when(runtime.scheduleWriteOperation(
ArgumentMatchers.eq("generic-group-sync"),
ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
ArgumentMatchers.any()
)).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));

CompletableFuture<SyncGroupResponseData> future = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
BufferSupplier.NO_CACHING
);

assertTrue(future.isDone());
assertEquals(
new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()),
future.get()
);
}

@Test
public void testSyncGroupInvalidGroupId() throws Exception {
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime = mockRuntime();
GroupCoordinatorService service = new GroupCoordinatorService(
new LogContext(),
createConfig(),
runtime
);

service.startup(() -> 1);

SyncGroupRequestData request = new SyncGroupRequestData()
.setGroupId(null)
.setMemberId(UNKNOWN_MEMBER_ID);


CompletableFuture<SyncGroupResponseData> response = service.syncGroup(
requestContext(ApiKeys.SYNC_GROUP),
request,
BufferSupplier.NO_CACHING
);

assertTrue(response.isDone());
SyncGroupResponseData expectedResponse = new SyncGroupResponseData()
.setErrorCode(Errors.INVALID_GROUP_ID.code());

assertEquals(expectedResponse, response.get());
}
}