[BEAM-351] Add DisplayData to KafkaIO #2111
Conversation
R: @dhalperi / @bjchambers

Refer to this link for build results (access rights to CI server needed):
```java
@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  builder.add(DisplayData.item("topics", Joiner.on(",").join(getTopics())));
```
Please make it read 'topic' in the common case of a single topic.
Also need to handle the case where the partitions are set manually (either 'topics' or 'partitions' is set).
Yeah I missed the partitions or topics part, will do.
Are we sure we want to change the name of the key depending on the number of topics? Shouldn't display data keys be consistent?
I haven't read the contract on the key, so I don't know if it is supposed to stay static or not. Will let you guys decide.
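For what it's worth, one way to keep the display keys static while handling the topics-or-partitions split could look like the sketch below. It uses plain collections instead of the actual KafkaIO accessors, and the method and key names (`displayItems`, `topicPartitions`) are illustrative, not the final API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicsDisplaySketch {
  // Illustrative only: build display items for a Kafka read, emitting
  // 'topics' when topics were set, else 'topicPartitions' when the
  // partitions were assigned manually. In KafkaIO only one of the two
  // is ever set, so at most one item is emitted.
  static Map<String, String> displayItems(List<String> topics, List<String> partitions) {
    Map<String, String> items = new LinkedHashMap<>();
    if (topics != null && !topics.isEmpty()) {
      items.put("topics", String.join(",", topics));
    } else if (partitions != null && !partitions.isEmpty()) {
      items.put("topicPartitions", String.join(",", partitions));
    }
    return items;
  }

  public static void main(String[] args) {
    System.out.println(displayItems(List.of("logs"), null));
    System.out.println(displayItems(null, List.of("logs-0", "logs-1")));
  }
}
```

Keeping the key `topics` even for a single topic sidesteps the consistency concern raised above, at the cost of a slightly less natural label in the single-topic case.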
```java
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  builder.add(DisplayData.item("topics", Joiner.on(",").join(getTopics())));
  for (Map.Entry<String, Object> conf : getConsumerConfig().entrySet()) {
```
Writing the entire config might not be that useful. You could use ConsumerConfig.originals(). Among those, the most important one could be at the top: BOOTSTRAP_SERVERS_CONFIG ("bootstrap.servers").
RECEIVE_BUFFER_CONFIG and ENABLE_AUTO_COMMIT_CONFIG could be skipped unless they differ from what we set them to above (optional).
I started by picking and choosing configs, but figured it could be useful for the user to see which configurations they set. Is this not useful?
Agree with @rangadi, this is not LOG.DEBUG. I'd go through Kafka's documentation and add the "important configurations" here.
Maybe pick the ones marked as "high importance" in the Kafka documentation.
So, the consumer properties map is what the user passed to the Read transform builder.
It is then passed to the Kafka consumer, which joins these properties with Kafka's defaults, but that happens in a different scope.
So this map won't contain the defaults for every configuration Kafka has, but rather just the ones the user specified via KafkaIO.Read#updateConsumerProperties.
@aviemzur, I misread the code. getConsumerConfig() is our own map. Then we just need to exclude the internal configs (not a must, but better, I think). I thought this was the map from ConsumerConfig.
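A rough sketch of that exclusion, with a plain map standing in for getConsumerConfig(). The set of ignored keys here is illustrative (the keys KafkaIO actually manages internally may differ):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ConfigDisplaySketch {
  // Illustrative: keys the connector sets internally and would not display.
  static final Set<String> IGNORED = Set.of(
      "key.deserializer", "value.deserializer",
      "enable.auto.commit", "receive.buffer.bytes");

  // Emit only the user-supplied configs, skipping internally-managed keys.
  static Map<String, Object> displayableConfig(Map<String, Object> consumerConfig) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Object> conf : consumerConfig.entrySet()) {
      if (!IGNORED.contains(conf.getKey())) {
        out.put(conf.getKey(), conf.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, Object> cfg = new LinkedHashMap<>();
    cfg.put("bootstrap.servers", "myServer1:9092");
    cfg.put("enable.auto.commit", false);
    System.out.println(displayableConfig(cfg));
  }
}
```

Since getConsumerConfig() holds only what the user specified, a small blacklist of internally-managed keys keeps the display data user-focused without needing a full whitelist of Kafka's "high importance" configs.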
```java
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  builder.addIfNotNull(DisplayData.item("topic", getTopic()));
  for (Map.Entry<String, Object> conf : getProducerConfig().entrySet()) {
```
```java
@Override
public void populateDisplayData(DisplayData.Builder builder) {
  super.populateDisplayData(builder);
  read.populateDisplayData(builder);
```
We could indicate that the metadata is trimmed. Maybe.
Yeah I thought of that too. Is that useful?
I don't think it matters.
IMHO, with/without metadata is more for the pipeline author and what they do with the read PCollection. DisplayData for KafkaIO should be about T, which can be KV&lt;K, V&gt; or KafkaRecord&lt;KV&lt;K, V&gt;&gt;.
```java
assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
assertThat(displayData, hasDisplayItem("value.deserializer",
    org.apache.kafka.common.serialization.ByteArrayDeserializer.class.getName()));
assertThat(displayData, hasDisplayItem("key.deserializer",
```
Oh, yeah, we should drop these two serializers too. They are implementation details.
Funny, they are actually marked as "high importance" in the Kafka docs.
They are, but we don't allow users to set them. Users only provide coders.
@dhalperi no problem!

LGTM, pending Travis.

@aviemzur can you please rebase and push so Travis would do us the courtesy of running? Thanks!
Force-pushed 8829d01 to 7fd48d4.
Rebased on master.

Merging.