Skip to content

Commit

Permalink
KAFKA-14500; [5/N] Implement JoinGroup protocol in new GroupCoordinat…
Browse files Browse the repository at this point in the history
…or (apache#13870)

This patch implements the existing JoinGroup protocol within the new group coordinator. 

Some notable differences:
* Methods return a CoordinatorResult to the runtime framework, which includes records to append to the log as well as a future to complete after the append succeeds/fails.
* The coordinator runtime ensures that only a single thread will be processing a group at any given time, therefore there is no more locking on groups.
* Instead of using on purgatories, we rely on the Timer interface to schedule/cancel delayed operations.

Reviewers: David Jacot <djacot@confluent.io>
  • Loading branch information
jeffkbkim authored and jeqo committed Aug 15, 2023
1 parent cf4d290 commit eee2c77
Show file tree
Hide file tree
Showing 25 changed files with 4,981 additions and 616 deletions.
12 changes: 7 additions & 5 deletions checkstyle/suppressions.xml
Expand Up @@ -322,14 +322,16 @@
<!-- group coordinator -->
<suppress checks="CyclomaticComplexity"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="MethodLength"
<suppress checks="(NPathComplexity|MethodLength)"
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="NPathComplexity"
files="(GroupMetadataManager).java"/>
<suppress checks="ClassFanOutComplexity"
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember).java"/>
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest).java"/>
files="(RecordHelpersTest|GroupMetadataManagerTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>

<!-- storage -->
<suppress checks="CyclomaticComplexity"
Expand Down
Expand Up @@ -84,6 +84,68 @@ public static String maybeTruncateReason(final String reason) {
}
}

/**
* Since JoinGroupRequest version 4, a client that sends a join group request with
* {@link UNKNOWN_MEMBER_ID} needs to rejoin with a new member id generated
* by the server. Once the second join group request is complete, the client is
* added as a new member of the group.
*
* Prior to version 4, a client is immediately added as a new member if it sends a
* join group request with UNKNOWN_MEMBER_ID.
*
* @param apiVersion The JoinGroupRequest api version.
*
* @return whether a known member id is required or not.
*/
public static boolean requiresKnownMemberId(short apiVersion) {
return apiVersion >= 4;
}

/**
* Starting from version 9 of the JoinGroup API, static members are able to
* skip running the assignor based on the `SkipAssignment` field. We leverage
* this to tell the leader that it is the leader of the group but by skipping
* running the assignor while the group is in stable state.
* Notes:
* 1) This allows the leader to continue monitoring metadata changes for the
* group. Note that any metadata changes happening while the static leader is
* down won't be noticed.
* 2) The assignors are not idempotent nor free from side effects. This is why
* we skip entirely the assignment step as it could generate a different group
* assignment which would be ignored by the group coordinator because the group
* is the stable state.
*
* Prior to version 9 of the JoinGroup API, we wanted to avoid current leader
* performing trivial assignment while the group is in stable stage, because
* the new assignment in leader's next sync call won't be broadcast by a stable group.
* This could be guaranteed by always returning the old leader id so that the current
* leader won't assume itself as a leader based on the returned message, since the new
* member.id won't match returned leader id, therefore no assignment will be performed.
*
* @param apiVersion The JoinGroupRequest api version.
*
* @return whether the version supports skipping assignment.
*/

public static boolean supportsSkippingAssignment(short apiVersion) {
return apiVersion >= 9;
}

/**
* Get the client's join reason.
*
* @param request The JoinGroupRequest.
*
* @return The join reason.
*/
public static String joinReason(JoinGroupRequestData request) {
String joinReason = request.reason();
if (joinReason == null || joinReason.isEmpty()) {
joinReason = "not provided";
}
return joinReason;
}

