New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-1837] Fix NPE in KafkaIO writer. #2369
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,9 +71,12 @@ | |
import org.apache.kafka.clients.consumer.OffsetResetStrategy; | ||
import org.apache.kafka.clients.producer.MockProducer; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.common.PartitionInfo; | ||
import org.apache.kafka.common.TopicPartition; | ||
import org.apache.kafka.common.serialization.Serializer; | ||
import org.apache.kafka.common.utils.Utils; | ||
import org.hamcrest.collection.IsIterableContainingInAnyOrder; | ||
import org.joda.time.Instant; | ||
import org.junit.Rule; | ||
|
@@ -728,8 +731,21 @@ public void flush() { | |
private static class ProducerFactoryFn | ||
implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> { | ||
|
||
@SuppressWarnings("unchecked") | ||
@Override | ||
public Producer<Integer, Long> apply(Map<String, Object> config) { | ||
|
||
// Make sure the config is correctly set up for serializers. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this change? Is it relevant to the NPE fix, or is it separate? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. This causes NPE without the fix. We are instantiating the serializers the same way that a KafkaProducer would. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The mock producer that we are using does not use config to create serializers. |
||
Utils.newInstance( | ||
((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) | ||
.asSubclass(Serializer.class) | ||
).configure(config, true); | ||
|
||
Utils.newInstance( | ||
((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) | ||
.asSubclass(Serializer.class) | ||
).configure(config, false); | ||
|
||
return MOCK_PRODUCER; | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean that before this PR, .values() never worked at all? In that case, that means a test was missing - please add such a test. Either way, the test changes that you added seem unrelated to this fix, am I misunderstanding something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this fix,
.values()
worked only if.withKeyCoder()
was also specified, (when real KafkaProducer is involved). Unit tests use a mock producer that don't don't instantiate the serializers the way KafkaProducer. I have updated KafkaIOTests so that we instantiate these serializers. IOWtestValuesSink
now fails without this fix.But we should not need a key coder (since the transform's input is just values). The validation passed, but it caused a check failure when KafkaProducer tried to create the serializers.