From 8b0a9d39fb3f6c75b47c1511329e617cd1467135 Mon Sep 17 00:00:00 2001 From: Corey Fritz Date: Fri, 24 Aug 2018 07:34:56 -0400 Subject: [PATCH] NIFI-5535: Fixed metric tagging in DataDogReportingTask --- .../datadog/DataDogReportingTask.java | 52 +++++++------------ .../datadog/metrics/MetricsService.java | 17 ------ .../datadog/TestDataDogReportingTask.java | 37 +++++++------ 3 files changed, 37 insertions(+), 69 deletions(-) diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java index bba9658d0a4e..19825e830e1b 100644 --- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java +++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/DataDogReportingTask.java @@ -20,7 +20,6 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.AtomicDouble; import com.yammer.metrics.core.VirtualMachineMetrics; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -38,7 +37,6 @@ import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; import org.apache.nifi.reporting.datadog.metrics.MetricsService; -import org.coursera.metrics.datadog.DynamicTagsCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; @@ -47,6 +45,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; @Tags({"reporting", "datadog", "metrics"}) @CapabilityDescription("Publishes metrics from NiFi to datadog. For accurate and informative reporting, components should have unique names.") @@ -72,6 +71,7 @@ public class DataDogReportingTask extends AbstractReportingTask { .name("API key") .description("Datadog API key. If specified value is 'agent', local Datadog agent will be used.") .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); @@ -114,8 +114,7 @@ public void setup(final ConfigurationContext context) { metricsPrefix = METRICS_PREFIX.getDefaultValue(); environment = ENVIRONMENT.getDefaultValue(); virtualMachineMetrics = VirtualMachineMetrics.getInstance(); - ddMetricRegistryBuilder.setMetricRegistry(metricRegistry) - .setTags(metricsService.getAllTagsList()); + ddMetricRegistryBuilder.setMetricRegistry(metricRegistry); } @Override @@ -147,12 +146,12 @@ public void onTrigger(ReportingContext context) { protected void updateMetrics(Map metrics, Optional processorName, Map tags) { for (Map.Entry entry : metrics.entrySet()) { - final String metricName = buildMetricName(processorName, entry.getKey()); + final String metricName = buildMetricName(processorName, entry.getKey(), tags); logger.debug(metricName + ": " + entry.getValue()); //if metric is not registered yet - register it if (!metricsMap.containsKey(metricName)) { metricsMap.put(metricName, new AtomicDouble(entry.getValue())); - metricRegistry.register(metricName, new MetricGauge(metricName, tags)); + metricRegistry.register(metricName, (Gauge) () -> metricsMap.get(metricName).get()); } //set real time value to metrics map metricsMap.get(metricName).set(entry.getValue()); @@ -196,37 +195,13 @@ private void updateAllMetricGroups(ProcessGroupStatus processGroupStatus) { updateMetrics(metricsService.getDataFlowMetrics(processGroupStatus), Optional.absent(), defaultTags); } - private class MetricGauge implements Gauge, DynamicTagsCallback { - private Map tags; - private String metricName; - - public MetricGauge(String metricName, Map tagsMap) { - this.tags = tagsMap; - this.metricName = metricName; - } - - @Override - public Object getValue() { - return metricsMap.get(metricName).get(); - } - - @Override - public List getTags() { - List tagsList = Lists.newArrayList(); - for (Map.Entry entry : tags.entrySet()) { - tagsList.add(entry.getKey() + ":" + entry.getValue()); - } - return tagsList; - } - } - private void updateDataDogTransport(ReportingContext context) throws IOException { String dataDogTransport = context.getProperty(DATADOG_TRANSPORT).getValue(); if (dataDogTransport.equalsIgnoreCase(DATADOG_AGENT.getValue())) { ddMetricRegistryBuilder.build("agent"); } else if (dataDogTransport.equalsIgnoreCase(DATADOG_HTTP.getValue()) && context.getProperty(API_KEY).isSet()) { - ddMetricRegistryBuilder.build(context.getProperty(API_KEY).getValue()); + ddMetricRegistryBuilder.build(context.getProperty(API_KEY).evaluateAttributeExpressions().getValue()); } } @@ -258,8 +233,19 @@ private void populateOutputPortStatuses(final ProcessGroupStatus groupStatus, fi } } - private String buildMetricName(Optional processorName, String metricName) { - return metricsPrefix + "." + processorName.or("flow") + "." + metricName; + private String buildMetricName(Optional processorName, String metricName, Map tags) { + return metricsPrefix + "." + processorName.or("flow") + "." + metricName + encodeTags(tags); + } + + private String encodeTags(Map tags) { + if (tags == null || tags.isEmpty()) { + return ""; + } else { + return tags.entrySet() + .stream() + .map(e -> e.getKey() + ":" + e.getValue()) + .collect(Collectors.joining(",", "[", "]")); + } } protected MetricsService getMetricsService() { diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java index d1419ebd92a9..4204e8e74791 100644 --- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java +++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricsService.java @@ -108,23 +108,6 @@ public Map getDataFlowMetrics(ProcessGroupStatus status) { return metrics; } - public List getAllTagsList() { - List tagsList = new ArrayList<>(); - tagsList.add("env"); - tagsList.add("dataflow_id"); - tagsList.add(MetricNames.PORT_ID); - tagsList.add(MetricNames.PORT_NAME); - tagsList.add(MetricNames.PORT_GROUP_ID); - tagsList.add(MetricNames.CONNECTION_ID); - tagsList.add(MetricNames.CONNECTION_NAME); - tagsList.add(MetricNames.CONNECTION_GROUP_ID); - tagsList.add(MetricNames.CONNECTION_SOURCE_ID); - tagsList.add(MetricNames.CONNECTION_SOURCE_NAME); - tagsList.add(MetricNames.CONNECTION_DESTINATION_ID); - tagsList.add(MetricNames.CONNECTTION_DESTINATION_NAME); - return tagsList; - } - //virtual machine metrics public Map getJVMMetrics(VirtualMachineMetrics virtualMachineMetrics) { final Map metrics = new HashMap<>(); diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java index 0d2f0e74b99d..616e07c280b9 100644 --- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java +++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/test/java/org/apache/nifi/reporting/datadog/TestDataDogReportingTask.java @@ -35,7 +35,6 @@ import org.junit.Test; import org.mockito.Mockito; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Map; @@ -99,7 +98,7 @@ private void initContexts() { //test onTrigger method @Test - public void testOnTrigger() throws InitializationException, IOException { + public void testOnTrigger() throws InitializationException { DataDogReportingTask dataDogReportingTask = new TestableDataDogReportingTask(); dataDogReportingTask.initialize(initContext); dataDogReportingTask.setup(configurationContext); @@ -112,7 +111,7 @@ public void testOnTrigger() throws InitializationException, IOException { //test updating metrics of processors @Test - public void testUpdateMetricsProcessor() throws InitializationException, IOException { + public void testUpdateMetricsProcessor() throws InitializationException { MetricsService ms = new MetricsService(); Map processorMetrics = ms.getProcessorMetrics(procStatus); Map tagsMap = ImmutableMap.of("env", "test"); @@ -121,16 +120,16 @@ public void testUpdateMetricsProcessor() throws InitializationException, IOExcep dataDogReportingTask.setup(configurationContext); dataDogReportingTask.updateMetrics(processorMetrics, Optional.of("sampleProcessor"), tagsMap); - verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesReceivedLast5Minutes[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.ActiveThreads[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesWrittenLast5Minutes[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.BytesReadLast5Minutes[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.sampleProcessor.FlowFilesSentLast5Minutes[env:test]"), Mockito.any()); } //test updating JMV metrics @Test - public void testUpdateMetricsJVM() throws InitializationException, IOException { + public void testUpdateMetricsJVM() throws InitializationException { MetricsService ms = new MetricsService(); Map processorMetrics = ms.getJVMMetrics(virtualMachineMetrics); Map tagsMap = ImmutableMap.of("env", "test"); @@ -140,16 +139,16 @@ public void testUpdateMetricsJVM() throws InitializationException, IOException { dataDogReportingTask.setup(configurationContext); dataDogReportingTask.updateMetrics(processorMetrics, Optional.absent(), tagsMap); - verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.uptime"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage"), Mockito.any()); - verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.heap_usage[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_count[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.terminated[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.heap_used[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.runnable[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.timed_waiting[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.uptime[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.daemon_thread_count[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.file_descriptor_usage[env:test]"), Mockito.any()); + verify(metricRegistry).register(eq("nifi.flow.jvm.thread_states.blocked[env:test]"), Mockito.any()); }