From 1e00a0b9edd29502c69b777e5dffd90a60b86dd5 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Wed, 30 Jul 2014 12:36:23 -0400 Subject: [PATCH] Snapshot checksum verification Adds automatic verification of all files that are being snapshotted. Closes #5593 --- .../BlobStoreIndexShardRepository.java | 152 +++++++++++------- .../blobstore/RateLimitingInputStream.java | 46 +----- .../org/elasticsearch/index/store/Store.java | 150 +++++++++++++++++ .../BasicBackwardsCompatibilityTest.java | 62 ++++--- .../index/store/CorruptedFileTest.java | 29 ++-- .../elasticsearch/index/store/StoreTest.java | 82 ++++++++++ 6 files changed, 384 insertions(+), 137 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 89e48c500b636..80cf506a9ba4d 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -22,10 +22,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.lucene.index.CorruptIndexException; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.IndexInput; -import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.store.*; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.SnapshotId; @@ -35,11 +32,11 @@ import org.elasticsearch.common.io.stream.BytesStreamInput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; -import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.*; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; +import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.*; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; @@ -59,7 +56,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import static com.google.common.collect.Lists.newArrayList; @@ -396,8 +392,10 @@ private class SnapshotContext extends Context { */ public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) { super(snapshotId, shardId); - store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class); + IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); + store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class); this.snapshotStatus = snapshotStatus; + } /** @@ -439,11 +437,6 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) { logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); final StoreFileMetaData md = metadata.get(fileName); boolean snapshotRequired = false; - // TODO: For now segment files are copied on each commit because segment files don't have checksum - // if (snapshot.indexChanged() && 
fileName.equals(snapshotIndexCommit.getSegmentsFileName())) { - // snapshotRequired = true; // we want to always snapshot the segment file if the index changed - // } - BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName); if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) { @@ -532,50 +525,101 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) { * @throws IOException */ private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final CountDownLatch latch, final List failures) throws IOException { - final AtomicLong counter = new AtomicLong(fileInfo.numberOfParts()); - for (long i = 0; i < fileInfo.numberOfParts(); i++) { - IndexInput indexInput = null; - try { - final String file = fileInfo.physicalName(); - indexInput = store.directory().openInput(file, IOContext.READONCE); - indexInput.seek(i * fileInfo.partBytes()); - InputStreamIndexInput inputStreamIndexInput = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes()); - - final IndexInput fIndexInput = indexInput; - long size = inputStreamIndexInput.actualSizeToRead(); - InputStream inputStream; - if (snapshotRateLimiter != null) { - inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener); - } else { - inputStream = inputStreamIndexInput; - } - inputStream = new AbortableInputStream(inputStream, file); - blobContainer.writeBlob(fileInfo.partName(i), inputStream, size, new ImmutableBlobContainer.WriterListener() { - @Override - public void onCompleted() { - IOUtils.closeWhileHandlingException(fIndexInput); - snapshotStatus.addProcessedFile(fileInfo.length()); - if (counter.decrementAndGet() == 0) { - latch.countDown(); - } - } + final String file = fileInfo.physicalName(); + IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata()); + writeBlob(indexInput, fileInfo, 0, latch, failures); + } - @Override - public void onFailure(Throwable t) { - IOUtils.closeWhileHandlingException(fIndexInput); - snapshotStatus.addProcessedFile(0); - failures.add(t); - if (counter.decrementAndGet() == 0) { - latch.countDown(); - } - } - }); - } catch (Throwable e) { - IOUtils.closeWhileHandlingException(indexInput); - failures.add(e); + private class BlobPartWriter implements ImmutableBlobContainer.WriterListener { + + private final int part; + + private final FileInfo fileInfo; + + private final List failures; + + private final CountDownLatch latch; + + private final IndexInput indexInput; + + private final InputStream inputStream; + + private final InputStreamIndexInput inputStreamIndexInput; + + private BlobPartWriter(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List failures) throws IOException { + this.indexInput = indexInput; + this.part = part; + this.fileInfo = fileInfo; + this.failures = failures; + this.latch = latch; + inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes()); + InputStream inputStream = inputStreamIndexInput; + if (snapshotRateLimiter != null) { + inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter, snapshotThrottleListener); + } + inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName()); + this.inputStream = inputStream; + } + + @Override + public void onCompleted() { + int nextPart = part + 1; + if (nextPart < fileInfo.numberOfParts()) { + try { + // We have more parts to go + writeBlob(indexInput, fileInfo, 
nextPart, latch, failures); + } catch (Throwable t) { + onFailure(t); + } + } else { + // Last part - verify checksum + try { + Store.verify(indexInput); + indexInput.close(); + snapshotStatus.addProcessedFile(fileInfo.length()); + } catch (Throwable t) { + onFailure(t); + return; + } latch.countDown(); } } + + @Override + public void onFailure(Throwable t) { + cleanupFailedSnapshot(t, indexInput, latch, failures); + } + + public void writeBlobPart() throws IOException { + blobContainer.writeBlob(fileInfo.partName(part), inputStream, inputStreamIndexInput.actualSizeToRead(), this); + } + + } + + private void writeBlob(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List failures) { + try { + new BlobPartWriter(indexInput, fileInfo, part, latch, failures).writeBlobPart(); + } catch (Throwable t) { + cleanupFailedSnapshot(t, indexInput, latch, failures); + } + } + + private void cleanupFailedSnapshot(Throwable t, IndexInput indexInput, CountDownLatch latch, List failures) { + IOUtils.closeWhileHandlingException(indexInput); + failStoreIfCorrupted(t); + snapshotStatus.addProcessedFile(0); + failures.add(t); + latch.countDown(); + } + + private void failStoreIfCorrupted(Throwable t) { + if (t instanceof CorruptIndexException) { + try { + store.markStoreCorrupted((CorruptIndexException) t); + } catch (IOException e) { + logger.warn("store cannot be marked as corrupted", e); + } + } } /** @@ -850,9 +894,9 @@ public void onFailure(Throwable t) { IOUtils.closeWhileHandlingException(indexOutput); if (t instanceof CorruptIndexException) { try { - store.markStoreCorrupted((CorruptIndexException)t); + store.markStoreCorrupted((CorruptIndexException) t); } catch (IOException e) { - // + logger.warn("store cannot be marked as corrupted", e); } } store.deleteQuiet(fileInfo.physicalName()); diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java index 2fa3395ba4218..4ad90a8b0bb0f 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/RateLimitingInputStream.java @@ -21,15 +21,14 @@ import org.apache.lucene.store.RateLimiter; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; /** * Rate limiting wrapper for InputStream */ -public class RateLimitingInputStream extends InputStream { - - private final InputStream delegate; +public class RateLimitingInputStream extends FilterInputStream { private final RateLimiter rateLimiter; @@ -42,7 +41,7 @@ public interface Listener { } public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) { - this.delegate = delegate; + super(delegate); this.rateLimiter = rateLimiter; this.listener = listener; } @@ -60,52 +59,17 @@ private void maybePause(int bytes) { @Override public int read() throws IOException { - int b = delegate.read(); + int b = super.read(); maybePause(1); return b; } - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - @Override public int read(byte[] b, int off, int len) throws IOException { - int n = delegate.read(b, off, len); + int n = super.read(b, off, len); if (n > 0) { maybePause(n); } return n; } - - @Override - public long skip(long n) throws IOException { - return delegate.skip(n); - } - - @Override - public int available() throws IOException { - return 
delegate.available();
-    }
-
-    @Override
-    public void close() throws IOException {
-        delegate.close();
-    }
-
-    @Override
-    public void mark(int readlimit) {
-        delegate.mark(readlimit);
-    }
-
-    @Override
-    public void reset() throws IOException {
-        delegate.reset();
-    }
-
-    @Override
-    public boolean markSupported() {
-        return delegate.markSupported();
-    }
 }
diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java
index 7b82e1c3024b5..3343aaf207318 100644
--- a/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/src/main/java/org/elasticsearch/index/store/Store.java
@@ -50,6 +50,8 @@
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;

 /**
  * A Store provides plain access to files written by an elasticsearch index shard. Each shard
@@ -298,6 +300,22 @@ public static void verify(IndexOutput output) throws IOException {
         }
     }

+    public IndexInput openVerifyingInput(String filename, IOContext context, StoreFileMetaData metadata) throws IOException {
+        if (metadata.hasLegacyChecksum() || metadata.checksum() == null) {
+            logger.debug("open legacy input for {}", filename);
+            return directory().openInput(filename, context);
+        }
+        assert metadata.writtenBy() != null;
+        assert metadata.writtenBy().onOrAfter(Version.LUCENE_48);
+        return new VerifyingIndexInput(directory().openInput(filename, context));
+    }
+
+    public static void verify(IndexInput input) throws IOException {
+        if (input instanceof VerifyingIndexInput) {
+            ((VerifyingIndexInput) input).verify();
+        }
+    }
+
     public boolean checkIntegrity(StoreFileMetaData md) {
         if (md.writtenBy() != null && md.writtenBy().onOrAfter(Version.LUCENE_48)) {
             try (IndexInput input = directory().openInput(md.name(), IOContext.READONCE)) {
@@ -704,6 +722,138 @@ public void writeBytes(byte[] b, int offset, int length) throws IOException {
     }

+    /**
+     * Index input that calculates a checksum as data is read from the input.
+     *
+     * This class supports random access (it is possible to seek backward and forward) in order to accommodate the
+     * retry mechanism used by some repository plugins (S3, for example). However, the checksum is only calculated on
+     * the first read; subsequent reads of the same data are not used to calculate the checksum. 
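+     *
+     * The last eight bytes of a Lucene 4.8+ file are the CRC32 value written by CodecUtil.writeFooter: bytes before
+     * that position feed the running digest, while the trailing eight bytes are captured separately so that
+     * verify() can compare the computed and stored values.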
+ */ + static class VerifyingIndexInput extends ChecksumIndexInput { + private final IndexInput input; + private final Checksum digest; + private final long checksumPosition; + private final byte[] checksum = new byte[8]; + private long verifiedPosition = 0; + + public VerifyingIndexInput(IndexInput input) { + this(input, new BufferedChecksum(new CRC32())); + } + + public VerifyingIndexInput(IndexInput input, Checksum digest) { + super("VerifyingIndexInput(" + input + ")"); + this.input = input; + this.digest = digest; + checksumPosition = input.length() - 8; + } + + @Override + public byte readByte() throws IOException { + long pos = input.getFilePointer(); + final byte b = input.readByte(); + pos++; + if (pos > verifiedPosition) { + if (pos <= checksumPosition) { + digest.update(b); + } else { + checksum[(int) (pos - checksumPosition - 1)] = b; + } + verifiedPosition = pos; + } + return b; + } + + @Override + public void readBytes(byte[] b, int offset, int len) + throws IOException { + long pos = input.getFilePointer(); + input.readBytes(b, offset, len); + if (pos + len > verifiedPosition) { + // Conversion to int is safe here because (verifiedPosition - pos) can be at most len, which is integer + int alreadyVerified = (int)Math.max(0, verifiedPosition - pos); + if (pos < checksumPosition) { + if (pos + len < checksumPosition) { + digest.update(b, offset + alreadyVerified, len - alreadyVerified); + } else { + int checksumOffset = (int) (checksumPosition - pos); + if (checksumOffset - alreadyVerified > 0) { + digest.update(b, offset + alreadyVerified, checksumOffset - alreadyVerified); + } + System.arraycopy(b, offset + checksumOffset, checksum, 0, len - checksumOffset); + } + } else { + // Conversion to int is safe here because checksumPosition is (file length - 8) so + // (pos - checksumPosition) cannot be bigger than 8 unless we are reading after the end of file + assert pos - checksumPosition < 8; + System.arraycopy(b, offset, checksum, (int) (pos - checksumPosition), len); + } + verifiedPosition = pos + len; + } + } + + @Override + public long getChecksum() { + return digest.getValue(); + } + + @Override + public void seek(long pos) throws IOException { + if (pos < verifiedPosition) { + // going within verified region - just seek there + input.seek(pos); + } else { + if (verifiedPosition > getFilePointer()) { + // portion of the skip region is verified and portion is not + // skipping the verified portion + input.seek(verifiedPosition); + // and checking unverified + skipBytes(pos - verifiedPosition); + } else { + skipBytes(pos - getFilePointer()); + } + } + } + + @Override + public void close() throws IOException { + input.close(); + } + + @Override + public long getFilePointer() { + return input.getFilePointer(); + } + + @Override + public long length() { + return input.length(); + } + + @Override + public IndexInput clone() { + throw new UnsupportedOperationException(); + } + + @Override + public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { + throw new UnsupportedOperationException(); + } + + public long getStoredChecksum() { + return new ByteArrayDataInput(checksum).readLong(); + } + + public void verify() throws CorruptIndexException { + long storedChecksum = getStoredChecksum(); + if (getChecksum() == storedChecksum) { + return; + } + throw new CorruptIndexException("verification failed : calculated=" + Store.digestToString(getChecksum()) + + " stored=" + Store.digestToString(storedChecksum)); + } + + } + public void 
deleteQuiet(String... files) {
        for (String file : files) {
            try {
diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
index f4d2f6c804dfa..ea096658bab24 100644
--- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
+++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java
@@ -460,42 +460,55 @@ public void testSnapshotAndRestore() throws ExecutionException, InterruptedExcep
                 .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
                 .put("compress", randomBoolean())
                 .put("chunk_size", randomIntBetween(100, 1000))));
-        String[] indices = new String[randomIntBetween(1,5)];
-        for (int i = 0; i < indices.length; i++) {
-            indices[i] = "index_" + i;
-            createIndex(indices[i]);
-        }
+        String[] indicesBefore = new String[randomIntBetween(2,5)];
+        String[] indicesAfter = new String[randomIntBetween(2,5)];
+        for (int i = 0; i < indicesBefore.length; i++) {
+            indicesBefore[i] = "index_before_" + i;
+            createIndex(indicesBefore[i]);
+        }
+        for (int i = 0; i < indicesAfter.length; i++) {
+            indicesAfter[i] = "index_after_" + i;
+            createIndex(indicesAfter[i]);
+        }
+        String[] indices = new String[indicesBefore.length + indicesAfter.length];
+        System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length);
+        System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length);
         ensureYellow();
         logger.info("--> indexing some data");
-        IndexRequestBuilder[] builders = new IndexRequestBuilder[randomIntBetween(10, 200)];
-        for (int i = 0; i < builders.length; i++) {
-            builders[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indices), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
+        IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)];
+        for (int i = 0; i < buildersBefore.length; i++) {
+            buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
+        }
+        IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)];
+        for (int i = 0; i < buildersAfter.length; i++) {
+            buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
         }
-        indexRandom(true, builders);
-        assertThat(client().prepareCount(indices).get().getCount(), equalTo((long)builders.length));
+        indexRandom(true, buildersBefore);
+        indexRandom(true, buildersAfter);
+        assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
         long[] counts = new long[indices.length];
         for (int i = 0; i < indices.length; i++) {
             counts[i] = client().prepareCount(indices[i]).get().getCount();
         }
-        logger.info("--> snapshot");
-        CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("index_*").get();
+        logger.info("--> snapshot subset of indices before upgrade");
+        CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get();
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
         assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), 
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); - logger.info("--> delete some data"); - int howMany = randomIntBetween(1, builders.length); + logger.info("--> delete some data from indices that were already snapshotted"); + int howMany = randomIntBetween(1, buildersBefore.length); for (int i = 0; i < howMany; i++) { - IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), builders); + IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore); IndexRequest request = indexRequestBuilder.request(); client().prepareDelete(request.index(), request.type(), request.id()).get(); } refresh(); final long numDocs = client().prepareCount(indices).get().getCount(); - assertThat(client().prepareCount(indices).get().getCount(), lessThan((long)builders.length)); + assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length))); client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); @@ -515,27 +528,32 @@ public void testSnapshotAndRestore() throws ExecutionException, InterruptedExcep logger.info("--> close indices"); - client().admin().indices().prepareClose(indices).get(); + client().admin().indices().prepareClose("index_before_*").get(); logger.info("--> restore all indices from the snapshot"); - RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); + RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); ensureYellow(); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long)builders.length)); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); for (int i = 0; i < indices.length; i++) { assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); } + logger.info("--> snapshot subset of indices after upgrade"); + createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + // Test restore after index deletion logger.info("--> delete indices"); String index = RandomPicks.randomFrom(getRandom(), indices); cluster().wipeIndices(index); logger.info("--> restore one index after deletion"); - restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices(index).execute().actionGet(); + restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", 
"test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); ensureYellow(); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long)builders.length)); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); for (int i = 0; i < indices.length; i++) { assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); } diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index 8c89d532d4020..41b23b1ea51b8 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -422,28 +422,17 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I .put("chunk_size", randomIntBetween(100, 1000)))); logger.info("--> snapshot"); CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test").get(); - if (createSnapshotResponse.getSnapshotInfo().state() == SnapshotState.PARTIAL) { - logger.info("failed during snapshot -- maybe SI file got corrupted"); - final List files = listShardFiles(shardRouting); - File corruptedFile = null; - for (File file : files) { - if (file.getName().startsWith("corrupted_")) { - corruptedFile = file; - break; - } + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo(SnapshotState.PARTIAL)); + logger.info("failed during snapshot -- maybe SI file got corrupted"); + final List files = listShardFiles(shardRouting); + File corruptedFile = null; + for (File file : files) { + if (file.getName().startsWith("corrupted_")) { + corruptedFile = file; + break; } - assertThat(corruptedFile, notNullValue()); - } else { - assertThat(""+createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(""+createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - - assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); - - cluster().wipeIndices("test"); - RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()-1)); } + assertThat(corruptedFile, notNullValue()); } private int numShards(String... 
index) {
diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java
index 1df33e7b44252..80b0917707e0f 100644
--- a/src/test/java/org/elasticsearch/index/store/StoreTest.java
+++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java
@@ -48,6 +48,8 @@
 import java.util.HashMap;
 import java.util.Map;

+import static com.carrotsearch.randomizedtesting.RandomizedTest.randomInt;
+import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
 import static org.hamcrest.Matchers.*;

 public class StoreTest extends ElasticsearchLuceneTestCase {
@@ -362,6 +364,86 @@ public void testRenameFile() throws IOException {
         IOUtils.close(store);
     }

+    @Test
+    public void testVerifyingIndexInput() throws IOException {
+        Directory dir = newDirectory();
+        IndexOutput output = dir.createOutput("foo.bar", IOContext.DEFAULT);
+        int iters = scaledRandomIntBetween(10, 100);
+        for (int i = 0; i < iters; i++) {
+            BytesRef bytesRef = new BytesRef(TestUtil.randomRealisticUnicodeString(random(), 10, 1024));
+            output.writeBytes(bytesRef.bytes, bytesRef.offset, bytesRef.length);
+        }
+        CodecUtil.writeFooter(output);
+        output.close();
+
+        // Check file
+        IndexInput indexInput = dir.openInput("foo.bar", IOContext.DEFAULT);
+        long checksum = CodecUtil.retrieveChecksum(indexInput);
+        indexInput.seek(0);
+        IndexInput verifyingIndexInput = new Store.VerifyingIndexInput(dir.openInput("foo.bar", IOContext.DEFAULT));
+        readIndexInputFullyWithRandomSeeks(verifyingIndexInput);
+        Store.verify(verifyingIndexInput);
+        assertThat(checksum, equalTo(((ChecksumIndexInput) verifyingIndexInput).getChecksum()));
+        IOUtils.close(indexInput, verifyingIndexInput);
+
+        // Corrupt file and check again
+        corruptFile(dir, "foo.bar", "foo1.bar");
+        verifyingIndexInput = new Store.VerifyingIndexInput(dir.openInput("foo1.bar", IOContext.DEFAULT));
+        readIndexInputFullyWithRandomSeeks(verifyingIndexInput);
+        try {
+            Store.verify(verifyingIndexInput);
+            fail("should be a corrupted index");
+        } catch (CorruptIndexException ex) {
+            // ok
+        }
+        IOUtils.close(verifyingIndexInput);
+
+        IOUtils.close(dir);
+    }
+
+    private void readIndexInputFullyWithRandomSeeks(IndexInput indexInput) throws IOException {
+        BytesRef ref = new BytesRef(scaledRandomIntBetween(1, 1024));
+        long pos = 0;
+        while (pos < indexInput.length()) {
+            assertEquals(pos, indexInput.getFilePointer());
+            int op = random().nextInt(5);
+            if (op == 0) {
+                int shift = 100 - randomIntBetween(0, 200);
+                pos = Math.min(indexInput.length() - 1, Math.max(0, pos + shift));
+                indexInput.seek(pos);
+            } else if (op == 1) {
+                indexInput.readByte();
+                pos++;
+            } else {
+                int min = (int) Math.min(indexInput.length() - pos, ref.bytes.length);
+                indexInput.readBytes(ref.bytes, ref.offset, min);
+                pos += min;
+            }
+        }
+    }
+
+    private void corruptFile(Directory dir, String fileIn, String fileOut) throws IOException {
+        IndexInput input = dir.openInput(fileIn, IOContext.READONCE);
+        IndexOutput output = dir.createOutput(fileOut, IOContext.DEFAULT);
+        long len = input.length();
+        byte[] b = new byte[1024];
+        long broken = randomInt((int) len - 1); // randomInt(max) is inclusive of max, so keep the flipped byte inside the file
+        long pos = 0;
+        while (pos < len) {
+            int min = (int) Math.min(input.length() - pos, b.length);
+            input.readBytes(b, 0, min);
+            if (broken >= pos && broken < pos + min) {
+                // Flip one byte
+                int flipPos = (int) (broken - pos);
+                b[flipPos] = (byte) (b[flipPos] ^ 42);
+            }
+            output.writeBytes(b, min);
+            pos += min;
+        }
+        IOUtils.close(input, output);
+
+    }
+
     public void 
assertDeleteContent(Store store, DirectoryService service) throws IOException {
         store.deleteContent();
         assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
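
Illustrative usage sketch (not part of the patch): how the verifying-read path added to Store is meant
to be driven end to end. Store.openVerifyingInput and Store.verify(IndexInput) are the methods
introduced above; the wrapper class below and the way 'store' and 'md' are obtained are assumptions
made only to keep the example self-contained. For files with legacy checksums or no checksum at all,
openVerifyingInput falls back to a plain input and Store.verify() becomes a no-op.

import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;

import java.io.IOException;

class VerifyingReadSketch {
    /** Reads one store file to the end through the verifying input, then checks its CRC32 footer. */
    static void readAndVerify(Store store, StoreFileMetaData md) throws IOException {
        IndexInput in = store.openVerifyingInput(md.name(), IOContext.READONCE, md);
        try {
            byte[] buffer = new byte[4096];
            long remaining = in.length();
            while (remaining > 0) {
                int chunk = (int) Math.min(buffer.length, remaining);
                in.readBytes(buffer, 0, chunk); // feeds the running digest as a side effect
                remaining -= chunk;
            }
            Store.verify(in); // throws CorruptIndexException if computed and stored checksums differ
        } finally {
            in.close();
        }
    }
}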