diff --git a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
index ae846fdca7a05..25868bd1b3f10 100644
--- a/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
+++ b/src/main/java/org/elasticsearch/index/engine/internal/InternalEngine.java
@@ -315,9 +315,7 @@ public void start() throws EngineException {
     }
 
     private void readLastCommittedSegmentsInfo() throws IOException {
-        SegmentInfos infos = new SegmentInfos();
-        infos.read(store.directory());
-        lastCommittedSegmentInfos = infos;
+        lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
     }
 
     @Override
@@ -907,11 +905,13 @@ public void flush(Flush flush) throws EngineException {
                 } catch (Throwable e) {
                     if (!closed) {
                         logger.warn("failed to read latest segment infos on flush", e);
+                        if (Lucene.isCorruptionException(e)) {
+                            throw new FlushFailedEngineException(shardId, e);
+                        }
                     }
                 }
-            }
         } catch (FlushFailedEngineException ex) {
-            maybeFailEngine(ex.getCause(), "flush");
+            maybeFailEngine(ex, "flush");
             throw ex;
         } finally {
             flushLock.unlock();
@@ -1033,8 +1033,10 @@ public void optimize(Optimize optimize) throws EngineException {
 
     @Override
     public SnapshotIndexCommit snapshotIndex() throws EngineException {
+        // we have to flush outside of the readlock otherwise we might have a problem upgrading
+        // to a write lock when we fail the engine in this operation
+        flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
         try (InternalLock _ = readLock.acquire()) {
-            flush(new Flush().type(Flush.Type.COMMIT).waitIfOngoing(true));
             ensureOpen();
             return deletionPolicy.snapshot();
         } catch (IOException e) {
@@ -1102,16 +1104,19 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
         }
     }
 
-    private void maybeFailEngine(Throwable t, String source) {
+    private boolean maybeFailEngine(Throwable t, String source) {
         if (Lucene.isCorruptionException(t)) {
             if (this.failEngineOnCorruption) {
                 failEngine("corrupt file detected source: [" + source + "]", t);
+                return true;
             } else {
                 logger.warn("corrupt file detected source: [{}] but [{}] is set to [{}]", t, source, INDEX_FAIL_ON_CORRUPTION, this.failEngineOnCorruption);
             }
         }else if (ExceptionsHelper.isOOM(t)) {
             failEngine("out of memory", t);
+            return true;
         }
+        return false;
     }
 
     private Throwable wrapIfClosed(Throwable t) {
@@ -1611,11 +1616,11 @@ public void close() throws ElasticsearchException {
         }
 
     private static final class InternalLock implements Releasable {
-        private final ThreadLocal<Boolean> lockIsHeld;
+        private final ThreadLocal<AtomicInteger> lockIsHeld;
         private final Lock lock;
 
         InternalLock(Lock lock) {
-            ThreadLocal<Boolean> tl = null;
+            ThreadLocal<AtomicInteger> tl = null;
            assert (tl = new ThreadLocal<>()) != null;
            lockIsHeld = tl;
            this.lock = lock;
@@ -1635,18 +1640,26 @@ InternalLock acquire() throws EngineException {
        }
 
        protected boolean onAssertRelease() {
-           lockIsHeld.set(Boolean.FALSE);
+           AtomicInteger count = lockIsHeld.get();
+           if (count.decrementAndGet() == 0) {
+               lockIsHeld.remove();
+           }
            return true;
        }
 
        protected boolean onAssertLock() {
-           lockIsHeld.remove();
+           AtomicInteger count = lockIsHeld.get();
+           if (count == null) {
+               count = new AtomicInteger(0);
+               lockIsHeld.set(count);
+           }
+           count.incrementAndGet();
            return true;
        }
 
        boolean assertLockIsHeld() {
-           Boolean aBoolean = lockIsHeld.get();
-           return aBoolean != null && aBoolean.booleanValue();
+           AtomicInteger count = lockIsHeld.get();
+           return count != null && count.get() > 0;
        }
    }
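
Two details in the InternalEngine hunks above are easy to miss. First, maybeFailEngine() now reports whether it actually failed the engine, and flush() passes the FlushFailedEngineException itself rather than its cause, so Lucene.isCorruptionException() can walk the whole exception chain. Second, the InternalLock debug bookkeeping moves from a Boolean thread-local to a per-thread AtomicInteger counter, which keeps assertLockIsHeld() correct when the same thread acquires the lock reentrantly. A minimal standalone sketch of that counting pattern follows; the class and method names are illustrative, not Elasticsearch API, and the real code runs this bookkeeping only inside assert statements:

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    final class CountingLockAssertion {
        // one counter per thread; an absent entry means "not held by this thread"
        private final ThreadLocal<AtomicInteger> held = new ThreadLocal<>();
        private final Lock lock = new ReentrantLock();

        void acquire() {
            lock.lock();
            AtomicInteger count = held.get();
            if (count == null) {
                count = new AtomicInteger(0);
                held.set(count);
            }
            count.incrementAndGet(); // counts nested acquisitions too
        }

        void release() {
            // assumes acquire() ran on this thread first, mirroring onAssertRelease()
            if (held.get().decrementAndGet() == 0) {
                held.remove(); // last release by this thread clears the slot
            }
            lock.unlock();
        }

        boolean isHeldByCurrentThread() {
            AtomicInteger count = held.get();
            return count != null && count.get() > 0;
        }
    }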
diff --git a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java
index 61bc01262375d..5a4687b8c0494 100644
--- a/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java
+++ b/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotAndRestoreService.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.cluster.routing.RestoreSource;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
@@ -123,6 +124,9 @@ public void restore(final RecoveryState recoveryState) {
             indexShardRepository.restore(restoreSource.snapshotId(), shardId, snapshotShardId, recoveryState);
             restoreService.indexShardRestoreCompleted(restoreSource.snapshotId(), shardId);
         } catch (Throwable t) {
+            if (Lucene.isCorruptionException(t)) {
+                restoreService.failRestore(restoreSource.snapshotId(), shardId());
+            }
             throw new IndexShardRestoreFailedException(shardId, "restore failed", t);
         }
     }
diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
index 98bc701188db3..63c95a0469139 100644
--- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
+++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
@@ -674,6 +675,9 @@ public void restore() {
             Map<String, StoreFileMetaData> metadata = Collections.emptyMap();
             try {
                 metadata = store.getMetadata().asMap();
+            } catch (CorruptIndexException e) {
+                logger.warn("{} Can't read metadata from store", e, shardId);
+                throw new IndexShardRestoreFailedException(shardId, "Can't restore corrupted shard", e);
             } catch (Throwable e) {
                 // if the index is broken we might not be able to read it
                 logger.warn("{} Can't read metadata from store", e, shardId);
@@ -684,8 +688,7 @@ public void restore() {
                 String fileName = fileInfo.physicalName();
                 final StoreFileMetaData md = metadata.get(fileName);
                 numberOfFiles++;
-                // we don't compute checksum for segments, so always recover them
-                if (!fileName.startsWith("segments") && md != null && fileInfo.isSame(md)) {
+                if (md != null && fileInfo.isSame(md)) {
                     totalSize += md.length();
                     numberOfReusedFiles++;
                     reusedTotalSize += md.length();
@@ -845,6 +848,13 @@ public void onFailure(Throwable t) {
                 try {
                     failures.add(t);
                     IOUtils.closeWhileHandlingException(indexOutput);
+                    if (t instanceof CorruptIndexException) {
+                        try {
+                            store.markStoreCorrupted((CorruptIndexException) t);
+                        } catch (IOException e) {
+                            //
+                        }
+                    }
                     store.deleteQuiet(fileInfo.physicalName());
                 } finally {
                     latch.countDown();
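
The repository changes tighten two spots in restore: a CorruptIndexException while reading the local store's metadata now aborts the restore with an explicit "Can't restore corrupted shard" failure instead of being swallowed by the generic Throwable branch, and a corrupt file streamed from the repository marks the store corrupted before the partial file is deleted. Because the marker survives on disk, the shard cannot be quietly reopened later. A sketch of that marker-file idea, assuming plain java.nio file I/O; the real Store writes the marker through the Lucene Directory abstraction with a codec header and checksum, as the failIfCorrupted() hunk below shows:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    final class CorruptionMarker {
        static final String PREFIX = "corrupted_";

        // remember a detected corruption by dropping a small file next to the index
        static void mark(Path indexDir, Exception cause) throws IOException {
            Path marker = indexDir.resolve(PREFIX + System.nanoTime());
            Files.write(marker, String.valueOf(cause).getBytes(StandardCharsets.UTF_8),
                    StandardOpenOption.CREATE_NEW);
        }

        // every later open of the store scans for markers and fails fast
        static void failIfCorrupted(Path indexDir) throws IOException {
            try (DirectoryStream<Path> markers = Files.newDirectoryStream(indexDir, PREFIX + "*")) {
                for (Path marker : markers) {
                    String msg = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8);
                    throw new IOException("corrupted index: " + msg);
                }
            }
        }
    }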
diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java
index 825e4fdecee14..8bcc1039f070f 100644
--- a/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/src/main/java/org/elasticsearch/index/store/Store.java
@@ -107,6 +107,27 @@ public Directory directory() {
         return directory;
     }
 
+    /**
+     * Returns the last committed segments info for this store
+     * @throws IOException if the index is corrupted or the segments file is not present
+     */
+    public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
+        return readLastCommittedSegmentsInfo(directory());
+    }
+
+    /**
+     * Returns the last committed segments info for the given directory
+     * @throws IOException if the index is corrupted or the segments file is not present
+     */
+    private static SegmentInfos readLastCommittedSegmentsInfo(Directory directory) throws IOException {
+        try {
+            return Lucene.readSegmentInfos(directory);
+        } catch (EOFException eof) {
+            // TODO this should be caught by lucene - EOF is almost certainly an index corruption
+            throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
+        }
+    }
+
     private final void ensureOpen() {
         if (this.refCount.get() <= 0) {
             throw new AlreadyClosedException("Store is already closed");
@@ -119,7 +140,12 @@ private final void ensureOpen() {
     public MetadataSnapshot getMetadata() throws IOException {
         ensureOpen();
         failIfCorrupted();
-        return new MetadataSnapshot(distributorDirectory, logger);
+        try {
+            return new MetadataSnapshot(distributorDirectory, logger);
+        } catch (CorruptIndexException ex) {
+            markStoreCorrupted(ex);
+            throw ex;
+        }
     }
 
     /**
@@ -241,6 +267,7 @@ public static MetadataSnapshot readMetadataSnapshot(File[] indexLocations, ESLog
             dirs[i] = new SimpleFSDirectory(indexLocations[i]);
         }
         DistributorDirectory dir = new DistributorDirectory(dirs);
+        failIfCorrupted(dir, new ShardId("", 1));
         return new MetadataSnapshot(dir, logger);
     } finally {
         IOUtils.close(dirs);
@@ -298,14 +325,18 @@ public boolean isMarkedCorrupted() throws IOException {
 
     public void failIfCorrupted() throws IOException {
         ensureOpen();
-        final String[] files = directory().listAll();
+        failIfCorrupted(directory, shardId);
+    }
+
+    private static final void failIfCorrupted(Directory directory, ShardId shardId) throws IOException {
+        final String[] files = directory.listAll();
         List<CorruptIndexException> ex = new ArrayList<>();
         for (String file : files) {
             if (file.startsWith(CORRUPTED)) {
-                try(ChecksumIndexInput input = directory().openChecksumInput(file, IOContext.READONCE)) {
+                try(ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
                     CodecUtil.checkHeader(input, CODEC, VERSION, VERSION);
                     String msg = input.readString();
-                    StringBuilder builder = new StringBuilder(this.shardId.toString());
+                    StringBuilder builder = new StringBuilder(shardId.toString());
                     builder.append(" Corrupted index [");
                     builder.append(file).append("] caused by: ");
                     builder.append(msg);
@@ -408,14 +439,11 @@ ImmutableMap<String, StoreFileMetaData> buildMetadata(Directory directory, ESLog
         try {
             final SegmentInfos segmentCommitInfos;
             try {
-                segmentCommitInfos = Lucene.readSegmentInfos(directory);
+                segmentCommitInfos = Store.readLastCommittedSegmentsInfo(directory);
             } catch (FileNotFoundException | NoSuchFileException ex) {
                 // no segments file -- can't read metadata
                 logger.trace("Can't read segment infos", ex);
                 return ImmutableMap.of();
-            } catch (EOFException eof) {
-                // TODO this should be caught by lucene - EOF is almost certainly an index corruption
-                throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
             }
             Version maxVersion = Version.LUCENE_3_0; // we don't know which version was used to write so we take the max version.
             Set<String> added = new HashSet<>();
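
Store.readLastCommittedSegmentsInfo() becomes the single funnel for reading the last commit point: an unexpected EOF while reading the segments file is promoted to CorruptIndexException, so every caller (engine startup, flush, metadata snapshots) lands in the same corruption handling instead of treating a truncated segments file as a transient I/O error. A condensed sketch of just that wrapping, using the Lucene 4.x SegmentInfos API that the removed engine code in the first hunk also used, and the same CorruptIndexException construction as the diff:

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.lucene.index.CorruptIndexException;
    import org.apache.lucene.index.SegmentInfos;
    import org.apache.lucene.store.Directory;

    final class SegmentInfosReader {
        static SegmentInfos readLastCommitted(Directory directory) throws IOException {
            try {
                SegmentInfos infos = new SegmentInfos();
                infos.read(directory); // reads the most recent segments_N commit point
                return infos;
            } catch (EOFException eof) {
                // a truncated segments file almost certainly means on-disk corruption,
                // so surface it as corruption rather than as a generic IOException
                throw new CorruptIndexException("Read past EOF while reading segment infos", eof);
            }
        }
    }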
diff --git a/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index 8047081c3c5ba..9b02bd1ec48c1 100644
--- a/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -477,6 +477,21 @@ private void processDeletedIndices(ClusterChangedEvent event) {
         }
     }
 
+    /**
+     * Fails the given snapshot restore operation for the given shard
+     */
+    public void failRestore(SnapshotId snapshotId, ShardId shardId) {
+        logger.debug("[{}] failed to restore shard [{}]", snapshotId, shardId);
+        UpdateIndexShardRestoreStatusRequest request = new UpdateIndexShardRestoreStatusRequest(snapshotId, shardId,
+                new ShardRestoreStatus(clusterService.state().nodes().localNodeId(), RestoreMetaData.State.FAILURE));
+        if (clusterService.state().nodes().localNodeMaster()) {
+            innerUpdateRestoreState(request);
+        } else {
+            transportService.sendRequest(clusterService.state().nodes().masterNode(),
+                    UpdateRestoreStateRequestHandler.ACTION, request, EmptyTransportResponseHandler.INSTANCE_SAME);
+        }
+    }
+
     private boolean failed(Snapshot snapshot, String index) {
         for (SnapshotShardFailure failure : snapshot.shardFailures()) {
             if (index.equals(failure.index())) {
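
failRestore() closes the loop for the IndexShardSnapshotAndRestoreService change above: restore progress lives in the cluster state's RestoreMetaData, which only the elected master may mutate, so a data node that detects corruption either applies the FAILURE status locally (when it happens to be the master) or ships the request to the master over the transport layer. A compressed sketch of that routing decision, using stand-in interfaces rather than the real ClusterService/TransportService types (the action name below is a placeholder, not the actual constant):

    // Stand-ins for the cluster plumbing; not Elasticsearch types.
    interface ClusterView {
        boolean localNodeIsMaster();
    }

    interface MasterChannel {
        void applyLocally(Object stateUpdateRequest);                   // master applies directly
        void forwardToMaster(String action, Object stateUpdateRequest); // everyone else forwards
    }

    final class RestoreFailureNotifier {
        private final ClusterView cluster;
        private final MasterChannel channel;

        RestoreFailureNotifier(ClusterView cluster, MasterChannel channel) {
            this.cluster = cluster;
            this.channel = channel;
        }

        // mirrors the if/else in failRestore(): same update, two delivery paths
        void shardRestoreFailed(Object statusUpdateRequest) {
            if (cluster.localNodeIsMaster()) {
                channel.applyLocally(statusUpdateRequest);
            } else {
                channel.forwardToMaster("update_restore_status", statusUpdateRequest);
            }
        }
    }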
diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
index 8c5e6573fd77c..e65b565360821 100644
--- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
+++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.index.store;
 
+import com.carrotsearch.randomizedtesting.LifecycleScope;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
@@ -32,6 +33,8 @@
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
+import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
@@ -62,6 +65,7 @@
 import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
 import org.elasticsearch.indices.recovery.RecoveryTarget;
 import org.elasticsearch.monitor.fs.FsStats;
+import org.elasticsearch.snapshots.SnapshotState;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -376,6 +380,72 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans
     }
 
+
+    /**
+     * Tests that restoring a corrupted shard fails and we get a partial snapshot.
+     * TODO once checksum verification on snapshotting is implemented this test needs to be fixed or split into several
+     * parts... We should also corrupt files on the actual snapshot and check that we don't restore the corrupted shard.
+     */
+    @Test
+    public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
+        int numDocs = scaledRandomIntBetween(100, 1000);
+        assertThat(cluster().numDataNodes(), greaterThanOrEqualTo(2));
+
+        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
+                .put(MergePolicyModule.MERGE_POLICY_TYPE_KEY, NoMergePolicyProvider.class)
+                .put(MockFSDirectoryService.CHECK_INDEX_ON_CLOSE, false) // no checkindex - we corrupt shards on purpose
+                .put(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true)
+                .put("indices.recovery.concurrent_streams", 10)
+        ));
+        ensureGreen();
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
+        }
+        indexRandom(true, builders);
+        ensureGreen();
+        assertAllSuccessful(client().admin().indices().prepareFlush().setForce(true).execute().actionGet());
+        // we have to flush at least once here since we don't corrupt the translog
+        CountResponse countResponse = client().prepareCount().get();
+        assertHitCount(countResponse, numDocs);
+
+        ShardRouting shardRouting = corruptRandomFile(false);
+        // we don't corrupt segments.gen since S/R doesn't snapshot this file
+        // the other reason we can't corrupt segments.X files is that the snapshot flushes again before
+        // it runs, and that flush writes a new segments.X+1 file
+        logger.info("--> creating repository");
+        assertAcked(client().admin().cluster().preparePutRepository("test-repo")
+                .setType("fs").setSettings(ImmutableSettings.settingsBuilder()
+                        .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
+                        .put("compress", randomBoolean())
+                        .put("chunk_size", randomIntBetween(100, 1000))));
+        logger.info("--> snapshot");
+        CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true).setIndices("test").get();
+        if (createSnapshotResponse.getSnapshotInfo().state() == SnapshotState.PARTIAL) {
+            logger.info("failed during snapshot -- maybe SI file got corrupted");
+            final List<File> files = listShardFiles(shardRouting);
+            File corruptedFile = null;
+            for (File file : files) {
+                if (file.getName().startsWith("corrupted_")) {
+                    corruptedFile = file;
+                    break;
+                }
+            }
+            assertThat(corruptedFile, notNullValue());
+        } else {
+            assertThat("" + createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
+            assertThat("" + createSnapshotResponse.getSnapshotInfo().state(), createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
+
+            assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
+
+            cluster().wipeIndices("test");
+            RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute().actionGet();
+            assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
+            assertThat(restoreSnapshotResponse.getRestoreInfo().successfulShards(), equalTo(restoreSnapshotResponse.getRestoreInfo().totalShards() - 1));
+        }
+    }
+
     private int numShards(String... index) {
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(index, false);
@@ -384,6 +454,10 @@ private int numShards(String... index) {
     }
 
     private ShardRouting corruptRandomFile() throws IOException {
+        return corruptRandomFile(true);
+    }
+
+    private ShardRouting corruptRandomFile(final boolean includeSegmentsFiles) throws IOException {
         ClusterState state = client().admin().cluster().prepareState().get().getState();
         GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{"test"}, false);
         ShardIterator shardIterator = RandomPicks.randomFrom(getRandom(), shardIterators.iterators());
@@ -401,7 +475,8 @@ private ShardRouting corruptRandomFile() throws IOException {
             files.addAll(Arrays.asList(file.listFiles(new FileFilter() {
                 @Override
                 public boolean accept(File pathname) {
-                    return pathname.isFile() && !"write.lock".equals(pathname.getName());
+                    return pathname.isFile() && !"write.lock".equals(pathname.getName()) &&
+                            (includeSegmentsFiles || !pathname.getName().startsWith("segments"));
                 }
             })));
         }
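
The body of corruptRandomFile(...) is not shown in this hunk, but the test relies on it flipping bytes somewhere inside one of the shard files accepted by the filter above, so that Lucene's checksums no longer match. A minimal standalone sketch of that kind of corruption helper, assuming nothing about the Elasticsearch test infrastructure:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;
    import java.util.Random;

    final class FileCorrupter {
        // invert a single byte at a random offset so any checksum over the file breaks
        static void flipOneByte(Path file, Random random) throws IOException {
            try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                long offset = (long) (random.nextDouble() * channel.size());
                ByteBuffer buf = ByteBuffer.allocate(1);
                channel.read(buf, offset);              // fetch the byte currently on disk
                buf.put(0, (byte) ~buf.get(0));         // flip all of its bits
                buf.rewind();
                channel.write(buf, offset);             // write it back in place
            }
        }
    }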