Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image #13901

Merged
merged 13 commits into from
Jul 5, 2023
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
<suppress checks="CyclomaticComplexity"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="MethodLength"
files="(ConsumerGroupTest|GroupMetadataManagerTest).java"/>
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="NPathComplexity"
files="(GroupMetadataManager).java"/>
<suppress checks="ParameterNumber"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ public void onNewMetadataImage(
MetadataDelta delta
) {
throwIfNotActive();
runtime.onNewMetadataImage(newImage, delta);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
Expand All @@ -50,14 +51,18 @@
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -88,10 +93,12 @@ public class GroupMetadataManager {
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private List<PartitionAssignor> assignors = null;
private TopicsImage topicsImage = null;
dajac marked this conversation as resolved.
Show resolved Hide resolved
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
private MetadataImage metadataImage = null;

Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
Expand All @@ -103,6 +110,11 @@ Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
return this;
}

Builder withTime(Time time) {
this.time = time;
return this;
}

Builder withAssignors(List<PartitionAssignor> assignors) {
this.assignors = assignors;
return this;
Expand All @@ -118,15 +130,21 @@ Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs)
return this;
}

Builder withTopicsImage(TopicsImage topicsImage) {
this.topicsImage = topicsImage;
Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
return this;
}

Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage;
return this;
}

GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;

if (assignors == null || assignors.isEmpty()) {
throw new IllegalStateException("Assignors must be set before building.");
Expand All @@ -135,10 +153,12 @@ GroupMetadataManager build() {
return new GroupMetadataManager(
snapshotRegistry,
logContext,
time,
assignors,
topicsImage,
metadataImage,
consumerGroupMaxSize,
consumerGroupHeartbeatIntervalMs
consumerGroupHeartbeatIntervalMs,
consumerGroupMetadataRefreshIntervalMs
);
}
}
Expand All @@ -153,6 +173,11 @@ GroupMetadataManager build() {
*/
private final SnapshotRegistry snapshotRegistry;

/**
* The system time.
*/
private final Time time;

/**
* The supported partition assignors keyed by their name.
*/
Expand All @@ -168,6 +193,11 @@ GroupMetadataManager build() {
*/
private final TimelineHashMap<String, Group> groups;

/**
* The group ids keyed by topic names.
*/
private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;

/**
* The maximum number of members allowed in a single consumer group.
*/
Expand All @@ -179,26 +209,45 @@ GroupMetadataManager build() {
private final int consumerGroupHeartbeatIntervalMs;

/**
* The topics metadata (or image).
* The metadata refresh interval.
*/
private TopicsImage topicsImage;
private final int consumerGroupMetadataRefreshIntervalMs;

/**
* The metadata image.
*
* Package private for testing.
dajac marked this conversation as resolved.
Show resolved Hide resolved
*/
private MetadataImage metadataImage;

private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
List<PartitionAssignor> assignors,
TopicsImage topicsImage,
MetadataImage metadataImage,
int consumerGroupMaxSize,
int consumerGroupHeartbeatIntervalMs
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMetadataRefreshIntervalMs
) {
this.log = logContext.logger(GroupMetadataManager.class);
this.snapshotRegistry = snapshotRegistry;
this.topicsImage = topicsImage;
this.time = time;
this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.defaultAssignor = assignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
}

/**
* @return The current metadata image used by the group metadata manager.
*/
public MetadataImage image() {
return metadataImage;
}

/**
Expand Down Expand Up @@ -506,32 +555,54 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
.setClientHost(clientHost)
.build();

boolean updatedMemberSubscriptions = false;
if (!updatedMember.equals(member)) {
records.add(newMemberSubscriptionRecord(groupId, updatedMember));

if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
updatedMember.subscribedTopicNames());
updatedMemberSubscriptions = true;
}

subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
topicsImage
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ subscriptionMetadata + ".");
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
updatedMember.subscribedTopicRegex());
updatedMemberSubscriptions = true;
}
}

groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
long currentTimeMs = time.milliseconds();
boolean maybeUpdateMetadata = updatedMemberSubscriptions || group.refreshMetadataNeeded(currentTimeMs);
boolean updatedSubscriptionMetadata = false;
if (maybeUpdateMetadata) {
subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
metadataImage.topics()
);

log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ subscriptionMetadata + ".");
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
updatedSubscriptionMetadata = true;
}
}

if (updatedMemberSubscriptions || updatedSubscriptionMetadata) {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
}

if (maybeUpdateMetadata) {
dajac marked this conversation as resolved.
Show resolved Hide resolved
group.setNextMetadataRefreshTime(
Math.min(Long.MAX_VALUE, currentTimeMs + consumerGroupMetadataRefreshIntervalMs),
groupEpoch
);
}

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
// delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch = group.assignmentEpoch();
Expand Down Expand Up @@ -635,7 +706,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
topicsImage
metadataImage.topics()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
Expand Down Expand Up @@ -709,14 +780,16 @@ public void replay(
String groupId = key.groupId();
String memberId = key.memberId();

ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());

if (value != null) {
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
.updateWith(value)
.build());
updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
dajac marked this conversation as resolved.
Show resolved Hide resolved
} else {
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != -1) {
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
Expand All @@ -727,6 +800,80 @@ public void replay(
+ " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
}
consumerGroup.removeMember(memberId);
updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
}
}

/**
* @return The set of groups subscribed to the topic.
*/
public Set<String> groupsSubscribedToTopic(String topicName) {
Set<String> groups = groupsByTopics.get(topicName);
dajac marked this conversation as resolved.
Show resolved Hide resolved
return groups != null ? groups : Collections.emptySet();
}

/**
* Subscribes a group to a topic.
*
* @param groupId The group id.
* @param topicName The topic name.
*/
private void subscribeGroupToTopic(
String groupId,
String topicName
) {
groupsByTopics
.computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(groupId);
}

/**
* Unsubscribes a group from a topic.
*
* @param groupId The group id.
* @param topicName The topic name.
*/
private void unsubscribeGroupFromTopic(
String groupId,
String topicName
) {
groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
groupIds.remove(groupId);
return groupIds.isEmpty() ? null : groupIds;
});
}

