New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-11232 Fix buffer handling in ContentClaimInputStream #6996
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for submitting the contribution to address this issue @Chrzi!
One general question, given the important nature of the problem, have you evaluated implementing a unit test for this class to verify the intended behavior?
I have confirmed the behaviour using the unit test provided in the Issue, which I also used to reproduce the bug. At the moment I am having issues executing If that works I want to get the unit test into a state where it is mergeable. |
Thanks, adding the unit test to the PR would be very helpful. |
I tested that the new unit test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -81,14 +81,13 @@ public long getCurrentOffset() { | |||
|
|||
@Override | |||
public int read() throws IOException { | |||
int value = -1; | |||
int value; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With this change, it looks like value
can be marked final
:
int value; | |
final int value; |
@@ -117,11 +115,10 @@ public int read(final byte[] b) throws IOException { | |||
|
|||
@Override | |||
public int read(final byte[] b, final int off, final int len) throws IOException { | |||
int count = -1; | |||
int count; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
int count; | |
final int count; |
} catch (IOException ex) { | ||
throw new RuntimeException("Failed to read content claim!", ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of throwing a RuntimeException
, it looks like an UncheckedIOException
would be more appropriate.
As a general rule, exclamation points should be avoided in error messages, so see the following suggested wording:
} catch (IOException ex) { | |
throw new RuntimeException("Failed to read content claim!", ex); | |
} catch (IOException e) { | |
throw new UncheckedIOException("Failed to read repository Content Claim", e); |
* Resets to the last marked position. | ||
* | ||
* @see ContentClaimInputStream#mark(int) | ||
* @throws IOException only if no mark point has been set. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IOException
can also be thrown from other operations such as resetting or closing the buffer or delegate stream, so recommend the following wording:
* @throws IOException only if no mark point has been set. | |
* @throws IOException Thrown when a mark position is not set or on other stream handling failures |
@@ -200,6 +218,10 @@ public void reset() throws IOException { | |||
|
|||
@Override | |||
public void close() throws IOException { | |||
if (bufferedIn != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor spacing issue:
if (bufferedIn != null) { | |
if (bufferedIn != null) { |
...mponents/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimInputStream.java
Show resolved
Hide resolved
@@ -121,7 +122,7 @@ public void testRereadEntireClaim() throws IOException { | |||
in.reset(); | |||
} | |||
|
|||
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read | |||
Mockito.verify(repo, Mockito.times( 1)).read(contentClaim); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The times()
call is not necessary since the default behavior of verify()
expects 1 invocation.
Mockito.verify(repo, Mockito.times( 1)).read(contentClaim); | |
Mockito.verify(repo).read(contentClaim); |
@@ -152,7 +153,7 @@ public void testMultipleResetCallsAfterMark() throws IOException { | |||
in.reset(); | |||
} | |||
|
|||
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read | |||
Mockito.verify(repo, Mockito.times( 1)).read(contentClaim); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mockito.verify(repo, Mockito.times( 1)).read(contentClaim); | |
Mockito.verify(repo).read(contentClaim); |
@@ -183,11 +184,88 @@ public void testRereadWithOffset() throws IOException { | |||
in.reset(); | |||
} | |||
|
|||
Mockito.verify(repo, Mockito.times(invocations + 1)).read(contentClaim); // Will call reset() 'invocations' times plus the initial read | |||
Mockito.verify(repo, Mockito.times( 1)).read(contentClaim); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mockito.verify(repo, Mockito.times( 1)).read(contentClaim); | |
Mockito.verify(repo).read(contentClaim); |
Aside from the deviations from the standard style that we tend to use in NiFi, which @exceptionfactory noted, I am a +1 |
Thanks for the fix @Chrzi ! |
This closes #6996 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This closes apache#6996 Signed-off-by: David Handermann <exceptionfactory@apache.org>
Summary
NIFI-11232
Under some circumstances the buffer is not recreated correctly in ContentClaimInputStream leading to a situation where data is read from the buffer and the buffer trying to read from a closed stream. More details in the issue.
One case where this can happen is in a custom processor using Apache Tika. Passing the InputStream created of a flowfile by the processSession to Tika exposes the bug, since it uses the mark and reset feature extensively.
This PR creates a new buffer when the stream is marked to buffer the readLimit (which is the primary cause why it was introduced in NIFI-10888). It also throws the buffer away during reset, if a reset within the buffer is not possible.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Pull Request Formatting
main
branchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
mvn clean install -P contrib-check
Licensing
LICENSE
andNOTICE
filesDocumentation