From c5902da88c413543b3bbe04cbb8b1d92b90382a5 Mon Sep 17 00:00:00 2001 From: patricker Date: Tue, 8 May 2018 15:52:46 +0800 Subject: [PATCH] NiFI-5168 --- .../nifi/processors/standard/ReplaceText.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index 6cc91978017e..de17213f5846 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -45,7 +45,6 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.processor.util.FlowFileFilters; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.NLKBufferedReader; import org.apache.nifi.stream.io.StreamUtils; @@ -236,8 +235,8 @@ protected Collection customValidate(ValidationContext validati @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - final List flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 100)); - if (flowFiles.isEmpty()) { + FlowFile flowFile = session.get(); + if (flowFile == null) { return; } @@ -283,22 +282,20 @@ public void onTrigger(final ProcessContext context, final ProcessSession session throw new AssertionError(); } - for (FlowFile flowFile : flowFiles) { - if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { - if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) { - session.transfer(flowFile, REL_FAILURE); - continue; - } + if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) { + if (flowFile.getSize() > maxBufferSize && replacementStrategyExecutor.isAllDataBufferedForEntireText()) { + session.transfer(flowFile, REL_FAILURE); + return; } + } - final StopWatch stopWatch = new StopWatch(true); + final StopWatch stopWatch = new StopWatch(true); - flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize); + flowFile = replacementStrategyExecutor.replace(flowFile, session, context, evaluateMode, charset, maxBufferSize); - logger.info("Transferred {} to 'success'", new Object[] {flowFile}); - session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - session.transfer(flowFile, REL_SUCCESS); - } + logger.info("Transferred {} to 'success'", new Object[] {flowFile}); + session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); }