Skip to content

Commit

Permalink
Avoid flipping translog header version (#58866)
Browse files Browse the repository at this point in the history
An old translog header does not have a checksum. If we flip the header
version of an empty translog to the older version, then we won't detect
that corruption, and translog will be considered clean as before.

Closes #58671
  • Loading branch information
dnhatn committed Jul 6, 2020
1 parent 5c71f63 commit 35a8445
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.io.Channels;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

import java.io.EOFException;
import java.io.IOException;
Expand Down Expand Up @@ -105,6 +106,20 @@ private static int headerSizeInBytes(int version, int uuidLength) {
return size;
}

static int readHeaderVersion(final Path path, final FileChannel channel, final StreamInput in) throws IOException {
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
return version;
}

/**
* Read a translog header from the given path and file channel
*/
Expand All @@ -115,16 +130,7 @@ static TranslogHeader read(final String translogUUID, final Path path, final Fil
new BufferedChecksumStreamInput(
new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), channel.size()),
path.toString());
final int version;
try {
version = CodecUtil.checkHeader(new InputStreamDataInput(in), TRANSLOG_CODEC, VERSION_CHECKSUMS, VERSION_PRIMARY_TERM);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException e) {
tryReportOldVersionError(path, channel);
throw new TranslogCorruptedException(path.toString(), "translog header corrupted", e);
}
if (version == VERSION_CHECKSUMS) {
throw new IllegalStateException("pre-2.0 translog found [" + path + "]");
}
final int version = readHeaderVersion(path, channel, in);
// Read the translogUUID
final int uuidLen = in.readInt();
if (uuidLen > channel.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
Expand All @@ -47,6 +49,7 @@
import java.util.regex.Pattern;

import static org.elasticsearch.index.translog.Translog.CHECKPOINT_FILE_NAME;
import static org.elasticsearch.index.translog.Translog.TRANSLOG_FILE_SUFFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -151,26 +154,29 @@ static void corruptFile(Logger logger, Random random, Path fileToCorrupt, boolea
final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1);

if (random.nextBoolean()) {
// read
fileChannel.position(corruptPosition);
assertThat(fileChannel.position(), equalTo(corruptPosition));
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
fileChannel.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue;
do {
newValue = (byte) random.nextInt(0x100);
} while (newValue == oldValue);
bb.put(0, newValue);

// rewrite
fileChannel.position(corruptPosition);
fileChannel.write(bb);
logger.info("corruptFile: corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff));
// read
fileChannel.position(corruptPosition);
assertThat(fileChannel.position(), equalTo(corruptPosition));
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
fileChannel.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue;
do {
newValue = (byte) random.nextInt(0x100);
} while (newValue == oldValue);
bb.put(0, newValue);

// rewrite
fileChannel.position(corruptPosition);
fileChannel.write(bb);
logger.info("corruptFile: corrupting file {} at position {} turning 0x{} into 0x{}", fileToCorrupt, corruptPosition,
Integer.toHexString(oldValue & 0xff), Integer.toHexString(newValue & 0xff));
} while (isTranslogHeaderVersionFlipped(fileToCorrupt, fileChannel));

} else {
logger.info("corruptFile: truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
fileChannel.truncate(corruptPosition);
Expand Down Expand Up @@ -233,4 +239,22 @@ public void close() {
}
};
}

/**
* An old translog header does not have a checksum. If we flip the header version of an empty translog from 3 to 2,
* then we won't detect that corruption, and the translog will be considered clean as before.
*/
static boolean isTranslogHeaderVersionFlipped(Path corruptedFile, FileChannel channel) throws IOException {
if (corruptedFile.toString().endsWith(TRANSLOG_FILE_SUFFIX) == false) {
return false;
}
channel.position(0);
final InputStreamStreamInput in = new InputStreamStreamInput(Channels.newInputStream(channel), channel.size());
try {
final int version = TranslogHeader.readHeaderVersion(corruptedFile, channel, in);
return version == TranslogHeader.VERSION_CHECKPOINTS;
} catch (IllegalStateException | TranslogCorruptedException | IOException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@

import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;

public class TranslogHeaderTests extends ESTestCase {
Expand Down Expand Up @@ -125,6 +127,25 @@ public void testLegacyTranslogVersions() throws Exception {
IllegalStateException.class, "pre-2.0 translog");
}

public void testCorruptTranslogHeader() throws Exception {
final String translogUUID = UUIDs.randomBase64UUID();
final TranslogHeader outHeader = new TranslogHeader(translogUUID, randomNonNegativeLong());
final long generation = randomNonNegativeLong();
final Path translogLocation = createTempDir();
final Path translogFile = translogLocation.resolve(Translog.getFilename(generation));
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
outHeader.write(channel);
assertThat(outHeader.sizeInBytes(), equalTo((int) channel.position()));
}
TestTranslog.corruptFile(logger, random(), translogFile, false);
final Exception error = expectThrows(Exception.class, () -> {
try (FileChannel channel = FileChannel.open(translogFile, StandardOpenOption.READ)) {
TranslogHeader.read(randomValueOtherThan(translogUUID, UUIDs::randomBase64UUID), translogFile, channel);
}
});
assertThat(error, either(instanceOf(IllegalStateException.class)).or(instanceOf(TranslogCorruptedException.class)));
}

private <E extends Exception> void checkFailsToOpen(String file, Class<E> expectedErrorType, String expectedMessage) {
final Path translogFile = getDataPath(file);
assertThat("test file [" + translogFile + "] should exist", Files.exists(translogFile), equalTo(true));
Expand Down

0 comments on commit 35a8445

Please sign in to comment.