From ea2897ec26afd84772a96d447fcbe951f140e095 Mon Sep 17 00:00:00 2001 From: Jeyhun Karimov Date: Tue, 5 Jul 2016 14:37:05 +0200 Subject: [PATCH] KAFKA-3825 --- .../kafka/streams/kstream/KGroupedStream.java | 61 +++++++++++++++++-- .../kafka/streams/kstream/KGroupedTable.java | 34 +++++++++++ .../kstream/internals/KGroupedStreamImpl.java | 47 ++++++++++++++ .../kstream/internals/KGroupedTableImpl.java | 44 ++++++++++--- 4 files changed, 173 insertions(+), 13 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java index 25fdb3a3b3948..0d880ab652a5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java @@ -17,6 +17,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.processor.StateStoreSupplier; /** * {@link KGroupedStream} is an abstraction of a grouped record stream of key-value pairs @@ -46,7 +47,16 @@ public interface KGroupedStream { KTable reduce(Reducer reducer, String name); - + /** + * Combine values of this stream by the grouped key into a new instance of ever-updating with user defined state store supplier + * @param reducer the instance of {@link Reducer} + * @param storeSupplier the state store supplier {@link StateStoreSupplier} + * @param name the name of the resulted {@link KTable} + * @return a {@link KTable} that contains records with unmodified keys and values that represent the latest (rolling) aggregate for each key + */ + KTable reduce(final Reducer reducer, + final StateStoreSupplier storeSupplier, + final String name); /** * Combine values of this stream by key on a window basis into a new instance of windowed {@link KTable}. * @@ -58,7 +68,18 @@ KTable reduce(Reducer reducer, */ KTable, V> reduce(Reducer reducer, Windows windows); - + /** + * + * @param reducer the instance of {@link Reducer} + * @param windows the specification of the aggregation {@link Windows} + * @param storeSupplier the state store supplier {@link StateStoreSupplier} + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values + * that represent the latest (rolling) aggregate for each key within that window + */ + KTable, V> reduce(Reducer reducer, + Windows windows, + StateStoreSupplier storeSupplier); /** * Aggregate values of this stream by key into a new instance of a {@link KTable}. * @@ -75,6 +96,22 @@ KTable aggregate(Initializer initializer, Serde aggValueSerde, String name); + /** + * + * @param initializer the instance of {@link Initializer} + * @param aggregator the instance of {@link Aggregator} + * @param aggValueSerde aggregate value serdes for materializing the aggregated table, + * if not specified the default serdes defined in the configs will be used + * @param storeSupplier the state store supplier {@link StateStoreSupplier} + * @param name the name of the resulted {@link KTable} + * @param the value type of the resulted {@link KTable} + * @return a {@link KTable} that represents the latest (rolling) aggregate for each key + */ + KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Serde aggValueSerde, + final StateStoreSupplier storeSupplier, + final String name); /** * Aggregate values of this stream by key on a window basis into a new instance of windowed {@link KTable}. * @@ -93,8 +130,24 @@ KTable, T> aggregate(Initializer initialize Aggregator aggregator, Windows windows, Serde aggValueSerde); - - + /** + * + * @param initializer the instance of {@link Initializer} + * @param aggregator the instance of {@link Aggregator} + * @param windows the specification of the aggregation {@link Windows} + * @param aggValueSerde aggregate value serdes for materializing the aggregated table, + * if not specified the default serdes defined in the configs will be used + * @param storeSupplier the state store supplier {@link StateStoreSupplier} + * @param the value type of the resulted {@link KTable} + * @return a windowed {@link KTable} which can be treated as a list of {@code KTable}s + * where each table contains records with unmodified keys and values with type {@code T} + * that represent the latest (rolling) aggregate for each key within that window + */ + KTable, T> aggregate(Initializer initializer, + Aggregator aggregator, + Windows windows, + Serde aggValueSerde, + StateStoreSupplier storeSupplier); /** * Count number of records of this stream by key into a new instance of a {@link KTable} * diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java index 2ebad87635402..a2cffad346f18 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.processor.StateStoreSupplier; /** * {@link KGroupedTable} is an abstraction of a grouped changelog stream from a primary-keyed table, @@ -45,6 +46,19 @@ public interface KGroupedTable { KTable reduce(Reducer adder, Reducer subtractor, String name); + /** + * + * @param adder the instance of {@link Reducer} for addition + * @param subtractor the instance of {@link Reducer} for subtraction + * @param storeSupplier the state store supplier {@link StateStoreSupplier} + * @param name the name of the resulted {@link KTable} + * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable}, + * containing aggregated values for each key + */ + KTable reduce(Reducer adder, + Reducer subtractor, + StateStoreSupplier storeSupplier, + String name); /** * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}. @@ -65,6 +79,26 @@ KTable aggregate(Initializer initializer, Serde aggValueSerde, String name); + /** + * + * @param initializer the instance of {@link Initializer} + * @param adder the instance of {@link Aggregator} for addition + * @param substractor the instance of {@link Aggregator} for subtraction + * @param aggValueSerde value serdes for materializing the aggregated table, + * if not specified the default serdes defined in the configs will be used + * @param storeSupplier the state store supplier {@link StateStoreSupplier} + * @param name the name of the resulted table + * @param the value type of the aggregated {@link KTable} + * @return a {@link KTable} with same key and aggregated value type {@code T}, + * containing aggregated values for each key + */ + KTable aggregate(Initializer initializer, + Aggregator adder, + Aggregator substractor, + Serde aggValueSerde, + StateStoreSupplier storeSupplier, + String name); + /** * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable} * using default serializers and deserializers. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 18304843187f7..991c5977118f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -61,6 +61,15 @@ public KTable reduce(final Reducer reducer, keyValueStore(valSerde, name)); } + @Override + public KTable reduce(final Reducer reducer, + final StateStoreSupplier storeSupplier, + final String name) { + return doAggregate( + new KStreamReduce(name, reducer), + REDUCE_NAME, + storeSupplier); + } @SuppressWarnings("unchecked") @Override @@ -73,6 +82,18 @@ public KTable, V> reduce(Reducer reducer, ); } + @SuppressWarnings("unchecked") + @Override + public KTable, V> reduce(Reducer reducer, + Windows windows, + StateStoreSupplier storeSupplier) { + return (KTable, V>) doAggregate( + new KStreamWindowReduce(windows, windows.name(), reducer), + REDUCE_NAME, + storeSupplier + ); + } + @Override public KTable aggregate(final Initializer initializer, final Aggregator aggregator, @@ -84,6 +105,18 @@ public KTable aggregate(final Initializer initializer, keyValueStore(aggValueSerde, name)); } + @Override + public KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Serde aggValueSerde, + final StateStoreSupplier storeSupplier, + final String name) { + return doAggregate( + new KStreamAggregate<>(name, initializer, aggregator), + AGGREGATE_NAME, + storeSupplier); + } + @SuppressWarnings("unchecked") @Override public KTable, T> aggregate(final Initializer initializer, @@ -97,6 +130,20 @@ public KTable, T> aggregate(final Initializer< ); } + @SuppressWarnings("unchecked") + @Override + public KTable, T> aggregate(final Initializer initializer, + final Aggregator aggregator, + final Windows windows, + final Serde aggValueSerde, + final StateStoreSupplier storeSupplier) { + return (KTable, T>) doAggregate( + new KStreamWindowAggregate<>(windows, windows.name(), initializer, aggregator), + AGGREGATE_NAME, + storeSupplier + ); + } + @Override public KTable count(final String name) { return aggregate(new Initializer() { diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 7118bb9eda25c..14baf1d1c3fe1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -66,9 +66,22 @@ public KTable aggregate(Initializer initializer, String name) { ProcessorSupplier> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); - return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, name); + return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, null, name); } + @Override + public KTable aggregate(Initializer initializer, + Aggregator adder, + Aggregator subtractor, + Serde aggValueSerde, + StateStoreSupplier storeSupplier, + String name) { + + ProcessorSupplier> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor); + return doAggregate(aggregateSupplier, aggValueSerde, AGGREGATE_NAME, storeSupplier, name); + } + + @Override public KTable aggregate(Initializer initializer, Aggregator adder, @@ -81,6 +94,7 @@ public KTable aggregate(Initializer initializer, private KTable doAggregate(ProcessorSupplier> aggregateSupplier, Serde aggValueSerde, String functionName, + StateStoreSupplier storeSupplier, String name) { String sinkName = topology.newName(KStreamImpl.SINK_NAME); String sourceName = topology.newName(KStreamImpl.SOURCE_NAME); @@ -96,12 +110,6 @@ private KTable doAggregate(ProcessorSupplier> aggregateSu ChangedSerializer changedValueSerializer = new ChangedSerializer<>(valueSerializer); ChangedDeserializer changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer); - StateStoreSupplier aggregateStore = Stores.create(name) - .withKeys(keySerde) - .withValues(aggValueSerde) - .persistent() - .build(); - // send the aggregate key-value pairs to the intermediate topic for partitioning topology.addInternalTopic(topic); topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name); @@ -111,7 +119,16 @@ private KTable doAggregate(ProcessorSupplier> aggregateSu // aggregate the values with the aggregator and local store topology.addProcessor(funcName, aggregateSupplier, sourceName); - topology.addStateStore(aggregateStore, funcName); + if (storeSupplier == null) { + StateStoreSupplier aggregateStore = Stores.create(name) + .withKeys(keySerde) + .withValues(aggValueSerde) + .persistent() + .build(); + topology.addStateStore(aggregateStore, funcName); + } else { + topology.addStateStore(storeSupplier, funcName); + } // return the KTable representation with the intermediate topic as the sources return new KTableImpl<>(topology, funcName, aggregateSupplier, Collections.singleton(sourceName)); @@ -122,7 +139,16 @@ public KTable reduce(Reducer adder, Reducer subtractor, String name) { ProcessorSupplier> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); - return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, name); + return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, null, name); + } + + @Override + public KTable reduce(Reducer adder, + Reducer subtractor, + StateStoreSupplier storeSupplier, + String name) { + ProcessorSupplier> aggregateSupplier = new KTableReduce<>(name, adder, subtractor); + return doAggregate(aggregateSupplier, valSerde, REDUCE_NAME, storeSupplier, name); } @Override