Fix: Handle kafka topic not found exception in fetchPartitionCount function gracefully #17088
base: master
Conversation
Pull Request Overview
This PR addresses a critical availability issue where a failure to fetch partition counts (commonly triggered by deleted Kafka topics) would halt ingestion for an entire multi-topic table. The fix isolates failures by catching exceptions in fetchPartitionCount, allowing ingestion to continue on all valid topics while partitions for the failed topic remain in a consuming state.
Key changes:
- Added exception handling in StreamMetadataProvider.computePartitionGroupMetadata() to catch failures when fetching partition counts (sketched after this list)
- When the partition count cannot be fetched, the method now returns the existing partition metadata instead of propagating the exception
- Added comprehensive test coverage for the new exception handling behavior
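A minimal sketch of that fallback pattern, using simplified stand-in types; the class names, records, and signatures below are illustrative assumptions, not Pinot's actual StreamMetadataProvider API:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Pinot's partition metadata types (illustrative only).
record PartitionGroupMetadata(int partitionGroupId, long startOffset) { }
record PartitionGroupConsumptionStatus(int partitionGroupId, long endOffset) { }

abstract class SimplifiedMetadataProvider {
  // In the real provider this call can fail, e.g. when the Kafka topic has been deleted.
  abstract int fetchPartitionCount(long timeoutMillis);

  List<PartitionGroupMetadata> computePartitionGroupMetadata(
      List<PartitionGroupConsumptionStatus> statuses, long timeoutMillis) {
    int partitionCount;
    try {
      partitionCount = fetchPartitionCount(timeoutMillis);
    } catch (Exception e) {
      // Fallback: keep the partitions we already know about instead of failing the whole table.
      List<PartitionGroupMetadata> existing = new ArrayList<>();
      for (PartitionGroupConsumptionStatus status : statuses) {
        existing.add(new PartitionGroupMetadata(status.partitionGroupId(), status.endOffset()));
      }
      return existing;
    }
    // Normal path: build metadata for every partition reported by the stream
    // (start-offset resolution is omitted in this sketch).
    List<PartitionGroupMetadata> result = new ArrayList<>();
    for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
      result.add(new PartitionGroupMetadata(partitionId, 0L));
    }
    return result;
  }
}
```

The actual change also logs a warning naming the affected topic, as shown in the review snippet further down.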
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java | Added try-catch block around fetchPartitionCount() call with fallback to existing partitions and warning log |
| pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataProviderTest.java | New test file covering exception scenarios including empty statuses and different exception types |
```java
LOGGER.warn("Failed to fetch partition count for stream config: {}. Skipping stream and using"
    + "existing partitions only. Error: {}", streamConfig.getTopicName(), e.getMessage(), e);
```
Copilot AI · Oct 28, 2025
The log message is split across two lines with a string concatenation using +. This should be written as a single continuous string literal for better readability and to avoid the concatenation operator.
Suggested change:

```diff
-LOGGER.warn("Failed to fetch partition count for stream config: {}. Skipping stream and using"
-    + "existing partitions only. Error: {}", streamConfig.getTopicName(), e.getMessage(), e);
+LOGGER.warn("Failed to fetch partition count for stream config: {}. Skipping stream and using existing partitions only. Error: {}", streamConfig.getTopicName(), e.getMessage(), e);
```
```java
for (PartitionGroupConsumptionStatus currentPartitionGroupConsumptionStatus : partitionGroupConsumptionStatuses) {
  existingPartitionGroupMetadataList.add(
      new PartitionGroupMetadata(currentPartitionGroupConsumptionStatus.getStreamPartitionGroupId(),
          currentPartitionGroupConsumptionStatus.getEndOffset()));
}
```
How is this partitionGroupConsumptionStatuses updated?
Is it possible this endOffset is lower than the end offset of the previously committed segment?
Added a comment for it. endOffset is only used as the start offset if the last consuming segment committed, so it will be equal to the end offset of the previously committed segment.
```java
int partitionCount;
try {
  partitionCount = fetchPartitionCount(timeoutMillis);
```
IIUC, this part is processing a single stream. If we want to handle the case of multiple topics with one topic deleted, we should handle it on the caller side instead.
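A rough sketch of that caller-side alternative; MultiTopicMetadataFetcher, TopicMetadataProvider, and the surrounding loop are hypothetical illustrations, not Pinot's actual caller code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical caller-side handling; names and types are illustrative, not Pinot's real classes.
class MultiTopicMetadataFetcher {

  record PartitionGroupMetadata(String topic, int partitionGroupId) { }

  interface TopicMetadataProvider {
    // May throw if the underlying topic no longer exists.
    List<PartitionGroupMetadata> computePartitionGroupMetadata(long timeoutMillis) throws Exception;
  }

  // Iterate over every topic of the multi-topic table; a failure for one topic is
  // logged and skipped so the remaining topics keep ingesting.
  static List<PartitionGroupMetadata> fetchAllTopics(
      List<TopicMetadataProvider> providersPerTopic, long timeoutMillis) {
    List<PartitionGroupMetadata> allPartitionGroups = new ArrayList<>();
    for (TopicMetadataProvider provider : providersPerTopic) {
      try {
        allPartitionGroups.addAll(provider.computePartitionGroupMetadata(timeoutMillis));
      } catch (Exception e) {
        System.err.println("Skipping topic whose metadata could not be fetched: " + e.getMessage());
      }
    }
    return allPartitionGroups;
  }
}
```

The trade-off is where multi-topic awareness lives: this PR instead keeps the fallback inside the default computePartitionGroupMetadata implementation, so each provider degrades independently.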
Codecov Report
❌ Patch coverage is …

Additional details and impacted files

```diff
@@              Coverage Diff              @@
##             master    #17088       +/-   ##
==============================================
- Coverage      63.17%    33.58%    -29.60%
+ Complexity      1421       726       -695
==============================================
  Files           3104      3104
  Lines         183247    183366       +119
  Branches       28088     28098        +10
==============================================
- Hits          115774     61590     -54184
- Misses         58502    116692     +58190
+ Partials        8971      5084      -3887
```
Summary
Resolves Issue 17045
This PR fixes an issue where an exception in fetchPartitionCount, often caused by a deleted topic, would halt ingestion for an entire multi-topic table. By catching this exception, the failure is now isolated. Partitions for the deleted topic will get stuck in a "consuming" state, but ingestion from all other valid topics will continue unaffected.
Testing
Tested on a Pinot table and validated that a single incorrect topic doesn't stop table ingestion completely. The table contains two topics, adaptive-authn-gateway and adaptive-authn-gatewy; adaptive-authn-gatewy is an invalid Kafka topic.
Validated that logs are still being ingested for adaptive-authn-gateway.

A warning message is logged for the topic adaptive-authn-gatewy.
