Queue Semantics support in Kafka Ingestion
Ref https://github.com/apache/druid/issues/18439
The Core Tension

Kafka is a distributed log. Queues (Pub/Sub, RabbitMQ, SQS) are work distribution systems. When users try to use Kafka as a work distribution system, they hit these friction points:
- Scaling workers beyond partition count is impossible
Topic: 10 partitions
Need: 100 workers for burst processing
Kafka consumer group: 90 workers sit idle
Pub/Sub/RabbitMQ/SQS: all 100 workers active
- Rebalancing pauses are unacceptable for elastic workloads
Auto-scaling event: 50 -> 100 workers
Kafka: Triggers a rebalance, all 100 consumers pause for 30-60s
Pub/Sub: Zero pause, new workers start consuming immediately
- Per-message error handling is essential for job queues, and head-of-line blocking kills throughput
Message #50 takes 30s to process (slow DB query)
Kafka: Messages #51-#100 wait behind #50 in same partition
SQS: Messages #51-#100 processed by other workers immediately
Share Groups make Kafka competitive with queue systems for these use cases while retaining Kafka's strengths (log retention, replay, high throughput, multi-consumer patterns).
Use share.acknowledgement.mode=explicit so Druid controls exactly when records
are acknowledged. Records are only ACCEPT-ed after the segment containing them is
atomically registered in the metadata store. This guarantees at-least-once with
no data loss.
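A minimal sketch of the consumer configuration this implies, using only `java.util.Properties`. The only setting taken from the text above is `share.acknowledgement.mode=explicit`; the class name `ShareConsumerConfigSketch`, the broker address, and the group id are placeholders chosen for illustration and are not part of the proposal.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not a Druid or Kafka class.
class ShareConsumerConfigSketch {
    static Properties explicitAckProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers); // placeholder value supplied by caller
        props.setProperty("group.id", groupId);                   // placeholder share group name
        // From the design: Druid decides when each record is acknowledged.
        props.setProperty("share.acknowledgement.mode", "explicit");
        return props;
    }

    public static void main(String[] args) {
        Properties props = explicitAckProps("localhost:9092", "druid-share-group");
        System.out.println(props.getProperty("share.acknowledgement.mode"));
    }
}
```

In a real task these properties would presumably be passed to the share consumer's constructor along with deserializer settings, which are omitted here.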
KafkaShareConsumer requires single-threaded access. We split the work:
- ConsumerThread: owns KafkaShareConsumer exclusively (poll, ack, commit, RENEW)
- TaskRunnerThread: processes records, builds segments, publishes
The threads communicate via two bounded queues, which gives more control over the record lifecycle:
- RecordBuffer: ConsumerThread -> TaskRunner (records to process)
- AckCommandQueue: TaskRunner -> ConsumerThread (ACCEPT/REJECT/RELEASE/COMMIT)
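The handoff between the two threads can be sketched with two `ArrayBlockingQueue`s. This is a simplified model under stated assumptions: no Kafka client is involved, records are represented by their offsets, the runner ACCEPTs every record without publishing anything, and the names `TwoQueueHandoffSketch`, `run`, and `AckCommand` (as written here) are illustrative rather than the proposed classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the two-thread handoff; no Kafka client involved.
class TwoQueueHandoffSketch {
    enum AckType { ACCEPT, REJECT, RELEASE, COMMIT }

    static final class AckCommand {
        final AckType type;
        final long offset;
        AckCommand(AckType type, long offset) { this.type = type; this.offset = offset; }
    }

    // Pushes `recordCount` fake records through both queues and returns the
    // offsets the consumer side saw ACCEPT commands for, in arrival order.
    static List<Long> run(int recordCount, int capacity) {
        BlockingQueue<Long> recordBuffer = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<AckCommand> ackCommandQueue = new ArrayBlockingQueue<>(capacity);
        List<Long> accepted = new ArrayList<>();

        // Runner side: take a record, "process" it, then request an ACCEPT.
        Thread runner = new Thread(() -> {
            try {
                for (int i = 0; i < recordCount; i++) {
                    long offset = recordBuffer.take();
                    ackCommandQueue.put(new AckCommand(AckType.ACCEPT, offset));
                }
                ackCommandQueue.put(new AckCommand(AckType.COMMIT, -1L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "TaskRunnerThread");
        runner.start();

        // Consumer side (the calling thread): never blocks indefinitely on
        // either queue, so a full RecordBuffer cannot deadlock against a
        // full AckCommandQueue.
        try {
            long nextOffset = 0;
            boolean committed = false;
            while (!committed) {
                if (nextOffset < recordCount && recordBuffer.offer(nextOffset)) {
                    nextOffset++;
                }
                AckCommand cmd = ackCommandQueue.poll(1, TimeUnit.MILLISECONDS);
                if (cmd == null) {
                    continue;
                }
                if (cmd.type == AckType.COMMIT) {
                    committed = true;
                } else if (cmd.type == AckType.ACCEPT) {
                    accepted.add(cmd.offset);
                }
            }
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2));
    }
}
```

The consumer side uses `offer` and a timed `poll` rather than `put` and `take`. With both queues bounded, a consumer blocked on a full record buffer while the runner is blocked on a full command queue would otherwise stall both threads.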
A scheduled executor posts RENEW_ALL commands every lockTimeout * 0.4 seconds.
ConsumerThread renews all in-flight records, preventing lock expiry during slow
segment building. RENEW is O(1) on the broker (timer cancel + insert, no persistence).
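The renewal schedule could be sketched as follows. The 0.4 factor comes from the text above; everything else is an assumption made for illustration: the lock timeout is expressed in milliseconds, the command is modelled as a plain string, and the names `RenewSchedulerSketch`, `renewIntervalMs`, and `start` are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the RENEW_ALL timer; not the proposed implementation.
class RenewSchedulerSketch {
    // 40% of the lock timeout, in integer arithmetic to avoid floating-point rounding.
    static long renewIntervalMs(long lockTimeoutMs) {
        return lockTimeoutMs * 4 / 10;
    }

    // Posts "RENEW_ALL" to the command queue at a fixed rate.
    // Assumes lockTimeoutMs is large enough for the interval to be positive.
    static ScheduledExecutorService start(long lockTimeoutMs, BlockingQueue<String> ackCommandQueue) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        long interval = renewIntervalMs(lockTimeoutMs);
        // offer() instead of put(): if the bounded queue is full, this tick is
        // skipped rather than blocking the scheduler thread.
        exec.scheduleAtFixedRate(
            () -> ackCommandQueue.offer("RENEW_ALL"),
            interval, interval, TimeUnit.MILLISECONDS);
        return exec;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
        ScheduledExecutorService exec = start(100, queue);
        System.out.println(queue.poll(1, TimeUnit.SECONDS));
        exec.shutdownNow();
    }
}
```

Renewing at 40% of the timeout leaves room for one missed tick before the lock expires, which is presumably why a factor below 0.5 was chosen.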
Druid has no record-level deduplication today. With Share Groups, the broker redelivers records on lock timeout or consumer crash. DedupCache stores published offset ranges per partition in the metadata store; it is loaded on task startup and updated atomically with segment publish. Any redelivered record found in the cache is immediately ACCEPT-ed and skipped.
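The lookup side of such a cache might look like the sketch below. It is in-memory only and makes several assumptions not stated in the proposal: partitions are plain integers, ranges are inclusive and non-overlapping, and each partition's ranges are held in a `TreeMap` keyed by range start. Loading from and persisting to the metadata store are left out, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a published-offset-range cache.
class DedupCacheSketch {
    // partition -> (range start -> range end), both ends inclusive.
    private final Map<Integer, TreeMap<Long, Long>> published = new HashMap<>();

    // Would be called together with a successful segment publish.
    void addPublishedRange(int partition, long start, long end) {
        published.computeIfAbsent(partition, p -> new TreeMap<>()).put(start, end);
    }

    // True if the offset falls inside a range already published for the partition.
    boolean isPublished(int partition, long offset) {
        TreeMap<Long, Long> ranges = published.get(partition);
        if (ranges == null) {
            return false;
        }
        // The only candidate is the range with the greatest start <= offset.
        Map.Entry<Long, Long> candidate = ranges.floorEntry(offset);
        return candidate != null && offset <= candidate.getValue();
    }

    public static void main(String[] args) {
        DedupCacheSketch cache = new DedupCacheSketch();
        cache.addPublishedRange(0, 100, 199);
        System.out.println(cache.isPublished(0, 150));
        System.out.println(cache.isPublished(0, 200));
    }
}
```

A redelivered record for which `isPublished` returns true would be ACCEPT-ed and skipped; anything else proceeds through the normal ingestion path.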
INVARIANT 1: ACCEPT only after atomic segment registration in metadata store.
INVARIANT 2: Every polled record must reach exactly one of: ACCEPT, RELEASE, REJECT, or task crash.
INVARIANT 3: RENEW_ALL only touches records still in InFlightTracker (decided records removed first).
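One way to make invariants 2 and 3 mechanical is to remove a record from the in-flight set at the moment it is decided, so a second decision fails and a later renew pass cannot see it. The sketch below assumes a single partition and tracks offsets only; the names `InFlightTrackerSketch`, `decide`, and `offsetsToRenew` are illustrative and do not mirror the proposed InFlightTracker API. Invariant 1 stays with the caller, which should pass ACCEPT only after segment registration has succeeded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical single-partition tracker illustrating invariants 2 and 3.
class InFlightTrackerSketch {
    enum Decision { ACCEPT, RELEASE, REJECT }

    private final TreeSet<Long> inFlight = new TreeSet<>();
    private final Map<Long, Decision> decided = new HashMap<>();

    // Called for every polled record.
    void acquired(long offset) {
        inFlight.add(offset);
    }

    // Invariant 2: each record receives exactly one terminal decision;
    // deciding an offset that is not in flight is rejected.
    void decide(long offset, Decision decision) {
        if (!inFlight.remove(offset)) {
            throw new IllegalStateException("offset " + offset + " is not in flight");
        }
        decided.put(offset, decision);
    }

    // Invariant 3: decided offsets were removed in decide(), so a renew pass
    // only ever sees records that are still undecided.
    List<Long> offsetsToRenew() {
        return new ArrayList<>(inFlight);
    }

    // Returns the recorded decision, or null if the offset is undecided or unknown.
    Decision decisionFor(long offset) {
        return decided.get(offset);
    }

    public static void main(String[] args) {
        InFlightTrackerSketch tracker = new InFlightTrackerSketch();
        tracker.acquired(1L);
        tracker.acquired(2L);
        tracker.acquired(3L);
        tracker.decide(2L, Decision.ACCEPT);
        System.out.println(tracker.offsetsToRenew());
    }
}
```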
The aim is to improve resource utilisation while maintaining a high-throughput, low-latency end-to-end system.

| Interface | Module | Purpose |
|---|---|---|
| `AcknowledgingRecordSupplier<P,S,R>` | indexing-service | Parallel to RecordSupplier; subscribe/poll/acknowledge/commit instead of assign/seek/poll |
| `AcknowledgeType` (enum) | indexing-service | ACCEPT, RELEASE, REJECT, RENEW mapped to Kafka byte values |

| Class | Module | Purpose |
|---|---|---|
| `KafkaShareGroupRecordSupplier` | kafka-indexing-service | Implements AcknowledgingRecordSupplier; wraps KafkaShareConsumer in explicit mode; dedup at poll time |
| `ShareGroupConsumerThread` | kafka-indexing-service | Dedicated thread owning KafkaShareConsumer; poll loop, RENEW scheduling, ack command draining |
| `ShareGroupIndexTask` | kafka-indexing-service | New task type; does not extend SeekableStreamIndexTask; carries topic + consumer config (no start/end offsets) |
| `ShareGroupIndexTaskRunner` | kafka-indexing-service | Ingestion loop: take from buffer -> dedup -> parse -> add -> persist -> publish -> ACCEPT -> COMMIT |
| `DedupCache` | indexing-service | `Map<Partition, TreeSet>`; backed by metadata store; loaded on startup; updated atomically with publish |
| `ShareGroupDataSourceMetadata` | kafka-indexing-service | Stores published offset ranges; no consumer offsets (broker manages those) |
| `InFlightTracker` | kafka-indexing-service | `Map<Partition, Map<Offset, TrackedRecord>>`; tracks per-record state: ACQUIRED, RENEW_PENDING, ACCEPT_PENDING |
| `AckCommand` | kafka-indexing-service | Command object: type (ACCEPT/REJECT/RELEASE/RENEW_ALL/COMMIT) + offsets + optional CompletableFuture |

`KafkaSupervisor`: add `groupType` field (`"consumer" | "share"`)

Design: Each task ran a consumer thread (poll, then RENEW/ACCEPT each record) and a runner thread (parse and publish), connected by a blocking queue. It used Kafka's explicit acknowledgement mode, which requires per-record acknowledge() calls.
Why rejected: Adding tasks decreased throughput. With explicit ack, every in-flight record requires a RENEW RPC to the broker on each poll cycle. With N tasks sharing one partition, this creates O(records × N) broker RPCs per cycle, and the broker's single-threaded per-partition acknowledgement processing became the bottleneck.
Benchmark (Kafka 4.2.0, 1 partition, 500 RPS producer):
| Tasks | Throughput | Scaling |
|---|---|---|
| 1 | 25 rows/sec | baseline |
| 2 | 12 rows/sec | 0.48x (regression) |