/**
* Updates the group by topics mapping.
*
* @param groupId The group id.
* @param oldSubscribedTopics The old group subscriptions.
* @param newSubscribedTopics The new group subscriptions.
*/
private void updateGroupsByTopics(
String groupId,
Set<String> oldSubscribedTopics,
Set<String> newSubscribedTopics
) {
if (oldSubscribedTopics.isEmpty()) {
newSubscribedTopics.forEach(topicName ->
subscribeGroupToTopic(groupId, topicName)
);
} else if (newSubscribedTopics.isEmpty()) {
oldSubscribedTopics.forEach(topicName ->
unsubscribeGroupFromTopic(groupId, topicName)
);
} else {
oldSubscribedTopics.forEach(topicName -> {
if (!newSubscribedTopics.contains(topicName)) {
unsubscribeGroupFromTopic(groupId, topicName);
}
});
newSubscribedTopics.forEach(topicName -> {
if (!oldSubscribedTopics.contains(topicName)) {
subscribeGroupToTopic(groupId, topicName);
}
});
}
}

Expand Down Expand Up @@ -874,4 +1021,34 @@ public void replay(
consumerGroup.updateMember(newMember);
}
}

/**
* A new metadata image is available.
*
* @param newImage The new metadata image.
* @param delta The delta image.
*/
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
metadataImage = newImage;

// Notify all the groups subscribed to the created, updated or
dajac marked this conversation as resolved.
Show resolved Hide resolved
// deleted topics.
Set<String> allGroupIds = new HashSet<>();
delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
String topicName = topicDelta.name();
Set<String> groupIds = groupsByTopics.get(topicName);
if (groupIds != null) allGroupIds.addAll(groupIds);
});
delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
TopicImage topicImage = delta.image().topics().getTopic(topicId);
Set<String> groupIds = groupsByTopics.get(topicImage.name());
if (groupIds != null) allGroupIds.addAll(groupIds);
dajac marked this conversation as resolved.
Show resolved Hide resolved
});
allGroupIds.forEach(groupId -> {
Group group = groups.get(groupId);
if (group != null && group.type() == Group.GroupType.CONSUMER) {
((ConsumerGroup) group).resetNextMetadataRefreshTime();
}
});
}
}