diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7880cbc9e27fe..e47ed11d9cf24 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -254,7 +254,6 @@ public static Read read() { public static Write write() { return new AutoValue_KafkaIO_Write.Builder() .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES) - .setValueOnly(false) .build(); } @@ -1159,7 +1158,6 @@ public abstract static class Write extends PTransform @Nullable abstract String getTopic(); @Nullable abstract Coder getKeyCoder(); @Nullable abstract Coder getValueCoder(); - abstract boolean getValueOnly(); abstract Map getProducerConfig(); @Nullable abstract SerializableFunction, Producer> getProducerFactoryFn(); @@ -1171,7 +1169,6 @@ abstract static class Builder { abstract Builder setTopic(String topic); abstract Builder setKeyCoder(Coder keyCoder); abstract Builder setValueCoder(Coder valueCoder); - abstract Builder setValueOnly(boolean valueOnly); abstract Builder setProducerConfig(Map producerConfig); abstract Builder setProducerFactoryFn( SerializableFunction, Producer> fn); @@ -1231,7 +1228,7 @@ public Write withProducerFactoryFn( * collections of values rather thank {@link KV}s. */ public PTransform, PDone> values() { - return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build()); + return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder()).toBuilder().build()); } @Override @@ -1245,9 +1242,7 @@ public void validate(PCollection> input) { checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), "Kafka bootstrap servers should be set"); checkNotNull(getTopic(), "Kafka topic should be set"); - if (!getValueOnly()) { - checkNotNull(getKeyCoder(), "Key coder should be set"); - } + checkNotNull(getKeyCoder(), "Key coder should be set"); checkNotNull(getValueCoder(), "Value coder should be set"); } @@ -1376,7 +1371,7 @@ public void teardown() { // Set custom kafka serializers. We can not serialize user objects then pass the bytes to // producer. The key and value objects are used in kafka Partitioner interface. // This does not matter for default partitioner in Kafka as it uses just the serialized - // key bytes to pick a partition. But are making sure user's custom partitioner would work + // key bytes to pick a partition. But making sure user's custom partitioner would work // as expected. this.producerConfig = new HashMap<>(spec.getProducerConfig()); diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 1897127ac2964..d1696d06f45ac 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -71,9 +71,12 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; import org.hamcrest.collection.IsIterableContainingInAnyOrder; import org.joda.time.Instant; import org.junit.Rule; @@ -728,8 +731,21 @@ public void flush() { private static class ProducerFactoryFn implements SerializableFunction, Producer> { + @SuppressWarnings("unchecked") @Override public Producer apply(Map config) { + + // Make sure the config is correctly set up for serializers. + Utils.newInstance( + ((Class) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) + .asSubclass(Serializer.class) + ).configure(config, true); + + Utils.newInstance( + ((Class) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) + .asSubclass(Serializer.class) + ).configure(config, false); + return MOCK_PRODUCER; } }