KAFKA-19521: Fix consumption and leaving group when source topic is deleted #20735
Pull Request Overview
This PR fixes a timeout issue in integration tests where missing source topics were causing 5+ minute delays. The fix changes the timeout calculation from using max.poll.interval.ms (300s default) to 2 * heartbeatIntervalMs, which better reflects the actual polling behavior and ensures faster exception reporting.
Key Changes:
- Modified timeout calculation to use heartbeat interval instead of max poll interval
- Added heartbeat interval tracking in StreamsRebalanceData
- Extended integration tests to cover both "classic" and "streams" group protocols
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| StreamThread.java | Updated timeout logic to use 2 * heartbeatIntervalMs when available, falling back to maxPollTimeMs |
| StreamsRebalanceData.java | Added heartbeatIntervalMs field with getter/setter methods for tracking heartbeat interval |
| StreamsGroupHeartbeatRequestManager.java | Updated to set heartbeat interval in StreamsRebalanceData on successful heartbeat response |
| HandlingSourceTopicDeletionIntegrationTest.java | Converted test to parameterized test supporting both "classic" and "streams" protocols |
/** Updated whenever a heartbeat response is received from the broker. */
public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) {
    this.heartbeatIntervalMs.set(heartbeatIntervalMs);
}

/** Returns the heartbeat interval in milliseconds, or -1 if not yet set. */
public int getHeartbeatIntervalMs() {
    return heartbeatIntervalMs.get();
}
Copilot
AI
Oct 20, 2025
The getter method's documentation states it returns -1 if not yet set, but the field is initialized to -1 via AtomicInteger(-1) on line 333. Consider documenting this initialization in the field-level comment or the setter to clarify when/how the value transitions from -1 to a valid heartbeat interval.
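One way to address this comment is to name the sentinel and document it on the field itself. The following is a minimal sketch, not the actual `StreamsRebalanceData` class: the class name `HeartbeatIntervalHolder` and the constant `UNSET` are hypothetical, and only the `AtomicInteger(-1)` initialization and the getter/setter names come from the diff above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the heartbeat-interval part of StreamsRebalanceData.
final class HeartbeatIntervalHolder {
    /** Sentinel meaning "no heartbeat response has been received yet" (assumed name). */
    static final int UNSET = -1;

    /**
     * Heartbeat interval in milliseconds as reported by the broker.
     * Starts at UNSET and is overwritten on every heartbeat response.
     */
    private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(UNSET);

    /** Updated whenever a heartbeat response is received from the broker. */
    void setHeartbeatIntervalMs(final int heartbeatIntervalMs) {
        this.heartbeatIntervalMs.set(heartbeatIntervalMs);
    }

    /** Returns the heartbeat interval in milliseconds, or UNSET (-1) if not yet set. */
    int getHeartbeatIntervalMs() {
        return heartbeatIntervalMs.get();
    }
}
```

Callers can then compare against the named constant instead of a bare `-1`, which keeps the "not yet set" transition visible at the call site.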
if (streamsRebalanceData.isPresent()) {
    final int heartbeatIntervalMs = streamsRebalanceData.get().getHeartbeatIntervalMs();
    if (heartbeatIntervalMs > 0) {
        // Use 2 * heartbeatIntervalMs to ensure at least one more heartbeat is sent
Copilot
AI
Oct 20, 2025
The comment says 'at least one more heartbeat' but the calculation ensures at least two heartbeat intervals (2 * heartbeatIntervalMs). Consider updating the comment to clarify this ensures time for approximately two heartbeats, or explain why this duration is sufficient.
- // Use 2 * heartbeatIntervalMs to ensure at least one more heartbeat is sent
+ // Use 2 * heartbeatIntervalMs to allow time for approximately two heartbeats to be sent
lucasbru
left a comment
You need to update testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic and testStreamsProtocolMissingSourceTopicRecovery, right? They still mention max.poll.interval.
StreamsRebalanceDataTest probably needs an update as well, and so does StreamsGroupHeartbeatRequestManagerTest.
Can you please run the integration test 100 times in a row to ensure it's not flaky anymore?
Pull Request Overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 3 comments.
final int heartbeatIntervalMs = streamsRebalanceData.get().getHeartbeatIntervalMs();
final long timeoutMs = 2 * heartbeatIntervalMs;

// Start timeout tracking on first encounter with missing topics
if (topicsReadyTimer == null) {
-    topicsReadyTimer = time.timer(maxPollTimeMs);
+    topicsReadyTimer = time.timer(timeoutMs);
Copilot
AI
Oct 21, 2025
Missing validation for heartbeatIntervalMs being -1 (its default value when not yet set by a heartbeat response). If no heartbeat has been received, this will result in timeoutMs = -2, causing incorrect timer behavior. Add a check to fall back to maxPollTimeMs when heartbeatIntervalMs == -1.
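The fallback requested here can be sketched as a small pure function. This is an illustration under assumptions rather than the merged code: the class `MissingTopicsTimeout` and the method `resolveTimeoutMs` are hypothetical names, while the `> 0` guard and the fallback to `maxPollTimeMs` follow the review comments and the file summary above.

```java
// Hypothetical helper illustrating the timeout selection discussed in the review.
final class MissingTopicsTimeout {
    private MissingTopicsTimeout() { }

    /**
     * Picks the timeout for waiting on missing source topics.
     *
     * @param heartbeatIntervalMs interval from the last heartbeat response, or -1 if none yet
     * @param maxPollTimeMs       fallback derived from max.poll.interval.ms
     */
    static long resolveTimeoutMs(final int heartbeatIntervalMs, final long maxPollTimeMs) {
        if (heartbeatIntervalMs > 0) {
            // 2L forces long arithmetic so large intervals cannot overflow int.
            return 2L * heartbeatIntervalMs;
        }
        // No heartbeat response seen yet: avoid a negative timeout such as -2.
        return maxPollTimeMs;
    }
}
```

For example, `resolveTimeoutMs(-1, 300_000L)` yields the fallback of 300000 ms, while `resolveTimeoutMs(5_000, 300_000L)` yields 10000 ms.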
}

private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
    // Determine the timeout: use 2 * heartbeatIntervalMs
This comment only restates what the code does (timeoutMs = 2 * heartbeatIntervalMs); it does not add any knowledge. Either remove it, or say why we need to use 2 * heartbeatIntervalMs as the timeout.
…nals/StreamThread.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…als/StreamsRebalanceDataTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
lucasbru
left a comment
LGTM, thanks!
…eleted (apache#20735)

Integration tests expecting `MissingSourceTopicException` were timing out and taking 5+ minutes because [PR apache#20284](apache#20284) set the missing topics timeout to `max.poll.interval.ms` (default 300 seconds). This is inappropriate because:

- When source topics don't exist, actual poll frequency is `poll.ms` (100ms), not `max.poll.interval.ms`
- The exception should be raised much faster based on heartbeat frequency

## Solution

Use `2 * heartbeatIntervalMs` (from broker via KIP-1071) as the timeout instead:

- Ensures at least one heartbeat is sent before raising the exception
- Falls back to `max.poll.interval.ms` for backward compatibility
- Fixes slow integration tests

## Changes

1. Track `heartbeatIntervalMs` in `StreamsRebalanceData` and update it on each heartbeat response
2. Use `2 * heartbeatIntervalMs` as timeout in `StreamThread.handleMissingSourceTopicsWithTimeout()`
3. Update `HandlingSourceTopicDeletionIntegrationTest` to test both "classic" and "streams" protocols
4. Update `StreamsRebalanceDataTest`, `StreamsGroupHeartbeatRequestManagerTest` and `StreamThreadTest` to test `StreamsRebalanceData` and its behaviour

Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax <matthias@confluent.io>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
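The timeout handling described in this commit message (start a timer the first time topics are found missing, raise once it expires) can be sketched roughly as follows. This is a simplified illustration and not the `StreamThread` implementation: the class `MissingTopicsTracker`, its methods, and the injected `LongSupplier` clock are all hypothetical, standing in for Kafka's own `Time` and `Timer` utilities.

```java
import java.util.function.LongSupplier;

// Hypothetical tracker; the real logic lives in StreamThread and uses Kafka's Timer.
final class MissingTopicsTracker {
    private final LongSupplier clockMs; // injected clock so the behaviour is testable
    private boolean tracking = false;
    private long deadlineMs = 0L;

    MissingTopicsTracker(final LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    /**
     * Called on each poll loop iteration while source topics are missing.
     * Returns true once the timeout has elapsed, at which point the caller
     * would raise MissingSourceTopicException.
     */
    boolean onMissingTopics(final long timeoutMs) {
        final long now = clockMs.getAsLong();
        if (!tracking) {
            // Start timeout tracking on first encounter with missing topics.
            tracking = true;
            deadlineMs = now + timeoutMs;
        }
        return now >= deadlineMs;
    }

    /** Called when all source topics exist again; clears any running timeout. */
    void onTopicsReady() {
        tracking = false;
    }
}
```

Because the deadline is fixed on the first call, later calls with a different `timeoutMs` do not extend it; whether the real code should re-evaluate the timeout after a new heartbeat interval arrives is a design question this sketch does not settle.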