From db63e55842d2e83ab1e2fa72abc7631eefe68bcc Mon Sep 17 00:00:00 2001 From: dan norwood Date: Mon, 1 May 2017 23:44:25 -0700 Subject: [PATCH] handle 0 futures in all() if we pass in 0 futures to an AllOfAdapter, we should complete immediately --- .../org/apache/kafka/common/KafkaFuture.java | 9 +++++++-- .../org/apache/kafka/common/KafkaFutureTest.java | 16 +++++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java index 3c51fbede03eb..44def08c5232b 100644 --- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java +++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java @@ -48,6 +48,7 @@ private static class AllOfAdapter extends BiConsumer { public AllOfAdapter(int remainingResponses, KafkaFuture future) { this.remainingResponses = remainingResponses; this.future = future; + maybeComplete(); } @Override @@ -59,10 +60,14 @@ public synchronized void accept(R newValue, Throwable exception) { future.completeExceptionally(exception); } else { remainingResponses--; - if (remainingResponses <= 0) - future.complete(null); + maybeComplete(); } } + + private void maybeComplete() { + if (remainingResponses <= 0) + future.complete(null); + } } /** diff --git a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java index 39868e029eb9e..4769a5348b4ce 100644 --- a/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java +++ b/clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java @@ -35,6 +35,7 @@ * A unit test for KafkaFuture. */ public class KafkaFutureTest { + @Rule final public Timeout globalTimeout = Timeout.millis(120000); @@ -65,7 +66,9 @@ public void testCompleteFutures() throws Exception { @Test public void testCompletingFutures() throws Exception { final KafkaFutureImpl future = new KafkaFutureImpl<>(); - CompleterThread myThread = new CompleterThread(future, "You must construct additional pylons."); + CompleterThread + myThread = + new CompleterThread(future, "You must construct additional pylons."); assertFalse(future.isDone()); assertFalse(future.isCompletedExceptionally()); assertFalse(future.isCancelled()); @@ -82,6 +85,7 @@ public void testCompletingFutures() throws Exception { } private static class CompleterThread extends Thread { + private final KafkaFutureImpl future; private final T value; Throwable testException = null; @@ -106,6 +110,7 @@ public void run() { } private static class WaiterThread extends Thread { + private final KafkaFutureImpl future; private final T expected; Throwable testException = null; @@ -161,4 +166,13 @@ public void testAllOfFutures() throws Exception { assertEquals(null, waiterThreads.get(i).testException); } } + + @Test + public void testAllOfFuturesHandlesZeroFutures() throws Exception { + KafkaFuture allFuture = KafkaFuture.allOf(); + assertTrue(allFuture.isDone()); + assertFalse(allFuture.isCancelled()); + assertFalse(allFuture.isCompletedExceptionally()); + allFuture.get(); + } }