Skip to content

Commit

Permalink
HOTFIX: Fixes to metric names of Streams
Browse files Browse the repository at this point in the history
A couple of fixes to metric names to match the KIP
- Removed extra strings in the metric names that are already in the tags
- add a separate metric for "all"

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3491 from enothereska/hotfix-metric-names
  • Loading branch information
enothereska authored and guozhangwang committed Aug 3, 2017
1 parent fb21209 commit 6bee1e9
Show file tree
Hide file tree
Showing 15 changed files with 552 additions and 207 deletions.
321 changes: 286 additions & 35 deletions docs/ops.html

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.kafka.streams.processor.Punctuator;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProcessorNode<K, V> {
Expand Down Expand Up @@ -102,7 +100,7 @@ public void addChild(ProcessorNode<?, ?> child) {
public void init(ProcessorContext context) {
this.context = context;
try {
nodeMetrics = new NodeMetrics(context.metrics(), name, "task." + context.taskId());
nodeMetrics = new NodeMetrics(context.metrics(), name, context);
nodeMetrics.metrics.measureLatencyNs(time, initDelegate, nodeMetrics.nodeCreationSensor);
} catch (Exception e) {
throw new StreamsException(String.format("failed to initialize processor %s", name), e);
Expand Down Expand Up @@ -163,7 +161,6 @@ public String toString(String indent) {

protected static final class NodeMetrics {
final StreamsMetricsImpl metrics;
final Map<String, String> metricTags;

final Sensor nodeProcessTimeSensor;
final Sensor nodePunctuateTimeSensor;
Expand All @@ -173,21 +170,25 @@ protected static final class NodeMetrics {
final Sensor nodeDestructionSensor;


public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) {
public NodeMetrics(final StreamsMetrics metrics, final String name, final ProcessorContext context) {
final String scope = "processor-node";
final String tagKey = "processor-node-id";
final String tagValue = name;
final String tagKey = "task-id";
final String tagValue = context.taskId().toString();
this.metrics = (StreamsMetricsImpl) metrics;
this.metricTags = new LinkedHashMap<>();
this.metricTags.put(tagKey, tagValue);

// these are all latency metrics
this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "process",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "punctuate",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, name, "create",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, name, "destroy",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, name, "forward",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, name, "skippedDueToDeserializationError",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
}

public void removeAllSensors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected class TaskMetrics {
TaskMetrics(final StreamsMetrics metrics) {
final String name = id().toString();
this.metrics = (StreamsMetricsImpl) metrics;
taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit",
Sensor.RecordingLevel.DEBUG);
}

void removeAllSensors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;

public class StreamsMetricsImpl implements StreamsMetrics {
private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
Expand Down Expand Up @@ -94,34 +97,43 @@ private String sensorName(String operationName, String entityName) {
}
}

private Map<String, String> tagMap(String... tags) {
public Map<String, String> tagMap(String... tags) {
// extract the additional tags if there are any
Map<String, String> tagMap = new HashMap<>(this.tags);
if ((tags.length % 2) != 0)
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");

for (int i = 0; i < tags.length; i += 2)
tagMap.put(tags[i], tags[i + 1]);
if (tags != null) {
if ((tags.length % 2) != 0)
throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");

for (int i = 0; i < tags.length; i += 2)
tagMap.put(tags[i], tags[i + 1]);
}
return tagMap;
}


private Map<String, String> constructTags(final String scopeName, final String entityName, final String... tags) {
List<String> updatedTagList = new ArrayList(Arrays.asList(tags));
updatedTagList.add(scopeName + "-id");
updatedTagList.add(entityName);
return tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
}

/**
* @throws IllegalArgumentException if tags is not constructed in key-value pairs
*/
@Override
public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
Map<String, String> tagMap = tagMap(tags);
public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName,
Sensor.RecordingLevel recordingLevel, String... tags) {
final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);

// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
addLatencyMetrics(scopeName, parent, "all", operationName, tagMap);
addLatencyMetrics(scopeName, parent, operationName, allTagMap);

// add the operation metrics with additional tags
Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
addLatencyMetrics(scopeName, sensor, entityName, operationName, tagMap);
addLatencyMetrics(scopeName, sensor, operationName, tagMap);

parentSensors.put(sensor, parent);

Expand All @@ -133,35 +145,37 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,
*/
@Override
public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
Map<String, String> tagMap = tagMap(tags);
final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);

// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
addThroughputMetrics(scopeName, parent, "all", operationName, tagMap);
addThroughputMetrics(scopeName, parent, operationName, allTagMap);

// add the operation metrics with additional tags
Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
addThroughputMetrics(scopeName, sensor, entityName, operationName, tagMap);
addThroughputMetrics(scopeName, sensor, operationName, tagMap);

parentSensors.put(sensor, parent);

return sensor;
}

private void addLatencyMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-avg", groupNameFromScope(scopeName),
"The average latency of " + entityName + " " + opName + " operation.", tags), new Avg());
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-max", groupNameFromScope(scopeName),
"The max latency of " + entityName + " " + opName + " operation.", tags), new Max());
addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {

maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
"The average latency of " + opName + " operation.", tags), new Avg());
maybeAddMetric(sensor, metrics.metricName(opName + "-latency-max", groupNameFromScope(scopeName),
"The max latency of " + opName + " operation.", tags), new Max());
addThroughputMetrics(scopeName, sensor, opName, tags);
}

private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-rate", groupNameFromScope(scopeName),
"The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
private void addThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
"The average number of occurrence of " + opName + " operation per second.", tags), new Rate(new Count()));
}

private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
public void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
if (!metrics.metrics().containsKey(name)) {
sensor.add(name, stat);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ private void initInternal(final ProcessorContext context) {
keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

this.cacheName = context.taskId() + "-" + underlying.name();
this.cache = this.context.getCache();
this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> entries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,31 @@ public MeteredKeyValueStore(final KeyValueStore<K, V> inner,

@Override
public void init(ProcessorContext context, StateStore root) {
final String name = name();
final String tagKey = "task-id";
final String tagValue = context.taskId().toString();

this.context = context;
this.root = root;

this.metrics = context.metrics();
this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-if-absent", Sensor.RecordingLevel.DEBUG);
this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "delete", Sensor.RecordingLevel.DEBUG);
this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-all", Sensor.RecordingLevel.DEBUG);
this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "all", Sensor.RecordingLevel.DEBUG);
this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "range", Sensor.RecordingLevel.DEBUG);
this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-if-absent",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "delete",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-all",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "all",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "range",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);

// register and possibly restore the state from the logs
if (restoreTime.shouldRecord()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,22 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore im

@Override
public void init(ProcessorContext context, StateStore root) {
final String name = name();
final String tagKey = "task-id";
final String tagValue = context.taskId().toString();
this.metrics = context.metrics();
this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG);
this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "fetch", Sensor.RecordingLevel.DEBUG);
this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG);
this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG);
this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "remove", Sensor.RecordingLevel.DEBUG);

final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG);
this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "remove",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);

final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
// register and possibly restore the state from the logs
final long startNs = time.nanoseconds();
try {
Expand Down

0 comments on commit 6bee1e9

Please sign in to comment.