From f6da0be924c6fb657d532ea4cf2b1877354e00d1 Mon Sep 17 00:00:00 2001 From: Michal Borowiecki Date: Wed, 16 Aug 2017 23:21:33 +0100 Subject: [PATCH] KAFKA-5233 follow up --- .../internals/KStreamTransformTest.java | 86 +++++++++++++------ .../apache/kafka/test/KStreamTestDriver.java | 19 +++- 2 files changed, 79 insertions(+), 26 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java index d191891afe96c..5aa7c7d94bbfe 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.Transformer; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.junit.Rule; @@ -39,38 +40,73 @@ public class KStreamTransformTest { @Rule public final KStreamTestDriver driver = new KStreamTestDriver(); + private Punctuator punctuator; + + private final TransformerSupplier> transformerSupplier = + new TransformerSupplier>() { + public Transformer> get() { + return new Transformer>() { + + private int total = 0; + + @Override + public void init(final ProcessorContext context) { + punctuator = new Punctuator() { + @Override + public void punctuate(long timestamp) { + context.forward(-1, (int) timestamp); + } + }; + } + + @Override + public KeyValue transform(Number key, Number value) { + total += value.intValue(); + return KeyValue.pair(key.intValue() * 2, total); + } + + @Override + public KeyValue punctuate(long timestamp) { + return KeyValue.pair(-1, (int) timestamp); + } + + @Override + public void close() { + } + }; + } + }; + @Test public void testTransform() { StreamsBuilder builder = new StreamsBuilder(); - TransformerSupplier> transformerSupplier = - new TransformerSupplier>() { - public Transformer> get() { - return new Transformer>() { + final int[] expectedKeys = {1, 10, 100, 1000}; + + MockProcessorSupplier processor = new MockProcessorSupplier<>(); + KStream stream = builder.stream(intSerde, intSerde, topicName); + stream.transform(transformerSupplier).process(processor); + + driver.setUp(builder); + for (int expectedKey : expectedKeys) { + driver.process(topicName, expectedKey, expectedKey * 10); + } - private int total = 0; + driver.punctuate(2, punctuator); + driver.punctuate(3, punctuator); - @Override - public void init(ProcessorContext context) { - } + assertEquals(6, processor.processed.size()); - @Override - public KeyValue transform(Number key, Number value) { - total += value.intValue(); - return KeyValue.pair(key.intValue() * 2, total); - } + String[] expected = {"2:10", "20:110", "200:1110", "2000:11110", "-1:2", "-1:3"}; - @Override - public KeyValue punctuate(long timestamp) { - return KeyValue.pair(-1, (int) timestamp); - } + for (int i = 0; i < expected.length; i++) { + assertEquals(expected[i], processor.processed.get(i)); + } + } - @Override - public void close() { - } - }; - } - }; + @Test @Deprecated + public void testTransformWithDeprecatedPunctuate() { + StreamsBuilder builder = new StreamsBuilder(); final int[] expectedKeys = {1, 10, 100, 1000}; @@ -83,8 +119,8 @@ public void close() { driver.process(topicName, expectedKey, expectedKey * 10); } - driver.punctuate(2); - driver.punctuate(3); + driver.punctuateDeprecated(2); + driver.punctuateDeprecated(3); assertEquals(6, processor.processed.size()); diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 474ef5c96a701..e6c5824b3b2bd 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; @@ -189,7 +190,23 @@ private ProcessorNode sourceNodeByTopicName(final String topicName) { return topicNode; } - public void punctuate(final long timestamp) { + public void punctuate(final long timestamp, Punctuator punctuator) { + final ProcessorNode prevNode = context.currentNode(); + for (final ProcessorNode processor : topology.processors()) { + if (processor.processor() != null) { + context.setRecordContext(createRecordContext(timestamp)); + context.setCurrentNode(processor); + try { + processor.punctuate(timestamp, punctuator); + } finally { + context.setCurrentNode(prevNode); + } + } + } + } + + @Deprecated + public void punctuateDeprecated(final long timestamp) { final ProcessorNode prevNode = context.currentNode(); for (final ProcessorNode processor : topology.processors()) { if (processor.processor() != null) {