Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16933: New consumer unsubscribe close commit fixes #16272

Merged
merged 16 commits into from
Jun 14, 2024

Conversation

lianetm
Copy link
Collaborator

@lianetm lianetm commented Jun 10, 2024

Fixes for the leave group flow (unsubscribe/close):

  • fix to send Heartbeat to leave group on close even if the callbacks fail
  • fix to ensure that if a member gets fenced while blocked on callbacks (ex. on unsubscribe), it will clear its epoch to not include it in commit requests
  • fix to avoid race on the subscription state object on unsubscribe, updating it only on the background thread when the callbacks to leave complete (success or failure).

Also improving logging in this area.

@lianetm lianetm changed the title [WIP]: New consumer unsubscribe close fixes [WIP]: New consumer unsubscribe close commit fixes Jun 10, 2024
@lianetm lianetm changed the title [WIP]: New consumer unsubscribe close commit fixes KAFKA-16933: New consumer unsubscribe close commit fixes Jun 11, 2024
@lianetm lianetm marked this pull request as ready for review June 11, 2024 18:29
@lucasbru lucasbru self-requested a review June 13, 2024 08:29
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @lianetm. I left a few comments

applicationEventHandler.addAndGet(new LeaveOnCloseEvent(calculateDeadlineMs(timer)));
},
"Failed to send leaveGroup heartbeat with a timeout(ms)=" + timer.timeoutMs(), firstException);
completeQuietly(() -> maybeRevokePartitions(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the point here seems to be, if maybeRevokePartitions fails, likely due to rebalancelistener, still leave the group.

It looks like a good change to me. I'm just surprised to see that the legacy consumer does not seem to do this? If onPrepareLeave in AbstractCoordinator fails, we won't reach maybeLeaveGroup.

So is this a bug also in the legacy consumer?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You got it right, in this case the legacy consumer won't send a leave group either. We discussed it with @dajac and it seemed right to ensure the new consumer leaves on close (even if the callbacks fail). No changes on the legacy for now though (would require more thought).

@@ -1324,6 +1322,7 @@ void completeQuietly(final Utils.ThrowingRunnable function,
} catch (TimeoutException e) {
log.debug("Timeout expired before the {} operation could complete.", msg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update firstException here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally, good catch.

consumer.close(Duration.ZERO);
} catch (Exception e) {
// best effort to clean up after each test, but may throw (ex. if callbacks where
// throwing errors)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did this before but was asked to not do it, see https://github.com/apache/kafka/pull/15613/files/9fb917e4b1e60f238183c92d1ad3bc2565a7e1ea#r1559907295. That's why I added a "clean-up close" in the tests where close fails, with an expected exception (search for "clean-up" in this file).

I'd also be fine with your (and my original approach) to have a best-effort clean up and ignore exceptions here. But then, let's remove the "clean-up close" code in the other tests. Any consistent approach is fine with me here.

Copy link
Collaborator Author

@lianetm lianetm Jun 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, good point. Agree with going with a consistent approach here, but could I bring it in a follow-up PR right after this? (it wouldn't need to get into 3.8). The trick is that now the close may throw for any of the tests that throw on the callback, so I would prefer to play safe, make sure the close does not throw here, and I'll review all the tests that we expect could rightfully throw on close now (Also the other clean-up logic has some Timeout related checks so better not removing blindly, could have some value it seems)

// Clear the subscription, no matter if the callback execution failed or succeeded.
subscriptions.unsubscribe();
clearSubscription();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the name clearSubscription misleading? It seems like it clears the assignment, not the subscription.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right, it's clearing assignments, renamed.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, didn't mean to approve yet

@lianetm
Copy link
Collaborator Author

lianetm commented Jun 13, 2024

Hey @lucasbru, all comments addressed. Thanks for the review!

@lianetm lianetm requested a review from lucasbru June 13, 2024 12:58
@lianetm
Copy link
Collaborator Author

lianetm commented Jun 13, 2024

Hey @lucasbru , note I included this small commit 56ac969 we realized we needed to avoid the race on unsubscribe/close. I will follow up on another PR with the fix to avoid getting to a revocation of partitions that are not assigned on close, but still having the sanity check at the low level seems a safe bet while the root cause is fixed. cc. @dajac

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I'll wait for the tests to pass

@lianetm
Copy link
Collaborator Author

lianetm commented Jun 14, 2024

Build completed with 5 unrelates failures:

Build / JDK 21 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest
Build / JDK 21 and Scala 2.13 / shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTestBuild / JDK 8 and Scala 2.12 / shouldRestoreState() – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
Build / JDK 8 and Scala 2.12 / shouldRestoreState() – org.apache.kafka.streams.integration.PurgeRepartitionTopicIntegrationTest
Build / JDK 8 and Scala 2.12 / testDescribeQuorumStatusSuccessful [6] Type=Raft-Combined, MetadataVersion=4.0-IV0,Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest

Copy link
Contributor

@philipnee philipnee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @lianetm thanks for the pr and fixing the logs and issues. I guess we will file a separated issue to address the interrupted exception weve see during testing.

@lucasbru lucasbru merged commit 46714db into apache:trunk Jun 14, 2024
1 check failed
lucasbru pushed a commit that referenced this pull request Jun 14, 2024
Fixes for the leave group flow (unsubscribe/close):

Fix to send Heartbeat to leave group on close even if the callbacks fail
fix to ensure that if a member gets fenced while blocked on callbacks (ex. on unsubscribe), it will clear its epoch to not include it in commit requests
fix to avoid race on the subscription state object on unsubscribe, updating it only on the background thread when the callbacks to leave complete (success or failure).
Also improving logging in this area.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Philip Nee <pnee@confluent.io>
@lucasbru
Copy link
Member

Test failures are unrelated

Merged to trunk and cherrypicked to 3.8 cc @jlprat

@lianetm
Copy link
Collaborator Author

lianetm commented Jun 14, 2024

Hey @philipnee, agreed there is still an issue with the Interrupt discovered with the stress tests, it's just that as discussed offline that will come in a separate PR (just to not keep stretching this one). Just for the record, there is also another issue similar to the one fixed on this PR for the subscription state https://issues.apache.org/jira/browse/KAFKA-16954. I plan to open a PR today with both fixes for this related blocker. cc. @lucasbru , @jlprat, @dajac . Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants