From 1ee7c5f22488b593748ebec4bf78bfa607349835 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 7 Sep 2017 12:52:56 +0100 Subject: [PATCH 1/5] blah --- .../streams/kstream/internals/KTableImpl.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index aed7fde04bb1..8ebb632c8380 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.ForeachAction; @@ -24,6 +25,7 @@ import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.PrintForeachAction; import org.apache.kafka.streams.kstream.Serialized; @@ -33,7 +35,9 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.internals.KeyValueStoreMaterializer; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; import java.io.FileNotFoundException; import java.io.PrintWriter; @@ -156,11 +160,31 @@ private KTable doFilter(final Predicate predicate, return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, 
internalStoreName, internalStoreName != null); } + private KTable doFilter(final Predicate predicate, final Materialized> materialized, final boolean filterNot) { + String name = builder.newName(FILTER_NAME); + + KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, materialized.storeName()); + builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); + + final StoreBuilder builder = new KeyValueStoreMaterializer<>(materialized).materialize(); + this.builder.internalTopologyBuilder.addStateStore(builder, name); + + return new KTableImpl<>(this.builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, builder.name(), true); + } + @Override public KTable filter(final Predicate predicate) { return filter(predicate, (String) null); } + @Override + public KTable filter(final Predicate predicate, + final Materialized> materialized) { + Objects.requireNonNull(predicate, "predicate can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + return doFilter(predicate, materialized, false); + } + @Override public KTable filter(final Predicate predicate, final String queryableStoreName) { @@ -183,6 +207,14 @@ public KTable filterNot(final Predicate predicate) { return filterNot(predicate, (String) null); } + @Override + public KTable filterNot(final Predicate predicate, + final Materialized> materialized) { + Objects.requireNonNull(predicate, "predicate can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + return doFilter(predicate, materialized, true); + } + @Override public KTable filterNot(final Predicate predicate, final String queryableStoreName) { @@ -224,6 +256,18 @@ public KTable mapValues(final ValueMapper m return mapValues(mapper, null, (String) null); } + @Override + public KTable mapValues(final ValueMapper mapper, + final Materialized> materialized) { + Objects.requireNonNull(mapper, "mapper can't be null"); + 
Objects.requireNonNull(materialized, "materialized can't be null"); + final String name = builder.newName(MAPVALUES_NAME); + final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, mapper, materialized.storeName()); + builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); + builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), name); + return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true); + } + @Override public KTable mapValues(final ValueMapper mapper, final Serde valueSerde, From 259127ff3d2222af1db77f1c2dcc57d50d643b02 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 7 Sep 2017 13:34:43 +0100 Subject: [PATCH 2/5] KTable filter, filterNot, mapValues and Materialized --- docs/streams/developer-guide.html | 9 +- .../apache/kafka/streams/kstream/KTable.java | 139 ++++++++++++- .../kafka/streams/kstream/Materialized.java | 184 ++++++++++++++++++ .../streams/kstream/internals/KTableImpl.java | 32 ++- .../internals/KeyValueStoreMaterializer.java | 52 +++++ .../internals/MaterializedInternal.java | 62 ++++++ .../QueryableStateIntegrationTest.java | 13 +- .../kstream/internals/KTableFilterTest.java | 28 +++ .../kstream/internals/KTableImplTest.java | 22 +++ .../internals/KTableMapValuesTest.java | 9 +- .../KeyValueStoreMaterializerTest.java | 116 +++++++++++ 11 files changed, 645 insertions(+), 21 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java diff --git a/docs/streams/developer-guide.html 
b/docs/streams/developer-guide.html index a140b468733f..8a18b37b809b 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -708,7 +708,7 @@
KStream<String, Long> stream = ...; - + KTable<String, Long> table = ...; // A filter that selects (keeps) only positive numbers // Java 8+ example, using lambda expressions KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0); @@ -721,6 +721,9 @@
@@ -993,6 +996,7 @@
KStream<byte[], String> stream = ...; + KTable<String, String> table = ...; // Java 8+ example, using lambda expressions KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase()); @@ -1005,6 +1009,9 @@
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 4bc9572840db..06316c867daf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -19,12 +19,14 @@ import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.internals.WindowedSerializer; import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.KeyValueStore; @@ -90,6 +92,43 @@ public interface KTable { */ KTable filter(final Predicate predicate); + /** + * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given + * predicate. + * All records that do not satisfy the predicate are dropped. + * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the + * result {@code KTable}. + * This is a stateless record-by-record operation. + *

+ * Note that {@code filter} for a changelog stream works different to {@link KStream#filter(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record + * is forwarded. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+ * + * @param predicate a filter {@link Predicate} that is applied to each record + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for this {@code KTable} + * should be materialized + * @return a {@code KTable} that contains only those records that satisfy the given predicate + * @see #filterNot(Predicate, Materialized) + */ + KTable filter(final Predicate predicate, + final Materialized> materialized); + /** * Create a new {@code KTable} that consists of all records of this {@code KTable} which satisfy the given * predicate. @@ -124,8 +163,10 @@ public interface KTable { * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried * (i.e., that would be equivalent to calling {@link KTable#filter(Predicate)}. * @return a {@code KTable} that contains only those records that satisfy the given predicate - * @see #filterNot(Predicate) + * @see #filterNot(Predicate, Materialized) + * @deprecated use {@link #filter(Predicate, Materialized)} */ + @Deprecated KTable filter(final Predicate predicate, final String queryableStoreName); /** @@ -159,8 +200,10 @@ public interface KTable { * @param predicate a filter {@link Predicate} that is applied to each record * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains only those records that satisfy the given predicate - * @see #filterNot(Predicate) + * @see #filterNot(Predicate, Materialized) + * @deprecated use {@link #filter(Predicate, Materialized)} */ + @Deprecated KTable filter(final Predicate predicate, final StateStoreSupplier storeSupplier); /** @@ -185,6 +228,41 @@ public interface KTable { */ KTable filterNot(final Predicate predicate); + /** + * Create a new {@code KTable} that consists all records of this {@code KTable} which do not satisfy the + * given predicate. + * All records that do satisfy the predicate are dropped. 
+ * For each {@code KTable} update the filter is evaluated on the update record to produce an update record for the + * result {@code KTable}. + * This is a stateless record-by-record operation. + *

+ * Note that {@code filterNot} for a changelog stream works different to {@link KStream#filterNot(Predicate) + * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded + * directly if required (i.e., if there is anything to be deleted). + * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is + * forwarded. + *

+ * To query the local {@link KeyValueStore} it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + *

{@code
+     * KafkaStreams streams = ... // filtering words
+     * ReadOnlyKeyValueStore localStore = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+     * K key = "some-word";
+     * V valueForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }
+ * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+ * @param predicate a filter {@link Predicate} that is applied to each record + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for this {@code KTable} + * should be materialized + * @return a {@code KTable} that contains only those records that do not satisfy the given predicate + * @see #filter(Predicate, Materialized) + */ + KTable filterNot(final Predicate predicate, + final Materialized> materialized); /** * Create a new {@code KTable} that consists all records of this {@code KTable} which do not satisfy the * given predicate. @@ -215,8 +293,10 @@ public interface KTable { * @param predicate a filter {@link Predicate} that is applied to each record * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains only those records that do not satisfy the given predicate - * @see #filter(Predicate) + * @see #filter(Predicate, Materialized) + * @deprecated use {@link #filterNot(Predicate, Materialized)} */ + @Deprecated KTable filterNot(final Predicate predicate, final StateStoreSupplier storeSupplier); /** @@ -252,8 +332,10 @@ public interface KTable { * alphanumerics, '.', '_' and '-'. If {@code null} then the results cannot be queried * (i.e., that would be equivalent to calling {@link KTable#filterNot(Predicate)}. * @return a {@code KTable} that contains only those records that do not satisfy the given predicate - * @see #filter(Predicate) + * @see #filter(Predicate, Materialized) + * @deprecated use {@link #filterNot(Predicate, Materialized)} */ + @Deprecated KTable filterNot(final Predicate predicate, final String queryableStoreName); @@ -291,6 +373,51 @@ public interface KTable { */ KTable mapValues(final ValueMapper mapper); + /** + * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value + * (with possible new type) in the new {@code KTable}. 
+ * For each {@code KTable} update the provided {@link ValueMapper} is applied to the value of the update record and + * computes a new value for it, resulting in an update record for the result {@code KTable}. + * Thus, an input record {@code } can be transformed into an output record {@code }. + * This is a stateless record-by-record operation. + *

+ * The example below counts the number of tokens in the value string. + *

{@code
+     * KTable inputTable = builder.table("topic");
+     * KTable outputTable = inputTable.mapValues(new ValueMapper() {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }
+ *

+ * To query the local {@link KeyValueStore} representing outputTable above it must be obtained via + * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: + * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to + * query the value of the key on a parallel running instance of your Kafka Streams application. + *

+ *

+ * This operation preserves data co-location with respect to the key. + * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to + * the result {@code KTable}. + *

+ * Note that {@code mapValues} for a changelog stream works different to {@link KStream#mapValues(ValueMapper) + * record stream value mappings}, because {@link KeyValue records} with {@code null} values (so-called tombstone records) + * have delete semantics. + * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to + * delete the corresponding record in the result {@code KTable}. + * + * @param mapper a {@link ValueMapper} that computes a new output value + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for this {@code KTable} + * should be materialized. + * The serde for the new value type is supplied through + * {@link Materialized#withValueSerde(Serde)}. + * @param the value type of the result {@code KTable} + * + * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + */ + KTable mapValues(final ValueMapper mapper, + final Materialized> materialized); /** * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value @@ -335,7 +462,9 @@ public interface KTable { * @param the value type of the result {@code KTable} * * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + * @deprecated use {@link #mapValues(ValueMapper, Materialized)} */ + @Deprecated KTable mapValues(final ValueMapper mapper, final Serde valueSerde, final String queryableStoreName); /** @@ -377,7 +506,9 @@ public interface KTable { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. 
* @param the value type of the result {@code KTable} * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) + * @deprecated use {@link #mapValues(ValueMapper, Materialized)} */ + @Deprecated KTable mapValues(final ValueMapper mapper, final Serde valueSerde, final StateStoreSupplier storeSupplier); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java new file mode 100644 index 000000000000..190092bf3660 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreSupplier; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +import java.util.HashMap; +import java.util.Map; + +/** + * Used to describe how a {@link StateStore} should be Materialized. + * You can either provide a custom {@link StateStore} backend + * through one of the provided methods accepting a supplier or use the default RocksDB backends + * by providing just a store name. + */ +public class Materialized { + protected StoreSupplier storeSupplier; + protected String storeName; + protected Serde valueSerde; + protected Serde keySerde; + protected boolean loggingEnabled = true; + protected boolean cachingEnabled = true; + protected Map topicConfig = new HashMap<>(); + + private Materialized(final StoreSupplier storeSupplier) { + this.storeSupplier = storeSupplier; + } + + private Materialized(final String storeName) { + this.storeName = storeName; + } + + /** + * Copy constructor. + * @param materialized the {@link Materialized} instance to copy. + */ + public Materialized(final Materialized materialized) { + this.storeSupplier = materialized.storeSupplier; + this.storeName = materialized.storeName; + this.keySerde = materialized.keySerde; + this.valueSerde = materialized.valueSerde; + this.loggingEnabled = materialized.loggingEnabled; + this.cachingEnabled = materialized.cachingEnabled; + this.topicConfig = materialized.topicConfig; + } + + /** + * Materialize a {@link StateStore} with the given name. 
+ * + * @param storeName name of the store to materialize + * @param key type of the store + * @param value type of the store + * @param type of the {@link StateStore} + * @return a new {@link Materialized} instance with the given storeName + */ + public static Materialized as(final String storeName) { + return new Materialized<>(storeName); + } + + /** + * Materialize a {@link WindowStore} using the provided {@link WindowBytesStoreSupplier}. + * + * @param supplier the {@link WindowBytesStoreSupplier} used to materialize the store + * @param key type of the store + * @param value type of the store + * @return a new {@link Materialized} instance with the given supplier + */ + public static Materialized> as(final WindowBytesStoreSupplier supplier) { + return new Materialized<>(supplier); + } + + /** + * Materialize a {@link SessionStore} using the provided {@link SessionBytesStoreSupplier}. + * + * @param supplier the {@link SessionBytesStoreSupplier} used to materialize the store + * @param key type of the store + * @param value type of the store + * @return a new {@link Materialized} instance with the given supplier + */ + public static Materialized> as(final SessionBytesStoreSupplier supplier) { + return new Materialized<>(supplier); + } + + /** + * Materialize a {@link KeyValueStore} using the provided {@link KeyValueBytesStoreSupplier}. + * + * @param supplier the {@link KeyValueBytesStoreSupplier} used to materialize the store + * @param key type of the store + * @param value type of the store + * @return a new {@link Materialized} instance with the given supplier + */ + public static Materialized> as(final KeyValueBytesStoreSupplier supplier) { + return new Materialized<>(supplier); + } + + /** + * Set the valueSerde the materialized {@link StateStore} will use. + * + * @param valueSerde the value {@link Serde} to use. 
If the {@link Serde} is null, then the default value + * serde from configs will be used + * @return itself + */ + public Materialized withValueSerde(final Serde valueSerde) { + this.valueSerde = valueSerde; + return this; + } + + /** + * Set the keySerde the materialized {@link StateStore} will use. + * @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key + * serde from configs will be used + * @return itself + */ + public Materialized withKeySerde(final Serde keySerde) { + this.keySerde = keySerde; + return this; + } + + /** + * Indicates that a changelog should be created for the store. The changelog will be created + * with the provided configs. + *

+ * Note: Any unrecognized configs will be ignored. + * @param config any configs that should be applied to the changelog + * @return itself + */ + public Materialized withLoggingEnabled(final Map config) { + loggingEnabled = true; + this.topicConfig = config; + return this; + } + + /** + * Disable change logging for the materialized {@link StateStore}. + * @return itself + */ + public Materialized withLoggingDisabled() { + loggingEnabled = false; + this.topicConfig.clear(); + return this; + } + + /** + * Enable caching for the materialized {@link StateStore}. + * @return itself + */ + public Materialized withCachingEnabled() { + cachingEnabled = true; + return this; + } + + /** + * Disable caching for the materialized {@link StateStore}. + * @return itself + */ + public Materialized withCachingDisabled() { + cachingEnabled = false; + return this; + } + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 8ebb632c8380..fe9ff76722c9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -35,7 +35,6 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; -import org.apache.kafka.streams.processor.internals.KeyValueStoreMaterializer; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -160,16 +159,28 @@ private KTable doFilter(final Predicate predicate, return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null); } - private KTable doFilter(final Predicate predicate, final Materialized> materialized, final boolean filterNot) { + private 
KTable doFilter(final Predicate predicate, + final MaterializedInternal> materialized, + final boolean filterNot) { String name = builder.newName(FILTER_NAME); - KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, predicate, filterNot, materialized.storeName()); + KTableProcessorSupplier processorSupplier = new KTableFilter<>(this, + predicate, + filterNot, + materialized.storeName()); builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); final StoreBuilder builder = new KeyValueStoreMaterializer<>(materialized).materialize(); this.builder.internalTopologyBuilder.addStateStore(builder, name); - return new KTableImpl<>(this.builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, builder.name(), true); + return new KTableImpl<>(this.builder, + name, + processorSupplier, + this.keySerde, + this.valSerde, + sourceNodes, + builder.name(), + true); } @Override @@ -182,7 +193,7 @@ public KTable filter(final Predicate predicate, final Materialized> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doFilter(predicate, materialized, false); + return doFilter(predicate, new MaterializedInternal<>(materialized), false); } @Override @@ -212,7 +223,7 @@ public KTable filterNot(final Predicate predicate, final Materialized> materialized) { Objects.requireNonNull(predicate, "predicate can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); - return doFilter(predicate, materialized, true); + return doFilter(predicate, new MaterializedInternal<>(materialized), true); } @Override @@ -261,10 +272,15 @@ public KTable mapValues(final ValueMapper m final Materialized> materialized) { Objects.requireNonNull(mapper, "mapper can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal> materializedInternal + = new 
MaterializedInternal<>(materialized); final String name = builder.newName(MAPVALUES_NAME); - final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, mapper, materialized.storeName()); + final KTableProcessorSupplier processorSupplier = new KTableMapValues<>(this, + mapper, + materializedInternal.storeName()); builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); - builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materialized).materialize(), name); + builder.internalTopologyBuilder.addStateStore(new KeyValueStoreMaterializer<>(materializedInternal).materialize(), + name); return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, true); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java new file mode 100644 index 000000000000..ac6b31320f7c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; + +public class KeyValueStoreMaterializer { + private final MaterializedInternal> materialized; + + public KeyValueStoreMaterializer(final MaterializedInternal> materialized) { + this.materialized = materialized; + } + + public StoreBuilder> materialize() { + KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier(); + if (supplier == null) { + supplier = Stores.inMemoryKeyValueStore(materialized.storeName()); + } + final StoreBuilder> builder = Stores.keyValueStoreBuilder(supplier, + materialized.keySerde(), + materialized.valueSerde()); + + if (materialized.loggingEnabled()) { + builder.withLoggingEnabled(materialized.logConfig()); + } else { + builder.withLoggingDisabled(); + } + + if (materialized.cachingEnabled()) { + builder.withCachingEnabled(); + } + return builder; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java new file mode 100644 index 000000000000..d7ebc65931c4 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.StoreSupplier; + +import java.util.Map; + +public class MaterializedInternal extends Materialized { + + public MaterializedInternal(final Materialized materialized) { + super(materialized); + } + + public String storeName() { + if (storeName != null) { + return storeName; + } + return storeSupplier.name(); + } + + public StoreSupplier storeSupplier() { + return storeSupplier; + } + + public Serde keySerde() { + return keySerde; + } + + public Serde valueSerde() { + return valueSerde; + } + + public boolean loggingEnabled() { + return loggingEnabled; + } + + public Map logConfig() { + return topicConfig; + } + + public boolean cachingEnabled() { + return cachingEnabled; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 2a06ef479911..95b507001588 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import 
org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreamsTest; import org.apache.kafka.streams.KeyValue; @@ -38,11 +39,13 @@ import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; @@ -445,8 +448,8 @@ public boolean test(final String key, final Long value) { } }; final KTable t1 = builder.table(streamOne); - final KTable t2 = t1.filter(filterPredicate, "queryFilter"); - t1.filterNot(filterPredicate, "queryFilterNot"); + final KTable t2 = t1.filter(filterPredicate, Materialized.>as("queryFilter")); + t1.filterNot(filterPredicate, Materialized.>as("queryFilterNot")); t2.to(outputTopic); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); @@ -508,7 +511,7 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception { public Long apply(final String value) { return Long.valueOf(value); } - }, Serdes.Long(), "queryMapValues"); + }, Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())); t2.to(Serdes.String(), Serdes.Long(), outputTopic); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); @@ -558,13 +561,13 @@ public boolean test(final String key, final String value) { } }; final KTable t1 = builder.table(streamOne); - final KTable t2 = 
t1.filter(filterPredicate, "queryFilter"); + final KTable t2 = t1.filter(filterPredicate, Materialized.>as("queryFilter")); final KTable t3 = t2.mapValues(new ValueMapper() { @Override public Long apply(final String value) { return Long.valueOf(value); } - }, Serdes.Long(), "queryMapValues"); + }, Materialized.>as("queryMapValues").withValueSerde(Serdes.Long())); t3.to(Serdes.String(), Serdes.Long(), outputTopic); kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 335007274c11..a885edd7d8dc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -18,9 +18,12 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; @@ -94,6 +97,7 @@ public boolean test(String key, Integer value) { doTestKTable(builder, table2, table3, topic1); } + @SuppressWarnings("deprecation") @Test public void testQueryableKTable() { final StreamsBuilder builder = new StreamsBuilder(); @@ -118,6 +122,30 @@ public boolean test(String key, Integer value) { doTestKTable(builder, table2, table3, topic1); } + @Test + public void shouldAddQueryableStore() { + final StreamsBuilder builder = new StreamsBuilder(); + + final String topic1 = "topic1"; + + KTable table1 = 
builder.table(stringSerde, intSerde, topic1, "anyStoreName"); + + KTable table2 = table1.filter(new Predicate() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }, Materialized.>as("anyStoreNameFilter")); + KTable table3 = table1.filterNot(new Predicate() { + @Override + public boolean test(String key, Integer value) { + return (value % 2) == 0; + } + }); + + doTestKTable(builder, table2, table3, topic1); + } + private void doTestValueGetter(final StreamsBuilder builder, final KTableImpl table2, final KTableImpl table3, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index f06cc63fd2f0..64ae6de18a56 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -18,15 +18,18 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; @@ -474,4 +477,23 @@ public void shouldNotAllowNullOtherTableOnLeftJoin() { table.leftJoin(null, 
MockValueJoiner.TOSTRING_JOINER); } + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnFilterWhenMaterializedIsNull() { + table.filter(new Predicate() { + @Override + public boolean test(final String key, final String value) { + return false; + } + }, (Materialized>) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnFilterNotWhenMaterializedIsNull() { + table.filterNot(new Predicate() { + @Override + public boolean test(final String key, final String value) { + return false; + } + }, (Materialized>) null); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java index 2e7ccad4f1c5..4bfaea6bde6f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java @@ -18,11 +18,14 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; @@ -92,7 +95,7 @@ public void testQueryableKTable() { public Integer apply(CharSequence value) { return value.charAt(0) - 48; } - }, Serdes.Integer(), "anyName"); + }, Materialized.>as("anyName").withValueSerde(Serdes.Integer())); MockProcessorSupplier proc2 = new MockProcessorSupplier<>(); 
table2.toStream().process(proc2); @@ -249,14 +252,14 @@ public void testQueryableValueGetter() { public Integer apply(String value) { return new Integer(value); } - }, Serdes.Integer(), "anyMapName"); + }, Materialized.>as("anyMapName").withValueSerde(Serdes.Integer())); KTableImpl table3 = (KTableImpl) table2.filter( new Predicate() { @Override public boolean test(String key, Integer value) { return (value % 2) == 0; } - }, "anyFilterName"); + }, Materialized.>as("anyFilterName").withValueSerde(Serdes.Integer())); KTableImpl table4 = (KTableImpl) table1.through(stringSerde, stringSerde, topic2, storeName2); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java new file mode 100644 index 000000000000..21a5d5714e6b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer; +import org.apache.kafka.streams.kstream.internals.MaterializedInternal; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.CachedStateStore; +import org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore; +import org.apache.kafka.streams.state.internals.WrappedStateStore; +import org.easymock.EasyMock; +import org.hamcrest.CoreMatchers; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.core.IsNot.not; + + +public class KeyValueStoreMaterializerTest { + + @Test + public void shouldCreateBuilderThatBuildsMeteredStoreWithCachingAndLoggingEnabled() { + final MaterializedInternal> materialized + = new MaterializedInternal<>(Materialized.>as("store")); + final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); + final StoreBuilder> builder = materializer.materialize(); + final KeyValueStore store = builder.build(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + final StateStore logging = caching.wrappedStore(); + assertThat(store, instanceOf(MeteredKeyValueBytesStore.class)); + assertThat(caching, instanceOf(CachedStateStore.class)); + assertThat(logging, 
instanceOf(ChangeLoggingKeyValueBytesStore.class)); + } + + @Test + public void shouldCreateBuilderThatBuildsStoreWithCachingDisabled() { + final MaterializedInternal> materialized + = new MaterializedInternal<>(Materialized.>as("store") + .withCachingDisabled()); + final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); + final StoreBuilder> builder = materializer.materialize(); + final KeyValueStore store = builder.build(); + final WrappedStateStore logging = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + assertThat(logging, instanceOf(ChangeLoggingKeyValueBytesStore.class)); + } + + @Test + public void shouldCreateBuilderThatBuildsStoreWithLoggingDisabled() { + final MaterializedInternal> materialized + = new MaterializedInternal<>(Materialized.>as("store") + .withLoggingDisabled()); + final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); + final StoreBuilder> builder = materializer.materialize(); + final KeyValueStore store = builder.build(); + final WrappedStateStore caching = (WrappedStateStore) ((WrappedStateStore) store).wrappedStore(); + assertThat(caching, instanceOf(CachedStateStore.class)); + assertThat(caching.wrappedStore(), not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); + } + + @Test + public void shouldCreateBuilderThatBuildsStoreWithCachingAndLoggingDisabled() { + final MaterializedInternal> materialized + = new MaterializedInternal<>(Materialized.>as("store") + .withCachingDisabled() + .withLoggingDisabled()); + final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); + final StoreBuilder> builder = materializer.materialize(); + final KeyValueStore store = builder.build(); + final StateStore wrapped = ((WrappedStateStore) store).wrappedStore(); + assertThat(wrapped, not(instanceOf(CachedStateStore.class))); + assertThat(wrapped, not(instanceOf(ChangeLoggingKeyValueBytesStore.class))); + } + + @Test + 
public void shouldCreateKeyValueStoreWithTheProvidedInnerStore() { + final KeyValueBytesStoreSupplier supplier = EasyMock.createNiceMock(KeyValueBytesStoreSupplier.class); + final InMemoryKeyValueStore store = new InMemoryKeyValueStore<>("name", Serdes.Bytes(), Serdes.ByteArray()); + EasyMock.expect(supplier.name()).andReturn("name").anyTimes(); + EasyMock.expect(supplier.get()).andReturn(store); + EasyMock.replay(supplier); + + final MaterializedInternal> materialized + = new MaterializedInternal<>(Materialized.as(supplier)); + final KeyValueStoreMaterializer materializer = new KeyValueStoreMaterializer<>(materialized); + final StoreBuilder> builder = materializer.materialize(); + final KeyValueStore built = builder.build(); + final StateStore inner = ((WrappedStateStore) built).inner(); + + assertThat(inner, CoreMatchers.equalTo(store)); + } + +} \ No newline at end of file From ee5462da81968ad0ec661a43a9f1a935fe4b1119 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 7 Sep 2017 18:57:01 +0100 Subject: [PATCH 3/5] address some comments --- docs/streams/developer-guide.html | 4 ++-- .../java/org/apache/kafka/streams/kstream/Materialized.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index 8a18b37b809b..e8b1d278ed6f 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -995,8 +995,8 @@

