Skip to content

Commit

Permalink
NIFI-8183 TailFile intermittently creates records with ascii NULL aft…
Browse files Browse the repository at this point in the history
…er rollover

This closes apache#4792.

Signed-off-by: Mark Payne <markap14@hotmail.com>
  • Loading branch information
adenes authored and driesva committed Mar 19, 2021
1 parent 6562355 commit dc02d63
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ private void processTailFile(final ProcessContext context, final ProcessSession

final FileChannel fileReader = reader;
final AtomicLong positionHolder = new AtomicLong(position);
final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
final boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();

AtomicReference<NulCharacterEncounteredException> abort = new AtomicReference<>();
flowFile = session.write(flowFile, new OutputStreamCallback() {
Expand Down Expand Up @@ -857,6 +857,10 @@ public void process(final OutputStream rawOut) throws IOException {
persistState(tfo, session, context);
}

private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
return readLines(reader, buffer, out, checksum, reReadOnNul, false);
}

/**
* Read new lines from the given FileChannel, copying it to the given Output
* Stream. The Checksum is used in order to later determine whether or not
Expand All @@ -871,11 +875,14 @@ public void process(final OutputStream rawOut) throws IOException {
* temporary values and a NulCharacterEncounteredException is thrown.
* This allows the caller to re-attempt a read from the same position.
* If set to 'false' these characters will be treated as regular content.
* @param readFully If set to 'true' the last chunk of bytes after the last whole line
* will be also written to the OutputStream
*
* @return The new position after the lines have been read
* @throws java.io.IOException if an I/O error occurs.
*/
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum, Boolean reReadOnNul) throws IOException {
private long readLines(final FileChannel reader, final ByteBuffer buffer, final OutputStream out, final Checksum checksum,
Boolean reReadOnNul, final boolean readFully) throws IOException {
getLogger().debug("Reading lines starting at position {}", new Object[]{reader.position()});

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
Expand All @@ -887,7 +894,7 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
boolean seenCR = false;
buffer.clear();

while (((num = reader.read(buffer)) != -1)) {
while ((num = reader.read(buffer)) != -1) {
buffer.flip();

for (int i = 0; i < num; i++) {
Expand All @@ -897,14 +904,7 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
case '\n': {
baos.write(ch);
seenCR = false;
baos.writeTo(out);
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}

baos.reset();
flushByteArrayOutputStream(baos, out, checksum);
rePos = pos + i + 1;
linesRead++;
break;
Expand All @@ -922,15 +922,8 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
default: {
if (seenCR) {
seenCR = false;
baos.writeTo(out);
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", new Object[]{checksum.getValue()});
}

flushByteArrayOutputStream(baos, out, checksum);
linesRead++;
baos.reset();
baos.write(ch);
rePos = pos + i;
} else {
Expand All @@ -943,6 +936,11 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
pos = reader.position();
}

if (readFully) {
flushByteArrayOutputStream(baos, out, checksum);
rePos = reader.position();
}

if (rePos < reader.position()) {
getLogger().debug("Read {} lines; repositioning reader from {} to {}", new Object[]{linesRead, pos, rePos});
reader.position(rePos); // Ensure we can re-read if necessary
Expand All @@ -952,6 +950,16 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
}
}

private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
baos.writeTo(out);
final byte[] baosBuffer = baos.toByteArray();
checksum.update(baosBuffer, 0, baos.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", checksum.getValue());
}
baos.reset();
}

/**
* Returns a list of all Files that match the following criteria:
*
Expand Down Expand Up @@ -1127,7 +1135,7 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
// a file when we stopped running, then that file that we were reading from should be the first file in this list,
// assuming that the file still exists on the file system.
final List<File> rolledOffFiles = getRolledOffFiles(context, timestamp, tailFile);
return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, timestamp, position);
return recoverRolledFiles(context, session, tailFile, rolledOffFiles, expectedChecksum, position);
} catch (final IOException e) {
getLogger().error("Failed to recover files that have rolled over due to {}", new Object[]{e});
return false;
Expand All @@ -1147,17 +1155,14 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
* FlowFile creation and content.
* @param expectedChecksum the checksum value that is expected for the
* oldest file from offset 0 through &lt;position&gt;.
* @param timestamp the latest Last Modfiied Timestamp that has been
* consumed. Any data that was written before this data will not be
* ingested.
* @param position the byte offset in the file being tailed, where tailing
* last left off.
*
* @return <code>true</code> if the file being tailed has rolled over, false
* otherwise
*/
private boolean recoverRolledFiles(final ProcessContext context, final ProcessSession session, final String tailFile, final List<File> rolledOffFiles, final Long expectedChecksum,
final long timestamp, final long position) {
final long position) {
try {
getLogger().debug("Recovering Rolled Off Files; total number of files rolled off = {}", new Object[]{rolledOffFiles.size()});
TailFileObject tfo = states.get(tailFile);
Expand All @@ -1172,8 +1177,9 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
final File firstFile = rolledOffFiles.get(0);

final long startNanos = System.nanoTime();
final Boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();
if (position > 0) {
try (final InputStream fis = new FileInputStream(firstFile);
try (final FileInputStream fis = new FileInputStream(firstFile);
final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
StreamUtils.copy(in, new NullOutputStream(), position);

Expand All @@ -1184,7 +1190,15 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
// This is the same file that we were reading when we shutdown. Start reading from this point on.
rolledOffFiles.remove(0);
FlowFile flowFile = session.create();
flowFile = session.importFrom(in, flowFile);

try {
flowFile = session.write(flowFile,
out -> readLines(fis.getChannel(), ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true));
} catch (NulCharacterEncounteredException ncee) {
rolledOffFiles.add(0, firstFile);
session.remove(flowFile);
throw ncee;
}
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
// use a timestamp of lastModified() + 1 so that we do not ingest this file again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,47 @@ private void testNULContent(String content1, Integer reposition, String content2
assertEquals(expected, lines);
}

@Test
public void testNULContentWhenRolledOver() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
runner.setProperty(TailFile.START_POSITION, TailFile.START_CURRENT_FILE.getValue());
runner.setProperty(TailFile.REREAD_ON_NUL, "true");


// first line fully written, second partially
raf.write("a\nb".getBytes());
// read the first line
runner.run(1, false, true);

// zero bytes and rollover occurs between two runs
raf.write(new byte[] { 0, 0 });
final long originalLastMod = file.lastModified();
final File rolledOverFile = rollover(0);
// this should not pick up the zeros, still one file in the success relationship
runner.run(1, false, false);
runner.assertTransferCount(TailFile.REL_SUCCESS, 1);

// nuls replaced
try (final RandomAccessFile rolledOverRAF = new RandomAccessFile(rolledOverFile, "rw")) {
rolledOverRAF.seek(3);
rolledOverRAF.write("c\n".getBytes());
}
// lastmod reset to the TailFile not to consider this as an updated file (as NFS "nul-replacement" doesn't touch the lastmod timestamp)
rolledOverFile.setLastModified(originalLastMod);
runner.run(1, false, false);

raf.write("d\n".getBytes());

runner.run(1, true, false);

runner.assertTransferCount(TailFile.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS);
List<String> lines = flowFiles.stream().map(MockFlowFile::toByteArray).map(String::new).collect(Collectors.toList());
assertEquals(Arrays.asList("a\n", "bc\n", "d\n"), lines);
}



@Test
public void testRotateMultipleBeforeConsuming() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
Expand Down Expand Up @@ -261,10 +302,12 @@ public void testStartPositionCurrentTime() throws IOException {
out.assertContentEquals("6\n");
}

private void rollover(final int index) throws IOException {
private File rollover(final int index) throws IOException {
raf.close();
file.renameTo(new File(file.getParentFile(), file.getName() + "." + index + ".log"));
final File rolledOverFile = new File(file.getParentFile(), file.getName() + "." + index + ".log");
file.renameTo(rolledOverFile);
raf = new RandomAccessFile(file, "rw");
return rolledOverFile;
}


Expand Down

0 comments on commit dc02d63

Please sign in to comment.