From 6c9e00f347d4468e5a85ef2bf840f3497b6786c0 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 23 Jan 2018 23:15:18 +0100 Subject: [PATCH 1/5] NIFI-4809 - Implement a SiteToSiteMetricsReportingTask --- .../nifi-ambari-reporting-task/pom.xml | 14 +- .../reporting/ambari/AmbariReportingTask.java | 4 +- .../ambari/api/TestMetricsBuilder.java | 2 + .../ambari/metrics/TestMetricsService.java | 2 + .../nifi-reporting-utils/pom.xml | 10 ++ .../reporting/util}/metrics/MetricNames.java | 2 +- .../util}/metrics/MetricsService.java | 2 +- .../util/metrics}/api/MetricBuilder.java | 2 +- .../util/metrics}/api/MetricFields.java | 2 +- .../util/metrics}/api/MetricsBuilder.java | 2 +- .../SiteToSiteMetricsReportingTask.java | 151 ++++++++++++++++++ .../org.apache.nifi.reporting.ReportingTask | 3 +- 12 files changed, 178 insertions(+), 18 deletions(-) rename nifi-nar-bundles/{nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari => nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util}/metrics/MetricNames.java (97%) rename nifi-nar-bundles/{nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari => nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util}/metrics/MetricsService.java (99%) rename nifi-nar-bundles/{nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari => nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics}/api/MetricBuilder.java (98%) rename nifi-nar-bundles/{nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari => nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics}/api/MetricFields.java (95%) rename nifi-nar-bundles/{nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari => nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics}/api/MetricsBuilder.java (98%) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml index 5f30ec5170ce..e451303e11ab 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml @@ -29,21 +29,11 @@ org.glassfish.jersey.core jersey-client - - org.glassfish - javax.json - 1.0.4 - javax.json javax.json-api 1.0 - - com.yammer.metrics - metrics-core - 2.2.0 - org.apache.nifi nifi-api @@ -53,6 +43,10 @@ nifi-utils 1.6.0-SNAPSHOT + + org.apache.nifi + nifi-reporting-utils + org.apache.nifi diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java index a5ce9f45d598..eadef74aa0e3 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/AmbariReportingTask.java @@ -28,8 +28,8 @@ import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.AbstractReportingTask; import org.apache.nifi.reporting.ReportingContext; -import org.apache.nifi.reporting.ambari.api.MetricsBuilder; -import org.apache.nifi.reporting.ambari.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import org.apache.nifi.scheduling.SchedulingStrategy; import javax.json.Json; diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java index cdaa45343549..9b96eb9626ed 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/api/TestMetricsBuilder.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.reporting.ambari.api; +import org.apache.nifi.reporting.util.metrics.api.MetricFields; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import org.junit.Assert; import org.junit.Test; diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java index 93224eb190d8..ec0cf6e46bdd 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/test/java/org/apache/nifi/reporting/ambari/metrics/TestMetricsService.java @@ -19,6 +19,8 @@ import com.yammer.metrics.core.VirtualMachineMetrics; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.util.metrics.MetricNames; +import org.apache.nifi.reporting.util.metrics.MetricsService; import org.junit.Assert; import org.junit.Test; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml index e118271bd332..e05b88b3fb6f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/pom.xml @@ -40,6 +40,16 @@ commons-lang3 3.7 + + com.yammer.metrics + metrics-core + 2.2.0 + + + org.glassfish + javax.json + 1.0.4 + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java similarity index 97% rename from nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java index 20cfa4e282e7..f76246f09a74 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricNames.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.reporting.ambari.metrics; +package org.apache.nifi.reporting.util.metrics; /** * The Metric names to send to Ambari. diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java similarity index 99% rename from nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java index cef257dad920..59402252ac39 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/metrics/MetricsService.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.reporting.ambari.metrics; +package org.apache.nifi.reporting.util.metrics; import com.yammer.metrics.core.VirtualMachineMetrics; import org.apache.nifi.controller.status.ProcessGroupStatus; diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java similarity index 98% rename from nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java index 8e234ce84135..81fb0219c9da 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricBuilder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricBuilder.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.reporting.ambari.api; +package org.apache.nifi.reporting.util.metrics.api; import javax.json.JsonBuilderFactory; import javax.json.JsonObject; diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java similarity index 95% rename from nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java index 1c1629c511d1..4c451eaf4429 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricFields.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricFields.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.reporting.ambari.api; +package org.apache.nifi.reporting.util.metrics.api; public interface MetricFields { diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java similarity index 98% rename from nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java index 11b4db5b2101..36947206da6f 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/src/main/java/org/apache/nifi/reporting/ambari/api/MetricsBuilder.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/api/MetricsBuilder.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.reporting.ambari.api; +package org.apache.nifi.reporting.util.metrics.api; import javax.json.JsonArrayBuilder; import javax.json.JsonBuilderFactory; diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java new file mode 100644 index 000000000000..6a22a68aee4f --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import java.lang.management.ManagementFactory; +import java.lang.management.OperatingSystemMXBean; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import javax.json.Json; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.reporting.util.metrics.MetricsService; +import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; + +import com.yammer.metrics.core.VirtualMachineMetrics; + +@Tags({"status", "metrics", "site", "site to site"}) +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol. " + + "Metrics are formatted according to the Ambari Metrics API.") +public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask { + + static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + + static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() + .name("Application ID") + .description("The Application ID to be included in the metrics sent to Ambari") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() + .name("Hostname") + .description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("${hostname(true)}") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private final MetricsService metricsService = new MetricsService(); + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(HOSTNAME); + properties.add(APPLICATION_ID); + properties.remove(BATCH_SIZE); + return properties; + } + + @Override + public void onTrigger(final ReportingContext context) { + final boolean isClustered = context.isClustered(); + final String nodeId = context.getClusterNodeIdentifier(); + if (nodeId == null && isClustered) { + getLogger().debug("This instance of NiFi is configured for clustering, but the Cluster Node Identifier is not yet available. " + + "Will wait for Node Identifier to be established."); + return; + } + + final VirtualMachineMetrics virtualMachineMetrics = VirtualMachineMetrics.getInstance(); + final Map config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + + final String applicationId = context.getProperty(APPLICATION_ID).evaluateAttributeExpressions().getValue(); + final String hostname = context.getProperty(HOSTNAME).evaluateAttributeExpressions().getValue(); + final ProcessGroupStatus status = context.getEventAccess().getControllerStatus(); + + if(status != null) { + final Map statusMetrics = metricsService.getMetrics(status, false); + final Map jvmMetrics = metricsService.getMetrics(virtualMachineMetrics); + + final MetricsBuilder metricsBuilder = new MetricsBuilder(factory); + final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); + final double systemLoad = os.getSystemLoadAverage(); + + final JsonObject metricsObject = metricsBuilder + .applicationId(applicationId) + .instanceId(status.getId()) + .hostname(hostname) + .timestamp(System.currentTimeMillis()) + .addAllMetrics(statusMetrics) + .addAllMetrics(jvmMetrics) + .metric("available.cores", String.valueOf(os.getAvailableProcessors())) + .metric("load.average.1min", String.valueOf(systemLoad >= 0 ? systemLoad : -1)) + .build(); + + try { + long start = System.nanoTime(); + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final Map attributes = new HashMap<>(); + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + attributes.put("mime.type", "application/json"); + + final byte[] data = metricsObject.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent metrics to destination in {}ms; Transaction ID = {}", new Object[]{transferMillis, transactionId}); + } catch (final Exception e) { + throw new ProcessException("Failed to send metrics to destination due to:" + e.getMessage(), e); + } + + } else { + getLogger().error("No process group status to retrieve metrics"); + } + + } + +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask index 0aced9465747..652b58186ee1 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -15,4 +15,5 @@ org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask org.apache.nifi.reporting.SiteToSiteBulletinReportingTask -org.apache.nifi.reporting.SiteToSiteStatusReportingTask \ No newline at end of file +org.apache.nifi.reporting.SiteToSiteStatusReportingTask +org.apache.nifi.reporting.SiteToSiteMetricsReportingTask \ No newline at end of file From 4a4f9bc16ffe2ef3b4bcb49f8509714bd95075b3 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 28 Feb 2018 21:51:27 +0100 Subject: [PATCH 2/5] address review comments --- .../reporting/SiteToSiteMetricsReportingTask.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index 6a22a68aee4f..00570daf2c90 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -53,8 +53,9 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() - .name("Application ID") - .description("The Application ID to be included in the metrics sent to Ambari") + .name("s2s-metrics-application-id") + .displayName("Application ID") + .description("The Application ID to be included in the metrics") .required(true) .expressionLanguageSupported(true) .defaultValue("nifi") @@ -62,8 +63,9 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT .build(); static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder() - .name("Hostname") - .description("The Hostname of this NiFi instance to be included in the metrics sent to Ambari") + .name("s2s-metrics-hostname") + .displayName("Hostname") + .description("The Hostname of this NiFi instance to be included in the metrics") .required(true) .expressionLanguageSupported(true) .defaultValue("${hostname(true)}") @@ -129,6 +131,9 @@ public void onTrigger(final ReportingContext context) { final Map attributes = new HashMap<>(); final String transactionId = UUID.randomUUID().toString(); attributes.put("reporting.task.transaction.id", transactionId); + attributes.put("reporting.task.name", getName()); + attributes.put("reporting.task.uuid", getIdentifier()); + attributes.put("reporting.task.type", this.getClass().getSimpleName()); attributes.put("mime.type", "application/json"); final byte[] data = metricsObject.toString().getBytes(StandardCharsets.UTF_8); From 5551ce53e922c6bb77642db1bbb73038b2c8a64d Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sat, 3 Mar 2018 19:19:17 +0100 Subject: [PATCH 3/5] NIFI-4809 - Added Record Writer property --- .../reporting/util/metrics/MetricNames.java | 4 + .../util/metrics/MetricsService.java | 191 +++++++++++++----- .../nifi-site-to-site-reporting-task/pom.xml | 33 ++- .../AbstractSiteToSiteReportingTask.java | 103 +++++++++- .../SiteToSiteBulletinReportingTask.java | 18 +- .../SiteToSiteMetricsReportingTask.java | 90 +++++++-- .../SiteToSiteProvenanceReportingTask.java | 28 +-- .../SiteToSiteStatusReportingTask.java | 37 +--- .../src/main/resources/schema-metrics.avsc | 37 ++++ 9 files changed, 399 insertions(+), 142 deletions(-) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java index f76246f09a74..19bb90dc1736 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricNames.java @@ -52,4 +52,8 @@ public interface MetricNames { String JVM_GC_RUNS = "jvm.gc.runs"; String JVM_GC_TIME = "jvm.gc.time"; + // OS Metrics + String LOAD1MN = "loadAverage1min"; + String CORES = "availableCores"; + } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java index 59402252ac39..ed3922a1f559 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/metrics/MetricsService.java @@ -16,14 +16,20 @@ */ package org.apache.nifi.reporting.util.metrics; -import com.yammer.metrics.core.VirtualMachineMetrics; -import org.apache.nifi.controller.status.ProcessGroupStatus; -import org.apache.nifi.controller.status.ProcessorStatus; - import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; + +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.reporting.util.metrics.api.MetricFields; + +import com.yammer.metrics.core.VirtualMachineMetrics; + /** * A service used to produce key/value metrics based on a given input. */ @@ -38,21 +44,42 @@ public class MetricsService { */ public Map getMetrics(ProcessGroupStatus status, boolean appendPgId) { final Map metrics = new HashMap<>(); - metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), String.valueOf(status.getFlowFilesReceived())); - metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), String.valueOf(status.getBytesReceived())); - metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), String.valueOf(status.getFlowFilesSent())); - metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), String.valueOf(status.getBytesSent())); - metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedCount())); - metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), String.valueOf(status.getQueuedContentSize())); - metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), String.valueOf(status.getBytesRead())); - metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), String.valueOf(status.getBytesWritten())); - metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), String.valueOf(status.getActiveThreadCount())); + + Map longMetrics = getLongMetrics(status, appendPgId); + for (String key : longMetrics.keySet()) { + metrics.put(key, String.valueOf(longMetrics.get(key))); + } + + Map integerMetrics = getIntegerMetrics(status, appendPgId); + for (String key : integerMetrics.keySet()) { + metrics.put(key, String.valueOf(integerMetrics.get(key))); + } + + return metrics; + } + + private Map getIntegerMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map metrics = new HashMap<>(); + metrics.put(appendPgId(MetricNames.FLOW_FILES_RECEIVED, status, appendPgId), status.getFlowFilesReceived()); + metrics.put(appendPgId(MetricNames.FLOW_FILES_SENT, status, appendPgId), status.getFlowFilesSent()); + metrics.put(appendPgId(MetricNames.FLOW_FILES_QUEUED, status, appendPgId), status.getQueuedCount()); + metrics.put(appendPgId(MetricNames.ACTIVE_THREADS, status, appendPgId), status.getActiveThreadCount()); + return metrics; + } + + private Map getLongMetrics(ProcessGroupStatus status, boolean appendPgId) { + final Map metrics = new HashMap<>(); + metrics.put(appendPgId(MetricNames.BYTES_RECEIVED, status, appendPgId), status.getBytesReceived()); + metrics.put(appendPgId(MetricNames.BYTES_SENT, status, appendPgId), status.getBytesSent()); + metrics.put(appendPgId(MetricNames.BYTES_QUEUED, status, appendPgId), status.getQueuedContentSize()); + metrics.put(appendPgId(MetricNames.BYTES_READ, status, appendPgId), status.getBytesRead()); + metrics.put(appendPgId(MetricNames.BYTES_WRITTEN, status, appendPgId), status.getBytesWritten()); final long durationNanos = calculateProcessingNanos(status); - metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), String.valueOf(durationNanos)); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_NANOS, status, appendPgId), durationNanos); final long durationSeconds = TimeUnit.SECONDS.convert(durationNanos, TimeUnit.NANOSECONDS); - metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), String.valueOf(durationSeconds)); + metrics.put(appendPgId(MetricNames.TOTAL_TASK_DURATION_SECONDS, status, appendPgId), durationSeconds); return metrics; } @@ -65,40 +92,20 @@ public Map getMetrics(ProcessGroupStatus status, boolean appendPg */ public Map getMetrics(VirtualMachineMetrics virtualMachineMetrics) { final Map metrics = new HashMap<>(); - metrics.put(MetricNames.JVM_UPTIME, String.valueOf(virtualMachineMetrics.uptime())); - metrics.put(MetricNames.JVM_HEAP_USED, String.valueOf(virtualMachineMetrics.heapUsed())); - metrics.put(MetricNames.JVM_HEAP_USAGE, String.valueOf(virtualMachineMetrics.heapUsage())); - metrics.put(MetricNames.JVM_NON_HEAP_USAGE, String.valueOf(virtualMachineMetrics.nonHeapUsage())); - metrics.put(MetricNames.JVM_THREAD_COUNT, String.valueOf(virtualMachineMetrics.threadCount())); - metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, String.valueOf(virtualMachineMetrics.daemonThreadCount())); - metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, String.valueOf(virtualMachineMetrics.fileDescriptorUsage())); - for (Map.Entry entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { - final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); - switch(entry.getKey()) { - case BLOCKED: - metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, String.valueOf(normalizedValue)); - break; - case RUNNABLE: - metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, String.valueOf(normalizedValue)); - break; - case TERMINATED: - metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, String.valueOf(normalizedValue)); - break; - case TIMED_WAITING: - metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, String.valueOf(normalizedValue)); - break; - default: - break; - } + Map integerMetrics = getIntegerMetrics(virtualMachineMetrics); + for (String key : integerMetrics.keySet()) { + metrics.put(key, String.valueOf(integerMetrics.get(key))); } - for (Map.Entry entry : virtualMachineMetrics.garbageCollectors().entrySet()) { - final String gcName = entry.getKey().replace(" ", ""); - final long runs = entry.getValue().getRuns(); - final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); - metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, String.valueOf(runs)); - metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, String.valueOf(timeMS)); + Map longMetrics = getLongMetrics(virtualMachineMetrics); + for (String key : longMetrics.keySet()) { + metrics.put(key, String.valueOf(longMetrics.get(key))); + } + + Map doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { + metrics.put(key, String.valueOf(doubleMetrics.get(key))); } return metrics; @@ -128,4 +135,96 @@ private String appendPgId(String name, ProcessGroupStatus status, boolean append } } + private Map getDoubleMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_HEAP_USED, virtualMachineMetrics.heapUsed()); + metrics.put(MetricNames.JVM_HEAP_USAGE, virtualMachineMetrics.heapUsage()); + metrics.put(MetricNames.JVM_NON_HEAP_USAGE, virtualMachineMetrics.nonHeapUsage()); + metrics.put(MetricNames.JVM_FILE_DESCRIPTOR_USAGE, virtualMachineMetrics.fileDescriptorUsage()); + return metrics; + } + + private Map getLongMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_UPTIME, virtualMachineMetrics.uptime()); + + for (Map.Entry entry : virtualMachineMetrics.garbageCollectors().entrySet()) { + final String gcName = entry.getKey().replace(" ", ""); + final long runs = entry.getValue().getRuns(); + final long timeMS = entry.getValue().getTime(TimeUnit.MILLISECONDS); + metrics.put(MetricNames.JVM_GC_RUNS + "." + gcName, runs); + metrics.put(MetricNames.JVM_GC_TIME + "." + gcName, timeMS); + } + + return metrics; + } + + private Map getIntegerMetrics(VirtualMachineMetrics virtualMachineMetrics) { + final Map metrics = new HashMap<>(); + metrics.put(MetricNames.JVM_DAEMON_THREAD_COUNT, virtualMachineMetrics.daemonThreadCount()); + metrics.put(MetricNames.JVM_THREAD_COUNT, virtualMachineMetrics.threadCount()); + + for (Map.Entry entry : virtualMachineMetrics.threadStatePercentages().entrySet()) { + final int normalizedValue = (int) (100 * (entry.getValue() == null ? 0 : entry.getValue())); + switch(entry.getKey()) { + case BLOCKED: + metrics.put(MetricNames.JVM_THREAD_STATES_BLOCKED, normalizedValue); + break; + case RUNNABLE: + metrics.put(MetricNames.JVM_THREAD_STATES_RUNNABLE, normalizedValue); + break; + case TERMINATED: + metrics.put(MetricNames.JVM_THREAD_STATES_TERMINATED, normalizedValue); + break; + case TIMED_WAITING: + metrics.put(MetricNames.JVM_THREAD_STATES_TIMED_WAITING, normalizedValue); + break; + default: + break; + } + } + + return metrics; + } + + public JsonObject getMetrics(JsonBuilderFactory factory, ProcessGroupStatus status, VirtualMachineMetrics virtualMachineMetrics, + String applicationId, String id, String hostname, long currentTimeMillis, int availableProcessors, double systemLoad) { + JsonObjectBuilder objectBuilder = factory.createObjectBuilder() + .add(MetricFields.APP_ID, applicationId) + .add(MetricFields.HOSTNAME, hostname) + .add(MetricFields.INSTANCE_ID, status.getId()) + .add(MetricFields.TIMESTAMP, currentTimeMillis); + + objectBuilder + .add(MetricNames.CORES, availableProcessors) + .add(MetricNames.LOAD1MN, systemLoad); + + Map integerMetrics = getIntegerMetrics(virtualMachineMetrics); + for (String key : integerMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), integerMetrics.get(key)); + } + + Map longMetrics = getLongMetrics(virtualMachineMetrics); + for (String key : longMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), longMetrics.get(key)); + } + + Map doubleMetrics = getDoubleMetrics(virtualMachineMetrics); + for (String key : doubleMetrics.keySet()) { + objectBuilder.add(key.replaceAll("\\.", ""), doubleMetrics.get(key)); + } + + Map longPgMetrics = getLongMetrics(status, false); + for (String key : longPgMetrics.keySet()) { + objectBuilder.add(key, longPgMetrics.get(key)); + } + + Map integerPgMetrics = getIntegerMetrics(status, false); + for (String key : integerPgMetrics.keySet()) { + objectBuilder.add(key, integerPgMetrics.get(key)); + } + + return objectBuilder.build(); + } + } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml index d88352917dad..a25dc2fab34f 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -54,6 +54,23 @@ nifi-site-to-site-client 1.6.0-SNAPSHOT + + org.apache.nifi + nifi-record-serialization-service-api + + + org.apache.nifi + nifi-record-serialization-services + 1.6.0-SNAPSHOT + + + org.apache.nifi + nifi-record + + + org.apache.nifi + nifi-avro-record-utils + org.glassfish javax.json @@ -85,8 +102,22 @@ junit junit - 4.12 test + + + + + org.apache.rat + apache-rat-plugin + + + src/main/resources/schema-metrics.avsc + + + + + + diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java index 28106a676224..c3032039e2db 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -16,6 +16,18 @@ */ package org.apache.nifi.reporting; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.json.JsonObjectBuilder; +import javax.json.JsonValue; +import javax.net.ssl.SSLContext; + import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; @@ -24,27 +36,39 @@ import org.apache.nifi.components.Validator; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.json.JsonTreeRowRecordReader; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.remote.client.SiteToSiteClient; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.remote.protocol.http.HttpProxy; import org.apache.nifi.remote.util.SiteToSiteRestApiClient; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; -import javax.net.ssl.SSLContext; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Base class for ReportingTasks that send data over site-to-site. */ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask { + + protected static final String LAST_EVENT_ID_KEY = "last_event_id"; protected static final String DESTINATION_URL_PATH = "/nifi"; + protected static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + + private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); + private final String timeFormat = RecordFieldType.TIME.getDefaultFormat(); + private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat(); static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() .name("Destination URL") @@ -140,8 +164,16 @@ public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingT .sensitive(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records.") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(false) + .build(); protected volatile SiteToSiteClient siteToSiteClient; + protected volatile RecordSchema recordSchema; @Override protected List getSupportedPropertyDescriptors() { @@ -187,7 +219,7 @@ public void reportEvent(final Severity severity, final String category, final St final SiteToSiteTransportProtocol mode = SiteToSiteTransportProtocol.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue()); final HttpProxy httpProxy = mode.equals(SiteToSiteTransportProtocol.RAW) || StringUtils.isEmpty(context.getProperty(HTTP_PROXY_HOSTNAME).getValue()) ? null : new HttpProxy(context.getProperty(HTTP_PROXY_HOSTNAME).getValue(), context.getProperty(HTTP_PROXY_PORT).asInteger(), - context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); + context.getProperty(HTTP_PROXY_USERNAME).getValue(), context.getProperty(HTTP_PROXY_PASSWORD).getValue()); siteToSiteClient = new SiteToSiteClient.Builder() .urls(SiteToSiteRestApiClient.parseClusterUrls(destinationUrl)) @@ -214,6 +246,33 @@ protected SiteToSiteClient getClient() { return this.siteToSiteClient; } + protected byte[] getData(final ReportingContext context, InputStream in, Map attributes) { + try (final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, getLogger(), recordSchema, dateFormat, timeFormat, timestampFormat)) { + + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema writeSchema = writerFactory.getSchema(null, recordSchema); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { + writer.beginRecordSet(); + + Record record; + while ((record = reader.nextRecord()) != null) { + writer.write(record); + } + + final WriteResult writeResult = writer.finishRecordSet(); + + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + } + + return out.toByteArray(); + } catch (IOException | SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Failed to write metrics using record writer: " + e.getMessage(), e); + } + } + static class NiFiUrlValidator implements Validator { @Override public ValidationResult validate(final String subject, final String input, final ValidationContext context) { @@ -235,4 +294,34 @@ public ValidationResult validate(final String subject, final String input, final } } } + + protected void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final Integer value) { + if (value != null) { + builder.add(key, value.intValue()); + } + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } + + protected void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { + if (value == null) { + if (allowNullValues) { + builder.add(key, JsonValue.NULL); + } + } else { + builder.add(key, value); + } + } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java index 03d8f3b724f5..eddf4bed80c7 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteBulletinReportingTask.java @@ -67,9 +67,6 @@ @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min") public class SiteToSiteBulletinReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final String LAST_EVENT_ID_KEY = "last_event_id"; - static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .description("The value to use for the platform field in each provenance event.") @@ -194,7 +191,7 @@ public void onTrigger(final ReportingContext context) { lastSentBulletinId = currMaxId; } - static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, + private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final Bulletin bulletin, final DateFormat df, final String platform, final String nodeIdentifier) { addField(builder, "objectId", UUID.randomUUID().toString()); @@ -215,17 +212,4 @@ static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBu return builder.build(); } - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } - builder.add(key, value); - } - } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index 00570daf2c90..c12f868224b4 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -17,10 +17,14 @@ package org.apache.nifi.reporting; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -32,25 +36,36 @@ import javax.json.JsonBuilderFactory; import javax.json.JsonObject; +import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.remote.Transaction; import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.reporting.util.metrics.MetricNames; import org.apache.nifi.reporting.util.metrics.MetricsService; import org.apache.nifi.reporting.util.metrics.api.MetricsBuilder; import com.yammer.metrics.core.VirtualMachineMetrics; @Tags({"status", "metrics", "site", "site to site"}) -@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol. " - + "Metrics are formatted according to the Ambari Metrics API.") +@CapabilityDescription("Publishes same metrics as the Ambari Reporting task using the Site To Site protocol.") public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + static final AllowableValue AMBARI_FORMAT = new AllowableValue("ambari-format", "Ambari Format", "Metrics will be formatted" + + " according to the Ambari Metrics API. See Additional Details in Usage documentation."); + static final AllowableValue RECORD_FORMAT = new AllowableValue("record-format", "Record Format", "Metrics will be formatted" + + " using the Record Writer property of this reporting task. See Additional Details in Usage documentation to" + + " have the description of the default schema."); static final PropertyDescriptor APPLICATION_ID = new PropertyDescriptor.Builder() .name("s2s-metrics-application-id") @@ -72,17 +87,50 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() + .name("s2s-metrics-format") + .displayName("Output format") + .description("The output format that will be used for the metrics") + .required(true) + .allowableValues(AMBARI_FORMAT, RECORD_FORMAT) + .defaultValue(AMBARI_FORMAT.getValue()) + .addValidator(Validator.VALID) + .build(); + private final MetricsService metricsService = new MetricsService(); + public SiteToSiteMetricsReportingTask() throws IOException { + final InputStream schema = getClass().getClassLoader().getResourceAsStream("schema-metrics.avsc"); + recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(schema)); + } + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); properties.add(HOSTNAME); properties.add(APPLICATION_ID); + properties.add(FORMAT); + properties.add(RECORD_WRITER); properties.remove(BATCH_SIZE); return properties; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List problems = new ArrayList<>(super.customValidate(validationContext)); + + final boolean isWriterSet = validationContext.getProperty(RECORD_WRITER).isSet(); + if (validationContext.getProperty(FORMAT).getValue().equals(RECORD_FORMAT.getValue()) && !isWriterSet) { + problems.add(new ValidationResult.Builder() + .input("Record Writer") + .valid(false) + .explanation("If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set.") + .build()); + } + + return problems; + } + @Override public void onTrigger(final ReportingContext context) { final boolean isClustered = context.isClustered(); @@ -109,16 +157,28 @@ public void onTrigger(final ReportingContext context) { final OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean(); final double systemLoad = os.getSystemLoadAverage(); - final JsonObject metricsObject = metricsBuilder - .applicationId(applicationId) - .instanceId(status.getId()) - .hostname(hostname) - .timestamp(System.currentTimeMillis()) - .addAllMetrics(statusMetrics) - .addAllMetrics(jvmMetrics) - .metric("available.cores", String.valueOf(os.getAvailableProcessors())) - .metric("load.average.1min", String.valueOf(systemLoad >= 0 ? systemLoad : -1)) - .build(); + byte[] data; + final Map attributes = new HashMap<>(); + + if(context.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue())) { + final JsonObject metricsObject = metricsBuilder + .applicationId(applicationId) + .instanceId(status.getId()) + .hostname(hostname) + .timestamp(System.currentTimeMillis()) + .addAllMetrics(statusMetrics) + .addAllMetrics(jvmMetrics) + .metric(MetricNames.CORES, String.valueOf(os.getAvailableProcessors())) + .metric(MetricNames.LOAD1MN, String.valueOf(systemLoad >= 0 ? systemLoad : -1)) + .build(); + + data = metricsObject.toString().getBytes(StandardCharsets.UTF_8); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + } else { + final JsonObject metricsObject = metricsService.getMetrics(factory, status, virtualMachineMetrics, applicationId, status.getId(), + hostname, System.currentTimeMillis(), os.getAvailableProcessors(), systemLoad >= 0 ? systemLoad : -1); + data = getData(context, new ByteArrayInputStream(metricsObject.toString().getBytes(StandardCharsets.UTF_8)), attributes); + } try { long start = System.nanoTime(); @@ -128,15 +188,12 @@ public void onTrigger(final ReportingContext context) { return; } - final Map attributes = new HashMap<>(); final String transactionId = UUID.randomUUID().toString(); attributes.put("reporting.task.transaction.id", transactionId); attributes.put("reporting.task.name", getName()); attributes.put("reporting.task.uuid", getIdentifier()); attributes.put("reporting.task.type", this.getClass().getSimpleName()); - attributes.put("mime.type", "application/json"); - final byte[] data = metricsObject.toString().getBytes(StandardCharsets.UTF_8); transaction.send(data, attributes); transaction.confirm(); transaction.complete(); @@ -150,7 +207,6 @@ public void onTrigger(final ReportingContext context) { } else { getLogger().error("No process group status to retrieve metrics"); } - } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index f7a59db34d35..fe407eb90be9 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -45,7 +45,6 @@ import javax.json.JsonBuilderFactory; import javax.json.JsonObject; import javax.json.JsonObjectBuilder; -import javax.json.JsonValue; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -75,9 +74,6 @@ ) public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final String LAST_EVENT_ID_KEY = "last_event_id"; - static final AllowableValue BEGINNING_OF_STREAM = new AllowableValue("beginning-of-stream", "Beginning of Stream", "Start reading provenance Events from the beginning of the stream (the oldest event first)"); static final AllowableValue END_OF_STREAM = new AllowableValue("end-of-stream", "End of Stream", @@ -306,7 +302,7 @@ public void onTrigger(final ReportingContext context) { } - static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, + private JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, final String componentName, final String processGroupId, final String processGroupName, final String hostname, final URL nifiUrl, final String applicationName, final String platform, final String nodeIdentifier) { addField(builder, "eventId", UUID.randomUUID().toString()); @@ -370,13 +366,7 @@ private static void addField(final JsonObjectBuilder builder, final JsonBuilderF builder.add(key, mapBuilder); } - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection values) { + private void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection values) { if (values == null) { return; } @@ -384,20 +374,6 @@ private static void addField(final JsonObjectBuilder builder, final JsonBuilderF builder.add(key, createJsonArray(factory, values)); } - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - addField(builder, key, value, false); - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value, final boolean allowNullValues) { - if (value == null) { - if (allowNullValues) { - builder.add(key, JsonValue.NULL); - } - } else { - builder.add(key, value); - } - } - private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection values) { final JsonArrayBuilder builder = factory.createArrayBuilder(); for (final String value : values) { diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java index 526b5d513f46..d623b6f01e94 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteStatusReportingTask.java @@ -60,8 +60,6 @@ + "However, all process groups are recursively searched for matching components, regardless of whether the process group matches the component filters.") public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTask { - static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; - static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() .name("Platform") .description("The value to use for the platform field in each status record.") @@ -70,6 +68,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa .defaultValue("nifi") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + static final PropertyDescriptor COMPONENT_TYPE_FILTER_REGEX = new PropertyDescriptor.Builder() .name("Component Type Filter Regex") .description("A regex specifying which component types to report. Any component type matching this regex will be included. " @@ -79,6 +78,7 @@ public class SiteToSiteStatusReportingTask extends AbstractSiteToSiteReportingTa .defaultValue("(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)") .addValidator(StandardValidators.createRegexValidator(0, Integer.MAX_VALUE, true)) .build(); + static final PropertyDescriptor COMPONENT_NAME_FILTER_REGEX = new PropertyDescriptor.Builder() .name("Component Name Filter Regex") .description("A regex specifying which component names to report. Any component name matching this regex will be included.") @@ -197,7 +197,7 @@ public void onTrigger(final ReportingContext context) { * The component name * @return Whether the component matches both filters */ - boolean componentMatchesFilters(final String componentType, final String componentName) { + private boolean componentMatchesFilters(final String componentType, final String componentName) { return componentTypeFilter.matcher(componentType).matches() && componentNameFilter.matcher(componentName).matches(); } @@ -221,7 +221,7 @@ boolean componentMatchesFilters(final String componentType, final String compone * @param parentId * The parent's component id */ - void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, + private void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); @@ -278,7 +278,7 @@ void serializeProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final Json } } - void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, + private void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final RemoteProcessGroupStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); @@ -303,7 +303,7 @@ void serializeRemoteProcessGroupStatus(final JsonArrayBuilder arrayBuilder, fina } } - void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, + private void serializePortStatus(final String componentType, final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final PortStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentName = status.getName(); @@ -327,7 +327,7 @@ void serializePortStatus(final String componentType, final JsonArrayBuilder arra } } - void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, + private void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ConnectionStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Connection"; @@ -355,7 +355,7 @@ void serializeConnectionStatus(final JsonArrayBuilder arrayBuilder, final JsonBu } } - void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, + private void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBuilderFactory factory, final ProcessorStatus status, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate) { final JsonObjectBuilder builder = factory.createObjectBuilder(); final String componentType = "Processor"; @@ -386,7 +386,7 @@ void serializeProcessorStatus(final JsonArrayBuilder arrayBuilder, final JsonBui } } - private static void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, + private void addCommonFields(final JsonObjectBuilder builder, final DateFormat df, final String hostname, final String applicationName, final String platform, final String parentId, final Date currentDate, final String componentType, final String componentName) { addField(builder, "statusId", UUID.randomUUID().toString()); @@ -400,23 +400,4 @@ private static void addCommonFields(final JsonObjectBuilder builder, final DateF addField(builder, "application", applicationName); } - private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { - if (value != null) { - builder.add(key, value.longValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final String key, final Integer value) { - if (value != null) { - builder.add(key, value.intValue()); - } - } - - private static void addField(final JsonObjectBuilder builder, final String key, final String value) { - if (value == null) { - return; - } - - builder.add(key, value); - } } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc new file mode 100644 index 000000000000..90dea108f5fa --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/schema-metrics.avsc @@ -0,0 +1,37 @@ +{ + "type" : "record", + "name" : "metrics", + "namespace" : "metrics", + "fields" : [ + { "name" : "appid", "type" : "string" }, + { "name" : "instanceid", "type" : "string" }, + { "name" : "hostname", "type" : "string" }, + { "name" : "timestamp", "type" : "long" }, + { "name" : "loadAverage1min", "type" : "double" }, + { "name" : "availableCores", "type" : "int" }, + { "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" }, + { "name" : "BytesReceivedLast5Minutes", "type" : "long" }, + { "name" : "FlowFilesSentLast5Minutes", "type" : "int" }, + { "name" : "BytesSentLast5Minutes", "type" : "long" }, + { "name" : "FlowFilesQueued", "type" : "int" }, + { "name" : "BytesQueued", "type" : "long" }, + { "name" : "BytesReadLast5Minutes", "type" : "long" }, + { "name" : "BytesWrittenLast5Minutes", "type" : "long" }, + { "name" : "ActiveThreads", "type" : "int" }, + { "name" : "TotalTaskDurationSeconds", "type" : "long" }, + { "name" : "TotalTaskDurationNanoSeconds", "type" : "long" }, + { "name" : "jvmuptime", "type" : "long" }, + { "name" : "jvmheap_used", "type" : "double" }, + { "name" : "jvmheap_usage", "type" : "double" }, + { "name" : "jvmnon_heap_usage", "type" : "double" }, + { "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] }, + { "name" : "jvmthread_statesblocked", "type" : ["int", "null"] }, + { "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] }, + { "name" : "jvmthread_statesterminated", "type" : ["int", "null"] }, + { "name" : "jvmthread_count", "type" : "int" }, + { "name" : "jvmdaemon_thread_count", "type" : "int" }, + { "name" : "jvmfile_descriptor_usage", "type" : "double" }, + { "name" : "jvmgcruns", "type" : ["long", "null"] }, + { "name" : "jvmgctime", "type" : ["long", "null"] } + ] +} From 8ddec3f1a69606747b93f653997916c699b697b9 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 6 Mar 2018 12:09:42 +0100 Subject: [PATCH 4/5] added unit tests and additional details doc --- .../nifi-ambari-reporting-task/pom.xml | 1 + .../nifi-site-to-site-reporting-task/pom.xml | 7 + .../additionalDetails.html | 178 +++++++++++++++ .../additionalDetails.html | 2 +- .../TestSiteToSiteMetricsReportingTask.java | 216 ++++++++++++++++++ 5 files changed, 403 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java diff --git a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml index e451303e11ab..88f4a3b950ef 100644 --- a/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-ambari-bundle/nifi-ambari-reporting-task/pom.xml @@ -46,6 +46,7 @@ org.apache.nifi nifi-reporting-utils + 1.6.0-SNAPSHOT diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml index a25dc2fab34f..b664e57b624e 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -70,6 +70,7 @@ org.apache.nifi nifi-avro-record-utils + 1.6.0-SNAPSHOT org.glassfish @@ -99,6 +100,12 @@ 1.6.0-SNAPSHOT test + + org.apache.nifi + nifi-mock-record-utils + 1.6.0-SNAPSHOT + test + junit junit diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html new file mode 100644 index 000000000000..8120d6a86916 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteMetricsReportingTask/additionalDetails.html @@ -0,0 +1,178 @@ + + + + + + SiteToSiteMetricsReportingTask + + + + + +

+ The Site-to-Site Metrics Reporting Task allows the user to publish NiFi's metrics (as in the Ambari reporting task) to the + same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of + all of the different Processors that are available in NiFi in order to process or distribute that data. +

+ +

Ambari format

+ +

+ There are two available output formats. The first one is the Ambari format as defined in the Ambari Metrics Collector + API which is a JSON with dynamic keys. If using this format you might be interested by the below Jolt specification to + transform the data. +

+ +
+			
+			[
+			  {
+			    "operation": "shift",
+			    "spec": {
+			      "metrics": {
+			        "*": {
+			          "metrics": {
+			            "*": {
+			              "$": "metrics.[#4].metrics.time",
+			              "@": "metrics.[#4].metrics.value"
+			            }
+			          },
+			          "*": "metrics.[&1].&"
+			        }
+			      }
+			    }
+			  }
+			]
+			
+		
+ +

+ This would transform the below sample: +

+ +
+			
+			{
+				"metrics": [{
+					"metricname": "jvm.gc.time.G1OldGeneration",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"1520456854361": "0"
+					}
+				}, {
+					"metricname": "jvm.thread_states.terminated",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"1520456854361": "0"
+					}
+				}]
+			}
+			
+		
+ +

+ into: +

+ +
+			
+			{
+				"metrics": [{
+					"metricname": "jvm.gc.time.G1OldGeneration",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"time": "1520456854361",
+						"value": "0"
+					}
+				}, {
+					"metricname": "jvm.thread_states.terminated",
+					"appid": "nifi",
+					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
+					"hostname": "localhost",
+					"timestamp": "1520456854361",
+					"starttime": "1520456854361",
+					"metrics": {
+						"time": "1520456854361",
+						"value": "0"
+					}
+				}]
+			}
+			
+		
+ +

Record format

+ +

+ The second format is leveraging the record framework of NiFi so that the user can define a Record Writer and directly + specify the output format and data with the assumption that the input schema is the following: +

+ +
+			
+			{
+			  "type" : "record",
+			  "name" : "metrics",
+			  "namespace" : "metrics",
+			  "fields" : [ 
+				{ "name" : "appid", "type" : "string" },
+				{ "name" : "instanceid", "type" : "string" },
+				{ "name" : "hostname", "type" : "string" },
+				{ "name" : "timestamp", "type" : "long" },
+				{ "name" : "loadAverage1min", "type" : "double" },
+				{ "name" : "availableCores", "type" : "int" },
+				{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
+				{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
+				{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
+				{ "name" : "BytesSentLast5Minutes", "type" : "long" },
+				{ "name" : "FlowFilesQueued", "type" : "int" },
+				{ "name" : "BytesQueued", "type" : "long" },
+				{ "name" : "BytesReadLast5Minutes", "type" : "long" },
+				{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
+				{ "name" : "ActiveThreads", "type" : "int" },
+				{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
+				{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
+				{ "name" : "jvmuptime", "type" : "long" },
+				{ "name" : "jvmheap_used", "type" : "double" },
+				{ "name" : "jvmheap_usage", "type" : "double" },
+				{ "name" : "jvmnon_heap_usage", "type" : "double" },
+				{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
+				{ "name" : "jvmthread_count", "type" : "int" },
+				{ "name" : "jvmdaemon_thread_count", "type" : "int" },
+				{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
+				{ "name" : "jvmgcruns", "type" : ["long", "null"] },
+				{ "name" : "jvmgctime", "type" : ["long", "null"] }
+			  ]
+			}
+			
+		
+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html index e1841b2c770d..86736a66165b 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/docs/org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask/additionalDetails.html @@ -25,7 +25,7 @@

The Site-to-Site Provenance Reporting Task allows the user to publish all of the Provenance Events from a NiFi instance back to the same NiFi instance or another NiFi instance. This provides a great deal of power because it allows the user to make use of - all of the different Processors that are available in NiFi in order to processor or distribute that data. When possible, it is + all of the different Processors that are available in NiFi in order to process or distribute that data. When possible, it is advisable to send the Provenance data to a different NiFi instance than the one that this Reporting Task is running on, because when the data is received over Site-to-Site and processed, that in and of itself will generate Provenance events. As a result, there is a cycle that is created. However, the data is sent in batches (1,000 by default). This means that for each batch of Provenance events diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java new file mode 100644 index 000000000000..fb55ff3f0fcf --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.UUID; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonObject; +import javax.json.JsonReader; +import javax.json.JsonValue; + +import org.apache.nifi.attribute.expression.language.StandardPropertyValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.util.MockPropertyValue; +import org.apache.nifi.util.TestRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestSiteToSiteMetricsReportingTask { + + private ReportingContext context; + private ProcessGroupStatus status; + private TestRunner runner; + + @Before + public void setup() { + status = new ProcessGroupStatus(); + status.setId("1234"); + status.setFlowFilesReceived(5); + status.setBytesReceived(10000); + status.setFlowFilesSent(10); + status.setBytesSent(20000); + status.setQueuedCount(100); + status.setQueuedContentSize(1024L); + status.setBytesRead(60000L); + status.setBytesWritten(80000L); + status.setActiveThreadCount(5); + + // create a processor status with processing time + ProcessorStatus procStatus = new ProcessorStatus(); + procStatus.setProcessingNanos(123456789); + + Collection processorStatuses = new ArrayList<>(); + processorStatuses.add(procStatus); + status.setProcessorStatus(processorStatuses); + + // create a group status with processing time + ProcessGroupStatus groupStatus = new ProcessGroupStatus(); + groupStatus.setProcessorStatus(processorStatuses); + + Collection groupStatuses = new ArrayList<>(); + groupStatuses.add(groupStatus); + status.setProcessGroupStatus(groupStatuses); + } + + public MockSiteToSiteMetricsReportingTask initTask(Map customProperties) throws InitializationException, IOException { + + final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask(); + Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.putAll(customProperties); + + context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(task)); + Mockito.doAnswer(new Answer() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + Mockito.when(eventAccess.getControllerStatus()).thenReturn(status); + + final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); + MockRecordWriter writer = new MockRecordWriter(); + Mockito.when(context.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue); + Mockito.when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer); + + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + task.initialize(initContext); + + return task; + } + + @Test + public void testAmbariFormat() throws IOException, InitializationException { + + final Map properties = new HashMap<>(); + properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue()); + + MockSiteToSiteMetricsReportingTask task = initTask(properties); + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + final String msg = new String(task.dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonArray array = jsonReader.readObject().getJsonArray("metrics"); + for(int i = 0; i < array.size(); i++) { + JsonObject object = array.getJsonObject(i); + assertEquals("nifi", object.getString("appid")); + assertEquals("1234", object.getString("instanceid")); + if(object.getString("metricname").equals("FlowFilesQueued")) { + for(Entry kv : object.getJsonObject("metrics").entrySet()) { + assertEquals("\"100\"", kv.getValue().toString()); + } + return; + } + } + fail(); + } + + @Test + public void testRecordFormat() throws IOException, InitializationException { + final Map properties = new HashMap<>(); + properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue()); + properties.put(SiteToSiteMetricsReportingTask.RECORD_WRITER, "record-writer"); + MockSiteToSiteMetricsReportingTask task = initTask(properties); + + task.onTrigger(context); + + assertEquals(1, task.dataSent.size()); + String[] data = new String(task.dataSent.get(0)).split(","); + assertEquals("\"nifi\"", data[0]); + assertEquals("\"1234\"", data[1]); + assertEquals("\"100\"", data[10]); // FlowFilesQueued + } + + private static final class MockSiteToSiteMetricsReportingTask extends SiteToSiteMetricsReportingTask { + + public MockSiteToSiteMetricsReportingTask() throws IOException { + super(); + } + + final List dataSent = new ArrayList<>(); + + @Override + protected SiteToSiteClient getClient() { + final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); + final Transaction transaction = Mockito.mock(Transaction.class); + + try { + Mockito.doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final byte[] data = invocation.getArgumentAt(0, byte[].class); + dataSent.add(data); + return null; + } + }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); + + Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + return client; + } + + public List getDataSent() { + return dataSent; + } + } + +} From 2731fceefc2441c7cfe3be3cecf9532a0697298d Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 14 Mar 2018 00:27:34 +0100 Subject: [PATCH 5/5] review comments --- .../SiteToSiteMetricsReportingTask.java | 13 ++- .../TestSiteToSiteMetricsReportingTask.java | 88 ++++++++++++++++++- 2 files changed, 95 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java index c12f868224b4..0bb7501948d8 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteMetricsReportingTask.java @@ -89,8 +89,10 @@ public class SiteToSiteMetricsReportingTask extends AbstractSiteToSiteReportingT static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder() .name("s2s-metrics-format") - .displayName("Output format") - .description("The output format that will be used for the metrics") + .displayName("Output Format") + .description("The output format that will be used for the metrics. If " + RECORD_FORMAT.getDisplayName() + " is selected, " + + "a Record Writer must be provided. If " + AMBARI_FORMAT.getDisplayName() + " is selected, the Record Writer property " + + "should be empty.") .required(true) .allowableValues(AMBARI_FORMAT, RECORD_FORMAT) .defaultValue(AMBARI_FORMAT.getValue()) @@ -127,6 +129,13 @@ protected Collection customValidate(ValidationContext validati .explanation("If using " + RECORD_FORMAT.getDisplayName() + ", a record writer needs to be set.") .build()); } + if (validationContext.getProperty(FORMAT).getValue().equals(AMBARI_FORMAT.getValue()) && isWriterSet) { + problems.add(new ValidationResult.Builder() + .input("Record Writer") + .valid(false) + .explanation("If using " + AMBARI_FORMAT.getDisplayName() + ", no record writer should be set.") + .build()); + } return problems; } diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java index fb55ff3f0fcf..c699a1c8838b 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteMetricsReportingTask.java @@ -41,6 +41,8 @@ import org.apache.nifi.attribute.expression.language.StandardPropertyValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.logging.ComponentLog; @@ -133,6 +135,88 @@ public PropertyValue answer(final InvocationOnMock invocation) throws Throwable return task; } + @Test + public void testValidationBothAmbariFormatRecordWriter() throws IOException { + ValidationContext validationContext = Mockito.mock(ValidationContext.class); + final String urlEL = "http://${hostname(true)}:8080/nifi"; + final String url = "http://localhost:8080/nifi"; + + final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask(); + Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + + properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.AMBARI_FORMAT.getValue()); + properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url); + properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url); + properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port"); + + final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class); + Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl); + Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl); + Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl); + Mockito.when(pValueUrl.getValue()).thenReturn(url); + + Mockito.doAnswer(new Answer() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class)); + + final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); + Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue); + Mockito.when(pValue.isSet()).thenReturn(true); + + // should be invalid because both ambari format and record writer are set + Collection list = task.validate(validationContext); + Assert.assertEquals(1, list.size()); + Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput()); + } + + @Test + public void testValidationRecordFormatNoRecordWriter() throws IOException { + ValidationContext validationContext = Mockito.mock(ValidationContext.class); + final String urlEL = "http://${hostname(true)}:8080/nifi"; + final String url = "http://localhost:8080/nifi"; + + final MockSiteToSiteMetricsReportingTask task = new MockSiteToSiteMetricsReportingTask(); + Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + + properties.put(SiteToSiteMetricsReportingTask.FORMAT, SiteToSiteMetricsReportingTask.RECORD_FORMAT.getValue()); + properties.put(SiteToSiteMetricsReportingTask.DESTINATION_URL, url); + properties.put(SiteToSiteMetricsReportingTask.INSTANCE_URL, url); + properties.put(SiteToSiteMetricsReportingTask.PORT_NAME, "port"); + + final PropertyValue pValueUrl = Mockito.mock(StandardPropertyValue.class); + Mockito.when(validationContext.newPropertyValue(url)).thenReturn(pValueUrl); + Mockito.when(validationContext.newPropertyValue(urlEL)).thenReturn(pValueUrl); + Mockito.when(pValueUrl.evaluateAttributeExpressions()).thenReturn(pValueUrl); + Mockito.when(pValueUrl.getValue()).thenReturn(url); + + Mockito.doAnswer(new Answer() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor)); + } + }).when(validationContext).getProperty(Mockito.any(PropertyDescriptor.class)); + + final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class); + Mockito.when(validationContext.getProperty(MockSiteToSiteMetricsReportingTask.RECORD_WRITER)).thenReturn(pValue); + Mockito.when(pValue.isSet()).thenReturn(false); + + // should be invalid because both ambari format and record writer are set + Collection list = task.validate(validationContext); + Assert.assertEquals(1, list.size()); + Assert.assertEquals(SiteToSiteMetricsReportingTask.RECORD_WRITER.getDisplayName(), list.iterator().next().getInput()); + } + @Test public void testAmbariFormat() throws IOException, InitializationException { @@ -207,10 +291,6 @@ public Object answer(final InvocationOnMock invocation) throws Throwable { return client; } - - public List getDataSent() { - return dataSent; - } } }