feat: [Phase 1] Queue Semantics support in Kafka Ingestion#19311
feat: [Phase 1] Queue Semantics support in Kafka Ingestion#19311Shekharrajak wants to merge 72 commits into
Conversation
| cd apache-druid-31.0.0 | ||
|
|
||
| # Replace the kafka extension with our build | ||
| rm extensions/druid-kafka-indexing-service/*.jar |
There was a problem hiding this comment.
We can build from the source and use the tar
| agg -> agg.hasSumAtLeast(numRecords) | ||
| ); | ||
|
|
||
| Assertions.assertEquals( |
There was a problem hiding this comment.
Making sure we read all the records.
d31199b to
4a30bee
Compare
|
Spending time on Integeration testing embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/EmbeddedShareGroupIngestionTest.java |
|
For smaller blast radius and clean separation - parking the suggestion #18439 (comment) , Also KafkaShareGroupSupervisor (planned) will be structurally different from KafkaSupervisor — no partition-to-task assignment, no offset bookkeeping, seek/rest . |
| } | ||
|
|
||
| @Test | ||
| public void test_shareGroupIngestion_basicEndToEnd() throws InterruptedException |
There was a problem hiding this comment.
Subscribe to topic with KafkaShareConsumer (groupId), poll → parse (CSV) → persist → publish → acknowledge(ACCEPT) → commitSync → broker offset persistence. SQL COUNT(*) == 10.
| final int numRecords = 5; | ||
| final int rowsPerRecord = 4; | ||
| kafkaServer.produceRecordsToTopic( | ||
| generateMultiObjectJsonRecords(topic, numRecords, rowsPerRecord, DateTimes.of("2025-07-01")) |
There was a problem hiding this comment.
StreamChunkReader multi-row path One Kafka record carrying 4 whitespace-separated JSON objects produces 4 rows. SQL count = numRecords × rowsPerRecord (5 × 4 = 20). Verifies no row is dropped before ACK.
|
To make sure no data loss we are ack only after the publish completed https://github.com/apache/druid/pull/19311/changes#diff-487176908fb14d87486db6f683c9ddb34465f3a1572f5c01b11d7b9c6a482289R381 Although this will be slow and no concurrency but I think for the intial phase this is fine and we can improve the performance step by step. WDYT @gianm @a2l007 @clintropolis @FrankChen021 ? |
|
CI checks are failing due to one flaky test unrelated to this PR : #19435 |
| recordMap.put(new TopicPartition("test-topic", 0), recordsP0); | ||
| recordMap.put(new TopicPartition("test-topic", 1), recordsP1); | ||
| Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))) | ||
| .thenReturn(new ConsumerRecords<>(recordMap)); |
| final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordMap = new HashMap<>(); | ||
| recordMap.put(new TopicPartition(record.topic(), record.partition()), List.of(record)); | ||
| Mockito.when(mockConsumer.poll(Mockito.any(Duration.class))) | ||
| .thenReturn(new ConsumerRecords<>(recordMap)); |
| */ | ||
| Map<PartitionIdType, Optional<Exception>> commitSync(); | ||
|
|
||
| Set<PartitionIdType> getPartitionIds(String stream); |
234a8ec to
e5894ef
Compare
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 2 |
| P3 | 0 |
| Total | 2 |
Reviewed 22 of 22 changed files. Findings focus on task scheduling semantics and Kafka compatibility test coverage.
This is an automated review by Codex GPT-5
| <artifactId>maven-surefire-plugin</artifactId> | ||
| <configuration> | ||
| <excludes> | ||
| <exclude>**/KafkaIndexTaskTest.java</exclude> |
There was a problem hiding this comment.
[P2] Kafka compatibility tests are skipped globally
This Surefire exclusion removes the main KafkaIndexTask, KafkaRecordSupplier, KafkaSamplerSpec, and KafkaSupervisor test classes from every normal test run. Since this PR also upgrades kafka-clients to 4.2.0, those are the regression tests most likely to catch compatibility breaks in existing index_kafka ingestion; the new share-group mock and embedded tests do not cover that existing API surface. Please migrate or narrowly isolate the TestBroker-dependent cases instead of excluding the whole classes.
There was a problem hiding this comment.
Yes, planning to fix (upgrade to 4.2 version in all places) as part of this #19322 - I feel that we need bunch of changes, API compatibility issues in running tests , kafka docker image for IT tests and in source code - so want to keep it in separate PR.
This PR mainly focus on Share group implementation.
|
Struggling with CI check related to license : |
e3a9ad7 to
7865104
Compare
FrankChen021
left a comment
There was a problem hiding this comment.
Reviewed 3 of 27 changed files. Follow-up review handled: the Kafka extension POM no longer has the global Surefire exclusions, and ShareGroupIndexTask now uses DEFAULT_REALTIME_TASK_PRIORITY with tests covering the default and context override cases.
This is an automated review by Codex GPT-5.5
…r queue-semantics ingestion
…re group ingestion
…plicit acknowledgement
ede5072 to
e159405
Compare
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 2 |
| P3 | 1 |
| Total | 3 |
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 2 |
| P3 | 1 |
| Total | 3 |
Reviewed 23 of 23 changed files.
This is an automated review by Codex GPT-5.5
| TaskToolbox toolbox | ||
| ) throws Exception | ||
| { | ||
| recordSupplier.subscribe(Collections.singleton(ioConfig.getTopic())); |
There was a problem hiding this comment.
[P2] Close the appenderator when setup fails
After this refactor, appenderator.closeNow() only runs in the try/finally that starts after recordSupplier.subscribe(...) and driver.startJob(...). If supplier creation, subscribe, or startJob throws, run() only calls driver.close(), and BaseAppenderatorDriver.close explicitly does not close the underlying Appenderator, so failed startup can leak realtime appenderator resources. Keep the appenderator close guard in run() or include setup in runLoop's try/finally.
| Assertions.assertEquals(task.getId(), payloadResponse.getTask()); | ||
|
|
||
| final JsonNode payloadJson = mapper.valueToTree(payloadResponse.getPayload()); | ||
| Assertions.assertEquals("share_group_index", payloadJson.path("type").asText()); |
There was a problem hiding this comment.
[P2] Fix JSON-submit payload expectations
This new IT serializes a real ShareGroupIndexTask and reads the Overlord payload, but the task is registered as index_kafka_share_group and serializes ioConfig at the top level, not under spec. The round-trip test therefore fails on the type assertion, and still fails on the topic/group assertions after that is fixed; the malformed-json test below also uses the wrong type/shape, so it does not exercise the intended missing-field path.
There was a problem hiding this comment.
| ); | ||
| cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker); | ||
|
|
||
| final long deadlineMs = System.currentTimeMillis() + 30_000L; |
There was a problem hiding this comment.
[P3] Fail when the row-count wait expires
If verifySqlQuery never observes the expected count, this loop exits when the deadline expires and the test continues to cancel the task without failing. A broken JSON-submit ingestion/query path can pass after 30 seconds. Track the last assertion and fail after the deadline, or reuse the existing assertRowCountEventually pattern.
There was a problem hiding this comment.
…ions to share consumers
|
|
||
| /** | ||
| * Verifies that the Kafka broker automatically rebalances new partitions to share-group consumers | ||
| * without any Druid-side intervention (no supervisor, no task restart). |
There was a problem hiding this comment.
without any Druid-side intervention (no supervisor, no task restart) - Main advantages of using share group - no stop the world case.
|
|
||
| /** | ||
| * Verifies at-least-once semantics: uncommitted records are redelivered after Indexer crash. | ||
| */ |
There was a problem hiding this comment.
Validation of no data loss in crash situation.
…ly always guards appenderator
FrankChen021
left a comment
There was a problem hiding this comment.
Reviewed all 26 changed files in this follow-up. The JSON-submit payload expectation and row-count timeout threads are addressed by the current changes, and I did not find any new high-confidence PR-caused issues.
This is an automated review by Codex GPT-5.5
| kafkaServer.publishRecordsToTopic(topic, csvRecords(batchA, 0, "2025-09-01")); | ||
|
|
||
| // Task 1: starts ingesting but Indexer crashes before it completes. | ||
| final String taskId1 = submitTask(topic); |
|
fixing one unit test : |
…balance and crash test waits
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
Reviewed 26 of 26 changed files.
This is an automated review by Codex GPT-5.5
| Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS); | ||
|
|
||
| // Give Task 2 time to consume redelivered records before cancel. | ||
| Thread.sleep(15_000L); |
There was a problem hiding this comment.
[P2] Crash tests no longer prove redelivery
Replacing the post-restart waitForEventAggregate checks with fixed sleeps means these crash tests no longer assert that Task 2 consumed any redelivered records. Task 1 is allowed to run for 3 seconds before indexer.stop(), and this runner publishes and acks every batch, so it can already make the final SQL row count pass; in the first test the new batch can also satisfy the count. A regression that fails to redeliver unacknowledged records could now slip through. Please wait on a post-restart ingest signal or otherwise assert rows were processed by the restarted task before cancelling it.
…or partition rebalance assertion
…ove Task 2 actually consumed records after restart
…ments; replace taskPayload roundtrip with taskStatus assertion
…untAfterCancel arg order
…plier default method coverage
| }; | ||
|
|
||
| stub.wakeup(); | ||
| Assert.assertFalse(stub.acquisitionLockTimeoutMs().isPresent()); |
There was a problem hiding this comment.
Just to have test coverage CI check pass.
| ); | ||
| final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordMap = new HashMap<>(); | ||
| recordMap.put(new TopicPartition(testTopic, 0), List.of(record)); | ||
| when(mockConsumer.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(recordMap)); |
…e-not-yet-visible after task cancel
FrankChen021
left a comment
There was a problem hiding this comment.
Handled the ShareGroupIndexerCrashIT follow-up. The latest update replaces fixed post-restart sleeps with processed-event baselines/waits before cancelling Task 2, so I don't have a further inline reply.
Reviewed 1 of 26 changed files.
This is an automated review by Codex GPT-5.5
…ition acknowledge to clear 50% coverage
| final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordMap = new HashMap<>(); | ||
| recordMap.put(new TopicPartition(testTopic, 0), List.of(r0)); | ||
| recordMap.put(new TopicPartition(testTopic, 1), List.of(r1)); | ||
| when(mockConsumer.poll(any(Duration.class))).thenReturn(new ConsumerRecords<>(recordMap)); |
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
Reviewed 26 of 26 changed files.
This is an automated review by Codex GPT-5.5
| for (InputRow row : rows) { | ||
| final AppenderatorDriverAddResult addResult = driver.add( | ||
| row, | ||
| SEQUENCE_NAME, |
There was a problem hiding this comment.
[P1] Use per-task sequence names for segment allocation
Every ShareGroupIndexTaskRunner uses the same hard-coded sequence name when adding rows. Druid segment allocation is idempotent for the same sequenceName, and this call also skips lineage checks, so concurrent share-group tasks ingesting the same datasource interval can be handed the same pending segment id. If one task publishes that segment first, another task can treat the id as already published and then acknowledge its Kafka records even though its rows were not registered. Make the allocation sequence unique per task or otherwise isolate concurrent producers before supporting multiple tasks in one share group.
Phase 1 #18439.
Description
Fixed the bug ...
Renamed the class ...
Added a forbidden-apis entry ...
Release note
Key changed/added classes in this PR
MyFooOurBarTheirBazThis PR has:
Demo
https://youtu.be/K_O1MH-AaE8