From e75efeeb572f96310a5c39d187f3de5efa39bec3 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 23 Nov 2016 11:09:42 -0500 Subject: [PATCH] NIFI-3091: Ensure that we set the appropriate size on FlowFiles when modifying them --- .../repository/StandardProcessSession.java | 3 +- .../TestStandardProcessSession.java | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 002bac932ef7..80c917cebec3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2580,7 +2580,8 @@ public FlowFile write(final FlowFile source, final StreamCallback writer) { cnfeThrown = true; throw cnfe; } finally { - this.bytesWritten += countingOut.getBytesWritten(); + writtenToFlowFile = countingOut.getBytesWritten(); + this.bytesWritten += writtenToFlowFile; this.bytesRead += countingIn.getBytesRead(); recursionSet.remove(source); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 8cc088d480e6..6f94994ef289 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -276,6 +276,65 @@ public void testWriteForChildThrowsIOExceptionThenRemove() throws IOException { assertEquals(0, numClaims); } + @Test + public void testModifyContentWithStreamCallbackHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.write(original, (in, out) -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + @Test + public void testModifyContentWithOutputStreamCallbackHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.write(original, out -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + @Test + public void testModifyContentWithAppendHasCorrectSize() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.append(original, out -> out.write("hello".getBytes())); + session.transfer(child); + session.commit(); + + final FlowFileRecord onQueue = flowFileQueue.poll(Collections.emptySet()); + assertEquals(5, onQueue.getSize()); + } + + + @Test public void testModifyContentThenRollback() throws IOException { assertEquals(0, contentRepo.getExistingClaims().size());