New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-11325] ReadFromKafkaDoFn should stop reading when topic/partition is removed or marked as stopped #13710
Conversation
2acbf8a
to
1435e98
Compare
Thanks, I'll take a look! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM in general, I left several comments, ptal.
Also, please, add Jira ID to commit message as a prefix.
Don't you think it will make sense to add a IT that test this functionality against "real" (see KafkaIOIT
) Kafka cluster as well?
* A custom {@link SerializableFunction} that determines whether the {@link ReadFromKafkaDoFn} | ||
* should stop reading from the given {@link TopicPartition}. | ||
*/ | ||
public ReadSourceDescriptors<K, V> withCheckStopReadingFn( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add an example to the main KafkaIO
Javadoc how to use this functionality to make it more clear for users?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, will do. I'll also add support to commit transform together in this PR for reverting safety.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added more related javadoc around KafkaIO
.
Thinking about the commit transform again. I don't think additional changes required here because:
- For stopped TopicPartition, we still want to commit the offset
- For deleted TopicPartition, we should get exceptions from Kafka Consumer API, which has been handled in the Commit transform.
I'll merge this PR if all these look good to you : )
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
Show resolved
Hide resolved
Run Java KafkaIO Performance Test |
The IT failed on pulling sdk harness docker image. |
Run Java KafkaIO Performance Test |
I think |
1435e98
to
2afe9a4
Compare
It seems like KafkaIOIT has been failing for more than 1 month. The failure is the pipeline fails to pull java sdk harness docker image from gcr. It looks like an issue of test configuration. |
…on is removed or marked as stopped.
2afe9a4
to
e8eed0b
Compare
Sounds good to me. |
Run Java KafkaIO Performance Test |
Do you have an idea why it got broken and how to fix it? This IT is very valuable since it runs tests agains different Kafka versions and, therefore, test the compatibility. Other than that, I think we can merge this PR. |
Yeah I'll explore the failure today. Thanks for your quick review! |
For design details, please refer to the doc here: https://docs.google.com/document/d/1FU3GxVRetHPLVizP3Mdv6mP5tpjZ3fd99qNjUI5DT5k/edit#
Next step is to add support of starting reading from new added TopicPartitions.
r: @dpmills @aromanenko-dev
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.