From 31edf2c02ec3d033ebcf29ebccb38c70c2851110 Mon Sep 17 00:00:00 2001
From: Eno Thereska
Date: Thu, 6 Jul 2017 09:08:12 +0100
Subject: [PATCH 1/5] Fixes to metric names

---
 .../processor/internals/ProcessorNode.java    | 10 +++---
 .../processor/internals/StreamTask.java       |  2 +-
 .../internals/StreamsMetricsImpl.java         | 28 ++++++++-------
 .../streams/state/internals/NamedCache.java   | 15 ++++----
 .../internals/ProcessorNodeTest.java          | 34 ++++++++++++-------
 .../processor/internals/StreamTaskTest.java   | 23 +++++++------
 .../state/internals/NamedCacheTest.java       | 14 ++++----
 7 files changed, 68 insertions(+), 58 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 47f63116bf50..e0c553f98e89 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -181,11 +181,11 @@ public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix)
             this.metricTags.put(tagKey, tagValue);
 
             // these are all latency metrics
-            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
         }
 
         public void removeAllSensors() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 3fd4596da482..aaa3f0de7ee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -79,7 +79,7 @@ protected class TaskMetrics {
         TaskMetrics(final StreamsMetrics metrics) {
             final String name = id().toString();
             this.metrics = (StreamsMetricsImpl) metrics;
-            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
+            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", null, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
         }
 
         void removeAllSensors() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index ec35c8605dbb..65bfbc0949b6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -97,12 +97,13 @@ private String sensorName(String operationName, String entityName) {
     private Map<String, String> tagMap(String... tags) {
         // extract the additional tags if there are any
         Map<String, String> tagMap = new HashMap<>(this.tags);
-        if ((tags.length % 2) != 0)
-            throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
-
-        for (int i = 0; i < tags.length; i += 2)
-            tagMap.put(tags[i], tags[i + 1]);
+        if (tags != null) {
+            if ((tags.length % 2) != 0)
+                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
 
+            for (int i = 0; i < tags.length; i += 2)
+                tagMap.put(tags[i], tags[i + 1]);
+        }
         return tagMap;
     }
 
@@ -117,7 +118,7 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addLatencyMetrics(scopeName, parent, "all", operationName, tagMap);
+        addLatencyMetrics(scopeName, parent, null, operationName, tagMap("all", "all"));
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
@@ -137,7 +138,7 @@ public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addThroughputMetrics(scopeName, parent, "all", operationName, tagMap);
+        addThroughputMetrics(scopeName, parent, null, operationName, tagMap("all", "all"));
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
@@ -149,16 +150,17 @@ public Sensor addThroughputSensor(String scopeName, String entityName, String op
     private void addLatencyMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-avg", groupNameFromScope(scopeName),
-            "The average latency of " + entityName + " " + opName + " operation.", tags), new Avg());
-        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-max", groupNameFromScope(scopeName),
-            "The max latency of " + entityName + " " + opName + " operation.", tags), new Max());
+
+        maybeAddMetric(sensor, metrics.metricName(sensorName(opName, entityName) + "-latency-avg", groupNameFromScope(scopeName),
+            "The average latency of " + sensorName(opName, entityName) + " operation.", tags), new Avg());
+        maybeAddMetric(sensor, metrics.metricName(sensorName(opName, entityName) + "-latency-max", groupNameFromScope(scopeName),
+            "The max latency of " + sensorName(opName, entityName) + " operation.", tags), new Max());
         addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
     }
 
     private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-rate", groupNameFromScope(scopeName),
-            "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
+        maybeAddMetric(sensor, metrics.metricName(sensorName(opName, entityName) + "-rate", groupNameFromScope(scopeName),
+            "The average number of occurrence of " + sensorName(opName, entityName) + " operation per second.", tags), new Rate(new Count()));
     }
 
     private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
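[Editor's sketch, not part of the patch.] The core pattern this patch standardizes on is Kafka's parent/child sensor hierarchy: one global sensor whose metrics carry an "all" tag value, plus a per-entity child sensor that feeds it. A minimal, self-contained illustration using only the public org.apache.kafka.common.metrics API (the group, metric names, and sample task id are illustrative):

    import java.util.Collections;
    import org.apache.kafka.common.metrics.MetricConfig;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Count;
    import org.apache.kafka.common.metrics.stats.Rate;

    public class ParentChildSensorSketch {
        public static void main(String[] args) {
            // enable DEBUG recording so DEBUG-level sensors actually record
            Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));

            // parent sensor: aggregates across all tasks, tagged with the "all" placeholder
            Sensor parent = metrics.sensor("commit", Sensor.RecordingLevel.DEBUG);
            parent.add(metrics.metricName("commit-latency-avg", "stream-task-metrics",
                    "The average latency of commit operation.",
                    Collections.singletonMap("streams-task-id", "all")), new Avg());
            parent.add(metrics.metricName("commit-rate", "stream-task-metrics",
                    "The average number of occurrence of commit operation per second.",
                    Collections.singletonMap("streams-task-id", "all")), new Rate(new Count()));

            // child sensor for one task; recording on it also updates the parent
            Sensor child = metrics.sensor("0_1-commit", Sensor.RecordingLevel.DEBUG, parent);
            child.add(metrics.metricName("commit-latency-avg", "stream-task-metrics",
                    "The average latency of commit operation.",
                    Collections.singletonMap("streams-task-id", "0_1")), new Avg());

            child.record(42.0);  // feeds both the per-task and the "all" metrics
            metrics.close();
        }
    }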
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index c86d9dfb6f33..61b6e66e475f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -363,7 +363,6 @@ class NamedCacheMetrics {
         public NamedCacheMetrics(StreamsMetrics metrics) {
             final String scope = "record-cache";
-            final String entityName = name;
             final String opName = "hitRatio";
             final String tagKey = "record-cache-id";
             final String tagValue = name;
@@ -372,14 +371,14 @@ public NamedCacheMetrics(StreamsMetrics metrics) {
             this.metricTags = new LinkedHashMap<>();
             this.metricTags.put(tagKey, tagValue);
 
-            hitRatioSensor = this.metrics.registry().sensor(entityName + "-" + opName, Sensor.RecordingLevel.DEBUG);
+            hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
 
-            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-avg", groupName,
-                "The average cache hit ratio of " + entityName, metricTags), new Avg());
-            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-min", groupName,
-                "The minimum cache hit ratio of " + entityName, metricTags), new Min());
-            hitRatioSensor.add(this.metrics.registry().metricName(entityName + "-" + opName + "-max", groupName,
-                "The maximum cache hit ratio of " + entityName, metricTags), new Max());
+            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName,
+                "The average cache hit ratio of " + name, metricTags), new Avg());
+            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
+                "The minimum cache hit ratio of " + name, metricTags), new Min());
+            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
+                "The maximum cache hit ratio of " + name, metricTags), new Max());
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 1db4cef21c9e..aa3a0a8c94c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -25,7 +25,9 @@
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertNotNull;
@@ -99,30 +101,36 @@ public void testMetrics() {
         node.init(context);
 
         Metrics metrics = context.baseMetrics();
-        String name = "task." + context.taskId() + "." + node.name();
-        String[] entities = {"all", name};
+        String name = "task." + context.taskId();
         String[] latencyOperations = {"process", "punctuate", "create", "destroy"};
         String throughputOperation =  "forward";
         String groupName = "stream-processor-node-metrics";
-        Map<String, String> tags = Collections.singletonMap("processor-node-id", node.name());
+        List<Map<String, String>> tags = new ArrayList<>();
+        tags.add(Collections.singletonMap("all", "all"));
+        tags.add(Collections.singletonMap("processor-node-id", node.name()));
+
 
         for (String operation : latencyOperations) {
             assertNotNull(metrics.getSensor(operation));
-            assertNotNull(metrics.getSensor(name + "-" + operation));
         }
         assertNotNull(metrics.getSensor(throughputOperation));
 
-        for (String entity : entities) {
+        for (Map<String, String> tag : tags) {
             for (String operation : latencyOperations) {
-                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-avg", groupName,
-                    "The average latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-max", groupName,
-                    "The max latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-                assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-rate", groupName,
-                    "The average number of occurrence of " + entity + " " + operation + " operation per second.", tags)));
+                final String tmpName = tag.containsKey("all") ? operation :
+                    name + "-" + operation;
+                assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-latency-avg", groupName,
+                    "The average latency of " + tmpName + " operation.", tag)));
+                assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-latency-max", groupName,
+                    "The max latency of " + tmpName + " operation.", tag)));
+                assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-rate", groupName,
+                    "The average number of occurrence of " + operation + " operation per second.", tag)));
             }
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + throughputOperation + "-rate", groupName,
-                "The average number of occurrence of " + entity + " " + throughputOperation + " operation per second.", tags)));
+
+            final String tmpName = tag.containsKey("all") ? throughputOperation :
+                name + "-" + throughputOperation;
+            assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-rate", groupName,
+                "The average number of occurrence of " + tmpName + " operation per second.", tag)));
         }
 
         context.close();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1a0bebe71178..1e020bc41488 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -65,6 +65,7 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -219,22 +220,22 @@ public void testProcessOrder() throws Exception {
     @Test
     public void testMetrics() throws Exception {
         final String name = task.id().toString();
-        final String[] entities = {"all", name};
+        List<Map<String, String>> tags = new ArrayList<>();
+        tags.add(Collections.singletonMap("all", "all"));
+        tags.add(Collections.singletonMap("streams-task-id", name));
         final String operation = "commit";
         final String groupName = "stream-task-metrics";
-        final Map<String, String> tags = Collections.singletonMap("streams-task-id", name);
 
         assertNotNull(metrics.getSensor(operation));
-        assertNotNull(metrics.getSensor(name + "-" + operation));
-
-        for (final String entity : entities) {
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-avg", groupName,
-                "The average latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-latency-max", groupName,
-                "The max latency in milliseconds of " + entity + " " + operation + " operation.", tags)));
-            assertNotNull(metrics.metrics().get(metrics.metricName(entity + "-" + operation + "-rate", groupName,
-                "The average number of occurrence of " + entity + " " + operation + " operation per second.", tags)));
+
+        for (Map<String, String> tag : tags) {
+            assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-avg", groupName,
+                "The average latency of " + operation + " operation.", tag)));
+            assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-max", groupName,
+                "The max latency of " + operation + " operation.", tag)));
+            assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-rate", groupName,
+                "The average number of occurrence of " + operation + " operation per second.", tag)));
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 59297b5174cc..0b7682cce87f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -83,15 +83,15 @@ public void testMetrics() throws Exception {
         final Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put(tagKey, tagValue);
 
-        assertNotNull(streamMetrics.registry().getSensor(entityName + "-" + opName));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
-            "-" + opName + "-avg", groupName, "The current count of " + entityName + " " + opName +
-            " operation.", metricTags)));
+        assertNotNull(streamMetrics.registry().getSensor(opName));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-avg",
+            groupName, "The current count of " + entityName + " " + opName +
+            " operation.", metricTags)));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
-            "-" + opName + "-min", groupName, "The current count of " + entityName + " " + opName +
-            " operation.", metricTags)));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-min",
+            groupName, "The current count of " + entityName + " " + opName +
+            " operation.", metricTags)));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(entityName +
-            "-" + opName + "-max", groupName, "The current count of " + entityName + " " + opName +
-            " operation.", metricTags)));
+        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-max",
+            groupName, "The current count of " + entityName + " " + opName +
+            " operation.", metricTags)));
     }
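[Editor's sketch, not part of the patch.] Why the rename is safe: in Kafka's Metrics registry, two MetricNames are distinct as long as their tags differ, even when name and group are identical, so "process-latency-avg" can exist once per processor node plus once for the rollup. A small illustration using the tag convention the later patches in this series converge on (the node name is made up; the description string does not participate in MetricName equality):

    import java.util.Collections;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;

    public class MetricNameTagSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();
            MetricName allNodes = metrics.metricName("process-latency-avg",
                "stream-processor-node-metrics", "",
                Collections.singletonMap("processor-node-id", "all"));
            MetricName oneNode = metrics.metricName("process-latency-avg",
                "stream-processor-node-metrics", "",
                Collections.singletonMap("processor-node-id", "KSTREAM-SOURCE-0000000000"));
            // same name and group, different tags => different metrics
            System.out.println(allNodes.equals(oneNode));  // prints false
            metrics.close();
        }
    }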
+ name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); + this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue); } public void removeAllSensors() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java index 65bfbc0949b6..09b6ebab4ac0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java @@ -118,7 +118,7 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, // first add the global operation metrics if not yet, with the global tags only Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel); - addLatencyMetrics(scopeName, parent, null, operationName, tagMap("all", "all")); + addLatencyMetrics(scopeName, parent, null, operationName, tagMap(tags != null && tags.length > 1 ? tags[0] : "all", "all")); // add the operation metrics with additional tags Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent); @@ -138,7 +138,7 @@ public Sensor addThroughputSensor(String scopeName, String entityName, String op // first add the global operation metrics if not yet, with the global tags only Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel); - addThroughputMetrics(scopeName, parent, null, operationName, tagMap("all", "all")); + addThroughputMetrics(scopeName, parent, null, operationName, tagMap(tags != null && tags.length > 1 ? 
tags[0] : "all", "all")); // add the operation metrics with additional tags Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java index 2f280fc522b8..47f6c4868a66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java @@ -116,15 +116,24 @@ public void init(ProcessorContext context, StateStore root) { this.context = context; this.root = root; this.metrics = (StreamsMetricsImpl) context.metrics(); - this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG); - this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-if-absent", Sensor.RecordingLevel.DEBUG); - this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG); - this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "delete", Sensor.RecordingLevel.DEBUG); - this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put-all", Sensor.RecordingLevel.DEBUG); - this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "all", Sensor.RecordingLevel.DEBUG); - this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "range", Sensor.RecordingLevel.DEBUG); - this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG); - this.restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG); + this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put-if-absent", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "get", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "delete", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put-all", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "all", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "range", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "flush", + Sensor.RecordingLevel.DEBUG, "store-name", name); + this.restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "restore", + Sensor.RecordingLevel.DEBUG, "store-name", name); // register and possibly restore the state from the logs metrics.measureLatencyNs(time, initDelegate, this.restoreTime); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java index 664873a2db12..94c992d8225b 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java @@ -52,13 +52,19 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore im public void init(ProcessorContext context, StateStore root) { final String name = name(); this.metrics = context.metrics(); - this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "put", Sensor.RecordingLevel.DEBUG); - this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "fetch", Sensor.RecordingLevel.DEBUG); - this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "flush", Sensor.RecordingLevel.DEBUG); - this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "get", Sensor.RecordingLevel.DEBUG); - this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "remove", Sensor.RecordingLevel.DEBUG); - - final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", Sensor.RecordingLevel.DEBUG); + this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put", + Sensor.RecordingLevel.DEBUG, "segmented-store-name", name); + this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "fetch", + Sensor.RecordingLevel.DEBUG, "segmented-store-name", name); + this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "flush", + Sensor.RecordingLevel.DEBUG, "segmented-store-name", name); + this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "get", + Sensor.RecordingLevel.DEBUG, "segmented-store-name", name); + this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "remove", + Sensor.RecordingLevel.DEBUG, "segmented-store-name", name); + + final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore", + Sensor.RecordingLevel.DEBUG, "segmented-store-name", name); // register and possibly restore the state from the logs final long startNs = time.nanoseconds(); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index 61b6e66e475f..1f07e89f54a7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.MeasurableStat; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Max; @@ -360,6 +362,13 @@ class NamedCacheMetrics { final Map metricTags; final Sensor hitRatioSensor; + private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) { + if (!metrics.metrics().containsKey(name)) { + sensor.add(name, stat); + } else { + log.trace("Trying to add metric twice: {}", name); + } + } public NamedCacheMetrics(StreamsMetrics metrics) { final String scope = "record-cache"; @@ -369,15 +378,26 @@ public NamedCacheMetrics(StreamsMetrics metrics) { this.groupName = "stream-" + scope + "-metrics"; this.metrics = (StreamsMetricsImpl) metrics; this.metricTags = new LinkedHashMap<>(); - this.metricTags.put(tagKey, tagValue); - 
hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG); - hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName, + // add parent + this.metricTags.put(tagKey, "all"); + Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG); + maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName, + "The average cache hit ratio of " + name, metricTags), new Avg()); + maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName, + "The minimum cache hit ratio of " + name, metricTags), new Min()); + maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName, + "The maximum cache hit ratio of " + name, metricTags), new Max()); + + // add child + this.metricTags.put(tagKey, tagValue); + hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent); + maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName, "The average cache hit ratio of " + name, metricTags), new Avg()); - hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName, + maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-min", groupName, "The minimum cache hit ratio of " + name, metricTags), new Min()); - hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName, + maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-max", groupName, "The maximum cache hit ratio of " + name, metricTags), new Max()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java index aa3a0a8c94c2..dc1c822c657c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java @@ -106,7 +106,7 @@ public void testMetrics() { String throughputOperation = "forward"; String groupName = "stream-processor-node-metrics"; List> tags = new ArrayList<>(); - tags.add(Collections.singletonMap("all", "all")); + tags.add(Collections.singletonMap("processor-node-id", "all")); tags.add(Collections.singletonMap("processor-node-id", node.name())); @@ -117,7 +117,7 @@ public void testMetrics() { for (Map tag : tags) { for (String operation : latencyOperations) { - final String tmpName = tag.containsKey("all") ? operation : + final String tmpName = tag.containsValue("all") ? operation : name + "-" + operation; assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-latency-avg", groupName, "The average latency of " + tmpName + " operation.", tag))); @@ -127,7 +127,7 @@ public void testMetrics() { "The average number of occurrence of " + operation + " operation per second.", tag))); } - final String tmpName = tag.containsKey("all") ? throughputOperation : + final String tmpName = tag.containsValue("all") ? 
throughputOperation : name + "-" + throughputOperation; assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-rate", groupName, "The average number of occurrence of " + tmpName + " operation per second.", tag))); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1e020bc41488..45b549d84e99 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -221,7 +221,7 @@ public void testProcessOrder() throws Exception { public void testMetrics() throws Exception { final String name = task.id().toString(); List> tags = new ArrayList<>(); - tags.add(Collections.singletonMap("all", "all")); + tags.add(Collections.singletonMap("streams-task-id", "all")); tags.add(Collections.singletonMap("streams-task-id", name)); final String operation = "commit"; From c6a1b3d588810177e760fabed0c1acf09b4b2d1e Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Mon, 17 Jul 2017 11:58:29 +0100 Subject: [PATCH 3/5] Added docs --- docs/ops.html | 281 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 260 insertions(+), 21 deletions(-) diff --git a/docs/ops.html b/docs/ops.html index 85a6e1079068..02f05c1399ad 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -1241,14 +1241,64 @@
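[Editor's sketch, not part of the patch.] The maybeAddMetric guard added to NamedCache matters because every cache for the same store (one per task) now shares the parent "hitRatio" metrics: the first cache registers them, and later caches must skip the registration rather than let Sensor.add throw on a duplicate MetricName. Note also that Metrics.sensor(name, ...) returns the already-registered sensor when the name exists, so both the parent and child lookups here resolve against the same registry entry. A rough usage sketch (cache names are illustrative, and NamedCache is package-private, so this only compiles from the same package):

    // first cache registers the shared parent metrics plus its own child metrics
    NamedCache first = new NamedCache("0_1-wordCounts", streamsMetrics);
    // second cache finds the parent metrics already present; maybeAddMetric logs and skips
    NamedCache second = new NamedCache("0_2-wordCounts", streamsMetrics);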
From c6a1b3d588810177e760fabed0c1acf09b4b2d1e Mon Sep 17 00:00:00 2001
From: Eno Thereska
Date: Mon, 17 Jul 2017 11:58:29 +0100
Subject: [PATCH 3/5] Added docs

---
 docs/ops.html | 281 ++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 260 insertions(+), 21 deletions(-)

diff --git a/docs/ops.html b/docs/ops.html
index 85a6e1079068..02f05c1399ad 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1241,14 +1241,64 @@
         <td>Mbean name</td>
       </tr>
-      <tr>
-        <td>commit-latency-[avg | max]</td>
-        <td>The [average | maximum] commit time in ns for this task.</td>
-        <td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
-      </tr>
+      <tr>
+        <td>commit-latency-avg</td>
+        <td>The average commit time in ns for this task.</td>
+        <td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>commit-latency-max</td>
+        <td>The maximum commit time in ns for this task.</td>
+        <td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
+      </tr>
@@ -1301,13 +1356,63 @@
         <td>Mbean name</td>
       </tr>
-      <tr>
-        <td>[process | punctuate | create | destroy]-latency-[avg | max]</td>
-        <td>The [average | maximum] execution time in ns, for the respective operation.</td>
-        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
-      </tr>
+      <tr>
+        <td>process-latency-avg</td>
+        <td>The average process execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>process-latency-max</td>
+        <td>The maximum process execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>punctuate-latency-avg</td>
+        <td>The average punctuate execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>punctuate-latency-max</td>
+        <td>The maximum punctuate execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>create-latency-avg</td>
+        <td>The average create execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>create-latency-max</td>
+        <td>The maximum create execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>destroy-latency-avg</td>
+        <td>The average destroy execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>destroy-latency-max</td>
+        <td>The maximum destroy execution time in ns.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>process-rate</td>
+        <td>The average number of process operations per second.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>punctuate-rate</td>
+        <td>The average number of punctuate operations per second.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>create-rate</td>
+        <td>The average number of create operations per second.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
-      <tr>
-        <td>[process | punctuate | create | destroy]-rate</td>
-        <td>The average number of respective operations per second.</td>
-        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
-      </tr>
+      <tr>
+        <td>destroy-rate</td>
+        <td>The average number of destroy operations per second.</td>
+        <td>kafka.streams:type=stream-processor-node-metrics,processor-node-id=([-.\w]+)</td>
+      </tr>
@@ -1329,16 +1434,140 @@
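[Editor's sketch, not part of the patch.] The documented MBean names can be sanity-checked from inside a running application via standard JMX (Kafka registers each metric as an attribute on the MBean named by its group and tags). The task id "0_1" and the attribute choice below are illustrative, and exception handling is reduced to a throws clause:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class JmxLookupSketch {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName task = new ObjectName(
                "kafka.streams:type=stream-task-metrics,streams-task-id=0_1");
            Double avg = (Double) server.getAttribute(task, "commit-latency-avg");
            System.out.println("commit-latency-avg = " + avg);
        }
    }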
     final Map<String, String> metricTags;
     final Sensor nodeProcessTimeSensor;
     final Sensor nodePunctuateTimeSensor;
@@ -173,21 +170,25 @@ protected static final class NodeMetrics {
         final Sensor nodeDestructionSensor;
 
-        public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) {
+        public NodeMetrics(final StreamsMetrics metrics, final String name, final ProcessorContext context) {
             final String scope = "processor-node";
-            final String tagKey = "processor-node-id";
-            final String tagValue = name;
+            final String tagKey = "task-id";
+            final String tagValue = context.taskId().toString();
             this.metrics = (StreamsMetricsImpl) metrics;
-            this.metricTags = new LinkedHashMap<>();
-            this.metricTags.put(tagKey, tagValue);
 
             // these are all latency metrics
-            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
-            this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "process",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "punctuate",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, name, "create",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, name, "destroy",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, name, "forward",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+            this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, name, "skippedDueToDeserializationError",
+                Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
         }
 
         public void removeAllSensors() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2d465a57ff17..294a133d868d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -80,7 +80,8 @@ protected class TaskMetrics {
         TaskMetrics(final StreamsMetrics metrics) {
             final String name = id().toString();
             this.metrics = (StreamsMetricsImpl) metrics;
-            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", null, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
+            taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit",
+                Sensor.RecordingLevel.DEBUG);
         }
 
         void removeAllSensors() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
index 09b6ebab4ac0..91550021c3b2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
@@ -34,6 +34,9 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
 
 public class StreamsMetricsImpl implements StreamsMetrics {
     private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
@@ -94,7 +97,7 @@ private String sensorName(String operationName, String entityName) {
         }
     }
 
-    private Map<String, String> tagMap(String... tags) {
+    public Map<String, String> tagMap(String... tags) {
         // extract the additional tags if there are any
         Map<String, String> tagMap = new HashMap<>(this.tags);
         if (tags != null) {
@@ -113,12 +116,19 @@ private Map<String, String> tagMap(String... tags) {
      * @throws IllegalArgumentException if tags is not constructed in key-value pairs
      */
     @Override
-    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
-        Map<String, String> tagMap = tagMap(tags);
+    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName,
+                                                Sensor.RecordingLevel recordingLevel, String... tags) {
+        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
+        updatedTagList.add(scopeName + "-id");
+        updatedTagList.add(entityName);
+        Map<String, String> tagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
+        updatedTagList.remove(entityName);
+        updatedTagList.add("all");
+        Map<String, String> allTagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
 
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addLatencyMetrics(scopeName, parent, null, operationName, tagMap(tags != null && tags.length > 1 ? tags[0] : "all", "all"));
+        addLatencyMetrics(scopeName, parent, null, operationName, allTagMap);
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
@@ -134,11 +144,17 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,
      */
     @Override
     public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
-        Map<String, String> tagMap = tagMap(tags);
+        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
+        updatedTagList.add(scopeName + "-id");
+        updatedTagList.add(entityName);
+        Map<String, String> tagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
+        updatedTagList.remove(entityName);
+        updatedTagList.add("all");
+        Map<String, String> allTagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
 
         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addThroughputMetrics(scopeName, parent, null, operationName, tagMap(tags != null && tags.length > 1 ? tags[0] : "all", "all"));
+        addThroughputMetrics(scopeName, parent, null, operationName, allTagMap);
 
         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
@@ -151,16 +167,16 @@ public Sensor addThroughputSensor(String scopeName, String entityName, String op
 
     private void addLatencyMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(sensorName(opName, entityName) + "-latency-avg", groupNameFromScope(scopeName),
-            "The average latency of " + sensorName(opName, entityName) + " operation.", tags), new Avg());
-        maybeAddMetric(sensor, metrics.metricName(sensorName(opName, entityName) + "-latency-max", groupNameFromScope(scopeName),
-            "The max latency of " + sensorName(opName, entityName) + " operation.", tags), new Max());
+        maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
+            "The average latency of " + opName + " operation.", tags), new Avg());
+        maybeAddMetric(sensor, metrics.metricName(opName + "-latency-max", groupNameFromScope(scopeName),
+            "The max latency of " + opName + " operation.", tags), new Max());
         addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
     }
 
     private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(sensorName(opName, entityName) + "-rate", groupNameFromScope(scopeName),
-            "The average number of occurrence of " + sensorName(opName, entityName) + " operation per second.", tags), new Rate(new Count()));
+        maybeAddMetric(sensor, metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
+            "The average number of occurrence of " + opName + " operation per second.", tags), new Rate(new Count()));
     }
 
     private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
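[Editor's sketch, not part of the patch.] To make the tag plumbing above concrete, this is what the updatedTagList manipulation produces for a hypothetical processor-node sensor (scope, entity, and task id values are illustrative; pairsToMap stands in for the real tagMap, which also merges in the client-level tags):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class TagListSketch {
        static Map<String, String> pairsToMap(List<String> pairs) {
            Map<String, String> m = new LinkedHashMap<>();
            for (int i = 0; i < pairs.size(); i += 2) {
                m.put(pairs.get(i), pairs.get(i + 1));
            }
            return m;
        }

        public static void main(String[] args) {
            String scopeName = "processor-node";
            String entityName = "KSTREAM-SOURCE-0000000000";
            String[] tags = {"task-id", "0_1"};   // caller-supplied key/value varargs

            List<String> updated = new ArrayList<>(Arrays.asList(tags));
            updated.add(scopeName + "-id");       // tag key derived from the scope
            updated.add(entityName);
            // child: {task-id=0_1, processor-node-id=KSTREAM-SOURCE-0000000000}
            System.out.println(pairsToMap(updated));

            updated.remove(entityName);
            updated.add("all");
            // rollup: {task-id=0_1, processor-node-id=all}
            System.out.println(pairsToMap(updated));
        }
    }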
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 6190b88141c1..daebc562bd92 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -70,8 +70,8 @@ private void initInternal(final ProcessorContext context) {
             keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
             valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
 
-        this.cacheName = context.taskId() + "-" + underlying.name();
         this.cache = this.context.getCache();
+        this.cacheName = this.cache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
         cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 47f6c4868a66..9c052b1b3db5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -112,28 +112,30 @@ public MeteredKeyValueStore(final KeyValueStore<K, V> inner,
     @Override
     public void init(ProcessorContext context, StateStore root) {
-        final String name = name();
+        final String tagKey = "task-id";
+        final String tagValue = context.taskId().toString();
+
         this.context = context;
         this.root = root;
         this.metrics = (StreamsMetricsImpl) context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put-if-absent",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "get",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "delete",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put-all",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "all",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "range",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "flush",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
-        this.restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "restore",
-            Sensor.RecordingLevel.DEBUG, "store-name", name);
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.putIfAbsentTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-if-absent",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.deleteTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "delete",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.putAllTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put-all",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.allTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "all",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.rangeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "range",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
 
         // register and possibly restore the state from the logs
         metrics.measureLatencyNs(time, initDelegate, this.restoreTime);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
index 94c992d8225b..40583643664e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSegmentedBytesStore.java
@@ -50,21 +50,22 @@ class MeteredSegmentedBytesStore extends WrappedStateStore.AbstractStateStore im
     @Override
     public void init(ProcessorContext context, StateStore root) {
-        final String name = name();
+        final String tagKey = "task-id";
+        final String tagValue = context.taskId().toString();
         this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "put",
-            Sensor.RecordingLevel.DEBUG, "segmented-store-name", name);
-        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "fetch",
-            Sensor.RecordingLevel.DEBUG, "segmented-store-name", name);
-        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "flush",
-            Sensor.RecordingLevel.DEBUG, "segmented-store-name", name);
-        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "get",
-            Sensor.RecordingLevel.DEBUG, "segmented-store-name", name);
-        this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, null, "remove",
-            Sensor.RecordingLevel.DEBUG, "segmented-store-name", name);
-
-        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name, "restore",
-            Sensor.RecordingLevel.DEBUG, "segmented-store-name", name);
+        this.putTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "put",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.fetchTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "fetch",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.flushTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "flush",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.getTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "get",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+        this.removeTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "remove",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
+
+        final Sensor restoreTime = this.metrics.addLatencyAndThroughputSensor(metricScope, name(), "restore",
+            Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
         // register and possibly restore the state from the logs
         final long startNs = time.nanoseconds();
         try {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 1f07e89f54a7..4fc8eac19cf8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -31,7 +31,6 @@
 import java.util.ArrayList;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -360,6 +359,7 @@ class NamedCacheMetrics {
         final StreamsMetricsImpl metrics;
         final String groupName;
         final Map<String, String> metricTags;
+        final Map<String, String> allMetricTags;
         final Sensor hitRatioSensor;
 
         private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
@@ -374,24 +374,24 @@ public NamedCacheMetrics(StreamsMetrics metrics) {
             final String scope = "record-cache";
             final String opName = "hitRatio";
             final String tagKey = "record-cache-id";
-            final String tagValue = name;
+            final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
             this.groupName = "stream-" + scope + "-metrics";
             this.metrics = (StreamsMetricsImpl) metrics;
-            this.metricTags = new LinkedHashMap<>();
-
+            this.allMetricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, "all",
+                "task-id", ThreadCache.taskIDfromCacheName(name));
+            this.metricTags = ((StreamsMetricsImpl) metrics).tagMap(tagKey, tagValue,
+                "task-id", ThreadCache.taskIDfromCacheName(name));
 
             // add parent
-            this.metricTags.put(tagKey, "all");
             Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
             maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio of " + name, metricTags), new Avg());
+                "The average cache hit ratio of " + name, allMetricTags), new Avg());
             maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio of " + name, metricTags), new Min());
+                "The minimum cache hit ratio of " + name, allMetricTags), new Min());
             maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio of " + name, metricTags), new Max());
+                "The maximum cache hit ratio of " + name, allMetricTags), new Max());
 
             // add child
-            this.metricTags.put(tagKey, tagValue);
             hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
             maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName,
                 "The average cache hit ratio of " + name, metricTags), new Avg());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 31fb3bb1d6fc..6a0fa9bfa82f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -76,6 +76,28 @@ public long flushes() {
         return numFlushes;
     }
 
+    /**
+     * The thread cache maintains a set of caches whose names are a concatenation of the task ID and the
+     * underlying store name.
+     * @param taskIDString Task ID
+     * @param underlyingStoreName Underlying store name
+     * @return the cache namespace for the given task and store
+     */
+    public static String nameSpaceFromTaskIdAndStore(final String taskIDString, final String underlyingStoreName) {
+        return taskIDString + "-" + underlyingStoreName;
+    }
+
+    public static String taskIDfromCacheName(final String cacheName) {
+        String[] tokens = cacheName.split("-");
+        return tokens[0];
+    }
+
+    public static String underlyingStoreNamefromCacheName(final String cacheName) {
+        String[] tokens = cacheName.split("-");
+        return tokens[1];
+    }
+
+
     /**
      * Add a listener that is called each time an entry is evicted from the cache or an explicit flush is called
      *
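[Editor's note, not part of the patch.] A caveat on the helpers above: split("-") assumes neither segment contains a dash, but Streams store names such as "my-store" commonly do (underlyingStoreNamefromCacheName would return only "my"). A safer decomposition, splitting only on the first dash, could look like this; the method names here are suggested alternatives, not part of the patch:

    public static String taskIdFromCacheName(final String cacheName) {
        // everything before the first dash is the task id
        return cacheName.substring(0, cacheName.indexOf('-'));
    }

    public static String storeNameFromCacheName(final String cacheName) {
        // everything after the first dash is the store name, dashes included
        return cacheName.substring(cacheName.indexOf('-') + 1);
    }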
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index dc1c822c657c..ab29c5c1de06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -25,10 +25,9 @@
 import org.apache.kafka.test.MockProcessorContext;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
+import java.util.LinkedHashMap;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -92,6 +91,17 @@ public void close() {
         }
     }
 
+    private void testSpecificMetrics(final Metrics metrics, final String groupName,
+                                     final String opName,
+                                     final Map<String, String> metricTags) {
+        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-avg", groupName,
+            "The average latency of " + opName + " operation.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-max", groupName,
+            "The max latency of " + opName + " operation.", metricTags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-rate", groupName,
+            "The average number of occurrence of " + opName + " operation per second.", metricTags)));
+
+    }
     @Test
     public void testMetrics() {
         final StateSerdes<Bytes, Bytes> anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
@@ -105,9 +115,9 @@ public void testMetrics() {
         String[] latencyOperations = {"process", "punctuate", "create", "destroy"};
         String throughputOperation =  "forward";
         String groupName = "stream-processor-node-metrics";
-        List<Map<String, String>> tags = new ArrayList<>();
-        tags.add(Collections.singletonMap("processor-node-id", "all"));
-        tags.add(Collections.singletonMap("processor-node-id", node.name()));
+        final Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("processor-node-id", node.name());
+        metricTags.put("task-id", context.taskId().toString());
 
 
         for (String operation : latencyOperations) {
@@ -115,23 +125,20 @@ public void testMetrics() {
         }
         assertNotNull(metrics.getSensor(throughputOperation));
 
-        for (Map<String, String> tag : tags) {
-            for (String operation : latencyOperations) {
-                final String tmpName = tag.containsValue("all") ? operation :
-                    name + "-" + operation;
-                assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-latency-avg", groupName,
-                    "The average latency of " + tmpName + " operation.", tag)));
-                assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-latency-max", groupName,
-                    "The max latency of " + tmpName + " operation.", tag)));
-                assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-rate", groupName,
-                    "The average number of occurrence of " + operation + " operation per second.", tag)));
-            }
-
-            final String tmpName = tag.containsValue("all") ? throughputOperation :
-                name + "-" + throughputOperation;
-            assertNotNull(metrics.metrics().get(metrics.metricName(tmpName + "-rate", groupName,
-                "The average number of occurrence of " + tmpName + " operation per second.", tag)));
+        for (String opName : latencyOperations) {
+            testSpecificMetrics(metrics, groupName, opName, metricTags);
         }
+        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
+            "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+
+        // test "all"
+        metricTags.put("processor-node-id", "all");
+        for (String opName : latencyOperations) {
+            testSpecificMetrics(metrics, groupName, opName, metricTags);
+        }
+        assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
+            "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
+
         context.close();
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 45b549d84e99..f80897665336 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -62,10 +62,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -217,26 +217,30 @@ public void testProcessOrder() throws Exception {
         assertEquals(3, source2.numReceived);
     }
 
+
+    private void testSpecificMetrics(final String operation, final String groupName, final Map<String, String> tags) {
+        assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-avg", groupName,
+            "The average latency of " + operation + " operation.", tags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-max", groupName,
+            "The max latency of " + operation + " operation.", tags)));
+        assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-rate", groupName,
+            "The average number of occurrence of " + operation + " operation per second.", tags)));
+    }
+
+
     @Test
     public void testMetrics() throws Exception {
         final String name = task.id().toString();
-        List<Map<String, String>> tags = new ArrayList<>();
-        tags.add(Collections.singletonMap("streams-task-id", "all"));
-        tags.add(Collections.singletonMap("streams-task-id", name));
+        final Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("task-id", name);
         final String operation = "commit";
         final String groupName = "stream-task-metrics";
 
         assertNotNull(metrics.getSensor(operation));
-
-        for (Map<String, String> tag : tags) {
-            assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-avg", groupName,
-                "The average latency of " + operation + " operation.", tag)));
-            assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-latency-max", groupName,
-                "The max latency of " + operation + " operation.", tag)));
-            assertNotNull(metrics.metrics().get(metrics.metricName(operation + "-rate", groupName,
-                "The average number of occurrence of " + operation + " operation per second.", tag)));
-        }
+        testSpecificMetrics(operation, groupName, metricTags);
+        metricTags.put("task-id", "all");
+        testSpecificMetrics(operation, groupName, metricTags);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
index 6e0059ffddac..82b20aff23ee 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueStoreIteratorTest.java
@@ -31,7 +31,7 @@
 public class MergedSortedCacheKeyValueStoreIteratorTest {
 
-    private final String namespace = "one";
+    private final String namespace = "0.0-one";
     private final StateSerdes<byte[], byte[]> serdes = new StateSerdes<>("dummy", Serdes.ByteArray(), Serdes.ByteArray());
     private KeyValueStore<Bytes, byte[]> store;
     private ThreadCache cache;
@@ -147,7 +147,6 @@ public void shouldPeekNextKey() throws Exception {
         final KeyValueStore<Bytes, byte[]> kv = new InMemoryKeyValueStore<>("one", Serdes.Bytes(), Serdes.ByteArray());
         final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
         byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};
-        final String namespace = "one";
         for (int i = 0; i < bytes.length - 1; i += 2) {
             kv.put(Bytes.wrap(bytes[i]), bytes[i]);
             cache.put(namespace, Bytes.wrap(bytes[i + 1]), new LRUCacheEntry(bytes[i + 1]));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
index fed39b762746..2649d1bd1b32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWrappedWindowStoreIteratorTest.java
@@ -45,7 +45,7 @@ public long segmentId(Bytes key) {
     private final List<KeyValue<Long, byte[]>> windowStoreKvPairs = new ArrayList<>();
     private final ThreadCache cache = new ThreadCache("testCache", 1000000L, new MockStreamsMetrics(new Metrics()));
-    private final String namespace = "one";
+    private final String namespace = "0.0-one";
     private final StateSerdes<String, String> stateSerdes = new StateSerdes<>("foo", Serdes.String(), Serdes.String());
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 0b7682cce87f..7854191cfd73 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -42,11 +42,12 @@ public class NamedCacheTest {
 
     private NamedCache cache;
     private MockStreamsMetrics streamMetrics;
-
+    private final String taskIDString = "0.0";
+    private final String underlyingStoreName = "storeName";
     @Before
     public void setUp() throws Exception {
         streamMetrics = new MockStreamsMetrics(new Metrics());
-        cache = new NamedCache("name", streamMetrics);
+        cache = new NamedCache(taskIDString + "-" + underlyingStoreName, streamMetrics);
     }
 
     @Test
@@ -72,28 +73,33 @@ public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOExcepti
         }
     }
 
+    private void testSpecificMetrics(final String groupName, final String entityName, final String opName,
+                                     final Map<String, String> metricTags) {
assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-avg", + groupName, "The average cache hit ratio of " + entityName, metricTags))); + assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-min", + groupName, "The minimum cache hit ratio of " + entityName, metricTags))); + assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-max", + groupName, "The maximum cache hit ratio of " + entityName, metricTags))); + } @Test public void testMetrics() throws Exception { final String scope = "record-cache"; final String entityName = cache.name(); final String opName = "hitRatio"; final String tagKey = "record-cache-id"; - final String tagValue = cache.name(); + final String tagValue = underlyingStoreName; final String groupName = "stream-" + scope + "-metrics"; final Map metricTags = new LinkedHashMap<>(); metricTags.put(tagKey, tagValue); + metricTags.put("task-id", taskIDString); assertNotNull(streamMetrics.registry().getSensor(opName)); - assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-avg", - groupName, "The current count of " + entityName + " " + opName + - " operation.", metricTags))); - assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-min", - groupName, "The current count of " + entityName + " " + opName + - " operation.", metricTags))); - assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-max", - groupName, "The current count of " + entityName + " " + opName + - " operation.", metricTags))); + testSpecificMetrics(groupName, entityName, opName, metricTags); + // test "all" + metricTags.put(tagKey, "all"); + testSpecificMetrics(groupName, entityName, opName, metricTags); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index 43a03f1aa891..2c08871d0aed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -37,6 +37,9 @@ import static org.junit.Assert.assertTrue; public class ThreadCacheTest { + final String namespace = "0.0-namespace"; + final String namespace1 = "0.1-namespace"; + final String namespace2 = "0.2-namespace"; @Test public void basicPutGet() throws IOException { @@ -47,7 +50,6 @@ public void basicPutGet() throws IOException { new KeyValue<>("K4", "V4"), new KeyValue<>("K5", "V5")); final KeyValue kv = toInsert.get(0); - final String name = "name"; ThreadCache cache = new ThreadCache("testCache", toInsert.size() * memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), new MockStreamsMetrics(new Metrics())); @@ -55,12 +57,12 @@ public void basicPutGet() throws IOException { for (KeyValue kvToInsert : toInsert) { Bytes key = Bytes.wrap(kvToInsert.key.getBytes()); byte[] value = kvToInsert.value.getBytes(); - cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, "")); + cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, "")); } for (KeyValue kvToInsert : toInsert) { Bytes key = Bytes.wrap(kvToInsert.key.getBytes()); - LRUCacheEntry entry = cache.get(name, key); + LRUCacheEntry entry = cache.get(namespace, key); assertEquals(entry.isDirty(), true); assertEquals(new String(entry.value), 
kvToInsert.value); } @@ -73,7 +75,6 @@ public void basicPutGet() throws IOException { private void checkOverheads(double entryFactor, double systemFactor, long desiredCacheSize, int keySizeBytes, int valueSizeBytes) { Runtime runtime = Runtime.getRuntime(); - final String name = "name"; long numElements = desiredCacheSize / memoryCacheEntrySize(new byte[keySizeBytes], new byte[valueSizeBytes], ""); System.gc(); @@ -86,7 +87,7 @@ private void checkOverheads(double entryFactor, double systemFactor, long desire String keyStr = "K" + i; Bytes key = Bytes.wrap(keyStr.getBytes()); byte[] value = new byte[valueSizeBytes]; - cache.put(name, key, new LRUCacheEntry(value, true, 1L, 1L, 1, "")); + cache.put(namespace, key, new LRUCacheEntry(value, true, 1L, 1L, 1, "")); } @@ -152,7 +153,6 @@ public void evict() throws IOException { new KeyValue<>("K4", "V4"), new KeyValue<>("K5", "V5")); final KeyValue kv = toInsert.get(0); - final String namespace = "kafka"; ThreadCache cache = new ThreadCache("testCache", memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""), new MockStreamsMetrics(new Metrics())); @@ -185,9 +185,9 @@ public void shouldDelete() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final Bytes key = Bytes.wrap(new byte[]{0}); - cache.put("name", key, dirtyEntry(key.get())); - assertEquals(key.get(), cache.delete("name", key).value); - assertNull(cache.get("name", key)); + cache.put(namespace, key, dirtyEntry(key.get())); + assertEquals(key.get(), cache.delete(namespace, key).value); + assertNull(cache.get(namespace, key)); } @Test @@ -195,7 +195,6 @@ public void shouldNotFlushAfterDelete() throws Exception { final Bytes key = Bytes.wrap(new byte[]{0}); final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final List received = new ArrayList<>(); - final String namespace = "namespace"; cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { @@ -216,14 +215,14 @@ public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() throws Exception { final Bytes key = Bytes.wrap(new byte[]{0}); final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); - cache.put("name", key, dirtyEntry(key.get())); - assertNull(cache.delete("name", Bytes.wrap(new byte[]{1}))); + cache.put(namespace, key, dirtyEntry(key.get())); + assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1}))); } @Test public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); - assertNull(cache.delete("name", Bytes.wrap(new byte[]{1}))); + assertNull(cache.delete(namespace, Bytes.wrap(new byte[]{1}))); } @Test @@ -231,18 +230,17 @@ public void shouldNotClashWithOverlappingNames() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final Bytes nameByte = Bytes.wrap(new byte[]{0}); final Bytes name1Byte = Bytes.wrap(new byte[]{1}); - cache.put("name", nameByte, dirtyEntry(nameByte.get())); - cache.put("name1", nameByte, dirtyEntry(name1Byte.get())); + cache.put(namespace1, nameByte, dirtyEntry(nameByte.get())); + cache.put(namespace2, nameByte, dirtyEntry(name1Byte.get())); - assertArrayEquals(nameByte.get(), cache.get("name", nameByte).value); - assertArrayEquals(name1Byte.get(), 
cache.get("name1", nameByte).value); + assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value); + assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value); } @Test public void shouldPeekNextKey() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final Bytes theByte = Bytes.wrap(new byte[]{0}); - final String namespace = "streams"; cache.put(namespace, theByte, dirtyEntry(theByte.get())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1})); assertEquals(theByte, iterator.peekNextKey()); @@ -253,7 +251,6 @@ public void shouldPeekNextKey() throws Exception { public void shouldGetSameKeyAsPeekNext() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final Bytes theByte = Bytes.wrap(new byte[]{0}); - final String namespace = "streams"; cache.put(namespace, theByte, dirtyEntry(theByte.get())); final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, theByte, Bytes.wrap(new byte[]{1})); assertEquals(iterator.peekNextKey(), iterator.next().key); @@ -262,14 +259,14 @@ public void shouldGetSameKeyAsPeekNext() throws Exception { @Test(expected = NoSuchElementException.class) public void shouldThrowIfNoPeekNextKey() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); - final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); iterator.peekNextKey(); } @Test public void shouldReturnFalseIfNoNextKey() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); - final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); + final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range(namespace, Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1})); assertFalse(iterator.hasNext()); } @@ -277,7 +274,6 @@ public void shouldReturnFalseIfNoNextKey() throws Exception { public void shouldPeekAndIterateOverRange() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 10000L, new MockStreamsMetrics(new Metrics())); final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}}; - final String namespace = "streams"; for (final byte[] aByte : bytes) { cache.put(namespace, Bytes.wrap(aByte), dirtyEntry(aByte)); } @@ -295,7 +291,6 @@ public void shouldPeekAndIterateOverRange() throws Exception { @Test public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() throws Exception { - final String namespace = "streams"; final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], ""); final ThreadCache cache = new ThreadCache("testCache", entrySize * 5, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @@ -321,7 +316,7 @@ public void apply(final List dirty) { public void shouldFlushDirtyEntriesForNamespace() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); final List received = new ArrayList<>(); - cache.addDirtyEntryFlushListener("1", new 
ThreadCache.DirtyEntryFlushListener() { + cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { for (ThreadCache.DirtyEntry dirtyEntry : dirty) { @@ -331,11 +326,11 @@ public void apply(final List dirty) { }); final List expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2}); for (byte[] bytes : expected) { - cache.put("1", Bytes.wrap(bytes), dirtyEntry(bytes)); + cache.put(namespace1, Bytes.wrap(bytes), dirtyEntry(bytes)); } - cache.put("2", Bytes.wrap(new byte[]{4}), dirtyEntry(new byte[]{4})); + cache.put(namespace2, Bytes.wrap(new byte[]{4}), dirtyEntry(new byte[]{4})); - cache.flush("1"); + cache.flush(namespace1); assertEquals(expected, received); } @@ -343,7 +338,7 @@ public void apply(final List dirty) { public void shouldNotFlushCleanEntriesForNamespace() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); final List received = new ArrayList<>(); - cache.addDirtyEntryFlushListener("1", new ThreadCache.DirtyEntryFlushListener() { + cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { for (ThreadCache.DirtyEntry dirtyEntry : dirty) { @@ -353,18 +348,18 @@ public void apply(final List dirty) { }); final List toInsert = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2}); for (byte[] bytes : toInsert) { - cache.put("1", Bytes.wrap(bytes), cleanEntry(bytes)); + cache.put(namespace1, Bytes.wrap(bytes), cleanEntry(bytes)); } - cache.put("2", Bytes.wrap(new byte[]{4}), cleanEntry(new byte[]{4})); + cache.put(namespace2, Bytes.wrap(new byte[]{4}), cleanEntry(new byte[]{4})); - cache.flush("1"); + cache.flush(namespace1); assertEquals(Collections.EMPTY_LIST, received); } private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) { final List received = new ArrayList<>(); - final String namespace = "namespace"; + cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { @@ -394,7 +389,6 @@ public void shouldEvictImmediatelyIfCacheSizeIsZero() throws Exception { @Test public void shouldEvictAfterPutAll() throws Exception { final List received = new ArrayList<>(); - final String namespace = "namespace"; final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -414,24 +408,24 @@ public void apply(final List dirty) { public void shouldPutAll() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); - cache.putAll("name", Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), + cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); - assertArrayEquals(new byte[]{5}, cache.get("name", Bytes.wrap(new byte[]{0})).value); - assertArrayEquals(new byte[]{6}, cache.get("name", Bytes.wrap(new byte[]{1})).value); + assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value); + assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value); } @Test public void shouldNotForwardCleanEntryOnEviction() throws Exception { final ThreadCache cache = 
new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())); final List received = new ArrayList<>(); - cache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() { + cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { received.addAll(dirty); } }); - cache.put("name", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0})); + cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0})); assertEquals(0, received.size()); } @Test @@ -439,15 +433,14 @@ public void shouldPutIfAbsent() throws Exception { final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics())); final Bytes key = Bytes.wrap(new byte[]{10}); final byte[] value = {30}; - assertNull(cache.putIfAbsent("n", key, dirtyEntry(value))); - assertArrayEquals(value, cache.putIfAbsent("n", key, dirtyEntry(new byte[]{8})).value); - assertArrayEquals(value, cache.get("n", key).value); + assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value))); + assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value); + assertArrayEquals(value, cache.get(namespace, key).value); } @Test public void shouldEvictAfterPutIfAbsent() throws Exception { final List received = new ArrayList<>(); - final String namespace = "namespace"; final ThreadCache cache = new ThreadCache("testCache", 1, new MockStreamsMetrics(new Metrics())); cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override @@ -469,51 +462,51 @@ public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() throws Exce final int maxCacheSizeInBytes = 100; final ThreadCache threadCache = new ThreadCache("testCache", maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics())); // trigger a put into another cache on eviction from "name" - threadCache.addDirtyEntryFlushListener("name", new ThreadCache.DirtyEntryFlushListener() { + threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { // put an item into an empty cache when the total cache size // is already > than maxCacheSizeBytes - threadCache.put("other", Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2])); + threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2])); } }); - threadCache.addDirtyEntryFlushListener("other", new ThreadCache.DirtyEntryFlushListener() { + threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { // } }); - threadCache.addDirtyEntryFlushListener("another", new ThreadCache.DirtyEntryFlushListener() { + threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() { @Override public void apply(final List dirty) { } }); - threadCache.put("another", Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1])); - threadCache.put("name", Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1])); + threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1])); + threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1])); // Put a large item such that when the eldest item is removed // cache sizeInBytes() > maxCacheSizeBytes int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes()); - threadCache.put("name", Bytes.wrap(new byte[]{2}), dirtyEntry(new byte[remaining + 100])); + threadCache.put(namespace, Bytes.wrap(new 
byte[]{2}), dirtyEntry(new byte[remaining + 100]));
     }

     @Test
     public void shouldCleanupNamedCacheOnClose() throws Exception {
         final ThreadCache cache = new ThreadCache("testCache", 100000, new MockStreamsMetrics(new Metrics()));
-        cache.put("one", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
-        cache.put("two", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        cache.put(namespace1, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        cache.put(namespace2, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
         assertEquals(cache.size(), 2);
-        cache.close("two");
+        cache.close(namespace2);
         assertEquals(cache.size(), 1);
-        assertNull(cache.get("two", Bytes.wrap(new byte[]{1})));
+        assertNull(cache.get(namespace2, Bytes.wrap(new byte[]{1})));
     }

     @Test
     public void shouldReturnNullIfKeyIsNull() throws Exception {
         final ThreadCache threadCache = new ThreadCache("testCache", 10, new MockStreamsMetrics(new Metrics()));
-        threadCache.put("one", Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
-        assertNull(threadCache.get("one", null));
+        threadCache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[] {1}));
+        assertNull(threadCache.get(namespace, null));
     }

     private LRUCacheEntry dirtyEntry(final byte[] key) {

From 891c3a9c2e30a165fb98048d437ca30a4988d020 Mon Sep 17 00:00:00 2001
From: Eno Thereska
Date: Fri, 21 Jul 2017 12:02:39 +0100
Subject: [PATCH 5/5] Guozhang's comments

---
 docs/ops.html                                 | 136 ++++++++++--------
 .../internals/StreamsMetricsImpl.java         |  40 +++---
 .../state/internals/CachingKeyValueStore.java |   2 +-
 .../streams/state/internals/NamedCache.java   |  36 ++---
 .../streams/state/internals/ThreadCache.java  |  14 +-
 5 files changed, 118 insertions(+), 110 deletions(-)

diff --git a/docs/ops.html b/docs/ops.html
index c7ce1c19892a..32d4bbd5aadf 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1229,7 +1229,19 @@

Streams Monitoring

- A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to streams. By default Kafka Streams has metrics with two recording levels: debug and info. The debug level records all metrics, while the info level records only the thread-level metrics. Use the following configuration option to specify which metrics you want collected:
+ A Kafka Streams instance contains all the producer and consumer metrics as well as additional metrics specific to streams.
+ By default Kafka Streams has metrics with two recording levels: debug and info. The debug level records all metrics, while
+ the info level records only the thread-level metrics.
+
+
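[Editor's note: all of these metrics are also available programmatically. A minimal sketch, assuming a running KafkaStreams instance named streams (the group filter is only an example); it uses KafkaStreams#metrics(), which returns Map<MetricName, ? extends Metric>:

    // Enumerate the registered metrics and print the Streams-specific ones.
    for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
        if (entry.getKey().group().startsWith("stream-")) {   // e.g. stream-task-metrics
            System.out.println(entry.getKey() + " = " + entry.getValue().value());
        }
    }

Requires imports of org.apache.kafka.common.Metric, org.apache.kafka.common.MetricName and java.util.Map.]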

+ Note that the metrics have a 3-layer hierarchy. At the top level there are per-thread metrics. Each thread has tasks, with their
+ own metrics. Each task has a number of processor nodes, with their own metrics. Each task also has a number of state stores
+ and record caches, all with their own metrics.
+
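[Editor's note: to make the hierarchy concrete, each layer surfaces under its own JMX type. The task, processor-node and record-cache patterns below are taken from the tables in this patch; the thread-level pattern is an assumption based on the stream-metrics group used for threads at the time, and all concrete values are placeholders:

    kafka.streams:type=stream-metrics,client-id=my-app-1-StreamThread-1                                                        (thread)
    kafka.streams:type=stream-task-metrics,client-id=my-app-1,task-id=0_0                                                      (task)
    kafka.streams:type=stream-processor-node-metrics,client-id=my-app-1,task-id=0_0,processor-node-id=KSTREAM-SOURCE-0000000000 (processor node)
    kafka.streams:type=stream-record-cache-metrics,client-id=my-app-1,task-id=0_0,record-cache-id=myStore                       (record cache)]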

+
+ Use the following configuration option to specify which metrics
+ you want collected:
+
metrics.recording.level="info"
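[Editor's note: for instance, to collect the debug-level metrics an application could set this option when building its configuration. A minimal sketch; the application id and bootstrap servers are placeholder wiring, only the property key is taken from these docs:

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");              // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
    props.put("metrics.recording.level", "debug");  // same key as shown above; the default is "info"]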
Thread Metrics
@@ -1244,77 +1256,77 @@
commit-latency-avg
The average commit time in ns for this task.
- kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)
+ kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)

commit-latency-max
The maximum commit time in ns for this task.
- kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)
+ kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)

commit-rate
The average number of commit calls per second.
- kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)
+ kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)

@@ -1358,67 +1370,67 @@
process-latency-avg
The average process execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

process-latency-max
The maximum process execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

punctuate-latency-avg
The average punctuate execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

punctuate-latency-max
The maximum punctuate execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

create-latency-avg
The average create execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

create-latency-max
The maximum create execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

destroy-latency-avg
The average destroy execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

destroy-latency-max
The maximum destroy execution time in ns.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

process-rate
The average number of process operations per second.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

punctuate-rate
The average number of punctuate operations per second.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

create-rate
The average number of create operations per second.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

destroy-rate
The average number of destroy operations per second.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

forward-rate
The average rate of records being forwarded downstream, from source nodes only, per second.
- kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)
+ kafka.streams:type=stream-processor-node-metrics,client-id=([-.\w]+),task-id=([-.\w]+),processor-node-id=([-.\w]+)

@@ -1436,137 +1448,137 @@
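[Editor's note: as a usage sketch, any of the MBeans above can be read through the standard JMX API. The object-name attribute values here are hypothetical and exception handling is omitted; the attribute names match the metric names documented above:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
    final ObjectName node = new ObjectName(
            "kafka.streams:type=stream-processor-node-metrics,"
                    + "client-id=my-app-1,task-id=0_0,processor-node-id=KSTREAM-SOURCE-0000000000");
    final double avgLatencyNs = (Double) server.getAttribute(node, "process-latency-avg");]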
tagMap(String... tags) {

     @Override
     public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
-        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
-        updatedTagList.add(scopeName + "-id");
-        updatedTagList.add(entityName);
-        Map<String, String> tagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
-        updatedTagList.remove(entityName);
-        updatedTagList.add("all");
-        Map<String, String> allTagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
+        final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
+        final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);

         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addLatencyMetrics(scopeName, parent, null, operationName, allTagMap);
+        addLatencyMetrics(scopeName, parent, operationName, allTagMap);

         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
-        addLatencyMetrics(scopeName, sensor, entityName, operationName, tagMap);
+        addLatencyMetrics(scopeName, sensor, operationName, tagMap);

         parentSensors.put(sensor, parent);
@@ -144,42 +145,37 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,
      */
     @Override
     public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordingLevel, String... tags) {
-        List<String> updatedTagList = new ArrayList<>(Arrays.asList(tags));
-        updatedTagList.add(scopeName + "-id");
-        updatedTagList.add(entityName);
-        Map<String, String> tagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
-        updatedTagList.remove(entityName);
-        updatedTagList.add("all");
-        Map<String, String> allTagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
+        final Map<String, String> tagMap = constructTags(scopeName, entityName, tags);
+        final Map<String, String> allTagMap = constructTags(scopeName, "all", tags);

         // first add the global operation metrics if not yet, with the global tags only
         Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addThroughputMetrics(scopeName, parent, null, operationName, allTagMap);
+        addThroughputMetrics(scopeName, parent, operationName, allTagMap);

         // add the operation metrics with additional tags
         Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
-        addThroughputMetrics(scopeName, sensor, entityName, operationName, tagMap);
+        addThroughputMetrics(scopeName, sensor, operationName, tagMap);

         parentSensors.put(sensor, parent);

         return sensor;
     }

-    private void addLatencyMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+    private void addLatencyMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
         maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
             "The average latency of " + opName + " operation.", tags), new Avg());
         maybeAddMetric(sensor, metrics.metricName(opName + "-latency-max", groupNameFromScope(scopeName),
             "The max latency of " + opName + " operation.", tags), new Max());
-        addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
+        addThroughputMetrics(scopeName, sensor, opName, tags);
     }

-    private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+    private void addThroughputMetrics(String scopeName, Sensor sensor, String opName, Map<String, String> tags) {
         maybeAddMetric(sensor, metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
             "The average number of occurrence of " + opName + " operation per second.", tags), new Rate(new Count()));
     }

-    private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
+    public void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
         if (!metrics.metrics().containsKey(name)) {
             sensor.add(name, stat);
         } else {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index c07fa06768a2..e147ea8ddb58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -70,7 +70,7 @@ private void initInternal(final ProcessorContext context) {
                                     valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);

         this.cache = this.context.getCache();
-        this.cacheName = this.cache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
+        this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
         cache.addDirtyEntryFlushListener(cacheName, new ThreadCache.DirtyEntryFlushListener() {
             @Override
             public void apply(final List<ThreadCache.DirtyEntry> entries) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 4fc8eac19cf8..47d5152a19b7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.streams.state.internals;

-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -362,18 +360,10 @@ class NamedCacheMetrics {
         final Map<String, String> allMetricTags;
         final Sensor hitRatioSensor;

-        private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
-            if (!metrics.metrics().containsKey(name)) {
-                sensor.add(name, stat);
-            } else {
-                log.trace("Trying to add metric twice: {}", name);
-            }
-        }
-
         public NamedCacheMetrics(StreamsMetrics metrics) {
             final String scope = "record-cache";
             final String opName = "hitRatio";
-            final String tagKey = "record-cache-id";
+            final String tagKey = scope + "-id";
             final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
             this.groupName = "stream-" + scope + "-metrics";
             this.metrics = (StreamsMetricsImpl) metrics;
@@ -384,21 +374,21 @@ public NamedCacheMetrics(StreamsMetrics metrics) {
             // add parent
             Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
-            maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio of " + name, allMetricTags), new Avg());
-            maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio of " + name, allMetricTags), new Min());
-            maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio of " + name, allMetricTags), new Max());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
+                "The average cache hit ratio.", allMetricTags), new Avg());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName,
+                "The minimum cache hit ratio.", allMetricTags), new Min());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName,
+                "The maximum cache hit ratio.", allMetricTags), new Max());

             // add child
             hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
-            maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio of " + name, metricTags), new Avg());
-            maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio of " + name, metricTags), new Min());
-            maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio of " + name, metricTags), new Max());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName,
+                "The average cache hit ratio.", metricTags), new Avg());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-min", groupName,
+                "The minimum cache hit ratio.", metricTags), new Min());
+            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-max", groupName,
+                "The maximum cache hit ratio.", metricTags), new Max());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 6a0fa9bfa82f..1220c02347bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -77,8 +77,8 @@ public long flushes() {
     }

     /**
-     * The thread cache maintains a set of caches whose names are a concatenation of the task ID and the
-     * underlying store name
+     * The thread cache maintains a set of {@link NamedCache}s whose names are a concatenation of the task ID and the
+     * underlying store name. This method creates those names.
      * @param taskIDString Task ID
      * @param underlyingStoreName Underlying store name
      * @return
@@ -87,11 +87,21 @@ public static String nameSpaceFromTaskIdAndStore(final String taskIDString, fina
         return taskIDString + "-" + underlyingStoreName;
     }

+    /**
+     * Given a cache name of the form taskid-storename, return the task ID.
+     * @param cacheName
+     * @return
+     */
     public static String taskIDfromCacheName(final String cacheName) {
         String[] tokens = cacheName.split("-");
         return tokens[0];
     }

+    /**
+     * Given a cache name of the form taskid-storename, return the store name.
+     * @param cacheName
+     * @return
+     */
     public static String underlyingStoreNamefromCacheName(final String cacheName) {
         String[] tokens = cacheName.split("-");
         return tokens[1];
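[Editor's note: to illustrate how the naming helpers above compose, a minimal sketch (the store name is hypothetical):

    final String cacheName = ThreadCache.nameSpaceFromTaskIdAndStore("0_0", "storeName");
    // cacheName is "0_0-storeName"
    final String taskId = ThreadCache.taskIDfromCacheName(cacheName);               // "0_0"
    final String store = ThreadCache.underlyingStoreNamefromCacheName(cacheName);   // "storeName"

Note that both parsing helpers split on "-" and read a single token, so they only round-trip cleanly when the store name itself contains no dash: for a store named "my-store", underlyingStoreNamefromCacheName would return just "my". Limiting the split, e.g. cacheName.split("-", 2), would be one way to harden this.]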