Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16312, KAFKA-16185: Local epochs in reconciliation #15511

Merged
merged 11 commits into from
Mar 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,14 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
// InstanceId - set if present
membershipManager.groupInstanceId().ifPresent(data::setInstanceId);
lucasbru marked this conversation as resolved.
Show resolved Hide resolved

// RebalanceTimeoutMs - only sent when joining
if (membershipManager.memberEpoch() == 0) {
// RebalanceTimeoutMs - only sent when joining or if has changed since the last heartbeat
if (membershipManager.memberEpoch() == 0 || sentFields.rebalanceTimeoutMs != rebalanceTimeoutMs) {
lucasbru marked this conversation as resolved.
Show resolved Hide resolved
data.setRebalanceTimeoutMs(rebalanceTimeoutMs);
sentFields.rebalanceTimeoutMs = rebalanceTimeoutMs;
}

if (!this.subscriptions.hasPatternSubscription()) {
// SubscribedTopicNames - only sent when joining or if has changed since the last heartbeat
// SubscribedTopicNames - only sent when joining or if it has changed since the last heartbeat
TreeSet<String> subscribedTopicNames = new TreeSet<>(this.subscriptions.subscription());
if (membershipManager.memberEpoch() == 0 || !subscribedTopicNames.equals(sentFields.subscribedTopicNames)) {
data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription()));
Expand All @@ -552,7 +553,7 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {

// ServerAssignor - only sent if has changed since the last heartbeat
this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
if (!serverAssignor.equals(sentFields.serverAssignor)) {
if (membershipManager.memberEpoch() == 0 || !serverAssignor.equals(sentFields.serverAssignor)) {
data.setServerAssignor(serverAssignor);
sentFields.serverAssignor = serverAssignor;
}
Expand All @@ -564,12 +565,9 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
// reconciled. This is ensured by resending the topic partitions whenever the local assignment,
// including its local epoch is changed (although the local epoch is not sent in the heartbeat).
LocalAssignment local = membershipManager.currentAssignment();
if (local == null) {
data.setTopicPartitions(Collections.emptyList());
sentFields.localAssignment = null;
} else if (!local.equals(sentFields.localAssignment)) {
if (!local.equals(sentFields.localAssignment)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need membershipManager.memberEpoch() == 0 here too? I suppose that it works because the current assignment is reset in the membership manager but it may be better to add it here for consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions =
buildTopicPartitionsList(local.getPartitions());
buildTopicPartitionsList(local.partitions);
data.setTopicPartitions(topicPartitions);
sentFields.localAssignment = local;
}
Expand All @@ -587,6 +585,7 @@ private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartit

// Fields of ConsumerHeartbeatRequest sent in the most recent request
static class SentFields {
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
private String serverAssignor = null;
private LocalAssignment localAssignment = null;
Expand All @@ -595,6 +594,7 @@ static class SentFields {

void reset() {
subscribedTopicNames = null;
rebalanceTimeoutMs = -1;
serverAssignor = null;
localAssignment = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

/**
Expand Down Expand Up @@ -192,11 +197,82 @@ public interface MembershipManager extends RequestManager {
* Besides the assigned partitions, it contains a local epoch that is bumped whenever the assignment changes, to ensure
* that two assignments with the same partitions but different local epochs are not considered equal.
*/
interface LocalAssignment {
final class LocalAssignment {

Map<Uuid, SortedSet<Integer>> getPartitions();
public static final long NONE_EPOCH = -1;

boolean isNone();
public static final LocalAssignment NONE = new LocalAssignment(NONE_EPOCH, Collections.emptyMap());

public final long localEpoch;

public final Map<Uuid, SortedSet<Integer>> partitions;

public LocalAssignment(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
this.localEpoch = localEpoch;
this.partitions = partitions;
if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
}

public LocalAssignment(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
this.localEpoch = localEpoch;
this.partitions = new HashMap<>();
if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
topicIdPartitions.forEach(topicIdPartition -> {
Uuid topicId = topicIdPartition.topicId();
partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
});
}

public String toString() {
return "LocalAssignment{" +
"localEpoch=" + localEpoch +
", partitions=" + partitions +
'}';
}

public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LocalAssignment that = (LocalAssignment) o;
return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
}

public int hashCode() {
return Objects.hash(localEpoch, partitions);
}

public boolean isNone() {
return localEpoch == NONE_EPOCH;
}

Optional<LocalAssignment> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we remove this empty line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
if (localEpoch != NONE_EPOCH) {
if (partitions.size() == assignment.topicPartitions().size() &&
assignment.topicPartitions().stream().allMatch(
tp -> partitions.containsKey(tp.topicId()) &&
partitions.get(tp.topicId()).size() == tp.partitions().size() &&
partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
return Optional.empty();
}
}

// Bump local epoch and replace assignment
long nextLocalEpoch = localEpoch + 1;
HashMap<Uuid, SortedSet<Integer>> partitions = new HashMap<>();
assignment.topicPartitions().forEach(topicPartitions ->
partitions.put(topicPartitions.topicId(), new TreeSet<>(topicPartitions.partitions())));
return Optional.of(new LocalAssignment(nextLocalEpoch, partitions));

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
Expand Down Expand Up @@ -176,9 +175,9 @@ public class MembershipManagerImpl implements MembershipManager {
* Assignment that the member received from the server and successfully processed, together with
* its local epoch.
*
* This is equal to LocalAssignmentImpl.NONE when we are not in a group, or we haven't reconciled any assignment yet.
* This is equal to LocalAssignment.NONE when we are not in a group, or we haven't reconciled any assignment yet.
*/
private LocalAssignmentImpl currentAssignment;
private LocalAssignment currentAssignment;

/**
* Subscription state object holding the current assignment the member has for the topics it
Expand Down Expand Up @@ -217,9 +216,9 @@ public class MembershipManagerImpl implements MembershipManager {
* Topic IDs and partitions received in the last target assignment, together with its local epoch.
*
* This member variable is reassigned every time a new assignment is received.
* It is equal to LocalAssignmentImpl.NONE whenever we are not in a group.
* It is equal to LocalAssignment.NONE whenever we are not in a group.
*/
private LocalAssignmentImpl currentTargetAssignment;
private LocalAssignment currentTargetAssignment;

/**
* If there is a reconciliation running (triggering commit, callbacks) for the
Expand Down Expand Up @@ -333,8 +332,8 @@ public MembershipManagerImpl(String groupId,
this.commitRequestManager = commitRequestManager;
this.metadata = metadata;
this.assignedTopicNamesCache = new HashMap<>();
this.currentTargetAssignment = LocalAssignmentImpl.NONE;
this.currentAssignment = LocalAssignmentImpl.NONE;
this.currentTargetAssignment = LocalAssignment.NONE;
this.currentAssignment = LocalAssignment.NONE;
this.log = logContext.logger(MembershipManagerImpl.class);
this.stateUpdatesListeners = new ArrayList<>();
this.clientTelemetryReporter = clientTelemetryReporter;
Expand Down Expand Up @@ -611,8 +610,7 @@ private void clearSubscription() {
if (subscriptions.hasAutoAssignedPartitions()) {
subscriptions.assignFromSubscribed(Collections.emptySet());
}
currentTargetAssignment = LocalAssignmentImpl.NONE;
currentAssignment = LocalAssignmentImpl.NONE;
currentAssignment = LocalAssignment.NONE;
clearPendingAssignmentsAndLocalNamesCache();
}

Expand Down Expand Up @@ -755,7 +753,7 @@ public void transitionToSendingLeaveGroup(boolean dueToExpiredPollTimer) {
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
updateMemberEpoch(leaveEpoch);
currentAssignment = LocalAssignmentImpl.NONE;
currentAssignment = LocalAssignment.NONE;
transitionTo(MemberState.LEAVING);
}

Expand Down Expand Up @@ -909,9 +907,9 @@ void maybeReconcile() {
// Find the subset of the target assignment that can be resolved to topic names, and trigger a metadata update
// if some topic IDs are not resolvable.
SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);
final LocalAssignment resolvedAssignment = new LocalAssignment(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

if (currentAssignment != LocalAssignmentImpl.NONE &&
if (currentAssignment != LocalAssignment.NONE &&
resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we bring this one on the previous line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to consider maybe simplifying the log and clarify the situation: isn't the message here simply that we're ignoring the reconciliation because resolved target is equals to the current assignment? I get the point about intermediate assignments, but an intermediate one would have updated the current assignment so it wouldn't be equals to the resolved target (or leave a reconciliation in progress so it wouldn't even make it to this check)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to talk about the local epoch here, since it's more of implementation detail how to detect intermediate assignments. But then I should log the local epoch I supposed. Updated it accordingly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac and I discussed various options to implement the short-cutting. In the end, the outcome was to only skip calling the callbacks but not skipping the entire reconciliation if the resolved assignment is equal to the current assignment but with a different epoch. We still need to update the current assignment with the resolved assignment and the new epoch in order to trigger the "ack", and transition to sending ack as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree in the need to update the assignment. I was exactly pushing for doing only only what's needed (vs doing all that the reconciliation does), so this sounds good to me. Thanks!

Expand Down Expand Up @@ -976,6 +974,10 @@ void maybeReconcile() {
}

revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
}).whenComplete((__, error) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we use exceptionally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - but it requires a return null :/

if (error != null) {
log.error("Reconciliation failed.", error);
lucasbru marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

Expand All @@ -993,7 +995,7 @@ long getExpirationTimeForTimeout(final long timeoutMs) {
* then complete the reconciliation by updating the assignment and making the appropriate state
* transition. Note that if any of the 2 callbacks fails, the reconciliation should fail.
*/
private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment,
private void revokeAndAssign(LocalAssignment resolvedAssignment,
SortedSet<TopicIdPartition> assignedTopicIdPartitions,
SortedSet<TopicPartition> revokedPartitions,
SortedSet<TopicPartition> addedPartitions) {
Expand Down Expand Up @@ -1053,7 +1055,7 @@ private void revokeAndAssign(LocalAssignmentImpl resolvedAssignment,

// Visible for testing.
void updateAssignment(Map<Uuid, SortedSet<Integer>> partitions) {
currentAssignment = new LocalAssignmentImpl(0, partitions);
currentAssignment = new LocalAssignment(0, partitions);
}

/**
Expand Down Expand Up @@ -1385,7 +1387,7 @@ private void logPausedPartitionsBeingRevoked(Set<TopicPartition> partitionsToRev
* or the next reconciliation loop). Remove all elements from the topic names cache.
*/
private void clearPendingAssignmentsAndLocalNamesCache() {
currentTargetAssignment = LocalAssignmentImpl.NONE;
currentTargetAssignment = LocalAssignment.NONE;
assignedTopicNamesCache.clear();
}

Expand Down Expand Up @@ -1452,10 +1454,10 @@ Set<Uuid> topicsAwaitingReconciliation() {
* Visible for testing.
*/
Map<Uuid, SortedSet<Integer>> topicPartitionsAwaitingReconciliation() {
if (currentTargetAssignment == LocalAssignmentImpl.NONE) {
if (currentTargetAssignment == LocalAssignment.NONE) {
return Collections.emptyMap();
}
if (currentAssignment == LocalAssignmentImpl.NONE) {
if (currentAssignment == LocalAssignment.NONE) {
return currentTargetAssignment.partitions;
}
final Map<Uuid, SortedSet<Integer>> topicPartitionMap = new HashMap<>();
Expand Down Expand Up @@ -1507,92 +1509,4 @@ List<MemberStateListener> stateListeners() {
return unmodifiableList(stateUpdatesListeners);
}

private final static class LocalAssignmentImpl implements LocalAssignment {

private static final long NONE_EPOCH = -1;

private static final LocalAssignmentImpl NONE = new LocalAssignmentImpl(NONE_EPOCH, Collections.emptyMap());

private final long localEpoch;

private final Map<Uuid, SortedSet<Integer>> partitions;

public LocalAssignmentImpl(long localEpoch, Map<Uuid, SortedSet<Integer>> partitions) {
this.localEpoch = localEpoch;
this.partitions = partitions;
if (localEpoch == NONE_EPOCH && !partitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
}

public LocalAssignmentImpl(long localEpoch, SortedSet<TopicIdPartition> topicIdPartitions) {
this.localEpoch = localEpoch;
this.partitions = new HashMap<>();
if (localEpoch == NONE_EPOCH && !topicIdPartitions.isEmpty()) {
throw new IllegalArgumentException("Local epoch must be set if there are partitions");
}
topicIdPartitions.forEach(topicIdPartition -> {
Uuid topicId = topicIdPartition.topicId();
partitions.computeIfAbsent(topicId, k -> new TreeSet<>()).add(topicIdPartition.partition());
});
}

@Override
public String toString() {
return "{" +
"localEpoch=" + localEpoch +
", partitions=" + partitions +
'}';
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final LocalAssignmentImpl that = (LocalAssignmentImpl) o;
return localEpoch == that.localEpoch && Objects.equals(partitions, that.partitions);
}

@Override
public int hashCode() {
return Objects.hash(localEpoch, partitions);
}

@Override
public Map<Uuid, SortedSet<Integer>> getPartitions() {
return partitions;
}

@Override
public boolean isNone() {
return localEpoch == NONE_EPOCH;
}

Optional<LocalAssignmentImpl> updateWith(ConsumerGroupHeartbeatResponseData.Assignment assignment) {

// Return if we have an assignment, and it is the same as current assignment; comparison without creating a new collection
if (localEpoch != NONE_EPOCH) {
// check if the new assignment is different from the current target assignment
if (partitions.size() == assignment.topicPartitions().size() &&
assignment.topicPartitions().stream().allMatch(
tp -> partitions.containsKey(tp.topicId()) &&
partitions.get(tp.topicId()).size() == tp.partitions().size() &&
partitions.get(tp.topicId()).containsAll(tp.partitions()))) {
return Optional.empty();
}
}

// Bump local epoch and replace assignment
long nextLocalEpoch = localEpoch + 1;
HashMap<Uuid, SortedSet<Integer>> partitions = new HashMap<>();
assignment.topicPartitions().forEach(topicPartitions ->
partitions.put(topicPartitions.topicId(), new TreeSet<>(topicPartitions.partitions())));
return Optional.of(new LocalAssignmentImpl(nextLocalEpoch, partitions));

}
}
}