Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16312, KAFKA-16185: Local epochs in reconciliation #15511

Merged
merged 11 commits into from
Mar 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -911,9 +911,13 @@ void maybeReconcile() {
SortedSet<TopicIdPartition> assignedTopicIdPartitions = findResolvableAssignmentAndTriggerMetadataUpdate();
final LocalAssignmentImpl resolvedAssignment = new LocalAssignmentImpl(currentTargetAssignment.localEpoch, assignedTopicIdPartitions);

if (resolvedAssignment.equals(currentAssignment)) {
log.debug("Ignoring reconciliation attempt. Target assignment ready to reconcile {} " +
"is equal to the member current assignment.", resolvedAssignment);
if (currentAssignment != LocalAssignmentImpl.NONE &&
resolvedAssignment.localEpoch <= currentAssignment.localEpoch + 1 &&
resolvedAssignment.partitions.equals(currentAssignment.partitions)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we bring this one on the previous line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

log.debug("Ignoring reconciliation attempt. The resolvable fragment of the target assignment {} " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to consider maybe simplifying the log and clarify the situation: isn't the message here simply that we're ignoring the reconciliation because resolved target is equals to the current assignment? I get the point about intermediate assignments, but an intermediate one would have updated the current assignment so it wouldn't be equals to the resolved target (or leave a reconciliation in progress so it wouldn't even make it to this check)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to talk about the local epoch here, since it's more of implementation detail how to detect intermediate assignments. But then I should log the local epoch I supposed. Updated it accordingly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dajac and I discussed various options to implement the short-cutting. In the end, the outcome was to only skip calling the callbacks but not skipping the entire reconciliation if the resolved assignment is equal to the current assignment but with a different epoch. We still need to update the current assignment with the resolved assignment and the new epoch in order to trigger the "ack", and transition to sending ack as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree in the need to update the assignment. I was exactly pushing for doing only only what's needed (vs doing all that the reconciliation does), so this sounds good to me. Thanks!

"is equal to the current assignment, and no intermediate assignments were received.", resolvedAssignment);
// May bump the local epoch
currentAssignment = resolvedAssignment;
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -681,10 +682,7 @@ public void testDelayedMetadataUsedToCompleteAssignment() {
receiveAssignment(newAssignment, membershipManager);
membershipManager.poll(time.milliseconds());

// We bumped the local epoch, so new reconciliation is triggered
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
membershipManager.onHeartbeatRequestSent();

verifyReconciliationNotTriggered(membershipManager);
assertEquals(MemberState.RECONCILING, membershipManager.state());
assertEquals(Collections.singleton(topicId2), membershipManager.topicsAwaitingReconciliation());
verify(metadata).requestUpdate(anyBoolean());
Expand Down