From 9a4c0fde08d14bc7c0bf6f9eeea7d420f41df9e5 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 11 Sep 2017 11:52:53 +0100 Subject: [PATCH 1/2] add Materialized to KTable join --- .../apache/kafka/streams/kstream/KTable.java | 269 +++++++++++++++++- .../streams/kstream/internals/KTableImpl.java | 88 +++++- .../KTableKTableJoinIntegrationTest.java | 33 ++- .../kstream/internals/KTableImplTest.java | 24 +- 4 files changed, 388 insertions(+), 26 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 2571ac19d398e..efbee67f90291 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -1280,6 +1280,81 @@ KGroupedTable groupBy(final KeyValueMapper KTable join(final KTable other, final ValueJoiner joiner); + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult update record
<K1:A><K1:A>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:C><K1:C><K1:b><K1:ValueJoiner(C,b)>
<K1:C><K1:null><K1:null>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + */ + KTable join(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized); /** * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join. * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. @@ -1353,9 +1428,11 @@ KTable join(final KTable other, * (i.e., that would be equivalent to calling {@link KTable#join(KTable, ValueJoiner)}. * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)} */ + @Deprecated KTable join(final KTable other, final ValueJoiner joiner, final Serde joinSerde, @@ -1430,9 +1507,11 @@ KTable join(final KTable other, * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)} */ + @Deprecated KTable join(final KTable other, final ValueJoiner joiner, final StateStoreSupplier storeSupplier); @@ -1520,6 +1599,89 @@ KTable join(final KTable other, KTable leftJoin(final KTable other, final ValueJoiner joiner); + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult update record
<K1:A><K1:A><K1:ValueJoiner(A,null)>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:null><K1:b><K1:null>
<K1:null>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + */ + KTable leftJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized); /** * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using * non-windowed left equi join. @@ -1603,7 +1765,9 @@ KTable leftJoin(final KTable other, * left {@code KTable} * @see #join(KTable, ValueJoiner) * @see #outerJoin(KTable, ValueJoiner) + * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated KTable leftJoin(final KTable other, final ValueJoiner joiner, final Serde joinSerde, @@ -1686,9 +1850,11 @@ KTable leftJoin(final KTable other, * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * left {@code KTable} - * @see #join(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated KTable leftJoin(final KTable other, final ValueJoiner joiner, final StateStoreSupplier storeSupplier); @@ -1775,6 +1941,89 @@ KTable leftJoin(final KTable other, KTable outerJoin(final KTable other, final ValueJoiner joiner); + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed outer equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the current (i.e., processing time) result + * of the join. + *

+ * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the current (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + *

+ * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record that does not find a corresponding record in the corresponding other + * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the + * corresponding other value to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + *

+ * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly + * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + *

+ * Input records with {@code null} key will be dropped and no join computation is performed. + *

+ * Example: + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
thisKTablethisStateotherKTableotherStateresult update record
<K1:A><K1:A><K1:ValueJoiner(A,null)>
<K1:A><K1:b><K1:b><K1:ValueJoiner(A,b)>
<K1:null><K1:b><K1:ValueJoiner(null,b)>
<K1:null><K1:null>
+ * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized + * @param the value type of the other {@code KTable} + * @param the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KTable}s + * @see #join(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner) + */ + KTable outerJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized); + /** * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using * non-windowed outer equi join. @@ -1855,9 +2104,11 @@ KTable outerJoin(final KTable other, * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * both {@code KTable}s - * @see #join(KTable, ValueJoiner) - * @see #leftJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated KTable outerJoin(final KTable other, final ValueJoiner joiner, final Serde joinSerde, @@ -1941,7 +2192,9 @@ KTable outerJoin(final KTable other, * both {@code KTable}s * @see #join(KTable, ValueJoiner) * @see #leftJoin(KTable, ValueJoiner) + * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated KTable outerJoin(final KTable other, final ValueJoiner joiner, final StateStoreSupplier storeSupplier); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index d3d6ce2c208c8..26491fb91139d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -534,7 +534,17 @@ public KStream toStream(final KeyValueMapper KTable join(final KTable other, final ValueJoiner joiner) { - return doJoin(other, joiner, false, false, null, null); + return doJoin(other, joiner, null, false, false); + } + + @Override + public KTable join(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false); } @Override @@ -556,7 +566,14 @@ public KTable join(final KTable other, @Override public KTable outerJoin(final KTable other, final ValueJoiner joiner) { - return doJoin(other, joiner, true, true, null, null); + return doJoin(other, joiner, null, true, true); + } + + @Override + public KTable outerJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized) { + return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true); } @Override @@ -578,7 +595,18 @@ public KTable outerJoin(final KTable other, @Override public KTable leftJoin(final KTable other, final ValueJoiner joiner) { - return doJoin(other, joiner, true, false, null, null); + return doJoin(other, joiner, null, true, false); + } + + @Override + public KTable leftJoin(final KTable other, + final ValueJoiner joiner, + final Materialized> materialized) { + return doJoin(other, + joiner, + new MaterializedInternal<>(materialized), + true, + false); } @Override @@ -619,8 +647,53 @@ private KTable doJoin(final KTable other, final StateStoreSupplier storeSupplier) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); + final String joinMergeName = builder.newName(MERGE_NAME); final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name(); - final Set allSourceNodes = ensureJoinableWith((AbstractStream) other); + final KTable result = buildJoin((AbstractStream) other, + joiner, + leftOuter, + rightOuter, + joinMergeName, + internalQueryableName); + + if (internalQueryableName != null) { + builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName); + } + + return result; + } + + private KTable doJoin(final KTable other, + final ValueJoiner joiner, + final MaterializedInternal> materialized, + final boolean leftOuter, + final boolean rightOuter) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + final String internalQueryableName = materialized == null ? null : materialized.storeName(); + final String joinMergeName = builder.newName(MERGE_NAME); + final KTable result = buildJoin((AbstractStream) other, + joiner, + leftOuter, + rightOuter, + joinMergeName, + internalQueryableName); + + if (materialized != null) { + final StoreBuilder> storeBuilder + = new KeyValueStoreMaterializer<>(materialized).materialize(); + builder.internalTopologyBuilder.addStateStore(storeBuilder, joinMergeName); + } + return result; + } + + private KTable buildJoin(final AbstractStream other, + final ValueJoiner joiner, + final boolean leftOuter, + final boolean rightOuter, + final String joinMergeName, + final String internalQueryableName) { + final Set allSourceNodes = ensureJoinableWith(other); if (leftOuter) { enableSendingOldValues(); @@ -631,7 +704,7 @@ private KTable doJoin(final KTable other, final String joinThisName = builder.newName(JOINTHIS_NAME); final String joinOtherName = builder.newName(JOINOTHER_NAME); - final String joinMergeName = builder.newName(MERGE_NAME); + final KTableKTableAbstractJoin joinThis; final KTableKTableAbstractJoin joinOther; @@ -659,11 +732,6 @@ private KTable doJoin(final KTable other, builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); - - if (internalQueryableName != null) { - builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName); - } - return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 949f8be426c2e..1b45711833e55 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -29,8 +30,10 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; @@ -344,8 +347,15 @@ private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType jo final KTable table2 = builder.table(TABLE_2, TABLE_2); final KTable table3 = builder.table(TABLE_3, TABLE_3); + Materialized> materialized = null; + if (queryableName != null) { + materialized = Materialized.>as(queryableName) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .withCachingDisabled(); + } join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3, - joinType2, queryableName).to(OUTPUT); + joinType2, materialized).to(OUTPUT); return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig)); } @@ -353,7 +363,7 @@ private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType jo private KTable join(final KTable first, final KTable second, final JoinType joinType, - final String queryableName) { + final Materialized> materialized) { final ValueJoiner joiner = new ValueJoiner() { @Override public String apply(final String value1, final String value2) { @@ -361,13 +371,26 @@ public String apply(final String value1, final String value2) { } }; + switch (joinType) { case INNER: - return first.join(second, joiner, Serdes.String(), queryableName); + if (materialized != null) { + return first.join(second, joiner, materialized); + } else { + return first.join(second, joiner); + } case LEFT: - return first.leftJoin(second, joiner, Serdes.String(), queryableName); + if (materialized != null) { + return first.leftJoin(second, joiner, materialized); + } else { + return first.leftJoin(second, joiner); + } case OUTER: - return first.outerJoin(second, joiner, Serdes.String(), queryableName); + if (materialized != null) { + return first.outerJoin(second, joiner, materialized); + } else { + return first.outerJoin(second, joiner); + } } throw new RuntimeException("Unknown join type."); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 64ae6de18a56a..6ca38b8c8e821 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.state.KeyValueStore; @@ -437,19 +438,21 @@ public void shouldAllowNullStoreInJoin() { table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null); } + @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierInJoin() { - table.join(table, MockValueJoiner.TOSTRING_JOINER, null); + table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null); } + @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierInLeftJoin() { - table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null); + table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierInOuterJoin() { - table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, null); + table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null); } @Test(expected = NullPointerException.class) @@ -496,4 +499,19 @@ public boolean test(final String key, final String value) { } }, (Materialized>) null); } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() { + table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() { + table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() { + table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null); + } } From e5914b16a2956e98840f71c38cbda7c6fff02ae7 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 12 Sep 2017 10:25:25 +0100 Subject: [PATCH 2/2] address comments --- .../apache/kafka/streams/kstream/KTable.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index efbee67f90291..6d1d85d68ec3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -84,7 +84,7 @@ public interface KTable { * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate) a tombstone record * is forwarded. * * @param predicate a filter {@link Predicate} that is applied to each record @@ -106,7 +106,7 @@ public interface KTable { * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate) a tombstone record * is forwarded. *

* To query the local {@link KeyValueStore} it must be obtained via @@ -124,7 +124,7 @@ public interface KTable { * * @param predicate a filter {@link Predicate} that is applied to each record * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} - * should be materialized + * should be materialized. Cannot be {@code null} * @return a {@code KTable} that contains only those records that satisfy the given predicate * @see #filterNot(Predicate, Materialized) */ @@ -144,7 +144,7 @@ KTable filter(final Predicate predicate, * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate) a tombstone record * is forwarded. *

