Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16312, KAFKA-16185: Local epochs in reconciliation #15511

Merged
merged 11 commits into from
Mar 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.MembershipManager.LocalAssignment;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
Expand Down Expand Up @@ -538,16 +539,16 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
}
});

// RebalanceTimeoutMs - only sent if has changed since the last heartbeat
if (sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
// RebalanceTimeoutMs - only sent when joining or if it has changed since the last heartbeat
if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
lucasbru marked this conversation as resolved.
Show resolved Hide resolved
data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
}

if (!this.subscriptions.hasPatternSubscription()) {
// SubscribedTopicNames - only sent if has changed since the last heartbeat
// SubscribedTopicNames - only sent when joining or if it has changed since the last heartbeat
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inherited, but now that we're here: "or if it has changed..."

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
if (!subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
if (membershipManager.memberEpoch() == 0 || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
sentFields.subscribedTopicNames = subscribedTopicNames;
}
Expand All @@ -566,18 +567,18 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {

// ClientAssignors - not supported yet

// TopicPartitions - only sent if it has changed since the last heartbeat. Note that
// the string consists of just the topic ID and the partitions. When an assignment is
// received, we might not yet know the topic name, and then it is learnt subsequently
// by a metadata update.
TreeSet<String> assignedPartitions = membershipManager.currentAssignment().entrySet().stream()
.map(entry -> entry.getKey() + "-" + entry.getValue())
.collect(Collectors.toCollection(TreeSet::new));
if (!assignedPartitions.equals(sentFields.topicPartitions)) {
// TopicPartitions - sent with the first heartbeat after a new assignment from the server was
// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
// including its local epoch, is changed (although the local epoch is not sent in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (local == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could use isNone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

data.setTopicPartitions(Collections.emptyList());
sentFields.topicPartitions = null;
} else if (!local.equals(sentFields.topicPartitions)) {
lucasbru marked this conversation as resolved.
Show resolved Hide resolved
List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions =
buildTopicPartitionsList(membershipManager.currentAssignment());
buildTopicPartitionsList(local.partitions);
data.setTopicPartitions(topicPartitions);
sentFields.topicPartitions = assignedPartitions;
sentFields.topicPartitions = local;
}

return data;
Expand All @@ -597,7 +598,7 @@ static class SentFields {
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
private String serverAssignor = null;
private TreeSet<String> topicPartitions = null;
private LocalAssignment topicPartitions = null;

SentFields() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -99,7 +103,7 @@ public interface MembershipManager extends RequestManager {
* @return Current assignment for the member as received from the broker (topic IDs and
* partitions). This is the last assignment that the member has successfully reconciled.
*/
Map<Uuid, SortedSet<Integer>> currentAssignment();
LocalAssignment currentAssignment();

/**
* Transition the member to the FENCED state, where the member will release the assignment by
Expand Down Expand Up @@ -185,4 +189,56 @@ public interface MembershipManager extends RequestManager {
* releasing its assignment. This is expected to be used when the poll timer is reset.
*/
void maybeRejoinStaleMember();

/**
* A data structure to represent the current assignment, and current target assignment of a member in a consumer group.
*
* Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
* that two assignments with the same partitions but different local epochs are not considered equal.
lucasbru marked this conversation as resolved.
Show resolved Hide resolved
*/
final class LocalAssignment {

    /** Local epoch of this assignment; it participates in {@link #equals(Object)} and {@link #hashCode()}. */
    public final long localEpoch;

    /** Partition numbers of this assignment, keyed by topic ID. */
    public final Map<Uuid, SortedSet<Integer>> partitions;

    /**
     * Creates an assignment from an already grouped map. The map is stored as given (no copy is made).
     *
     * @param localEpoch local epoch to associate with the assignment
     * @param partitions partition numbers keyed by topic ID
     */
    public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
        this.localEpoch = localEpoch;
        this.partitions = partitions;
    }

    /**
     * Creates an assignment by grouping a flat set of topic-ID partitions by their topic ID.
     *
     * @param localEpoch        local epoch to associate with the assignment
     * @param topicIdPartitions partitions to group; each partition number is added to the sorted set of its topic ID
     */
    public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
        this.localEpoch = localEpoch;
        final Map<Uuid, SortedSet<Integer>> grouped = new HashMap<>();
        for (TopicIdPartition tip : topicIdPartitions) {
            grouped.computeIfAbsent(tip.topicId(), id -> new TreeSet<>()).add(tip.partition());
        }
        this.partitions = grouped;
    }

    /** Renders the assignment as {@code {localEpoch=..., partitions=...}}. */
    @Override
    public String toString() {
        return new StringBuilder("{")
            .append("localEpoch=").append(localEpoch)
            .append(", partitions=").append(partitions)
            .append('}')
            .toString();
    }

    /**
     * Two assignments are equal only if both the local epoch and the partitions match, so the same
     * partitions under a different local epoch compare as different.
     */
    @Override
    public boolean equals(final Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }
        final LocalAssignment other = (LocalAssignment) o;
        if (localEpoch != other.localEpoch) {
            return false;
        }
        return Objects.equals(partitions, other.partitions);
    }

    /** Hash over the local epoch and the partitions, consistent with {@link #equals(Object)}. */
    @Override
    public int hashCode() {
        // Same value as Objects.hash(localEpoch, partitions), written out without the varargs array.
        int result = 31 + Long.hashCode(localEpoch);
        result = 31 * result + Objects.hashCode(partitions);
        return result;
    }
}
}