Skip to content

Commit

Permalink
KAFKA-16312, KAFKA-16185: Local epochs in reconciliation (#15511)
Browse files Browse the repository at this point in the history
The goal of this commit is to change the following internals of the reconciliation:

- Introduce a "local epoch" to the local target assignment. When a new target is received by the server, we compare it with the current value. If it is the same, no change. Otherwise, we bump the local epoch and store the new target assignment. Then, on the reconciliation, we also store the epoch in the reconciled assignment and keep using target != current to trigger the reconciliation.
- When we are not in a group (we have not received an assignment), we use null to represent the local target assignment instead of an empty list, to avoid confusions with an empty assignment received by the server. Similarly, we use null to represent the current assignment, when we haven't reconciled the assignment yet.
We also carry the new epoch into the request builder to ensure that we report the owned partitions for the last local epoch.
- To address KAFKA-16312 (call onPartitionsAssigned on empty assignments after joining), we apply the initial assignment returned by the group coordinator (whether empty or not) as a normal reconciliation. This avoids introducing another code path to trigger rebalance listeners - reconciliation is the only way to transition to STABLE. The unneeded parts of reconciliation (autocommit, revocation) will be skipped in the existing. Since a lot of unit tests assumed that not reconciliation behavior is invoked when joining the group with an empty assignment, this required a lot of the changes in the unit tests.

Reviewers: Lianet Magrans <lianetmr@gmail.com>, David Jacot <djacot@confluent.io>
  • Loading branch information
lucasbru committed Mar 18, 2024
1 parent e9c50b1 commit 5c92987
Show file tree
Hide file tree
Showing 5 changed files with 485 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.MembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
Expand Down Expand Up @@ -534,25 +535,25 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {

boolean sendAllFields = membershipManager.state() == MemberState.JOINING;

// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
// RebalanceTimeoutMs - only sent when joining or if it has changed since the last heartbeat
if (sendAllFields || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
}

if (!this.subscriptions.hasPatternSubscription()) {
// SubscribedTopicNames - only sent if has changed since the last heartbeat
// SubscribedTopicNames - only sent when joining or if it has changed since the last heartbeat
TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
if (sendAllFields || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
sentFields.subscribedTopicNames = subscribedTopicNames;
}
} else {
// SubscribedTopicRegex - only sent if has changed since the last heartbeat
// SubscribedTopicRegex - only sent if it has changed since the last heartbeat
// - not supported yet
}

// ServerAssignor - only sent if has changed since the last heartbeat
// ServerAssignor - sent when joining or if it has changed since the last heartbeat
this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
if (sendAllFields || !serverAssignor.equals(sentFields.serverAssignor)) {
data.setServerAssignor(serverAssignor);
Expand All @@ -562,18 +563,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {

// ClientAssignors - not supported yet

// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
// the string consists of just the topic ID and the partitions. When an assignment is
// received, we might not yet know the topic name, and then it is learnt subsequently
// by a metadata update.
TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
.map(entry -> entry.getKey() + "-" + entry.getValue())
.collect(Collectors.toCollection(TreeSet::new));
if (!assignedPartitions.equals(sentFields.topicPartitions)) {
// TopicPartitions - sent when joining or with the first heartbeat after a new assignment from
// the server was reconciled. This is ensured by resending the topic partitions whenever the
// local assignment, including its local epoch is changed (although the local epoch is not sent
// in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (sendAllFields || !local.equals(sentFields.localAssignment)) {
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions =
buildTopicPartitionsList(membershipManager.currentAssignment());
buildTopicPartitionsList(local.partitions);
data.setTopicPartitions(topicPartitions);
sentFields.topicPartitions = assignedPartitions;
sentFields.localAssignment = local;
}

return data;
Expand All @@ -592,15 +591,15 @@ static class SentFields {
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
private String serverAssignor = null;
private TreeSet<String> topicPartitions = null;
private LocalAssignment localAssignment = null;

SentFields() {}

void reset() {
rebalanceTimeoutMs = -1;
subscribedTopicNames = null;
rebalanceTimeoutMs = -1;
serverAssignor = null;
topicPartitions = null;
localAssignment = null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -99,7 +104,7 @@ public interface MembershipManager extends RequestManager {
* @return Current assignment for the member as received from the broker (topic IDs and
* partitions). This is the last assignment that the member has successfully reconciled.
*/
Map<Uuid, SortedSet<Integer>> currentAssignment();
LocalAssignment currentAssignment();

/**
* Transition the member to the FENCED state, where the member will release the assignment by
Expand Down Expand Up @@ -185,4 +190,88 @@ public interface MembershipManager extends RequestManager {
* releasing its assignment. This is expected to be used when the poll timer is reset.
*/
void maybeRejoinStaleMember();

/**
* A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
*
* Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
* that two assignments with the same partitions but different local epochs are not considered equal.
*/
final class LocalAssignment {

public static final long NONE_EPOCH = -1;

public static final LocalAssignment NONE = new LocalAssignment(NONE_EPOCH, Collections.emptyMap());

public final long localEpoch;

public final Map<Uuid, SortedSet<Integer>> partitions;

public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
this.localEpoch = localEpoch;
this.partitions = partitions;
if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
}

public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
this.localEpoch = localEpoch;
this.partitions = new HashMap<>();
if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
topicIdPartitions.forEach(topicIdPartition -> {
Uuid topicId = topicIdPartition.topicId();
partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
});
}

public String toString() {
return "LocalAssignment{" +
"localEpoch=" + localEpoch +
", partitions=" + partitions +
'}';
}

public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LocalAssignment that = (LocalAssignment) o;
return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
}

public int hashCode() {
return Objects.hash(localEpoch, partitions);
}

public boolean isNone() {
return localEpoch == NONE_EPOCH;
}

Optional<LocalAssignment> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
if (localEpoch != NONE_EPOCH) {
if (partitions.size() == assignment.topicPartitions().size() &&
assignment.topicPartitions().stream().allMatch(
tp -> partitions.containsKey(tp.topicId()) &&
partitions.get(tp.topicId()).size() == tp.partitions().size() &&
partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
return Optional.empty();
}
}

// Bump local epoch and replace assignment
long nextLocalEpoch = localEpoch + 1;
HashMap<Uuid, SortedSet<Integer>> partitions = new HashMap<>();
assignment.topicPartitions().forEach(topicPartitions ->
partitions.put(topicPartitions.topicId(), new TreeSet<>(topicPartitions.partitions())));
return Optional.of(new LocalAssignment(nextLocalEpoch, partitions));

}
}
}

0 comments on commit 5c92987

Please sign in to comment.