public JoinGroupRequest(JoinGroupRequestData data, short version) {
super(ApiKeys.JOIN_GROUP, version);
this.data = data;
Expand Down
7 changes: 6 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Expand Up @@ -531,7 +531,12 @@ class BrokerServer(
config.consumerGroupHeartbeatIntervalMs,
config.consumerGroupMaxSize,
config.consumerGroupAssignors,
config.offsetsTopicSegmentBytes
config.offsetsTopicSegmentBytes,
config.groupMaxSize,
config.groupInitialRebalanceDelay,
GroupCoordinatorConfig.GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS,
config.groupMinSessionTimeoutMs,
config.groupMaxSessionTimeoutMs
)
val timer = new SystemTimerReaper(
"group-coordinator-reaper",
Expand Down
Expand Up @@ -25,6 +25,11 @@
*/
public class GroupCoordinatorConfig {

/**
* The timeout used to wait for a new member in milliseconds.
*/
public static final int GENERIC_GROUP_NEW_MEMBER_JOIN_TIMEOUT_MS = 5 * 60 * 1000;

/**
* The number of threads or event loops running.
*/
Expand Down Expand Up @@ -56,19 +61,54 @@ public class GroupCoordinatorConfig {
*/
public final int offsetsTopicSegmentBytes;

/**
* The generic group maximum size.
*/
public final int genericGroupMaxSize;

/**
* The delay in milliseconds introduced for the first rebalance of a generic group.
*/
public final int genericGroupInitialRebalanceDelayMs;

/**
* The timeout used to wait for a new member in milliseconds.
*/
public final int genericGroupNewMemberJoinTimeoutMs;

/**
* The generic group minimum session timeout.
*/
public final int genericGroupMinSessionTimeoutMs;

/**
* The generic group maximum session timeout.
*/
public final int genericGroupMaxSessionTimeoutMs;

public GroupCoordinatorConfig(
int numThreads,
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
List<PartitionAssignor> consumerGroupAssignors,
int offsetsTopicSegmentBytes
int offsetsTopicSegmentBytes,
int genericGroupMaxSize,
int genericGroupInitialRebalanceDelayMs,
int genericGroupNewMemberJoinTimeoutMs,
int genericGroupMinSessionTimeoutMs,
int genericGroupMaxSessionTimeoutMs
) {
this.numThreads = numThreads;
this.consumerGroupSessionTimeoutMs = consumerGroupSessionTimeoutMs;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupAssignors = consumerGroupAssignors;
this.offsetsTopicSegmentBytes = offsetsTopicSegmentBytes;
this.genericGroupMaxSize = genericGroupMaxSize;
this.genericGroupInitialRebalanceDelayMs = genericGroupInitialRebalanceDelayMs;
this.genericGroupNewMemberJoinTimeoutMs = genericGroupNewMemberJoinTimeoutMs;
this.genericGroupMinSessionTimeoutMs = genericGroupMinSessionTimeoutMs;
this.genericGroupMaxSessionTimeoutMs = genericGroupMaxSessionTimeoutMs;
}
}
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
Expand Down Expand Up @@ -123,10 +124,10 @@ public GroupCoordinatorService build() {
throw new IllegalArgumentException("Writer must be set.");
if (loader == null)
throw new IllegalArgumentException("Loader must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (time == null)
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");

String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));
Expand All @@ -150,6 +151,7 @@ public GroupCoordinatorService build() {
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorBuilderSupplier(supplier)
.withTime(time)
.build();

return new GroupCoordinatorService(
Expand Down Expand Up @@ -286,9 +288,33 @@ public CompletableFuture<JoinGroupResponseData> joinGroup(
return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}

return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
"This API is not implemented yet."
));
CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();

if (!isGroupIdNotEmpty(request.groupId())) {
responseFuture.complete(new JoinGroupResponseData()
.setMemberId(request.memberId())
.setErrorCode(Errors.INVALID_GROUP_ID.code()));

return responseFuture;
}

runtime.scheduleWriteOperation("generic-group-join",
topicPartitionFor(request.groupId()),
coordinator -> coordinator.genericGroupJoin(context, request, responseFuture)
).exceptionally(exception -> {
if (!(exception instanceof KafkaException)) {
log.error("JoinGroup request {} hit an unexpected exception: {}",
request, exception.getMessage());
}

if (!responseFuture.isDone()) {
responseFuture.complete(new JoinGroupResponseData()
.setErrorCode(Errors.forException(exception).code()));
}
return null;
});

return responseFuture;
}

/**
Expand Down Expand Up @@ -599,4 +625,8 @@ public void shutdown() {
Utils.closeQuietly(runtime, "coordinator runtime");
log.info("Shutdown complete.");
}

private static boolean isGroupIdNotEmpty(String groupId) {
return groupId != null && !groupId.isEmpty();
}
}

0 comments on commit eee2c77

Please sign in to comment.