From 5fcc4584d2f325034eb1e6e8708bd7db2dabb08b Mon Sep 17 00:00:00 2001 From: Steven Schlansker Date: Wed, 15 Feb 2017 21:19:15 -0800 Subject: [PATCH 1/2] KAFKA-2740: Add a KStream#peek(ForeachAction) in DSL https://issues.apache.org/jira/browse/KAFKA-4720 Peek is a handy method to have to insert diagnostics that do not affect the stream itself, but some external state such as logging or metrics collection. Author: Steven Schlansker Reviewers: Damian Guy, Matthias J. Sax, Eno Thereska, Guozhang Wang Closes #2493 from stevenschlansker/kafka-4720-peek --- .../apache/kafka/streams/kstream/KStream.java | 14 +++ .../kstream/internals/KStreamImpl.java | 12 +++ .../kstream/internals/KStreamPeek.java | 45 ++++++++++ .../kstream/internals/KStreamPeekTest.java | 85 +++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 21135fb23b44..64187e724dc8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -405,6 +405,20 @@ void writeAsText(final String filePath, */ void foreach(final ForeachAction action); + /** + * Perform an action on each record of {@code KStream}. + * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}). + *

+ * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection) + * and returns an unchanged stream. + *

+ * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases. + * + * @param action an action to perform on each record + * @see #process(ProcessorSupplier, String...) + */ + KStream peek(final ForeachAction action); + /** * Creates an array of {@code KStream} from this stream by branching the records in the original stream based on * the supplied predicates. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 0434f06c7fe7..f325dcfaa552 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -57,6 +57,8 @@ public class KStreamImpl extends AbstractStream implements KStream action) { topology.addProcessor(name, new KStreamForeach<>(action), this.name); } + @Override + public KStream peek(final ForeachAction action) { + Objects.requireNonNull(action, "action can't be null"); + final String name = topology.newName(PEEK_NAME); + + topology.addProcessor(name, new KStreamPeek<>(action), this.name); + + return new KStreamImpl<>(topology, name, sourceNodes, repartitionRequired); + } + @Override public KStream through(Serde keySerde, Serde valSerde, String topic) { return through(keySerde, valSerde, null, topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java new file mode 100644 index 000000000000..3dc05131316c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPeek.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +class KStreamPeek implements ProcessorSupplier { + + private final ForeachAction action; + + public KStreamPeek(final ForeachAction action) { + this.action = action; + } + + @Override + public Processor get() { + return new KStreamPeekProcessor(); + } + + private class KStreamPeekProcessor extends AbstractProcessor { + @Override + public void process(final K key, final V value) { + action.apply(key, value); + context().forward(key, value); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java new file mode 100644 index 000000000000..48f4b653b8a5 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.ForeachAction; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.test.KStreamTestDriver; +import org.junit.After; +import org.junit.Test; + +public class KStreamPeekTest { + + private final String topicName = "topic"; + + private KStreamTestDriver driver = null; + + @After + public void cleanup() { + if (driver != null) { + driver.close(); + } + } + + @Test + public void shouldObserveStreamElements() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); + final List> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>(); + stream.peek(collect(peekObserved)).foreach(collect(streamObserved)); + + driver = new KStreamTestDriver(builder); + final List> expected = new ArrayList<>(); + for (int key = 0; key < 32; key++) { + final String value = "V" + key; + driver.process(topicName, key, value); + expected.add(new KeyValue<>(key, value)); + } + + assertEquals(expected, peekObserved); + assertEquals(expected, streamObserved); + } + + @Test + public void shouldNotAllowNullAction() { + final KStreamBuilder builder = new KStreamBuilder(); + final KStream stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName); + try { + stream.peek(null); + fail("expected null action to throw NPE"); + } catch (NullPointerException expected) { } + } + + private static ForeachAction collect(final List> into) { + return new ForeachAction() { + @Override + public void apply(final K key, final V value) { + into.add(new KeyValue<>(key, value)); + } + }; + } +} From b4d4ebfb04831ff2fb0dff586f789631cfd410dc Mon Sep 17 00:00:00 2001 From: Antony Stubbs Date: Thu, 16 Feb 2017 12:00:35 +0000 Subject: [PATCH 2/2] MINOR: Javadoc typo --- .../main/java/org/apache/kafka/streams/kstream/Transformer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java index 6d18477977e1..59985f1605f7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java @@ -25,7 +25,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; /** - * The {@code Transformer} interface for stateful mapping of an input record to zero, one, or multiple new output + * The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output * records (both key and value type can be altered arbitrarily). * This is a stateful record-by-record operation, i.e, {@link #transform(Object, Object)} is invoked individually for * each record of a stream and can access and modify a state that is available beyond a single call of