From dab33e1e47214fa8797801126ca8e46b23693cd0 Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Thu, 2 Feb 2017 19:11:05 +0000 Subject: [PATCH 1/2] NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException (0.x) * Updated StandardRecordWriter to consider the encoding behavior of java.io.DataOutputStream.writeUTF() and truncate string values such that the UTF representation will not be longer than that DataOutputStream's 64K UTF format limit. * Add test to confirm handling of large UTF strings. --- .../nifi/provenance/StandardRecordWriter.java | 62 ++++++++++++++++--- .../TestStandardRecordReaderWriter.java | 44 +++++++++++++ 2 files changed, 96 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index a5c121abdaf2..7fa1ac27f6a2 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -20,6 +20,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.UTFDataFormatException; import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -37,6 +38,9 @@ import org.slf4j.LoggerFactory; public class StandardRecordWriter implements RecordWriter { + + public static final int MAX_ALLOWED_UTF_LENGTH = 65_535; + private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class); private final File file; @@ -83,7 +87,7 @@ public synchronized void writeHeader(final long firstEventId) 
throws IOException lastBlockOffset = rawOutStream.getBytesWritten(); resetWriteStream(firstEventId); - out.writeUTF(PersistentProvenanceRepository.class.getName()); + writeUTFLimited(out, PersistentProvenanceRepository.class.getName()); out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION); out.flush(); } catch (final IOException ioe) { @@ -161,7 +165,7 @@ public synchronized long writeRecord(final ProvenanceEventRecord record, final l } out.writeLong(recordIdentifier); - out.writeUTF(record.getEventType().name()); + writeUTFLimited(out, record.getEventType().name()); out.writeLong(record.getEventTime()); out.writeLong(record.getFlowFileEntryDate()); out.writeLong(record.getEventDuration()); @@ -192,9 +196,9 @@ public synchronized long writeRecord(final ProvenanceEventRecord record, final l // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) { out.writeBoolean(true); - out.writeUTF(record.getContentClaimContainer()); - out.writeUTF(record.getContentClaimSection()); - out.writeUTF(record.getContentClaimIdentifier()); + writeUTFLimited(out, record.getContentClaimContainer()); + writeUTFLimited(out, record.getContentClaimSection()); + writeUTFLimited(out, record.getContentClaimIdentifier()); if (record.getContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -208,9 +212,9 @@ public synchronized long writeRecord(final ProvenanceEventRecord record, final l // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'. 
if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) { out.writeBoolean(true); - out.writeUTF(record.getPreviousContentClaimContainer()); - out.writeUTF(record.getPreviousContentClaimSection()); - out.writeUTF(record.getPreviousContentClaimIdentifier()); + writeUTFLimited(out, record.getPreviousContentClaimContainer()); + writeUTFLimited(out, record.getPreviousContentClaimSection()); + writeUTFLimited(out, record.getPreviousContentClaimIdentifier()); if (record.getPreviousContentClaimOffset() == null) { out.writeLong(0L); } else { @@ -256,7 +260,7 @@ public synchronized long writeRecord(final ProvenanceEventRecord record, final l } protected void writeUUID(final DataOutputStream out, final String uuid) throws IOException { - out.writeUTF(uuid); + writeUTFLimited(out, uuid); } protected void writeUUIDs(final DataOutputStream out, final Collection list) throws IOException { @@ -275,7 +279,7 @@ protected void writeNullableString(final DataOutputStream out, final String toWr out.writeBoolean(false); } else { out.writeBoolean(true); - out.writeUTF(toWrite); + writeUTFLimited(out, toWrite); } } @@ -400,4 +404,42 @@ public void markDirty() { public boolean isDirty() { return dirtyFlag.get(); } + + private void writeUTFLimited(final DataOutputStream out, final String utfString) throws IOException { + try { + out.writeUTF(utfString); + } catch (UTFDataFormatException e) { + final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH)); + logger.warn("Truncating UTF value! 
Attempted to write string with char length {} and UTF length greater than " + + "supported maximum allowed ({}), truncating to char length {}.", + utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); + if (logger.isDebugEnabled()) { + logger.warn("String value was:\n{}", truncated); + } + out.writeUTF(truncated); + } + } + + static int getCharsInUTFLength(final String str, final int utfLimit) { + // see java.io.DataOutputStream.writeUTF() + int strlen = str.length(); + int utflen = 0; + int c; + + /* use charAt instead of copying String to Char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) & (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + if (utflen > utfLimit) { + return (i > 0 && Character.isSurrogatePair(str.charAt(i - 1), str.charAt(i))) ? i - 1 : i; /* back up one char rather than split a surrogate pair */ + } + } + return strlen; + } } diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java index e11502a60ba8..60a25189dc16 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestStandardRecordReaderWriter.java @@ -20,7 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -28,6 +31,7 @@ import java.util.Map; import java.util.UUID; +import
org.apache.commons.lang3.StringUtils; import org.apache.nifi.provenance.toc.StandardTocReader; import org.apache.nifi.provenance.toc.StandardTocWriter; import org.apache.nifi.provenance.toc.TocReader; @@ -186,4 +190,44 @@ public void testMultipleRecordsMultipleBlocksCompressed() throws IOException { FileUtils.deleteFile(journalFile.getParentFile(), true); } + + @Test + public void testWriteUtfLargerThan64k() throws IOException, InterruptedException { + + final Map attributes = new HashMap<>(); + attributes.put("filename", "1.txt"); + attributes.put("uuid", UUID.randomUUID().toString()); + + final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder(); + builder.setEventTime(System.currentTimeMillis()); + builder.setEventType(ProvenanceEventType.RECEIVE); + builder.setTransitUri("nifi://unit-test"); + builder.fromFlowFile(createFlowFile(3L, 3000L, attributes)); + builder.setComponentId("1234"); + builder.setComponentType("dummy processor"); + final String seventyK = StringUtils.repeat("X", 70000); + assertTrue(seventyK.length() > 65535); + assertTrue(seventyK.getBytes("UTF-8").length > 65535); + builder.setDetails(seventyK); + final ProvenanceEventRecord record = builder.build(); + + try (final ByteArrayOutputStream headerOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(headerOut)) { + out.writeUTF(PersistentProvenanceRepository.class.getName()); + out.writeInt(9); + } + + final File journalFile = new File("target/storage/" + UUID.randomUUID().toString() + "/testSimpleWrite.gz"); + final File tocFile = TocUtil.getTocFile(journalFile); + final TocWriter tocWriter = new StandardTocWriter(tocFile, false, false); + try (final StandardRecordWriter writer = new StandardRecordWriter(journalFile, tocWriter, false, 0)) { + + writer.writeHeader(1L); + + writer.writeRecord(record, 1L); + } + /* clean up the journal and TOC files, as testMultipleRecordsMultipleBlocksCompressed() does */ + FileUtils.deleteFile(journalFile.getParentFile(), true); + } + } From
1ea1d4e132ab26bdcd18bcd2c6cd47f8021c69f9 Mon Sep 17 00:00:00 2001 From: Joe Skora Date: Fri, 3 Feb 2017 16:04:49 +0000 Subject: [PATCH 2/2] NIFI-3055 StandardRecordWriter Can Throw UTFDataFormatException (0.x) * Clarify log message per code review comments. * Log the debug-only dump of the truncated value at DEBUG instead of WARN. --- .../java/org/apache/nifi/provenance/StandardRecordWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java index 7fa1ac27f6a2..f015cc8f3a9e 100644 --- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java +++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java @@ -410,11 +410,11 @@ private void writeUTFLimited(final DataOutputStream out, final String utfString) out.writeUTF(utfString); } catch (UTFDataFormatException e) { final String truncated = utfString.substring(0, getCharsInUTFLength(utfString, MAX_ALLOWED_UTF_LENGTH)); - logger.warn("Truncating UTF value! Attempted to write string with char length {} and UTF length greater than " - + "supported maximum allowed ({}), truncating to char length {}.", + logger.warn("Truncating provenance record value! Attempted to write {} chars that encode to a UTF byte length greater than " + + "supported maximum ({}), truncating to {} chars.", utfString.length(), MAX_ALLOWED_UTF_LENGTH, truncated.length()); if (logger.isDebugEnabled()) { - logger.warn("String value was:\n{}", truncated); + logger.debug("String value was:\n{}", truncated); } out.writeUTF(truncated); }