
[feature] Add Kafka microbatch ingestion plugin#17688

Open
udaysagar2177 wants to merge 1 commit into apache:master from udaysagar2177:mb

Conversation

@udaysagar2177

This PR introduces a new streaming ingestion pattern for Apache Pinot: MicroBatch ingestion from Kafka. Instead of processing individual records from Kafka messages, this consumer reads Kafka messages whose JSON payloads either reference batch files stored in PinotFS (S3, HDFS, GCS, Azure, etc.) or carry inline base64-encoded data. See #17331 for motivation.

Design Overview

Wire Protocol (Version 1):

+----------+---------------------------+
| version  | JSON payload bytes        |
| (1 byte) |                           |
+----------+---------------------------+
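
For concreteness, a minimal framing/unframing sketch of this format (the helper class and method names are hypothetical; only the one-byte version prefix comes from the protocol above):

import java.nio.charset.StandardCharsets;

final class MicroBatchWire {
  static final byte VERSION_1 = 1;

  // Prefix the UTF-8 JSON bytes with the one-byte protocol version.
  static byte[] frame(String json) {
    byte[] payload = json.getBytes(StandardCharsets.UTF_8);
    byte[] message = new byte[1 + payload.length];
    message[0] = VERSION_1;
    System.arraycopy(payload, 0, message, 1, payload.length);
    return message;
  }

  // Validate the version byte and return the JSON payload.
  static String unframe(byte[] message) {
    if (message.length < 1 || message[0] != VERSION_1) {
      throw new IllegalArgumentException("Unsupported microbatch protocol version");
    }
    return new String(message, 1, message.length - 1, StandardCharsets.UTF_8);
  }
}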

JSON Payload Examples:

// URI type - reference to file in PinotFS
{"type":"uri","format":"avro","uri":"s3://bucket/batch.avro","numRecords":1000}

// DATA type - inline base64-encoded data
{"type":"data","format":"avro","data":"<base64>","numRecords":100}

Key Components:

  • KafkaMicroBatchConsumerFactory - Factory that creates MicroBatch consumers
  • MicroBatchProtocol / MicroBatchPayloadV1 - Protocol parsing and validation
  • MicroBatchQueueManager - Orchestrates parallel file downloads and batch conversion
  • MicroBatchStreamPartitionMsgOffset - Composite offset (Kafka offset + record offset within batch) enabling mid-batch resume after segment commits; a minimal sketch follows this list
  • MessageBatchReader - Converts downloaded files to MessageBatch<GenericRow> using Pinot's RecordReader (see the reader sketch below)
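
As referenced above, a minimal, hypothetical sketch of the composite-offset idea (class and field names are illustrative, not the PR's MicroBatchStreamPartitionMsgOffset; only the {"kmo":X,"mbro":Y} shape comes from the compatibility notes below):

public final class CompositeOffset implements Comparable<CompositeOffset> {
  private final long _kafkaMsgOffset;         // "kmo": offset of the Kafka message carrying the batch
  private final long _microBatchRecordOffset; // "mbro": record index within that batch

  public CompositeOffset(long kafkaMsgOffset, long microBatchRecordOffset) {
    _kafkaMsgOffset = kafkaMsgOffset;
    _microBatchRecordOffset = microBatchRecordOffset;
  }

  @Override
  public int compareTo(CompositeOffset other) {
    // Order by Kafka offset first, then by record position within the batch,
    // so a segment commit mid-batch can resume at the exact record.
    int cmp = Long.compare(_kafkaMsgOffset, other._kafkaMsgOffset);
    return cmp != 0 ? cmp : Long.compare(_microBatchRecordOffset, other._microBatchRecordOffset);
  }

  @Override
  public String toString() {
    return String.format("{\"kmo\":%d,\"mbro\":%d}", _kafkaMsgOffset, _microBatchRecordOffset);
  }
}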

Supported Formats: AVRO, Parquet, JSON
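
To illustrate the MessageBatchReader step for one of these formats, here is a hedged sketch of converting a downloaded Avro batch file into rows (the wrapper class and method are hypothetical; RecordReader, GenericRow, and AvroRecordReader are existing Pinot types):

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
import org.apache.pinot.spi.data.readers.GenericRow;

public final class BatchFileToRows {
  public static List<GenericRow> read(File avroFile) throws Exception {
    List<GenericRow> rows = new ArrayList<>();
    try (AvroRecordReader reader = new AvroRecordReader()) {
      reader.init(avroFile, null, null); // null fieldsToRead/config: read all fields with defaults
      while (reader.hasNext()) {
        rows.add(reader.next(new GenericRow()));
      }
    }
    return rows;
  }
}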

Configuration

{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "microbatch-topic",
    "stream.kafka.broker.list": "localhost:9092",
    "stream.kafka.consumer.factory.class.name":
      "org.apache.pinot.plugin.stream.microbatch.kafka30.KafkaMicroBatchConsumerFactory",
    "stream.microbatch.kafka.file.fetch.threads": "2"
  }
}

See CONFIGURATION.md for complete documentation including PinotFS setup for S3/HDFS/GCS/Azure.

Testing

  • Unit tests: MicroBatchProtocolTest (30+ test cases), MicroBatchStreamPartitionMsgOffsetFactoryTest, MicroBatchStreamMetadataProviderTest, MicroBatchQueueManagerTest
  • Integration test: KafkaPartitionLevelMicroBatchConsumerTest - Tests with real embedded Kafka, including mid-batch resume scenarios
  • Cluster integration test: MicroBatchRealtimeClusterIntegrationTest - End-to-end test with full Pinot cluster

Compatibility Notes

  • This is a new plugin module - no changes to existing APIs or behavior
  • The composite offset format ({"kmo":X,"mbro":Y}) is specific to MicroBatch consumers and not compatible with standard Kafka consumer offsets
  • supportsOffsetLag() returns false since we cannot accurately estimate lag without knowing record counts in unconsumed batches. We can have it return batch counts if needed.

Open Questions for Reviewers

I’ve added TODOs in the code to highlight open questions for reviewers; please take a look.


Checklist

  • Code compiles and follows Pinot style guidelines
  • Unit tests added and passing
  • Integration tests added and passing
  • Documentation added (CONFIGURATION.md)
  • No new external dependencies introduced

@udaysagar2177 changed the title from "[feature] Add Kafka microbatch ingestion plugin #17331" to "[feature] Add Kafka microbatch ingestion plugin" on Feb 12, 2026
@noob-se7en
Contributor

Should we consider enabling a consumer to read directly from the upstream object store? This could eliminate the requirement for users to provision Kafka.

@udaysagar2177
Author

@noob-se7en Thanks for the feedback. The current Kafka-based approach remains useful for users who prefer to avoid the operational complexity of streaming commits into Apache Iceberg.

Regarding reading directly from the upstream object store: clarification on the intended mechanism would help. How would Apache Pinot discover new files and track which ones have already been processed?

One possible direction is using Iceberg tables, which provide snapshots, manifests with file metadata, and incremental scans for new files. The existing microbatch framework would remain largely unchanged: file discovery would come from Iceberg snapshots instead of Kafka, while MicroBatchQueueManager continues to handle download and processing. I posted a short PEP describing this Iceberg-based approach here: #17694. Could you clarify whether your suggestion refers to a different model?
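
For reference, a minimal sketch of that incremental-scan idea using Iceberg's existing Table#newIncrementalAppendScan API (the surrounding class and wiring are hypothetical):

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

final class IcebergFileDiscovery {
  static void discoverNewFiles(Table table, long lastProcessedSnapshotId) throws Exception {
    // An incremental append scan yields only data files added after the given snapshot.
    try (CloseableIterable<FileScanTask> tasks = table.newIncrementalAppendScan()
        .fromSnapshotExclusive(lastProcessedSnapshotId) // resume after the last committed snapshot
        .planFiles()) {
      for (FileScanTask task : tasks) {
        // Each task references a data file that could be handed to
        // MicroBatchQueueManager for download and conversion.
        System.out.println("new file: " + task.file().path());
      }
    }
  }
}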

@Jackie-Jiang added the feature, release-notes (referenced by PRs that need attention when compiling the next release notes), ingestion, and real-time labels on Feb 13, 2026
@codecov-commenter

codecov-commenter commented Feb 14, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.64%. Comparing base (0f93d52) to head (e86f46f).
⚠️ Report is 18 commits behind head on master.

❗ There is a different number of reports uploaded between BASE (0f93d52) and HEAD (e86f46f): HEAD has 24 fewer uploads than BASE.

Flag                  BASE (0f93d52)   HEAD (e86f46f)
java-21               5                2
unittests             4                2
temurin               10               4
java-11               5                2
integration           6                2
unittests2            2                0
integration1          2                0
custom-integration1   2                0
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17688      +/-   ##
============================================
- Coverage     63.25%   55.64%   -7.62%     
+ Complexity     1499      721     -778     
============================================
  Files          3174     2479     -695     
  Lines        190373   140388   -49985     
  Branches      29089    22366    -6723     
============================================
- Hits         120419    78116   -42303     
+ Misses        60610    55685    -4925     
+ Partials       9344     6587    -2757     
Flag                  Coverage Δ
custom-integration1   ?
integration           0.00% <ø> (-100.00%) ⬇️
integration1          ?
integration2          0.00% <ø> (ø)
java-11               55.60% <ø> (-7.61%) ⬇️
java-21               55.59% <ø> (-7.64%) ⬇️
temurin               55.64% <ø> (-7.62%) ⬇️
unittests             55.64% <ø> (-7.61%) ⬇️
unittests1            55.64% <ø> (+0.01%) ⬆️
unittests2            ?

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.