Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

MINOR: Code cleanups in group-coordinator module #14117

Merged
merged 6 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
Expand Down Expand Up @@ -134,25 +134,25 @@ public GroupCoordinatorService build() {
String logPrefix = String.format("GroupCoordinator id=%d", nodeId);
LogContext logContext = new LogContext(String.format("[%s] ", logPrefix));

CoordinatorBuilderSupplier<ReplicatedGroupCoordinator, Record> supplier = () ->
new ReplicatedGroupCoordinator.Builder(config);
CoordinatorShardBuilderSupplier<GroupCoordinatorShard, Record> supplier = () ->
new GroupCoordinatorShard.Builder(config);

CoordinatorEventProcessor processor = new MultiThreadedEventProcessor(
logContext,
"group-coordinator-event-processor-",
config.numThreads
);

CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime =
new CoordinatorRuntime.Builder<ReplicatedGroupCoordinator, Record>()
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
new CoordinatorRuntime.Builder<GroupCoordinatorShard, Record>()
.withTime(time)
.withTimer(timer)
.withLogPrefix(logPrefix)
.withLogContext(logContext)
.withEventProcessor(processor)
.withPartitionWriter(writer)
.withLoader(loader)
.withCoordinatorBuilderSupplier(supplier)
.withCoordinatorShardBuilderSupplier(supplier)
.withTime(time)
.build();

Expand All @@ -177,7 +177,7 @@ public GroupCoordinatorService build() {
/**
* The coordinator runtime.
*/
private final CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime;
private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;

/**
* Boolean indicating whether the coordinator is active or not.
Expand All @@ -199,7 +199,7 @@ public GroupCoordinatorService build() {
GroupCoordinatorService(
LogContext logContext,
GroupCoordinatorConfig config,
CoordinatorRuntime<ReplicatedGroupCoordinator, Record> runtime
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime
) {
this.log = logContext.logger(CoordinatorLoader.class);
this.config = config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
Expand Down Expand Up @@ -48,8 +47,8 @@
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.runtime.Coordinator;
import org.apache.kafka.coordinator.group.runtime.CoordinatorBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilder;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.image.MetadataDelta;
Expand All @@ -60,22 +59,21 @@
import java.util.concurrent.CompletableFuture;

/**
* The group coordinator replicated state machine that manages the metadata of all generic and
* consumer groups. It holds the hard and the soft state of the groups. This class has two kinds
* of methods:
* The group coordinator shard is a replicated state machine that manages the metadata of all
* generic and consumer groups. It holds the hard and the soft state of the groups. This class
* has two kinds of methods:
* 1) The request handlers which handle the requests and generate a response and records to
* mutate the hard state. Those records will be written by the runtime and applied to the
* hard state via the replay methods.
* 2) The replay methods which apply records to the hard state. Those are used in the request
* handling as well as during the initial loading of the records from the partitions.
*/
public class ReplicatedGroupCoordinator implements Coordinator<Record> {
public class GroupCoordinatorShard implements CoordinatorShard<Record> {

public static class Builder implements CoordinatorBuilder<ReplicatedGroupCoordinator, Record> {
public static class Builder implements CoordinatorShardBuilder<GroupCoordinatorShard, Record> {
private final GroupCoordinatorConfig config;
private LogContext logContext;
private SnapshotRegistry snapshotRegistry;
private TopicPartition topicPartition;
private Time time;
private CoordinatorTimer<Void, Record> timer;

Expand All @@ -86,47 +84,39 @@ public Builder(
}

@Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withLogContext(
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withLogContext(
LogContext logContext
) {
this.logContext = logContext;
return this;
}

@Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withTime(
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withTime(
Time time
) {
this.time = time;
return this;
}

@Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withTimer(
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withTimer(
CoordinatorTimer<Void, Record> timer
) {
this.timer = timer;
return this;
}

@Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withSnapshotRegistry(
public CoordinatorShardBuilder<GroupCoordinatorShard, Record> withSnapshotRegistry(
SnapshotRegistry snapshotRegistry
) {
this.snapshotRegistry = snapshotRegistry;
return this;
}

@Override
public CoordinatorBuilder<ReplicatedGroupCoordinator, Record> withTopicPartition(
TopicPartition topicPartition
) {
this.topicPartition = topicPartition;
return this;
}

@Override
public ReplicatedGroupCoordinator build() {
public GroupCoordinatorShard build() {
if (logContext == null) logContext = new LogContext();
if (config == null)
throw new IllegalArgumentException("Config must be set.");
Expand All @@ -136,16 +126,13 @@ public ReplicatedGroupCoordinator build() {
throw new IllegalArgumentException("Time must be set.");
if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (topicPartition == null)
throw new IllegalArgumentException("TopicPartition must be set.");

GroupMetadataManager groupMetadataManager = new GroupMetadataManager.Builder()
.withLogContext(logContext)
.withSnapshotRegistry(snapshotRegistry)
.withTime(time)
.withTimer(timer)
.withTopicPartition(topicPartition)
.withAssignors(config.consumerGroupAssignors)
.withConsumerGroupAssignors(config.consumerGroupAssignors)
.withConsumerGroupMaxSize(config.consumerGroupMaxSize)
.withConsumerGroupHeartbeatInterval(config.consumerGroupHeartbeatIntervalMs)
.withGenericGroupInitialRebalanceDelayMs(config.genericGroupInitialRebalanceDelayMs)
Expand All @@ -162,7 +149,7 @@ public ReplicatedGroupCoordinator build() {
.withOffsetMetadataMaxSize(config.offsetMetadataMaxSize)
.build();

return new ReplicatedGroupCoordinator(
return new GroupCoordinatorShard(
groupMetadataManager,
offsetMetadataManager
);
Expand All @@ -185,7 +172,7 @@ public ReplicatedGroupCoordinator build() {
* @param groupMetadataManager The group metadata manager.
* @param offsetMetadataManager The offset metadata manager.
*/
ReplicatedGroupCoordinator(
GroupCoordinatorShard(
GroupMetadataManager groupMetadataManager,
OffsetMetadataManager offsetMetadataManager
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
Expand Down Expand Up @@ -131,11 +130,10 @@ public static class Builder {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private CoordinatorTimer<Void, Record> timer = null;
private List<PartitionAssignor> assignors = null;
private List<PartitionAssignor> consumerGroupAssignors = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
private TopicPartition topicPartition = null;
private MetadataImage metadataImage = null;
private int consumerGroupSessionTimeoutMs = 45000;
private int genericGroupMaxSize = Integer.MAX_VALUE;
Expand Down Expand Up @@ -164,8 +162,8 @@ Builder withTimer(CoordinatorTimer<Void, Record> timer) {
return this;
}

Builder withAssignors(List<PartitionAssignor> assignors) {
this.assignors = assignors;
Builder withConsumerGroupAssignors(List<PartitionAssignor> consumerGroupAssignors) {
this.consumerGroupAssignors = consumerGroupAssignors;
return this;
}

Expand Down Expand Up @@ -194,11 +192,6 @@ Builder withMetadataImage(MetadataImage metadataImage) {
return this;
}

Builder withTopicPartition(TopicPartition tp) {
this.topicPartition = tp;
return this;
}

Builder withGenericGroupMaxSize(int genericGroupMaxSize) {
this.genericGroupMaxSize = genericGroupMaxSize;
return this;
Expand Down Expand Up @@ -232,20 +225,15 @@ GroupMetadataManager build() {

if (timer == null)
throw new IllegalArgumentException("Timer must be set.");
if (assignors == null || assignors.isEmpty())
if (consumerGroupAssignors == null || consumerGroupAssignors.isEmpty())
throw new IllegalArgumentException("Assignors must be set before building.");

if (topicPartition == null) {
throw new IllegalStateException("TopicPartition must be set before building.");
}

return new GroupMetadataManager(
topicPartition,
snapshotRegistry,
logContext,
time,
timer,
assignors,
consumerGroupAssignors,
metadataImage,
consumerGroupMaxSize,
consumerGroupSessionTimeoutMs,
Expand All @@ -260,11 +248,6 @@ GroupMetadataManager build() {
}
}

/**
* The topic partition associated with the metadata manager.
*/
private final TopicPartition topicPartition;

/**
* The log context.
*/
Expand Down Expand Up @@ -370,7 +353,6 @@ GroupMetadataManager build() {
private final int genericGroupMaxSessionTimeoutMs;

private GroupMetadataManager(
TopicPartition topicPartition,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
Expand All @@ -394,7 +376,6 @@ private GroupMetadataManager(
this.timer = timer;
this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.topicPartition = topicPartition;
this.defaultAssignor = assignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
Expand Down Expand Up @@ -1973,8 +1954,7 @@ private CoordinatorResult<Void, Record> completeGenericGroupJoin(
} else {
group.initNextGeneration();
if (group.isInState(EMPTY)) {
log.info("Group {} with generation {} is now empty ({}-{})",
groupId, group.generationId(), topicPartition.topic(), topicPartition.partition());
log.info("Group {} with generation {} is now empty.", groupId, group.generationId());

CompletableFuture<Void> appendFuture = new CompletableFuture<>();
appendFuture.whenComplete((__, t) -> {
Expand All @@ -1992,8 +1972,8 @@ private CoordinatorResult<Void, Record> completeGenericGroupJoin(
return new CoordinatorResult<>(records, appendFuture);

} else {
log.info("Stabilized group {} generation {} ({}) with {} members",
groupId, group.generationId(), topicPartition, group.size());
log.info("Stabilized group {} generation {} with {} members.",
groupId, group.generationId(), group.size());

// Complete the awaiting join group response future for all the members after rebalancing
group.allMembers().forEach(member -> {
Expand Down Expand Up @@ -2272,9 +2252,8 @@ CoordinatorResult<Void, Record> prepareRebalance(

group.transitionTo(PREPARING_REBALANCE);

log.info("Preparing to rebalance group {} in state {} with old generation {} ({}-{}) (reason: {})",
group.groupId(), group.currentState(), group.generationId(),
topicPartition.topic(), topicPartition.partition(), reason);
log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).",
group.groupId(), group.currentState(), group.generationId(), reason);

return isInitialRebalance ? EMPTY_RESULT : maybeCompleteJoinElseSchedule(group);
}
Expand Down
Loading