From 24f6a7c9563757234f53ca23e12f9c9208b53082 Mon Sep 17 00:00:00 2001
From: Colin Patrick Mccabe
Date: Mon, 31 Aug 2015 17:31:29 -0700
Subject: [PATCH] HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |   2 +
 .../BookKeeperEditLogInputStream.java              |   2 +-
 .../hadoop/hdfs/protocol/LayoutVersion.java        |   2 +-
 .../namenode/EditLogBackupInputStream.java         |   2 +-
 .../namenode/EditLogFileInputStream.java           |   2 +-
 .../hdfs/server/namenode/FSEditLogOp.java          | 354 +++++++++++++-----
 .../hdfs/server/namenode/TestEditLog.java          |   2 +-
 .../namenode/TestEditLogFileInputStream.java       |  80 ++++
 8 files changed, 341 insertions(+), 105 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 6584c84d155bf..57ddcb2943082 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -872,6 +872,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-8946. Improve choosing datanode storage for block placement. (yliu)
 
+    HDFS-8965. Harden edit log reading code against out of memory errors (cmccabe)
+
     OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
index e2098ddee19bc..86da80728b626 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java
@@ -83,7 +83,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
     tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
     DataInputStream in = new DataInputStream(tracker);
 
-    reader = new FSEditLogOp.Reader(in, tracker, logVersion);
+    reader = FSEditLogOp.Reader.create(in, tracker, logVersion);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
index c893744434541..175079089411b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
@@ -87,7 +87,7 @@ public static enum Feature implements LayoutFeature {
     FSIMAGE_COMPRESSION(-25, "Support for fsimage compression"),
     FSIMAGE_CHECKSUM(-26, "Support checksum for fsimage"),
     REMOVE_REL13_DISK_LAYOUT_SUPPORT(-27, "Remove support for 0.13 disk layout"),
-    EDITS_CHESKUM(-28, "Support checksum for editlog"),
+    EDITS_CHECKSUM(-28, "Support checksum for editlog"),
     UNUSED(-29, "Skipped version"),
     FSIMAGE_NAME_OPTIMIZATION(-30, "Store only last part of path in fsimage"),
     RESERVED_REL20_203(-31, -19, "Reserved for release 0.20.203", true,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
index 689caccdc77bc..81d285a03626b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java
@@ -119,7 +119,7 @@ void setBytes(byte[] newBytes, int version) throws IOException {
     this.version = version;
 
-    reader = new FSEditLogOp.Reader(in, tracker, version);
+    reader = FSEditLogOp.Reader.create(in, tracker, version);
   }
 
   void clear() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
index 3e21c24289461..73a162ebb107e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
@@ -157,7 +157,7 @@ private void init(boolean verifyLayoutVersion)
             "flags from log");
       }
     }
-    reader = new FSEditLogOp.Reader(dataIn, tracker, logVersion);
+    reader = FSEditLogOp.Reader.create(dataIn, tracker, logVersion);
     reader.setMaxOpSize(maxOpSize);
     state = State.OPEN;
   } finally {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index ab36f17af0a25..125e1cf55a741 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -4518,42 +4518,46 @@ public void writeOp(FSEditLogOp op) throws IOException {
   /**
    * Class for reading editlog ops from a stream
    */
-  public static class Reader {
-    private final DataInputStream in;
-    private final StreamLimiter limiter;
-    private final int logVersion;
-    private final Checksum checksum;
-    private final OpInstanceCache cache;
-    private int maxOpSize;
-    private final boolean supportEditLogLength;
+  public abstract static class Reader {
+    final DataInputStream in;
+    final StreamLimiter limiter;
+    final OpInstanceCache cache;
+    final byte[] temp = new byte[4096];
+    final int logVersion;
+    int maxOpSize;
+
+    public static Reader create(DataInputStream in, StreamLimiter limiter,
+        int logVersion) {
+      if (logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION) {
+        // Use the LengthPrefixedReader on edit logs which are newer than what
+        // we can parse.  (Newer layout versions are represented by smaller
+        // negative integers, for historical reasons.)  Even though we can't
+        // parse the Ops contained in them, we should still be able to call
+        // scanOp on them.  This is important for the JournalNode during rolling
+        // upgrade.
+        return new LengthPrefixedReader(in, limiter, logVersion);
+      } else if (NameNodeLayoutVersion.supports(
+          NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)) {
+        return new LengthPrefixedReader(in, limiter, logVersion);
+      } else if (NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.EDITS_CHECKSUM, logVersion)) {
+        Checksum checksum = DataChecksum.newCrc32();
+        return new ChecksummedReader(checksum, in, limiter, logVersion);
+      } else {
+        return new LegacyReader(in, limiter, logVersion);
+      }
+    }
 
     /**
      * Construct the reader
-     * @param in The stream to read from.
-     * @param logVersion The version of the data coming from the stream.
+     * @param in The stream to read from.
+     * @param limiter The limiter for this stream.
+     * @param logVersion The version of the data coming from the stream.
      */
-    public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
-      this.logVersion = logVersion;
-      if (NameNodeLayoutVersion.supports(
-          LayoutVersion.Feature.EDITS_CHESKUM, logVersion)) {
-        this.checksum = DataChecksum.newCrc32();
-      } else {
-        this.checksum = null;
-      }
-      // It is possible that the logVersion is actually a future layoutversion
-      // during the rolling upgrade (e.g., the NN gets upgraded first). We
-      // assume future layout will also support length of editlog op.
-      this.supportEditLogLength = NameNodeLayoutVersion.supports(
-          NameNodeLayoutVersion.Feature.EDITLOG_LENGTH, logVersion)
-          || logVersion < NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION;
-
-      if (this.checksum != null) {
-        this.in = new DataInputStream(
-            new CheckedInputStream(in, this.checksum));
-      } else {
-        this.in = in;
-      }
+    Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
+      this.in = in;
       this.limiter = limiter;
+      this.logVersion = logVersion;
       this.cache = new OpInstanceCache();
       this.maxOpSize = DFSConfigKeys.DFS_NAMENODE_MAX_OP_SIZE_DEFAULT;
     }
@@ -4606,26 +4610,25 @@ public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
       }
     }
 
-    private void verifyTerminator() throws IOException {
+    void verifyTerminator() throws IOException {
       /** The end of the edit log should contain only 0x00 or 0xff bytes.
        * If it contains other bytes, the log itself may be corrupt.
        * It is important to check this; if we don't, a stray OP_INVALID byte
        * could make us stop reading the edit log halfway through, and we'd never
        * know that we had lost data.
        */
-      byte[] buf = new byte[4096];
       limiter.clearLimit();
       int numRead = -1, idx = 0;
       while (true) {
         try {
           numRead = -1;
           idx = 0;
-          numRead = in.read(buf);
+          numRead = in.read(temp);
           if (numRead == -1) {
             return;
           }
           while (idx < numRead) {
-            if ((buf[idx] != (byte)0) && (buf[idx] != (byte)-1)) {
+            if ((temp[idx] != (byte)0) && (temp[idx] != (byte)-1)) {
               throw new IOException("Read extra bytes after " +
                 "the terminator!");
             }
@@ -4638,7 +4641,7 @@ private void verifyTerminator() throws IOException {
           if (numRead != -1) {
             in.reset();
             IOUtils.skipFully(in, idx);
-            in.mark(buf.length + 1);
+            in.mark(temp.length + 1);
             IOUtils.skipFully(in, 1);
           }
         }
@@ -4653,14 +4656,164 @@ private void verifyTerminator() throws IOException {
      * If an exception is thrown, the stream's mark will be set to the first
      * problematic byte. This usually means the beginning of the opcode.
      */
-    private FSEditLogOp decodeOp() throws IOException {
-      limiter.setLimit(maxOpSize);
+    public abstract FSEditLogOp decodeOp() throws IOException;
+
+    /**
+     * Similar to decodeOp(), but we only retrieve the transaction ID of the
+     * Op rather than reading it.  If the edit log format supports length
+     * prefixing, this can be much faster than full decoding.
+     *
+     * @return the last txid of the segment, or INVALID_TXID on EOF.
+     */
+    public abstract long scanOp() throws IOException;
+  }
+
+  /**
+   * Reads edit logs which are prefixed with a length.  These edit logs also
+   * include a checksum and transaction ID.
+   */
+  private static class LengthPrefixedReader extends Reader {
+    /**
+     * The minimum length of a length-prefixed Op.
+     *
+     * The minimum Op has:
+     *   1-byte opcode
+     *   4-byte length
+     *   8-byte txid
+     *   0-byte body
+     *   4-byte checksum
+     */
+    private static final int MIN_OP_LENGTH = 17;
+
+    /**
+     * The op id length.
+     *
+     * Not included in the stored length.
+     */
+    private static final int OP_ID_LENGTH = 1;
+
+    /**
+     * The checksum length.
+     *
+     * Not included in the stored length.
+     */
+    private static final int CHECKSUM_LENGTH = 4;
+
+    private final Checksum checksum;
+
+    LengthPrefixedReader(DataInputStream in, StreamLimiter limiter,
+        int logVersion) {
+      super(in, limiter, logVersion);
+      this.checksum = DataChecksum.newCrc32();
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      long txid = decodeOpFrame();
+      if (txid == HdfsServerConstants.INVALID_TXID) {
+        return null;
+      }
+      in.reset();
       in.mark(maxOpSize);
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(in.readByte());
+      FSEditLogOp op = cache.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
+      op.setTransactionId(txid);
+      IOUtils.skipFully(in, 4 + 8); // skip length and txid
+      op.readFields(in, logVersion);
+      // skip over the checksum, which we validated above.
+      IOUtils.skipFully(in, CHECKSUM_LENGTH);
+      return op;
+    }
+
+    @Override
+    public long scanOp() throws IOException {
+      return decodeOpFrame();
+    }
 
-      if (checksum != null) {
-        checksum.reset();
+    /**
+     * Decode the opcode "frame".  This includes reading the opcode and
+     * transaction ID, and validating the checksum and length.  It does not
+     * include reading the opcode-specific fields.
+     * The input stream will be advanced to the end of the op at the end of this
+     * function.
+     *
+     * @return the transaction ID of the op, or INVALID_TXID if we hit EOF.
+     */
+    private long decodeOpFrame() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return HdfsServerConstants.INVALID_TXID;
       }
+      if (opCodeByte == FSEditLogOpCodes.OP_INVALID.getOpCode()) {
+        verifyTerminator();
+        return HdfsServerConstants.INVALID_TXID;
+      }
+      // Here, we verify that the Op size makes sense and that the
+      // data matches its checksum before attempting to construct an Op.
+      // This is important because otherwise we may encounter an
+      // OutOfMemoryException which could bring down the NameNode or
+      // JournalNode when reading garbage data.
+      int opLength = in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
+      if (opLength > maxOpSize) {
+        throw new IOException("Op " + (int)opCodeByte + " has size " +
+            opLength + ", but maxOpSize = " + maxOpSize);
+      } else if (opLength < MIN_OP_LENGTH) {
+        throw new IOException("Op " + (int)opCodeByte + " has size " +
+            opLength + ", but the minimum op size is " + MIN_OP_LENGTH);
+      }
+      long txid = in.readLong();
+      // Verify checksum
+      in.reset();
+      in.mark(maxOpSize);
+      checksum.reset();
+      for (int rem = opLength - CHECKSUM_LENGTH; rem > 0;) {
+        int toRead = Math.min(temp.length, rem);
+        IOUtils.readFully(in, temp, 0, toRead);
+        checksum.update(temp, 0, toRead);
+        rem -= toRead;
+      }
+      int expectedChecksum = in.readInt();
+      int calculatedChecksum = (int)checksum.getValue();
+      if (expectedChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum " +
+            expectedChecksum, txid);
+      }
+      return txid;
+    }
+  }
+
+  /**
+   * Read edit logs which have a checksum and a transaction ID, but not a
+   * length.
+   */
+  private static class ChecksummedReader extends Reader {
+    private final Checksum checksum;
+
+    ChecksummedReader(Checksum checksum, DataInputStream in,
+        StreamLimiter limiter, int logVersion) {
+      super(new DataInputStream(
+          new CheckedInputStream(in, checksum)), limiter, logVersion);
+      this.checksum = checksum;
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      // Reset the checksum.  Since we are using a CheckedInputStream, each
+      // subsequent read from the stream will update the checksum.
+      checksum.reset();
       byte opCodeByte;
       try {
         opCodeByte = in.readByte();
@@ -4668,88 +4821,89 @@ private FSEditLogOp decodeOp() throws IOException {
         // EOF at an opcode boundary is expected.
         return null;
       }
-
       FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
       if (opCode == OP_INVALID) {
         verifyTerminator();
         return null;
       }
-
       FSEditLogOp op = cache.get(opCode);
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
-
-      if (supportEditLogLength) {
-        in.readInt();
+      op.setTransactionId(in.readLong());
+      op.readFields(in, logVersion);
+      // Verify checksum
+      int calculatedChecksum = (int)checksum.getValue();
+      int expectedChecksum = in.readInt();
+      if (expectedChecksum != calculatedChecksum) {
+        throw new ChecksumException(
+            "Transaction is corrupt. Calculated checksum is " +
+            calculatedChecksum + " but read checksum " +
+            expectedChecksum, op.txid);
       }
+      return op;
+    }
 
+    @Override
+    public long scanOp() throws IOException {
+      // Edit logs of this age don't have any length prefix, so we just have
+      // to read the entire Op.
+      FSEditLogOp op = decodeOp();
+      return op == null ?
+          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
+    }
+  }
+
+  /**
+   * Read older edit logs which may or may not have transaction IDs and other
+   * features.  This code is used during upgrades and to allow HDFS INotify to
+   * read older edit log files.
+   */
+  private static class LegacyReader extends Reader {
+    LegacyReader(DataInputStream in,
+        StreamLimiter limiter, int logVersion) {
+      super(in, limiter, logVersion);
+    }
+
+    @Override
+    public FSEditLogOp decodeOp() throws IOException {
+      limiter.setLimit(maxOpSize);
+      in.mark(maxOpSize);
+      byte opCodeByte;
+      try {
+        opCodeByte = in.readByte();
+      } catch (EOFException eof) {
+        // EOF at an opcode boundary is expected.
+        return null;
+      }
+      FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
+      if (opCode == OP_INVALID) {
+        verifyTerminator();
+        return null;
+      }
+      FSEditLogOp op = cache.get(opCode);
+      if (op == null) {
+        throw new IOException("Read invalid opcode " + opCode);
+      }
       if (NameNodeLayoutVersion.supports(
-          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
-        // Read the txid
+            LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
         op.setTransactionId(in.readLong());
       } else {
         op.setTransactionId(HdfsServerConstants.INVALID_TXID);
       }
-
       op.readFields(in, logVersion);
-
-      validateChecksum(in, checksum, op.txid);
       return op;
     }
 
-    /**
-     * Similar with decodeOp(), but instead of doing the real decoding, we skip
-     * the content of the op if the length of the editlog is supported.
-     * @return the last txid of the segment, or INVALID_TXID on exception
-     */
+    @Override
     public long scanOp() throws IOException {
-      if (supportEditLogLength) {
-        limiter.setLimit(maxOpSize);
-        in.mark(maxOpSize);
-
-        final byte opCodeByte;
-        try {
-          opCodeByte = in.readByte(); // op code
-        } catch (EOFException e) {
-          return HdfsServerConstants.INVALID_TXID;
-        }
-
-        FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
-        if (opCode == OP_INVALID) {
-          verifyTerminator();
-          return HdfsServerConstants.INVALID_TXID;
-        }
-
-        int length = in.readInt(); // read the length of the op
-        long txid = in.readLong(); // read the txid
-
-        // skip the remaining content
-        IOUtils.skipFully(in, length - 8);
-        // TODO: do we want to verify checksum for JN? For now we don't.
-        return txid;
-      } else {
-        FSEditLogOp op = decodeOp();
-        return op == null ? HdfsServerConstants.INVALID_TXID : op.getTransactionId();
-      }
-    }
-
-    /**
-     * Validate a transaction's checksum
-     */
-    private void validateChecksum(DataInputStream in,
-                                  Checksum checksum,
-                                  long txid)
-        throws IOException {
-      if (checksum != null) {
-        int calculatedChecksum = (int)checksum.getValue();
-        int readChecksum = in.readInt(); // read in checksum
-        if (readChecksum != calculatedChecksum) {
-          throw new ChecksumException(
-              "Transaction is corrupt. Calculated checksum is " +
-              calculatedChecksum + " but read checksum " + readChecksum, txid);
-        }
+      if (!NameNodeLayoutVersion.supports(
+          LayoutVersion.Feature.STORED_TXIDS, logVersion)) {
+        throw new IOException("Can't scan a pre-transactional edit log.");
       }
+      FSEditLogOp op = decodeOp();
+      return op == null ?
+          HdfsServerConstants.INVALID_TXID : op.getTransactionId();
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
index 68d008f567ac7..e59dec400e458 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
@@ -875,7 +875,7 @@ public EditLogByteInputStream(byte[] data) throws IOException {
     tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
     in = new DataInputStream(tracker);
 
-    reader = new FSEditLogOp.Reader(in, tracker, version);
+    reader = FSEditLogOp.Reader.create(in, tracker, version);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
index c0eb890536831..aecdc789c4b92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogFileInputStream.java
@@ -25,19 +25,35 @@
 import static org.mockito.Mockito.mock;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.EnumMap;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.util.Holder;
 import org.apache.hadoop.hdfs.web.URLConnectionFactory;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 public class TestEditLogFileInputStream {
+  private static final Log LOG =
+      LogFactory.getLog(TestEditLogFileInputStream.class);
   private static final byte[] FAKE_LOG_DATA = TestEditLog.HADOOP20_SOME_EDITS;
 
+  private final static File TEST_DIR = PathUtils
+      .getTestDir(TestEditLogFileInputStream.class);
+
   @Test
   public void testReadURL() throws Exception {
     HttpURLConnection conn = mock(HttpURLConnection.class);
@@ -63,4 +79,68 @@ public void testReadURL() throws Exception {
     assertEquals(FAKE_LOG_DATA.length, elis.length());
     elis.close();
   }
+
+  /**
+   * Regression test for HDFS-8965 which verifies that
+   * EditLogFileInputStream#scanOp verifies Op checksums.
+   */
+  @Test(timeout=60000)
+  public void testScanCorruptEditLog() throws Exception {
+    Configuration conf = new Configuration();
+    File editLog = new File(System.getProperty(
+        "test.build.data", "/tmp"), "testCorruptEditLog");
+
+    LOG.debug("Creating test edit log file: " + editLog);
+    EditLogFileOutputStream elos = new EditLogFileOutputStream(conf,
+        editLog.getAbsoluteFile(), 8192);
+    elos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
+    FSEditLogOp.OpInstanceCache cache = new FSEditLogOp.OpInstanceCache();
+    FSEditLogOp.MkdirOp mkdirOp = FSEditLogOp.MkdirOp.getInstance(cache);
+    mkdirOp.reset();
+    mkdirOp.setRpcCallId(123);
+    mkdirOp.setTransactionId(1);
+    mkdirOp.setInodeId(789L);
+    mkdirOp.setPath("/mydir");
+    PermissionStatus perms = PermissionStatus.createImmutable(
+        "myuser", "mygroup", FsPermission.createImmutable((short)0777));
+    mkdirOp.setPermissionStatus(perms);
+    elos.write(mkdirOp);
+    mkdirOp.reset();
+    mkdirOp.setRpcCallId(456);
+    mkdirOp.setTransactionId(2);
+    mkdirOp.setInodeId(123L);
+    mkdirOp.setPath("/mydir2");
+    perms = PermissionStatus.createImmutable(
+        "myuser", "mygroup", FsPermission.createImmutable((short)0666));
+    mkdirOp.setPermissionStatus(perms);
+    elos.write(mkdirOp);
+    elos.setReadyToFlush();
+    elos.flushAndSync(false);
+    elos.close();
+    long fileLen = editLog.length();
+
+    LOG.debug("Corrupting last 4 bytes of edit log file " + editLog +
+        ", whose length is " + fileLen);
+    RandomAccessFile rwf = new RandomAccessFile(editLog, "rw");
+    rwf.seek(fileLen - 4);
+    int b = rwf.readInt();
+    rwf.seek(fileLen - 4);
+    rwf.writeInt(b + 1);
+    rwf.close();
+
+    EditLogFileInputStream elis = new EditLogFileInputStream(editLog);
+    Assert.assertEquals(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION,
+        elis.getVersion(true));
+    Assert.assertEquals(1, elis.scanNextOp());
+    LOG.debug("Read transaction 1 from " + editLog);
+    try {
+      elis.scanNextOp();
+      Assert.fail("Expected scanNextOp to fail when op checksum was corrupt.");
+    } catch (IOException e) {
+      LOG.debug("Caught expected checksum error when reading corrupt " +
+          "transaction 2", e);
+      GenericTestUtils.assertExceptionContains("Transaction is corrupt.", e);
+    }
+    elis.close();
+  }
 }
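
The hardening pattern that LengthPrefixedReader.decodeOpFrame() introduces can be illustrated outside of Hadoop. The sketch below is hypothetical: the class name, constants, and the toy frame writer in main() are not part of this patch, and it buffers the op body instead of using the real reader's StreamLimiter and mark/reset protocol. What it does mirror is the frame layout documented above (1-byte opcode, 4-byte length, 8-byte txid, body, 4-byte CRC32) and the ordering that matters for the OOM fix: the declared length is bounds-checked against a cap, and the checksum is verified, before anything op-specific is constructed.

// Hypothetical sketch, not the FSEditLogOp.Reader implementation.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class FrameCheckSketch {
  static final int MAX_OP_SIZE = 1024 * 1024;   // cap, analogous to dfs.namenode.max.op.size
  static final int OP_ID_LENGTH = 1;
  static final int CHECKSUM_LENGTH = 4;
  static final int MIN_OP_LENGTH = 17;          // opcode + length + txid + empty body + checksum

  /** Validate one frame and return its txid, without decoding the body. */
  static long scanFrame(DataInputStream in) throws IOException {
    byte opCode = in.readByte();
    // The stored length covers the length field, txid, and body; it excludes
    // the opcode byte and the trailing checksum, so add those back in.
    int opLength = in.readInt() + OP_ID_LENGTH + CHECKSUM_LENGTH;
    if (opLength > MAX_OP_SIZE || opLength < MIN_OP_LENGTH) {
      // Rejecting an implausible length here is what prevents a corrupt
      // length field from forcing a huge allocation.
      throw new IOException("Op " + opCode + " has implausible size " + opLength);
    }
    long txid = in.readLong();
    // Only now is it safe to buffer the body: opLength has been bounds-checked.
    byte[] body = new byte[opLength - MIN_OP_LENGTH];
    in.readFully(body);
    // Recompute the CRC over opcode, length, txid, and body, in write order.
    CRC32 crc = new CRC32();
    crc.update(opCode & 0xff);
    ByteArrayOutputStream header = new ByteArrayOutputStream();
    DataOutputStream hout = new DataOutputStream(header);
    hout.writeInt(opLength - OP_ID_LENGTH - CHECKSUM_LENGTH);
    hout.writeLong(txid);
    crc.update(header.toByteArray());
    crc.update(body);
    int expected = in.readInt();
    if (expected != (int) crc.getValue()) {
      throw new IOException("Transaction " + txid + " is corrupt");
    }
    return txid;
  }

  public static void main(String[] args) throws IOException {
    // Build one well-formed frame with an 11-byte body, then scan it.
    byte[] body = "hello world".getBytes(StandardCharsets.UTF_8);
    ByteArrayOutputStream frame = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(frame);
    out.writeByte(9);                     // arbitrary opcode value for the sketch
    out.writeInt(4 + 8 + body.length);    // stored length: length field + txid + body
    out.writeLong(42L);                   // txid
    out.write(body);
    CRC32 crc = new CRC32();
    crc.update(frame.toByteArray());      // checksum covers opcode..body
    out.writeInt((int) crc.getValue());
    long txid = scanFrame(new DataInputStream(
        new ByteArrayInputStream(frame.toByteArray())));
    System.out.println("scanned txid " + txid);
  }
}

In the patched reader the same effect is achieved with the limiter and mark/reset rather than buffering, but the ordering is identical: decodeOpFrame() validates the length and checksum first, and only then does decodeOp() instantiate and populate the op.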