feat: Add StreamMetadataProvider caching to prevent port exhaustion #16623
Closed
xiangfu0 wants to merge 1 commit into apache:master
Implements comprehensive caching solution for StreamMetadataProvider instances to address port exhaustion issues caused by random client IDs from PR apache#15393.

Key improvements:
* Add StreamMetadataProviderCacheManager with intelligent caching
* Cache providers by table+topic+partition to enable reuse
* Add explicit cleanup on recreation to prevent orphan providers
* Implement synchronized recreation methods for thread safety
* Add shutdown hooks for graceful cleanup on app termination
* Update main client classes to use cached providers:
  - PartitionGroupMetadataFetcher
  - PinotLLCRealtimeSegmentManager
  - RealtimeConsumptionRateManager
  - StreamMetadataProvider.computePartitionGroupMetadata

Benefits:
* Reduces Kafka connection count and prevents port exhaustion
* Maintains unique client ID benefits for failure isolation
* Provides automatic recreation on failures for robustness
* Ensures proper resource cleanup with no orphaned providers
* Backwards compatible with existing code

Includes comprehensive unit tests for cache functionality and proper provider lifecycle management.
Problem
PR #15393 introduced unique client IDs to resolve JMX conflicts, but this created too many open Kafka connections/ports for Pinot servers, leading to resource exhaustion on high-traffic clusters.
Solution
Implements comprehensive caching for StreamMetadataProvider instances that:
Key Features
🔧 StreamMetadataProviderCacheManager
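As a rough illustration of the idea behind a cache manager of this kind, the sketch below reuses one provider per table+topic+partition and closes everything on shutdown. It is not the code from this PR: `SketchProvider`, `ProviderCacheSketch`, and all method names are hypothetical stand-ins, and the real `StreamMetadataProvider` interface is richer than the single `close()` method assumed here.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a metadata provider that holds a connection.
interface SketchProvider extends Closeable {
  @Override
  void close(); // narrowed: no checked exception in this sketch
}

// Hypothetical cache: at most one provider per table + topic + partition.
final class ProviderCacheSketch {
  private final Map<String, SketchProvider> cache = new ConcurrentHashMap<>();

  private static String key(String table, String topic, int partition) {
    return table + "|" + topic + "|" + partition;
  }

  // Returns the cached provider for the key, invoking the factory only on a miss.
  SketchProvider getOrCreate(String table, String topic, int partition,
      Supplier<SketchProvider> factory) {
    return cache.computeIfAbsent(key(table, topic, partition), k -> factory.get());
  }

  int size() {
    return cache.size();
  }

  // Closes every cached provider and empties the cache.
  void closeAll() {
    for (String k : cache.keySet()) {
      SketchProvider provider = cache.remove(k);
      if (provider != null) {
        provider.close();
      }
    }
  }

  // Registers a JVM shutdown hook so cached providers are closed on exit.
  void registerShutdownHook() {
    Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "provider-cache-shutdown"));
  }
}
```

Because lookups for the same key return the same instance, repeated metadata fetches for a partition would share one underlying connection instead of opening a new one each time.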
🔄 Enhanced StreamConsumerFactory
* createCachedStreamMetadataProvider() - Get cached or create new stream provider
* createCachedPartitionMetadataProvider() - Get cached or create new partition provider
* recreateCachedStreamMetadataProvider() - Force recreation with explicit cleanup
* recreateCachedPartitionMetadataProvider() - Force recreation with explicit cleanup

🛡️ Orphan Prevention
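One way to read "force recreation with explicit cleanup" is that the old provider is closed and removed before its replacement is stored, all under a single lock, so no unreferenced provider is left holding a connection. The following is a minimal sketch under that assumption; `RecreatingCacheSketch` and its methods are hypothetical and use `AutoCloseable` in place of the real provider type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical cache whose recreate() never leaves an orphaned provider behind.
final class RecreatingCacheSketch {
  private final Map<String, AutoCloseable> cache = new HashMap<>();

  // Returns the cached provider, creating it on a miss.
  synchronized AutoCloseable getOrCreate(String key, Supplier<AutoCloseable> factory) {
    return cache.computeIfAbsent(key, k -> factory.get());
  }

  // Closes and drops the old provider (if any), then stores a fresh one.
  // Both steps run under the same lock, so concurrent callers cannot
  // observe or leak a half-replaced entry.
  synchronized AutoCloseable recreate(String key, Supplier<AutoCloseable> factory) {
    AutoCloseable old = cache.remove(key);
    if (old != null) {
      try {
        old.close();
      } catch (Exception e) {
        // Best effort in this sketch: a failed close must not block recreation.
      }
    }
    AutoCloseable fresh = factory.get();
    cache.put(key, fresh);
    return fresh;
  }
}
```

Coarse `synchronized` methods keep the sketch simple; a real implementation might prefer per-key locking to avoid serializing unrelated partitions.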
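📊 Updated Client Classes
* PartitionGroupMetadataFetcher
* PinotLLCRealtimeSegmentManager
* RealtimeConsumptionRateManager
* StreamMetadataProvider.computePartitionGroupMetadata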
Testing
Includes comprehensive unit tests:
Benefits
✅ Reduced Port Usage: Reuses existing connections instead of creating new ones
✅ Better Performance: Cached connections avoid establishment overhead
✅ Resource Efficiency: Prevents resource exhaustion on high-traffic clusters
✅ Maintained Reliability: Still recreates connections on failures
✅ Backward Compatibility: Original methods continue to work
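The "recreates connections on failures" benefit could, for example, be realized by a caller that tries the cached provider first and falls back to a recreated one if the call throws. The helper below is a hypothetical sketch of that pattern, not code from this PR; `callWithRecreate` and its parameters are illustrative names, and it assumes failures surface as unchecked exceptions.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: use the cached provider, recreate once on failure.
final class RetryWithRecreateSketch {
  private RetryWithRecreateSketch() {
  }

  static <P, R> R callWithRecreate(Supplier<P> getCached, Supplier<P> recreate,
      Function<P, R> call) {
    try {
      // Fast path: reuse the cached provider and its existing connection.
      return call.apply(getCached.get());
    } catch (RuntimeException e) {
      // Slow path: the cached provider may be stale, so retry once on a fresh one.
      // A second failure propagates to the caller.
      return call.apply(recreate.get());
    }
  }
}
```

A single retry keeps the failure path bounded; whether one attempt is enough would depend on the actual failure modes seen in production.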
Impact
Resolves port exhaustion while maintaining all benefits of unique client IDs from PR #15393. Production-ready with comprehensive error handling and monitoring.
Related: Addresses issues introduced in #15393