From a5394b1f92ee2cc6542d38e3e31834bc30917eca Mon Sep 17 00:00:00 2001 From: Sean Policarpio Date: Wed, 20 Dec 2017 23:12:01 +1100 Subject: [PATCH] appends Materialized#with to include the ability to specify the store name as well as the serdes --- .../kafka/streams/kstream/Materialized.java | 21 ++++++++++++++++++- .../streams/kstream/MaterializedTest.java | 19 +++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java index 48dd12e08c313..25d81593764d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java @@ -153,7 +153,26 @@ public static Materialized> as(final K */ public static Materialized with(final Serde keySerde, final Serde valueSerde) { - return new Materialized((String) null).withKeySerde(keySerde).withValueSerde(valueSerde); + return Materialized.with(null, keySerde, valueSerde); + } + + /** + * Materialize a {@link StateStore} with the provided store name and key and value {@link Serde}s. + * + * @param storeName the name of the underlying {@link KTable} state store; valid characters are ASCII + * @param keySerde the key {@link Serde} to use. If the {@link Serde} is null, then the default key + * serde from configs will be used + * @param valueSerde the value {@link Serde} to use. If the {@link Serde} is null, then the default value + * serde from configs will be used + * @param key type + * @param value type + * @param store type + * @return a new {@link Materialized} instance with the given key and value serdes + */ + public static Materialized with(final String storeName, + final Serde keySerde, + final Serde valueSerde) { + return new Materialized(storeName).withKeySerde(keySerde).withValueSerde(valueSerde); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java index de3e503fa0b77..542f7066ac545 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/MaterializedTest.java @@ -18,11 +18,15 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.SessionBytesStoreSupplier; import org.apache.kafka.streams.state.WindowBytesStoreSupplier; import org.junit.Test; +import static org.junit.Assert.assertTrue; + public class MaterializedTest { @Test @@ -51,4 +55,19 @@ public void shouldThrowNullPointerIfKeyValueBytesStoreSupplierIsNull() { public void shouldThrowNullPointerIfSessionBytesStoreSupplierIsNull() { Materialized.as((SessionBytesStoreSupplier) null); } + + @Test + public void shouldSetStoreNameUsingWith() { + final String expectedName = "some.name"; + final Serde intSerde = new Serdes.IntegerSerde(); + final Serde stringSerde = new Serdes.StringSerde(); + + assertTrue(Materialized.with(expectedName, intSerde, stringSerde).storeName.equals(expectedName)); + assertTrue(Materialized.with(expectedName, intSerde, stringSerde).keySerde.equals(intSerde)); + assertTrue(Materialized.with(expectedName, intSerde, stringSerde).valueSerde.equals(stringSerde)); + + assertTrue(Materialized.with(intSerde, stringSerde).storeName == null); + assertTrue(Materialized.with(intSerde, stringSerde).keySerde.equals(intSerde)); + assertTrue(Materialized.with(intSerde, stringSerde).valueSerde.equals(stringSerde)); + } } \ No newline at end of file