diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 59e401dc94dc..1e6727d5951c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -782,7 +782,7 @@ private void processTailFile(final ProcessContext context, final ProcessSession final FileChannel fileReader = reader; final AtomicLong positionHolder = new AtomicLong(position); - final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean(); + final boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean(); AtomicReference abort = new AtomicReference<>(); flowFile = session.write(flowFile, new OutputStreamCallback() { @@ -857,6 +857,10 @@ public void process(final OutputStream rawOut) throws IOException { persistState(tfo, session, context); } + private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException { + return readLines(reader, buffer, out, checksum, reReadOnNul, false); + } + /** * Read new lines from the given FileChannel, copying it to the given Output * Stream. The Checksum is used in order to later determine whether or not @@ -871,11 +875,14 @@ public void process(final OutputStream rawOut) throws IOException { * temporary values and a NulCharacterEncounteredException is thrown. * This allows the caller to re-attempt a read from the same position. * If set to 'false' these characters will be treated as regular content. + * @param readFully If set to 'true' the last chunk of bytes after the last whole line + * will be also written to the OutputStream * * @return The new position after the lines have been read * @throws java.io.IOException if an I/O error occurs. */ - private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException { + private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, + Boolean reReadOnNul, final boolean readFully) throws IOException { getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()}); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { @@ -887,7 +894,7 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final boolean seenCR = false; buffer.clear(); - while (((num = reader.read(buffer)) != -1)) { + while ((num = reader.read(buffer)) != -1) { buffer.flip(); for (int i = 0; i < num; i++) { @@ -897,14 +904,7 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final case '\n': { baos.write(ch); seenCR = false; - baos.writeTo(out); - final byte[] baosBuffer = baos.toByteArray(); - checksum.update(baosBuffer, 0, baos.size()); - if (getLogger().isTraceEnabled()) { - getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()}); - } - - baos.reset(); + flushByteArrayOutputStream(baos, out, checksum); rePos = pos + i + 1; linesRead++; break; @@ -922,15 +922,8 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final default: { if (seenCR) { seenCR = false; - baos.writeTo(out); - final byte[] baosBuffer = baos.toByteArray(); - checksum.update(baosBuffer, 0, baos.size()); - if (getLogger().isTraceEnabled()) { - getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()}); - } - + flushByteArrayOutputStream(baos, out, checksum); linesRead++; - baos.reset(); baos.write(ch); rePos = pos + i; } else { @@ -943,6 +936,11 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final pos = reader.position(); } + if (readFully) { + flushByteArrayOutputStream(baos, out, checksum); + rePos = reader.position(); + } + if (rePos < reader.position()) { getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{linesRead, pos, rePos}); reader.position(rePos); // Ensure we can re-read if necessary @@ -952,6 +950,16 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final } } + private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException { + baos.writeTo(out); + final byte[] baosBuffer = baos.toByteArray(); + checksum.update(baosBuffer, 0, baos.size()); + if (getLogger().isTraceEnabled()) { + getLogger().trace("Checksum updated to {}", checksum.getValue()); + } + baos.reset(); + } + /** * Returns a list of all Files that match the following criteria: * @@ -1127,7 +1135,7 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe // a file when we stopped running, then that file that we were reading from should be the first file in this list, // assuming that the file still exists on the file system. final List rolledOffFiles = getRolledOffFiles(context, timestamp, tailFile); - return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, timestamp, position); + return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, position); } catch (final IOException e) { getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e}); return false; @@ -1147,9 +1155,6 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe * FlowFile creation and content. * @param expectedChecksum the checksum value that is expected for the * oldest file from offset 0 through <position>. - * @param timestamp the latest Last Modfiied Timestamp that has been - * consumed. Any data that was written before this data will not be - * ingested. * @param position the byte offset in the file being tailed, where tailing * last left off. * @@ -1157,7 +1162,7 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe * otherwise */ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final String tailFile, final List rolledOffFiles, final Long expectedChecksum, - final long timestamp, final long position) { + final long position) { try { getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()}); TailFileObject tfo = states.get(tailFile); @@ -1172,8 +1177,9 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe final File firstFile = rolledOffFiles.get(0); final long startNanos = System.nanoTime(); + final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean(); if (position > 0) { - try (final InputStream fis = new FileInputStream(firstFile); + try (final FileInputStream fis = new FileInputStream(firstFile); final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) { StreamUtils.copy(in, new NullOutputStream(), position); @@ -1184,7 +1190,15 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe // This is the same file that we were reading when we shutdown. Start reading from this point on. rolledOffFiles.remove(0); FlowFile flowFile = session.create(); - flowFile = session.importFrom(in, flowFile); + + try { + flowFile = session.write(flowFile, + out -> readLines(fis.getChannel(), ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true)); + } catch (NulCharacterEncounteredException ncee) { + rolledOffFiles.add(0, firstFile); + session.remove(flowFile); + throw ncee; + } if (flowFile.getSize() == 0L) { session.remove(flowFile); // use a timestamp of lastModified() + 1 so that we do not ingest this file again. diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java index 808d0adf8e08..ebb752c7e684 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ -205,6 +205,47 @@ private void testNULContent(String content1, Integer reposition, String content2 assertEquals(expected, lines); } + @Test + public void testNULContentWhenRolledOver() throws IOException { + runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*"); + runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue()); + runner.setProperty(TailFile.REREAD_ON_NUL, "true"); + + + // first line fully written, second partially + raf.write("a\nb".getBytes()); + // read the first line + runner.run(1, false, true); + + // zero bytes and rollover occurs between two runs + raf.write(new byte[] { 0, 0 }); + final long originalLastMod = file.lastModified(); + final File rolledOverFile = rollover(0); + // this should not pick up the zeros, still one file in the success relationship + runner.run(1, false, false); + runner.assertTransferCount(TailFile.REL_SUCCESS, 1); + + // nuls replaced + try (final RandomAccessFile rolledOverRAF = new RandomAccessFile(rolledOverFile, "rw")) { + rolledOverRAF.seek(3); + rolledOverRAF.write("c\n".getBytes()); + } + // lastmod reset to the TailFile not to consider this as an updated file (as NFS "nul-replacement" doesn't touch the lastmod timestamp) + rolledOverFile.setLastModified(originalLastMod); + runner.run(1, false, false); + + raf.write("d\n".getBytes()); + + runner.run(1, true, false); + + runner.assertTransferCount(TailFile.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS); + List lines = flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toList()); + assertEquals(Arrays.asList("a\n", "bc\n", "d\n"), lines); + } + + + @Test public void testRotateMultipleBeforeConsuming() throws IOException { runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*"); @@ -261,10 +302,12 @@ public void testStartPositionCurrentTime() throws IOException { out.assertContentEquals("6\n"); } - private void rollover(final int index) throws IOException { + private File rollover(final int index) throws IOException { raf.close(); - file.renameTo(new File(file.getParentFile(), file.getName() + "." + index + ".log")); + final File rolledOverFile = new File(file.getParentFile(), file.getName() + "." + index + ".log"); + file.renameTo(rolledOverFile); raf = new RandomAccessFile(file, "rw"); + return rolledOverFile; }