Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Support custom metrics rules for PrometheusSink (#3493)
Browse files Browse the repository at this point in the history
* Support Java 11

* config travis to use oracle jdk 11

* Java 11 support (#3399)

* Support Java 11

* config travis to use oracle jdk 11

* Add check jdk version

* Fix command arguments.

Change insert gc_options

Update list

Fix gc-logging

* Add missing parameter

* typo

* Add pause time

* wip

* Support jmx_exporter format configuration.

* Fix checkstyle

* Remove unused

* Java 11 support (#3399)

* Support Java 11

* config travis to use oracle jdk 11

* Add check jdk version

* Fix command arguments.

Change insert gc_options

Update list

Fix gc-logging

* wip

* Support jmx_exporter format configuration.

* Fix checkstyle

* Remove unused

* Update kafkaOffset metrics

* Add Rules

* Make log/sink/consume Streamlet component support setName and setNumPartitions (#3459)

* Patch to fix cppcheck with newer glibc (#3471)

* Add documents for setting up a docker based development environment (#3475)

* Improve concurrency for needed parts. (#3107)

* Change concurrent Map

* Change concurrent Map

* HashMap changes for unneeded parts.

* HashMap changes for unneeded parts.

* Review changes

* Changes HashMap for unneeded parts.

* Improve concurrency for needed parts.

* Remove unused imports.

* Remove unused imports.

* Remove unused imports.

* Fix NPE

(cherry picked from commit 545d381)

* Fix WhitespaceAround

* Add dummy Object

* Fix ConstantName

(cherry picked from commit 8d6d506)

* Update kafkaOffset metrics

* Add Rules

* Update line is longer than 100 characters

* Update line is longer than 100 characters

* Add attrNameSnakeCase or other metrics fix

* fix checkstyle

Co-authored-by: Ning Wang <wangninggm@gmail.com>
Co-authored-by: Ning Wang <nwang@twitter.com>
Co-authored-by: Nicholas Nezis <nicholas.nezis@gmail.com>
  • Loading branch information
4 people committed Apr 2, 2020
1 parent 6699a58 commit b34a76b
Show file tree
Hide file tree
Showing 2 changed files with 324 additions and 36 deletions.
Expand Up @@ -20,10 +20,17 @@
package org.apache.heron.metricsmgr.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.cache.Cache;
Expand All @@ -33,6 +40,9 @@
import org.apache.heron.spi.metricsmgr.metrics.MetricsRecord;
import org.apache.heron.spi.metricsmgr.sink.SinkContext;

import static java.lang.String.format;
import static org.apache.heron.metricsmgr.sink.PrometheusSink.Prometheus.sanitizeMetricName;

/**
* A web sink that exposes and endpoint that Prometheus can scrape
*
Expand All @@ -57,6 +67,7 @@ public class PrometheusSink extends AbstractWebSink {

// This is the cache that is used to serve the metrics
private Cache<String, Map<String, Double>> metricsCache;
private List<Rule> rules = new ArrayList<Rule>();

private String cluster;
private String role;
Expand All @@ -66,13 +77,91 @@ public PrometheusSink() {
super();
}

private enum Type {
COUNTER,
GAUGE,
SUMMARY,
HISTOGRAM,
UNTYPED,
}

private static class Rule {
public Pattern pattern;
public String name;
public String value;
public Double valueFactor = 1.0;
public String help;
public boolean attrNameSnakeCase;
public Type type = Type.UNTYPED;
public ArrayList<String> labelNames;
public ArrayList<String> labelValues;
}

@Override
void initialize(Map<String, Object> configuration, SinkContext context) {
metricsCache = createCache();

cluster = context.getCluster();
role = context.getRole();
environment = context.getEnvironment();

if (configuration.containsKey("rules")) {
List<Map<String, Object>> configRules = (List<Map<String, Object>>)
configuration.get("rules");
for (Map<String, Object> ruleObject : configRules) {
Rule rule = new Rule();
rules.add(rule);
if (ruleObject.containsKey("pattern")) {
rule.pattern = Pattern.compile("^.*(?:" + (String) ruleObject.get("pattern") + ").*$");
}
if (ruleObject.containsKey("name")) {
rule.name = (String) ruleObject.get("name");
}
if (ruleObject.containsKey("value")) {
rule.value = String.valueOf(ruleObject.get("value"));
}
if (ruleObject.containsKey("valueFactor")) {
String valueFactor = String.valueOf(ruleObject.get("valueFactor"));
try {
rule.valueFactor = Double.valueOf(valueFactor);
} catch (NumberFormatException e) {
// use default value
}
}
if (ruleObject.containsKey("attrNameSnakeCase")) {
rule.attrNameSnakeCase = (Boolean) ruleObject.get("attrNameSnakeCase");
}
if (ruleObject.containsKey("type")) {
rule.type = Type.valueOf((String) ruleObject.get("type"));
}
if (ruleObject.containsKey("help")) {
rule.help = (String) ruleObject.get("help");
}
if (ruleObject.containsKey("labels")) {
TreeMap labels = new TreeMap((Map<String, Object>) ruleObject.get("labels"));
rule.labelNames = new ArrayList<String>();
rule.labelValues = new ArrayList<String>();
for (Map.Entry<String, Object> entry : (Set<Map.Entry<String, Object>>) labels
.entrySet()) {
rule.labelNames.add(entry.getKey());
rule.labelValues.add((String) entry.getValue());
}
}

// Validation.
if ((rule.labelNames != null || rule.help != null) && rule.name == null) {
throw new IllegalArgumentException("Must provide name, if help or labels are given: "
+ ruleObject);
}
if (rule.name != null && rule.pattern == null) {
throw new IllegalArgumentException("Must provide pattern, if name is given: "
+ ruleObject);
}
}
} else {
// Default to a single default rule.
rules.add(new Rule());
}
}

@Override
Expand All @@ -82,6 +171,9 @@ byte[] generateResponse() throws IOException {
final StringBuilder sb = new StringBuilder();

metrics.forEach((String source, Map<String, Double> sourceMetrics) -> {
// Map the labels.
final Map<String, String> labelKV = new TreeMap<String, String>();

String[] sources = source.split("/");
String topology = sources[0];
String component = sources[1];
Expand All @@ -96,6 +188,18 @@ byte[] generateResponse() throws IOException {
final String clusterRoleEnv = hasClusterRoleEnvironment(c, r, e)
? String.format("%s/%s/%s", c, r, e) : null;

labelKV.put("topology", topology);
labelKV.put("component", component);
labelKV.put("instance_id", instance);

if (clusterRoleEnv != null) {
labelKV.put("cluster_role_env", clusterRoleEnv);
}

if (componentType != null) {
labelKV.put("component_type", componentType);
}

sourceMetrics.forEach((String metric, Double value) -> {

// some stream manager metrics in heron contain a instance id as part of the metric name
Expand All @@ -104,46 +208,79 @@ byte[] generateResponse() throws IOException {
// __time_spent_back_pressure_by_compid/container_1_exclaim1_1
// TODO convert to small classes for less string manipulation
final String metricName;
final String metricInstanceId;
if (componentIsStreamManger) {
final boolean metricHasInstanceId = metric.contains("_by_");
final String[] metricParts = metric.split("/");
if (metricHasInstanceId && metricParts.length == 3) {
metricName = String.format("%s_%s", metricParts[0], metricParts[2]);
metricInstanceId = metricParts[1];
metricName = splitTargetInstance(metricParts[0], metricParts[2], labelKV);
labelKV.put("metric_instance_id", metricParts[1]);
} else if (metricHasInstanceId && metricParts.length == 2) {
metricName = metricParts[0];
metricInstanceId = metricParts[1];
metricName = splitTargetInstance(metricParts[0], null, labelKV);
labelKV.put("metric_instance_id", metricParts[1]);
} else if (metricParts.length == 2) {
metricName = splitTargetInstance(metricParts[0], metricParts[1], labelKV);
} else {
metricName = metric;
metricInstanceId = null;
metricName = splitTargetInstance(metric, null, labelKV);
}

} else {
metricName = metric;
metricInstanceId = null;
}

String exportedMetricName = String.format("%s_%s", HERON_PREFIX,
metricName.replace("__", "").toLowerCase());
sb.append(Prometheus.sanitizeMetricName(exportedMetricName))
.append("{")
.append("topology=\"").append(topology).append("\",")
.append("component=\"").append(component).append("\",")
.append("instance_id=\"").append(instance).append("\"");

if (clusterRoleEnv != null) {
sb.append(",cluster_role_env=\"").append(clusterRoleEnv).append("\"");
}

if (componentType != null) {
sb.append(",component_type=\"").append(componentType).append("\"");
}

if (metricInstanceId != null) {
sb.append(",metric_instance_id=\"").append(metricInstanceId).append("\"");
final AtomicReference<String> name = new AtomicReference<>(sanitizeMetricName(metric));
rules.forEach(rule -> {
String ruleName = name.get();
Matcher matcher = null;
if (rule.pattern != null) {
matcher = rule.pattern.matcher(metric);
if (!matcher.matches()) {
return;
}
}

// If there's no name provided, use default export format.
if (rule.name == null || rule.name.isEmpty()) {
// nothing
} else {
// Matcher is set below here due to validation in the constructor.
ruleName = sanitizeMetricName(matcher.replaceAll(rule.name));
if (ruleName.isEmpty()) {
return;
}
}
if (rule.attrNameSnakeCase) {
name.set(toSnakeAndLowerCase(ruleName));
} else {
name.set(ruleName.toLowerCase());
}
if (rule.labelNames != null) {
for (int i = 0; i < rule.labelNames.size(); i++) {
final String unsafeLabelName = rule.labelNames.get(i);
final String labelValReplacement = rule.labelValues.get(i);
String labelName = sanitizeMetricName(matcher.replaceAll(unsafeLabelName));
String labelValue = matcher.replaceAll(labelValReplacement);
labelName = labelName.toLowerCase();
if (!labelName.isEmpty() && !labelValue.isEmpty()) {
labelKV.put(labelName, labelValue);
}
}
}
});
metricName = name.get();
}

// TODO Type, Help
String exportedMetricName = format("%s_%s", HERON_PREFIX,
metricName
.replace("__", "")
.toLowerCase());
sb.append(sanitizeMetricName(exportedMetricName))
.append("{");
final AtomicBoolean isFirst = new AtomicBoolean(true);
labelKV.forEach((k, v) -> {
// Add labels
if (!isFirst.get()) {
sb.append(',');
}
sb.append(format("%s=\"%s\"", k, v));
isFirst.set(false);
});
sb.append("} ")
.append(Prometheus.doubleToGoString(value))
.append(" ").append(currentTimeMillis())
Expand All @@ -154,6 +291,45 @@ byte[] generateResponse() throws IOException {
return sb.toString().getBytes();
}

private static final Pattern SPLIT_TARGET = Pattern.compile("__(?<name>\\w+)"
+ "_(?<target>(?<instance>\\w+)-\\d+)");
private static final Pattern DIGIT = Pattern.compile("[0-9]+");

private String splitTargetInstance(String part1, String part2, Map<String, String> labelKV) {
if (part2 != null) {
if (DIGIT.matcher(part2).matches()) {
labelKV.put("metric_instance_id", part2);
return part1;
}
final Matcher m = SPLIT_TARGET.matcher(part1);
if (m.matches()) {
labelKV.put("metric_instance_id", m.group("target"));
return String.format("%s_%s_%s", m.group("name"), m.group("instance"), part2);
}
return String.format("%s_%s", part1, part2);
}
return part1;
}

static String toSnakeAndLowerCase(String attrName) {
if (attrName == null || attrName.isEmpty()) {
return attrName;
}
char firstChar = attrName.subSequence(0, 1).charAt(0);
boolean prevCharIsUpperCaseOrUnderscore = Character.isUpperCase(firstChar) || firstChar == '_';
StringBuilder resultBuilder = new StringBuilder(attrName.length())
.append(Character.toLowerCase(firstChar));
for (char attrChar : attrName.substring(1).toCharArray()) {
boolean charIsUpperCase = Character.isUpperCase(attrChar);
if (!prevCharIsUpperCaseOrUnderscore && charIsUpperCase) {
resultBuilder.append("_");
}
resultBuilder.append(Character.toLowerCase(attrChar));
prevCharIsUpperCaseOrUnderscore = charIsUpperCase || attrChar == '_';
}
return resultBuilder.toString();
}

@Override
public void processRecord(MetricsRecord record) {
final String[] sources = MetricsUtil.splitRecordSource(record);
Expand Down

0 comments on commit b34a76b

Please sign in to comment.