Fix Kafka metadata for sparse partition recovery #18687

Open
xiangfu0 wants to merge 1 commit into apache:master from xiangfu0:fix-kafka-sparse-partition-metadata

@xiangfu0 xiangfu0 commented Jun 5, 2026

Summary

  • Fix Kafka stream metadata computation to enumerate actual Kafka partition ids instead of using the current Pinot status list size as the next partition id.
  • Preserve existing partition offsets when Pinot has current status, and fetch a stream offset for Kafka partitions that exist in Kafka but are missing from Pinot LLC metadata.
  • Add regression coverage for sparse current statuses and normal topic expansion in both Kafka 3.0 and Kafka 4.0 providers.

User Manual

No table config change is required for Kafka realtime tables.

After upgrading the controller and Kafka stream plugin code, wait for the scheduled RealtimeSegmentValidationManager run or trigger realtime validation through the existing operational path. If Kafka still has the missing partitions, Pinot can recreate missing consuming segments for partitions that exist in Kafka but no longer have consuming/online segments or latest LLC ZK metadata.

Recovered partitions start from the offset selected by validation for new partition repair, typically the smallest currently available Kafka offset. Data older than Kafka retention cannot be recovered by this repair.

Sample Table Config

Existing Kafka realtime configs continue to work. No new config key is required.

{
  "tableName": "asset",
  "tableType": "REALTIME",
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "streamType": "kafka",
          "stream.kafka.topic.name": "asset",
          "stream.kafka.broker.list": "broker-1:9092,broker-2:9092",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder"
        }
      ]
    }
  }
}

How RealtimeSegmentValidationManager Gets Fixed

RealtimeSegmentValidationManager already has the repair flow needed to recreate missing LLC consuming segments. During realtime validation, it asks PinotLLCRealtimeSegmentManager to compute stream partition group metadata and then runs the existing "set up new partitions if not exist" path.

The bug was that Kafka's no-partition-subset metadata provider did not return the missing partition ids when Pinot's current LLC status list was sparse. For example, if Kafka has partitions 0..7 but Pinot only has statuses for 0,1,3,4,5,6,7, the old SPI default metadata computation returned [0,1,3,4,5,6,7,7]. Partition 2 never reached the validation manager repair loop, so setupNewPartitionGroup() was never called for it.

This PR fixes the Kafka 3.0 and Kafka 4.0 metadata providers so validation receives metadata keyed by actual Kafka partition ids:

  • Fetch the live Kafka partition ids from the stream.
  • Key current Pinot consumption statuses by getStreamPartitionGroupId() instead of by list position.
  • Reuse the existing status endOffset for partitions Pinot still knows about.
  • Fetch a stream offset for partitions that exist in Kafka but are missing from Pinot metadata.

After this change, the same sparse example produces [0,1,2,3,4,5,6,7]. On the next scheduled or manually triggered RealtimeSegmentValidationManager run, the existing repair loop sees the missing ids and recreates consuming segments for them, assuming those partition ids still exist in Kafka.
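
The four steps above can be sketched in isolation as follows. This is a minimal illustration, not the provider code from this PR: the class name, the `compute` method, and the `fetchStreamOffset` callback are hypothetical stand-ins, and the offsets 13 and 1002 are illustrative values only.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.IntToLongFunction;

final class PartitionIdKeyedMetadataSketch {
  /**
   * Builds partition id -> start offset, ordered by partition id.
   *
   * @param statusEndOffsets end offsets Pinot still tracks, keyed by stream partition id
   * @param livePartitionIds partition ids reported by the stream
   * @param fetchStreamOffset hypothetical stand-in for asking the stream for an offset
   */
  static Map<Integer, Long> compute(Map<Integer, Long> statusEndOffsets,
      Collection<Integer> livePartitionIds, IntToLongFunction fetchStreamOffset) {
    Map<Integer, Long> result = new TreeMap<>();
    for (int id : livePartitionIds) {
      Long known = statusEndOffsets.get(id);
      // Reuse the known end offset; only partitions missing from Pinot hit the stream.
      result.put(id, known != null ? known : fetchStreamOffset.applyAsLong(id));
    }
    return result;
  }

  public static void main(String[] args) {
    // Sparse Pinot statuses: partition 2 is missing.
    Map<Integer, Long> statuses = new HashMap<>();
    for (int id : new int[] {0, 1, 3, 4, 5, 6, 7}) {
      statuses.put(id, 13L);
    }
    Map<Integer, Long> metadata =
        compute(statuses, List.of(0, 1, 2, 3, 4, 5, 6, 7), id -> 1002L);
    System.out.println(metadata.keySet()); // [0, 1, 2, 3, 4, 5, 6, 7]
    System.out.println(metadata.get(2));   // 1002 (fetched from the stand-in stream)
    System.out.println(metadata.get(3));   // 13 (reused from the existing status)
  }
}
```

Because the result is keyed by the ids the stream reports, a gap in Pinot's status list cannot shift or duplicate any id.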

Operationally, this means the validation manager is fixed by giving it correct Kafka partition metadata. No new validation-manager config or manual ZK metadata surgery is required. Recreated partitions can only start from offsets still available under Kafka retention, so historical data older than retention remains unrecoverable.

Why This Fixes The Issue

Before this change, the Kafka no-partition-subset path delegated to the SPI default implementation. That implementation first copied current Pinot statuses and then added "new" partition ids using:

for (int i = partitionGroupConsumptionStatuses.size(); i < partitionCount; i++)

For sparse statuses, for example Kafka partitions 0..7 with Pinot statuses for 0,1,3,4,5,6,7, this produced [0,1,3,4,5,6,7,7]: partition 2 stayed missing and partition 7 was duplicated in metadata.
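
To make the failure concrete, here is a small standalone reproduction of that size-based enumeration. It is a sketch that tracks only partition ids, not the actual SPI default; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SizeBasedEnumerationSketch {
  // Copies the known ids, then appends "new" ids starting at the status-list size,
  // mirroring the loop quoted above.
  static List<Integer> computeIds(List<Integer> currentStatusIds, int partitionCount) {
    List<Integer> ids = new ArrayList<>(currentStatusIds);
    for (int i = currentStatusIds.size(); i < partitionCount; i++) {
      ids.add(i);
    }
    return ids;
  }

  public static void main(String[] args) {
    // Sparse statuses: 7 entries, so the loop starts at 7 and re-adds id 7.
    System.out.println(computeIds(List.of(0, 1, 3, 4, 5, 6, 7), 8)); // [0, 1, 3, 4, 5, 6, 7, 7]
    // Contiguous statuses (normal topic expansion): the size equals the next id.
    System.out.println(computeIds(List.of(0, 1, 2, 3), 8)); // [0, 1, 2, 3, 4, 5, 6, 7]
  }
}
```

The loop is only correct when the status list is contiguous from 0, which is why normal topic expansion worked while a hole did not.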

The Kafka providers now fetch actual Kafka partition ids and key current statuses by stream partition id, so the metadata list becomes [0,1,2,3,4,5,6,7] and realtime validation can call setupNewPartitionGroup() for the missing id. The same path also preserves the normal topic expansion behavior: if Kafka expands from 0..3 to 0..7, statuses for 0..3 reuse their existing end offsets and new partitions 4..7 fetch start offsets from Kafka.

Test Plan

  • ./mvnw spotless:apply checkstyle:check license:format license:check -pl pinot-controller,pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0,pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0
  • ./mvnw checkstyle:check -pl pinot-controller
  • ./mvnw -pl pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0,pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0 -am -Dtest=KafkaStreamMetadataProviderTest -Dsurefire.failIfNoSpecifiedTests=false test
  • ./mvnw -pl pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0 -Dtest=KafkaPartitionLevelConsumerTest#testComputePartitionGroupMetadataUsesKafkaPartitionIds -Dsurefire.failIfNoSpecifiedTests=false test
  • GitHub Actions: all required PR checks passed on the previous commit; checks will rerun on the latest amended test-only commit.

Known local limitation: running the Kafka 4.0 embedded KafkaPartitionLevelConsumerTest in this workspace is blocked by missing Docker/Testcontainers support (/var/run/docker.sock not found). The non-Docker Kafka 4.0 provider regression passes.

xiangfu0 added labels Jun 5, 2026: bug (Something is not working as expected), kafka (Related to Kafka stream connector), ingestion (Related to data ingestion pipeline)
Copilot AI left a comment

Pull request overview

This PR fixes Kafka stream partition metadata computation for realtime validation/recovery when Pinot’s current LLC consumption status list is sparse (missing partition ids). The Kafka 3.0 and 4.0 stream metadata providers now enumerate actual Kafka partition ids, preserving existing offsets where present and fetching offsets from Kafka for partitions missing from Pinot metadata.

Changes:

  • Update Kafka 3.0/4.0 computePartitionGroupMetadata() to build partition metadata from actual Kafka partition ids (not currentStatuses.size()).
  • Preserve existing end offsets for known partitions while fetching stream offsets for missing partitions.
  • Add regression tests for sparse current statuses (Kafka 3.0 and Kafka 4.0), plus partition-level integration coverage.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.

Summary per file:

  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java: Fix metadata computation to enumerate real Kafka partition ids and recover missing partitions correctly.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java: Same fix as Kafka 4.0 provider for sparse-status recovery.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProviderTest.java: New unit test covering sparse current-status recovery and offset selection.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProviderTest.java: New unit test mirroring Kafka 4.0 sparse-status recovery coverage.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/test/java/org/apache/pinot/plugin/stream/kafka40/KafkaPartitionLevelConsumerTest.java: Add embedded/provider regression ensuring partition ids come from Kafka, not list size.
  • pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java: Same regression coverage for Kafka 3.0 provider.
  • pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: Update comment clarifying the forceGetOffsetFromStream behavior for offset fetching.

@xiangfu0 xiangfu0 force-pushed the fix-kafka-sparse-partition-metadata branch from f92c924 to a7ea04e Compare June 5, 2026 04:06

xiangfu0 commented Jun 5, 2026

Review notes

Traced the change end-to-end (Kafka providers → PinotLLCRealtimeSegmentManager consumers) and cross-checked with an independent reviewer. No CRITICAL/MAJOR issues — the fix is correct, scoped to the real root cause, preserves steady-state behavior, and has a genuine regression test. Three MINOR follow-ups noted below.

Why the fix is correct

The empty-subset branch used to delegate to the SPI default, which enumerates new partitions with for (i = statuses.size(); i < partitionCount; i++) — it treats the status-list count as the next partition id. With a missing low id (a partition that lost all its segments), that yields e.g. [0,1,3,4,5,6,7,7]: partition 2 omitted, 7 duplicated.

The new code keys current statuses by getStreamPartitionGroupId(), enumerates the actual Kafka ids from fetchPartitionIds() (sorted), and for each id reuses the status endOffset if present, otherwise calls fetchStreamPartitionOffset(offsetCriteria), producing [0..7]. The recovered id then flows into setupNewPartitionGroup (PinotLLCRealtimeSegmentManager.java, "Set up new partitions if not exist" loop), so the consuming segment is actually recreated.

One precision note on impact: the manifestation is silent non-recovery, not an NPE. The offset-repair path is guarded by containsKey before selectStartOffset, and the smallest-offset map is built with forceGetOffsetFromStream=true (empty statuses → contiguous [0..N-1], never sparse-broken). So the bug omits the hole id from the new-partition setup loop, which then never runs for it.

Parity / safety verified

  • Steady state unchanged: when every partition has a status, the new path emits the same (id, endOffset) pairs, just sorted by id rather than status-list order. All controller consumers key by partition id, so ordering is irrelevant; no provider connections are opened when there are no holes.
  • Multi-topic: keys on getStreamPartitionGroupId() (raw stream id), matching fetchPartitionIds() space; PartitionGroupMetadataFetcher re-encodes raw→Pinot ids afterward — same contract the old SPI default relied on.
  • forceGetOffsetFromStream: Kafka doesn't override the 5-arg overload, so the repair path still passes an empty list → all partitions fetched from stream. Controller change is comment-only.
  • Resources / imports / concurrency: per-partition providers are try-with-resources closed; required java.util + StreamConsumerFactory* imports present in both 3.0 and 4.0; _partitionIdSubset is read-only; no new shared mutable state.
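
The ordering point in the first bullet can be illustrated with a short sketch. It assumes, as stated above, that consumers look entries up by partition id; the class name, the `keyById` helper, and the `{partitionId, endOffset}` pair encoding are hypothetical simplifications of the real metadata objects.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class OrderInsensitiveLookupSketch {
  // Each entry is {partitionId, endOffset}, a stand-in for one partition metadata entry.
  static Map<Integer, Long> keyById(List<long[]> entries) {
    Map<Integer, Long> byId = new HashMap<>();
    for (long[] entry : entries) {
      byId.put((int) entry[0], entry[1]);
    }
    return byId;
  }

  public static void main(String[] args) {
    // Same (id, endOffset) pairs, once in status-list order and once sorted by id.
    List<long[]> statusOrder = List.of(
        new long[] {3, 40}, new long[] {0, 10}, new long[] {1, 20}, new long[] {2, 30});
    List<long[]> sortedById = List.of(
        new long[] {0, 10}, new long[] {1, 20}, new long[] {2, 30}, new long[] {3, 40});
    // A consumer that keys by partition id sees identical data either way.
    System.out.println(keyById(statusOrder).equals(keyById(sortedById))); // true
  }
}
```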

Minor follow-ups (non-blocking)

  1. Test hygiene: testComputePartitionGroupMetadataUsesKafkaPartitionIds (both modules) constructs new KafkaStreamMetadataProvider(...) without try-with-resources, leaking the consumer. The new KafkaStreamMetadataProviderTest does this correctly; worth matching.
  2. Per-partition fetch fan-out — in the force-from-stream / empty-status path, each hole/new partition opens+closes a metadata provider sequentially. Identical to the prior SPI-default cost (not a regression), but since the provider already holds a connected _consumer, a single beginningOffsets(allTopicPartitions) could collapse it to one RPC. Reasonable later optimization.
  3. Vanished-partition parity — the new path emits only ids returned by fetchPartitionIds(), so a partition present in the status list but absent from the live stream is dropped (the old code re-emitted it). Only reachable if a topic is deleted+recreated smaller; treating a non-existent partition as EOL is arguably more correct. Awareness only.
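
The vanished-partition case in item 3 can be sketched with ids only. This is an illustration under the assumption that a topic was recreated with fewer partitions; the class and method names are hypothetical and do not appear in the PR.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

final class VanishedPartitionSketch {
  // New behavior as described: only ids reported by the live stream are emitted, sorted.
  static List<Integer> emittedIds(Collection<Integer> liveIds) {
    return new ArrayList<>(new TreeSet<>(liveIds));
  }

  // Ids that have a Pinot status but no longer exist in the stream.
  static List<Integer> droppedIds(Collection<Integer> statusIds, Collection<Integer> liveIds) {
    TreeSet<Integer> dropped = new TreeSet<>(statusIds);
    dropped.removeAll(liveIds);
    return new ArrayList<>(dropped);
  }

  public static void main(String[] args) {
    List<Integer> statusIds = List.of(0, 1, 2, 3, 4, 5, 6, 7);
    List<Integer> liveIds = List.of(0, 1, 2, 3); // topic recreated with fewer partitions
    System.out.println(emittedIds(liveIds));            // [0, 1, 2, 3]
    System.out.println(droppedIds(statusIds, liveIds)); // [4, 5, 6, 7]
  }
}
```

Logging the dropped ids, if that were ever desired, would make this rare case visible without changing what is emitted.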

Tests

KafkaStreamMetadataProviderTest is a true regression test — it asserts the old size-based algorithm yields [0,1,3,4,5,6,7,7], then asserts the fix yields [0..7] with the distinguishing offsets (fresh smallest 1002 for the recovered hole vs reused endOffset 13 for an existing partition). Added to both 3.0 and 4.0.

xiangfu0 commented Jun 5, 2026

Follow-up status:

  1. Test hygiene: addressed in a7ea04e720 by wrapping the new KafkaStreamMetadataProvider instances in KafkaPartitionLevelConsumerTest with try-with-resources for both Kafka 3.0 and 4.0.
  2. Per-partition fetch fan-out: intentionally left as a later optimization because this PR preserves the previous SPI-default cost model and keeps the bug fix scoped.
  3. Vanished-partition parity: intentionally left unchanged; Kafka partitions normally do not shrink, and treating a partition absent from live Kafka metadata as unavailable/end-of-life is acceptable for this scoped recovery fix.

All review threads are resolved/outdated after the latest push.

codecov-commenter commented Jun 5, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 64.50%. Comparing base (fe761d2) to head (56d16e5).
⚠️ Report is 4 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18687      +/-   ##
============================================
+ Coverage     64.48%   64.50%   +0.01%     
  Complexity     1291     1291              
============================================
  Files          3371     3372       +1     
  Lines        208552   208604      +52     
  Branches      32570    32577       +7     
============================================
+ Hits         134483   134550      +67     
+ Misses        63273    63254      -19     
- Partials      10796    10800       +4     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.50% <100.00%> (+0.01%) ⬆️
temurin 64.50% <100.00%> (+0.01%) ⬆️
unittests 64.49% <100.00%> (+0.01%) ⬆️
unittests1 56.91% <ø> (+<0.01%) ⬆️
unittests2 37.13% <100.00%> (+<0.01%) ⬆️

@noob-se7en noob-se7en left a comment

Pinot only has statuses for 0,1,3,4,5,6,7, the old SPI default metadata computation returned [0,1,3,4,5,6,7,7]. Partition 2 never reached the validation manager repair loop

Is this based on a real issue? The logic in RVM applies to Kafka when new partitions are added upstream, e.g. the partition count going from 8 -> 16. In Kafka, an intermittently missing partition is unlikely.

xiangfu0 commented Jun 5, 2026

Yes, this is based on the real issue we saw, but the missing partition is not missing from Kafka.

Kafka still has the partitions. The sparse state is in Pinot LLC metadata: for some existing Kafka partition ids, Pinot no longer has consuming/online segments or latest LLC metadata in ZK. In that state, RVM should be able to recreate consuming segments because the existing validation flow sets up partition groups that exist in stream metadata but are missing in Pinot metadata.

The bug is that Kafka's no-subset metadata path used the SPI default, which derives new ids from currentStatuses.size(). That works for the normal Kafka partition expansion case, e.g. 8 -> 16, but fails when Pinot's current status list is sparse for existing Kafka ids. Example: Kafka has 0..7, Pinot metadata has 0,1,3,4,5,6,7. The old computation returns 0,1,3,4,5,6,7,7, so partition 2 never reaches RVM's setupNewPartitionGroup path.

This PR does not assume Kafka partitions disappear intermittently. It asks Kafka for the actual live partition ids and gives RVM correct metadata for partitions that still exist in Kafka but are missing from Pinot metadata.

@xiangfu0 xiangfu0 force-pushed the fix-kafka-sparse-partition-metadata branch from a7ea04e to 56d16e5 Compare June 5, 2026 21:52

xiangfu0 commented Jun 5, 2026

Added a focused Kafka provider regression for the normal topic expansion case in 56d16e540c.

The new test covers Kafka reporting partitions 0..7 while Pinot has contiguous current statuses for 0..3. It verifies the provider returns 0..7, reuses existing end offsets for 0..3, and fetches stream offsets for new partitions 4..7. Added for both Kafka 3.0 and Kafka 4.0.
