From ea3aada4faba631ec0dc7c2a209626f512d1343b Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Wed, 25 Jul 2018 13:55:10 -0400 Subject: [PATCH] NIFI-5454: Added EL support and copy.index attribute to DuplicateFlowFile --- .../standard/DuplicateFlowFile.java | 23 +++++++++++---- .../standard/TestDuplicateFlowFile.java | 28 ++++++++++++++++++- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java index ecc8e60d9146..4ba5f833109a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DuplicateFlowFile.java @@ -24,9 +24,12 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -39,14 +42,22 @@ @SupportsBatching @Tags({"test", "load", "duplicate"}) @InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile") +@CapabilityDescription("Intended for load testing, this processor will create the configured number of copies of each incoming FlowFile. The original FlowFile as well as all " ++ "generated copies are sent to the 'success' relationship. In addition, each FlowFile gets an attribute 'copy.index' set to the copy number, where the original FlowFile gets " ++ "a value of zero, and all copies receive incremented integer values.") +@WritesAttributes({ + @WritesAttribute(attribute = "copy.index", description = "A zero-based incrementing integer value based on which copy the FlowFile is.") +}) public class DuplicateFlowFile extends AbstractProcessor { + public static final String COPY_INDEX_ATTRIBUTE = "copy.index"; + static final PropertyDescriptor NUM_COPIES = new PropertyDescriptor.Builder() .name("Number of Copies") + .displayName("Number of Copies") .description("Specifies how many copies of each incoming FlowFile will be made") .required(true) - .expressionLanguageSupported(false) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .defaultValue("100") .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) .build(); @@ -68,16 +79,18 @@ protected List getSupportedPropertyDescriptors() { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final FlowFile flowFile = session.get(); + FlowFile flowFile = session.get(); if (flowFile == null) { return; } - for (int i=0; i < context.getProperty(NUM_COPIES).asInteger(); i++) { - final FlowFile copy = session.clone(flowFile); + for (int i = 1; i <= context.getProperty(NUM_COPIES).evaluateAttributeExpressions(flowFile).asInteger(); i++) { + FlowFile copy = session.clone(flowFile); + copy = session.putAttribute(copy, COPY_INDEX_ATTRIBUTE, Integer.toString(i)); session.transfer(copy, REL_SUCCESS); } + flowFile = session.putAttribute(flowFile, COPY_INDEX_ATTRIBUTE, "0"); session.transfer(flowFile, REL_SUCCESS); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java index 76026bcfb306..8a0b2efe789a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDuplicateFlowFile.java @@ -16,20 +16,46 @@ */ package org.apache.nifi.processors.standard; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; +import java.util.HashMap; +import java.util.List; + +import static org.apache.nifi.processors.standard.DuplicateFlowFile.COPY_INDEX_ATTRIBUTE; + public class TestDuplicateFlowFile { @Test public void test() { + final int numCopies = 100; final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); - runner.setProperty(DuplicateFlowFile.NUM_COPIES, "100"); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, Integer.toString(numCopies)); runner.enqueue("hello".getBytes()); runner.run(); + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, numCopies + 1); + List flowFiles = runner.getFlowFilesForRelationship(DuplicateFlowFile.REL_SUCCESS); + // copy.index starts with 1, original has copy.index = 0 but is transferred last + for (int i = 1; i <= numCopies; i++) { + flowFiles.get(i - 1).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, Integer.toString(i)); + } + flowFiles.get(numCopies).assertAttributeEquals(COPY_INDEX_ATTRIBUTE, "0"); + } + + @Test + public void testNumberOfCopiesEL() { + final TestRunner runner = TestRunners.newTestRunner(DuplicateFlowFile.class); + runner.setProperty(DuplicateFlowFile.NUM_COPIES, "${num.copies}"); + + runner.enqueue("hello".getBytes(), new HashMap() {{ + put("num.copies", "100"); + }}); + runner.run(); + runner.assertAllFlowFilesTransferred(DuplicateFlowFile.REL_SUCCESS, 101); }