[BEAM-1837] Fix NPE in KafkaIO writer. #2369
KafkaIO.writer()...values() does not require the user to set a key coder since the key is always null. Validation passes, but it results in an NPE at runtime when the writer tries to instantiate the producer. Set key coder to 'NullOnlyCoder'.
+R: @jkff
@@ -1376,7 +1371,7 @@ public void teardown() {
 // Set custom kafka serializers. We can not serialize user objects then pass the bytes to
 // producer. The key and value objects are used in kafka Partitioner interface.
 // This does not matter for default partitioner in Kafka as it uses just the serialized
-// key bytes to pick a partition. But are making sure user's custom partitioner would work
+// key bytes to pick a partition. But making sure user's custom partitioner would work
I don't understand the last sentence, in either the original or the rephrased version. Can you rephrase it some more?
Agree. Expanded it more.
The whole approach of using custom Kafka serializers to wrap user-supplied coders will be a moot point once we remove the use of coders in KafkaIO.
@@ -1231,7 +1228,7 @@ public void close() throws IOException {
 * collections of values rather thank {@link KV}s.
 */
 public PTransform<PCollection<V>, PDone> values() {
-return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
+return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
Do you mean that before this PR, .values() never worked at all? In that case, that means a test was missing - please add such a test. Either way, the test changes that you added seem unrelated to this fix, am I misunderstanding something?
Before this fix, .values() worked only if .withKeyCoder() was also specified (when a real KafkaProducer is involved). Unit tests use a mock producer that doesn't instantiate the serializers the way KafkaProducer does. I have updated KafkaIOTest so that we instantiate these serializers. IOW, testValuesSink now fails without this fix.
But we should not need a key coder (since the transform's input is just values). The validation passed, but it caused a check failure when KafkaProducer tried to create the serializers.
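To illustrate the failure mode described above, here is a minimal, stdlib-only sketch. It is not the KafkaIO code: `SimpleCoder`, `NullOnlySimpleCoder`, `CoderBackedSerializer`, and the `sketch.key.coder` config key are all hypothetical names made up for this example. It assumes a serializer that looks up its coder in the producer config when it is configured, so a missing key coder goes unnoticed until the producer is built, and a coder that accepts only null fills the gap for values-only writes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class ValuesOnlyKeyCoderSketch {
  // Hypothetical config key; not a real Kafka or Beam property name.
  static final String KEY_CODER = "sketch.key.coder";

  /** Hypothetical stand-in for a coder: turns an object into bytes. */
  interface SimpleCoder<T> {
    byte[] encode(T value);
  }

  /** Accepts only null, which is all a values-only write ever passes as a key. */
  static final class NullOnlySimpleCoder<T> implements SimpleCoder<T> {
    @Override
    public byte[] encode(T value) {
      if (value != null) {
        throw new IllegalArgumentException("only null keys are supported");
      }
      return new byte[0];
    }
  }

  /** Hypothetical serializer that reads its coder from the producer config. */
  static final class CoderBackedSerializer<T> {
    private SimpleCoder<T> coder;

    @SuppressWarnings("unchecked")
    void configure(Map<String, Object> config) {
      // Throws NullPointerException when no coder was put into the config.
      coder = (SimpleCoder<T>)
          Objects.requireNonNull(config.get(KEY_CODER), "no key coder in producer config");
    }

    byte[] serialize(T value) {
      return coder.encode(value);
    }
  }

  /** Returns true if configuring a key serializer from this config throws an NPE. */
  static boolean configureThrowsNpe(Map<String, Object> config) {
    try {
      new CoderBackedSerializer<Object>().configure(config);
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    System.out.println("NPE without key coder: " + configureThrowsNpe(config));
    config.put(KEY_CODER, new NullOnlySimpleCoder<Object>());
    System.out.println("NPE with null-only coder: " + configureThrowsNpe(config));
  }
}
```

In this sketch the config map is accepted as valid either way; the missing coder only surfaces once something actually configures the serializer, which is the step a mock producer skips.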
@Override
public Producer<Integer, Long> apply(Map<String, Object> config) {

// Make sure the config is correctly set up for serializers.
Why this change? Is it relevant to the NPE fix, or is it separate?
Yes. This causes an NPE without the fix. We are instantiating the serializers the same way that a KafkaProducer would.
The mock producer that we are using does not use config to create serializers.
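As a rough illustration of the test-side change discussed here, the sketch below shows a producer factory that instantiates and configures the serializers named in the config before handing back a mock. It uses only the standard library and does not reproduce the KafkaIOTest code: `ConfigurableSerializer`, `CoderSerializer`, `FakeProducer`, and the `coder` config entry are hypothetical. The `key.serializer` and `value.serializer` keys follow Kafka's producer property names, but here they simply map to classes in this file.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class SerializerCheckingFactorySketch {
  /** Hypothetical stand-in for a serializer that must be configured before use. */
  interface ConfigurableSerializer {
    void configure(Map<String, Object> config);
  }

  /** Requires a "coder" entry (a hypothetical key) and throws an NPE if it is absent. */
  static final class CoderSerializer implements ConfigurableSerializer {
    @Override
    public void configure(Map<String, Object> config) {
      Objects.requireNonNull(config.get("coder"), "no coder in config");
    }
  }

  /** Hypothetical mock producer; on its own it never looks at the config. */
  static final class FakeProducer {}

  /** Instantiates the class stored under the given key and configures it. */
  static ConfigurableSerializer createSerializer(Map<String, Object> config, String key) {
    Object cls = config.get(key);
    if (!(cls instanceof Class)) {
      throw new IllegalStateException(key + " must be set to a class");
    }
    try {
      ConfigurableSerializer serializer =
          (ConfigurableSerializer) ((Class<?>) cls).getDeclaredConstructor().newInstance();
      serializer.configure(config);
      return serializer;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("cannot instantiate " + cls, e);
    }
  }

  /** Exercises the serializer setup first, then returns the mock. */
  static FakeProducer createProducer(Map<String, Object> config) {
    createSerializer(config, "key.serializer");
    createSerializer(config, "value.serializer");
    return new FakeProducer();
  }

  public static void main(String[] args) {
    Map<String, Object> config = new HashMap<>();
    config.put("key.serializer", CoderSerializer.class);
    config.put("value.serializer", CoderSerializer.class);
    config.put("coder", new Object());
    System.out.println("producer created: " + (createProducer(config) != null));
  }
}
```

With a factory shaped like this, a test that forgets a required config entry fails while the producer is being built, instead of passing against a mock that ignores the config.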
LGTM but please squash the commits.
KafkaIO.writer()...values() does not require the user to set a key coder since the key is always null. Validation passes, but it results in an NPE at runtime when the writer tries to instantiate the producer. Set key coder to 'NullOnlyCoder'.
Updated ProducerFactoryFn in KafkaIOTest to instantiate serializers to ensure config is set correctly. The sink tests fail without this fix.