From d9b793575c893ae10451a9a4653717f45dcf04af Mon Sep 17 00:00:00 2001 From: avijayanhwx Date: Thu, 18 Oct 2018 10:16:30 -0700 Subject: [PATCH] [AMBARI-24797] Support regex based metric inclusion in KafkaTimelineMetricsReporter. (#8) --- .../sink/kafka/KafkaTimelineMetricsReporter.java | 12 +++++++++++- .../sink/kafka/KafkaTimelineMetricsReporterTest.java | 3 +++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index f07d5089..cf529eac 100644 --- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -45,6 +45,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -80,6 +81,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private static final String TIMELINE_DEFAULT_PROTOCOL = "http"; private static final String EXCLUDED_METRICS_PROPERTY = "external.kafka.metrics.exclude.prefix"; private static final String INCLUDED_METRICS_PROPERTY = "external.kafka.metrics.include.prefix"; + private static final String INCLUDED_METRICS_REGEX_PROPERTY = "external.kafka.metrics.include.regex"; private volatile boolean initialized = false; private boolean running = false; @@ -97,6 +99,7 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private String[] excludedMetricsPrefixes; private String[] includedMetricsPrefixes; + private String[] includedMetricsRegex; // Local cache to avoid prefix matching everytime private Set excludedMetrics = new HashSet<>(); private boolean hostInMemoryAggregationEnabled; @@ -214,6 +217,13 @@ public void init(VerifiableProperties props) { includedMetricsPrefixes = includedMetricsStr.trim().split(","); } + // Inclusion override + String includedMetricsRegexStr = props.getString(INCLUDED_METRICS_REGEX_PROPERTY, ""); + if (!StringUtils.isEmpty(includedMetricsRegexStr.trim())) { + LOG.info("Including metrics which match the following regex patterns : " + includedMetricsRegexStr); + includedMetricsRegex = includedMetricsRegexStr.trim().split(","); + } + initializeReporter(); if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) { startReporter(metricsConfig.pollingIntervalSecs()); @@ -273,7 +283,7 @@ protected boolean isExcludedMetric(String metricName) { ", include: " + StringUtils.startsWithAny(metricName, includedMetricsPrefixes)); } if (StringUtils.startsWithAny(metricName, excludedMetricsPrefixes)) { - if (!StringUtils.startsWithAny(metricName, includedMetricsPrefixes)) { + if (!(StringUtils.startsWithAny(metricName, includedMetricsPrefixes) || Arrays.stream(includedMetricsRegex).anyMatch(metricName::matches))) { excludedMetrics.add(metricName); return true; } diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java index b05190c6..cc562279 100644 --- a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java +++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java @@ -84,6 +84,7 @@ public void setUp() throws Exception { properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true"); properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c"); properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d"); + properties.setProperty("external.kafka.metrics.include.regex", "a.b.c.*.f"); properties.setProperty("kafka.timeline.metrics.instanceId", "cluster"); properties.setProperty("kafka.timeline.metrics.set.instanceId", "false"); props = new VerifiableProperties(properties); @@ -118,6 +119,7 @@ public void testReporterStartStopHttps() { properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true"); properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c"); properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d"); + properties.setProperty("external.kafka.metrics.include.regex", "a.b.c.*.f"); properties.setProperty("kafka.timeline.metrics.protocol", "https"); properties.setProperty("kafka.timeline.metrics.truststore.path", ""); properties.setProperty("kafka.timeline.metrics.truststore.type", ""); @@ -143,6 +145,7 @@ public void testMetricsExclusionPolicy() throws Exception { Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b")); Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d")); Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.d.e")); + Assert.assertFalse(kafkaTimelineMetricsReporter.isExcludedMetric("a.b.c.e.f")); kafkaTimelineMetricsReporter.stopReporter(); verifyAll();