KAFKA-9753: Add active tasks process ratio (#8370)
Measure the ratio of time the stream thread spends processing each task relative to all of its assigned active tasks (KIP-444). Also add unit tests to cover the metrics added in this PR and in the previous #8358, and attempt to fix the flaky test reported in KAFKA-5842.

Co-authored-by: John Roesler <vvcephei@apache.org>

Reviewers: Bruno Cadonna <bruno@confluent.io>, John Roesler <vvcephei@apache.org>
guozhangwang committed Mar 31, 2020
1 parent 121c769 commit 353aa62
Showing 12 changed files with 247 additions and 59 deletions.
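At its core, the change has each StreamTask accumulate the wall-clock time spent on its record batches and, once per StreamThread loop, normalize that against the total time the thread spent processing across all active tasks. A minimal self-contained sketch of that accumulate-then-normalize pattern, with simplified names rather than the actual Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the pattern introduced in this commit (not the real classes):
// each task accumulates its batch time; the owning thread periodically
// normalizes against the total and resets the accumulator.
public class ProcessRatioSketch {

    static final class TaskSketch {
        private long processTimeMs = 0L;
        double lastRatio = 0.0;

        // called after each processed batch with its elapsed wall-clock time
        void recordProcessBatchTime(final long batchTimeMs) {
            processTimeMs += batchTimeMs;
        }

        // called once per thread loop with the thread's total process latency
        void recordProcessTimeRatio(final long allTaskProcessMs) {
            lastRatio = (double) processTimeMs / allTaskProcessMs;
            processTimeMs = 0L; // reset for the next round
        }
    }

    public static void main(final String[] args) {
        final List<TaskSketch> tasks = new ArrayList<>();
        tasks.add(new TaskSketch());
        tasks.add(new TaskSketch());

        tasks.get(0).recordProcessBatchTime(30L); // task 0 got 30 ms of the loop
        tasks.get(1).recordProcessBatchTime(10L); // task 1 got 10 ms

        final long totalProcessLatencyMs = 40L;   // sum over all active tasks
        for (final TaskSketch task : tasks) {
            task.recordProcessTimeRatio(totalProcessLatencyMs);
            System.out.println(task.lastRatio);   // prints 0.75, then 0.25
        }
    }
}
```

In the diff below, the accumulator lives in StreamTask, the total is the totalProcessLatency that StreamThread#runOnce sums up, and the ratio lands on a new task-level sensor.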
@@ -89,7 +89,10 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private final PunctuationQueue streamTimePunctuationQueue;
private final PunctuationQueue systemTimePunctuationQueue;

private long processTimeMs = 0L;

private final Sensor closeTaskSensor;
private final Sensor processRatioSensor;
private final Sensor processLatencySensor;
private final Sensor punctuateLatencySensor;
private final Sensor enforcedProcessingSensor;
@@ -131,6 +134,7 @@ public StreamTask(final TaskId id,
} else {
enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics);
}
processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);

@@ -206,6 +210,7 @@ public void completeRestoration() {
initializeTopology();
processorContext.initialize();
idleStartTimeMs = RecordQueue.UNKNOWN;

transitionTo(State.RUNNING);

log.info("Restored and ready to run");
@@ -616,6 +621,17 @@ public boolean process(final long wallClockTime) {
return true;
}

@Override
public void recordProcessBatchTime(final long processBatchTime) {
processTimeMs += processBatchTime;
}

@Override
public void recordProcessTimeRatio(final long allTaskProcessMs) {
processRatioSensor.record((double) processTimeMs / allTaskProcessMs);
processTimeMs = 0L;
}

private String getStacktraceString(final RuntimeException e) {
String stacktrace = null;
try (final StringWriter stringWriter = new StringWriter();
@@ -659,6 +659,8 @@ void runOnce() {
// if there's no active restoring or standby updating it would not try to fetch any data
changelogReader.restore();

// TODO: we should record the restore latency and its relative time spent ratio after
// we figure out how to move this method out of the stream thread
advanceNowAndComputeLatency();

long totalCommitLatency = 0L;
@@ -675,7 +677,7 @@
* 6. Otherwise, increment N.
*/
do {
final int processed = taskManager.process(numIterations, now);
final int processed = taskManager.process(numIterations, time);
final long processLatency = advanceNowAndComputeLatency();
totalProcessLatency += processLatency;
if (processed > 0) {
@@ -684,8 +686,9 @@
processRateSensor.record(processed, now);

// This metric is scaled to represent the _average_ processing time of _each_
// task. Note, it's hard to interpret this as defined, but we would need a KIP
// to change it to simply report the overall time spent processing all tasks.
// task. Note, it's hard to interpret this as defined; the per-task process-ratio
// as well as the total ratio of time spent on processing compared with polling / committing etc.,
// are reported via other metrics.
processLatencySensor.record(processLatency / (double) processed, now);
}

@@ -720,6 +723,10 @@ void runOnce() {
numIterations++;
}
} while (true);

// we record the ratio outside the while loop so that the accumulated latency spans
// multiple iterations with a reasonably large max.num.records, and hence is less vulnerable to outliers
taskManager.recordTaskProcessRatio(totalProcessLatency);
}

now = time.milliseconds();
@@ -200,6 +200,10 @@ default Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
return Collections.emptyMap();
}

default void recordProcessBatchTime(final long processBatchTime) {}

default void recordProcessTimeRatio(final long allTaskProcessMs) {}

default boolean process(final long wallClockTime) {
return false;
}
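The new callbacks are added to the Task interface as no-op defaults, presumably so that implementations that never process records, standby tasks in particular, need no changes. A hypothetical illustration of why the defaults suffice:

```java
// Illustration only (not the real org.apache.kafka.streams Task interface):
// no-op defaults let non-processing task types ignore the new callbacks.
public class DefaultCallbackSketch {

    interface TaskLike {
        default void recordProcessBatchTime(final long processBatchTime) { }
        default void recordProcessTimeRatio(final long allTaskProcessMs) { }
    }

    // a standby-style task inherits the no-ops; only active tasks override them
    static final class NonProcessingTask implements TaskLike { }

    public static void main(final String[] args) {
        final TaskLike task = new NonProcessingTask();
        task.recordProcessBatchTime(10L); // safe no-op
        task.recordProcessTimeRatio(40L); // safe no-op
        System.out.println("defaults invoked without any override");
    }
}
```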
@@ -28,6 +28,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
@@ -810,16 +811,20 @@ private void commitOffsetsOrTransaction(final Map<TaskId, Map<TopicPartition, Of
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
int process(final int maxNumRecords, final long now) {
int process(final int maxNumRecords, final Time time) {
int totalProcessed = 0;

long now = time.milliseconds();
for (final Task task : activeTaskIterable()) {
try {
int processed = 0;
final long then = now;
while (processed < maxNumRecords && task.process(now)) {
processed++;
}
now = time.milliseconds();
totalProcessed += processed;
task.recordProcessBatchTime(now - then); // elapsed wall-clock time of this task's batch
} catch (final TaskMigratedException e) {
log.info("Failed to process stream task {} since it got migrated to another thread already. " +
"Will trigger a new rebalance and close all tasks as zombies together.", task.id());
@@ -833,6 +838,12 @@ int process(final int maxNumRecords, final long now) {
return totalProcessed;
}

void recordTaskProcessRatio(final long totalProcessLatencyMs) {
for (final Task task : activeTaskIterable()) {
task.recordProcessTimeRatio(totalProcessLatencyMs);
}
}

/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
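Changing process() to take a Time rather than a precomputed now is what enables the measurement: the clock is refreshed after each task's batch, and the span between refreshes is attributed to that task. A hedged illustration of just the timing step, using the clients' MockTime to stand in for real work:

```java
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;

// Sketch of the per-task timing step inside TaskManager#process (simplified;
// the real loop also counts records and handles TaskMigratedException).
public class BatchTimingSketch {
    public static void main(final String[] args) {
        final Time time = new MockTime();
        long now = time.milliseconds();

        final long then = now;
        time.sleep(5L);            // stands in for task.process(...) doing 5 ms of work
        now = time.milliseconds(); // refresh the clock after the batch

        final long batchTimeMs = now - then; // 5 ms attributed to this task
        System.out.println("batch time = " + batchTimeMs + " ms");
    }
}
```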
@@ -24,10 +24,12 @@
import java.util.Map;

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;

public class TaskMetrics {
private TaskMetrics() {}
@@ -36,6 +38,7 @@ private TaskMetrics() {}
private static final String MAX_LATENCY_DESCRIPTION = "The maximum latency of ";
private static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
private static final String RATE_DESCRIPTION_SUFFIX = " per second";
private static final String ACTIVE_TASK_PREFIX = "active-";

private static final String COMMIT = "commit";
private static final String COMMIT_DESCRIPTION = "calls to commit";
@@ -79,6 +82,9 @@ private TaskMetrics() {}
private static final String PROCESS_AVG_LATENCY_DESCRIPTION = AVG_LATENCY_DESCRIPTION + PROCESS_DESCRIPTION;
private static final String PROCESS_MAX_LATENCY_DESCRIPTION = MAX_LATENCY_DESCRIPTION + PROCESS_DESCRIPTION;

private static final String PROCESS_RATIO_DESCRIPTION = "The fraction of time the thread spent " +
"on processing this task among all assigned active tasks";

public static Sensor processLatencySensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
@@ -96,6 +102,21 @@ public static Sensor processLatencySensor(final String threadId,
return emptySensor(threadId, taskId, PROCESS_LATENCY, RecordingLevel.DEBUG, streamsMetrics);
}

public static Sensor activeProcessRatioSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
final String name = ACTIVE_TASK_PREFIX + PROCESS + RATIO_SUFFIX;
final Sensor sensor = streamsMetrics.taskLevelSensor(threadId, taskId, name, Sensor.RecordingLevel.INFO);
addValueMetricToSensor(
sensor,
TASK_LEVEL_GROUP,
streamsMetrics.taskLevelTagMap(threadId, taskId),
name,
PROCESS_RATIO_DESCRIPTION
);
return sensor;
}

public static Sensor punctuateSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
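Once a topology is running, the new task-level gauge can be read back like any other Kafka Streams metric. A sketch of how an application might do so; the group name "stream-task-metrics" and the "task-id" tag follow from TASK_LEVEL_GROUP and taskLevelTagMap above, but treat the exact strings as assumptions:

```java
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

// Sketch: look up each task's active-process-ratio via KafkaStreams#metrics().
public final class ProcessRatioReader {

    public static void printActiveProcessRatios(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if ("stream-task-metrics".equals(name.group())
                    && "active-process-ratio".equals(name.name())) {
                System.out.println(name.tags().get("task-id")
                        + " -> " + entry.getValue().metricValue());
            }
        }
    }
}
```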
@@ -16,10 +16,8 @@
*/
package org.apache.kafka.streams.processor.internals.metrics;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;

import java.util.Map;
@@ -36,6 +34,7 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor;

public class ThreadMetrics {
private ThreadMetrics() {}
@@ -87,13 +86,13 @@ private ThreadMetrics() {}
private static final String COMMIT_OVER_TASKS_MAX_LATENCY_DESCRIPTION =
"The maximum commit latency over all tasks assigned to one stream thread";
private static final String PROCESS_RATIO_DESCRIPTION =
"The fraction of time the thread spent on processing active tasks.";
"The fraction of time the thread spent on processing active tasks";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
"The fraction of time the thread spent on punctuating active tasks.";
"The fraction of time the thread spent on punctuating active tasks";
private static final String POLL_RATIO_DESCRIPTION =
"The fraction of time the thread spent on polling records from consumer.";
"The fraction of time the thread spent on polling records from consumer";
private static final String COMMIT_RATIO_DESCRIPTION =
"The fraction of time the thread spent on committing all tasks.";
"The fraction of time the thread spent on committing all tasks";

public static Sensor createTaskSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
@@ -236,13 +235,12 @@ public static Sensor processRatioSensor(final String threadId,
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, PROCESS + RATIO_SUFFIX, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
sensor.add(
new MetricName(
PROCESS + RATIO_SUFFIX,
threadLevelGroup(streamsMetrics),
PROCESS_RATIO_DESCRIPTION,
tagMap),
new Value()
addValueMetricToSensor(
sensor,
threadLevelGroup(streamsMetrics),
tagMap,
PROCESS + RATIO_SUFFIX,
PROCESS_RATIO_DESCRIPTION
);
return sensor;
}
@@ -252,13 +250,12 @@ public static Sensor punctuateRatioSensor(final String threadId,
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, PUNCTUATE + RATIO_SUFFIX, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
sensor.add(
new MetricName(
PUNCTUATE + RATIO_SUFFIX,
threadLevelGroup(streamsMetrics),
PUNCTUATE_RATIO_DESCRIPTION,
tagMap),
new Value()
addValueMetricToSensor(
sensor,
threadLevelGroup(streamsMetrics),
tagMap,
PUNCTUATE + RATIO_SUFFIX,
PUNCTUATE_RATIO_DESCRIPTION
);
return sensor;
}
@@ -268,13 +265,12 @@ public static Sensor pollRatioSensor(final String threadId,
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, POLL + RATIO_SUFFIX, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
sensor.add(
new MetricName(
POLL + RATIO_SUFFIX,
threadLevelGroup(streamsMetrics),
POLL_RATIO_DESCRIPTION,
tagMap),
new Value()
addValueMetricToSensor(
sensor,
threadLevelGroup(streamsMetrics),
tagMap,
POLL + RATIO_SUFFIX,
POLL_RATIO_DESCRIPTION
);
return sensor;
}
@@ -284,13 +280,12 @@ public static Sensor commitRatioSensor(final String threadId,
final Sensor sensor =
streamsMetrics.threadLevelSensor(threadId, COMMIT + RATIO_SUFFIX, Sensor.RecordingLevel.INFO);
final Map<String, String> tagMap = streamsMetrics.threadLevelTagMap(threadId);
sensor.add(
new MetricName(
COMMIT + RATIO_SUFFIX,
threadLevelGroup(streamsMetrics),
COMMIT_RATIO_DESCRIPTION,
tagMap),
new Value()
addValueMetricToSensor(
sensor,
threadLevelGroup(streamsMetrics),
tagMap,
COMMIT + RATIO_SUFFIX,
COMMIT_RATIO_DESCRIPTION
);
return sensor;
}
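The four ratio sensors above now go through a shared helper instead of each repeating sensor.add(new MetricName(...), new Value()). Judging from the call sites and the code they replace, the helper presumably looks like the following; the signature is inferred, not copied from StreamsMetricsImpl:

```java
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;

// Presumed shape of StreamsMetricsImpl#addValueMetricToSensor, reconstructed
// from the replaced call sites: register a plain Value (last-recorded) metric.
public final class MetricsHelperSketch {

    public static void addValueMetricToSensor(final Sensor sensor,
                                              final String group,
                                              final Map<String, String> tags,
                                              final String name,
                                              final String description) {
        sensor.add(new MetricName(name, group, description, tags), new Value());
    }
}
```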
@@ -109,7 +109,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@Category({IntegrationTest.class})
public class QueryableStateIntegrationTest {
@@ -626,7 +625,7 @@ public void shouldBeAbleQueryStandbyStateDuringRebalance() throws Exception {
}

@Test
public void concurrentAccesses() throws Exception {
public void shouldAllowConcurrentAccesses() throws Exception {
final int numIterations = 500000;
final String storeName = "word-count-store";
final String windowStoreName = "windowed-word-count-store";
@@ -646,8 +645,8 @@ public void concurrentAccesses() throws Exception {
producerThread.start();

try {
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, numberOfWordsPerIteration);
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, numberOfWordsPerIteration);
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1);
waitUntilAtLeastNumRecordProcessed(outputTopicConcurrentWindowed, 1);

final ReadOnlyKeyValueStore<String, Long> keyValueStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore()));
@@ -659,7 +658,7 @@ public void concurrentAccesses() throws Exception {
final Map<String, Long> expectedCount = new HashMap<>();
while (producerRunnable.getCurrIteration() < numIterations) {
verifyGreaterOrEqual(inputValuesKeys.toArray(new String[0]), expectedWindowState,
expectedCount, windowStore, keyValueStore, true);
expectedCount, windowStore, keyValueStore);
}
} finally {
producerRunnable.shutdown();
@@ -1140,30 +1139,21 @@ private void verifyCanGetByKey(final String[] keys,
* @param expectedCount Expected count
* @param windowStore Window Store
* @param keyValueStore Key-value store
* @param failIfKeyNotFound if true, tests fails if an expected key is not found in store. If false,
* the method merely inserts the new found key into the list of
* expected keys.
*/
private void verifyGreaterOrEqual(final String[] keys,
final Map<String, Long> expectedWindowedCount,
final Map<String, Long> expectedCount,
final ReadOnlyWindowStore<String, Long> windowStore,
final ReadOnlyKeyValueStore<String, Long> keyValueStore,
final boolean failIfKeyNotFound) {
final ReadOnlyKeyValueStore<String, Long> keyValueStore) {
final Map<String, Long> windowState = new HashMap<>();
final Map<String, Long> countState = new HashMap<>();

for (final String key : keys) {
final Map<String, Long> map = fetchMap(windowStore, key);
if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
fail("Key in windowed-store not found " + key);
}
windowState.putAll(map);
final Long value = keyValueStore.get(key);
if (value != null) {
countState.put(key, value);
} else if (failIfKeyNotFound) {
fail("Key in key-value-store not found " + key);
}
}

