Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HOTFIX: Fixes to metric names #3491

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
281 changes: 260 additions & 21 deletions docs/ops.html
Original file line number Diff line number Diff line change
Expand Up @@ -1241,14 +1241,64 @@ <h5><a id="kafka_streams_thread_monitoring" href="#kafka_streams_thread_monitori
<th>Description</th>
<th>Mbean name</th>
</tr>
<tr>
<td>[commit | poll | process | punctuate]-latency-[avg | max]</td>
<td>The [average | maximum] execution time in ms, for the respective operation, across all running tasks of this thread.</td>
<tr>
<td>commit-latency-avg</td>
<td>The average execution time in ms for committing, across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>[commit | poll | process | punctuate]-rate</td>
<td>The average number of respective operations per second across all tasks.</td>
</tr>
<tr>
<td>commit-latency-max</td>
<td>The maximum execution time in ms for committing across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>poll-latency-avg</td>
<td>The average execution time in ms for polling, across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>poll-latency-max</td>
<td>The maximum execution time in ms for polling across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>process-latency-avg</td>
<td>The average execution time in ms for processing, across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>process-latency-max</td>
<td>The maximum execution time in ms for processing across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>punctuate-latency-avg</td>
<td>The average execution time in ms for punctuating, across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>punctuate-latency-max</td>
<td>The maximum execution time in ms for punctuating across all running tasks of this thread.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-rate</td>
<td>The average number of commits per second across all tasks.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>poll-rate</td>
<td>The average number of polls per second across all tasks.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>process-rate</td>
<td>The average number of process calls per second across all tasks.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
<td>punctuate-rate</td>
<td>The average number of punctuates per second across all tasks.</td>
<td>kafka.streams:type=stream-metrics,thread.client-id=([-.\w]+)</td>
</tr>
<tr>
Expand Down Expand Up @@ -1279,8 +1329,13 @@ <h5><a id="kafka_streams_task_monitoring" href="#kafka_streams_task_monitoring">
<th>Mbean name</th>
</tr>
<tr>
<td>commit-latency-[avg | max]</td>
<td>The [average | maximum] commit time in ns for this task. </td>
<td>commit-latency-avg</td>
<td>The average commit time in ns for this task. </td>
<td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is incorrect: from I saw it is kafka.streams:type=stream-task-metrics,client-id=[thread-name],streams-task-id=[task-id].

Ditto for processor-node-level, and store-level metrics.

