From 9d4a6e742bc67838176756bb7ef600e54d2904df Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Thu, 13 Oct 2016 02:18:23 -0400 Subject: [PATCH 1/3] NIFI-2861 ControlRate should accept more than one flow file per execution * Support multiple files per onTrigger call. (0.x branch) --- .../nifi/processors/standard/ControlRate.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 5612d4fd4e42..085d5b68fc2b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -115,6 +115,13 @@ public class ControlRate extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); + public static final PropertyDescriptor MAX_FF_PER_TRIGGER = new PropertyDescriptor.Builder() + .name("Max FlowFiles per Trigger") + .description("Maximum number of FlowFiles to accept per onTrigger() call, even within the allowable limit.") + .required(false) + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -137,6 +144,7 @@ public class ControlRate extends AbstractProcessor { private volatile String rateControlAttribute = null; private volatile String maximumRateStr = null; private volatile String groupingAttributeName = null; + private volatile String maxFlowFilePerTrigger = null; private volatile int timePeriodSeconds = 1; @Override @@ -147,6 +155,7 @@ protected void init(final ProcessorInitializationContext context) { properties.add(RATE_CONTROL_ATTRIBUTE_NAME); properties.add(TIME_PERIOD); properties.add(GROUPING_ATTRIBUTE_NAME); + properties.add(MAX_FF_PER_TRIGGER); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -205,6 +214,7 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String if (descriptor.equals(RATE_CONTROL_CRITERIA) || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals(GROUPING_ATTRIBUTE_NAME) + || descriptor.equals(MAX_FF_PER_TRIGGER) || descriptor.equals(TIME_PERIOD)) { // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. throttleMap.clear(); @@ -228,12 +238,13 @@ public void onScheduled(final ProcessContext context) { rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase(); groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); + maxFlowFilePerTrigger = context.getProperty(MAX_FF_PER_TRIGGER).getValue(); timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue(); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - List flowFiles = session.get(new ThrottleFilter()); + List flowFiles = session.get(new ThrottleFilter(maxFlowFilePerTrigger)); if (flowFiles.isEmpty()) { context.yield(); return; @@ -381,6 +392,14 @@ public boolean tryAdd(final long value) { private class ThrottleFilter implements FlowFileFilter { + private final long flowFilesPerTrigger; + private final AtomicLong flowFilesFiltered = new AtomicLong(0L); + + ThrottleFilter(final String ffPerTrigger) { + super(); + flowFilesPerTrigger = ffPerTrigger == null ? 1L : Long.parseLong(ffPerTrigger); + } + @Override public FlowFileFilterResult filter(FlowFile flowFile) { long accrual = getFlowFileAccrual(flowFile); @@ -409,7 +428,13 @@ public FlowFileFilterResult filter(FlowFile flowFile) { throttle.lock(); try { if (throttle.tryAdd(accrual)) { - return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + long files = flowFilesFiltered.addAndGet(1); + if (files >= flowFilesPerTrigger) { + flowFilesFiltered.set(0L); + return FlowFileFilterResult.ACCEPT_AND_TERMINATE; + } else { + return FlowFileFilterResult.ACCEPT_AND_CONTINUE; + } } } finally { throttle.unlock(); From 6b53c27a436a7623e8a6107ab7669dbfb9370667 Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Mon, 17 Oct 2016 23:24:45 -0400 Subject: [PATCH 2/3] Renamed flowFile limit to reflect batch metaphor. Per PR reviews changed variables to ints from String and AtomicLong. Add unit tests to confirm function of default and configured batch size values. --- .../nifi/processors/standard/ControlRate.java | 36 ++++++----- .../processors/standard/TestControlRate.java | 59 +++++++++++++++++++ 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 085d5b68fc2b..8997ab45ca80 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -115,9 +115,10 @@ public class ControlRate extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); - public static final PropertyDescriptor MAX_FF_PER_TRIGGER = new PropertyDescriptor.Builder() - .name("Max FlowFiles per Trigger") - .description("Maximum number of FlowFiles to accept per onTrigger() call, even within the allowable limit.") + public static final PropertyDescriptor MAX_FF_PER_BATCH = new PropertyDescriptor.Builder() + .name("Max FlowFiles per processing batch") + .description("Maximum number of FlowFiles to accept per processing batch, even if Maximum Rate isn't reached.") + .displayName("Max FlowFiles per batch") .required(false) .addValidator(StandardValidators.INTEGER_VALIDATOR) .expressionLanguageSupported(false) @@ -144,7 +145,7 @@ public class ControlRate extends AbstractProcessor { private volatile String rateControlAttribute = null; private volatile String maximumRateStr = null; private volatile String groupingAttributeName = null; - private volatile String maxFlowFilePerTrigger = null; + private volatile int maxFlowFilesPerBatch = 1; private volatile int timePeriodSeconds = 1; @Override @@ -155,7 +156,7 @@ protected void init(final ProcessorInitializationContext context) { properties.add(RATE_CONTROL_ATTRIBUTE_NAME); properties.add(TIME_PERIOD); properties.add(GROUPING_ATTRIBUTE_NAME); - properties.add(MAX_FF_PER_TRIGGER); + properties.add(MAX_FF_PER_BATCH); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -214,7 +215,7 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String if (descriptor.equals(RATE_CONTROL_CRITERIA) || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals(GROUPING_ATTRIBUTE_NAME) - || descriptor.equals(MAX_FF_PER_TRIGGER) + || descriptor.equals(MAX_FF_PER_BATCH) || descriptor.equals(TIME_PERIOD)) { // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. throttleMap.clear(); @@ -238,13 +239,17 @@ public void onScheduled(final ProcessContext context) { rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase(); groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); - maxFlowFilePerTrigger = context.getProperty(MAX_FF_PER_TRIGGER).getValue(); + if (context.getProperty(MAX_FF_PER_BATCH).isSet()) { + maxFlowFilesPerBatch = context.getProperty(MAX_FF_PER_BATCH).asInteger(); + } else { + maxFlowFilesPerBatch = 1; + } timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue(); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - List flowFiles = session.get(new ThrottleFilter(maxFlowFilePerTrigger)); + List flowFiles = session.get(new ThrottleFilter(maxFlowFilesPerBatch)); if (flowFiles.isEmpty()) { context.yield(); return; @@ -392,12 +397,11 @@ public boolean tryAdd(final long value) { private class ThrottleFilter implements FlowFileFilter { - private final long flowFilesPerTrigger; - private final AtomicLong flowFilesFiltered = new AtomicLong(0L); + private final int flowFilesPerBatch; + private int flowFilesInBatch = 0; - ThrottleFilter(final String ffPerTrigger) { - super(); - flowFilesPerTrigger = ffPerTrigger == null ? 1L : Long.parseLong(ffPerTrigger); + ThrottleFilter(final int maxFFPerBatch) { + flowFilesPerBatch = maxFFPerBatch; } @Override @@ -428,9 +432,9 @@ public FlowFileFilterResult filter(FlowFile flowFile) { throttle.lock(); try { if (throttle.tryAdd(accrual)) { - long files = flowFilesFiltered.addAndGet(1); - if (files >= flowFilesPerTrigger) { - flowFilesFiltered.set(0L); + flowFilesInBatch += 1; + if (flowFilesInBatch>= flowFilesPerBatch) { + flowFilesInBatch = 0; return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } else { return FlowFileFilterResult.ACCEPT_AND_CONTINUE; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 2e6ce45e1e04..de07fe32b530 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -175,6 +175,65 @@ public void testBadAttributeRate() { runner.assertQueueEmpty(); } + @Test + public void testBatchSizeDefaultOne() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "5555"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + + runner.enqueue("test data 0"); + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + runner.enqueue("test data 4"); + runner.enqueue("test data 5"); + runner.enqueue("test data 6"); + runner.enqueue("test data 7"); + runner.enqueue("test data 8"); + runner.enqueue("test data 9"); + + runner.run(4, false); + + // we've run 4 times and should have 4 files with 6 in the queue + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueNotEmpty(); + runner.clearTransferState(); + + runner.run(8, false); + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 6); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + + @Test + public void testBatchSizeOneHundred() throws InterruptedException { + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "5555"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); + runner.setProperty(ControlRate.MAX_FF_PER_BATCH, "100"); + + runner.enqueue("test data 0"); + runner.enqueue("test data 1"); + runner.enqueue("test data 2"); + runner.enqueue("test data 3"); + runner.enqueue("test data 4"); + runner.enqueue("test data 5"); + runner.enqueue("test data 6"); + runner.enqueue("test data 7"); + runner.enqueue("test data 8"); + runner.enqueue("test data 9"); + + runner.run(1, false); + + // we've run 1 time and should have all 10 files with 0 in the queue + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 10); + runner.assertTransferCount(ControlRate.REL_FAILURE, 0); + runner.assertQueueEmpty(); + } + private void createFlowFile(final TestRunner runner, final int value) { final Map attributeMap = new HashMap<>(); attributeMap.put("count", String.valueOf(value)); From fa32c20d0d26daf96d25f327a22e0bdea640e64f Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Fri, 6 Jan 2017 19:15:22 +0000 Subject: [PATCH 3/3] Refactored max FlowFile limit from property to constant. --- .../nifi/processors/standard/ControlRate.java | 21 ++----- .../processors/standard/TestControlRate.java | 57 +++++-------------- 2 files changed, 19 insertions(+), 59 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index 8997ab45ca80..18d6ff9af3a8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor { public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE, "Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration"); + // based on testing to balance commits and 10,000 FF swap limit + public static final int MAX_FLOW_FILES_PER_BATCH = 1000; + public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder() .name("Rate Control Criteria") .description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.") @@ -115,14 +118,6 @@ public class ControlRate extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); - public static final PropertyDescriptor MAX_FF_PER_BATCH = new PropertyDescriptor.Builder() - .name("Max FlowFiles per processing batch") - .description("Maximum number of FlowFiles to accept per processing batch, even if Maximum Rate isn't reached.") - .displayName("Max FlowFiles per batch") - .required(false) - .addValidator(StandardValidators.INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -145,7 +140,6 @@ public class ControlRate extends AbstractProcessor { private volatile String rateControlAttribute = null; private volatile String maximumRateStr = null; private volatile String groupingAttributeName = null; - private volatile int maxFlowFilesPerBatch = 1; private volatile int timePeriodSeconds = 1; @Override @@ -156,7 +150,6 @@ protected void init(final ProcessorInitializationContext context) { properties.add(RATE_CONTROL_ATTRIBUTE_NAME); properties.add(TIME_PERIOD); properties.add(GROUPING_ATTRIBUTE_NAME); - properties.add(MAX_FF_PER_BATCH); this.properties = Collections.unmodifiableList(properties); final Set relationships = new HashSet<>(); @@ -215,7 +208,6 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String if (descriptor.equals(RATE_CONTROL_CRITERIA) || descriptor.equals(RATE_CONTROL_ATTRIBUTE_NAME) || descriptor.equals(GROUPING_ATTRIBUTE_NAME) - || descriptor.equals(MAX_FF_PER_BATCH) || descriptor.equals(TIME_PERIOD)) { // if the criteria that is being used to determine limits/throttles is changed, we must clear our throttle map. throttleMap.clear(); @@ -239,17 +231,12 @@ public void onScheduled(final ProcessContext context) { rateControlAttribute = context.getProperty(RATE_CONTROL_ATTRIBUTE_NAME).getValue(); maximumRateStr = context.getProperty(MAX_RATE).getValue().toUpperCase(); groupingAttributeName = context.getProperty(GROUPING_ATTRIBUTE_NAME).getValue(); - if (context.getProperty(MAX_FF_PER_BATCH).isSet()) { - maxFlowFilesPerBatch = context.getProperty(MAX_FF_PER_BATCH).asInteger(); - } else { - maxFlowFilesPerBatch = 1; - } timePeriodSeconds = context.getProperty(TIME_PERIOD).asTimePeriod(TimeUnit.SECONDS).intValue(); } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - List flowFiles = session.get(new ThrottleFilter(maxFlowFilesPerBatch)); + List flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH)); if (flowFiles.isEmpty()) { context.yield(); return; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index de07fe32b530..050f8186540e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -24,6 +24,9 @@ import org.junit.Test; +import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; +import static org.junit.Assert.assertEquals; + public class TestControlRate { @Test @@ -176,60 +179,30 @@ public void testBadAttributeRate() { } @Test - public void testBatchSizeDefaultOne() throws InterruptedException { + public void testBatchLimit() throws InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); runner.setProperty(ControlRate.MAX_RATE, "5555"); runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); - runner.enqueue("test data 0"); - runner.enqueue("test data 1"); - runner.enqueue("test data 2"); - runner.enqueue("test data 3"); - runner.enqueue("test data 4"); - runner.enqueue("test data 5"); - runner.enqueue("test data 6"); - runner.enqueue("test data 7"); - runner.enqueue("test data 8"); - runner.enqueue("test data 9"); + final int TEST_FILE_COUNT = 1500; - runner.run(4, false); + for (int i = 0; i < TEST_FILE_COUNT; i++) { + runner.enqueue("test data " + i); + } - // we've run 4 times and should have 4 files with 6 in the queue - runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4); - runner.assertTransferCount(ControlRate.REL_FAILURE, 0); - runner.assertQueueNotEmpty(); - runner.clearTransferState(); + runner.run(1, false); - runner.run(8, false); - runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 6); + // after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of TEST_FILE_COUNT in queue + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); - runner.assertQueueEmpty(); - } - - @Test - public void testBatchSizeOneHundred() throws InterruptedException { - final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); - runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); - runner.setProperty(ControlRate.MAX_RATE, "5555"); - runner.setProperty(ControlRate.TIME_PERIOD, "1 sec"); - runner.setProperty(ControlRate.MAX_FF_PER_BATCH, "100"); - - runner.enqueue("test data 0"); - runner.enqueue("test data 1"); - runner.enqueue("test data 2"); - runner.enqueue("test data 3"); - runner.enqueue("test data 4"); - runner.enqueue("test data 5"); - runner.enqueue("test data 6"); - runner.enqueue("test data 7"); - runner.enqueue("test data 8"); - runner.enqueue("test data 9"); + runner.assertQueueNotEmpty(); + assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount()); runner.run(1, false); - // we've run 1 time and should have all 10 files with 0 in the queue - runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 10); + // after 2 runs should have TEST_FILE_COUNT files transferred and 0 in queue + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT); runner.assertTransferCount(ControlRate.REL_FAILURE, 0); runner.assertQueueEmpty(); }