From 40504f82bb7b2ebf3678c6ffa8bba8baba401a3b Mon Sep 17 00:00:00 2001 From: Mark Bean Date: Fri, 16 Mar 2018 19:43:29 -0400 Subject: [PATCH] NIFI-4658 set Maximum Number of Entries to required and allow FlowFiles having fragment.count greater than Max Entries property --- .../apache/nifi/processor/util/bin/Bin.java | 14 ++++---- .../nifi/processor/util/bin/BinFiles.java | 5 +-- .../nifi/processor/util/bin/BinManager.java | 4 +++ .../processors/standard/TestMergeContent.java | 34 ++++++++++++++++--- 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java index b427e06f23f8..f95c4702f117 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/Bin.java @@ -48,10 +48,10 @@ public class Bin { private volatile int maximumEntries = Integer.MAX_VALUE; private final String fileCountAttribute; - final List binContents = new ArrayList<>(); + private final List binContents = new ArrayList<>(); private final Set binIndexSet = new HashSet<>(); - long size; - int successiveFailedOfferings = 0; + private long size; + private int successiveFailedOfferings = 0; /** * Constructs a new bin @@ -141,11 +141,11 @@ public boolean offer(final FlowFile flowFile, final ProcessSession session) { if (fileCountAttribute != null) { final String countValue = flowFile.getAttribute(fileCountAttribute); final Integer count = toInteger(countValue); - if (count != null) { - int currentMaxEntries = this.maximumEntries; - this.maximumEntries = Math.min(count, currentMaxEntries); - this.minimumEntries = currentMaxEntries; + if (count == null) { + return false; } + this.maximumEntries = count; + this.minimumEntries = count; final String index = flowFile.getAttribute(FRAGMENT_INDEX_ATTRIBUTE); if (index == null || index.isEmpty() || !binIndexSet.add(index)) { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java index 643aae415775..975fe0fc3628 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java @@ -72,9 +72,9 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor { .build(); public static final PropertyDescriptor MAX_ENTRIES = new PropertyDescriptor.Builder() .name("Maximum Number of Entries") - .description("The maximum number of files to include in a bundle. If not specified, there is no maximum.") + .description("The maximum number of files to include in a bundle") .defaultValue("1000") - .required(false) + .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); @@ -261,6 +261,7 @@ private int binFlowFiles(final ProcessContext context, final ProcessSessionFacto } final ProcessSession session = sessionFactory.createSession(); + final List flowFiles = session.get(1000); if (flowFiles.isEmpty()) { break; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java index e6cec7858f9d..60c29665d26d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinManager.java @@ -76,6 +76,10 @@ public void setFileCountAttribute(final String fileCountAttribute) { this.fileCountAttribute.set(fileCountAttribute); } + public String getFileCountAttribute() { + return fileCountAttribute.get(); + } + public void setMinimumEntries(final int minimumEntries) { this.minEntries.set(minimumEntries); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java index af448d640e6b..b3f08c4822bc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java @@ -87,6 +87,9 @@ public void testFlowFileLargerThanBin() { runner.assertTransferCount(MergeContent.REL_MERGED, 1); runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + final MockFlowFile bundle = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assertEquals(1024 * 6, bundle.getSize()); + // Queue should not be empty because the first FlowFile will be transferred back to the input queue // when we run out @OnStopped logic, since it won't be transferred to any bin. runner.assertQueueNotEmpty(); @@ -886,6 +889,33 @@ public void testDefragmentDuplicateFragement() throws IOException, InterruptedEx runner.assertTransferCount(MergeContent.REL_MERGED, 0); } + @Test + public void testDefragmentWithTooManyFragements() throws IOException { + final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); + runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); + runner.setProperty(MergeContent.MAX_ENTRIES, "3"); + + final Map attributes = new HashMap<>(); + attributes.put(MergeContent.FRAGMENT_ID_ATTRIBUTE, "1"); + attributes.put(MergeContent.FRAGMENT_COUNT_ATTRIBUTE, "4"); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "1"); + + runner.enqueue("A Man ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "2"); + runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "3"); + runner.enqueue("A Canal ".getBytes("UTF-8"), attributes); + attributes.put(MergeContent.FRAGMENT_INDEX_ATTRIBUTE, "4"); + runner.enqueue("Panama".getBytes("UTF-8"), attributes); + + runner.run(); + + runner.assertTransferCount(MergeContent.REL_FAILURE, 0); + runner.assertTransferCount(MergeContent.REL_MERGED, 1); + final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); + assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8")); + } + @Test public void testDefragmentWithTooFewFragments() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); @@ -988,13 +1018,11 @@ public void testDefragmentOldStyleAttributes() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new MergeContent()); runner.setProperty(MergeContent.MERGE_STRATEGY, MergeContent.MERGE_STRATEGY_DEFRAGMENT); runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min"); - final Map attributes = new HashMap<>(); attributes.put("segment.identifier", "1"); attributes.put("segment.count", "4"); attributes.put("segment.index", "1"); attributes.put("segment.original.filename", "originalfilename"); - runner.enqueue("A Man ".getBytes("UTF-8"), attributes); attributes.put("segment.index", "2"); runner.enqueue("A Plan ".getBytes("UTF-8"), attributes); @@ -1002,9 +1030,7 @@ public void testDefragmentOldStyleAttributes() throws IOException { runner.enqueue("A Canal ".getBytes("UTF-8"), attributes); attributes.put("segment.index", "4"); runner.enqueue("Panama".getBytes("UTF-8"), attributes); - runner.run(); - runner.assertTransferCount(MergeContent.REL_MERGED, 1); final MockFlowFile assembled = runner.getFlowFilesForRelationship(MergeContent.REL_MERGED).get(0); assembled.assertContentEquals("A Man A Plan A Canal Panama".getBytes("UTF-8"));