Skip to content

Commit

Permalink
STORM-2826: Set key/value deserializer fields when using the convenie…
Browse files Browse the repository at this point in the history
…nce builder methods in KafkaSpoutConfig
  • Loading branch information
srdo committed Nov 21, 2017
1 parent 7f70fc5 commit dc383c4
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 18 deletions.
Expand Up @@ -291,7 +291,7 @@ private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Cla
this.valueDesClazz = valDesClazz;
this.subscription = subscription;
this.translator = new DefaultRecordTranslator<>();

if (keyDesClazz != null) {
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
}
Expand Down Expand Up @@ -320,20 +320,17 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
// when they change the key/value types.
this.translator = (RecordTranslator<K, V>) builder.translator;
this.retryService = builder.retryService;

if (keyDesClazz != null) {
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
}

if (keyDes != null) {
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
}
if (valueDesClazz != null) {
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
}
if (valueDes != null) {
} else if (keyDesClazz != null) {
this.kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
} else if (valueDes != null) {
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass());
} else if (valueDesClazz != null) {
this.kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
}

this.keyDes = keyDes;
this.keyDesClazz = keyDesClazz;
this.valueDes = valueDes;
Expand All @@ -348,7 +345,7 @@ private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class
*/
@Deprecated
public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer) {
return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
return new Builder<>(this, keyDeserializer, null, null, null);
}

/**
Expand All @@ -359,7 +356,7 @@ public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer)
*/
@Deprecated
public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
return new Builder<>(this, null, clazz, null, null);
}

/**
Expand All @@ -370,7 +367,7 @@ public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
*/
@Deprecated
public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
return new Builder<>(this, null, null, valueDeserializer, null);
}

/**
Expand All @@ -381,7 +378,7 @@ public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializ
*/
@Deprecated
public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> clazz) {
return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
return new Builder<>(this, null, null, null, clazz);
}

/**
Expand Down Expand Up @@ -680,7 +677,7 @@ public KafkaSpoutConfig<K, V> build() {
* @return The new builder
*/
public static Builder<String, String> builder(String bootstrapServers, String... topics) {
return setStringDeserializers(new Builder<String, String>(bootstrapServers, topics));
return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
}

/**
Expand All @@ -691,7 +688,7 @@ public static Builder<String, String> builder(String bootstrapServers, String...
* @return The new builder
*/
public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
return setStringDeserializers(new Builder<String, String>(bootstrapServers, topics));
return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
}

/**
Expand All @@ -702,7 +699,7 @@ public static Builder<String, String> builder(String bootstrapServers, Collectio
* @return The new builder
*/
public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
return setStringDeserializers(new Builder<String, String>(bootstrapServers, topics));
return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
}

private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.storm.kafka.spout;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
Expand All @@ -25,9 +26,12 @@
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

public class KafkaSpoutConfigTest {
Expand Down Expand Up @@ -121,4 +125,102 @@ public void testCanConfigureWithExplicitFalseStringAutoCommitMode() {
assertThat("When setting enable auto commit explicitly to false the spout should use the 'at-least-once' processing guarantee",
conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE));
}

@Test
public void testCanGetKeyDeserializerWhenUsingDefaultBuilder() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.build();

assertThat("When using the default builder methods, the key deserializer should default to StringDeserializer",
conf.getKeyDeserializer(), instanceOf(StringDeserializer.class));
}

@Test
public void testCanGetValueDeserializerWhenUsingDefaultBuilder() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.build();

assertThat("When using the default builder methods, the value deserializer should default to StringDeserializer",
conf.getValueDeserializer(), instanceOf(StringDeserializer.class));
}

@Test
public void testCanOverrideDeprecatedDeserializerClassWithKafkaProps() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setKey(StringDeserializer.class)
.setValue(StringDeserializer.class)
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.build();

assertThat("The last set key deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
assertThat("The last set value deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
}

private static class SerializableStringDeserializer implements SerializableDeserializer {

private final StringDeserializer delegate = new StringDeserializer();

@Override
public void configure(Map configs, boolean isKey) {
delegate.configure(configs, isKey);
}

@Override
public Object deserialize(String topic, byte[] data) {
return delegate.deserialize(topic, data);
}

@Override
public void close() {
delegate.close();
}
}

@Test
public void testCanOverrideDeprecatedDeserializerInstanceWithKafkaProps() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setKey(new SerializableStringDeserializer())
.setValue(new SerializableStringDeserializer())
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.build();

assertThat("The last set key deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
assertThat("The last set value deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
}

@Test
public void testCanOverrideKafkaPropsWithDeprecatedDeserializerSetter() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.setKey(new SerializableStringDeserializer())
.setValue(new SerializableStringDeserializer())
.build();

assertThat("The last set key deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
assertThat("The last set value deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
}

@Test
public void testCanMixOldAndNewDeserializerSetter() {
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.setKey(new SerializableStringDeserializer())
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class)
.setValue(new SerializableStringDeserializer())
.build();

assertThat("The last set key deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(ByteArrayDeserializer.class));
assertThat("The last set value deserializer should be used, regardless of how it is set",
conf.getKafkaProps().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), CoreMatchers.<Object>equalTo(SerializableStringDeserializer.class));
}
}

0 comments on commit dc383c4

Please sign in to comment.