diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 2dca864b3292..b5e0d54cbff7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -89,7 +89,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private final PunctuationQueue streamTimePunctuationQueue; private final PunctuationQueue systemTimePunctuationQueue; + private long processTimeMs = 0L; + private final Sensor closeTaskSensor; + private final Sensor processRatioSensor; private final Sensor processLatencySensor; private final Sensor punctuateLatencySensor; private final Sensor enforcedProcessingSensor; @@ -131,6 +134,7 @@ public StreamTask(final TaskId id, } else { enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); } + processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics); processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics); @@ -206,6 +210,7 @@ public void completeRestoration() { initializeTopology(); processorContext.initialize(); idleStartTimeMs = RecordQueue.UNKNOWN; + transitionTo(State.RUNNING); log.info("Restored and ready to run"); @@ -616,6 +621,17 @@ public boolean process(final long wallClockTime) { return true; } + @Override + public void recordProcessBatchTime(final long processBatchTime) { + processTimeMs += processBatchTime; + } + + @Override + public void recordProcessTimeRatio(final long allTaskProcessMs) { + processRatioSensor.record((double) processTimeMs / allTaskProcessMs); + processTimeMs = 0L; + } + private String getStacktraceString(final RuntimeException e) { String stacktrace = null; try (final StringWriter stringWriter = new StringWriter(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e12c8f05d470..7202df5669ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -659,6 +659,8 @@ void runOnce() { // if there's no active restoring or standby updating it would not try to fetch any data changelogReader.restore(); + // TODO: we should record the restore latency and its relative time spent ratio after + // we figure out how to move this method out of the stream thread advanceNowAndComputeLatency(); long totalCommitLatency = 0L; @@ -675,7 +677,7 @@ void runOnce() { * 6. Otherwise, increment N. */ do { - final int processed = taskManager.process(numIterations, now); + final int processed = taskManager.process(numIterations, time); final long processLatency = advanceNowAndComputeLatency(); totalProcessLatency += processLatency; if (processed > 0) { @@ -684,8 +686,9 @@ void runOnce() { processRateSensor.record(processed, now); // This metric is scaled to represent the _average_ processing time of _each_ - // task. Note, it's hard to interpret this as defined, but we would need a KIP - // to change it to simply report the overall time spent processing all tasks. + // task. Note, it's hard to interpret this as defined; the per-task process-ratio + // as well as total time ratio spent on processing compared with polling / committing etc + // are reported on other metrics. processLatencySensor.record(processLatency / (double) processed, now); } @@ -720,6 +723,10 @@ void runOnce() { numIterations++; } } while (true); + + // we record the ratio out of the while loop so that the accumulated latency spans over + // multiple iterations with reasonably large max.num.records and hence is less vulnerable to outliers + taskManager.recordTaskProcessRatio(totalProcessLatency); } now = time.milliseconds(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index 34fc600f57cd..5b8d60cfaa47 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -200,6 +200,10 @@ default Map committableOffsetsAndMetadata() { return Collections.emptyMap(); } + default void recordProcessBatchTime(final long processBatchTime) {} + + default void recordProcessTimeRatio(final long allTaskProcessMs) {} + default boolean process(final long wallClockTime) { return false; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index b6775d47e2ae..e565215aabf3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskIdFormatException; @@ -810,16 +811,20 @@ private void commitOffsetsOrTransaction(final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); - sensor.add( - new MetricName( - PROCESS + RATIO_SUFFIX, - threadLevelGroup(streamsMetrics), - PROCESS_RATIO_DESCRIPTION, - tagMap), - new Value() + addValueMetricToSensor( + sensor, + threadLevelGroup(streamsMetrics), + tagMap, + PROCESS + RATIO_SUFFIX, + PROCESS_RATIO_DESCRIPTION ); return sensor; } @@ -252,13 +250,12 @@ public static Sensor punctuateRatioSensor(final String threadId, final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, PUNCTUATE + RATIO_SUFFIX, Sensor.RecordingLevel.INFO); final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); - sensor.add( - new MetricName( - PUNCTUATE + RATIO_SUFFIX, - threadLevelGroup(streamsMetrics), - PUNCTUATE_RATIO_DESCRIPTION, - tagMap), - new Value() + addValueMetricToSensor( + sensor, + threadLevelGroup(streamsMetrics), + tagMap, + PUNCTUATE + RATIO_SUFFIX, + PUNCTUATE_RATIO_DESCRIPTION ); return sensor; } @@ -268,13 +265,12 @@ public static Sensor pollRatioSensor(final String threadId, final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, POLL + RATIO_SUFFIX, Sensor.RecordingLevel.INFO); final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); - sensor.add( - new MetricName( - POLL + RATIO_SUFFIX, - threadLevelGroup(streamsMetrics), - POLL_RATIO_DESCRIPTION, - tagMap), - new Value() + addValueMetricToSensor( + sensor, + threadLevelGroup(streamsMetrics), + tagMap, + POLL + RATIO_SUFFIX, + POLL_RATIO_DESCRIPTION ); return sensor; } @@ -284,13 +280,12 @@ public static Sensor commitRatioSensor(final String threadId, final Sensor sensor = streamsMetrics.threadLevelSensor(threadId, COMMIT + RATIO_SUFFIX, Sensor.RecordingLevel.INFO); final Map tagMap = streamsMetrics.threadLevelTagMap(threadId); - sensor.add( - new MetricName( - COMMIT + RATIO_SUFFIX, - threadLevelGroup(streamsMetrics), - COMMIT_RATIO_DESCRIPTION, - tagMap), - new Value() + addValueMetricToSensor( + sensor, + threadLevelGroup(streamsMetrics), + tagMap, + COMMIT + RATIO_SUFFIX, + COMMIT_RATIO_DESCRIPTION ); return sensor; } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index b9f131dd5cc8..1c3d0113626a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -109,7 +109,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; @Category({IntegrationTest.class}) public class QueryableStateIntegrationTest { @@ -626,7 +625,7 @@ public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception { } @Test - public void concurrentAccesses() throws Exception { + public void shouldAllowConcurrentAccesses() throws Exception { final int numIterations = 500000; final String storeName = "word-count-store"; final String windowStoreName = "windowed-word-count-store"; @@ -646,8 +645,8 @@ public void concurrentAccesses() throws Exception { producerThread.start(); try { - waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration); - waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration); + waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1); + waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, 1); final ReadOnlyKeyValueStore keyValueStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore())); @@ -659,7 +658,7 @@ public void concurrentAccesses() throws Exception { final Map expectedCount = new HashMap<>(); while (producerRunnable.getCurrIteration() < numIterations) { verifyGreaterOrEqual(inputValuesKeys.toArray(new String[0]), expectedWindowState, - expectedCount, windowStore, keyValueStore, true); + expectedCount, windowStore, keyValueStore); } } finally { producerRunnable.shutdown(); @@ -1140,30 +1139,21 @@ private void verifyCanGetByKey(final String[] keys, * @param expectedCount Expected count * @param windowStore Window Store * @param keyValueStore Key-value store - * @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false, - * the method merely inserts the new found key into the list of - * expected keys. */ private void verifyGreaterOrEqual(final String[] keys, final Map expectedWindowedCount, final Map expectedCount, final ReadOnlyWindowStore windowStore, - final ReadOnlyKeyValueStore keyValueStore, - final boolean failIfKeyNotFound) { + final ReadOnlyKeyValueStore keyValueStore) { final Map windowState = new HashMap<>(); final Map countState = new HashMap<>(); for (final String key : keys) { final Map map = fetchMap(windowStore, key); - if (map.equals(Collections.emptyMap()) && failIfKeyNotFound) { - fail("Key in windowed-store not found " + key); - } windowState.putAll(map); final Long value = keyValueStore.get(key); if (value != null) { countState.put(key, value); - } else if (failIfKeyNotFound) { - fail("Key in key-value-store not found " + key); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java index d7f92a7414b6..c77ef9441efb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/metrics/TaskMetricsTest.java @@ -77,6 +77,30 @@ public void setUp() { mockStatic(StreamsMetricsImpl.class); } + @Test + public void shouldGetActiveProcessRatioSensor() { + final String operation = "active-process-ratio"; + expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO)) + .andReturn(expectedSensor); + final String ratioDescription = "The fraction of time the thread spent " + + "on processing this task among all assigned active tasks"; + expect(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).andReturn(tagMap); + StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + TASK_LEVEL_GROUP, + tagMap, + operation, + ratioDescription + ); + + replay(StreamsMetricsImpl.class, streamsMetrics); + + final Sensor sensor = TaskMetrics.activeProcessRatioSensor(THREAD_ID, TASK_ID, streamsMetrics); + + verify(StreamsMetricsImpl.class, streamsMetrics); + assertThat(sensor, is(expectedSensor)); + } + @Test public void shouldGetProcessLatencySensor() { final String operation = "process-latency"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 5218020e2998..63586d3a87a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -371,6 +371,30 @@ public void shouldProcessInOrder() { assertEquals(3, source2.numReceived); } + @Test + public void shouldRecordProcessRatio() { + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + + final KafkaMetric metric = getMetric("active-process", "%s-ratio", task.id().toString(), StreamsConfig.METRICS_LATEST); + + assertEquals(0.0d, (double) metric.metricValue(), 0.0001d); + + task.recordProcessBatchTime(10L); + task.recordProcessBatchTime(15L); + task.recordProcessTimeRatio(100L); + + assertEquals(0.25d, (double) metric.metricValue(), 0.0001d); + + task.recordProcessBatchTime(10L); + + assertEquals(0.25d, (double) metric.metricValue(), 0.0001d); + + task.recordProcessBatchTime(10L); + task.recordProcessTimeRatio(20L); + + assertEquals(1.0d, (double) metric.metricValue(), 0.0001d); + } + @Test public void shouldConstructMetricsWithBuiltInMetricsVersion0100To24() { testMetrics(StreamsConfig.METRICS_0100_TO_24); @@ -442,8 +466,9 @@ private void testMetricsForBuiltInMetricsVersionLatest() { assertNull(getMetric("commit", "%s-rate", "all", builtInMetricsVersion)); assertNull(getMetric("commit", "%s-total", "all", builtInMetricsVersion)); - assertNotNull(getMetric("process", "%s-latency-avg", task.id().toString(), builtInMetricsVersion)); + assertNotNull(getMetric("active-process", "%s-ratio", task.id().toString(), builtInMetricsVersion)); assertNotNull(getMetric("process", "%s-latency-max", task.id().toString(), builtInMetricsVersion)); + assertNotNull(getMetric("process", "%s-latency-avg", task.id().toString(), builtInMetricsVersion)); assertNotNull(getMetric("punctuate", "%s-latency-avg", task.id().toString(), builtInMetricsVersion)); assertNotNull(getMetric("punctuate", "%s-latency-max", task.id().toString(), builtInMetricsVersion)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 371b29a5abf1..9e3272612d36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -356,6 +356,15 @@ private void shouldCreateMetricsAtStartup(final String builtInMetricsVersion) { "task-closed-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); assertNotNull(metrics.metrics().get(metrics.metricName( "task-closed-total", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "process-ratio", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "punctuate-ratio", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "commit-ratio", defaultGroupName, descriptionIsNotVerified, defaultTags))); + assertNotNull(metrics.metrics().get(metrics.metricName( + "poll-ratio", defaultGroupName, descriptionIsNotVerified, defaultTags))); + if (builtInMetricsVersion.equals(StreamsConfig.METRICS_0100_TO_24)) { assertNotNull(metrics.metrics().get(metrics.metricName( "skipped-records-rate", defaultGroupName, descriptionIsNotVerified, defaultTags))); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index f3b33e1fd3b6..95717aa75213 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; @@ -146,6 +147,7 @@ public class TaskManagerTest { private Admin adminClient; private TaskManager taskManager; + private final Time time = new MockTime(); @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); @@ -1846,11 +1848,11 @@ public void shouldProcessActiveTasks() { ); // check that we should be processing at most max num records - assertThat(taskManager.process(3, 0L), is(6)); + assertThat(taskManager.process(3, time), is(6)); // check that if there's no records proccssible, we would stop early - assertThat(taskManager.process(3, 0L), is(5)); - assertThat(taskManager.process(3, 0L), is(0)); + assertThat(taskManager.process(3, time), is(5)); + assertThat(taskManager.process(3, time), is(0)); } @Test @@ -1876,7 +1878,7 @@ public boolean process(final long wallClockTime) { final TopicPartition partition = taskId00Partitions.iterator().next(); task00.addRecords(partition, singletonList(getConsumerRecord(partition, 0L))); - assertThrows(TaskMigratedException.class, () -> taskManager.process(1, 0L)); + assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time)); } @Test @@ -1902,7 +1904,7 @@ public boolean process(final long wallClockTime) { final TopicPartition partition = taskId00Partitions.iterator().next(); task00.addRecords(partition, singletonList(getConsumerRecord(partition, 0L))); - final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.process(1, 0L)); + final RuntimeException exception = assertThrows(RuntimeException.class, () -> taskManager.process(1, time)); assertThat(exception.getMessage(), is("oops")); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java index acd429ece015..5cb5708ad7c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ThreadMetricsTest.java @@ -80,6 +80,90 @@ public void setUp() { mockStatic(StreamsMetricsImpl.class); } + @Test + public void shouldGetProcessRatioSensor() { + final String operation = "process-ratio"; + final String ratioDescription = "The fraction of time the thread spent on processing active tasks"; + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); + expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + threadLevelGroup, + tagMap, + operation, + ratioDescription + ); + replay(StreamsMetricsImpl.class, streamsMetrics); + + final Sensor sensor = ThreadMetrics.processRatioSensor(THREAD_ID, streamsMetrics); + + verify(StreamsMetricsImpl.class, streamsMetrics); + assertThat(sensor, is(expectedSensor)); + } + + @Test + public void shouldGetPunctuateRatioSensor() { + final String operation = "punctuate-ratio"; + final String ratioDescription = "The fraction of time the thread spent on punctuating active tasks"; + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); + expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + threadLevelGroup, + tagMap, + operation, + ratioDescription + ); + replay(StreamsMetricsImpl.class, streamsMetrics); + + final Sensor sensor = ThreadMetrics.punctuateRatioSensor(THREAD_ID, streamsMetrics); + + verify(StreamsMetricsImpl.class, streamsMetrics); + assertThat(sensor, is(expectedSensor)); + } + + @Test + public void shouldGetPollRatioSensor() { + final String operation = "poll-ratio"; + final String ratioDescription = "The fraction of time the thread spent on polling records from consumer"; + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); + expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + threadLevelGroup, + tagMap, + operation, + ratioDescription + ); + replay(StreamsMetricsImpl.class, streamsMetrics); + + final Sensor sensor = ThreadMetrics.pollRatioSensor(THREAD_ID, streamsMetrics); + + verify(StreamsMetricsImpl.class, streamsMetrics); + assertThat(sensor, is(expectedSensor)); + } + + @Test + public void shouldGetCommitRatioSensor() { + final String operation = "commit-ratio"; + final String ratioDescription = "The fraction of time the thread spent on committing all tasks"; + expect(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.INFO)).andReturn(expectedSensor); + expect(streamsMetrics.threadLevelTagMap(THREAD_ID)).andReturn(tagMap); + StreamsMetricsImpl.addValueMetricToSensor( + expectedSensor, + threadLevelGroup, + tagMap, + operation, + ratioDescription + ); + replay(StreamsMetricsImpl.class, streamsMetrics); + + final Sensor sensor = ThreadMetrics.commitRatioSensor(THREAD_ID, streamsMetrics); + + verify(StreamsMetricsImpl.class, streamsMetrics); + assertThat(sensor, is(expectedSensor)); + } + @Test public void shouldGetCreateTaskSensor() { final String operation = "task-created";