From bc5d9f3430a99f2084c04c5acab3d81dd5e93d67 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Sat, 22 Jul 2017 16:24:57 +0200 Subject: [PATCH] NIFI-617 - Message destination option in MonitorActivity --- .../processors/standard/MonitorActivity.java | 78 +++++++++++------- .../standard/TestMonitorActivity.java | 79 +++++++++++++++++++ 2 files changed, 129 insertions(+), 28 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java index 201d287041a5..a0715fe1ba82 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MonitorActivity.java @@ -49,7 +49,6 @@ import org.apache.nifi.components.state.StateManager; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -69,7 +68,8 @@ + "some specified amount of time and again when the flow's activity is restored") @WritesAttributes({ @WritesAttribute(attribute = "inactivityStartMillis", description = "The time at which Inactivity began, in the form of milliseconds since Epoch"), - @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned")}) + @WritesAttribute(attribute = "inactivityDurationMillis", description = "The number of milliseconds that the inactivity has spanned"), + @WritesAttribute(attribute = MonitorActivity.MESSAGE_ATTRIBUTE, description = "Message produced by the processor in case message destination is 'Attribute'.")}) @Stateful(scopes = Scope.CLUSTER, description = "MonitorActivity stores the last timestamp at each node as state, so that it can examine activity at cluster wide." + "If 'Copy Attribute' is set to true, then flow file attributes are also persisted.") public class MonitorActivity extends AbstractProcessor { @@ -78,6 +78,10 @@ public class MonitorActivity extends AbstractProcessor { public static final AllowableValue SCOPE_CLUSTER = new AllowableValue("cluster"); public static final AllowableValue REPORT_NODE_ALL = new AllowableValue("all"); public static final AllowableValue REPORT_NODE_PRIMARY = new AllowableValue("primary"); + public static final AllowableValue DEST_CONTENT = new AllowableValue("content"); + public static final AllowableValue DEST_ATTRIBUTE = new AllowableValue("attribute"); + + public static final String MESSAGE_ATTRIBUTE = "monitor.activity.message"; public static final PropertyDescriptor THRESHOLD = new PropertyDescriptor.Builder() .name("Threshold Duration") @@ -96,7 +100,7 @@ public class MonitorActivity extends AbstractProcessor { .build(); public static final PropertyDescriptor ACTIVITY_RESTORED_MESSAGE = new PropertyDescriptor.Builder() .name("Activity Restored Message") - .description("The message that will be the content of FlowFiles that are sent to 'activity.restored' relationship") + .description("The message to be set in FlowFiles that are sent to 'activity.restored' relationship") .required(true) .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -104,7 +108,7 @@ public class MonitorActivity extends AbstractProcessor { .build(); public static final PropertyDescriptor INACTIVITY_MESSAGE = new PropertyDescriptor.Builder() .name("Inactivity Message") - .description("The message that will be the content of FlowFiles that are sent to the 'inactive' relationship") + .description("The message to be set in FlowFiles that are sent to the 'inactive' relationship") .required(true) .expressionLanguageSupported(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) @@ -117,6 +121,15 @@ public class MonitorActivity extends AbstractProcessor { .allowableValues("true", "false") .defaultValue("false") .build(); + public static final PropertyDescriptor MESSAGE_DESTINATION = new PropertyDescriptor.Builder() + .name("monitor-activity-destination") + .displayName("Message destination") + .description("If set to 'attribute', the message will be set in '" + MESSAGE_ATTRIBUTE + "' attribute of the emitted flow file. " + + "Otherwise it will be the content of the flow file sent to 'inactive' or 'activity.restored' relationships.") + .required(true) + .allowableValues(DEST_CONTENT, DEST_ATTRIBUTE) + .defaultValue(DEST_CONTENT.getValue()) + .build(); public static final PropertyDescriptor MONITORING_SCOPE = new PropertyDescriptor.Builder() .name("Monitoring Scope") .description("Specify how to determine activeness of the flow. 'node' means that activeness is examined at individual node separately." + @@ -178,6 +191,7 @@ protected void init(final ProcessorInitializationContext context) { properties.add(INACTIVITY_MESSAGE); properties.add(ACTIVITY_RESTORED_MESSAGE); properties.add(COPY_ATTRIBUTES); + properties.add(MESSAGE_DESTINATION); properties.add(MONITORING_SCOPE); properties.add(REPORTING_NODE); this.properties = Collections.unmodifiableList(properties); @@ -248,13 +262,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final boolean copyAttributes = context.getProperty(COPY_ATTRIBUTES).asBoolean(); final boolean isClusterScope = isClusterScope(context, false); final boolean shouldReportOnlyOnPrimary = shouldReportOnlyOnPrimary(isClusterScope, context); - final List flowFiles = session.get(50); + final FlowFile flowFile = session.get(); boolean isInactive = false; long updatedLatestSuccessTransfer = -1; StateMap clusterState = null; - if (flowFiles.isEmpty()) { + if (flowFile == null) { final long previousSuccessMillis = latestSuccessTransfer.get(); boolean sendInactiveMarker = false; @@ -292,13 +306,17 @@ public void onTrigger(final ProcessContext context, final ProcessSession session inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityStartMillis", String.valueOf(previousSuccessMillis)); inactiveFlowFile = session.putAttribute(inactiveFlowFile, "inactivityDurationMillis", String.valueOf(now - previousSuccessMillis)); - final byte[] outBytes = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue().getBytes(UTF8); - inactiveFlowFile = session.write(inactiveFlowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(outBytes); - } - }); + final String message = context.getProperty(INACTIVITY_MESSAGE).evaluateAttributeExpressions(inactiveFlowFile).getValue(); + if(DEST_CONTENT.getValue().equals(context.getProperty(MESSAGE_DESTINATION).getValue())) { + inactiveFlowFile = session.write(inactiveFlowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(message.getBytes(UTF8)); + } + }); + } else { + inactiveFlowFile = session.putAttribute(inactiveFlowFile, MESSAGE_ATTRIBUTE, message); + } session.getProvenanceReporter().create(inactiveFlowFile); session.transfer(inactiveFlowFile, REL_INACTIVE); @@ -308,10 +326,6 @@ public void process(final OutputStream out) throws IOException { } } else { - session.transfer(flowFiles, REL_SUCCESS); - updatedLatestSuccessTransfer = now; - logger.info("Transferred {} FlowFiles to 'success'", new Object[]{flowFiles.size()}); - final long latestStateReportTimestamp = latestReportedNodeState.get(); if (isClusterScope && (now - latestStateReportTimestamp) > (thresholdMillis / 3)) { @@ -324,7 +338,7 @@ public void process(final OutputStream out) throws IOException { // Persist attributes so that other nodes can copy it if (copyAttributes) { - newValues.putAll(flowFiles.get(0).getAttributes()); + newValues.putAll(flowFile.getAttributes()); } newValues.put(STATE_KEY_LATEST_SUCCESS_TRANSFER, String.valueOf(now)); @@ -354,34 +368,42 @@ public void process(final OutputStream out) throws IOException { latestSuccessTransfer.set(updatedLatestSuccessTransfer); } if (inactive.getAndSet(false) && shouldThisNodeReport(isClusterScope, shouldReportOnlyOnPrimary)) { - FlowFile activityRestoredFlowFile = session.create(); + FlowFile activityRestoredFlowFile = flowFile != null ? session.clone(flowFile) : session.create(); if (copyAttributes) { - final Map attributes = new HashMap<>(); - if (flowFiles.size() > 0) { - // copy attributes from the first flow file in the list - attributes.putAll(flowFiles.get(0).getAttributes()); - } else if (clusterState != null) { + if (flowFile == null && clusterState != null) { attributes.putAll(clusterState.toMap()); attributes.remove(STATE_KEY_LATEST_SUCCESS_TRANSFER); } - // don't copy the UUID - attributes.remove(CoreAttributes.UUID.key()); activityRestoredFlowFile = session.putAllAttributes(activityRestoredFlowFile, attributes); + } else { + if(flowFile != null) { + activityRestoredFlowFile = session.removeAllAttributes(activityRestoredFlowFile, flowFile.getAttributes().keySet()); + } } activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityStartMillis", String.valueOf(inactivityStartMillis)); activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, "inactivityDurationMillis", String.valueOf(now - inactivityStartMillis)); - final byte[] outBytes = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue().getBytes(UTF8); - activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(outBytes)); + final String message = context.getProperty(ACTIVITY_RESTORED_MESSAGE).evaluateAttributeExpressions(activityRestoredFlowFile).getValue(); + if(DEST_CONTENT.getValue().equals(context.getProperty(MESSAGE_DESTINATION).getValue())) { + activityRestoredFlowFile = session.write(activityRestoredFlowFile, out -> out.write(message.getBytes(UTF8))); + } else { + activityRestoredFlowFile = session.putAttribute(activityRestoredFlowFile, MESSAGE_ATTRIBUTE, message); + } session.getProvenanceReporter().create(activityRestoredFlowFile); session.transfer(activityRestoredFlowFile, REL_ACTIVITY_RESTORED); logger.info("Transferred {} to 'activity.restored'", new Object[]{activityRestoredFlowFile}); } } + + if(flowFile != null) { + session.transfer(flowFile, REL_SUCCESS); + updatedLatestSuccessTransfer = now; + logger.debug("Transferred flowFile to 'success'"); + } } @OnStopped diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java index bd375e470d4d..c17fd6c90d01 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMonitorActivity.java @@ -108,6 +108,85 @@ public void testFirstMessage() throws InterruptedException, IOException { restoredFlowFile.assertAttributeNotExists("key1"); } + @Test + public void testFirstMessageInAttribute() throws InterruptedException, IOException { + final TestRunner runner = TestRunners.newTestRunner(new TestableProcessor(1000L)); + runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false"); + runner.setProperty(MonitorActivity.THRESHOLD, "100 millis"); + runner.setProperty(MonitorActivity.MESSAGE_DESTINATION, MonitorActivity.DEST_ATTRIBUTE); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1); + runner.clearTransferState(); + + Thread.sleep(1000L); + + runNext(runner); + runner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1); + runner.clearTransferState(); + + // ensure we don't keep creating the message + for (int i = 0; i < 10; i++) { + runNext(runner); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0); + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0); + Thread.sleep(100L); + } + + Map attributes = new HashMap<>(); + attributes.put("key", "value"); + attributes.put("key1", "value1"); + + runner.enqueue("test", attributes); + runNext(runner); + + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1); + + MockFlowFile restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0); + String flowFileContent = new String(restoredFlowFile.toByteArray()); + Assert.assertEquals("test", flowFileContent); + Assert.assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", + restoredFlowFile.getAttribute(MonitorActivity.MESSAGE_ATTRIBUTE))); + restoredFlowFile.assertAttributeNotExists("key"); + restoredFlowFile.assertAttributeNotExists("key1"); + + runner.clearTransferState(); + runner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true"); + Thread.sleep(200L); + + for (int i = 0; i < 10; i++) { + runNext(runner); + Thread.sleep(200L); + } + + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 10); + MockFlowFile inactiveFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE).get(0); + Assert.assertTrue(Pattern.matches("Lacking activity as of time: (.*); flow has been inactive for (.*) minutes", + inactiveFlowFile.getAttribute(MonitorActivity.MESSAGE_ATTRIBUTE))); + + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0); + runner.clearTransferState(); + + runner.enqueue("test", attributes); + runNext(runner); + + runner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0); + runner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1); + runner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1); + + restoredFlowFile = runner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0); + flowFileContent = new String(restoredFlowFile.toByteArray()); + Assert.assertEquals("test", flowFileContent); + Assert.assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", + restoredFlowFile.getAttribute(MonitorActivity.MESSAGE_ATTRIBUTE))); + restoredFlowFile.assertAttributeNotExists("key"); + restoredFlowFile.assertAttributeNotExists("key1"); + } + private void runNext(TestRunner runner) { // Don't initialize, otherwise @OnScheduled is called and state gets reset runner.run(1, false, false);