diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 49a701ae8aad..819732ac6d83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -218,13 +218,7 @@ private boolean waitOnState(final State targetState, final long waitMs) { synchronized (stateLock) { long elapsedMs = 0L; while (state != targetState) { - if (waitMs == 0) { - try { - stateLock.wait(); - } catch (final InterruptedException e) { - // it is ok: just move on to the next iteration - } - } else if (waitMs > elapsedMs) { + if (waitMs > elapsedMs) { final long remainingMs = waitMs - elapsedMs; try { stateLock.wait(remainingMs); @@ -825,17 +819,30 @@ public void close() { * threads to join. * A {@code timeout} of 0 means to wait forever. * - * @param timeout how long to wait for the threads to shutdown + * @param timeout how long to wait for the threads to shutdown. Can't be negative. If {@code timeout=0} just checking the state and return immediately. * @param timeUnit unit of time used for timeout * @return {@code true} if all threads were successfully stopped—{@code false} if the timeout was reached * before all threads stopped * Note that this method must not be called in the {@code onChange} callback of {@link StateListener}. - * @deprecated Use {@link #close(Duration)} instead + * @deprecated Use {@link #close(Duration)} instead; note, that {@link #close(Duration)} has different semantics and does not block on zero, e.g., `Duration.ofMillis(0)`. */ @Deprecated public synchronized boolean close(final long timeout, final TimeUnit timeUnit) { - log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeUnit.toMillis(timeout)); + long timeoutMs = timeUnit.toMillis(timeout); + + log.debug("Stopping Streams client with timeoutMillis = {} ms. You are using deprecated method. " + + "Please, consider update your code.", timeoutMs); + + if (timeoutMs < 0) { + timeoutMs = 0; + } else if (timeoutMs == 0) { + timeoutMs = Long.MAX_VALUE; + } + + return close(timeoutMs); + } + private boolean close(final long timeoutMs) { if (!setState(State.PENDING_SHUTDOWN)) { // if transition failed, it means it was either in PENDING_SHUTDOWN // or NOT_RUNNING already; just check that all threads have been stopped @@ -891,7 +898,7 @@ public void run() { shutdownThread.start(); } - if (waitOnState(State.NOT_RUNNING, timeUnit.toMillis(timeout))) { + if (waitOnState(State.NOT_RUNNING, timeoutMs)) { log.info("Streams client stopped completely"); return true; } else { @@ -913,7 +920,15 @@ public void run() { */ public synchronized boolean close(final Duration timeout) throws IllegalArgumentException { ApiUtils.validateMillisecondDuration(timeout, "timeout"); - return close(timeout.toMillis(), TimeUnit.MILLISECONDS); + + final long timeoutMs = timeout.toMillis(); + if (timeoutMs < 0) { + throw new IllegalArgumentException("Timeout can't be negative."); + } + + log.debug("Stopping Streams client with timeoutMillis = {} ms.", timeoutMs); + + return close(timeoutMs); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index abc4cb90b7d6..b9d542bc9b69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -548,6 +548,34 @@ public void shouldCleanupOldStateDirs() throws InterruptedException { } } + @Test + public void shouldThrowOnNegativeTimeoutForClose() { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + try { + streams.close(Duration.ofMillis(-1L)); + fail("should not accept negative close parameter"); + } catch (final IllegalArgumentException e) { + // expected + } finally { + streams.close(); + } + } + + @Test + public void shouldNotBlockInCloseForZeroDuration() throws InterruptedException { + final KafkaStreams streams = new KafkaStreams(builder.build(), props); + final Thread th = new Thread(() -> streams.close(Duration.ofMillis(0L))); + + th.start(); + + try { + th.join(30_000L); + assertFalse(th.isAlive()); + } finally { + streams.close(); + } + } + private void verifyCleanupStateDir(final String appDir, final File oldTaskDir) throws InterruptedException { final File taskDir = new File(appDir, "0_0"); TestUtils.waitForCondition(