From 0d832fc9f179f3ec9a0b22ba3dc0908787267f44 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 23 Aug 2017 14:20:48 +0100 Subject: [PATCH] don't bother starting consumer until streams has transitioned to running state --- .../integration/KStreamRepartitionJoinTest.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index b6536472b62c..843fef4ee614 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -51,11 +51,13 @@ import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertTrue; @Category({IntegrationTest.class}) public class KStreamRepartitionJoinTest { @@ -354,9 +356,19 @@ private void createTopics() throws InterruptedException { } - private void startStreams() { + private void startStreams() throws InterruptedException { kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); + final CountDownLatch startupLatch = new CountDownLatch(1); + kafkaStreams.setStateListener(new KafkaStreams.StateListener() { + @Override + public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) { + if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) { + startupLatch.countDown(); + } + } + }); kafkaStreams.start(); + assertTrue("streams failed to get into running state before timeout", startupLatch.await(1, TimeUnit.MINUTES)); }