
HOTFIX: Fixes to metric names #3491

Closed. enothereska wants to merge 8 commits into trunk from enothereska:hotfix-metric-names

Conversation

enothereska (Contributor)

A couple of fixes to metric names to match the KIP:

  • Removed extra strings from the metric names that are already present in the tags.
  • Added a separate metric for "all".
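For context, here is a minimal sketch (not code from this PR; the class name and values are illustrative) of how a Kafka metric is registered with a short attribute name plus identifying tags via the public org.apache.kafka.common.metrics API, which is the shape these fixes move towards:

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;

public class MetricNameSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // The tags carry the entity identity (here the task id), so the attribute
        // name can stay short instead of repeating the task id in the name itself.
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("streams-task-id", "1_0");

        MetricName name = metrics.metricName(
            "commit-latency-avg",          // attribute, no redundant entity prefix
            "stream-task-metrics",         // group, becomes the JMX "type"
            "The average commit latency.",
            tags);

        Sensor commit = metrics.sensor("commit");
        commit.add(name, new Avg());

        metrics.close();
    }
}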

@enothereska (Contributor, author)

cc @guozhangwang

@enothereska (Contributor, author)

[Screenshot in JConsole]

@asfgit commented Jul 6, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5918/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Jul 6, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5903/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang (Contributor) left a comment:

Two meta-comments: for processor-node-level metrics, we do not need the task-id prefix in the attribute name:

[screenshot]

And for store-cache-level metrics, we do not need the task-id as a prefix of the record-cache-id either:

[screenshot]

The processor id (e.g. KSTREAM-AGGREGATE-0000000003) and the store name (e.g. Count) are each uniquely identifiable throughout the topology.

@@ -117,7 +118,7 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,

// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
addLatencyMetrics(scopeName, parent, "all", operationName, tagMap);
addLatencyMetrics(scopeName, parent, null, operationName, tagMap("all", "all"));
Contributor:

Hmm... this will result in mbean object names like, for example:

kafka.streams:type=stream-task-metrics,client-id=streams-wordcount-e600ab70-9a78-4886-b2c7-bce8f9fdbcf2-StreamThread-1,all=all

and

kafka.streams:type=stream-task-metrics,client-id=streams-wordcount-e600ab70-9a78-4886-b2c7-bce8f9fdbcf2-StreamThread-1,streams-task-id=1_0

I think it is better to be "streams-task-id=all".
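For illustration only, a sketch of the kind of varargs tag helper this implies (the name tagMap mirrors the helper visible later in this diff; the body and the demo values are assumptions, not the PR's code):

import java.util.LinkedHashMap;
import java.util.Map;

public class TagMapSketch {
    // Hypothetical helper: alternating key/value strings become an ordered tag map.
    static Map<String, String> tagMap(String... tags) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < tags.length; i += 2) {
            map.put(tags[i], tags[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        // "streams-task-id=all" keeps the aggregate under the same tag key as the
        // per-task metrics, instead of introducing an unrelated "all=all" pair.
        System.out.println(tagMap("streams-task-id", "all"));   // {streams-task-id=all}
        System.out.println(tagMap("streams-task-id", "1_0"));   // {streams-task-id=1_0}
    }
}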

Contributor:

Another issue: for state-store metrics, since the per-store metrics do not have tags, doing this results in mismatched hierarchies like this:

[screenshot]

Note that the "all" attributes are grouped at the same level as the other stores, while the state store names are used as a prefix for the per-store metrics; so if we have multiple RocksDB stores they will all be added under the global "stream-rocksdb-state-metrics" as separate metrics with different store names:

[screenshot]

Instead, what we want is to have "store-name" in the tags as well, so that under stream-[store-type]-state-metrics we get a first layer of "storeName1", "storeName2", ... and "all" (whose tag would be "store-name" = "all").

Contributor Author:

Fixed.

@guozhangwang (Contributor)

Also, could you update the web docs file for the Streams metrics in this PR as well?

@asfgit commented Jul 17, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6106/
Test FAILed (JDK 7 and Scala 2.11).

@asfgit commented Jul 17, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6091/
Test FAILed (JDK 8 and Scala 2.12).

@asfgit commented Jul 17, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6100/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit commented Jul 17, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6115/
Test PASSed (JDK 7 and Scala 2.11).

@guozhangwang (Contributor)

For stream-processor-node-metrics the task id is encoded in the attribute name, for cache metrics it is encoded in the identifier, as record-cache-id=1_0-Counts, and for store metrics it is not encoded at all (i.e. multiple tasks' stores will be sharing the same metric). This is not consistent.

[screenshot]

Also, all metrics have client-id=[thread-name] except stream-record-cache-metrics. This is not consistent either:

[screenshot]

docs/ops.html Outdated
<td>The [average | maximum] commit time in ns for this task. </td>
<td>commit-latency-avg</td>
<td>The average commit time in ns for this task. </td>
<td>kafka.streams:type=stream-task-metrics,streams-task-id=([-.\w]+)</td>
Contributor:

This is incorrect: from what I saw it is kafka.streams:type=stream-task-metrics,client-id=[thread-name],streams-task-id=[task-id].

Ditto for the processor-node-level and store-level metrics.

@@ -117,7 +118,7 @@ public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,

// first add the global operation metrics if not yet, with the global tags only
Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
addLatencyMetrics(scopeName, parent, "all", operationName, tagMap);
addLatencyMetrics(scopeName, parent, null, operationName, tagMap(tags != null && tags.length > 1 ? tags[0] : "all", "all"));
Contributor:

These APIs are also exposed to users so they can construct their own metrics, in which case the tags may not contain any values; at the same time we are relying on entityName to construct the parent-child metrics hierarchy. So we should handle the construction of the tags here, rather than forcing callers to pass in null as the entity name and have the additional tags constructed inside the function.

@enothereska (Contributor, author)

> For stream-processor-node-metrics the task id is encoded in the attribute name, but for cache metrics it is encoded in the identifier, as record-cache-id=1_0-Counts

@guozhangwang I don't think this is a metrics problem as such. I simply use (consistently) the name of the node or cache. It just so happens that our way of naming caches includes the task ID. By the time we reach the metrics code, I don't have any control over the cache name.

@enothereska (Contributor, author)

@guozhangwang the source of the problem is the way we assign names to nodes, caches and stores. That itself is inconsistent. I can't solve it at the metrics level if it is inconsistent at a higher level (see the sketch after this list):

  • for nodes: we assign them a unique number as part of their name
  • for caches: we prefix their names with the task ID
  • for stores: we give them a name with no prefix. Internally the code keeps each store name per task.
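For reference, a sketch of the two name helpers this scheme implies (the method names appear in the diff further down; the bodies here are assumptions and the real implementations may differ):

public final class CacheNameSketch {
    // Cache names are "[taskId]-[storeName]" (e.g. "1_0-Counts"), per the discussion above.
    static String nameSpaceFromTaskIdAndStore(String taskId, String storeName) {
        return taskId + "-" + storeName;
    }

    // Recover the store name by dropping the task-id prefix up to the first '-'.
    static String underlyingStoreNamefromCacheName(String cacheName) {
        return cacheName.substring(cacheName.indexOf('-') + 1);
    }

    public static void main(String[] args) {
        String cacheName = nameSpaceFromTaskIdAndStore("1_0", "Counts");
        System.out.println(cacheName);                                    // 1_0-Counts
        System.out.println(underlyingStoreNamefromCacheName(cacheName));  // Counts
    }
}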

@guozhangwang (Contributor) commented Jul 18, 2017:

> @guozhangwang I don't think this is a metrics problem as such. I simply use (consistently) the name of the node or cache. It just so happens that our way of naming caches includes the task ID. By the time we reach the metrics code, I don't have any control over the cache name.

Yes I understand that we use [task-id]-[store-name] as the name of the cache to be unique within the thread, but that does not necessarily mean that we need to use the same string as the record-cache-id as part of the metrics right?

> @guozhangwang the source of the problem is the way we assign names to nodes, caches and stores. That itself is inconsistent. I can't solve it at the metrics level if it is inconsistent at a higher level:
>   • for nodes: we assign them a unique number as part of their name
>   • for caches: we prefix their names with the task ID
>   • for stores: we give them a name with no prefix. Internally the code keeps each store name per task.

Just to follow up on what I meant above: I don't think we need to force ourselves to use the id of the node / cache / etc. as the "xx-id=yy" tag. In addition, that does not work well with the parent-child metrics hierarchy: originally we infer the hierarchy from the scopeName and the entityName, so that we have a per-entity metric for the scopeName-entityName combination and then an aggregated parent metric (i.e. xx-id=all) for the scopeName only. However, the current change sets the entityName to null and encodes xx-id=yy in the tags. In that case, are we expecting users who want to create their own metrics to follow this pattern as well? I hope not, since it is quite cumbersome as a public API.

So what I previously had in mind is that, for APIs like addLatencyAndThroughputSensor (note this is not only used by the Streams internal metrics like per-thread / processor-node / etc., but is also a public API for users), the protocol is:

  1. The tags are only used for any additional customized tags users want to add when creating their own metrics;

  2. In the StreamsMetricsImpl class, we will interpret the passed-in entityName as an additional tag to differentiate entities, as [scopeName]-id=[entityName]. The resulting metrics will be created as:

type:[groupNameFromScope(scopeName)],[scopeName]-id=[entityName],[any passed key=value tags]
    attribute1:[opName]-rate
    attribute2:[opName]-latency-avg
    attribute3:[opName]-latency-max

type:[groupNameFromScope(scopeName)],[scopeName]-id=all,[any passed key=value tags]
    attribute1:[opName]-rate
    attribute2:[opName]-latency-avg
    attribute3:[opName]-latency-max
  3. For Streams' own internal metrics: if we are calling via the add..Sensor public APIs, we do not need to pass the id of the entity itself through the tags strings, only the higher-level distinguishing ids (e.g. for a processor-node-level metric we do not need to pass the processor-node-id as a tag, only its higher-level parent taskId and threadId; details below). If we are calling metrics.sensor() directly, we manually set the tags with the xx-id=yy. These two call paths should make sure the resulting mbeans / attributes are consistent. More specifically:

3.1) Per-thread metrics: these are the top-level metrics. We are not using the public APIs but are calling metrics.sensor() directly, with thread-id=[threadClientId] manually set by StreamsMetricsThreadImpl, as:

type:streams-thread-metrics,thread-id=[threadClientId]
    attributes: // manually add in the code as commit-rate, process-rate, etc..

So we have one metric per thread, and we do not have a parent metric over all threads (since in practice there is very likely one thread per instance).

3.2) Per-task metrics: these are the second-layer metrics and we are using the public APIs. We can pass scopeName=task, entityName=[taskID], with one additional tag thread-id=[threadClientId]. The resulting metrics will be:

type:streams-task-metrics,task-id=[taskID],thread-id=[threadClientId]
    attribute1:commit-rate
    attribute2:commit-latency-avg
    attribute3:commit-latency-max

type:streams-task-metrics,task-id=all,thread-id=[threadClientId]
    attribute1:commit-rate
    attribute2:commit-latency-avg
    attribute3:commit-latency-max

So we have one metric per task per thread, and the parent metric aggregates over all tasks per thread.

3.3) Per-processor-node metrics (note this is one level lower than per-task, since our hierarchy is thread -> tasks -> processorNode / stateStore): similarly, we are using the public APIs; BUT the processor node id itself is not globally unique, since multiple tasks have the same processor node.

So we can pass scopeName=processor-node, entityName=[processorName], and additional tags task-id=[taskID],thread-id=[threadClientId]. The resulting metrics will be:

type:streams-processor-node-metrics,processor-node-id=[processorName],task-id=[taskID],thread-id=[threadClientId]
    attribute1:process-rate
    attribute2:process-latency-avg
    attribute3:process-latency-max
    attribute4:punctuate-rate
    ...

type:streams-processor-node-metrics,processor-node-id=all,task-id=[taskID],thread-id=[threadClientId]
    attribute1:process-rate
    attribute2:process-latency-avg
    attribute3:process-latency-max
    attribute4:punctuate-rate
    ...

I.e. we will have one metric per processor node per task per thread, and the parent metric will aggregate over all processor nodes per task per thread.

3.4) / 3.5) Per-store / per-cache metrics: these are at a similar level to the per-processor-node metrics. Similarly, we pass entityName=[storeName] and scopeName=[storeType] (or cache). The resulting metrics will be:

type:streams-rocksdb-state-metrics,rocksdb-state-id=[stateName],task-id=[taskID],thread-id=[threadClientId]
    attributes ... // no prefix, same below

type:streams-rocksdb-state-metrics,rocksdb-state-id=all,task-id=[taskID],thread-id=[threadClientId]
    attributes ...

type:streams-inmemory-state-metrics,inmemory-state-id=[stateName],task-id=[taskID],thread-id=[threadClientId]
    attributes ... // no prefix, same below

type:streams-inmemory-state-metrics,inmemory-state-id=all,task-id=[taskID],thread-id=[threadClientId]
    attributes ...

type:streams-cache-metrics,cache-id=[stateName],task-id=[taskID],thread-id=[threadClientId]
    attributes ... // no prefix, same below

type:streams-cache-metrics,cache-id=all,task-id=[taskID],thread-id=[threadClientId]
    attributes ...

Note that each child metric is per store/cache per task per thread, and the parent aggregates over all stores of the same type, or all caches, per task per thread.

  4. For any user-added metrics: say I want to add a new throughput-and-latency sensor for two entities, with opName=do, entityName=foo and bar, and scopeName=mine, with no further tags. The resulting metrics will be:
type:streams-mine-metrics,mine-id=foo
    attribute1:do-rate
    attribute2:do-latency-avg
    attribute3:do-latency-max

type:streams-mine-metrics,mine-id=bar
    attribute1:do-rate
    attribute2:do-latency-avg
    attribute3:do-latency-max

type:streams-mine-metrics,mine-id=all
    attribute1:do-rate
    attribute2:do-latency-avg
    attribute3:do-latency-max

I.e. one metric per entity across threads / tasks, and one parent aggregated across all the entities of the same scope.

If users want this metric per task / thread as well, we can state in the javadocs of addLatencyAndThroughputSensor etc. that they need to pass in more tags, like task-id=[taskID],thread-id=[threadClientId].

Does this all make sense to you?
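A rough, self-contained sketch of the protocol described above (the class, helper bodies and descriptions are illustrative rather than the PR's actual implementation; only the public Kafka Metrics/Sensor API calls are real):

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;

public class MetricsProtocolSketch {
    private final Metrics metrics = new Metrics();

    // The entity id becomes a "[scope]-id" tag on the per-entity (child) metrics,
    // and a parallel parent sensor carries "[scope]-id=all" for the aggregate.
    public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName,
                                                String opName, String... userTags) {
        Map<String, String> allTags = tagMap(userTags);
        allTags.put(scopeName + "-id", "all");

        Map<String, String> entityTags = tagMap(userTags);
        entityTags.put(scopeName + "-id", entityName);

        String group = "streams-" + scopeName + "-metrics";

        // Parent sensor: aggregates across all entities of this scope.
        Sensor parent = metrics.sensor(scopeName + "-" + opName);
        addMetrics(parent, group, opName, allTags);

        // Child sensor: this entity only, also feeding the parent.
        Sensor child = metrics.sensor(scopeName + "-" + entityName + "-" + opName, parent);
        addMetrics(child, group, opName, entityTags);
        return child;
    }

    private void addMetrics(Sensor sensor, String group, String opName, Map<String, String> tags) {
        sensor.add(metrics.metricName(opName + "-latency-avg", group, "Average latency.", tags), new Avg());
        sensor.add(metrics.metricName(opName + "-latency-max", group, "Max latency.", tags), new Max());
        sensor.add(metrics.metricName(opName + "-rate", group, "Operations per second.", tags), new Rate(new Count()));
    }

    private static Map<String, String> tagMap(String... tags) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < tags.length; i += 2) {
            map.put(tags[i], tags[i + 1]);
        }
        return map;
    }

    public static void main(String[] args) {
        MetricsProtocolSketch sketch = new MetricsProtocolSketch();
        // e.g. a per-task sensor: streams-task-metrics,task-id=1_0,thread-id=thread-1
        sketch.addLatencyAndThroughputSensor("task", "1_0", "commit", "thread-id", "thread-1");
        sketch.metrics.close();
    }
}

Recording on a child sensor also records to its parent sensors, so the per-entity sensors feed the [scope]-id=all aggregate.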

@asfgit commented Jul 20, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6206/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Jul 20, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6190/
Test PASSed (JDK 8 and Scala 2.12).

@enothereska (Contributor, author)

@guozhangwang I'll need to update the docs, but please see if the code makes more sense first. Thanks.

@guozhangwang (Contributor) left a comment:

The resulting metrics LGTM. Left some comments.

When updating the web docs please also describe the hierarchies (thread -> task -> processor-node / state-stores / caches) for users.

@@ -94,15 +97,16 @@ private String sensorName(String operationName, String entityName) {
}
}

private Map<String, String> tagMap(String... tags) {
public Map<String, String> tagMap(String... tags) {
Contributor:

If we want to expose this as a public function in order to use it in NamedCache.java, we can make it a public static function.

Contributor Author:

It uses the default tags from the StreamsMetricsImpl class so it is hard to make static.

Contributor:

Ah I see, nvm.

List<String> updatedTagList = new ArrayList(Arrays.asList(tags));
updatedTagList.add(scopeName + "-id");
updatedTagList.add(entityName);
Map<String, String> tagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
Contributor:

This can be simplified as

Map<String, String> tagMap = tagMap(scopeName + "-id", entityName);
Map<String, String> allTagMap = tagMap("all", entityName);

Contributor Author:

Unfortunately I need to add the original tags as well, not just those two lines. I've cleaned up though.

Contributor:

Then how about:

Map<String, String> tagMap = tagMap(scopeName + "-id", entityName, tags);
Map<String, String> allTagMap = tagMap("all", entityName, tags);

?
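One way such an overload could look (a sketch, not the PR's code): prepend the id key/value pair to the caller-supplied tags before delegating to the existing varargs tagMap.

import java.util.LinkedHashMap;
import java.util.Map;

public class TagMapOverloadSketch {
    // Existing varargs form (alternating key/value strings), as in the diff above.
    static Map<String, String> tagMap(String... tags) {
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 0; i < tags.length; i += 2) {
            map.put(tags[i], tags[i + 1]);
        }
        return map;
    }

    // Hypothetical overload: prepend the id pair to the caller-supplied tags,
    // so callers can write tagMap(scopeName + "-id", entityName, tags).
    static Map<String, String> tagMap(String idKey, String idValue, String[] tags) {
        String[] combined = new String[tags.length + 2];
        combined[0] = idKey;
        combined[1] = idValue;
        System.arraycopy(tags, 0, combined, 2, tags.length);
        return tagMap(combined);
    }

    public static void main(String[] args) {
        String[] callerTags = {"thread-id", "thread-1"};
        System.out.println(tagMap("task-id", "1_0", callerTags));
        // {task-id=1_0, thread-id=thread-1}
    }
}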

List<String> updatedTagList = new ArrayList(Arrays.asList(tags));
updatedTagList.add(scopeName + "-id");
updatedTagList.add(entityName);
Map<String, String> tagMap = tagMap(updatedTagList.toArray(new String[updatedTagList.size()]));
Contributor:

Ditto here.

addThroughputMetrics(scopeName, sensor, entityName, opName, tags);
}

private void addThroughputMetrics(String scopeName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-rate", groupNameFromScope(scopeName),
"The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
maybeAddMetric(sensor, metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
Contributor:

The entityName parameter is not needed, right?

maybeAddMetric(sensor, metrics.metricName(entityName + "-" + opName + "-latency-max", groupNameFromScope(scopeName),
"The max latency of " + entityName + " " + opName + " operation.", tags), new Max());

maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
Contributor:

Same as addThroughputMetrics, entityName is not needed.

@@ -77,6 +77,28 @@ public long flushes() {
}

/**
* The thread cache maintains a set of caches whose names are a concatenation of the task ID and the
Contributor:

Javadoc: we can refer to {@link NamedCache}, and we also need to describe the purpose of the function itself, i.e. "return the name of the cache as..." instead of explaining the class.

this.cache = this.context.getCache();
this.cacheName = this.cache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());
Contributor:

Since nameSpaceFromTaskIdAndStore is a static function, we should use ThreadCache#nameSpaceFromTaskIdAndStore.
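i.e. the call site from the diff above would then read roughly:

this.cacheName = ThreadCache.nameSpaceFromTaskIdAndStore(context.taskId().toString(), underlying.name());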

final Sensor hitRatioSensor;

private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
Contributor:

This function can be part of NamedCacheMetrics itself.
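For context, the guarded add being discussed typically checks the registry before adding the metric to the sensor, so multiple caches sharing a parent sensor do not register the same metric twice; a sketch (assuming access to the underlying Metrics registry, not the PR's exact code):

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

final class NamedCacheMetricsSketch {
    // Register the metric on the sensor only if no metric with this name has been
    // registered yet (e.g. by another cache sharing the parent sensor).
    static void maybeAddMetric(Metrics metrics, Sensor sensor, MetricName name, MeasurableStat stat) {
        if (!metrics.metrics().containsKey(name)) {
            sensor.add(name, stat);
        }
    }
}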

// add parent
Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
"The average cache hit ratio of " + name, allMetricTags), new Avg());
Contributor:

nit: we do not need the "of " + name part in the metric description anymore, since the name is already in the tags. Just "The average cache hit ratio" is good enough. Ditto elsewhere.

final String opName = "hitRatio";
final String tagKey = "record-cache-id";
final String tagValue = name;
final String tagValue = ThreadCache.underlyingStoreNamefromCacheName(name);
Contributor:

It is better to keep tagKey = scope + "-id"; in case we change scope in the future.

@asfgit commented Jul 21, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6249/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Jul 21, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6233/
Test PASSed (JDK 8 and Scala 2.12).

@enothereska (Contributor, author)

@guozhangwang anything else left? Thanks.

@asfgit commented Aug 3, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/6513/
Test PASSed (JDK 7 and Scala 2.11).

@asfgit commented Aug 3, 2017:

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/6499/
Test PASSed (JDK 8 and Scala 2.12).

@asfgit closed this in 6bee1e9 on Aug 3, 2017.

asfgit pushed a commit that referenced this pull request on Aug 3, 2017:
A couple of fixes to metric names to match the KIP
- Removed extra strings in the metric names that are already in the tags
- add a separate metric for "all"

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #3491 from enothereska/hotfix-metric-names

(cherry picked from commit 6bee1e9)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang (Contributor)

LGTM. Merged to trunk and cherry-picked to 0.11.0. Thanks @enothereska.

@enothereska deleted the hotfix-metric-names branch on August 4, 2017.