Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14462; [20/N] Refresh subscription metadata on new metadata image #13901

Merged
merged 13 commits into from
Jul 5, 2023
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
<suppress checks="CyclomaticComplexity"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="MethodLength"
files="(ConsumerGroupTest|GroupMetadataManagerTest).java"/>
files="(GroupMetadataManager|ConsumerGroupTest|GroupMetadataManagerTest).java"/>
<suppress checks="NPathComplexity"
files="(GroupMetadataManager).java"/>
<suppress checks="ParameterNumber"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ public void onNewMetadataImage(
MetadataDelta delta
) {
throwIfNotActive();
runtime.onNewMetadataImage(newImage, delta);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.consumer.Assignment;
Expand All @@ -50,14 +51,18 @@
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -88,10 +93,12 @@ public class GroupMetadataManager {
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private List<PartitionAssignor> assignors = null;
private TopicsImage topicsImage = null;
dajac marked this conversation as resolved.
Show resolved Hide resolved
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
private MetadataImage metadataImage = null;

Builder withLogContext(LogContext logContext) {
this.logContext = logContext;
Expand All @@ -103,6 +110,11 @@ Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
return this;
}

Builder withTime(Time time) {
this.time = time;
return this;
}

Builder withAssignors(List<PartitionAssignor> assignors) {
this.assignors = assignors;
return this;
Expand All @@ -118,15 +130,21 @@ Builder withConsumerGroupHeartbeatInterval(int consumerGroupHeartbeatIntervalMs)
return this;
}

Builder withTopicsImage(TopicsImage topicsImage) {
this.topicsImage = topicsImage;
Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
return this;
}

Builder withMetadataImage(MetadataImage metadataImage) {
this.metadataImage = metadataImage;
return this;
}

GroupMetadataManager build() {
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = new SnapshotRegistry(logContext);
if (topicsImage == null) topicsImage = TopicsImage.EMPTY;
if (metadataImage == null) metadataImage = MetadataImage.EMPTY;
if (time == null) time = Time.SYSTEM;

if (assignors == null || assignors.isEmpty()) {
throw new IllegalStateException("Assignors must be set before building.");
Expand All @@ -135,10 +153,12 @@ GroupMetadataManager build() {
return new GroupMetadataManager(
snapshotRegistry,
logContext,
time,
assignors,
topicsImage,
metadataImage,
consumerGroupMaxSize,
consumerGroupHeartbeatIntervalMs
consumerGroupHeartbeatIntervalMs,
consumerGroupMetadataRefreshIntervalMs
);
}
}
Expand All @@ -153,6 +173,11 @@ GroupMetadataManager build() {
*/
private final SnapshotRegistry snapshotRegistry;

/**
* The system time.
*/
private final Time time;

/**
* The supported partition assignors keyed by their name.
*/
Expand All @@ -168,6 +193,11 @@ GroupMetadataManager build() {
*/
private final TimelineHashMap<String, Group> groups;

/**
* The group ids keyed by topic names.
*/
private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;

/**
* The maximum number of members allowed in a single consumer group.
*/
Expand All @@ -179,26 +209,43 @@ GroupMetadataManager build() {
private final int consumerGroupHeartbeatIntervalMs;

/**
* The topics metadata (or image).
* The metadata refresh interval.
*/
private final int consumerGroupMetadataRefreshIntervalMs;

/**
* The metadata image.
*/
private TopicsImage topicsImage;
private MetadataImage metadataImage;

private GroupMetadataManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
Time time,
List<PartitionAssignor> assignors,
TopicsImage topicsImage,
MetadataImage metadataImage,
int consumerGroupMaxSize,
int consumerGroupHeartbeatIntervalMs
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMetadataRefreshIntervalMs
) {
this.log = logContext.logger(GroupMetadataManager.class);
this.snapshotRegistry = snapshotRegistry;
this.topicsImage = topicsImage;
this.time = time;
this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.defaultAssignor = assignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
this.consumerGroupHeartbeatIntervalMs = consumerGroupHeartbeatIntervalMs;
this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
}

/**
* @return The current metadata image used by the group metadata manager.
*/
public MetadataImage image() {
return metadataImage;
}

/**
Expand Down Expand Up @@ -472,7 +519,8 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
String assignorName,
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions
) throws ApiException {
List<Record> records = new ArrayList<>();
final long currentTimeMs = time.milliseconds();
final List<Record> records = new ArrayList<>();

// Get or create the consumer group.
boolean createIfNotExists = memberEpoch == 0;
Expand Down Expand Up @@ -506,30 +554,47 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
.setClientHost(clientHost)
.build();

boolean bumpGroupEpoch = false;
if (!updatedMember.equals(member)) {
records.add(newMemberSubscriptionRecord(groupId, updatedMember));

if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed topics to: " +
updatedMember.subscribedTopicNames());
bumpGroupEpoch = true;
}

subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
topicsImage
);
if (!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
log.info("[GroupId " + groupId + "] Member " + memberId + " updated its subscribed regex to: " +
updatedMember.subscribedTopicRegex());
bumpGroupEpoch = true;
}
}

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ subscriptionMetadata + ".");
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
subscriptionMetadata = group.computeSubscriptionMetadata(
member,
updatedMember,
metadataImage.topics()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId " + groupId + "] Computed new subscription metadata: "
+ subscriptionMetadata + ".");
bumpGroupEpoch = true;
records.add(newGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}

if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newGroupEpochRecord(groupId, groupEpoch));

log.info("[GroupId " + groupId + "] Bumped group epoch to " + groupEpoch + ".");
}

group.setMetadataRefreshDeadline(currentTimeMs + consumerGroupMetadataRefreshIntervalMs, groupEpoch);
}

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The
Expand Down Expand Up @@ -635,7 +700,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
member,
null,
topicsImage
metadataImage.topics()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
Expand Down Expand Up @@ -709,14 +774,15 @@ public void replay(
String groupId = key.groupId();
String memberId = key.memberId();

ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, value != null);
Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());

if (value != null) {
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, true);
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)
.updateWith(value)
.build());
} else {
ConsumerGroup consumerGroup = getOrMaybeCreateConsumerGroup(groupId, false);
ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
if (oldMember.memberEpoch() != -1) {
throw new IllegalStateException("Received a tombstone record to delete member " + memberId
Expand All @@ -728,6 +794,81 @@ public void replay(
}
consumerGroup.removeMember(memberId);
}

updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames());
dajac marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* @return The set of groups subscribed to the topic.
*/
public Set<String> groupsSubscribedToTopic(String topicName) {
Set<String> groups = groupsByTopics.get(topicName);
dajac marked this conversation as resolved.
Show resolved Hide resolved
return groups != null ? groups : Collections.emptySet();
}

/**
* Subscribes a group to a topic.
*
* @param groupId The group id.
* @param topicName The topic name.
*/
private void subscribeGroupToTopic(
String groupId,
String topicName
) {
groupsByTopics
.computeIfAbsent(topicName, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
.add(groupId);
}

/**
* Unsubscribes a group from a topic.
*
* @param groupId The group id.
* @param topicName The topic name.
*/
private void unsubscribeGroupFromTopic(
String groupId,
String topicName
) {
groupsByTopics.computeIfPresent(topicName, (__, groupIds) -> {
groupIds.remove(groupId);
return groupIds.isEmpty() ? null : groupIds;
});
}

/**
* Updates the group by topics mapping.
*
* @param groupId The group id.
* @param oldSubscribedTopics The old group subscriptions.
* @param newSubscribedTopics The new group subscriptions.
*/
private void updateGroupsByTopics(
String groupId,
Set<String> oldSubscribedTopics,
Set<String> newSubscribedTopics
) {
if (oldSubscribedTopics.isEmpty()) {
newSubscribedTopics.forEach(topicName ->
subscribeGroupToTopic(groupId, topicName)
);
} else if (newSubscribedTopics.isEmpty()) {
oldSubscribedTopics.forEach(topicName ->
unsubscribeGroupFromTopic(groupId, topicName)
);
} else {
oldSubscribedTopics.forEach(topicName -> {
if (!newSubscribedTopics.contains(topicName)) {
unsubscribeGroupFromTopic(groupId, topicName);
}
});
newSubscribedTopics.forEach(topicName -> {
if (!oldSubscribedTopics.contains(topicName)) {
subscribeGroupToTopic(groupId, topicName);
}
});
}
}

/**
Expand Down Expand Up @@ -874,4 +1015,32 @@ public void replay(
consumerGroup.updateMember(newMember);
}
}

/**
* A new metadata image is available.
*
* @param newImage The new metadata image.
* @param delta The delta image.
*/
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
metadataImage = newImage;

// Notify all the groups subscribed to the created, updated or
dajac marked this conversation as resolved.
Show resolved Hide resolved
// deleted topics.
Set<String> allGroupIds = new HashSet<>();
delta.topicsDelta().changedTopics().forEach((topicId, topicDelta) -> {
String topicName = topicDelta.name();
allGroupIds.addAll(groupsSubscribedToTopic(topicName));
});
delta.topicsDelta().deletedTopicIds().forEach(topicId -> {
TopicImage topicImage = delta.image().topics().getTopic(topicId);
allGroupIds.addAll(groupsSubscribedToTopic(topicImage.name()));
});
allGroupIds.forEach(groupId -> {
Group group = groups.get(groupId);
if (group != null && group.type() == Group.GroupType.CONSUMER) {
((ConsumerGroup) group).requestMetadataRefresh();
}
});
}
}