Fix incorrect stream partition id for multi-stream realtime consumption (#17953)
Conversation
Good catch, thanks for fixing
Codecov Report

✅ All modified and coverable lines are covered by tests.

    @@             Coverage Diff              @@
    ##             master   #17953      +/-   ##
    ============================================
    - Coverage     63.21%   63.20%    -0.02%
      Complexity     1525     1525
    ============================================
      Files          3194     3194
      Lines        193239   193239
      Branches      29706    29706
    ============================================
    - Hits         122161   122133       -28
    - Misses        61494    61518       +24
    - Partials       9584     9588        +4
Pull request overview
This PR fixes multi-stream realtime consumption on the server by ensuring the consuming state uses the upstream `streamPartitionId` (derived from the Pinot `partitionGroupId`) instead of incorrectly defaulting `streamPartitionId = partitionGroupId` when initializing consumption status.
Changes:
- Update `RealtimeSegmentDataManager` to pass `_streamPartitionId` into `PartitionGroupConsumptionStatus` during initialization.
      _partitionGroupConsumptionStatus =
    -     new PartitionGroupConsumptionStatus(_partitionGroupId, llcSegmentName.getSequenceNumber(),
    +     new PartitionGroupConsumptionStatus(_partitionGroupId, _streamPartitionId, llcSegmentName.getSequenceNumber(),
              _streamPartitionMsgOffsetFactory.create(_segmentZKMetadata.getStartOffset()),
This fixes a multi-stream bug, but there's no unit test exercising the multi-stream branch where `partitionGroupId` encodes the stream index (e.g., 10000) and `_streamPartitionId` is derived via `IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId`. Consider adding a test that builds a `RealtimeSegmentDataManager` with a multi-stream `IngestionConfig` and asserts the constructed `PartitionGroupConsumptionStatus` carries the derived stream partition id (not the `partitionGroupId`), so this regression can't recur.
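To make the encoding concrete, here is a minimal illustrative sketch (not Pinot's actual implementation) of how a Pinot partition group id can pack both a stream index and an upstream stream partition id, assuming the fixed padding of 10000 suggested by the partition numbers in this report. The method names are hypothetical analogues of the `IngestionConfigUtils` helpers mentioned above.

```java
// Illustrative sketch only: mirrors the idea that a multi-stream
// partitionGroupId encodes stream index * PADDING + stream partition id,
// so partitionGroupId 10000 means stream 1, upstream partition 0.
public class PartitionIdSketch {
  // Assumed padding between streams (matches the 10000 seen in the bug).
  static final int PARTITION_PADDING = 10000;

  // Hypothetical analogue of getStreamPartitionIdFromPinotPartitionId.
  static int streamPartitionIdFromPinotPartitionId(int pinotPartitionId) {
    return pinotPartitionId % PARTITION_PADDING;
  }

  // Hypothetical analogue of the stream-index lookup.
  static int streamIndexFromPinotPartitionId(int pinotPartitionId) {
    return pinotPartitionId / PARTITION_PADDING;
  }
}
```

Under this scheme, passing the raw `partitionGroupId` (10000) to the Kafka consumer targets a nonexistent partition, while the decoded value (0) targets the real one.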
This PR adds `MultiTopicRealtimeClusterIntegrationTest`, an integration test that validates realtime ingestion from multiple Kafka topics into a single table using the `streamConfigMaps` multi-stream configuration. The motivation is that there is currently no E2E integration test for multi-topic realtime tables (I encountered the same bug that was fixed in #17953). The test is parameterized over N topics (default 3, overridable via `getNumTopics()`). For each topic, it generates CSV records with a mutually exclusive `source` column and non-overlapping value ranges, then verifies correctness through targeted queries:
- Total doc count equals the sum across all topics
- GROUP BY `source` returns exactly N groups with the expected counts
- Filter and aggregation queries isolate each topic's data correctly
- Value ranges don't leak between topics
- Segments exist for all N stream config indices (verified via partition group IDs)
- DISTINCT `source` returns exactly N values

No existing tests covered multi-topic ingestion end-to-end.
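The per-topic data generation described above can be sketched as follows. This is a hypothetical helper in the spirit of the test, not its actual code: each topic gets a unique `source` value and a disjoint numeric range, so any leakage between topics is detectable by a simple filter query.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical generator: rows for topic i carry source "topic_i" and
// values in [i * rowsPerTopic, (i + 1) * rowsPerTopic), so ranges are
// mutually exclusive across topics by construction.
public class MultiTopicDataGen {
  static List<String> csvRowsForTopic(int topicIndex, int rowsPerTopic) {
    List<String> rows = new ArrayList<>();
    int rangeStart = topicIndex * rowsPerTopic; // disjoint range per topic
    for (int i = 0; i < rowsPerTopic; i++) {
      // CSV columns: source,value
      rows.add("topic_" + topicIndex + "," + (rangeStart + i));
    }
    return rows;
  }
}
```

With this layout, `SELECT COUNT(*) WHERE source = 'topic_1' AND value < 5` returning anything nonzero (for 5 rows per topic) would immediately expose cross-topic leakage.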
…on (#17953) Co-authored-by: leguo2 <leguo2@cisco.com>
Summary
This PR fixes a bug in multi-stream realtime consumption where the server can use the Pinot `partitionGroupId` instead of the actual upstream `streamPartitionId` when creating the consuming state for a segment.

The server already derives the correct `_streamPartitionId`, but when constructing `PartitionGroupConsumptionStatus` in `RealtimeSegmentDataManager`, it previously used the constructor that defaults `streamPartitionId = partitionGroupId`. As a result, the Kafka consumer could incorrectly consume partition `10000` instead of partition `0`.

This change updates `RealtimeSegmentDataManager` to explicitly pass the derived `_streamPartitionId` when creating `PartitionGroupConsumptionStatus`.
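The constructor-overload pattern behind the bug can be sketched in miniature. This is an assumed simplification, not Pinot's real `PartitionGroupConsumptionStatus`: the short constructor defaults the stream partition id to the partition group id, which is only valid in the single-stream case.

```java
// Minimal sketch (assumed fields/signatures) of the overload pattern:
// the one-arg constructor silently equates the two ids, which breaks
// once partitionGroupId encodes a stream index (e.g. 10000).
public class ConsumptionStatusSketch {
  final int partitionGroupId;
  final int streamPartitionId;

  // Buggy path: defaults streamPartitionId = partitionGroupId.
  ConsumptionStatusSketch(int partitionGroupId) {
    this(partitionGroupId, partitionGroupId);
  }

  // Fixed path: caller passes the derived stream partition id explicitly.
  ConsumptionStatusSketch(int partitionGroupId, int streamPartitionId) {
    this.partitionGroupId = partitionGroupId;
    this.streamPartitionId = streamPartitionId;
  }
}
```

Calling the one-arg constructor with partition group id 10000 yields stream partition id 10000 (the bug); the two-arg constructor with the derived id yields 0 (the fix).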