Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16312, KAFKA-16185: Local epochs in reconciliation #15511

Merged
merged 11 commits into from
Mar 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.MembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
Expand Down Expand Up @@ -534,25 +535,25 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {

boolean sendAllFields = membershipManager.state() == MemberState.JOINING;

// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
// RebalanceTimeoutMs - only sent when joining or if it has changed since the last heartbeat
if (sendAllFields || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
}

if (!this.subscriptions.hasPatternSubscription()) {
// SubscribedTopicNames - only sent if has changed since the last heartbeat
// SubscribedTopicNames - only sent when joining or if it has changed since the last heartbeat
TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
sentFields.subscribedTopicNames = subscribedTopicNames;
}
} else {
// SubscribedTopicRegex - only sent if has changed since the last heartbeat
// SubscribedTopicRegex - only sent if it has changed since the last heartbeat
// - not supported yet
}

// ServerAssignor - only sent if has changed since the last heartbeat
// ServerAssignor - sent when joining or if it has changed since the last heartbeat
this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
if (sendAllFields || !serverAssignor.equals(sentFields.serverAssignor)) {
data.setServerAssignor(serverAssignor);
Expand All @@ -562,18 +563,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {

// ClientAssignors - not supported yet

// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
// the string consists of just the topic ID and the partitions. When an assignment is
// received, we might not yet know the topic name, and then it is learnt subsequently
// by a metadata update.
TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
.map(entry -> entry.getKey() + "-" + entry.getValue())
.collect(Collectors.toCollection(TreeSet::new));
if (!assignedPartitions.equals(sentFields.topicPartitions)) {
// TopicPartitions - sent when joining or with the first heartbeat after a new assignment from
// the server was reconciled. This is ensured by resending the topic partitions whenever the
// local assignment, including its local epoch is changed (although the local epoch is not sent
// in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (sendAllFields || !local.equals(sentFields.localAssignment)) {
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions =
buildTopicPartitionsList(membershipManager.currentAssignment());
buildTopicPartitionsList(local.partitions);
data.setTopicPartitions(topicPartitions);
sentFields.topicPartitions = assignedPartitions;
sentFields.localAssignment = local;
}

return data;
Expand All @@ -592,15 +591,15 @@ static class SentFields {
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
private String serverAssignor = null;
private TreeSet<String> topicPartitions = null;
private LocalAssignment localAssignment = null;

SentFields() {}

void reset() {
rebalanceTimeoutMs = -1;
subscribedTopicNames = null;
rebalanceTimeoutMs = -1;
serverAssignor = null;
topicPartitions = null;
localAssignment = null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -99,7 +104,7 @@ public interface MembershipManager extends RequestManager {
* @return Current assignment for the member as received from the broker (topic IDs and
* partitions). This is the last assignment that the member has successfully reconciled.
*/
Map<Uuid, SortedSet<Integer>> currentAssignment();
LocalAssignment currentAssignment();

/**
* Transition the member to the FENCED state, where the member will release the assignment by
Expand Down Expand Up @@ -185,4 +190,88 @@ public interface MembershipManager extends RequestManager {
* releasing its assignment. This is expected to be used when the poll timer is reset.
*/
void maybeRejoinStaleMember();

/**
* A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
*
* Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
* that two assignments with the same partitions but different local epochs are not considered equal.
lucasbru marked this conversation as resolved.
Show resolved Hide resolved
*/
final class LocalAssignment {

public static final long NONE_EPOCH = -1;

public static final LocalAssignment NONE = new LocalAssignment(NONE_EPOCH, Collections.emptyMap());

public final long localEpoch;

public final Map<Uuid, SortedSet<Integer>> partitions;

public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
this.localEpoch = localEpoch;
this.partitions = partitions;
if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
}

public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
this.localEpoch = localEpoch;
this.partitions = new HashMap<>();
if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
topicIdPartitions.forEach(topicIdPartition -> {
Uuid topicId = topicIdPartition.topicId();
partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
});
}

public String toString() {
return "LocalAssignment{" +
"localEpoch=" + localEpoch +
", partitions=" + partitions +
'}';
}

public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LocalAssignment that = (LocalAssignment) o;
return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
}

public int hashCode() {
return Objects.hash(localEpoch, partitions);
}

public boolean isNone() {
return localEpoch == NONE_EPOCH;
}

Optional<LocalAssignment> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
if (localEpoch != NONE_EPOCH) {
if (partitions.size() == assignment.topicPartitions().size() &&
assignment.topicPartitions().stream().allMatch(
tp -> partitions.containsKey(tp.topicId()) &&
partitions.get(tp.topicId()).size() == tp.partitions().size() &&
partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
return Optional.empty();
}
}

// Bump local epoch and replace assignment
long nextLocalEpoch = localEpoch + 1;
HashMap<Uuid, SortedSet<Integer>> partitions = new HashMap<>();
assignment.topicPartitions().forEach(topicPartitions ->
partitions.put(topicPartitions.topicId(), new TreeSet<>(topicPartitions.partitions())));
return Optional.of(new LocalAssignment(nextLocalEpoch, partitions));

}
}
}