Snapshot checksum verification
Adds automatic verification of all files that are being snapshotted. Closes #5593
imotov committed Aug 20, 2014
1 parent 65995c9 commit 1e00a0b
Showing 6 changed files with 384 additions and 137 deletions.
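The heart of the change: snapshot reads now go through a checksum-verifying IndexInput (Store.openVerifyingInput), and the checksum is confirmed with Store.verify once the last byte of a file has been streamed out. Both calls appear in the new code below; the sketch that follows is a minimal illustration of that pattern, not code from this commit, and the class name and elided copy loop are hypothetical.

    import org.apache.lucene.store.IOContext;
    import org.apache.lucene.store.IndexInput;
    import org.apache.lucene.util.IOUtils;
    import org.elasticsearch.index.store.Store;
    import org.elasticsearch.index.store.StoreFileMetaData;

    import java.io.IOException;

    class VerifiedReadSketch {
        // Stream a file through a verifying input, then confirm its checksum.
        // Store.verify throws CorruptIndexException when the bytes read do not
        // match the checksum recorded in the file's metadata.
        static void copyVerified(Store store, String fileName, StoreFileMetaData metadata) throws IOException {
            IndexInput in = store.openVerifyingInput(fileName, IOContext.READONCE, metadata);
            try {
                // ... read in.length() bytes and write them to the destination ...
                Store.verify(in);
            } finally {
                IOUtils.closeWhileHandlingException(in);
            }
        }
    }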
@@ -22,10 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.SnapshotId;
@@ -35,11 +32,11 @@
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.lucene.store.ThreadSafeInputStreamIndexInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.*;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
@@ -59,7 +56,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.collect.Lists.newArrayList;

@@ -396,8 +392,10 @@ private class SnapshotContext extends Context {
*/
public SnapshotContext(SnapshotId snapshotId, ShardId shardId, IndexShardSnapshotStatus snapshotStatus) {
super(snapshotId, shardId);
store = indicesService.indexServiceSafe(shardId.getIndex()).shardInjectorSafe(shardId.id()).getInstance(Store.class);
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
store = indexService.shardInjectorSafe(shardId.id()).getInstance(Store.class);
this.snapshotStatus = snapshotStatus;

}

/**
@@ -439,11 +437,6 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName);
boolean snapshotRequired = false;
// TODO: For now segment files are copied on each commit because segment files don't have checksum
// if (snapshot.indexChanged() && fileName.equals(snapshotIndexCommit.getSegmentsFileName())) {
// snapshotRequired = true; // we want to always snapshot the segment file if the index changed
// }

BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName);

if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) {
@@ -532,50 +525,101 @@ public void snapshot(SnapshotIndexCommit snapshotIndexCommit) {
* @throws IOException
*/
private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo, final CountDownLatch latch, final List<Throwable> failures) throws IOException {
final AtomicLong counter = new AtomicLong(fileInfo.numberOfParts());
for (long i = 0; i < fileInfo.numberOfParts(); i++) {
IndexInput indexInput = null;
try {
final String file = fileInfo.physicalName();
indexInput = store.directory().openInput(file, IOContext.READONCE);
indexInput.seek(i * fileInfo.partBytes());
InputStreamIndexInput inputStreamIndexInput = new ThreadSafeInputStreamIndexInput(indexInput, fileInfo.partBytes());

final IndexInput fIndexInput = indexInput;
long size = inputStreamIndexInput.actualSizeToRead();
InputStream inputStream;
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter, snapshotThrottleListener);
} else {
inputStream = inputStreamIndexInput;
}
inputStream = new AbortableInputStream(inputStream, file);
blobContainer.writeBlob(fileInfo.partName(i), inputStream, size, new ImmutableBlobContainer.WriterListener() {
@Override
public void onCompleted() {
IOUtils.closeWhileHandlingException(fIndexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
if (counter.decrementAndGet() == 0) {
latch.countDown();
}
}
final String file = fileInfo.physicalName();
IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata());
writeBlob(indexInput, fileInfo, 0, latch, failures);
}

@Override
public void onFailure(Throwable t) {
IOUtils.closeWhileHandlingException(fIndexInput);
snapshotStatus.addProcessedFile(0);
failures.add(t);
if (counter.decrementAndGet() == 0) {
latch.countDown();
}
}
});
} catch (Throwable e) {
IOUtils.closeWhileHandlingException(indexInput);
failures.add(e);
private class BlobPartWriter implements ImmutableBlobContainer.WriterListener {

private final int part;

private final FileInfo fileInfo;

private final List<Throwable> failures;

private final CountDownLatch latch;

private final IndexInput indexInput;

private final InputStream inputStream;

private final InputStreamIndexInput inputStreamIndexInput;

private BlobPartWriter(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List<Throwable> failures) throws IOException {
this.indexInput = indexInput;
this.part = part;
this.fileInfo = fileInfo;
this.failures = failures;
this.latch = latch;
inputStreamIndexInput = new InputStreamIndexInput(indexInput, fileInfo.partBytes());
InputStream inputStream = inputStreamIndexInput;
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter, snapshotThrottleListener);
}
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
this.inputStream = inputStream;
}

@Override
public void onCompleted() {
int nextPart = part + 1;
if (nextPart < fileInfo.numberOfParts()) {
try {
// We have more parts to go
writeBlob(indexInput, fileInfo, nextPart, latch, failures);
} catch (Throwable t) {
onFailure(t);
}
} else {
// Last part - verify checksum
try {
Store.verify(indexInput);
indexInput.close();
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Throwable t) {
onFailure(t);
return;
}
latch.countDown();
}
}

@Override
public void onFailure(Throwable t) {
cleanupFailedSnapshot(t, indexInput, latch, failures);
}

public void writeBlobPart() throws IOException {
blobContainer.writeBlob(fileInfo.partName(part), inputStream, inputStreamIndexInput.actualSizeToRead(), this);
}

}

private void writeBlob(IndexInput indexInput, FileInfo fileInfo, int part, CountDownLatch latch, List<Throwable> failures) {
try {
new BlobPartWriter(indexInput, fileInfo, part, latch, failures).writeBlobPart();
} catch (Throwable t) {
cleanupFailedSnapshot(t, indexInput, latch, failures);
}
}

private void cleanupFailedSnapshot(Throwable t, IndexInput indexInput, CountDownLatch latch, List<Throwable> failures) {
IOUtils.closeWhileHandlingException(indexInput);
failStoreIfCorrupted(t);
snapshotStatus.addProcessedFile(0);
failures.add(t);
latch.countDown();
}

private void failStoreIfCorrupted(Throwable t) {
if (t instanceof CorruptIndexException) {
try {
store.markStoreCorrupted((CorruptIndexException) t);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
}
}

/**
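The rewritten upload path above replaces the old AtomicLong part counter with a chain of BlobPartWriter callbacks: each writer uploads one part, and onCompleted either starts the next part or, after the last part, verifies the file's checksum before releasing the latch; any failure funnels through cleanupFailedSnapshot, which marks the store corrupted on CorruptIndexException. A simplified synchronous equivalent of that control flow, for orientation only (writePart is a hypothetical stand-in for the asynchronous blobContainer.writeBlob call):

    // Synchronous sketch of the BlobPartWriter chain; the real code drives the
    // same steps from ImmutableBlobContainer.WriterListener callbacks.
    void snapshotFileSketch(FileInfo fileInfo) throws IOException {
        IndexInput in = store.openVerifyingInput(fileInfo.physicalName(), IOContext.READONCE, fileInfo.metadata());
        try {
            for (int part = 0; part < fileInfo.numberOfParts(); part++) {
                writePart(in, fileInfo, part); // hypothetical: upload one part of fileInfo.partBytes() bytes
            }
            Store.verify(in); // after the last part: fail the file if its checksum is wrong
            in.close();
            snapshotStatus.addProcessedFile(fileInfo.length());
        } catch (Throwable t) {
            IOUtils.closeWhileHandlingException(in);
            failStoreIfCorrupted(t);
            throw new IOException("failed to snapshot [" + fileInfo.physicalName() + "]", t);
        }
    }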
@@ -850,9 +894,9 @@ public void onFailure(Throwable t) {
IOUtils.closeWhileHandlingException(indexOutput);
if (t instanceof CorruptIndexException) {
try {
store.markStoreCorrupted((CorruptIndexException)t);
store.markStoreCorrupted((CorruptIndexException) t);
} catch (IOException e) {
//
logger.warn("store cannot be marked as corrupted", e);
}
}
store.deleteQuiet(fileInfo.physicalName());
@@ -21,15 +21,14 @@

import org.apache.lucene.store.RateLimiter;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* Rate limiting wrapper for InputStream
*/
public class RateLimitingInputStream extends InputStream {

private final InputStream delegate;
public class RateLimitingInputStream extends FilterInputStream {

private final RateLimiter rateLimiter;

@@ -42,7 +41,7 @@ public interface Listener {
}

public RateLimitingInputStream(InputStream delegate, RateLimiter rateLimiter, Listener listener) {
this.delegate = delegate;
super(delegate);
this.rateLimiter = rateLimiter;
this.listener = listener;
}
@@ -60,52 +59,17 @@ private void maybePause(int bytes) {

@Override
public int read() throws IOException {
int b = delegate.read();
int b = super.read();
maybePause(1);
return b;
}

@Override
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int n = delegate.read(b, off, len);
int n = super.read(b, off, len);
if (n > 0) {
maybePause(n);
}
return n;
}

@Override
public long skip(long n) throws IOException {
return delegate.skip(n);
}

@Override
public int available() throws IOException {
return delegate.available();
}

@Override
public void close() throws IOException {
delegate.close();
}

@Override
public void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public void reset() throws IOException {
delegate.reset();
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}
}
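The stream wrapper also gets simpler: extending FilterInputStream drops all the pass-through overrides, since FilterInputStream already forwards skip(), available(), close(), mark(), reset(), and markSupported() to the wrapped stream, and its read(byte[]) is documented to call the three-argument read(). Only the two read methods that need to pause are overridden. The same pattern in a self-contained sketch (CountingInputStream is a hypothetical example, not part of this commit):

    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    // Count bytes by overriding only the primitive read methods; every other
    // InputStream method inherits FilterInputStream's delegating implementation.
    class CountingInputStream extends FilterInputStream {
        long bytesRead;

        CountingInputStream(InputStream delegate) {
            super(delegate); // FilterInputStream stores and delegates to this stream
        }

        @Override
        public int read() throws IOException {
            int b = super.read();
            if (b >= 0) {
                bytesRead++;
            }
            return b;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int n = super.read(b, off, len);
            if (n > 0) {
                bytesRead += n;
            }
            return n;
        }
        // read(byte[]) needs no override: FilterInputStream routes it through read(b, 0, b.length)
    }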
