KAFKA-18736: Handle errors in the Streams group heartbeat request manager #19230

Merged

Member

@cadonna cadonna commented Mar 18, 2025

This commit adds error handling to the Streams heartbeat request manager.

Errors can occur while sending a heartbeat request and when a response with an error code that is not NONE is received.

Some errors are handled explicitly to recover from them or to log specific messages. All the others are handled as fatal errors.

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>

@cadonna cadonna changed the title KAFKA-18736: Handle errors in the Streams group heartbeat request man… KAFKA-18736: Handle errors in the Streams group heartbeat request manager Mar 18, 2025
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR addresses KAFKA-18736 by enhancing error handling in the Streams heartbeat request manager so that errors during heartbeat requests are handled explicitly and appropriately. Key changes include:

  • Adding new error handling logic in the StreamsGroupHeartbeatRequestManager for various error conditions.
  • Refactoring error and failure handling methods (onErrorResponse and onFailure) to differentiate retriable vs fatal error cases.
  • Enhancing and expanding test cases to validate the new behavior across several error scenarios.

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java | Introduces new error handling logic with dedicated error cases and logging for unsupported versions, coordinator issues, and authorization failures. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java | Adds comprehensive tests covering various error and failure conditions during heartbeat request processing. |
| clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java | Refactors and consolidates tests for heartbeat failure handling into a parameterized test. |
| clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java | Updates the heartbeat failure methods, splitting retriable and fatal error handling for clarity. |

Comments suppressed due to low confidence (2)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:558

  • Consider adding additional context (e.g., request id or coordinator info) to the error log in the default case so that unexpected errors are easier to diagnose in production.
logger.error("StreamsGroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:1481

  • [nitpick] Consider renaming the boolean parameter (e.g., to 'isRetriable') in the parameterized test to improve clarity on its intended meaning.
@ParameterizedTest
    @ValueSource(booleans = {true, false})

Member

@lucasbru lucasbru left a comment

Thanks for the PR! I left some comments @cadonna

@@ -59,6 +62,10 @@
*/
public class StreamsGroupHeartbeatRequestManager implements RequestManager {

private static final String UNSUPPORTED_VERSION_ERROR_MESSAGE = "The cluster does not support the new STREAMS group protocol. Set " +
Member

Could it also be that the broker supports the streams protocol, but it's not enabled? Could it also be that the broker is so new that it doesn't support the RPC version anymore?

Member Author

Yes, if in the distant future the earliest versions of the Streams heartbeat RPC are removed, the broker would also respond with an UNSUPPORTED_VERSION error (see KIP-896).
I am wondering what we should put into the error message. Also referring to the possibility that the broker no longer supports the version of the RPC seems confusing to me. What about a simple:
"The cluster does not support the STREAMS group protocol or does not support the versions of the STREAMS group protocol used by this client (supported versions: <lowest_supported_version> to <highest_supported_version>)".

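A minimal sketch of how the proposed message could be assembled with the version range filled in. The class name, method name, and both parameters are hypothetical and only illustrate the wording suggested above; this is not the code merged in the PR, and where the version bounds come from in the real client is left open.

```java
// Hypothetical sketch; names are illustrative and not part of the Kafka client API.
final class UnsupportedVersionMessageSketch {

    private UnsupportedVersionMessageSketch() { }

    // Both bounds are assumed inputs: the lowest and highest versions of the
    // Streams group heartbeat RPC that this client is able to send.
    static String unsupportedVersionMessage(final short lowestSupportedVersion,
                                            final short highestSupportedVersion) {
        // Plain concatenation keeps the numbers locale-independent.
        return "The cluster does not support the STREAMS group protocol or does not support "
            + "the versions of the STREAMS group protocol used by this client "
            + "(supported versions: " + lowestSupportedVersion
            + " to " + highestSupportedVersion + ").";
    }
}
```

The wording avoids "new" and "yet", so it stays accurate whether the broker is too old, has the protocol disabled, or has dropped early RPC versions.
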
Member

Yes, that's what I meant. We shouldn't imply that the RPCs aren't supported 'yet'.

@@ -451,6 +462,143 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
membershipManager.onHeartbeatSuccess(response);
}

private void onErrorResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) {
Member

These seem to be only the errors that can happen in the consumer protocol. What about

 * - {@link Errors#STREAMS_INVALID_TOPOLOGY}
 * - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
 * - {@link Errors#STREAMS_TOPOLOGY_FENCED}

?

Member Author

Yes, you are right. I missed the new errors. As far as I can tell, all of those are fatal errors. So I added them to the same case as INVALID_REQUEST and GROUP_MAX_SIZE_REACHED.
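
To illustrate the grouping described here, the following is a rough, self-contained sketch of a switch that puts the Streams-specific errors in the same fatal case as INVALID_REQUEST and GROUP_MAX_SIZE_REACHED. The enum constants reuse error names from this discussion, but the ErrorCode and Handling types and the classify method are hypothetical stand-ins, and treating COORDINATOR_LOAD_IN_PROGRESS as retriable is an assumption made only for contrast.

```java
// Hypothetical sketch; the types and method below are illustrative, not Kafka's API.
final class HeartbeatErrorClassifierSketch {

    enum ErrorCode {
        NONE,
        COORDINATOR_LOAD_IN_PROGRESS,
        INVALID_REQUEST,
        GROUP_MAX_SIZE_REACHED,
        STREAMS_INVALID_TOPOLOGY,
        STREAMS_INVALID_TOPOLOGY_EPOCH,
        STREAMS_TOPOLOGY_FENCED,
        UNKNOWN_SERVER_ERROR
    }

    enum Handling { NO_ERROR, RETRY, FATAL_KNOWN, FATAL_UNEXPECTED }

    private HeartbeatErrorClassifierSketch() { }

    static Handling classify(final ErrorCode error) {
        switch (error) {
            case NONE:
                return Handling.NO_ERROR;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Assumption: treated as transient so a later heartbeat can retry.
                return Handling.RETRY;
            case INVALID_REQUEST:
            case GROUP_MAX_SIZE_REACHED:
            case STREAMS_INVALID_TOPOLOGY:
            case STREAMS_INVALID_TOPOLOGY_EPOCH:
            case STREAMS_TOPOLOGY_FENCED:
                // Known errors that cannot be recovered from; all share one fatal case.
                return Handling.FATAL_KNOWN;
            default:
                // Anything not listed explicitly is also fatal, but flagged as unexpected.
                return Handling.FATAL_UNEXPECTED;
        }
    }
}
```

Keeping the known fatal errors apart from the default branch lets each branch log a different message, even though both end in a fatal failure.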

Member

Don't you want to add them to provideOtherErrors in the tests?

Member Author

Oh man! You are absolutely right!

Member Author

I also added verifications for the error log messages to distinguish the different error handling cases. That way, forgetting to add an error to provideOtherErrors will not go unnoticed.
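
The idea of telling the cases apart by their log message can be sketched roughly as follows. This is a simplified stand-in, not the actual test code: the class, the in-memory list that replaces a captured log appender, and the wording of the message for explicitly handled errors are all assumptions. Only the "unexpected error" wording follows the logger call quoted earlier in this review.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; a list of strings stands in for a captured log appender.
final class LogVerificationSketch {

    enum ErrorCode { INVALID_REQUEST, STREAMS_TOPOLOGY_FENCED, UNKNOWN_SERVER_ERROR }

    final List<String> loggedMessages = new ArrayList<>();

    void onErrorResponse(final ErrorCode error, final String errorMessage) {
        switch (error) {
            case INVALID_REQUEST:
            case STREAMS_TOPOLOGY_FENCED:
                // Assumed wording for explicitly handled fatal errors.
                loggedMessages.add("StreamsGroupHeartbeatRequest failed due to "
                    + error + ": " + errorMessage);
                break;
            default:
                // Wording modeled on the default-case logger call quoted in the review.
                loggedMessages.add("StreamsGroupHeartbeatRequest failed due to unexpected error "
                    + error + ": " + errorMessage);
        }
    }

    // True if the most recent message came from the default branch.
    boolean lastMessageIsUnexpected() {
        return !loggedMessages.isEmpty()
            && loggedMessages.get(loggedMessages.size() - 1).contains("unexpected error");
    }
}
```

A test that expects the "unexpected error" wording for every error in the catch-all list would fail as soon as one of those errors gets its own explicit case, and vice versa.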

Member Author

cadonna commented Mar 24, 2025

@lucasbru Could you please re-review this?

Member

@lucasbru lucasbru left a comment

LGTM, thanks!

@cadonna cadonna merged commit 266532f into apache:trunk Mar 24, 2025
23 checks passed
@cadonna cadonna deleted the streams_group_heartbeat_request_manager-5 branch March 24, 2025 20:29
ShivsundarR pushed a commit to ShivsundarR/kafka that referenced this pull request Mar 26, 2025
…ager (apache#19230)

janchilling pushed a commit to janchilling/kafka that referenced this pull request Apr 4, 2025
…ager (apache#19230)

2 participants