Skip to content

Conversation

@junaiddshaukat
Copy link
Contributor

Summary

PR #34659 introduced a "fail fast" feature using checkState() in GenerateKafkaSourceDescriptor and WatchForKafkaTopicPartitions. However, when Kafka returns null or an empty partition list for a topic, checkState() throws IllegalStateException inside DoFns. Since DoFns automatically retry on exceptions, this causes infinite retries instead of cleanly stopping the pipeline — the opposite of the intended "fail fast" behavior.

Fix

Replaced checkState() assertions with LOG.warn() + continue in both classes. When a topic has no available partitions:

  • A warning is logged so operators can investigate the Kafka configuration
  • The topic is skipped gracefully, allowing the pipeline to continue processing other valid topics
  • No exception is thrown, preventing the infinite retry loop

What I changed

File Change
KafkaIO.java Replaced checkState / if-else branching with null/empty check, LOG.warn, and continue in GenerateKafkaSourceDescriptor.processElement()
WatchForKafkaTopicPartitions.java Replaced checkState with LOG.warn and continue in getAllTopicPartitions(). Added SLF4J Logger.
WatchForKafkaTopicPartitionsTest.java Added 3 unit tests: null partition info, empty partition info, and partial topic availability (one missing, one valid)

Testing

  • All existing WatchForKafkaTopicPartitionsTest tests pass
  • 3 new tests verify the graceful handling:
    • testGetAllTopicPartitionsWithNullPartitionInfo — returns empty list, no exception
    • testGetAllTopicPartitionsWithEmptyPartitionInfo — returns empty list, no exception
    • testGetAllTopicPartitionsSkipsMissingTopics — skips missing topic, returns valid topic's partitions
  • spotlessApply passes (code formatting verified)

Fixes #37449


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Replace checkState() with LOG.warn() in GenerateKafkaSourceDescriptor
and WatchForKafkaTopicPartitions to prevent IllegalStateException from
causing infinite retries when Kafka returns null or empty partition info
for a topic. Topics with missing partitions are now gracefully skipped
with a warning log instead of throwing.
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @junaiddshaukat, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical flaw in KafkaIO's error handling for topic partition discovery. Previously, if Kafka returned no partition information for a topic, an IllegalStateException would be thrown, causing DoFns to enter an infinite retry loop. The changes introduce a more robust mechanism that logs warnings and skips problematic topics, ensuring pipeline resilience and continuous operation even when facing transient or misconfigured Kafka topics.

Highlights

  • Bug Fix: Infinite Retries in KafkaIO: Addresses an issue where checkState() in KafkaIO's DoFns would throw IllegalStateException when Kafka returned null or empty partition lists, leading to infinite retries and pipeline stalls.
  • Graceful Error Handling: Replaced checkState() assertions with LOG.warn() and continue statements. This allows the pipeline to log a warning for topics with no available partitions and gracefully skip them, continuing to process other valid topics.
  • New Unit Tests: Added three new unit tests in WatchForKafkaTopicPartitionsTest.java to specifically verify the graceful handling of null partition information, empty partition information, and scenarios with partial topic availability.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
    • Modified GenerateKafkaSourceDescriptor.processElement() to replace checkState with a null/empty check, LOG.warn, and continue for handling missing partition info.
  • sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitions.java
    • Removed the checkState import.
    • Added an SLF4J Logger instance.
    • Updated getAllTopicPartitions() to replace checkState with LOG.warn and continue when partition information is null or empty.
  • sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/WatchForKafkaTopicPartitionsTest.java
    • Added assertTrue and Collections imports.
    • Introduced testGetAllTopicPartitionsWithNullPartitionInfo to verify handling of null partition info.
    • Added testGetAllTopicPartitionsWithEmptyPartitionInfo to test handling of empty partition lists.
    • Created testGetAllTopicPartitionsSkipsMissingTopics to ensure topics with missing partitions are skipped while others are processed.
Activity
  • The pull request addresses and fixes issue [Bug]: Unhandled exception in KafkaIO SDF #37449.
  • The author has confirmed that the appropriate issue is mentioned in the description.
  • All existing WatchForKafkaTopicPartitionsTest tests pass.
  • Three new tests have been added and pass, verifying graceful handling of various partition info scenarios.
  • spotlessApply has passed, indicating code formatting compliance.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2026

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@junaiddshaukat
Copy link
Contributor Author

assign set of reviewers

@github-actions
Copy link
Contributor

github-actions bot commented Feb 9, 2026

Assigning reviewers:

R: @ahmedabu98 for label java.
R: @fozzie15 for label kafka.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: Unhandled exception in KafkaIO SDF

1 participant