* To query the local {@link KeyValueStore} it must be obtained via @@ -184,7 +184,7 @@ KTable filter(final Predicate predicate, * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., dot not satisfy the given predicate) a tombstone record * is forwarded. *

* To query the local {@link KeyValueStore} it must be obtained via @@ -260,7 +260,7 @@ KTable filter(final Predicate predicate, *

* @param predicate a filter {@link Predicate} that is applied to each record * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} - * should be materialized + * should be materialized. Cannot be {@code null} * @return a {@code KTable} that contains only those records that do not satisfy the given predicate * @see #filter(Predicate, Materialized) */ @@ -412,7 +412,7 @@ KTable filterNot(final Predicate predicate, * * @param mapper a {@link ValueMapper} that computes a new output value * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} - * should be materialized + * should be materialized. Cannot be {@code null} * @param the value type of the result {@code KTable} * * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) @@ -1344,7 +1344,8 @@ KTable join(final KTable other, * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} * @param the value type of the other {@code KTable} * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given @@ -1670,7 +1671,8 @@ KTable leftJoin(final KTable other, * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} * @param the value type of the other {@code KTable} * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given @@ -1763,8 +1765,8 @@ KTable leftJoin(final KTable other, * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * left {@code KTable} - * @see #join(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} */ @Deprecated @@ -2011,7 +2013,8 @@ KTable outerJoin(final KTable other, * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records - * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} * @param the value type of the other {@code KTable} * @param the value type of the result {@code KTable} * @return a {@code KTable} that contains join-records for each key and values computed by the given