From b63b681f982c78d52c8582eab7a867108e8d36df Mon Sep 17 00:00:00 2001 From: Finbarr Naughton Date: Thu, 14 May 2026 14:13:11 +0100 Subject: [PATCH 1/2] CAMEL-23523: Reset intervalWatch when batch accumulation begins to prevent premature flush after idle --- .../KafkaRecordBatchingProcessor.java | 1 + ...KafkaBatchingIntervalResetAfterIdleIT.java | 121 ++++++++++++++++++ 2 files changed, 122 insertions(+) create mode 100644 components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java index 85d35635d4640..34e12c2b9bfbd 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/batching/KafkaRecordBatchingProcessor.java @@ -125,6 +125,7 @@ public ProcessingResult processExchange(KafkaConsumer camelKafkaConsumer, Consum // Aggregate all consumer records in a single exchange if (exchangeList.isEmpty()) { timeoutWatch.takenAndRestart(); + intervalWatch.restart(); } // If timeout has expired, process current batch but continue to handle new records diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java new file mode 100644 index 0000000000000..342d7242da5a8 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java @@ -0,0 +1,121 @@ +/* + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration.batching; + +import java.util.List; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.integration.common.KafkaTestUtil; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Regression test for CAMEL-23523: batchingIntervalMs timer not reset when a new accumulation cycle begins after an + * idle period longer than batchingIntervalMs. + * + * After such an idle period, intervalWatch is already expired when the first new message arrives. When a subsequent + * non-empty poll arrives with more messages, hasExpiredRecords() fires immediately due to the stale intervalWatch, + * flushing only the first message as a single-message batch instead of accumulating it with the subsequent messages. + * + * The fix is to restart intervalWatch at the same point timeoutWatch is restarted — when exchangeList.isEmpty() is true + * at the top of processExchange() in KafkaRecordBatchingProcessor. 
+ * + * Test scenario: the first post-idle message is sent and flushed to Kafka alone, ensuring the consumer picks it up in a + * dedicated poll. A 200ms gap (less than pollTimeoutMs=500ms) then separates it from the remaining messages, so the + * subsequent poll carries those remaining messages while the first is still in the accumulation buffer. + * + * With the bug: intervalWatch is still expired from the idle period, so it fires on that second poll and prematurely + * flushes only the first message — the remaining messages end up in a later batch. + * + * With the fix: intervalWatch is reset when the first message starts a new accumulation cycle, so the second poll does + * not trigger a premature flush and all messages are accumulated into one batch. + */ +public class KafkaBatchingIntervalResetAfterIdleIT extends BatchingProcessingITSupport { + + public static final String TOPIC = "testBatchingIntervalResetAfterIdle"; + + private static final int BATCHING_INTERVAL_MS = 2000; + private static final int MAX_POLL_RECORDS = 5; + + @AfterEach + public void after() { + cleanupKafka(TOPIC); + } + + @Override + protected RouteBuilder createRouteBuilder() { + String from = "kafka:" + TOPIC + + "?groupId=KafkaBatchingIntervalResetAfterIdleIT" + + "&batching=true" + + "&maxPollRecords=" + MAX_POLL_RECORDS + + "&batchingIntervalMs=" + BATCHING_INTERVAL_MS + + "&pollTimeoutMs=500" + + "&autoOffsetReset=earliest"; + + return new RouteBuilder() { + @Override + public void configure() { + from(from).routeId("batching").to(KafkaTestUtil.MOCK_RESULT); + } + }; + } + + @Test + public void intervalWatchShouldResetWhenNewAccumulationCycleBegins() throws Exception { + // Baseline: produce a full batch and confirm it flushes as one. + to.expectedMessageCount(1); + sendRecords(0, MAX_POLL_RECORDS, TOPIC); + to.assertIsSatisfied(5000); + to.reset(); + + // Idle for longer than batchingIntervalMs so that intervalWatch expires while the + // queue is empty. 
This is the precondition that triggers the bug on the next cycle. + Thread.sleep(BATCHING_INTERVAL_MS + 1000); + + // Send 1 message and flush it to Kafka before sending the rest. This guarantees the + // consumer will poll it alone in a dedicated poll (Kafka consumer returns immediately + // when records are available, so msg 1 is picked up before msgs 2-4 exist in Kafka). + int postIdleCount = MAX_POLL_RECORDS - 1; + sendRecords(MAX_POLL_RECORDS, MAX_POLL_RECORDS + 1, TOPIC); + producer.flush(); + + // 200ms gap: msg 1 will have been consumed in its own poll by the time msgs 2-4 + // arrive in Kafka. pollTimeoutMs=500ms means the consumer returns from any poll as + // soon as a record is available, so msg 1 lands in a poll well within this window. + Thread.sleep(200); + + // Send remaining messages. With the fix, the consumer will accumulate these with + // msg 1 since intervalWatch was reset when msg 1 started the new cycle. + // With the bug, intervalWatch is still expired and fires immediately on this poll, + // flushing msg 1 alone before msgs 2-4 can be accumulated with it. + sendRecords(MAX_POLL_RECORDS + 1, MAX_POLL_RECORDS + postIdleCount, TOPIC); + producer.flush(); + + // Wait for at least one batch. With the bug a batch of 1 arrives quickly; + // with the fix a single batch of all 4 messages arrives after pollTimeoutMs. 
+ to.expectedMinimumMessageCount(1); + to.assertIsSatisfied(8000); + + List firstBatch = to.getExchanges().get(0).getMessage().getBody(List.class); + assertEquals(postIdleCount, firstBatch.size(), + "Expected all " + postIdleCount + " post-idle messages accumulated into one batch, " + + "but the first batch contained only " + firstBatch.size() + + " message(s) — intervalWatch was not reset when the new accumulation cycle began"); + } +} From 9e6388e5f608d2c3355aeb53d99e05a5c3d77eca Mon Sep 17 00:00:00 2001 From: Finbarr Naughton Date: Fri, 15 May 2026 13:05:45 +0100 Subject: [PATCH 2/2] CAMEL-23523: Improve KafkaBatchingIntervalResetAfterIdleIT timing mechanics --- ...KafkaBatchingIntervalResetAfterIdleIT.java | 64 +++++++++++++++---- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java index 342d7242da5a8..4a80fa3273201 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/batching/KafkaBatchingIntervalResetAfterIdleIT.java @@ -17,12 +17,20 @@ package org.apache.camel.component.kafka.integration.batching; import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.integration.common.KafkaTestUtil; +import org.apache.kafka.clients.consumer.ConsumerInterceptor; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; import 
org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; /** @@ -36,9 +44,10 @@ * The fix is to restart intervalWatch at the same point timeoutWatch is restarted — when exchangeList.isEmpty() is true * at the top of processExchange() in KafkaRecordBatchingProcessor. * - * Test scenario: the first post-idle message is sent and flushed to Kafka alone, ensuring the consumer picks it up in a - * dedicated poll. A 200ms gap (less than pollTimeoutMs=500ms) then separates it from the remaining messages, so the - * subsequent poll carries those remaining messages while the first is still in the accumulation buffer. + * Test scenario: the first post-idle message is sent and flushed to Kafka alone. A Kafka ConsumerInterceptor counts + * non-empty polls so the test can await confirmation that the consumer has picked up that message before producing the + * rest. This ensures the subsequent poll carries the remaining messages while the first is still in the accumulation + * buffer. * * With the bug: intervalWatch is still expired from the idle period, so it fires on that second poll and prematurely * flushes only the first message — the remaining messages end up in a later batch. @@ -50,9 +59,38 @@ public class KafkaBatchingIntervalResetAfterIdleIT extends BatchingProcessingITS public static final String TOPIC = "testBatchingIntervalResetAfterIdle"; - private static final int BATCHING_INTERVAL_MS = 2000; + private static final int BATCHING_INTERVAL_MS = 400; private static final int MAX_POLL_RECORDS = 5; + // Counts non-empty polls seen by the Kafka consumer interceptor. Reset between test phases + // so we can await the first non-empty poll of the post-idle cycle before producing the rest. 
+ static final AtomicInteger POST_IDLE_POLLS = new AtomicInteger(0); + + public static class PollCountingInterceptor implements ConsumerInterceptor<Object, Object> { + @Override + public ConsumerRecords<Object, Object> onConsume(ConsumerRecords<Object, Object> records) { + if (!records.isEmpty()) { + POST_IDLE_POLLS.incrementAndGet(); + } + return records; + } + + @Override + public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { + // no-op + } + + @Override + public void close() { + // no-op + } + + @Override + public void configure(Map<String, ?> configs) { + // no-op + } + } + @AfterEach public void after() { cleanupKafka(TOPIC); @@ -66,7 +104,8 @@ protected RouteBuilder createRouteBuilder() { + "&maxPollRecords=" + MAX_POLL_RECORDS + "&batchingIntervalMs=" + BATCHING_INTERVAL_MS + "&pollTimeoutMs=500" - + "&autoOffsetReset=earliest"; + + "&autoOffsetReset=earliest" + + "&interceptorClasses=" + PollCountingInterceptor.class.getName(); return new RouteBuilder() { @Override @@ -86,7 +125,10 @@ public void intervalWatchShouldResetWhenNewAccumulationCycleBegins() throws Exce // Idle for longer than batchingIntervalMs so that intervalWatch expires while the // queue is empty. This is the precondition that triggers the bug on the next cycle. - Thread.sleep(BATCHING_INTERVAL_MS + 1000); + Thread.sleep(BATCHING_INTERVAL_MS + 600); + + // Reset the poll counter before starting the post-idle phase. + POST_IDLE_POLLS.set(0); // Send 1 message and flush it to Kafka before sending the rest. This guarantees the // consumer will poll it alone in a dedicated poll (Kafka consumer returns immediately // when records are available, so msg 1 is picked up before msgs 2-4 exist in Kafka). int postIdleCount = MAX_POLL_RECORDS - 1; sendRecords(MAX_POLL_RECORDS, MAX_POLL_RECORDS + 1, TOPIC); producer.flush(); - // 200ms gap: msg 1 will have been consumed in its own poll by the time msgs 2-4 - // arrive in Kafka. pollTimeoutMs=500ms means the consumer returns from any poll as - // soon as a record is available, so msg 1 lands in a poll well within this window. 
- Thread.sleep(200); + // Wait until the interceptor confirms the consumer has polled msg 1, then produce + // the remainder. This replaces a fixed sleep: the remaining messages only reach Kafka + // after we know msg 1 is already in the accumulation buffer. + await().atMost(5, TimeUnit.SECONDS).until(() -> POST_IDLE_POLLS.get() >= 1); // Send remaining messages. With the fix, the consumer will accumulate these with // msg 1 since intervalWatch was reset when msg 1 started the new cycle. @@ -108,7 +150,7 @@ public void intervalWatchShouldResetWhenNewAccumulationCycleBegins() throws Exce producer.flush(); // Wait for at least one batch. With the bug a batch of 1 arrives quickly; - // with the fix a single batch of all 4 messages arrives after pollTimeoutMs. + // with the fix a single batch of all 4 messages arrives after batchingIntervalMs. to.expectedMinimumMessageCount(1); to.assertIsSatisfied(8000);