From 452e9493784cfc8fa08fe7f7b6f917e18e68ec2a Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 28 Mar 2018 16:06:12 -0400 Subject: [PATCH] NIFI-5030: If ControlRate encounters a FlowFile that cannot be transferred, it should continue processing other FlowFiles that have different group attribute values --- .../nifi/processors/standard/ControlRate.java | 17 ++++++-- .../processors/standard/TestControlRate.java | 40 +++++++++++++++++-- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java index c73f86696ac5..cf2364adcc5b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ControlRate.java @@ -394,18 +394,19 @@ private class ThrottleFilter implements FlowFileFilter { @Override public FlowFileFilterResult filter(FlowFile flowFile) { long accrual = getFlowFileAccrual(flowFile); - if(accrual < 0){ + if (accrual < 0) { // this FlowFile is invalid for this configuration so let the processor deal with it return FlowFileFilterResult.ACCEPT_AND_TERMINATE; } - String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile - .getAttribute(groupingAttributeName); + String groupName = (groupingAttributeName == null) ? DEFAULT_GROUP_ATTRIBUTE : flowFile.getAttribute(groupingAttributeName); + // the flow file may not have the required attribute: in this case it is considered part // of the DEFAULT_GROUP_ATTRIBUTE if (groupName == null) { groupName = DEFAULT_GROUP_ATTRIBUTE; } + Throttle throttle = throttleMap.get(groupName); if (throttle == null) { throttle = new Throttle(timePeriodSeconds, TimeUnit.SECONDS, getLogger()); @@ -436,7 +437,15 @@ public FlowFileFilterResult filter(FlowFile flowFile) { throttle.unlock(); } - return FlowFileFilterResult.REJECT_AND_TERMINATE; + // If we are not using a grouping attribute, then no FlowFile will be able to continue on. So we can + // just TERMINATE the iteration over FlowFiles. + // However, if we are using a grouping attribute, then another FlowFile in the queue may be able to proceed, + // so we want to continue our iteration. + if (groupingAttributeName == null) { + return FlowFileFilterResult.REJECT_AND_TERMINATE; + } + + return FlowFileFilterResult.REJECT_AND_CONTINUE; } } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java index 0260276b49b0..dea87d97cfe6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestControlRate.java @@ -16,19 +16,51 @@ */ package org.apache.nifi.processors.standard; +import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; +import static org.junit.Assert.assertEquals; + +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; - import org.junit.Test; -import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH; -import static org.junit.Assert.assertEquals; - public class TestControlRate { + @Test + public void testLimitExceededThenOtherLimitNotExceeded() { + // If we have flowfiles queued that have different values for the "Rate Controlled Attribute" + // and we encounter a FlowFile whose rate should be throttled, we should continue pulling other flowfiles + // whose rate does not need to be throttled. + final TestRunner runner = TestRunners.newTestRunner(new ControlRate()); + runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE); + runner.setProperty(ControlRate.MAX_RATE, "3"); + runner.setProperty(ControlRate.TIME_PERIOD, "1 min"); + runner.setProperty(ControlRate.GROUPING_ATTRIBUTE_NAME, "group"); + + final Map group1 = Collections.singletonMap("group", "1"); + final Map group2 = Collections.singletonMap("group", "2"); + + for (int i = 0; i < 5; i++) { + runner.enqueue("test data", group1); + } + + runner.enqueue("test data", group2); + + // Run several times, just to allow the processor to terminate the first poll if it wishes to + runner.run(); + + runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, 4); + + final List output = runner.getFlowFilesForRelationship(ControlRate.REL_SUCCESS); + assertEquals(3L, output.stream().filter(ff -> ff.getAttribute("group").equals("1")).count()); + assertEquals(1L, output.stream().filter(ff -> ff.getAttribute("group").equals("2")).count()); + } + @Test public void testFileCountRate() throws InterruptedException { final TestRunner runner = TestRunners.newTestRunner(new ControlRate());