From 8f432f47e2aaf2358152bee8330b53b7d8bf0b5a Mon Sep 17 00:00:00 2001 From: Yasuhiro Matsuda Date: Wed, 27 Jan 2016 16:32:16 -0800 Subject: [PATCH] MINOR: remove the init method from Serdes --- .../apache/kafka/streams/state/Serdes.java | 33 +++++++------------ .../state/internals/MeteredKeyValueStore.java | 1 - .../streams/state/internals/RocksDBStore.java | 2 -- 3 files changed, 11 insertions(+), 25 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java index 4e1b05a32d1d3..e1e78afe06899 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Serdes.java @@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.processor.ProcessorContext; public final class Serdes { @@ -63,8 +62,7 @@ static Deserializer deserializer(Class type) { private Deserializer valueDeserializer; /** - * Create a context for serialization using the specified serializers and deserializers, or if any of them are null the - * corresponding {@link ProcessorContext}'s serializer or deserializer, which + * Create a context for serialization using the specified serializers and deserializers which * must match the key and value types used as parameters for this object. * * @param topic the name of the topic @@ -78,31 +76,22 @@ public Serdes(String topic, Serializer keySerializer, Deserializer keyDeserializer, Serializer valueSerializer, Deserializer valueDeserializer) { this.topic = topic; + + if (keySerializer == null) + throw new NullPointerException(); + if (keyDeserializer == null) + throw new NullPointerException(); + if (valueSerializer == null) + throw new NullPointerException(); + if (valueDeserializer == null) + throw new NullPointerException(); + this.keySerializer = keySerializer; this.keyDeserializer = keyDeserializer; this.valueSerializer = valueSerializer; this.valueDeserializer = valueDeserializer; } - /** - * Create a context for serialization using the {@link ProcessorContext}'s serializers and deserializers, which - * must match the key and value types used as parameters for this object. - * - * @param topic the name of the topic - */ - @SuppressWarnings("unchecked") - public Serdes(String topic) { - this(topic, null, null, null, null); - } - - @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { - keySerializer = keySerializer != null ? keySerializer : (Serializer) context.keySerializer(); - keyDeserializer = keyDeserializer != null ? keyDeserializer : (Deserializer) context.keyDeserializer(); - valueSerializer = valueSerializer != null ? valueSerializer : (Serializer) context.valueSerializer(); - valueDeserializer = valueDeserializer != null ? valueDeserializer : (Deserializer) context.valueDeserializer(); - } - public Deserializer keyDeserializer() { return keyDeserializer; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index d5fe44a58e836..6dee4c705853a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -88,7 +88,6 @@ public void init(ProcessorContext context) { this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush"); this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore"); - serialization.init(context); this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null; // register and possibly restore the state from the logs diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index dea7e0b7750f5..b324ff1c05ce5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -89,8 +89,6 @@ public RocksDBStore(String name, Serdes serdes) { } public void init(ProcessorContext context) { - serdes.init(context); - this.context = context; this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name); this.db = openDB(this.dbDir, this.options, TTL_SECONDS);