From b91037ccc6e7e7ebdf18bfff18cda419576a77cb Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 18 Jan 2016 13:42:25 -0800 Subject: [PATCH] complete built-in stream aggregate functions --- .../apache/kafka/streams/kstream/KStream.java | 37 ----------- .../kstream/internals/KStreamImpl.java | 63 ++++++++----------- 2 files changed, 27 insertions(+), 73 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 85d51e941c264..36741a8ddf4b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.ProcessorSupplier; -import java.util.Collection; /** * KStream is an abstraction of a stream of key-value pairs. @@ -291,28 +290,6 @@ KTable, Long> sumByKey(KeyValueToLongMapper Serializer keySerializer, Deserializer keyDeserializer); - /** - * Sum extracted integer values of this stream by key on a window basis. - * - * @param valueSelector the class of KeyValueToIntMapper to extract the long integer from value - * @param windows the specification of the aggregation window - */ - KTable, Integer> sumByKey(KeyValueToIntMapper valueSelector, - Windows windows, - Serializer keySerializer, - Deserializer keyDeserializer); - - /** - * Sum extracted double decimal values of this stream by key on a window basis. - * - * @param valueSelector the class of KeyValueToDoubleMapper to extract the long integer from value - * @param windows the specification of the aggregation window - */ - KTable, Double> sumByKey(KeyValueToDoubleMapper valueSelector, - Windows windows, - Serializer keySerializer, - Deserializer keyDeserializer); - /** * Count number of records of this stream by key on a window basis. * @@ -322,18 +299,4 @@ KTable, Long> countByKey(Windows windows, Serializer keySerializer, Deserializer keyDeserializer); - /** - * Get the top-k values of this stream by key on a window basis. - * - * @param k parameter of the top-k computation - * @param valueSelector the class of KeyValueMapper to extract the comparable value - * @param windows the specification of the aggregation window - */ - > KTable, Collection> topKByKey(int k, - KeyValueMapper valueSelector, - Windows windows, - Serializer keySerializer, - Serializer aggValueSerializer, - Deserializer keyDeserializer, - Deserializer aggValueDeserializer); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7b634dcc20407..691910b6e7f53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -18,14 +18,14 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.kstream.AggregatorSupplier; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueToDoubleMapper; -import org.apache.kafka.streams.kstream.KeyValueToIntMapper; import org.apache.kafka.streams.kstream.KeyValueToLongMapper; import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; @@ -43,7 +43,6 @@ import org.apache.kafka.streams.state.Serdes; import java.lang.reflect.Array; -import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -71,6 +70,8 @@ public class KStreamImpl extends AbstractStream implements KStream KTable, T> aggregateByKey(AggregatorSup // TODO: this agg window operator is only used for casting K to Windowed for // KTableProcessorSupplier, which is a bit awkward and better be removed in the future String aggregateName = topology.newName(AGGREGATE_NAME); - String aggWindowName = topology.newName(WINDOWED_NAME); + String selectName = topology.newName(SELECT_NAME); ProcessorSupplier aggWindowSupplier = new KStreamAggWindow<>(); ProcessorSupplier, Change> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get()); @@ -418,8 +419,8 @@ public KTable, T> aggregateByKey(AggregatorSup null); // aggregate the values with the aggregator and local store - topology.addProcessor(aggWindowName, aggWindowSupplier, this.name); - topology.addProcessor(aggregateName, aggregateSupplier, aggWindowName); + topology.addProcessor(selectName, aggWindowSupplier, this.name); + topology.addProcessor(aggregateName, aggregateSupplier, selectName); topology.addStateStore(aggregateStore, aggregateName); // return the KTable representation with the intermediate topic as the sources @@ -427,47 +428,37 @@ public KTable, T> aggregateByKey(AggregatorSup } @Override - public KTable, Long> sumByKey(KeyValueToLongMapper valueSelector, + public KTable, Long> sumByKey(final KeyValueToLongMapper valueSelector, Windows windows, Serializer keySerializer, Deserializer keyDeserializer) { - // TODO - return null; - } - public KTable, Integer> sumByKey(KeyValueToIntMapper valueSelector, - Windows windows, - Serializer keySerializer, - Deserializer keyDeserializer) { - // TODO - return null; + KStream selected = this.map(new KeyValueMapper>() { + @Override + public KeyValue apply(K key, V value) { + return new KeyValue<>(key, valueSelector.apply(key, value)); + } + }); + + return selected.aggregateByKey(new LongSumSupplier(), + windows, + keySerializer, + new LongSerializer(), + keyDeserializer, + new LongDeserializer()); } - public KTable, Double> sumByKey(KeyValueToDoubleMapper valueSelector, - Windows windows, - Serializer keySerializer, - Deserializer keyDeserializer) { - // TODO - return null; - } @Override public KTable, Long> countByKey(Windows windows, Serializer keySerializer, Deserializer keyDeserializer) { - // TODO - return null; - } - @Override - public > KTable, Collection> topKByKey(int k, - KeyValueMapper valueSelector, - Windows windows, - Serializer keySerializer, - Serializer aggValueSerializer, - Deserializer keyDeserializer, - Deserializer aggValueDeserializer) { - // TODO - return null; + return this.aggregateByKey(new CountSupplier(), + windows, + keySerializer, + new LongSerializer(), + keyDeserializer, + new LongDeserializer()); } }