New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-5798] Add support to write to multiple topics with KafkaIO #7371
[BEAM-5798] Add support to write to multiple topics with KafkaIO #7371
Conversation
R: @rangadi |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple of comments.
So multiple topics were not supported in #7052 ?. This is follow up to it?
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriter.java
Show resolved
Hide resolved
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
Outdated
Show resolved
Hide resolved
6af59c9
to
20045f4
Compare
@rangadi Thanks for review. Yes, multiple output topics were not supported, I decided to split these tasks. Now, topic name from |
20045f4
to
ef4a288
Compare
I also updated javadoc for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Over all LGTM.
Left a couple of comments.
? spec.getPublishTimestampFunction() | ||
.getTimestamp(record, ctx.timestamp()) | ||
.getMillis() | ||
: System.currentTimeMillis()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to use currentTimeMillis(). We should leave it 'null' such that Kafka client decide.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, fixed
|
||
completionThread.shutdown(); | ||
|
||
// Verify that appropriate messages are written to different Kafka topics |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update comment to say it checks for timestamp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
@rangadi Thanks, I addressed your comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merged manually to update the name of the commit and squash the extra commits. |
@rangadi I renamed this PR/issue to say explicitly multiple topics because it does not align exactly with the dynamic destination style APIs in BigQueryIO, TextIO and AvroIO. I think it is probably worth to add a similar pattern here, or at least document how to achieve it if not. I filled BEAM-6385. WDYT ? |
Thanks. Renaming makes sense. |
Use topic name from ProducerRecord as a destination topic
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)