From 9d02faee9fc0debc50c7fe2786c64605aada8492 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Mon, 20 Oct 2025 15:20:42 -0400 Subject: [PATCH 1/7] resolve --- .../StreamsGroupHeartbeatRequestManager.java | 1 + .../consumer/internals/StreamsRebalanceData.java | 13 +++++++++++++ ...HandlingSourceTopicDeletionIntegrationTest.java | 9 ++++++--- .../streams/processor/internals/StreamThread.java | 14 ++++++++++++-- 4 files changed, 32 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java index ceeeb6c191607..cb9a38d0ddc1e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java @@ -527,6 +527,7 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin heartbeatRequestState.updateHeartbeatIntervalMs(data.heartbeatIntervalMs()); heartbeatRequestState.onSuccessfulAttempt(currentTimeMs); heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch()); + streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs()); if (data.partitionsByUserEndpoint() != null) { streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data)); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java index 2fe7ae8ad35d2..dcca83a824e3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** @@ -329,6 +330,8 @@ public String toString() { private final AtomicReference> statuses = new AtomicReference<>(List.of()); + private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1); + public StreamsRebalanceData(final UUID processId, final Optional endpoint, final Map subtopologies, @@ -395,4 +398,14 @@ public List statuses() { return statuses.get(); } + /** Updated whenever a heartbeat response is received from the broker. */ + public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) { + this.heartbeatIntervalMs.set(heartbeatIntervalMs); + } + + /** Returns the heartbeat interval in milliseconds, or -1 if not yet set. */ + public int getHeartbeatIntervalMs() { + return heartbeatIntervalMs.get(); + } + } diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java index d8f9061dfdb11..f31e79c53992f 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java @@ -33,8 +33,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.Properties; @@ -75,8 +76,9 @@ public void after() throws InterruptedException { CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC); } - @Test - public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) throws InterruptedException { + @ParameterizedTest + @ValueSource(strings = {"classic", "streams"}) + public void shouldThrowErrorAfterSourceTopicDeleted(final String groupProtocol, final TestInfo testName) throws InterruptedException { final StreamsBuilder builder = new StreamsBuilder(); builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String())) .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String())); @@ -91,6 +93,7 @@ public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) thr streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class); streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS); streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000); + streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); final Topology topology = builder.build(); final AtomicBoolean calledUncaughtExceptionHandler1 = new AtomicBoolean(false); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 3521b31d8a3f0..6db81dc1c2ed3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1587,11 +1587,21 @@ public void handleStreamsRebalanceData() { } private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) { + // Determine the timeout: use 2 * heartbeatIntervalMs if available, otherwise fall back to maxPollTimeMs + long timeoutMs = maxPollTimeMs; + if (streamsRebalanceData.isPresent()) { + final int heartbeatIntervalMs = streamsRebalanceData.get().getHeartbeatIntervalMs(); + if (heartbeatIntervalMs > 0) { + // Use 2 * heartbeatIntervalMs to ensure at least one more heartbeat is sent + timeoutMs = 2L * heartbeatIntervalMs; + } + } + // Start timeout tracking on first encounter with missing topics if (topicsReadyTimer == null) { - topicsReadyTimer = time.timer(maxPollTimeMs); + topicsReadyTimer = time.timer(timeoutMs); log.info("Missing source topics detected: {}. Will wait up to {}ms before failing.", - missingTopicsDetail, maxPollTimeMs); + missingTopicsDetail, timeoutMs); } else { topicsReadyTimer.update(); } From 5dad4576fc67d7ed74465d93a75eda8c25dc5e93 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Mon, 20 Oct 2025 20:12:55 -0400 Subject: [PATCH 2/7] update --- ...reamsGroupHeartbeatRequestManagerTest.java | 29 ++++++++++++++ .../internals/StreamsRebalanceDataTest.java | 40 +++++++++++++++++++ .../processor/internals/StreamThread.java | 12 ++---- .../processor/internals/StreamThreadTest.java | 22 +++++----- 4 files changed, 85 insertions(+), 18 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index f4a2726b9e570..10f13b3619a6a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -1507,6 +1507,35 @@ public void testResetPollTimerWhenExpired() { } } + @Test + public void testStreamsRebalanceDataHeartbeatIntervalMsUpdatedOnSuccess() { + try ( + final MockedConstruction ignored = mockConstruction( + HeartbeatRequestState.class, + (mock, context) -> when(mock.canSendRequest(time.milliseconds())).thenReturn(true)) + ) { + final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = createStreamsGroupHeartbeatRequestManager(); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode)); + when(membershipManager.groupId()).thenReturn(GROUP_ID); + when(membershipManager.memberId()).thenReturn(MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(MEMBER_EPOCH); + when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); + + // Initially, heartbeatIntervalMs should be -1 + assertEquals(-1, streamsRebalanceData.getHeartbeatIntervalMs()); + + final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + final NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0); + final ClientResponse response = buildClientResponse(); + networkRequest.handler().onComplete(response); + + // After successful response, heartbeatIntervalMs should be updated + assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, streamsRebalanceData.getHeartbeatIntervalMs()); + } + } + private static ConsumerConfig config() { Properties prop = new Properties(); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java index 606ba0b735027..f89800aa8abf6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java @@ -437,4 +437,44 @@ public void streamsRebalanceDataShouldBeConstructedWithEmptyStatuses() { assertTrue(streamsRebalanceData.statuses().isEmpty()); } + @Test + public void streamsRebalanceDataShouldBeConstructedWithHeartbeatIntervalMsSetToMinusOne() { + final UUID processId = UUID.randomUUID(); + final Optional endpoint = Optional.of(new + StreamsRebalanceData.HostInfo("localhost", 9090)); + final Map subtopologies = Map.of(); + final Map clientTags = Map.of("clientTag1", + "clientTagValue1"); + final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData( + processId, + endpoint, + subtopologies, + clientTags + ); + + assertEquals(-1, streamsRebalanceData.getHeartbeatIntervalMs()); + } + + @Test + public void streamsRebalanceDataShouldBeAbleToUpdateHeartbeatIntervalMs() { + final UUID processId = UUID.randomUUID(); + final Optional endpoint = Optional.of(new + StreamsRebalanceData.HostInfo("localhost", 9090)); + final Map subtopologies = Map.of(); + final Map clientTags = Map.of("clientTag1", + "clientTagValue1"); + final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData( + processId, + endpoint, + subtopologies, + clientTags + ); + + streamsRebalanceData.setHeartbeatIntervalMs(1000); + assertEquals(1000, streamsRebalanceData.getHeartbeatIntervalMs()); + } + + + + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 6db81dc1c2ed3..b88e1dd5c8c0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1587,15 +1587,9 @@ public void handleStreamsRebalanceData() { } private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) { - // Determine the timeout: use 2 * heartbeatIntervalMs if available, otherwise fall back to maxPollTimeMs - long timeoutMs = maxPollTimeMs; - if (streamsRebalanceData.isPresent()) { - final int heartbeatIntervalMs = streamsRebalanceData.get().getHeartbeatIntervalMs(); - if (heartbeatIntervalMs > 0) { - // Use 2 * heartbeatIntervalMs to ensure at least one more heartbeat is sent - timeoutMs = 2L * heartbeatIntervalMs; - } - } + // Determine the timeout: use 2 * heartbeatIntervalMs + final int heartbeatIntervalMs = streamsRebalanceData.get().getHeartbeatIntervalMs(); + final long timeoutMs = 2 * heartbeatIntervalMs; // Start timeout tracking on first encounter with missing topics if (topicsReadyTimer == null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index d7f61883adb1b..10e847ac63372 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -4153,11 +4153,13 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() .setStatusDetail("Missing source topics") )); + streamsRebalanceData.setHeartbeatIntervalMs(5000); + // First call should not throw exception (within timeout) thread.runOnceWithProcessingThreads(); - // Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout - mockTime.sleep(300001); + // Advance time beyond 2 * heartbeatIntervalMs (default is 5000ms) to trigger timeout + mockTime.sleep(10001); final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads()); assertTrue(exception.getMessage().contains("Missing source topics")); @@ -4221,11 +4223,13 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { .setStatusDetail("Missing source topics") )); + streamsRebalanceData.setHeartbeatIntervalMs(5000); + // First call should not throw exception (within timeout) thread.runOnceWithoutProcessingThreads(); // Advance time but not beyond timeout - mockTime.sleep(150000); // Half of max.poll.interval.ms + mockTime.sleep(5000); // Half of max.poll.interval.ms // Should still not throw exception thread.runOnceWithoutProcessingThreads(); @@ -4243,13 +4247,13 @@ public void testStreamsProtocolMissingSourceTopicRecovery() { .setStatusDetail("Different missing topics") )); - // Advance time by 250 seconds to test if timer was reset - // Total time from beginning: 150000 + 250000 = 400000ms (400s) - // If timer was NOT reset: elapsed time = 400s > 300s → should throw - // If timer WAS reset: elapsed time = 250s < 300s → should NOT throw - mockTime.sleep(250000); // Advance by 250 seconds + // Advance time by 6 seconds to test if timer was reset + // Total time from beginning: 5000 + 6000 = 11000ms (11s) + // If timer was NOT reset: elapsed time = 11s > 10s → should throw + // If timer WAS reset: elapsed time = 6s < 10s → should NOT throw + mockTime.sleep(6000); // Advance by 6 seconds - // Should not throw because timer was reset - only 250s elapsed from reset point + // Should not throw because timer was reset - only 6s elapsed from reset point assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads()); } From ef5121fbe2bdfc88fd335a2da6533436a601fda1 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Mon, 20 Oct 2025 23:15:03 -0400 Subject: [PATCH 3/7] fix --- .../kafka/streams/processor/internals/StreamThreadTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 10e847ac63372..66b90ffcc032d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -3966,11 +3966,13 @@ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic .setStatusDetail("Missing source topics") )); + streamsRebalanceData.setHeartbeatIntervalMs(5000); + // First call should not throw exception (within timeout) thread.runOnceWithoutProcessingThreads(); // Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout - mockTime.sleep(300001); + mockTime.sleep(10001); final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithoutProcessingThreads()); assertTrue(exception.getMessage().contains("Missing source topics")); From be0d698fbe5c535b8d2e72269d6c56129d183bed Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 21 Oct 2025 11:03:31 -0400 Subject: [PATCH 4/7] remove get prefix --- .../clients/consumer/internals/StreamsRebalanceData.java | 2 +- .../internals/StreamsGroupHeartbeatRequestManagerTest.java | 4 ++-- .../clients/consumer/internals/StreamsRebalanceDataTest.java | 4 ++-- .../kafka/streams/processor/internals/StreamThread.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java index dcca83a824e3f..c6fe1fd9215ee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java @@ -404,7 +404,7 @@ public void setHeartbeatIntervalMs(final int heartbeatIntervalMs) { } /** Returns the heartbeat interval in milliseconds, or -1 if not yet set. */ - public int getHeartbeatIntervalMs() { + public int heartbeatIntervalMs() { return heartbeatIntervalMs.get(); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java index 10f13b3619a6a..9e4b843714447 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java @@ -1522,7 +1522,7 @@ public void testStreamsRebalanceDataHeartbeatIntervalMsUpdatedOnSuccess() { when(membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID)); // Initially, heartbeatIntervalMs should be -1 - assertEquals(-1, streamsRebalanceData.getHeartbeatIntervalMs()); + assertEquals(-1, streamsRebalanceData.heartbeatIntervalMs()); final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); @@ -1532,7 +1532,7 @@ public void testStreamsRebalanceDataHeartbeatIntervalMsUpdatedOnSuccess() { networkRequest.handler().onComplete(response); // After successful response, heartbeatIntervalMs should be updated - assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, streamsRebalanceData.getHeartbeatIntervalMs()); + assertEquals(RECEIVED_HEARTBEAT_INTERVAL_MS, streamsRebalanceData.heartbeatIntervalMs()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java index f89800aa8abf6..8d90dbec4ead1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java @@ -452,7 +452,7 @@ public void streamsRebalanceDataShouldBeConstructedWithHeartbeatIntervalMsSetToM clientTags ); - assertEquals(-1, streamsRebalanceData.getHeartbeatIntervalMs()); + assertEquals(-1, streamsRebalanceData.heartbeatIntervalMs()); } @Test @@ -471,7 +471,7 @@ public void streamsRebalanceDataShouldBeAbleToUpdateHeartbeatIntervalMs() { ); streamsRebalanceData.setHeartbeatIntervalMs(1000); - assertEquals(1000, streamsRebalanceData.getHeartbeatIntervalMs()); + assertEquals(1000, streamsRebalanceData.heartbeatIntervalMs()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index b88e1dd5c8c0b..769b134ce448e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1588,7 +1588,7 @@ public void handleStreamsRebalanceData() { private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) { // Determine the timeout: use 2 * heartbeatIntervalMs - final int heartbeatIntervalMs = streamsRebalanceData.get().getHeartbeatIntervalMs(); + final int heartbeatIntervalMs = streamsRebalanceData.get().heartbeatIntervalMs(); final long timeoutMs = 2 * heartbeatIntervalMs; // Start timeout tracking on first encounter with missing topics From ab344e7094b45d46f3e1dd7c9c827558621e3c2e Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 21 Oct 2025 16:07:44 -0400 Subject: [PATCH 5/7] Update streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 769b134ce448e..b203eee960d7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1589,7 +1589,7 @@ public void handleStreamsRebalanceData() { private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) { // Determine the timeout: use 2 * heartbeatIntervalMs final int heartbeatIntervalMs = streamsRebalanceData.get().heartbeatIntervalMs(); - final long timeoutMs = 2 * heartbeatIntervalMs; + final long timeoutMs = 2L * heartbeatIntervalMs; // Start timeout tracking on first encounter with missing topics if (topicsReadyTimer == null) { From 7ec9ab514970ba07611de7c745e45037e275ba5c Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 21 Oct 2025 16:08:10 -0400 Subject: [PATCH 6/7] Update clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../clients/consumer/internals/StreamsRebalanceDataTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java index 8d90dbec4ead1..f2376640c0102 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java @@ -474,7 +474,4 @@ public void streamsRebalanceDataShouldBeAbleToUpdateHeartbeatIntervalMs() { assertEquals(1000, streamsRebalanceData.heartbeatIntervalMs()); } - - - } From 87535c121551c3a28a009a1c547ee3db46f4325a Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 21 Oct 2025 16:10:19 -0400 Subject: [PATCH 7/7] fix comment --- .../apache/kafka/streams/processor/internals/StreamThread.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index b203eee960d7f..f208567c32db7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -1587,7 +1587,7 @@ public void handleStreamsRebalanceData() { } private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) { - // Determine the timeout: use 2 * heartbeatIntervalMs + // Use 2 * heartbeatIntervalMs as the timeout ensures at least one heartbeat is sent before raising the exception final int heartbeatIntervalMs = streamsRebalanceData.get().heartbeatIntervalMs(); final long timeoutMs = 2L * heartbeatIntervalMs;