From f10c8908cd1ad684cba0978f7ea77f408656fed0 Mon Sep 17 00:00:00 2001 From: bbejeck Date: Sat, 1 Apr 2017 07:36:15 -0400 Subject: [PATCH 1/4] HOTFIX: fix potentially hanging test shouldAddStateStoreToRegexDefinedSource --- .../streams/integration/RegexSourceIntegrationTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 5647b1ef246de..2306ffec1881d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -254,12 +254,14 @@ public void shouldAddStateStoreToRegexDefinedSource() throws Exception { streams.start(); final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("testMessage"), producerConfig, mockTime); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, TOPIC_1, 1, 5000); - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime); streams.close(); Map> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics(); - assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1")); } From 93b0bf4aef3e5c14415f0b92bd99dbec693d5b8a Mon Sep 17 00:00:00 2001 From: bbejeck Date: Mon, 3 Apr 2017 21:11:43 -0400 Subject: [PATCH 2/4] HOTFIX: fix potentially hanging test, increased timeout --- .../kafka/streams/integration/RegexSourceIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 2306ffec1881d..ff786a9f1986a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -257,7 +257,7 @@ public void shouldAddStateStoreToRegexDefinedSource() throws Exception { final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("testMessage"), producerConfig, mockTime); - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, TOPIC_1, 1, 5000); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, TOPIC_1, 1, 30000); streams.close(); From 4b08d8e67a317f2fe722995c0112962031218b17 Mon Sep 17 00:00:00 2001 From: bbejeck Date: Thu, 27 Apr 2017 21:33:36 -0400 Subject: [PATCH 3/4] HOTFIX: fix potentially hanging test, remove timeout parameter use default of 30 sec --- .../kafka/streams/integration/RegexSourceIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index ff786a9f1986a..06f1f06054713 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -257,7 +257,7 @@ public void shouldAddStateStoreToRegexDefinedSource() throws Exception { final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("testMessage"), producerConfig, mockTime); - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, TOPIC_1, 1, 30000); + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, TOPIC_1, 1); streams.close(); From 0dd0640f092469265d60247e7d2dd72417f13631 Mon Sep 17 00:00:00 2001 From: bbejeck Date: Fri, 28 Apr 2017 22:22:10 -0400 Subject: [PATCH 4/4] HOTFIX: use TestUtils.waitForCondition, put streams.close() in finally block, added final modifier where needed, remove producing a message not needed to prove the condition of the test. --- .../RegexSourceIntegrationTest.java | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 06f1f06054713..011bca6b7890c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -64,7 +64,6 @@ import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -241,28 +240,34 @@ public boolean conditionMet() { @Test public void shouldAddStateStoreToRegexDefinedSource() throws Exception { - ProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); - MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false); + final ProcessorSupplier processorSupplier = new MockProcessorSupplier<>(); + final MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false); + final long thirtySecondTimeout = 30 * 1000; - TopologyBuilder builder = new TopologyBuilder() + final TopologyBuilder builder = new TopologyBuilder() .addSource("ingest", Pattern.compile("topic-\\d+")) .addProcessor("my-processor", processorSupplier, "ingest") .addStateStore(stateStoreSupplier, "my-processor"); final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); - streams.start(); + try { + streams.start(); - final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); - final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class); + final TestCondition stateStoreNameBoundToSourceTopic = new TestCondition() { + @Override + public boolean conditionMet() { + final Map> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics(); + final List topicNamesList = stateStoreToSourceTopic.get("testStateStore"); + return topicNamesList != null && !topicNamesList.isEmpty() && topicNamesList.get(0).equals("topic-1"); + } + }; - IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("testMessage"), producerConfig, mockTime); - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, TOPIC_1, 1); + TestUtils.waitForCondition(stateStoreNameBoundToSourceTopic, thirtySecondTimeout, "Did not find topic: [topic-1] connected to state store: [testStateStore]"); - streams.close(); - - Map> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics(); - assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1")); + } finally { + streams.close(); + } }