From a9dac7ef6499312d7e2a333fee80f3363c9f573b Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 21 Sep 2017 19:16:52 +0100 Subject: [PATCH 1/3] use serdes from materialized in table and globaltable --- .../apache/kafka/streams/StreamsBuilder.java | 6 ++- .../kafka/streams/StreamsBuilderTest.java | 53 +++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index a272ec43c468..48c2731590cb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -429,9 +429,11 @@ public synchronized GlobalKTable globalTable(final String topic, final Materialized> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); return internalStreamsBuilder.globalTable(topic, - new ConsumedInternal(), - new MaterializedInternal<>(materialized)); + new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), + materializedInternal.valueSerde())), + materializedInternal); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index dedd157a307a..4ce202b94abc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -16,21 +16,31 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.TestUtils; import org.junit.Rule; import org.junit.Test; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; public class StreamsBuilderTest { @@ -108,6 +118,49 @@ public void testMerge() { assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd"), processorSupplier.processed); } + @Test + public void shouldUseSerdesDefinedInMaterializedToConsumeTable() { + final Map results = new HashMap<>(); + final String topic = "topic"; + final ForeachAction action = new ForeachAction() { + @Override + public void apply(final Long key, final String value) { + results.put(key, value); + } + }; + builder.table(topic, Materialized.>as("store") + .withKeySerde(Serdes.Long()) + .withValueSerde(Serdes.String())) + .toStream().foreach(action); + + driver.setUp(builder, TestUtils.tempDirectory()); + driver.setTime(0L); + driver.process(topic, 1L, "value1"); + driver.process(topic, 2L, "value2"); + driver.flushState(); + final KeyValueStore store = (KeyValueStore) driver.allStateStores().get("store"); + assertThat(store.get(1L), equalTo("value1")); + assertThat(store.get(2L), equalTo("value2")); + assertThat(results.get(1L), equalTo("value1")); + assertThat(results.get(2L), equalTo("value2")); + } + + @Test + public void shouldUseSerdesDefinedInMaterializedToConsumeGlobalTable() { + final String topic = "topic"; + builder.globalTable(topic, Materialized.>as("store") + .withKeySerde(Serdes.Long()) + .withValueSerde(Serdes.String())); + driver.setUp(builder, TestUtils.tempDirectory()); + driver.setTime(0L); + driver.process(topic, 1L, "value1"); + driver.process(topic, 2L, "value2"); + driver.flushState(); + final KeyValueStore store = (KeyValueStore) driver.allStateStores().get("store"); + assertThat(store.get(1L), equalTo("value1")); + assertThat(store.get(2L), equalTo("value2")); + } + @Test(expected = TopologyException.class) public void shouldThrowExceptionWhenNoTopicPresent() throws Exception { builder.stream(Collections.emptyList()); From 688f0d35a6719082a0d1073b88afa063c17bba4c Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 22 Sep 2017 07:20:57 +0100 Subject: [PATCH 2/3] address comments --- .../src/main/java/org/apache/kafka/streams/StreamsBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 48c2731590cb..c91008666f46 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -432,7 +432,7 @@ public synchronized GlobalKTable globalTable(final String topic, final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), - materializedInternal.valueSerde())), + materializedInternal.valueSerde())), materializedInternal); } From 85a60a52e268ba244cc5055733fddfeca4afb7aa Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 22 Sep 2017 07:50:46 +0100 Subject: [PATCH 3/3] fix table --- .../main/java/org/apache/kafka/streams/StreamsBuilder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index c91008666f46..7e746e6a2bad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -301,8 +301,10 @@ public synchronized KTable table(final String topic, final Materialized> materialized) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal> materializedInternal = new MaterializedInternal<>(materialized); return internalStreamsBuilder.table(topic, - new ConsumedInternal(), + new ConsumedInternal<>(Consumed.with(materializedInternal.keySerde(), + materializedInternal.valueSerde())), new MaterializedInternal<>(materialized)); }