New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-6063] KafkaIO: add writing support with ProducerRecord #7052
[BEAM-6063] KafkaIO: add writing support with ProducerRecord #7052
Conversation
R: @rangadi Please, take a look |
LGTM overall. |
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Outdated
Show resolved
Hide resolved
@Override | ||
public void encode(ProducerRecord<K, V> value, OutputStream outStream) throws IOException { | ||
stringCoder.encode(value.topic(), outStream); | ||
intCoder.encode(value.partition() != null ? value.partition() : Integer.MAX_VALUE, outStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-1 for partition? Any reason to chose max values for null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think "-1" should work for partition (but not for timestamp)
Is there some better way how to deal with null values in Coder? Does NullableCoder
can help here?
|
||
p.apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata()) | ||
.apply(ParDo.of(new KV2ProducerRecord(topic))) | ||
.setCoder(ProducerRecordCoder.of(VarIntCoder.of(), VarLongCoder.of())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to set a coder here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think yes, otherwise pipeline is not aware which coder to use for ProducerRecord
's. Can we avoid this?
@rangadi Thank you for review! I addressed your comments, please., take a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a minor comment for update to TODO comment.
LGTM.
Thanks for the contribution. Please ping any committer once this passes all the test.
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
Outdated
Show resolved
Hide resolved
@@ -58,7 +58,7 @@ public ProducerRecordCoder(Coder<K> keyCoder, Coder<V> valueCoder) { | |||
@Override | |||
public void encode(ProducerRecord<K, V> value, OutputStream outStream) throws IOException { | |||
stringCoder.encode(value.topic(), outStream); | |||
intCoder.encode(value.partition() != null ? value.partition() : Integer.MAX_VALUE, outStream); | |||
intCoder.encode(value.partition() != null ? value.partition() : -1, outStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Your preference : either -1 or INT_MAX is fine with me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep "-1" for now
ea36cda
to
3eb6ca0
Compare
@rangadi I addressed your last comments and also I added a class |
3eb6ca0
to
80b9d79
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Added new transform
WriteRecords
based on KafkaProducerRecord
. API of oldWrite
transform is kept as it was before but now it usesWriteRecords
under the hood to write data. All internal functionality, which is not visible for user, has been changed to useProducerRecord
instead ofKV
.Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)