-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-18736: Handle errors in the Streams group heartbeat request manager #19230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18736: Handle errors in the Streams group heartbeat request manager #19230
Conversation
…ager This commit adds error handling to the Streams heartbeat request manager. Errors can occur while sending a heartbeat request and when a response with an error code that is not NONE is received. Some errors are handled explicitly to recover from them or to log specific messages. All the others are handled as fatal errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR addresses KAFKA-18736 by enhancing error handling in the Streams heartbeat request manager so that errors during heartbeat requests are handled explicitly and appropriately. Key changes include:
- Adding new error handling logic in the StreamsGroupHeartbeatRequestManager for various error conditions.
- Refactoring error and failure handling methods (onErrorResponse and onFailure) to differentiate retriable vs fatal error cases.
- Enhancing and expanding test cases to validate the new behavior across several error scenarios.
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated no comments.
File | Description |
---|---|
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java | Introduces new error handling logic with dedicated error cases and logging for unsupported versions, coordinator issues, and authorization failures. |
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java | Adds comprehensive tests covering various error and failure conditions during heartbeat request processing. |
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java | Refactors and consolidates tests for heartbeat failure handling into a parameterized test. |
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java | Updates the heartbeat failure methods, splitting retriable and fatal error handling for clarity. |
Comments suppressed due to low confidence (2)
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:558
- Consider adding additional context (e.g., request id or coordinator info) to the error log in the default case so that unexpected errors are easier to diagnose in production.
logger.error("StreamsGroupHeartbeatRequest failed due to unexpected error {}: {}", error, errorMessage);
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java:1481
- [nitpick] Consider renaming the boolean parameter (e.g., to 'isRetriable') in the parameterized test to improve clarity on its intended meaning.
@ParameterizedTest
@ValueSource(booleans = {true, false})
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR! I left some comments @cadonna
@@ -59,6 +62,10 @@ | |||
*/ | |||
public class StreamsGroupHeartbeatRequestManager implements RequestManager { | |||
|
|||
private static final String UNSUPPORTED_VERSION_ERROR_MESSAGE = "The cluster does not support the new STREAMS group protocol. Set " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could it also be that the broker supports the streams protocol, but it's not enabled? Could it also be that the broker is so new that it doesn't support the RPC version anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if in a remote future the earliest versions of the Streams heartbeat RPC are removed, the broker would also respond with an UNSUPPORTED_VERSION
error (see KIP-896).
I wondering what we should put into the error message. Referring also to the possibility that the broker does not support the version of the RPC anymore, seems confusing to me. What about a simple:
"The cluster does not support the STREAMS group protocol or does not support the versions of the STREAMS group protocol used by this client (supported versions: <lowest_supported_version> to <highest_supported_version>)".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that's what I meant. We shouldn't imply that the RPCs aren't supported 'yet'.
@@ -451,6 +462,143 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin | |||
membershipManager.onHeartbeatSuccess(response); | |||
} | |||
|
|||
private void onErrorResponse(final StreamsGroupHeartbeatResponse response, final long currentTimeMs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These seem to be only the errors that can happen in the consumer protocol. What about
* - {@link Errors#STREAMS_INVALID_TOPOLOGY}
* - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
* - {@link Errors#STREAMS_TOPOLOGY_FENCED}
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right. I missed the new errors. As far as I can tell, all of those are fatal errors. So I added them to the case INVALID_REQUEST
and GROUP_MAX_SIZE_REACHED
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you want to add them to provideOtherErrors
in the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh man! You are absolutely right!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also added verifications for the error log messages to distinguish the different error handling cases. In such a way, it will not happen that someone forgets to add errors to provideOtherErrors
.
@lucasbru Could you please re-review this? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
…ager (apache#19230) This commit adds error handling to the Streams heartbeat request manager. Errors can occur while sending a heartbeat request and when a response with an error code that is not NONE is received. Some errors are handled explicitly to recover from them or to log specific messages. All the others are handled as fatal errors. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
…ager (apache#19230) This commit adds error handling to the Streams heartbeat request manager. Errors can occur while sending a heartbeat request and when a response with an error code that is not NONE is received. Some errors are handled explicitly to recover from them or to log specific messages. All the others are handled as fatal errors. Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
This commit adds error handling to the Streams heartbeat request manager.
Errors can occur while sending a heartbeat request and when a response with an error code that is not NONE is received.
Some errors are handled explicitly to recover from them or to log specific messages. All the others are handled as fatal errors.
Reviewers: Lucas Brutschy lbrutschy@confluent.io