From ce13a2e02b88ab6f9a51c440c122645ba242a89f Mon Sep 17 00:00:00 2001 From: Bryan Bende Date: Thu, 12 May 2016 09:59:27 -0400 Subject: [PATCH] NIFI-1858 Adding site-to-site reporting bundle --- nifi-assembly/pom.xml | 5 + .../nifi-site-to-site-reporting-nar/pom.xml | 40 ++ .../src/main/resources/META-INF/NOTICE | 15 + .../nifi-site-to-site-reporting-task/pom.xml | 82 +++++ .../AbstractSiteToSiteReportingTask.java | 187 ++++++++++ .../SiteToSiteProvenanceReportingTask.java | 344 ++++++++++++++++++ .../org.apache.nifi.reporting.ReportingTask | 16 + ...TestSiteToSiteProvenanceReportingTask.java | 184 ++++++++++ .../pom.xml | 46 +++ nifi-nar-bundles/pom.xml | 3 +- pom.xml | 6 + 11 files changed, 927 insertions(+), 1 deletion(-) create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/pom.xml create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/src/main/resources/META-INF/NOTICE create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java create mode 100644 nifi-nar-bundles/nifi-site-to-site-reporting-bundle/pom.xml diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml index 4d3f92fedc0d..529b58882561 100644 --- a/nifi-assembly/pom.xml +++ b/nifi-assembly/pom.xml @@ -312,6 +312,11 @@ language governing permissions and limitations under the License. --> nifi-hive-nar nar + + org.apache.nifi + nifi-site-to-site-reporting-nar + nar + diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/pom.xml new file mode 100644 index 000000000000..0f07bceba71e --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/pom.xml @@ -0,0 +1,40 @@ + + + 4.0.0 + + org.apache.nifi + nifi-site-to-site-reporting-bundle + 1.0.0-SNAPSHOT + + + nifi-site-to-site-reporting-nar + nar + + true + true + + + + org.apache.nifi + nifi-site-to-site-reporting-task + + + org.apache.nifi + nifi-standard-services-api-nar + nar + + + diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/src/main/resources/META-INF/NOTICE new file mode 100644 index 000000000000..ae730395c6ab --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-nar/src/main/resources/META-INF/NOTICE @@ -0,0 +1,15 @@ +nifi-site-to-site-reporting-nar +Copyright 2015-2016 The Apache Software Foundation + +This product includes software developed at +The Apache Software Foundation (http://www.apache.org/). + +************************ +Common Development and Distribution License 1.1 +************************ + +The following binary components are provided under the Common Development and Distribution License 1.1. See project link for details. + + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing API (javax.json:javax.json-api:jar:1.0 - http://json-processing-spec.java.net) + (CDDL 1.1) (GPL2 w/ CPE) JSON Processing Default Provider (org.glassfish:javax.json:jar:1.0.4 - https://jsonp.java.net) + diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml new file mode 100644 index 000000000000..431de26bce0d --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/pom.xml @@ -0,0 +1,82 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-site-to-site-reporting-bundle + 1.0.0-SNAPSHOT + + + nifi-site-to-site-reporting-task + Publishes NiFi metrics and provenance events via S2S + 1.0.0-SNAPSHOT + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-processor-utils + + + org.apache.nifi + nifi-utils + + + org.apache.nifi + nifi-ssl-context-service-api + + + org.apache.nifi + nifi-site-to-site-client + + + org.glassfish + javax.json + 1.0.4 + + + javax.json + javax.json-api + 1.0 + + + + org.mockito + mockito-all + test + + + org.apache.nifi + nifi-data-provenance-utils + test + + + org.apache.nifi + nifi-mock + test + + + junit + junit + 4.12 + test + + + diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java new file mode 100644 index 000000000000..b1b34102c3f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/AbstractSiteToSiteReportingTask.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.reporting; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Base class for ReportingTasks that send data over site-to-site. + */ +public abstract class AbstractSiteToSiteReportingTask extends AbstractReportingTask { + + static final PropertyDescriptor DESTINATION_URL = new PropertyDescriptor.Builder() + .name("Destination URL") + .description("The URL of the destination NiFi instance to send the Provenance Events to, " + + "should be in the format http(s)://host:port/nifi.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(new NiFiUrlValidator()) + .build(); + static final PropertyDescriptor PORT_NAME = new PropertyDescriptor.Builder() + .name("Input Port Name") + .description("The name of the Input Port to delivery Provenance Events to.") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor SSL_CONTEXT = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The SSL Context Service to use when communicating with the destination. If not specified, communications will not be secure.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + static final PropertyDescriptor INSTANCE_URL = new PropertyDescriptor.Builder() + .name("Instance URL") + .description("The URL of this instance to use in the Content URI of each event.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("http://${hostname(true)}:8080/nifi") + .addValidator(new NiFiUrlValidator()) + .build(); + static final PropertyDescriptor COMPRESS = new PropertyDescriptor.Builder() + .name("Compress Events") + .description("Indicates whether or not to compress the events when being sent.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description("Specifies how long to wait to a response from the destination before deciding that an error has occurred and canceling the transaction") + .required(true) + .defaultValue("30 secs") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Specifies how many records to send in a single batch, at most.") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + protected volatile SiteToSiteClient siteToSiteClient; + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(DESTINATION_URL); + properties.add(PORT_NAME); + properties.add(SSL_CONTEXT); + properties.add(INSTANCE_URL); + properties.add(COMPRESS); + properties.add(TIMEOUT); + properties.add(BATCH_SIZE); + return properties; + } + + @OnScheduled + public void setup(final ConfigurationContext context) throws IOException { + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT).asControllerService(SSLContextService.class); + final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + final EventReporter eventReporter = new EventReporter() { + @Override + public void reportEvent(final Severity severity, final String category, final String message) { + switch (severity) { + case WARNING: + getLogger().warn(message); + break; + case ERROR: + getLogger().error(message); + break; + default: + break; + } + } + }; + + final String destinationUrl = context.getProperty(DESTINATION_URL).evaluateAttributeExpressions().getValue(); + + siteToSiteClient = new SiteToSiteClient.Builder() + .url(destinationUrl) + .portName(context.getProperty(PORT_NAME).getValue()) + .useCompression(context.getProperty(COMPRESS).asBoolean()) + .eventReporter(eventReporter) + .sslContext(sslContext) + .timeout(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + .build(); + } + + @OnStopped + public void shutdown() throws IOException { + final SiteToSiteClient client = getClient(); + if (client != null) { + client.close(); + } + } + + // this getter is intended explicitly for testing purposes + protected SiteToSiteClient getClient() { + return this.siteToSiteClient; + } + + static class NiFiUrlValidator implements Validator { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String value = context.newPropertyValue(input).evaluateAttributeExpressions().getValue(); + + URL url; + try { + url = new URL(value); + } catch (final Exception e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Not a valid URL") + .build(); + } + + if (url != null && !url.getPath().endsWith("/nifi")) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("URL path must be /nifi") + .build(); + } + + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .build(); + } + } +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java new file mode 100644 index 000000000000..4a897f771640 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import org.apache.nifi.annotation.behavior.Stateful; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.controller.status.PortStatus; +import org.apache.nifi.controller.status.ProcessGroupStatus; +import org.apache.nifi.controller.status.ProcessorStatus; +import org.apache.nifi.controller.status.RemoteProcessGroupStatus; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonArrayBuilder; +import javax.json.JsonBuilderFactory; +import javax.json.JsonObject; +import javax.json.JsonObjectBuilder; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TimeZone; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +@Tags({"provenance", "lineage", "tracking", "site", "site to site"}) +@CapabilityDescription("Publishes Provenance events using the Site To Site protocol.") +@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.") +public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReportingTask { + + private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; + private static final String LAST_EVENT_ID_KEY = "last_event_id"; + + static final PropertyDescriptor PLATFORM = new PropertyDescriptor.Builder() + .name("Platform") + .description("The value to use for the platform field in each provenance event.") + .required(true) + .expressionLanguageSupported(true) + .defaultValue("nifi") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private volatile long firstEventId = -1L; + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(PLATFORM); + return properties; + } + + private Map createComponentMap(final ProcessGroupStatus status) { + final Map componentMap = new HashMap<>(); + + if (status != null) { + componentMap.put(status.getId(), status.getName()); + + for (final ProcessorStatus procStatus : status.getProcessorStatus()) { + componentMap.put(procStatus.getId(), procStatus.getName()); + } + + for (final PortStatus portStatus : status.getInputPortStatus()) { + componentMap.put(portStatus.getId(), portStatus.getName()); + } + + for (final PortStatus portStatus : status.getOutputPortStatus()) { + componentMap.put(portStatus.getId(), portStatus.getName()); + } + + for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) { + componentMap.put(rpgStatus.getId(), rpgStatus.getName()); + } + + for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) { + componentMap.put(childGroup.getId(), childGroup.getName()); + } + } + + return componentMap; + } + + @Override + public void onTrigger(final ReportingContext context) { + final ProcessGroupStatus procGroupStatus = context.getEventAccess().getControllerStatus(); + final String rootGroupName = procGroupStatus == null ? null : procGroupStatus.getName(); + final Map componentMap = createComponentMap(procGroupStatus); + + Long currMaxId = context.getEventAccess().getProvenanceRepository().getMaxEventId(); + + if(currMaxId == null) { + getLogger().debug("No events to send because no events have been created yet."); + return; + } + + if (firstEventId < 0) { + Map state; + try { + state = context.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + getLogger().error("Failed to get state at start up due to {}:"+e.getMessage(), e); + return; + } + if (state.containsKey(LAST_EVENT_ID_KEY)) { + firstEventId = Long.parseLong(state.get(LAST_EVENT_ID_KEY)) + 1; + } + + if(currMaxId < firstEventId){ + getLogger().warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " + + "ids. Restarting querying from the beginning.", new Object[]{currMaxId, firstEventId}); + firstEventId = -1; + } + } + + if (currMaxId == (firstEventId - 1)) { + getLogger().debug("No events to send due to the current max id being equal to the last id that was queried."); + return; + } + + List events; + try { + events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); + return; + } + + if (events == null || events.isEmpty()) { + getLogger().debug("No events to send due to 'events' being null or empty."); + return; + } + + final String nifiUrl = context.getProperty(INSTANCE_URL).evaluateAttributeExpressions().getValue(); + URL url; + try { + url = new URL(nifiUrl); + } catch (final MalformedURLException e1) { + // already validated + throw new AssertionError(); + } + + final String hostname = url.getHost(); + final String platform = context.getProperty(PLATFORM).evaluateAttributeExpressions().getValue(); + + final Map config = Collections.emptyMap(); + final JsonBuilderFactory factory = Json.createBuilderFactory(config); + final JsonObjectBuilder builder = factory.createObjectBuilder(); + + final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); + df.setTimeZone(TimeZone.getTimeZone("Z")); + + while (events != null && !events.isEmpty()) { + final long start = System.nanoTime(); + + // Create a JSON array of all the events in the current batch + final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); + for (final ProvenanceEventRecord event : events) { + final String componentName = componentMap.get(event.getComponentId()); + arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform)); + } + final JsonArray jsonArray = arrayBuilder.build(); + + // Send the JSON document for the current batch + try { + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final Map attributes = new HashMap<>(); + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + + final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", + new Object[]{events.size(), transferMillis, transactionId, events.get(0).getEventId()}); + } catch (final IOException e) { + throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + } + + // Store the id of the last event so we know where we left off + final ProvenanceEventRecord lastEvent = events.get(events.size() - 1); + final String lastEventId = String.valueOf(lastEvent.getEventId()); + try { + StateManager stateManager = context.getStateManager(); + Map newMapOfState = new HashMap<>(); + newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId); + stateManager.setState(newMapOfState, Scope.LOCAL); + } catch (final IOException ioe) { + getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}", + new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, ioe); + } + + firstEventId = lastEvent.getEventId() + 1; + + // Retrieve the next batch + try { + events = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); + return; + } + } + + } + + static JsonObject serialize(final JsonBuilderFactory factory, final JsonObjectBuilder builder, final ProvenanceEventRecord event, final DateFormat df, + final String componentName, final String hostname, final URL nifiUrl, final String applicationName, final String platform) { + addField(builder, "eventId", UUID.randomUUID().toString()); + addField(builder, "eventOrdinal", event.getEventId()); + addField(builder, "eventType", event.getEventType().name()); + addField(builder, "timestampMillis", event.getEventTime()); + + + addField(builder, "timestamp", df.format(event.getEventTime())); + + addField(builder, "durationMillis", event.getEventDuration()); + addField(builder, "lineageStart", event.getLineageStartDate()); + + final Set lineageIdentifiers = new HashSet<>(); + if (event.getLineageIdentifiers() != null) { + lineageIdentifiers.addAll(event.getLineageIdentifiers()); + } + lineageIdentifiers.add(event.getFlowFileUuid()); + addField(builder, factory, "lineageIdentifiers", lineageIdentifiers); + addField(builder, "details", event.getDetails()); + addField(builder, "componentId", event.getComponentId()); + addField(builder, "componentType", event.getComponentType()); + addField(builder, "componentName", componentName); + addField(builder, "entityId", event.getFlowFileUuid()); + addField(builder, "entityType", "org.apache.nifi.flowfile.FlowFile"); + addField(builder, "entitySize", event.getFileSize()); + addField(builder, "previousEntitySize", event.getPreviousFileSize()); + addField(builder, factory, "updatedAttributes", event.getUpdatedAttributes()); + addField(builder, factory, "previousAttributes", event.getPreviousAttributes()); + + addField(builder, "actorHostname", hostname); + if (nifiUrl != null) { + final String urlPrefix = nifiUrl.toString().replace(nifiUrl.getPath(), ""); + final String contentUriBase = urlPrefix + "/nifi-api/controller/provenance/events/" + event.getEventId() + "/content/"; + addField(builder, "contentURI", contentUriBase + "output"); + addField(builder, "previousContentURI", contentUriBase + "input"); + } + + addField(builder, factory, "parentIds", event.getParentUuids()); + addField(builder, factory, "childIds", event.getChildUuids()); + addField(builder, "transitUri", event.getTransitUri()); + addField(builder, "remoteIdentifier", event.getSourceSystemFlowFileIdentifier()); + addField(builder, "alternateIdentifier", event.getAlternateIdentifierUri()); + addField(builder, "platform", platform); + addField(builder, "application", applicationName); + + return builder.build(); + } + + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Map values) { + if (values == null) { + return; + } + + final JsonObjectBuilder mapBuilder = factory.createObjectBuilder(); + for (final Map.Entry entry : values.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + continue; + } + + mapBuilder.add(entry.getKey(), entry.getValue()); + } + + builder.add(key, mapBuilder); + } + + private static void addField(final JsonObjectBuilder builder, final String key, final Long value) { + if (value != null) { + builder.add(key, value.longValue()); + } + } + + private static void addField(final JsonObjectBuilder builder, final JsonBuilderFactory factory, final String key, final Collection values) { + if (values == null) { + return; + } + + builder.add(key, createJsonArray(factory, values)); + } + + private static void addField(final JsonObjectBuilder builder, final String key, final String value) { + if (value == null) { + return; + } + + builder.add(key, value); + } + + private static JsonArrayBuilder createJsonArray(JsonBuilderFactory factory, final Collection values) { + final JsonArrayBuilder builder = factory.createArrayBuilder(); + for (final String value : values) { + if (value != null) { + builder.add(value); + } + } + return builder; + } + +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask new file mode 100644 index 000000000000..be9f6546342d --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.reporting.SiteToSiteProvenanceReportingTask \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java new file mode 100644 index 000000000000..265bdd0ed7ee --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.reporting; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.provenance.ProvenanceEventBuilder; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.StandardProvenanceEventRecord; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.client.SiteToSiteClient; +import org.apache.nifi.state.MockStateManager; +import org.apache.nifi.stream.io.ByteArrayInputStream; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonReader; + +public class TestSiteToSiteProvenanceReportingTask { + + @Test + public void testSerializedForm() throws IOException, InitializationException { + final String uuid = "10000000-0000-0000-0000-000000000000"; + final Map attributes = new HashMap<>(); + attributes.put("abc", "xyz"); + attributes.put("xyz", "abc"); + attributes.put("filename", "file-" + uuid); + + final Map prevAttrs = new HashMap<>(); + attributes.put("filename", "1234.xyz"); + + final Set lineageIdentifiers = new HashSet<>(); + lineageIdentifiers.add("123"); + lineageIdentifiers.add("321"); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + attributes.put("uuid", uuid); + builder.fromFlowFile(createFlowFile(3L, attributes)); + builder.setAttributes(prevAttrs, attributes); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + builder.setLineageIdentifiers(lineageIdentifiers); + final ProvenanceEventRecord event = builder.build(); + + final List dataSent = new ArrayList<>(); + final SiteToSiteProvenanceReportingTask task = new SiteToSiteProvenanceReportingTask() { + @SuppressWarnings("unchecked") + @Override + protected SiteToSiteClient getClient() { + final SiteToSiteClient client = Mockito.mock(SiteToSiteClient.class); + final Transaction transaction = Mockito.mock(Transaction.class); + + try { + Mockito.doAnswer(new Answer() { + @Override + public Object answer(final InvocationOnMock invocation) throws Throwable { + final byte[] data = invocation.getArgumentAt(0, byte[].class); + dataSent.add(data); + return null; + } + }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class)); + + Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction); + } catch (final Exception e) { + e.printStackTrace(); + Assert.fail(e.toString()); + } + + return client; + } + }; + + final Map properties = new HashMap<>(); + for (final PropertyDescriptor descriptor : task.getSupportedPropertyDescriptors()) { + properties.put(descriptor, descriptor.getDefaultValue()); + } + properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000"); + + final ReportingContext context = Mockito.mock(ReportingContext.class); + Mockito.when(context.getStateManager()) + .thenReturn(new MockStateManager(task)); + Mockito.doAnswer(new Answer() { + @Override + public PropertyValue answer(final InvocationOnMock invocation) throws Throwable { + final PropertyDescriptor descriptor = invocation.getArgumentAt(0, PropertyDescriptor.class); + return new MockPropertyValue(properties.get(descriptor), null); + } + }).when(context).getProperty(Mockito.any(PropertyDescriptor.class)); + + final long maxEventId = 2500; + final AtomicInteger totalEvents = new AtomicInteger(0); + + final EventAccess eventAccess = Mockito.mock(EventAccess.class); + Mockito.doAnswer(new Answer>() { + @Override + public List answer(final InvocationOnMock invocation) throws Throwable { + final long startId = invocation.getArgumentAt(0, long.class); + final int maxRecords = invocation.getArgumentAt(1, int.class); + + final List eventsToReturn = new ArrayList<>(); + for (int i = (int) Math.max(0, startId); i < (int) (startId + maxRecords) && totalEvents.get() < maxEventId; i++) { + eventsToReturn.add(event); + totalEvents.getAndIncrement(); + } + return eventsToReturn; + } + }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt()); + + final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class); + Mockito.doAnswer(new Answer() { + @Override + public Long answer(final InvocationOnMock invocation) throws Throwable { + return maxEventId; + } + }).when(provenanceRepository).getMaxEventId(); + + Mockito.when(context.getEventAccess()).thenReturn(eventAccess); + Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository); + + final ComponentLog logger = Mockito.mock(ComponentLog.class); + final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class); + Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()); + Mockito.when(initContext.getLogger()).thenReturn(logger); + + + task.initialize(initContext); + task.onTrigger(context); + + assertEquals(3, dataSent.size()); + final String msg = new String(dataSent.get(0), StandardCharsets.UTF_8); + JsonReader jsonReader = Json.createReader(new ByteArrayInputStream(msg.getBytes())); + JsonObject msgArray = jsonReader.readArray().getJsonObject(0).getJsonObject("updatedAttributes"); + assertEquals(msgArray.getString("abc"), event.getAttributes().get("abc")); + } + + public static FlowFile createFlowFile(final long id, final Map attributes) { + MockFlowFile mockFlowFile = new MockFlowFile(id); + mockFlowFile.putAttributes(attributes); + return mockFlowFile; + } +} diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/pom.xml b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/pom.xml new file mode 100644 index 000000000000..cf56d8921ed2 --- /dev/null +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/pom.xml @@ -0,0 +1,46 @@ + + + + 4.0.0 + + org.apache.nifi + nifi-nar-bundles + 1.0.0-SNAPSHOT + + + nifi-site-to-site-reporting-bundle + pom + + + nifi-site-to-site-reporting-task + nifi-site-to-site-reporting-nar + + + + + + org.apache.nifi + nifi-site-to-site-reporting-task + 1.0.0-SNAPSHOT + + + org.glassfish.jersey.core + jersey-client + 2.19 + + + + diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 08a5c3d81a40..37459766546a 100644 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -60,6 +60,7 @@ nifi-cassandra-bundle nifi-spring-bundle nifi-hive-bundle + nifi-site-to-site-reporting-bundle @@ -145,4 +146,4 @@ - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 675fd5669d9f..24ec57835a68 100644 --- a/pom.xml +++ b/pom.xml @@ -1126,6 +1126,12 @@ language governing permissions and limitations under the License. --> 1.0.0-SNAPSHOT nar + + org.apache.nifi + nifi-site-to-site-reporting-nar + 1.0.0-SNAPSHOT + nar + org.apache.nifi nifi-properties