Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,7 @@ private void flushCurrentBatch() {
}

long flushStartMs = time.milliseconds();
runtimeMetrics.recordLingerTime(flushStartMs - currentBatch.appendTimeMs);
// Write the records to the log and update the last written offset.
long offset = partitionWriter.append(
tp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public interface CoordinatorRuntimeMetrics extends AutoCloseable {
*/
void recordEventPurgatoryTime(long durationMs);

/**
* Record the effective batch linger time.
*
* @param durationMs The linger time in milliseconds.
*/
void recordLingerTime(long durationMs);

/**
* Record the flush time.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
public static final String EVENT_PURGATORY_TIME_METRIC_NAME = "event-purgatory-time-ms";

/**
* The effective batch linger time metric name.
*/
public static final String BATCH_LINGER_TIME_METRIC_NAME = "batch-linger-time-ms";

/**
* The flush time metric name.
*/
Expand Down Expand Up @@ -116,6 +121,11 @@ public class CoordinatorRuntimeMetricsImpl implements CoordinatorRuntimeMetrics
*/
private final Sensor eventPurgatoryTimeSensor;

/**
* Sensor to measure the effective batch linger time.
*/
private final Sensor lingerTimeSensor;

/**
* Sensor to measure the flush time.
*/
Expand Down Expand Up @@ -199,6 +209,15 @@ public CoordinatorRuntimeMetricsImpl(Metrics metrics, String metricsGroup) {
this.eventPurgatoryTimeSensor = metrics.sensor(this.metricsGroup + "-EventPurgatoryTime");
this.eventPurgatoryTimeSensor.add(eventPurgatoryTimeHistogram);

KafkaMetricHistogram lingerTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
suffix -> kafkaMetricName(
BATCH_LINGER_TIME_METRIC_NAME + "-" + suffix,
"The " + suffix + " effective linger time in milliseconds"
)
);
this.lingerTimeSensor = metrics.sensor(this.metricsGroup + "-LingerTime");
this.lingerTimeSensor.add(lingerTimeHistogram);

KafkaMetricHistogram flushTimeHistogram = KafkaMetricHistogram.newLatencyHistogram(
suffix -> kafkaMetricName(
BATCH_FLUSH_TIME_METRIC_NAME + "-" + suffix,
Expand Down Expand Up @@ -234,6 +253,7 @@ public void close() {
metrics.removeSensor(eventQueueTimeSensor.name());
metrics.removeSensor(eventProcessingTimeSensor.name());
metrics.removeSensor(eventPurgatoryTimeSensor.name());
metrics.removeSensor(lingerTimeSensor.name());
metrics.removeSensor(flushTimeSensor.name());
}

Expand Down Expand Up @@ -294,6 +314,11 @@ public void recordEventPurgatoryTime(long purgatoryTimeMs) {
eventPurgatoryTimeSensor.record(purgatoryTimeMs);
}

@Override
public void recordLingerTime(long durationMs) {
lingerTimeSensor.record(durationMs);
}

@Override
public void recordFlushTime(long durationMs) {
flushTimeSensor.record(durationMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.stream.IntStream;

import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_FLUSH_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.BATCH_LINGER_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PROCESSING_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_PURGATORY_TIME_METRIC_NAME;
import static org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetricsImpl.EVENT_QUEUE_TIME_METRIC_NAME;
Expand Down Expand Up @@ -74,6 +75,11 @@ public void testMetricNames() {
kafkaMetricName(metrics, "event-purgatory-time-ms-p95"),
kafkaMetricName(metrics, "event-purgatory-time-ms-p99"),
kafkaMetricName(metrics, "event-purgatory-time-ms-p999"),
kafkaMetricName(metrics, "batch-linger-time-ms-max"),
kafkaMetricName(metrics, "batch-linger-time-ms-p50"),
kafkaMetricName(metrics, "batch-linger-time-ms-p95"),
kafkaMetricName(metrics, "batch-linger-time-ms-p99"),
kafkaMetricName(metrics, "batch-linger-time-ms-p999"),
kafkaMetricName(metrics, "batch-flush-time-ms-max"),
kafkaMetricName(metrics, "batch-flush-time-ms-p50"),
kafkaMetricName(metrics, "batch-flush-time-ms-p95"),
Expand Down Expand Up @@ -236,6 +242,7 @@ public void testEventQueueSizeMetricsGroupIsolation() {
EVENT_QUEUE_TIME_METRIC_NAME,
EVENT_PROCESSING_TIME_METRIC_NAME,
EVENT_PURGATORY_TIME_METRIC_NAME,
BATCH_LINGER_TIME_METRIC_NAME,
BATCH_FLUSH_TIME_METRIC_NAME
})
public void testHistogramMetrics(String metricNamePrefix) {
Expand All @@ -255,6 +262,9 @@ public void testHistogramMetrics(String metricNamePrefix) {
case EVENT_PURGATORY_TIME_METRIC_NAME:
runtimeMetrics.recordEventPurgatoryTime(i);
break;
case BATCH_LINGER_TIME_METRIC_NAME:
runtimeMetrics.recordLingerTime(i);
break;
case BATCH_FLUSH_TIME_METRIC_NAME:
runtimeMetrics.recordFlushTime(i);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4182,6 +4182,119 @@ public void testEmptyBatch() throws Exception {
assertNull(complete1.get(5, TimeUnit.SECONDS));
}

@Test
public void testRecordAppendLingerTime() throws Exception {
MockTimer timer = new MockTimer();

// Writer sleeps for 10ms before appending records.
MockPartitionWriter writer = new MockPartitionWriter(timer.time(), Integer.MAX_VALUE, false);
CoordinatorRuntimeMetrics runtimeMetrics = mock(CoordinatorRuntimeMetrics.class);

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(runtimeMetrics)
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertNull(ctx.currentBatch);

// Get the max batch size.
int maxBatchSize = writer.config(TP).maxMessageSize();

// Create records with a quarter of the max batch size each. Keep in mind that
// each batch has a header so it is not possible to have those four records
// in one single batch.
List<String> records = Stream.of('1', '2', '3', '4').map(c -> {
char[] payload = new char[maxBatchSize / 4];
Arrays.fill(payload, c);
return new String(payload);
}).collect(Collectors.toList());

// Write #1 with two records.
long firstBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records.subList(0, 2), "response1")
);

// A batch has been created.
assertNotNull(ctx.currentBatch);

// Write #2 with one record.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records.subList(2, 3), "response2")
);

// Verify the state. Records are replayed but no batch written.
assertEquals(List.of(), writer.entries(TP));
verify(runtimeMetrics, times(0)).recordFlushTime(10);

// Write #3 with one record. This one cannot go into the existing batch
// so the existing batch should be flushed and a new one should be created.
long secondBatchTimestamp = timer.time().milliseconds();
CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(50),
state -> new CoordinatorResult<>(records.subList(3, 4), "response3")
);

// Verify the state. Records are replayed. The previous batch
// got flushed with all the records but the new one from #3.
// The new batch's timestamp comes from before the flush.
assertEquals(3L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(
records(firstBatchTimestamp, records.subList(0, 3))
), writer.entries(TP));
verify(runtimeMetrics, times(1)).recordLingerTime(0);

// Advance past the linger time.
timer.advanceClock(11);

// Verify the state. The pending batch is flushed.
assertEquals(4L, ctx.coordinator.lastWrittenOffset());
assertEquals(0L, ctx.coordinator.lastCommittedOffset());
assertEquals(List.of(
new MockCoordinatorShard.RecordAndMetadata(0, records.get(0)),
new MockCoordinatorShard.RecordAndMetadata(1, records.get(1)),
new MockCoordinatorShard.RecordAndMetadata(2, records.get(2)),
new MockCoordinatorShard.RecordAndMetadata(3, records.get(3))
), ctx.coordinator.coordinator().fullRecords());
assertEquals(List.of(
records(secondBatchTimestamp, records.subList(0, 3)),
records(secondBatchTimestamp, records.subList(3, 4))
), writer.entries(TP));
verify(runtimeMetrics, times(1)).recordLingerTime(21);

// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
assertTrue(write3.isDone());
assertEquals(4L, ctx.coordinator.lastCommittedOffset());
assertEquals("response1", write1.get(5, TimeUnit.SECONDS));
assertEquals("response2", write2.get(5, TimeUnit.SECONDS));
assertEquals("response3", write3.get(5, TimeUnit.SECONDS));
}

@Test
public void testRecordFlushTime() throws Exception {
MockTimer timer = new MockTimer();
Expand Down
5 changes: 5 additions & 0 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,11 @@ <h4 class="anchor-heading"><a id="group_coordinator_monitoring" class="anchor-li
<td>kafka.server:type=group-coordinator-metrics,name=event-purgatory-time-ms-[max|p50|p95|p99|p999]</td>
<td>The time that an event waited in the purgatory before being completed</td>
</tr>
<tr>
<td>Batch Linger Time (Ms)</td>
<td>kafka.server:type=group-coordinator-metrics,name=batch-linger-time-ms-[max|p50|p95|p99|p999]</td>
<td>The effective linger time of a batch before being flushed to the local partition</td>
</tr>
<tr>
<td>Batch Flush Time (Ms)</td>
<td>kafka.server:type=group-coordinator-metrics,name=batch-flush-time-ms-[max|p50|p95|p99|p999]</td>
Expand Down