From 5f66aa225fbe525b5517b7d9bec8ae07f800726d Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 6 Oct 2017 14:53:04 -0400 Subject: [PATCH] NIFI-4468: If an entire batch of Provenance Events are read by the Site-to-Site Provenance Reporting Task and none of them match the filters, then the reporting did not update its state, so it would be stuck in this cycle indefinitely. Made fix so that if any event is read from the provenance repository, regardless of whether or not it matches the filters, we update the state to keep track of what has been processed --- .../SiteToSiteProvenanceReportingTask.java | 113 ++++++++++-------- 1 file changed, 64 insertions(+), 49 deletions(-) diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java index e998debb1e85..8af9412473bb 100644 --- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java +++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java @@ -272,15 +272,17 @@ public void onTrigger(final ReportingContext context) { return; } - List events; + List rawEvents; + List filteredEvents; try { - events = filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger())); + rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); + filteredEvents = filterEvents(rawEvents); } catch (final IOException ioe) { getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); return; } - if (events == null || events.isEmpty()) { + if (rawEvents == null || rawEvents.isEmpty()) { getLogger().debug("No events to send due to 'events' being null or empty."); return; } @@ -304,69 +306,82 @@ public void onTrigger(final ReportingContext context) { final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT); df.setTimeZone(TimeZone.getTimeZone("Z")); - while (events != null && !events.isEmpty() && isScheduled()) { + while (rawEvents != null && !rawEvents.isEmpty() && isScheduled()) { final long start = System.nanoTime(); - // Create a JSON array of all the events in the current batch - final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); - for (final ProvenanceEventRecord event : events) { - final String componentName = componentMap.get(event.getComponentId()); - arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId)); - } - final JsonArray jsonArray = arrayBuilder.build(); - - // Send the JSON document for the current batch - try { - final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); - if (transaction == null) { - getLogger().debug("All destination nodes are penalized; will attempt to send data later"); - return; + if (!filteredEvents.isEmpty()) { + // Create a JSON array of all the events in the current batch + final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder(); + for (final ProvenanceEventRecord event : filteredEvents) { + final String componentName = componentMap.get(event.getComponentId()); + arrayBuilder.add(serialize(factory, builder, event, df, componentName, hostname, url, rootGroupName, platform, nodeId)); } + final JsonArray jsonArray = arrayBuilder.build(); - final Map attributes = new HashMap<>(); - final String transactionId = UUID.randomUUID().toString(); - attributes.put("reporting.task.transaction.id", transactionId); - attributes.put("mime.type", "application/json"); - - final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); - transaction.send(data, attributes); - transaction.confirm(); - transaction.complete(); - - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", - new Object[]{events.size(), transferMillis, transactionId, events.get(0).getEventId()}); - } catch (final IOException e) { - throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); - } - - // Store the id of the last event so we know where we left off - final ProvenanceEventRecord lastEvent = events.get(events.size() - 1); - final String lastEventId = String.valueOf(lastEvent.getEventId()); - try { - StateManager stateManager = context.getStateManager(); - Map newMapOfState = new HashMap<>(); - newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId); - stateManager.setState(newMapOfState, Scope.LOCAL); - } catch (final IOException ioe) { - getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}", - new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, ioe); + // Send the JSON document for the current batch + try { + final Transaction transaction = getClient().createTransaction(TransferDirection.SEND); + if (transaction == null) { + getLogger().debug("All destination nodes are penalized; will attempt to send data later"); + return; + } + + final Map attributes = new HashMap<>(); + final String transactionId = UUID.randomUUID().toString(); + attributes.put("reporting.task.transaction.id", transactionId); + attributes.put("mime.type", "application/json"); + + final byte[] data = jsonArray.toString().getBytes(StandardCharsets.UTF_8); + transaction.send(data, attributes); + transaction.confirm(); + transaction.complete(); + + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + getLogger().info("Successfully sent {} Provenance Events to destination in {} ms; Transaction ID = {}; First Event ID = {}", + new Object[] {filteredEvents.size(), transferMillis, transactionId, rawEvents.get(0).getEventId()}); + } catch (final IOException e) { + throw new ProcessException("Failed to send Provenance Events to destination due to IOException:" + e.getMessage(), e); + } } - firstEventId = lastEvent.getEventId() + 1; + firstEventId = updateLastEventId(rawEvents, context.getStateManager()); // Retrieve the next batch try { - events = filterEvents(context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger())); + rawEvents = context.getEventAccess().getProvenanceEvents(firstEventId, context.getProperty(BATCH_SIZE).asInteger()); + filteredEvents = filterEvents(rawEvents); } catch (final IOException ioe) { getLogger().error("Failed to retrieve Provenance Events from repository due to: " + ioe.getMessage(), ioe); return; } } + } + + private long updateLastEventId(final List events, final StateManager stateManager) { + if (events == null || events.isEmpty()) { + return firstEventId; + } + + // Store the id of the last event so we know where we left off + final ProvenanceEventRecord lastEvent = events.get(events.size() - 1); + final String lastEventId = String.valueOf(lastEvent.getEventId()); + try { + Map newMapOfState = new HashMap<>(); + newMapOfState.put(LAST_EVENT_ID_KEY, lastEventId); + stateManager.setState(newMapOfState, Scope.LOCAL); + } catch (final IOException ioe) { + getLogger().error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}", + new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe); + } + return lastEvent.getEventId() + 1; } - private List filterEvents(List provenanceEvents) { + private List filterEvents(final List provenanceEvents) { + if (provenanceEvents == null || provenanceEvents.isEmpty()) { + return Collections.emptyList(); + } + if(isFilteringEnabled) { List filteredEvents = new ArrayList();