- KStream<byte[], String> stream = ...; - KTable<String, String> table = ...; + KStream<byte[], String> stream = ...; + KTable<String, String> table = ...; // Java 8+ example, using lambda expressions KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index 190092bf3660..15b0aad2f1ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -31,7 +31,7 @@ import java.util.Map; /** - * Used to describe how a {@link StateStore} should be Materialized. + * Used to describe how a {@link StateStore} should be materialized. * You can either provide a custom {@link StateStore} backend * through one of the provided methods accepting a supplier or use the default RocksDB backends * by providing just a store name. @@ -142,7 +142,7 @@ public Materialized withKeySerde(final Serde keySerde) { /** * Indicates that a changelog should be created for the store. The changelog will be created * with the provided configs. - *

+ *

* Note: Any unrecognized configs will be ignored. * @param config any configs that should be applied to the changelog * @return itself From a2184d1ce92f833efc9b60715dd542a0679e5b6a Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 8 Sep 2017 08:42:48 +0100 Subject: [PATCH 4/5] address comments --- docs/streams/developer-guide.html | 2 +- .../org/apache/kafka/streams/kstream/KTable.java | 13 +++++++------ .../internals/KeyValueStoreMaterializer.java | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html index e8b1d278ed6f..0e8fe2a14dba 100644 --- a/docs/streams/developer-guide.html +++ b/docs/streams/developer-guide.html @@ -996,7 +996,7 @@