</tr>
<tr>
<td>commit-latency-max</td>
<td>The maximum commit time in ns for this task. </td>
<td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
</tr>
<tr>
Expand All @@ -1301,13 +1356,63 @@ <h5><a id="kafka_streams_node_monitoring" href="#kafka_streams_node_monitoring">
<th>Mbean name</th>
</tr>
<tr>
<td>[process | punctuate | create | destroy]-latency-[avg | max]</td>
<td>The [average | maximum] execution time in ns, for the respective operation. </td>
<td>process-latency-avg</td>
<td>The average process execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>process-latency-max</td>
<td>The maximum process execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>punctuate-latency-avg</td>
<td>The average punctuate execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>punctuate-latency-max</td>
<td>The maximum punctuate execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>create-latency-avg</td>
<td>The average create execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>create-latency-max</td>
<td>The maximum create execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>destroy-latency-avg</td>
<td>The average destroy execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>destroy-latency-max</td>
<td>The maximum destroy execution time in ns. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>process-rate</td>
<td>The average number of process operations per second. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>punctuate-rate</td>
<td>The average number of punctuate operations per second. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>create-rate</td>
<td>The average number of create operations per second. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
<td>[process | punctuate | create | destroy]-rate</td>
<td>The average number of respective operations per second. </td>
<td>destroy-rate</td>
<td>The average number of destroy operations per second. </td>
<td>kafka.streams:type=stream-processor-node-metrics, processor-node-id=([-.\w]+)</td>
</tr>
<tr>
Expand All @@ -1329,16 +1434,140 @@ <h5><a id="kafka_streams_store_monitoring" href="#kafka_streams_store_monitoring
<th>Mbean name</th>
</tr>
<tr>
<td>[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-latency-[avg | max]</td>
<td>The average execution time in ns, for the respective operation. </td>
<td>put-latency-avg</td>
<td>The average put execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>[put | put-if-absent | get | delete | put-all | all | range | flush | restore]-rate</td>
<td>The average rate of respective operations per second for this store.</td>
<tr>
<td>put-latency-max</td>
<td>The maximum put execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-if-absent-latency-avg</td>
<td>The average put-if-absent execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-if-absent-latency-max</td>
<td>The maximum put-if-absent execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>get-latency-avg</td>
<td>The average get execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>get-latency-max</td>
<td>The maximum get execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>delete-latency-avg</td>
<td>The average delete execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>delete-latency-max</td>
<td>The maximum delete execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-all-latency-avg</td>
<td>The average put-all execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-all-latency-max</td>
<td>The maximum put-all execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>all-latency-avg</td>
<td>The average all operation execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>all-latency-max</td>
<td>The maximum all operation execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>range-latency-avg</td>
<td>The average range execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>range-latency-max</td>
<td>The maximum range execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>flush-latency-avg</td>
<td>The average flush execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>flush-latency-max</td>
<td>The maximum flush execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>restore-latency-avg</td>
<td>The average restore execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>restore-latency-max</td>
<td>The maximum restore execution time in ns. </td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-rate</td>
<td>The average put rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-if-absent-rate</td>
<td>The average put-if-absent rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>get-rate</td>
<td>The average get rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>delete-rate</td>
<td>The average delete rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>put-all-rate</td>
<td>The average put-all rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>all-rate</td>
<td>The average all operation rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>range-rate</td>
<td>The average range rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>flush-rate</td>
<td>The average flush rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>
<tr>
<td>restore-rate</td>
<td>The average restore rate for this store.</td>
<td>kafka.streams:type=stream-[store-type]-metrics</td>
</tr>

</tbody>
</table>

Expand All @@ -1353,8 +1582,18 @@ <h5><a id="kafka_streams_cache_monitoring" href="#kafka_streams_cache_monitoring
<th>Mbean name</th>
</tr>
<tr>
<td>hitRatio-[avg | min | max]</td>
<td>The cache hit ratio defined as the ratio of cache read hits over the total cache read requests. </td>
<td>hitRatio-avg</td>
<td>The average cache hit ratio defined as the ratio of cache read hits over the total cache read requests. </td>
<td>kafka.streams:type=stream-record-cache-metrics, record-cache-id=([-.\w]+)</td>
</tr>
<tr>
<td>hitRatio-min</td>
<td>The mininum cache hit ratio. </td>
<td>kafka.streams:type=stream-record-cache-metrics, record-cache-id=([-.\w]+)</td>
</tr>
<tr>
<td>hitRatio-max</td>
<td>The maximum cache hit ratio. </td>
<td>kafka.streams:type=stream-record-cache-metrics, record-cache-id=([-.\w]+)</td>
</tr>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.apache.kafka.streams.processor.Punctuator;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ProcessorNode<K, V> {
Expand Down Expand Up @@ -102,7 +100,7 @@ public void addChild(ProcessorNode<?, ?> child) {
public void init(ProcessorContext context) {
this.context = context;
try {
nodeMetrics = new NodeMetrics(context.metrics(), name, "task." + context.taskId());
nodeMetrics = new NodeMetrics(context.metrics(), name, context);
nodeMetrics.metrics.measureLatencyNs(time, initDelegate, nodeMetrics.nodeCreationSensor);
} catch (Exception e) {
throw new StreamsException(String.format("failed to initialize processor %s", name), e);
Expand Down Expand Up @@ -163,7 +161,6 @@ public String toString(String indent) {

protected static final class NodeMetrics {
final StreamsMetricsImpl metrics;
final Map<String, String> metricTags;

final Sensor nodeProcessTimeSensor;
final Sensor nodePunctuateTimeSensor;
Expand All @@ -173,21 +170,25 @@ protected static final class NodeMetrics {
final Sensor nodeDestructionSensor;


public NodeMetrics(StreamsMetrics metrics, String name, String sensorNamePrefix) {
public NodeMetrics(final StreamsMetrics metrics, final String name, final ProcessorContext context) {
final String scope = "processor-node";
final String tagKey = "processor-node-id";
final String tagValue = name;
final String tagKey = "task-id";
final String tagValue = context.taskId().toString();
this.metrics = (StreamsMetricsImpl) metrics;
this.metricTags = new LinkedHashMap<>();
this.metricTags.put(tagKey, tagValue);

// these are all latency metrics
this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, sensorNamePrefix + "." + name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, sensorNamePrefix + "." + name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "process",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(scope, name, "punctuate",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(scope, name, "create",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(scope, name, "destroy",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeForwardSensor = metrics.addThroughputSensor(scope, name, "forward",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(scope, name, "skippedDueToDeserializationError",
Sensor.RecordingLevel.DEBUG, tagKey, tagValue);
}

public void removeAllSensors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ protected class TaskMetrics {
TaskMetrics(final StreamsMetrics metrics) {
final String name = id().toString();
this.metrics = (StreamsMetricsImpl) metrics;
taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG, "streams-task-id", name);
taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit",
Sensor.RecordingLevel.DEBUG);
}

void removeAllSensors() {
Expand Down