Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Removed 1/2 of the hardcoded sleep times #1422

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,6 @@ public String apply(String value) {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig);

// Give the stream processing application some time to do its work.
Thread.sleep(10000);
streams.close();

//
// Step 3: Verify the application's output data.
//
Expand All @@ -149,7 +145,8 @@ public String apply(String value) {
consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List<String> actualValuesForB = IntegrationTestUtils.readValues(OUTPUT_TOPIC_B, consumerConfigB, inputValues.size());
List<String> actualValuesForB = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigB,
OUTPUT_TOPIC_B, inputValues.size());
assertThat(actualValuesForB, equalTo(expectedValuesForB));

// Verify output topic C
Expand All @@ -159,7 +156,9 @@ public String apply(String value) {
consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List<String> actualValuesForC = IntegrationTestUtils.readValues(OUTPUT_TOPIC_C, consumerConfigC, inputValues.size());
List<String> actualValuesForC = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfigC,
OUTPUT_TOPIC_C, inputValues.size());
streams.close();
assertThat(actualValuesForC, equalTo(expectedValuesForC));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ public KeyValue<String, String> apply(String key, String value) {
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);

//
// Step 2: Produce some input data to the input topic.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public Long apply(Long value1, Long value2) {

// Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
// of the input data we produce below).
Thread.sleep(5000);
Thread.sleep(10000);

//
// Step 2: Publish user-region information.
Expand All @@ -246,10 +246,6 @@ public Long apply(Long value1, Long value2) {
userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);

// Give the stream processing application some time to do its work.
Thread.sleep(10000);
streams.close();

//
// Step 4: Verify the application's output data.
//
Expand All @@ -259,7 +255,9 @@ public Long apply(Long value1, Long value2) {
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.readKeyValues(OUTPUT_TOPIC, consumerConfig);
List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_TOPIC, expectedClicksPerRegion.size());
streams.close();
assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ public String apply(String value) {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);

// Give the stream processing application some time to do its work.
Thread.sleep(10000);
streams.close();

//
// Step 3: Verify the application's output data.
//
Expand All @@ -120,7 +116,9 @@ public String apply(String value) {
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List<String> actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size());
List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
DEFAULT_OUTPUT_TOPIC, inputValues.size());
streams.close();
assertThat(actualValues, equalTo(expectedValues));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);

// Give the stream processing application some time to do its work.
Thread.sleep(10000);
streams.close();

//
// Step 3: Verify the application's output data.
//
Expand All @@ -107,7 +103,9 @@ public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List<String> actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size());
List<String> actualValues = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(consumerConfig,
DEFAULT_OUTPUT_TOPIC, inputValues.size());
streams.close();
assertThat(actualValues, equalTo(inputValues));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ public KeyValue<String, String> apply(String key, String value) {
//
// Step 3: Verify the application's output data.
//
Thread.sleep(10000);
streams.close();
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-integration-test-standard-consumer");
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.readKeyValues(DEFAULT_OUTPUT_TOPIC, consumerConfig);
List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size());
streams.close();
assertThat(actualWordCounts, equalTo(expectedWordCounts));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
*/
public class IntegrationTestUtils {

private static final int UNLIMITED_MESSAGES = -1;
public static final int UNLIMITED_MESSAGES = -1;
public static final long DEFAULT_TIMEOUT = 30 * 1000L;

/**
* Returns up to `maxMessages` message-values from the topic.
Expand All @@ -54,10 +55,10 @@ public class IntegrationTestUtils {
* @param maxMessages Maximum number of messages to read via the consumer.
* @return The values retrieved via the consumer.
*/
public static <K, V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
public static <V> List<V> readValues(String topic, Properties consumerConfig, int maxMessages) {
List<V> returnList = new ArrayList<>();
List<KeyValue<K, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
for (KeyValue<K, V> kv : kvs) {
List<KeyValue<Object, V>> kvs = readKeyValues(topic, consumerConfig, maxMessages);
for (KeyValue<?, V> kv : kvs) {
returnList.add(kv.value);
}
return returnList;
Expand Down Expand Up @@ -154,4 +155,75 @@ public static <V> void produceValuesSynchronously(
produceKeyValuesSynchronously(topic, keyedRecords, producerConfig);
}

public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
String topic,
int expectedNumRecords) throws InterruptedException {

return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
}

/**
* Wait until enough data (key-value records) has been consumed.
* @param consumerConfig Kafka Consumer configuration
* @param topic Topic to consume from
* @param expectedNumRecords Minimum number of expected records
* @param waitTime Upper bound in waiting time in milliseconds
* @return All the records consumed, or null if no records are consumed
* @throws InterruptedException
* @throws AssertionError if the given wait time elapses
*/
public static <K, V> List<KeyValue<K, V>> waitUntilMinKeyValueRecordsReceived(Properties consumerConfig,
String topic,
int expectedNumRecords,
long waitTime) throws InterruptedException {
List<KeyValue<K, V>> accumData = new ArrayList<>();
long startTime = System.currentTimeMillis();
while (true) {
List<KeyValue<K, V>> readData = readKeyValues(topic, consumerConfig);
accumData.addAll(readData);
if (accumData.size() >= expectedNumRecords)
return accumData;
if (System.currentTimeMillis() > startTime + waitTime)
throw new AssertionError("Expected " + expectedNumRecords +
" but received only " + accumData.size() +
" records before timeout " + waitTime + " ms");
Thread.sleep(Math.min(waitTime, 100L));
}
}

public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
String topic,
int expectedNumRecords) throws InterruptedException {

return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
}

/**
* Wait until enough data (value records) has been consumed.
* @param consumerConfig Kafka Consumer configuration
* @param topic Topic to consume from
* @param expectedNumRecords Minimum number of expected records
* @param waitTime Upper bound in waiting time in milliseconds
* @return All the records consumed, or null if no records are consumed
* @throws InterruptedException
* @throws AssertionError if the given wait time elapses
*/
public static <V> List<V> waitUntilMinValuesRecordsReceived(Properties consumerConfig,
String topic,
int expectedNumRecords,
long waitTime) throws InterruptedException {
List<V> accumData = new ArrayList<>();
long startTime = System.currentTimeMillis();
while (true) {
List<V> readData = readValues(topic, consumerConfig, expectedNumRecords);
accumData.addAll(readData);
if (accumData.size() >= expectedNumRecords)
return accumData;
if (System.currentTimeMillis() > startTime + waitTime)
throw new AssertionError("Expected " + expectedNumRecords +
" but received only " + accumData.size() +
" records before timeout " + waitTime + " ms");
Thread.sleep(Math.min(waitTime, 100L));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we extract this loop into a separate method that takes an interface like:

interface WaitPredicate {
    boolean test();
}

Then we can reuse the logic from the different variants.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ijuma . I addressed all comments, but I'll skip this one since it's not worth generalizing for what ultimately is a loop around sleep.

}
}