KStream<byte[], String> stream = ...; - KTable<String, String> table = ...; + KTable<String, String> table = ...; // Java 8+ example, using lambda expressions KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase()); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 06316c867daf..2571ac19d398 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -118,10 +119,11 @@ public interface KTable { * } * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to * query the value of the key on a parallel running instance of your Kafka Streams application. + * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. *

* * @param predicate a filter {@link Predicate} that is applied to each record - * @param materialized a {@link Materialized} that describes how the {@link StateStore} for this {@code KTable} + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized * @return a {@code KTable} that contains only those records that satisfy the given predicate * @see #filterNot(Predicate, Materialized) @@ -254,9 +256,10 @@ KTable filter(final Predicate predicate, * } * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to * query the value of the key on a parallel running instance of your Kafka Streams application. + * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. *

* @param predicate a filter {@link Predicate} that is applied to each record - * @param materialized a {@link Materialized} that describes how the {@link StateStore} for this {@code KTable} + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized * @return a {@code KTable} that contains only those records that do not satisfy the given predicate * @see #filter(Predicate, Materialized) @@ -395,7 +398,7 @@ KTable filterNot(final Predicate predicate, * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}: * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to * query the value of the key on a parallel running instance of your Kafka Streams application. - *

+ * The store name to query with is specified by {@link Materialized#as(String)} or {@link Materialized#as(KeyValueBytesStoreSupplier)}. *

* This operation preserves data co-location with respect to the key. * Thus, no internal data redistribution is required if a key based operator (like a join) is applied to @@ -408,10 +411,8 @@ KTable filterNot(final Predicate predicate, * delete the corresponding record in the result {@code KTable}. * * @param mapper a {@link ValueMapper} that computes a new output value - * @param materialized a {@link Materialized} that describes how the {@link StateStore} for this {@code KTable} + * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} * should be materialized - * (i.e., that would be equivalent to calling {@link KTable#mapValues(ValueMapper)}. - * @param valueSerde serializer for new value type * @param the value type of the result {@code KTable} * * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java index ac6b31320f7c..1d702f2841dc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java @@ -32,7 +32,7 @@ public KeyValueStoreMaterializer(final MaterializedInternal> materialize() { KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier(); if (supplier == null) { - supplier = Stores.inMemoryKeyValueStore(materialized.storeName()); + supplier = Stores.persistentKeyValueStore(materialized.storeName()); } final StoreBuilder> builder = Stores.keyValueStoreBuilder(supplier, materialized.keySerde(), From 83bef880d7750e32e1766924c33e85709dca1be2 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 8 Sep 2017 18:08:23 +0100 Subject: [PATCH 5/5] make 
Materialized copy ctor protected --- .../java/org/apache/kafka/streams/kstream/Materialized.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index 15b0aad2f1ce..fb2e7a644691 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -57,7 +57,7 @@ private Materialized(final String storeName) { * Copy constructor. * @param materialized the {@link Materialized} instance to copy. */ - public Materialized(final Materialized materialized) { + protected Materialized(final Materialized materialized) { this.storeSupplier = materialized.storeSupplier; this.storeName = materialized.storeName; this.keySerde = materialized.keySerde;