From cce7e1f3d77d6a11a96d3b86cbf4e501740b3be6 Mon Sep 17 00:00:00 2001
From: Jinhe Zhang
Date: Thu, 16 Oct 2025 16:09:55 -0400
Subject: [PATCH 1/6] add verifyAllTransactionFinished for smoke test driver

---
 .../kafka/streams/tests/SmokeTestDriver.java | 70 +++++++++++++++++++
 1 file changed, 70 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index 59698607912c4..d2f7fd9fd8494 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -25,9 +25,11 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
@@ -92,6 +94,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
     }
 
     private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+    private static final long MAX_IDLE_TIME_MS = 600000L;
 
     private static class ValueList {
         public final String key;
@@ -383,6 +386,24 @@ public static VerificationResult verify(final String kafka,
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
+        // Verify all transactions are finished before proceeding with data verification
+        if (eosEnabled) {
+            final Properties txnProps = new Properties();
+            txnProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+            txnProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+            txnProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+            txnProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+            txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
+
+            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(txnProps)) {
+                verifyAllTransactionFinished(consumer, kafka);
+            } catch (final Exception e) {
+                e.printStackTrace(System.err);
+                System.out.println("FAILED");
+                return new VerificationResult(false, "Transaction verification failed: " + e.getMessage());
+            }
+        }
+
         final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
         final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
         consumer.assign(partitions);
@@ -732,4 +753,53 @@ private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> c
         return partitions;
     }
 
+    private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
+                                                     final String kafka) {
+        // Get all output topics except "data" (which is the input topic)
+        final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
+            .filter(topic -> !topic.equals("data"))
+            .toArray(String[]::new);
+
+        final List<TopicPartition> partitions = getAllPartitions(consumer, outputTopics);
+        consumer.assign(partitions);
+        consumer.seekToEnd(partitions);
+        for (final TopicPartition tp : partitions) {
+            System.out.println(tp + " at position " + consumer.position(tp));
+        }
+
+        final Properties consumerProps = new Properties();
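+        // These properties deliberately set no isolation level, so the consumer
+        // built from them reads at the default read_uncommitted level: its
+        // endOffsets() reflect the high watermark, including records of still-open
+        // transactions. The read_committed consumer above only advances to the
+        // last stable offset, so the two positions converge exactly when every
+        // transaction on a partition has been committed or aborted.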
+        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted");
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+
+        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
+            while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
+                consumer.seekToEnd(partitions);
+                final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
+
+                final java.util.Iterator<TopicPartition> iterator = partitions.iterator();
+                while (iterator.hasNext()) {
+                    final TopicPartition topicPartition = iterator.next();
+                    final long position = consumer.position(topicPartition);
+
+                    if (position == topicEndOffsets.get(topicPartition)) {
+                        iterator.remove();
+                        System.out.println("Removing " + topicPartition + " at position " + position);
+                    } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
+                        throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+                    } else {
+                        System.out.println("Retry " + topicPartition + " at position " + position);
+                    }
+                }
+                sleep(1000L);
+            }
+        }
+
+        if (!partitions.isEmpty()) {
+            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
+        }
+    }
 }

From b90e340315b89e45fb155f976d6b4c1ae9ed8ea7 Mon Sep 17 00:00:00 2001
From: Jinhe Zhang
Date: Thu, 16 Oct 2025 17:05:21 -0400
Subject: [PATCH 2/6] migrate VerificationResult to SmokeTestUtil

---
 .../kafka/streams/tests/SmokeTestDriver.java | 38 ++++++-------------
 .../kafka/streams/tests/SmokeTestUtil.java   | 18 +++++++++
 2 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index d2f7fd9fd8494..d30b5a3738337 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -395,12 +395,15 @@ public static VerificationResult verify(final String kafka,
             txnProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
             txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
 
+            final VerificationResult txnResult;
             try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(txnProps)) {
-                verifyAllTransactionFinished(consumer, kafka);
-            } catch (final Exception e) {
-                e.printStackTrace(System.err);
+                txnResult = verifyAllTransactionFinished(consumer, kafka);
+            }
+
+            if (!txnResult.passed()) {
+                System.err.println("Transaction verification failed: " + txnResult.result());
                 System.out.println("FAILED");
-                return new VerificationResult(false, "Transaction verification failed: " + e.getMessage());
+                return txnResult;
             }
         }
 
@@ -512,24 +515,6 @@ private static Map<String, Set<Integer>> parseRecordsForEchoTopic(
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)) :
             Collections.emptyMap();
     }
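+    // VerificationResult now lives in SmokeTestUtil (this patch's second file),
+    // presumably so that other smoke-test classes can share it.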
-    public static class VerificationResult {
-        private final boolean passed;
-        private final String result;
-
-        VerificationResult(final boolean passed, final String result) {
-            this.passed = passed;
-            this.result = result;
-        }
-
-        public boolean passed() {
-            return passed;
-        }
-
-        public String result() {
-            return result;
-        }
-    }
-
     private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
                                                 final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
                                                 final boolean printResults,
@@ -753,8 +738,8 @@ private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> c
         return partitions;
     }
 
-    private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
-                                                     final String kafka) {
+    private static VerificationResult verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
+                                                                   final String kafka) {
         // Get all output topics except "data" (which is the input topic)
         final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
             .filter(topic -> !topic.equals("data"))
             .toArray(String[]::new);
@@ -788,7 +773,7 @@ private static VerificationResult verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> c
                     } else if (consumer.position(topicPartition) > topicEndOffsets.get(topicPartition)) {
-                        throw new IllegalStateException("Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+                        return new VerificationResult(false, "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
                     } else {
                         System.out.println("Retry " + topicPartition + " at position " + position);
                     }
@@ -798,8 +783,9 @@ private static VerificationResult verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> c
                 sleep(1000L);
             }
         }
 
         if (!partitions.isEmpty()) {
-            throw new RuntimeException("Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
+            return new VerificationResult(false, "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
         }
+        return new VerificationResult(true, "All transactions finished successfully");
     }

From: Jinhe Zhang
Date: Thu, 16 Oct 2025 17:51:55 -0400
Subject: [PATCH 3/6] use try-with-resources

---
 .../kafka/streams/tests/SmokeTestDriver.java | 88 +++++++++----------
 1 file changed, 44 insertions(+), 44 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index d30b5a3738337..e61ff360a192d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -407,60 +407,60 @@ public static VerificationResult verify(final String kafka,
             }
         }
 
-        final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
-        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
-        consumer.assign(partitions);
-        consumer.seekToBeginning(partitions);
-
-        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
-        int recordsProcessed = 0;
-        final Map<String, AtomicInteger> processed =
-            Stream.of(NUMERIC_VALUE_TOPICS)
-                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
-
-        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
-
-        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
-        int retry = 0;
-        final long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
-            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
-                verificationResult = verifyAll(inputs, events, false, eosEnabled);
-                if (verificationResult.passed()) {
-                    break;
-                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
-                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
-                    break;
+        try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
+            final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
+            consumer.assign(partitions);
+            consumer.seekToBeginning(partitions);
+
+            final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+            int recordsProcessed = 0;
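+            // Per-topic count of records consumed so far, printed after every
+            // poll as a progress indicator.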
+            final Map<String, AtomicInteger> processed =
+                Stream.of(NUMERIC_VALUE_TOPICS)
+                    .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+            VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+            int retry = 0;
+            final long start = System.currentTimeMillis();
+            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                    verificationResult = verifyAll(inputs, events, false, eosEnabled);
+                    if (verificationResult.passed()) {
+                        break;
+                    } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                        break;
+                    } else {
+                        System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                    }
                 } else {
-                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
-                }
-            } else {
-                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+                    System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
 
-                retry = 0;
-                for (final ConsumerRecord<String, Number> record : records) {
-                    final String key = record.key();
+                    retry = 0;
+                    for (final ConsumerRecord<String, Number> record : records) {
+                        final String key = record.key();
 
-                    final String topic = record.topic();
-                    processed.get(topic).incrementAndGet();
+                        final String topic = record.topic();
+                        processed.get(topic).incrementAndGet();
 
-                    if (topic.equals("echo")) {
-                        recordsProcessed++;
-                        if (recordsProcessed % 100 == 0) {
-                            System.out.println("Echo records processed = " + recordsProcessed);
+                        if (topic.equals("echo")) {
+                            recordsProcessed++;
+                            if (recordsProcessed % 100 == 0) {
+                                System.out.println("Echo records processed = " + recordsProcessed);
+                            }
                         }
+
+                        events.computeIfAbsent(topic, t -> new HashMap<>())
+                            .computeIfAbsent(key, k -> new LinkedList<>())
+                            .add(record);
                     }
 
-                    events.computeIfAbsent(topic, t -> new HashMap<>())
-                        .computeIfAbsent(key, k -> new LinkedList<>())
-                        .add(record);
+                    System.out.println(processed);
                 }
-
-                System.out.println(processed);
             }
         }
-        consumer.close();
 
         final long finished = System.currentTimeMillis() - start;
         System.out.println("Verification time=" + finished);

From 3db1e67d702c8769eb13c7b96b28b519094f7beb Mon Sep 17 00:00:00 2001
From: Jinhe Zhang
Date: Thu, 16 Oct 2025 17:57:13 -0400
Subject: [PATCH 4/6] fix

---
 .../apache/kafka/streams/tests/SmokeTestDriver.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index e61ff360a192d..e015331adb45f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -406,23 +406,21 @@ public static VerificationResult verify(final String kafka,
             return txnResult;
         }
     }
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+        final long start = System.currentTimeMillis();
+        int recordsProcessed = 0;
 
         try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
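+            // Assign all partitions of the numeric-value topics explicitly and
+            // rewind to the beginning, so the verifier re-reads the complete output.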
             final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
             consumer.assign(partitions);
             consumer.seekToBeginning(partitions);
-
-            final int recordsGenerated = inputs.size() * maxRecordsPerKey;
-            int recordsProcessed = 0;
             final Map<String, AtomicInteger> processed =
                 Stream.of(NUMERIC_VALUE_TOPICS)
                     .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
 
-            final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
-
-            VerificationResult verificationResult = new VerificationResult(false, "no results yet");
             int retry = 0;
-            final long start = System.currentTimeMillis();
             while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
                 final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
                 if (records.isEmpty() && recordsProcessed >= recordsGenerated) {

From 3086d3349b8e49d9a106edf24cd3ee39d2bb70b7 Mon Sep 17 00:00:00 2001
From: Jinhe Zhang
Date: Thu, 6 Nov 2025 23:09:32 -0500
Subject: [PATCH 5/6] refactor to reuse the props

---
 .../kafka/streams/tests/SmokeTestDriver.java | 106 +++++++++---------
 1 file changed, 54 insertions(+), 52 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index e015331adb45f..cfa0ae4e0005c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -388,17 +388,8 @@ public static VerificationResult verify(final String kafka,
 
         // Verify all transactions are finished before proceeding with data verification
         if (eosEnabled) {
-            final Properties txnProps = new Properties();
-            txnProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
-            txnProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-            txnProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-            txnProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-            txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
-
             final VerificationResult txnResult;
-            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(txnProps)) {
-                txnResult = verifyAllTransactionFinished(consumer, kafka);
-            }
+            txnResult = verifyAllTransactionFinished(kafka);
 
             if (!txnResult.passed()) {
                 System.err.println("Transaction verification failed: " + txnResult.result());
@@ -736,54 +727,65 @@ private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> c
         return partitions;
     }
 
-    private static VerificationResult verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
-                                                                   final String kafka) {
-        // Get all output topics except "data" (which is the input topic)
-        final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
-            .filter(topic -> !topic.equals("data"))
-            .toArray(String[]::new);
-
-        final List<TopicPartition> partitions = getAllPartitions(consumer, outputTopics);
-        consumer.assign(partitions);
-        consumer.seekToEnd(partitions);
-        for (final TopicPartition tp : partitions) {
-            System.out.println(tp + " at position " + consumer.position(tp));
-        }
+    private static Properties createConsumerPropsWithByteDeserializer(final String kafka, final String clientId) {
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        return props;
+    }
-        final Properties consumerProps = new Properties();
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted");
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-
-        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
-            while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-                consumer.seekToEnd(partitions);
-                final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
-
-                final java.util.Iterator<TopicPartition> iterator = partitions.iterator();
-                while (iterator.hasNext()) {
-                    final TopicPartition topicPartition = iterator.next();
-                    final long position = consumer.position(topicPartition);
-
-                    if (position == topicEndOffsets.get(topicPartition)) {
-                        iterator.remove();
-                        System.out.println("Removing " + topicPartition + " at position " + position);
-                    } else if (position > topicEndOffsets.get(topicPartition)) {
-                        return new VerificationResult(false, "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
-                    } else {
-                        System.out.println("Retry " + topicPartition + " at position " + position);
-                    }
-                }
-                sleep(1000L);
-            }
-        }
+    private static VerificationResult verifyAllTransactionFinished(final String kafka) {
+        final Properties txnProps = createConsumerPropsWithByteDeserializer(kafka, "verifier");
+        txnProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
+        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(txnProps)) {
+            // Get all output topics except "data" (which is the input topic)
+            final String[] outputTopics = Arrays.stream(NUMERIC_VALUE_TOPICS)
+                .filter(topic -> !topic.equals("data"))
+                .toArray(String[]::new);
+
+            final List<TopicPartition> partitions = getAllPartitions(consumer, outputTopics);
+            consumer.assign(partitions);
+            consumer.seekToEnd(partitions);
+            for (final TopicPartition tp : partitions) {
+                System.out.println(tp + " at position " + consumer.position(tp));
+            }
+            final Properties consumerProps = createConsumerPropsWithByteDeserializer(kafka, "consumer-uncommitted");
+
+            final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
+            try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
+                while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
+                    consumer.seekToEnd(partitions);
+                    final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
+
+                    final java.util.Iterator<TopicPartition> iterator = partitions.iterator();
+                    while (iterator.hasNext()) {
+                        final TopicPartition topicPartition = iterator.next();
+                        final long position = consumer.position(topicPartition);
+
+                        if (position == topicEndOffsets.get(topicPartition)) {
+                            iterator.remove();
+                            System.out.println("Removing " + topicPartition + " at position " + position);
+                        } else if (position > topicEndOffsets.get(topicPartition)) {
+                            return new VerificationResult(false, "Offset for partition " + topicPartition + " is larger than topic endOffset: " + position + " > " + topicEndOffsets.get(topicPartition));
+                        } else {
+                            System.out.println("Retry " + topicPartition + " at position " + position);
+                        }
+                    }
+                    sleep(1000L);
+                }
+            }
 
-        if (!partitions.isEmpty()) {
-            return new VerificationResult(false, "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
-        }
+            if (!partitions.isEmpty()) {
+                return new VerificationResult(false, "Could not read all verification records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 1000L) + " sec.");
+            }
+            return new VerificationResult(true, "All transactions finished successfully");
+        } catch (final Exception e) {
+            e.printStackTrace(System.err);
+            System.out.println("FAILED");
+            return new VerificationResult(false, "Transaction verification failed: " + e.getMessage());
+        }
-        return new VerificationResult(true, "All transactions finished successfully");
     }
 }

From 7709f3588d5a76ada64055e2a01affa5f834b57d Mon Sep 17 00:00:00 2001
From: Jinhe Zhang
Date: Thu, 20 Nov 2025 21:21:06 -0500
Subject: [PATCH 6/6] refactor to lower complexity

---
 .../kafka/streams/tests/SmokeTestDriver.java | 167 +++++++++++-------
 1 file changed, 105 insertions(+), 62 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
index cfa0ae4e0005c..d541ecd2b3f48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -375,97 +375,116 @@ public Number deserialize(final String topic, final byte[] data) {
         }
     }
 
-    public static VerificationResult verify(final String kafka,
-                                            final Map<String, Set<Integer>> inputs,
-                                            final int maxRecordsPerKey,
-                                            final boolean eosEnabled) {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
-        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+    private static class PollResult {
+        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events;
+        final int recordsProcessed;
+        final VerificationResult verificationResult;
+
+        PollResult(final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+                   final int recordsProcessed,
+                   final VerificationResult verificationResult) {
+            this.events = events;
+            this.recordsProcessed = recordsProcessed;
+            this.verificationResult = verificationResult;
+        }
+    }
 
-        // Verify all transactions are finished before proceeding with data verification
-        if (eosEnabled) {
-            final VerificationResult txnResult;
-            txnResult = verifyAllTransactionFinished(kafka);
+    private static VerificationResult preVerifyTransactions(final String kafka, final boolean eosEnabled) {
+        if (!eosEnabled) {
+            return null;
+        }
 
-            if (!txnResult.passed()) {
-                System.err.println("Transaction verification failed: " + txnResult.result());
-                System.out.println("FAILED");
-                return txnResult;
-            }
+        final VerificationResult txnResult = verifyAllTransactionFinished(kafka);
+        if (!txnResult.passed()) {
+            System.err.println("Transaction verification failed: " + txnResult.result());
+            System.out.println("FAILED");
+            return txnResult;
         }
+        return null;
+    }
+
+    private static PollResult pollAndCollect(
+        final KafkaConsumer<String, Number> consumer,
+        final Map<String, Set<Integer>> inputs,
+        final int maxRecordsPerKey,
+        final boolean eosEnabled) {
 
         final int recordsGenerated = inputs.size() * maxRecordsPerKey;
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
         VerificationResult verificationResult = new VerificationResult(false, "no results yet");
         final long start = System.currentTimeMillis();
         int recordsProcessed = 0;
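+        // Drains the numeric-value topics until verification passes or the
+        // six-minute deadline / empty-poll retry budget is exhausted; everything
+        // consumed is handed back to the caller in a PollResult.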
-        try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
-            final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
-            consumer.assign(partitions);
-            consumer.seekToBeginning(partitions);
-            final Map<String, AtomicInteger> processed =
-                Stream.of(NUMERIC_VALUE_TOPICS)
-                    .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
-
-            int retry = 0;
-            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
-                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
-                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
-                    verificationResult = verifyAll(inputs, events, false, eosEnabled);
-                    if (verificationResult.passed()) {
-                        break;
-                    } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
-                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
-                        break;
+        final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
+        consumer.assign(partitions);
+        consumer.seekToBeginning(partitions);
+        final Map<String, AtomicInteger> processed =
+            Stream.of(NUMERIC_VALUE_TOPICS)
+                .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+        int retry = 0;
+        while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+            final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+            if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+                verificationResult = verifyAll(inputs, events, false, eosEnabled);
+                if (verificationResult.passed()) {
+                    break;
+                } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+                    System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
+                    break;
                 } else {
-                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
-                }
-            } else {
-                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
+                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
+                }
+            } else {
+                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
 
-                retry = 0;
-                for (final ConsumerRecord<String, Number> record : records) {
-                    final String key = record.key();
+                retry = 0;
+                for (final ConsumerRecord<String, Number> record : records) {
+                    final String key = record.key();
 
-                    final String topic = record.topic();
-                    processed.get(topic).incrementAndGet();
+                    final String topic = record.topic();
+                    processed.get(topic).incrementAndGet();
 
-                    if (topic.equals("echo")) {
-                        recordsProcessed++;
-                        if (recordsProcessed % 100 == 0) {
-                            System.out.println("Echo records processed = " + recordsProcessed);
-                        }
-                    }
+                    if (topic.equals("echo")) {
+                        recordsProcessed++;
+                        if (recordsProcessed % 100 == 0) {
+                            System.out.println("Echo records processed = " + recordsProcessed);
+                        }
+                    }
 
-                    events.computeIfAbsent(topic, t -> new HashMap<>())
-                        .computeIfAbsent(key, k -> new LinkedList<>())
-                        .add(record);
-                }
+                    events.computeIfAbsent(topic, t -> new HashMap<>())
+                        .computeIfAbsent(key, k -> new LinkedList<>())
+                        .add(record);
+                }
 
-                System.out.println(processed);
+                System.out.println(processed);
             }
         }
 
-        final long finished = System.currentTimeMillis() - start;
+        return new PollResult(events, recordsProcessed, verificationResult);
+    }
+
+    private static VerificationResult reportAndFinalize(
+        final Map<String, Set<Integer>> inputs,
+        final int maxRecordsPerKey,
+        final long startTime,
+        final boolean eosEnabled,
+        final PollResult pollResult) {
+        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+        final long finished = System.currentTimeMillis() - startTime;
         System.out.println("Verification time=" + finished);
         System.out.println("-------------------");
         System.out.println("Result Verification");
         System.out.println("-------------------");
         System.out.println("recordGenerated=" + recordsGenerated);
-        System.out.println("recordProcessed=" + recordsProcessed);
+        System.out.println("recordProcessed=" + pollResult.recordsProcessed);
 
-        if (recordsProcessed > recordsGenerated) {
+        if (pollResult.recordsProcessed > recordsGenerated) {
             System.out.println("PROCESSED-MORE-THAN-GENERATED");
-        } else if (recordsProcessed < recordsGenerated) {
+        } else if (pollResult.recordsProcessed < recordsGenerated) {
             System.out.println("PROCESSED-LESS-THAN-GENERATED");
         }
 
-        final Map<String, Set<Integer>> received = parseRecordsForEchoTopic(events);
+        final Map<String, Set<Integer>> received = parseRecordsForEchoTopic(pollResult.events);
 
         boolean success = inputs.equals(received);
 
@@ -479,9 +498,10 @@ public static VerificationResult verify(final String kafka,
             System.out.println("missedRecords=" + missedCount);
         }
 
+        VerificationResult verificationResult = pollResult.verificationResult;
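+        // The result carried over from polling may still be failing if the poll
+        // loop hit its deadline before all records arrived.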
         // give it one more try if it's not already passing.
         if (!verificationResult.passed()) {
-            verificationResult = verifyAll(inputs, events, true, eosEnabled);
+            verificationResult = verifyAll(inputs, pollResult.events, true, eosEnabled);
         }
         success &= verificationResult.passed();
 
@@ -491,6 +511,29 @@ public static VerificationResult verify(final String kafka,
         return verificationResult;
     }
 
+    public static VerificationResult verify(final String kafka,
+                                            final Map<String, Set<Integer>> inputs,
+                                            final int maxRecordsPerKey,
+                                            final boolean eosEnabled) {
+        final VerificationResult txnResult = preVerifyTransactions(kafka, eosEnabled);
+        if (txnResult != null) {
+            return txnResult;
+        }
+
+        final Properties props = new Properties();
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+        final long start = System.currentTimeMillis();
+        try (final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
+            final PollResult pollResult = pollAndCollect(consumer, inputs, maxRecordsPerKey, eosEnabled);
+            return reportAndFinalize(inputs, maxRecordsPerKey, start, eosEnabled, pollResult);
+        }
+    }
+
     private static Map<String, Set<Integer>> parseRecordsForEchoTopic(
         final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events) {
         return events.containsKey("echo") ?