Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public class ControlRate extends AbstractProcessor {
public static final AllowableValue ATTRIBUTE_RATE_VALUE = new AllowableValue(ATTRIBUTE_RATE, ATTRIBUTE_RATE,
"Rate is controlled by accumulating the value of a specified attribute that is transferred per time duration");

// based on testing to balance commits and 10,000 FF swap limit
public static final int MAX_FLOW_FILES_PER_BATCH = 1000;

public static final PropertyDescriptor RATE_CONTROL_CRITERIA = new PropertyDescriptor.Builder()
.name("Rate Control Criteria")
.description("Indicates the criteria that is used to control the throughput rate. Changing this value resets the rate counters.")
Expand Down Expand Up @@ -233,7 +236,7 @@ public void onScheduled(final ProcessContext context) {

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
List<FlowFile> flowFiles = session.get(new ThrottleFilter());
List<FlowFile> flowFiles = session.get(new ThrottleFilter(MAX_FLOW_FILES_PER_BATCH));
if (flowFiles.isEmpty()) {
context.yield();
return;
Expand Down Expand Up @@ -381,6 +384,13 @@ public boolean tryAdd(final long value) {

private class ThrottleFilter implements FlowFileFilter {

private final int flowFilesPerBatch;
private int flowFilesInBatch = 0;

ThrottleFilter(final int maxFFPerBatch) {
flowFilesPerBatch = maxFFPerBatch;
}

@Override
public FlowFileFilterResult filter(FlowFile flowFile) {
long accrual = getFlowFileAccrual(flowFile);
Expand Down Expand Up @@ -409,7 +419,13 @@ public FlowFileFilterResult filter(FlowFile flowFile) {
throttle.lock();
try {
if (throttle.tryAdd(accrual)) {
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
flowFilesInBatch += 1;
if (flowFilesInBatch>= flowFilesPerBatch) {
flowFilesInBatch = 0;
return FlowFileFilterResult.ACCEPT_AND_TERMINATE;
} else {
return FlowFileFilterResult.ACCEPT_AND_CONTINUE;
}
}
} finally {
throttle.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import org.junit.Test;

import static org.apache.nifi.processors.standard.ControlRate.MAX_FLOW_FILES_PER_BATCH;
import static org.junit.Assert.assertEquals;

public class TestControlRate {

@Test
Expand Down Expand Up @@ -175,6 +178,35 @@ public void testBadAttributeRate() {
runner.assertQueueEmpty();
}

@Test
public void testBatchLimit() throws InterruptedException {
final TestRunner runner = TestRunners.newTestRunner(new ControlRate());
runner.setProperty(ControlRate.RATE_CONTROL_CRITERIA, ControlRate.FLOWFILE_RATE);
runner.setProperty(ControlRate.MAX_RATE, "5555");
runner.setProperty(ControlRate.TIME_PERIOD, "1 sec");

final int TEST_FILE_COUNT = 1500;

for (int i = 0; i < TEST_FILE_COUNT; i++) {
runner.enqueue("test data " + i);
}

runner.run(1, false);

// after 1 run should have MAX_FLOW_FILES_PER_BATCH files transferred and remainder of TEST_FILE_COUNT in queue
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, MAX_FLOW_FILES_PER_BATCH);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueNotEmpty();
assertEquals(TEST_FILE_COUNT - MAX_FLOW_FILES_PER_BATCH, runner.getQueueSize().getObjectCount());

runner.run(1, false);

// after 2 runs should have TEST_FILE_COUNT files transferred and 0 in queue
runner.assertAllFlowFilesTransferred(ControlRate.REL_SUCCESS, TEST_FILE_COUNT);
runner.assertTransferCount(ControlRate.REL_FAILURE, 0);
runner.assertQueueEmpty();
}

private void createFlowFile(final TestRunner runner, final int value) {
final Map<String, String> attributeMap = new HashMap<>();
attributeMap.put("count", String.valueOf(value));
Expand Down