From 06defa484f29f4cb0832b93cd92306c204b8f07b Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Wed, 22 Oct 2025 21:49:55 +0800 Subject: [PATCH 1/2] KAFKA-19813: Incorrect jitter value in StreamsGroupHeartbeatRequestManager and AbstractHeartbeatRequestManager --- .../internals/AbstractHeartbeatRequestManager.java | 3 ++- .../StreamsGroupHeartbeatRequestManager.java | 3 ++- .../apache/kafka/common/utils/ExponentialBackoff.java | 3 +++ .../kafka/common/utils/ExponentialBackoffTest.java | 11 +++++++++++ 4 files changed, 18 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java index 3998d672006a3..eec41c6d3b4f1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java @@ -36,6 +36,7 @@ import java.util.Collections; import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY; +import static org.apache.kafka.clients.consumer.internals.RequestState.RETRY_BACKOFF_JITTER; /** *

Manages the request creation and response handling for the heartbeat. The module creates a @@ -113,7 +114,7 @@ public abstract class AbstractHeartbeatRequestManagerManages the request creation and response handling for the streams group heartbeat. The class creates a @@ -330,7 +331,7 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext, 0, retryBackoffMs, retryBackoffMaxMs, - maxPollIntervalMs + RETRY_BACKOFF_JITTER ); this.pollTimer = time.timer(maxPollIntervalMs); } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java index 0599448014717..db1970607ae28 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java @@ -42,6 +42,9 @@ public ExponentialBackoff(long initialInterval, int multiplier, long maxInterval this.initialInterval = Math.min(maxInterval, initialInterval); this.multiplier = multiplier; this.maxInterval = maxInterval; + if (jitter < 0 || jitter > 1) { + throw new IllegalArgumentException("jitter must be between 0 and 1"); + } this.jitter = jitter; this.expMax = maxInterval > initialInterval ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier) : 0; diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java index 4e843863ab5c7..385305b3f0d09 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class ExponentialBackoffTest { @@ -54,4 +55,14 @@ public void testExponentialBackoffWithoutJitter() { assertEquals(400, exponentialBackoff.backoff(2)); assertEquals(400, exponentialBackoff.backoff(3)); } + + @Test + public void testExponentialBackoffWithInvalidJitter() { + assertEquals("jitter must be between 0 and 1", + assertThrows(IllegalArgumentException.class, + () -> new ExponentialBackoff(100, 2, 400, -1)).getMessage()); + assertEquals("jitter must be between 0 and 1", + assertThrows(IllegalArgumentException.class, + () -> new ExponentialBackoff(100, 2, 400, 3000)).getMessage()); + } } From 5d15580679eb2dcde4504fa1a6a7accaa8241421 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Fri, 31 Oct 2025 23:55:37 +0800 Subject: [PATCH 2/2] Address comments --- .../org/apache/kafka/common/utils/ExponentialBackoff.java | 4 +++- .../org/apache/kafka/common/utils/ExponentialBackoffTest.java | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java index db1970607ae28..73d68b6cf4450 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java @@ -19,6 +19,8 @@ import java.util.concurrent.ThreadLocalRandom; +import static java.lang.String.format; + /** * A utility class for keeping the parameters and providing the value of exponential * retry backoff, exponential reconnect backoff, exponential timeout, etc. @@ -43,7 +45,7 @@ public ExponentialBackoff(long initialInterval, int multiplier, long maxInterval this.multiplier = multiplier; this.maxInterval = maxInterval; if (jitter < 0 || jitter > 1) { - throw new IllegalArgumentException("jitter must be between 0 and 1"); + throw new IllegalArgumentException(format("jitter must be between 0 and 1, but got %s", jitter)); } this.jitter = jitter; this.expMax = maxInterval > initialInterval ? diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java index 385305b3f0d09..fff921db29b22 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExponentialBackoffTest.java @@ -58,10 +58,10 @@ public void testExponentialBackoffWithoutJitter() { @Test public void testExponentialBackoffWithInvalidJitter() { - assertEquals("jitter must be between 0 and 1", + assertEquals("jitter must be between 0 and 1, but got -1.0", assertThrows(IllegalArgumentException.class, () -> new ExponentialBackoff(100, 2, 400, -1)).getMessage()); - assertEquals("jitter must be between 0 and 1", + assertEquals("jitter must be between 0 and 1, but got 3000.0", assertThrows(IllegalArgumentException.class, () -> new ExponentialBackoff(100, 2, 400, 3000)).getMessage()); }