-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-8183 TailFile intermittently creates records with ascii NULL after rollover #4792
Conversation
a8ec6c4
to
643eb6d
Compare
try (final InputStream fis = new FileInputStream(firstFile); | ||
final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) { | ||
StreamUtils.copy(in, new NullOutputStream(), position); | ||
try (final FileInputStream fis = new FileInputStream(firstFile)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we don't need readLines
here because we already processed to content up to this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tpalfy , I have updated the change according to your comment. I also extracted a couple of repeating lines from the readLines()
method.
@@ -1184,7 +1191,15 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe | |||
// This is the same file that we were reading when we shutdown. Start reading from this point on. | |||
rolledOffFiles.remove(0); | |||
FlowFile flowFile = session.create(); | |||
flowFile = session.importFrom(in, flowFile); | |||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you extract this nested try block to a separate method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @Lehel44 for the suggestion.
Although I agree that the method gets more and more complicated but I tried to extract these lines to a separate method and unless I do a bigger refactor it doesn't really simplify things.
My biggest concern with it that the new method needs 5 parameters (ProcessContext context, ProcessSession session, List<File> rolledOffFiles, File file, FileInputStream fis
) and there's no good way to check whether the FileInputStream
belongs to the given File
.
It'd be definitely worth to do a bigger refactor on this method, but I'd suggest doing it in a separate PR.
@@ -1184,7 +1191,15 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe | |||
// This is the same file that we were reading when we shutdown. Start reading from this point on. | |||
rolledOffFiles.remove(0); | |||
FlowFile flowFile = session.create(); | |||
flowFile = session.importFrom(in, flowFile); | |||
try { | |||
ByteArrayOutputStream out = new ByteArrayOutputStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This approach is going to buffer everything that gets read into the ByteArrayOutputStream. This could potentially be a huge amount of data. We want to be sure that we don't buffer that up. Instead, I would recommend an approach like:
try (final OutputStream out = session.write(flowFile)) {
readLines(fis.getChannel(), ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true);
}
This allows us to write directly to the content repository instead of buffering the data in a ByteArrayOutputStream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh, wow, I wasn't aware of this version of session.write
. I was also a bit concerned about the buffering, thanks, will update the PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pushed a new commit: I got rid of the BAOS, but used session.write(FlowFile, OutputStreamCallback)
because we need the updated FF later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @adenes all looks good to me except for the buffering of data into ByteArrayOutputStream (commented more inline). Otherwise I'm a +1. Thanks!
…er rollover Remove readLines() usage when reading already processed chunk
a18a272
to
4a9bc9a
Compare
…er rollover Remove buffering, use write(FlowFile, OutputStreamCallback) instead
4a9bc9a
to
2bbbd4b
Compare
Thanks for the update @adenes LGTM +1 will merge to |
…er rollover This closes apache#4792. Signed-off-by: Mark Payne <markap14@hotmail.com>
…er rollover This closes apache#4792. Signed-off-by: Mark Payne <markap14@hotmail.com>
TailFile processor configured to an NFS mount reads unexpected NUL characters after a file had been rolled over.
Fixed by adding the NUL check to the part where the rolled over file is being read.
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically
main
)?Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not
squash
or use--force
when pushing to allow for clean monitoring of changes.For code changes:
mvn -Pcontrib-check clean install
at the rootnifi
folder?LICENSE
file, including the mainLICENSE
file undernifi-assembly
?NOTICE
file, including the mainNOTICE
file found undernifi-assembly
?.displayName
in addition to .name (programmatic access) for each of the new properties?For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.