[BEAM-1837] Fix NPE in KafkaIO writer. #2369

Closed (2 commits)

@@ -254,7 +254,6 @@ public static <K, V> Read<K, V> read() {
public static <K, V> Write<K, V> write() {
return new AutoValue_KafkaIO_Write.Builder<K, V>()
.setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
- .setValueOnly(false)
.build();
}

@@ -1159,7 +1158,6 @@ public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>
@Nullable abstract String getTopic();
@Nullable abstract Coder<K> getKeyCoder();
@Nullable abstract Coder<V> getValueCoder();
- abstract boolean getValueOnly();
abstract Map<String, Object> getProducerConfig();
@Nullable
abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
@@ -1171,7 +1169,6 @@ abstract static class Builder<K, V> {
abstract Builder<K, V> setTopic(String topic);
abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
- abstract Builder<K, V> setValueOnly(boolean valueOnly);
abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
abstract Builder<K, V> setProducerFactoryFn(
SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
@@ -1231,7 +1228,7 @@ public Write<K, V> withProducerFactoryFn(
* collections of values rather thank {@link KV}s.
*/
public PTransform<PCollection<V>, PDone> values() {
- return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
+ return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
Contributor

Do you mean that before this PR, .values() never worked at all? In that case, that means a test was missing - please add such a test. Either way, the test changes that you added seem unrelated to this fix, am I misunderstanding something?

Contributor Author

Before this fix, .values() worked only if .withKeyCoder() was also specified (when a real KafkaProducer is involved). The unit tests use a mock producer that doesn't instantiate the serializers the way KafkaProducer does. I have updated KafkaIOTests so that we instantiate these serializers. In other words, testValuesSink now fails without this fix.

But we should not need a key coder, since the transform's input is just values. Validation passed, but the missing key coder caused a check failure when KafkaProducer tried to create the serializers.
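
The failure mode described in this comment can be reproduced without Kafka. The following is a minimal sketch using only the JDK; `SketchSerializer`, `CoderBackedSerializer`, the `"key.coder"` and `"value.coder"` config names, and the string placeholders used as coders are all hypothetical stand-ins, not the actual Beam or Kafka classes. It assumes the serializer looks up its coder from the producer config inside `configure` and rejects a missing one, which is the step a mock producer never reaches.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class NullKeyCoderSketch {

  /** Hypothetical stand-in for a serializer that is configured from a config map. */
  public interface SketchSerializer {
    void configure(Map<String, Object> config, boolean isKey);
  }

  /** Hypothetical serializer that expects its coder to be present in the config. */
  public static class CoderBackedSerializer implements SketchSerializer {
    Object coder;

    @Override
    public void configure(Map<String, Object> config, boolean isKey) {
      String name = isKey ? "key.coder" : "value.coder"; // assumed config names
      // Throws NullPointerException when no coder was placed in the config.
      coder = Objects.requireNonNull(config.get(name), name + " should be set");
    }
  }

  /** Returns true if configuring a key serializer from this config throws an NPE. */
  public static boolean keySerializerFails(Map<String, Object> config) {
    try {
      new CoderBackedSerializer().configure(config, true);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  /** Builds a config; keyCoder may be null, as in a values-only write with no key coder. */
  public static Map<String, Object> producerConfig(Object keyCoder, Object valueCoder) {
    Map<String, Object> config = new HashMap<>();
    config.put("key.coder", keyCoder);
    config.put("value.coder", valueCoder);
    return config;
  }

  public static void main(String[] args) {
    // Values-only write without a key coder.
    System.out.println(keySerializerFails(producerConfig(null, "valueCoder")));
    // Same write with a placeholder key coder supplied.
    System.out.println(keySerializerFails(producerConfig("placeholderKeyCoder", "valueCoder")));
  }
}
```

A test double that returns a ready-made producer without ever calling `configure` would pass in both cases, which is why the missing key coder went unnoticed.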

}

@Override
@@ -1245,21 +1242,20 @@ public void validate(PCollection<KV<K, V>> input) {
checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
"Kafka bootstrap servers should be set");
checkNotNull(getTopic(), "Kafka topic should be set");
- if (!getValueOnly()) {
- checkNotNull(getKeyCoder(), "Key coder should be set");
- }
+ checkNotNull(getKeyCoder(), "Key coder should be set");
checkNotNull(getValueCoder(), "Value coder should be set");
}

// set config defaults
private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
ImmutableMap.<String, Object>of(
ProducerConfig.RETRIES_CONFIG, 3,
// See comment about custom serializers in KafkaWriter constructor.
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);

/**
- * A set of properties that are not required or don't make sense for our consumer.
+ * A set of properties that are not required or don't make sense for our producer.
*/
private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
@@ -1373,11 +1369,13 @@ public void teardown() {
KafkaWriter(Write<K, V> spec) {
this.spec = spec;

- // Set custom kafka serializers. We can not serialize user objects then pass the bytes to
- // producer. The key and value objects are used in kafka Partitioner interface.
+ // Set custom kafka serializers. We do not want to serialize user objects then pass the bytes
+ // to producer since key and value objects are used in Kafka Partitioner interface.
// This does not matter for default partitioner in Kafka as it uses just the serialized
- // key bytes to pick a partition. But are making sure user's custom partitioner would work
- // as expected.
+ // key bytes to pick a partition. But we don't want to limit use of custom partitions.
+ // We pass key and values objects the user writes directly Kafka and user supplied
+ // coders to serialize them are invoked inside CoderBasedKafkaSerializer.
+ // Use case : write all the events for a single session to same Kafka partition.

this.producerConfig = new HashMap<>(spec.getProducerConfig());
this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());
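
The use case named in the constructor comment (all events of one session going to the same partition) can be illustrated with a minimal JDK-only sketch. `EventKey`, its fields, and `partitionBySession` are hypothetical names introduced here for illustration; they are not part of KafkaIO or the Kafka `Partitioner` interface. The sketch assumes a custom partitioner that receives the key object itself and can therefore read a field directly, rather than first decoding serialized bytes.

```java
public class SessionPartitionSketch {

  /** Hypothetical key type: identifies an event and the session it belongs to. */
  public static final class EventKey {
    final String sessionId;
    final long eventId;

    public EventKey(String sessionId, long eventId) {
      this.sessionId = sessionId;
      this.eventId = eventId;
    }
  }

  /**
   * Picks a partition from the session id only, so every event of a session
   * maps to the same partition regardless of its event id.
   */
  public static int partitionBySession(EventKey key, int numPartitions) {
    // floorMod keeps the result in [0, numPartitions) even for negative hash codes.
    return Math.floorMod(key.sessionId.hashCode(), numPartitions);
  }

  public static void main(String[] args) {
    int first = partitionBySession(new EventKey("session-a", 1L), 8);
    int second = partitionBySession(new EventKey("session-a", 2L), 8);
    System.out.println(first == second);
  }
}
```

If the writer serialized keys before handing them to the producer, a partitioner like this would only see opaque bytes and would have to decode them to find the session id.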
@@ -71,9 +71,12 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
+ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+ import org.apache.kafka.common.serialization.Serializer;
+ import org.apache.kafka.common.utils.Utils;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -728,8 +731,21 @@ public void flush() {
private static class ProducerFactoryFn
implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {

+ @SuppressWarnings("unchecked")
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {
+
+ // Make sure the config is correctly set up for serializers.
Contributor

Why this change? Is it relevant to the NPE fix, or is it separate?

Contributor Author

Yes, it is relevant: without the fix, this causes an NPE. We are instantiating the serializers the same way that a KafkaProducer would.

Contributor Author

The mock producer that we are using does not use config to create serializers.

+ Utils.newInstance(
+ ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+ .asSubclass(Serializer.class)
+ ).configure(config, true);
+
+ Utils.newInstance(
+ ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
+ .asSubclass(Serializer.class)
+ ).configure(config, false);
return MOCK_PRODUCER;
}
}
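
The pattern used in this test factory (read a serializer class out of the config, instantiate it, then call `configure`) can be sketched with plain JDK reflection. This is an illustration under stated assumptions, not the test's actual code: `SketchSerializer`, `RecordingSerializer`, `sampleConfig`, and the literal config keys are hypothetical, and `getDeclaredConstructor().newInstance()` stands in for Kafka's `Utils.newInstance`.

```java
import java.util.HashMap;
import java.util.Map;

public class SerializerInstantiationSketch {

  /** Hypothetical stand-in for a serializer interface with a configure step. */
  public interface SketchSerializer {
    void configure(Map<String, Object> config, boolean isKey);
  }

  /** Hypothetical serializer that records how it was configured. */
  public static class RecordingSerializer implements SketchSerializer {
    public boolean configured = false;
    public boolean isKey = false;

    public RecordingSerializer() {}

    @Override
    public void configure(Map<String, Object> config, boolean isKey) {
      this.configured = true;
      this.isKey = isKey;
    }
  }

  /** Looks up the class stored under configKey, instantiates it, and configures it. */
  public static SketchSerializer instantiate(
      Map<String, Object> config, String configKey, boolean isKey) {
    try {
      Class<?> raw = (Class<?>) config.get(configKey);
      SketchSerializer serializer =
          raw.asSubclass(SketchSerializer.class).getDeclaredConstructor().newInstance();
      serializer.configure(config, isKey);
      return serializer;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not instantiate " + configKey, e);
    }
  }

  /** Builds a config that maps both (assumed) serializer keys to the recording class. */
  public static Map<String, Object> sampleConfig() {
    Map<String, Object> config = new HashMap<>();
    config.put("key.serializer", RecordingSerializer.class);
    config.put("value.serializer", RecordingSerializer.class);
    return config;
  }

  public static void main(String[] args) {
    Map<String, Object> config = sampleConfig();
    RecordingSerializer key = (RecordingSerializer) instantiate(config, "key.serializer", true);
    RecordingSerializer value =
        (RecordingSerializer) instantiate(config, "value.serializer", false);
    System.out.println(key.configured && key.isKey);
    System.out.println(value.configured && !value.isKey);
  }
}
```

Running the configure step inside the factory means any problem in the serializer setup surfaces in the unit test even though the returned producer is a mock.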