KAFKA-16375: Fix for rejoin while reconciling #15579

Merged 8 commits on Mar 25, 2024
Changes from 3 commits
@@ -228,11 +228,10 @@ public class MembershipManagerImpl implements MembershipManager {
     private boolean reconciliationInProgress;

     /**
-     * Epoch the member had when the reconciliation in progress started. This is used to identify if
-     * the member has rejoined while it was reconciling an assignment (in which case the result
-     * of the reconciliation is not applied.)
+     * True if a reconciliation is in progress and the member rejoins the group. Used to know
+     * that the reconciliation in progress should be interrupted and not be applied.
      */
-    private int memberEpochOnReconciliationStart;
+    private boolean rejoinedWhileReconciliationInProgress;

     /**
      * If the member is currently leaving the group after a call to {@link #leaveGroup()}}, this
@@ -641,6 +640,9 @@ public void transitionToJoining() {
                     "the member is in FATAL state");
             return;
         }
+        if (reconciliationInProgress) {
+            rejoinedWhileReconciliationInProgress = true;
+        }
         resetEpoch();
         transitionTo(MemberState.JOINING);
         clearPendingAssignmentsAndLocalNamesCache();
@@ -972,7 +974,10 @@ void maybeReconcile() {
                 log.debug("Auto-commit before reconciling new assignment completed successfully.");
             }

-            revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+            if (!maybeAbortReconciliation()) {
+                revokeAndAssign(resolvedAssignment, assignedTopicIdPartitions, revokedPartitions, addedPartitions);
+            }
+
         }).exceptionally(error -> {
             if (error != null) {
                 log.error("Reconciliation failed.", error);
@@ -1010,49 +1015,55 @@ private void revokeAndAssign(LocalAssignment resolvedAssignment,
         // and assignment, executed sequentially).
         CompletableFuture<Void> reconciliationResult =
             revocationResult.thenCompose(__ -> {
-                boolean memberHasRejoined = memberEpochOnReconciliationStart != memberEpoch;
-                if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                if (!maybeAbortReconciliation()) {
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, addedPartitions);
                 } else {
-                    log.debug("Revocation callback completed but the member already " +
-                        "transitioned out of the reconciling state for epoch {} into " +
-                        "{} state with epoch {}. Interrupting reconciliation as it's " +
-                        "not relevant anymore,", memberEpochOnReconciliationStart, state, memberEpoch);
-                    String reason = interruptedReconciliationErrorMessage();
                     CompletableFuture<Void> res = new CompletableFuture<>();
                     res.completeExceptionally(new KafkaException("Interrupting reconciliation" +
Member

Do we want to actually fail the future here? We will repeat the exception with "Reconciliation failed" at ERROR level 5 lines below. It would be good to separate the "error" path from the "aborted, but that's fine" path.

Contributor Author

Good point. I did consider it but wasn't fully convinced at the time, mostly because this abort path covers not only a rejoin while reconciling but also, for instance, fatal failures while reconciling. But I do like your point of view about the two paths: even in the failure scenario, the logging/error handling should be done on the fatal transition path, and the reconciliation itself should only be responsible for aborting quietly. Changed, take a look and let me know your thoughts. Note the tradeoff: it requires another check before sending the ack, to make sure that the reconciliation hasn't already been aborted.
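
Editor's sketch of the two paths discussed in this thread, under stated assumptions: `QuietAbortSketch`, its fields, and its counters are hypothetical stand-ins and not the actual `MembershipManagerImpl` code. It only illustrates completing the future normally on an abort, which is why a second abort check is needed before moving on to the acknowledgement.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for the membership manager (not the Kafka class). */
class QuietAbortSketch {
    enum State { RECONCILING, ACKNOWLEDGING, FATAL }

    State state = State.RECONCILING;
    boolean rejoined = false;
    int assignments = 0;  // times the "assign" step ran
    int infoLogs = 0;     // stands in for log.info(...)
    int errorLogs = 0;    // stands in for log.error(...)

    boolean shouldAbort() {
        return state != State.RECONCILING || rejoined;
    }

    /** Chains "assign" after "revoke"; an abort completes the future normally. */
    CompletableFuture<Void> reconcile(CompletableFuture<Void> revocation) {
        CompletableFuture<Void> result = revocation.thenCompose(ignored -> {
            if (shouldAbort()) {
                infoLogs++;     // expected path: report it, but do not fail the future
            } else {
                assignments++;  // stands in for assignPartitions(...)
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
        return result.whenComplete((ignored, error) -> {
            if (error != null) {
                errorLogs++;    // genuine failure, e.g. a callback threw
            } else if (!shouldAbort()) {
                // Second check: an aborted run also lands here with error == null.
                state = State.ACKNOWLEDGING;
            }
        });
    }

    public static void main(String[] args) {
        QuietAbortSketch member = new QuietAbortSketch();
        member.rejoined = true;
        member.reconcile(CompletableFuture.completedFuture(null)).join();
        System.out.println(member.state + " errors=" + member.errorLogs);
    }
}
```

In this sketch only a failed revocation future reaches the error counter; an aborted run leaves the state untouched and records a single info-level event.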

-                        " after revocation. " + reason));
+                        " after revocation. " + interruptedReconciliationReason()));
                     return res;
                 }
             });

         reconciliationResult.whenComplete((result, error) -> {
-            markReconciliationCompleted();
             if (error != null) {
                 // Leaving member in RECONCILING state after callbacks fail. The member
                 // won't send the ack, and the expectation is that the broker will kick the
                 // member out of the group after the rebalance timeout expires, leading to a
                 // RECONCILING -> FENCED transition.
                 log.error("Reconciliation failed.", error);
             } else {
-                if (state == MemberState.RECONCILING) {
+                if (!maybeAbortReconciliation()) {
                     currentAssignment = resolvedAssignment;

                     // Reschedule the auto commit starting from now that the member has a new assignment.
                     commitRequestManager.resetAutoCommitTimer();

                     // Make assignment effective on the broker by transitioning to send acknowledge.
                     transitionTo(MemberState.ACKNOWLEDGING);
-                } else {
-                    String reason = interruptedReconciliationErrorMessage();
-                    log.error("Interrupting reconciliation after partitions assigned callback " +
-                        "completed. " + reason);
                 }
             }
+            markReconciliationCompleted();
         });
     }

+    /**
+     * @return True if the reconciliation in progress should not continue. This could be because
+     * the member is not in RECONCILING state anymore (member failed or is leaving the group), or
+     * if it has rejoined the group (note that after rejoining the member could be RECONCILING
+     * again, so checking the state is not enough)
+     */
+    boolean maybeAbortReconciliation() {
+        boolean shouldAbort = state != MemberState.RECONCILING || rejoinedWhileReconciliationInProgress;
+        if (shouldAbort) {
+            String reason = interruptedReconciliationReason();
+            log.error("Interrupting reconciliation because " + reason);
Member

I'd say that is at most a warn log, possibly info, right? Because entering this code path is completely fine and expected, and should be handled correctly by the consumer.

Contributor Author

Agreed, info LGTM. Done. Just for the record, this could be the case of a fatal failure in the heartbeat (HB) while reconciling, but still, all the error handling/logging is the responsibility of the fatal error path. So it seems sensible that, from the reconciliation POV, we end up with just the info log saying that it was interrupted because the member transitioned out of the reconciling state into the fatal state (which is what we would get in that case).

+            markReconciliationCompleted();
Member

In the code path where reconciliation is aborted after assignment is completed, markReconciliationCompleted will be called twice. Can we avoid it?

Contributor Author

sure, done

Contributor

Is this going to mark the stale reconciliation or the new reconciliation (or both) as completed?

Contributor Author (@lianetm, Mar 22, 2024)

The stale one. We only have a single reconciliationInProgress at a time, and that's the one that is aborted, so it will always be the stale one. To complete the story, we keep the new target to reconcile and on the next poll the new reconciliation will be triggered. All tests related to this case end up with the assertInitialReconciliationDiscardedAfterRejoin that validates that the new assignment received after rejoining is kept as target even after the stale reconciliation is discarded. I just extended it to show how the next poll triggers the new reconciliation.
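
Editor's sketch of the behavior described in this reply, assuming a much-simplified member model: `RejoinFlagSketch` and all of its names are hypothetical and not Kafka code. It shows why a state check alone is not enough (after rejoining, the member can be RECONCILING again) and that the newer target survives when the stale reconciliation is discarded.

```java
/** Hypothetical model of a member; names are illustrative, not Kafka's. */
class RejoinFlagSketch {
    enum State { JOINING, RECONCILING, ACKNOWLEDGING }

    State state = State.JOINING;
    boolean inProgress = false;
    boolean rejoinedWhileInProgress = false;
    String target = null;   // latest assignment received from the broker
    String current = "";    // assignment applied locally

    void onAssignmentReceived(String assignment) {
        target = assignment;
        state = State.RECONCILING;
    }

    /** Starts reconciling the current target and returns the snapshot being reconciled. */
    String startReconciliation() {
        inProgress = true;
        rejoinedWhileInProgress = false;
        return target;
    }

    void rejoin() {
        if (inProgress) {
            rejoinedWhileInProgress = true;
        }
        state = State.JOINING;
    }

    /** Called when the callbacks for the given snapshot finish; returns true if it was applied. */
    boolean finishReconciliation(String snapshot) {
        boolean abort = state != State.RECONCILING || rejoinedWhileInProgress;
        inProgress = false;
        rejoinedWhileInProgress = false;
        if (abort) {
            return false;  // stale result discarded; target is left untouched
        }
        current = snapshot;
        state = State.ACKNOWLEDGING;
        return true;
    }

    public static void main(String[] args) {
        RejoinFlagSketch m = new RejoinFlagSketch();
        m.onAssignmentReceived("A");
        String stale = m.startReconciliation();
        m.rejoin();
        m.onAssignmentReceived("B");  // RECONCILING again: a state-only check would pass
        System.out.println(m.finishReconciliation(stale));                    // false
        System.out.println(m.finishReconciliation(m.startReconciliation())); // true
    }
}
```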

Contributor

We only have a single reconciliationInProgress at a time

Isn't it possible to have a reconciliation with callbacks (going to the user space) which could return to the background thread when a new reconciliation triggered after the rejoin is ongoing? Do we have a unit test for this case too?

Contributor Author (@lianetm, Mar 22, 2024)

I don't expect we could have that case of "new reconciliation triggered while another one is executing callbacks". If there is a reconciliation that goes into the user space to run callbacks, the background thread still has the flag reconciliationInProgress set to true, so even if the member gets a new assignment from the broker, all it does is update the currentTargetAssignment. On every poll, the attempt to trigger a new reconciliation for that newly received target will be ignored until the ongoing reconciliation completes. Makes sense?

This test shows this path (using commit though), but basically showing one reconciliation triggered at a time, keeping target updated as it's received/discovered, and next reconciliation triggered when the initial one completes.
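
Editor's sketch of the gating described above, assuming a single-threaded model in which a `CompletableFuture` stands in for the user callbacks: `SingleReconciliationSketch` and its members are hypothetical names, not the Kafka implementation. While one reconciliation is in flight, a new target is only recorded, and later polls do nothing until the first one completes.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model: at most one reconciliation in flight at a time. */
class SingleReconciliationSketch {
    String target = null;   // latest assignment received from the broker
    String current = "";    // assignment applied locally
    boolean inProgress = false;
    int started = 0;        // number of reconciliations actually triggered

    /** Receiving a target never starts a reconciliation by itself. */
    void onTargetReceived(String newTarget) {
        target = newTarget;
    }

    /** Called on every poll; the future stands in for the user callbacks. */
    void poll(CompletableFuture<Void> callbacks) {
        if (inProgress || target == null || target.equals(current)) {
            return;  // ignored until the ongoing reconciliation completes
        }
        inProgress = true;
        started++;
        final String snapshot = target;
        callbacks.whenComplete((ignored, error) -> {
            inProgress = false;
            if (error == null) {
                current = snapshot;
            }
        });
    }

    public static void main(String[] args) {
        SingleReconciliationSketch m = new SingleReconciliationSketch();
        CompletableFuture<Void> userCallbacks = new CompletableFuture<>();
        m.onTargetReceived("A");
        m.poll(userCallbacks);                            // starts reconciling A
        m.onTargetReceived("B");                          // only updates the target
        m.poll(new CompletableFuture<>());                // ignored: A still in flight
        userCallbacks.complete(null);                     // callbacks for A return
        m.poll(CompletableFuture.completedFuture(null));  // next poll reconciles B
        System.out.println(m.started + " " + m.current);
    }
}
```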

Contributor

Got it. Thanks for the explanation.

+        }
+        return shouldAbort;
+    }
+
     // Visible for testing.
     void updateAssignment(Map<Uuid, SortedSet<Integer>> partitions) {
         currentAssignment = new LocalAssignment(0, partitions);
@@ -1070,29 +1081,27 @@ private SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition
     /**
      * @return Reason for interrupting a reconciliation progress when callbacks complete.
      */
-    private String interruptedReconciliationErrorMessage() {
-        String reason;
-        if (state != MemberState.RECONCILING) {
-            reason = "The member already transitioned out of the reconciling state into " + state;
-        } else {
-            reason = "The member has re-joined the group.";
+    private String interruptedReconciliationReason() {
Member

nit: if we get rid of the exception (see comment above), we may want to just inline this function.

Contributor Author

done

+        if (rejoinedWhileReconciliationInProgress) {
+            return "the member has re-joined the group";
         }
-        return reason;
+        return "the member already transitioned out of the reconciling state into " + state;
     }

     /**
      * Visible for testing.
      */
     void markReconciliationInProgress() {
         reconciliationInProgress = true;
-        memberEpochOnReconciliationStart = memberEpoch;
+        rejoinedWhileReconciliationInProgress = false;
     }

     /**
      * Visible for testing.
      */
     void markReconciliationCompleted() {
         reconciliationInProgress = false;
+        rejoinedWhileReconciliationInProgress = false;
     }

     /**