Skip to content

Commit

Permalink
[RESTORE] Fail restore if snapshot is corrupted
Browse files Browse the repository at this point in the history
today if a snapshot is corrupted the restore operation
never terminates. Yet, if the snapshot is corrupted there
is no way to restore it anyway. If such a snapshot is restored
today the only way to cancle it is to delete the entire index which
might cause dataloss. This commit also fixes an issue in InternalEngine
where a deadlock can occur if a corruption is detected during flush
since the InternalEngine#snapshotIndex aqcuires a topLevel read lock
which prevents closing the engine.

Closes #6938
  • Loading branch information
s1monw committed Jul 21, 2014
1 parent d65a9d6 commit e730c76
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 24 deletions.
Expand Up @@ -315,9 +315,7 @@ public void start() throws EngineException {
}

private void readLastCommittedSegmentsInfo() throws IOException {
SegmentInfos infos = new SegmentInfos();
infos.read(store.directory());
lastCommittedSegmentInfos = infos;
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
}

@Override
Expand Down Expand Up @@ -907,11 +905,13 @@ public void flush(Flush flush) throws EngineException {
} catch (Throwable e) {
if (!closed) {
logger.warn("failed to read latest segment infos on flush", e);
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
}

} catch (FlushFailedEngineException ex) {
maybeFailEngine(ex.getCause(), "flush");
maybeFailEngine(ex, "flush");
throw ex;
} finally {
flushLock.unlock();
Expand Down Expand Up @@ -1033,8 +1033,10 @@ public void optimize(Optimize optimize) throws EngineException {

@Override
public SnapshotIndexCommit snapshotIndex() throws EngineException {
// we have to flush outside of the readlock otherwise we might have a problem upgrading
// the to a write lock when we fail the engine in this operation
flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
try (InternalLock _ = readLock.acquire()) {
flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
ensureOpen();
return deletionPolicy.snapshot();
} catch (IOException e) {
Expand Down Expand Up @@ -1102,16 +1104,19 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
}
}

private void maybeFailEngine(Throwable t, String source) {
private boolean maybeFailEngine(Throwable t, String source) {
if (Lucene.isCorruptionException(t)) {
if (this.failEngineOnCorruption) {
failEngine("corrupt file detected source: [" + source + "]", t);
return true;
} else {
logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
}
}else if (ExceptionsHelper.isOOM(t)) {
failEngine("out of memory", t);
return true;
}
return false;
}

private Throwable wrapIfClosed(Throwable t) {
Expand Down Expand Up @@ -1611,11 +1616,11 @@ public void close() throws ElasticsearchException {
}

private static final class InternalLock implements Releasable {
private final ThreadLocal<Boolean> lockIsHeld;
private final ThreadLocal<AtomicInteger> lockIsHeld;
private final Lock lock;

InternalLock(Lock lock) {
ThreadLocal<Boolean> tl = null;
ThreadLocal<AtomicInteger> tl = null;
assert (tl = new ThreadLocal<>()) != null;
lockIsHeld = tl;
this.lock = lock;
Expand All @@ -1635,18 +1640,26 @@ InternalLock acquire() throws EngineException {


protected boolean onAssertRelease() {
lockIsHeld.set(Boolean.FALSE);
AtomicInteger count = lockIsHeld.get();
if (count.decrementAndGet() == 0) {
lockIsHeld.remove();
}
return true;
}

protected boolean onAssertLock() {
lockIsHeld.remove();
AtomicInteger count = lockIsHeld.get();
if (count == null) {
count = new AtomicInteger(0);
lockIsHeld.set(count);
}
count.incrementAndGet();
return true;
}

boolean assertLockIsHeld() {
Boolean aBoolean = lockIsHeld.get();
return aBoolean != null && aBoolean.booleanValue();
AtomicInteger count = lockIsHeld.get();
return count != null && count.get() > 0;
}
}

Expand Down
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
Expand Down Expand Up @@ -123,6 +124,9 @@ public void restore(final RecoveryState recoveryState) {
indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState);
restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);
} catch (Throwable t) {
if (Lucene.isCorruptionException(t)) {
restoreService.failRestore(restoreSource.snapshotId(), shardId());
}
throw new IndexShardRestoreFailedException(shardId, "restore failed", t);
}
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
Expand Down Expand Up @@ -674,6 +675,9 @@ public void restore() {
Map<String, StoreFileMetaData> metadata = Collections.emptyMap();
try {
metadata = store.getMetadata().asMap();
} catch (CorruptIndexException e) {
logger.warn("{} Can't read metadata from store", e, shardId);
throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
} catch (Throwable e) {
// if the index is broken we might not be able to read it
logger.warn("{} Can't read metadata from store", e, shardId);
Expand All @@ -684,8 +688,7 @@ public void restore() {
String fileName = fileInfo.physicalName();
final StoreFileMetaData md = metadata.get(fileName);
numberOfFiles++;
// we don't compute checksum for segments, so always recover them
if (!fileName.startsWith("segments") && md != null && fileInfo.isSame(md)) {
if (md != null && fileInfo.isSame(md)) {
totalSize += md.length();
numberOfReusedFiles++;
reusedTotalSize += md.length();
Expand Down Expand Up @@ -845,6 +848,13 @@ public void onFailure(Throwable t) {
try {
failures.add(t);
IOUtils.closeWhileHandlingException(indexOutput);
if (t instanceof CorruptIndexException) {
try {
store.markStoreCorrupted((CorruptIndexException)t);
} catch (IOException e) {
//
}
}
store.deleteQuiet(fileInfo.physicalName());
} finally {
latch.countDown();
Expand Down
44 changes: 36 additions & 8 deletions src/main/java/org/elasticsearch/index/store/Store.java
Expand Up @@ -107,6 +107,27 @@ public Directory directory() {
return directory;
}

/**
* Returns the last committed segments info for this store
* @throws IOException if the index is corrupted or the segments file is not present
*/
public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
return readLastCommittedSegmentsInfo(directory());
}

/**
* Returns the last committed segments info for the given directory
* @throws IOException if the index is corrupted or the segments file is not present
*/
private static SegmentInfos readLastCommittedSegmentsInfo(Directory directory) throws IOException {
try {
return Lucene.readSegmentInfos(directory);
} catch (EOFException eof) {
// TODO this should be caught by lucene - EOF is almost certainly an index corruption
throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
}
}

private final void ensureOpen() {
if (this.refCount.get() <= 0) {
throw new AlreadyClosedException("Store is already closed");
Expand All @@ -119,7 +140,12 @@ private final void ensureOpen() {
public MetadataSnapshot getMetadata() throws IOException {
ensureOpen();
failIfCorrupted();
return new MetadataSnapshot(distributorDirectory, logger);
try {
return new MetadataSnapshot(distributorDirectory, logger);
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
throw ex;
}
}

/**
Expand Down Expand Up @@ -241,6 +267,7 @@ public static MetadataSnapshot readMetadataSnapshot(File[] indexLocations, ESLog
dirs[i] = new SimpleFSDirectory(indexLocations[i]);
}
DistributorDirectory dir = new DistributorDirectory(dirs);
failIfCorrupted(dir, new ShardId("", 1));
return new MetadataSnapshot(dir, logger);
} finally {
IOUtils.close(dirs);
Expand Down Expand Up @@ -298,14 +325,18 @@ public boolean isMarkedCorrupted() throws IOException {

public void failIfCorrupted() throws IOException {
ensureOpen();
final String[] files = directory().listAll();
failIfCorrupted(directory, shardId);
}

private static final void failIfCorrupted(Directory directory, ShardId shardId) throws IOException {
final String[] files = directory.listAll();
List<CorruptIndexException> ex = new ArrayList<>();
for (String file : files) {
if (file.startsWith(CORRUPTED)) {
try(ChecksumIndexInput input = directory().openChecksumInput(file, IOContext.READONCE)) {
try(ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
CodecUtil.checkHeader(input, CODEC, VERSION, VERSION);
String msg = input.readString();
StringBuilder builder = new StringBuilder(this.shardId.toString());
StringBuilder builder = new StringBuilder(shardId.toString());
builder.append(" Corrupted index [");
builder.append(file).append("] caused by: ");
builder.append(msg);
Expand Down Expand Up @@ -408,14 +439,11 @@ ImmutableMap<String, StoreFileMetaData> buildMetadata(Directory directory, ESLog
try {
final SegmentInfos segmentCommitInfos;
try {
segmentCommitInfos = Lucene.readSegmentInfos(directory);
segmentCommitInfos = Store.readLastCommittedSegmentsInfo(directory);
} catch (FileNotFoundException | NoSuchFileException ex) {
// no segments file -- can't read metadata
logger.trace("Can't read segment infos", ex);
return ImmutableMap.of();
} catch (EOFException eof) {
// TODO this should be caught by lucene - EOF is almost certainly an index corruption
throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
}
Version maxVersion = Version.LUCENE_3_0; // we don't know which version was used to write so we take the max version.
Set<String> added = new HashSet<>();
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/elasticsearch/snapshots/RestoreService.java
Expand Up @@ -477,6 +477,21 @@ private void processDeletedIndices(ClusterChangedEvent event) {
}
}

/**
* Fails the given snapshot restore operation for the given shard
*/
public void failRestore(SnapshotId snapshotId, ShardId shardId) {
logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
if (clusterService.state().nodes().localNodeMaster()) {
innerUpdateRestoreState(request);
} else {
transportService.sendRequest(clusterService.state().nodes().masterNode(),
UpdateRestoreStateRequestHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}

private boolean failed(Snapshot snapshot, String index) {
for (SnapshotShardFailure failure : snapshot.shardFailures()) {
if (index.equals(failure.index())) {
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.store;

import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
Expand All @@ -32,6 +33,8 @@
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
Expand Down Expand Up @@ -62,6 +65,7 @@
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.monitor.fs.FsStats;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -376,6 +380,72 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans

}


/**
* Tests that restoring of a corrupted shard fails and we get a partial snapshot.
* TODO once checksum verification on snapshotting is implemented this test needs to be fixed or split into several
* parts... We should also corrupt files on the actual snapshot and check that we don't restore the corrupted shard.
*/
@Test
public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
int numDocs = scaledRandomIntBetween(100, 1000);
assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));

assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
.put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
.put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
.put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
.put("indices.recovery.concurrent_streams", 10)
));
ensureGreen();
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
}
indexRandom(true, builders);
ensureGreen();
assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
// we have to flush at least once here since we don't corrupt the translog
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);

ShardRouting shardRouting = corruptRandomFile(false);
// we don't corrupt segments.gen since S/R doesn't snapshot this file
// the other problem here why we can't corrupt segments.X files is that the snapshot flushes again before
// it snapshots and that will write a new segments.X+1 file
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
.put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000))));
logger.info("--> snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test").get();
if (createSnapshotResponse.getSnapshotInfo().state() == SnapshotState.PARTIAL) {
logger.info("failed during snapshot -- maybe SI file got corrupted");
final List<File> files = listShardFiles(shardRouting);
File corruptedFile = null;
for (File file : files) {
if (file.getName().startsWith("corrupted_")) {
corruptedFile = file;
break;
}
}
assertThat(corruptedFile, notNullValue());
} else {
assertThat(""+createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(""+createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));

assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));

cluster().wipeIndices("test");
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards()-1));
}
}

private int numShards(String... index) {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(index, false);
Expand All @@ -384,6 +454,10 @@ private int numShards(String... index) {


private ShardRouting corruptRandomFile() throws IOException {
return corruptRandomFile(true);
}

private ShardRouting corruptRandomFile(final boolean includeSegmentsFiles) throws IOException {
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators());
Expand All @@ -401,7 +475,8 @@ private ShardRouting corruptRandomFile() throws IOException {
files.addAll(Arrays.asList(file.listFiles(new FileFilter() {
@Override
public boolean accept(File pathname) {
return pathname.isFile() && !"write.lock".equals(pathname.getName());
return pathname.isFile() && !"write.lock".equals(pathname.getName()) &&
(includeSegmentsFiles == true || pathname.getName().startsWith("segments") == false);
}
})));
}
Expand Down

0 comments on commit e730c76

Please sign in to comment.