From 453223e2297271470cd194f0d9896e8b7a44bae9 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 22 Oct 2025 19:06:40 +0100 Subject: [PATCH] KAFKA-19825 [1/2]: Add effective batch linger time metric --- .../common/runtime/CoordinatorRuntime.java | 1 + .../runtime/CoordinatorRuntimeMetrics.java | 7 ++ .../CoordinatorRuntimeMetricsImpl.java | 25 ++++ .../CoordinatorRuntimeMetricsImplTest.java | 10 ++ .../runtime/CoordinatorRuntimeTest.java | 113 ++++++++++++++++++ docs/ops.html | 5 + 6 files changed, 161 insertions(+) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 965f8074f8014..8d5580d306793 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -789,6 +789,7 @@ private void flushCurrentBatch() { } long flushStartMs = time.milliseconds(); + runtimeMetrics.recordLingerTime(flushStartMs - currentBatch.appendTimeMs); // Write the records to the log and update the last written offset. long offset = partitionWriter.append( tp, diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java index 5693e3ea99427..5b9b9254230e1 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetrics.java @@ -60,6 +60,13 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable { */ void recordEventPurgatoryTime(long durationMs); + /** + * Record the effective batch linger time. 
+ * + * @param durationMs The linger time in milliseconds. + */ + void recordLingerTime(long durationMs); + /** * Record the flush time. * diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java index af775c7c45118..966d4c5c62593 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImpl.java @@ -58,6 +58,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics */ public static final String EVENT_PURGATORY_TIME_METRIC_NAME = "event-purgatory-time-ms"; + /** + * The effective batch linger time metric name. + */ + public static final String BATCH_LINGER_TIME_METRIC_NAME = "batch-linger-time-ms"; + /** * The flush time metric name. */ @@ -116,6 +121,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics */ private final Sensor eventPurgatoryTimeSensor; + /** + * Sensor to measure the effective batch linger time. + */ + private final Sensor lingerTimeSensor; + /** * Sensor to measure the flush time. 
*/ @@ -199,6 +209,15 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) { this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + "-EventPurgatoryTime"); this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram); + KafkaMetricHistogram lingerTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( + suffix -> kafkaMetricName( + BATCH_LINGER_TIME_METRIC_NAME + "-" + suffix, + "The " + suffix + " effective linger time in milliseconds" + ) + ); + this.lingerTimeSensor = metrics.sensor(this.metricsGroup + "-LingerTime"); + this.lingerTimeSensor.add(lingerTimeHistogram); + KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram( suffix -> kafkaMetricName( BATCH_FLUSH_TIME_METRIC_NAME + "-" + suffix, @@ -234,6 +253,7 @@ public void close() { metrics.removeSensor(eventQueueTimeSensor.name()); metrics.removeSensor(eventProcessingTimeSensor.name()); metrics.removeSensor(eventPurgatoryTimeSensor.name()); + metrics.removeSensor(lingerTimeSensor.name()); metrics.removeSensor(flushTimeSensor.name()); } @@ -294,6 +314,11 @@ public void recordEventPurgatoryTime(long purgatoryTimeMs) { eventPurgatoryTimeSensor.record(purgatoryTimeMs); } + @Override + public void recordLingerTime(long durationMs) { + lingerTimeSensor.record(durationMs); + } + @Override public void recordFlushTime(long durationMs) { flushTimeSensor.record(durationMs); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java index 68f152f2bea08..42c3505c52e80 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeMetricsImplTest.java @@ -32,6 +32,7 @@ import java.util.stream.IntStream; 
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME; +import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME; import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME; import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME; import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME; @@ -74,6 +75,11 @@ public void testMetricNames() { kafkaMetricName(metrics, "event-purgatory-time-ms-p95"), kafkaMetricName(metrics, "event-purgatory-time-ms-p99"), kafkaMetricName(metrics, "event-purgatory-time-ms-p999"), + kafkaMetricName(metrics, "batch-linger-time-ms-max"), + kafkaMetricName(metrics, "batch-linger-time-ms-p50"), + kafkaMetricName(metrics, "batch-linger-time-ms-p95"), + kafkaMetricName(metrics, "batch-linger-time-ms-p99"), + kafkaMetricName(metrics, "batch-linger-time-ms-p999"), kafkaMetricName(metrics, "batch-flush-time-ms-max"), kafkaMetricName(metrics, "batch-flush-time-ms-p50"), kafkaMetricName(metrics, "batch-flush-time-ms-p95"), @@ -236,6 +242,7 @@ public void testEventQueueSizeMetricsGroupIsolation() { EVENT_QUEUE_TIME_METRIC_NAME, EVENT_PROCESSING_TIME_METRIC_NAME, EVENT_PURGATORY_TIME_METRIC_NAME, + BATCH_LINGER_TIME_METRIC_NAME, BATCH_FLUSH_TIME_METRIC_NAME }) public void testHistogramMetrics(String metricNamePrefix) { @@ -255,6 +262,9 @@ public void testHistogramMetrics(String metricNamePrefix) { case EVENT_PURGATORY_TIME_METRIC_NAME: runtimeMetrics.recordEventPurgatoryTime(i); break; + case BATCH_LINGER_TIME_METRIC_NAME: + runtimeMetrics.recordLingerTime(i); + break; case BATCH_FLUSH_TIME_METRIC_NAME: runtimeMetrics.recordFlushTime(i); } diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index dfbbdf048bc20..7fb1a1b95dd13 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -4182,6 +4182,119 @@ public void testEmptyBatch() throws Exception { assertNull(complete1.get(5, TimeUnit.SECONDS)); } + @Test + public void testRecordAppendLingerTime() throws Exception { + MockTimer timer = new MockTimer(); + + // Writer sleeps for 10ms before appending records. + MockPartitionWriter writer = new MockPartitionWriter(timer.time(), Integer.MAX_VALUE, false); + CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class); + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(runtimeMetrics) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify the initial state. + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertNull(ctx.currentBatch); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create records with a quarter of the max batch size each. 
Keep in mind that + // each batch has a header so it is not possible to have those four records + // in one single batch. + List records = Stream.of('1', '2', '3', '4').map(c -> { + char[] payload = new char[maxBatchSize / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1 with two records. + long firstBatchTimestamp = timer.time().milliseconds(); + CompletableFuture write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(records.subList(0, 2), "response1") + ); + + // A batch has been created. + assertNotNull(ctx.currentBatch); + + // Write #2 with one record. + CompletableFuture write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(records.subList(2, 3), "response2") + ); + + // Verify the state. Records are replayed but no batch written, so no linger time is recorded yet. + assertEquals(List.of(), writer.entries(TP)); + verify(runtimeMetrics, times(0)).recordLingerTime(0); + + // Write #3 with one record. This one cannot go into the existing batch + // so the existing batch should be flushed and a new one should be created. + long secondBatchTimestamp = timer.time().milliseconds(); + CompletableFuture write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50), + state -> new CoordinatorResult<>(records.subList(3, 4), "response3") + ); + + // Verify the state. Records are replayed. The previous batch + // got flushed with all the records but the new one from #3. + // The new batch's timestamp comes from before the flush. 
+ assertEquals(3L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), + new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(List.of( + records(firstBatchTimestamp, records.subList(0, 3)) + ), writer.entries(TP)); + verify(runtimeMetrics, times(1)).recordLingerTime(0); + + // Advance past the linger time. + timer.advanceClock(11); + + // Verify the state. The pending batch is flushed. Its linger time spans the 10ms append of the first batch plus the 11ms clock advance. + assertEquals(4L, ctx.coordinator.lastWrittenOffset()); + assertEquals(0L, ctx.coordinator.lastCommittedOffset()); + assertEquals(List.of( + new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)), + new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)), + new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)), + new MockCoordinatorShard.RecordAndMetadata(3, records.get(3)) + ), ctx.coordinator.coordinator().fullRecords()); + assertEquals(List.of( + records(firstBatchTimestamp, records.subList(0, 3)), + records(secondBatchTimestamp, records.subList(3, 4)) + ), writer.entries(TP)); + verify(runtimeMetrics, times(1)).recordLingerTime(21); + + // Commit and verify that writes are completed. 
+ writer.commit(TP); + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + assertTrue(write3.isDone()); + assertEquals(4L, ctx.coordinator.lastCommittedOffset()); + assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); + assertEquals("response2", write2.get(5, TimeUnit.SECONDS)); + assertEquals("response3", write3.get(5, TimeUnit.SECONDS)); + } + @Test public void testRecordFlushTime() throws Exception { MockTimer timer = new MockTimer(); diff --git a/docs/ops.html b/docs/ops.html index fcef480f2c91a..15f1db073f025 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1890,6 +1890,11 @@