Skip to content

Commit

Permalink
Persist all engine failure exceptions
Browse files Browse the repository at this point in the history
Currently, when an engine fails, the shard state file is no longer deleted (#11933),
and the underlying store is only marked as corrupted for index corruption exceptions.
This means that the store can still be opened even after it has failed with an IOException or an OutOfMemoryError.

It would be useful to persist the engine failures that are not due to corruption for
inspection; these can be exposed later through #11545.
  • Loading branch information
areek committed Jul 9, 2015
1 parent 15d6271 commit 6c6d05c
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 21 deletions.
12 changes: 5 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
Expand Down Expand Up @@ -533,12 +532,11 @@ public void failEngine(String reason, @Nullable Throwable failure) {
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(new IOException("failed engine (reason: [" + reason + "])", ExceptionsHelper.unwrapCorruption(failure)));
} catch (IOException e) {
logger.warn("Couldn't mark store corrupted", e);
}
try {
store.markStoreFailed(new IOException("failed engine (reason: [" + reason + "])",
Lucene.isCorruptionException(failure) ? ExceptionsHelper.unwrapCorruption(failure) : failure));
} catch (IOException e) {
logger.warn("Couldn't mark store failed", e);
}
failedEngineListener.onFailedEngine(shardId, reason, failure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) t
private void failStoreIfCorrupted(Throwable t) {
if (t instanceof CorruptIndexException || t instanceof IndexFormatTooOldException || t instanceof IndexFormatTooNewException) {
try {
store.markStoreCorrupted((IOException) t);
store.markStoreFailed((IOException) t);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
Expand Down Expand Up @@ -962,7 +962,7 @@ private void restoreFile(final FileInfo fileInfo) throws IOException {
success = true;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
try {
store.markStoreCorrupted(ex);
store.markStoreFailed(ex);
} catch (IOException e) {
logger.warn("store cannot be marked as corrupted", e);
}
Expand Down
78 changes: 68 additions & 10 deletions core/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private static final int VERSION_START = 0;
private static final int VERSION = VERSION_STACK_TRACE;
private static final String CORRUPTED = "corrupted_";
private static final String FAILED = "failed_";
public static final String INDEX_STORE_STATS_REFRESH_INTERVAL = "index.store.stats_refresh_interval";

private final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -140,7 +142,7 @@ public SegmentInfos readLastCommittedSegmentsInfo() throws IOException {
try {
return readSegmentsInfo(null, directory());
} catch (CorruptIndexException ex) {
markStoreCorrupted(ex);
markStoreFailed(ex);
throw ex;
}
}
Expand Down Expand Up @@ -225,7 +227,7 @@ public MetadataSnapshot getMetadata(IndexCommit commit) throws IOException {
try {
return new MetadataSnapshot(commit, directory, logger);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
markStoreCorrupted(ex);
markStoreFailed(ex);
throw ex;
} finally {
metadataLock.readLock().unlock();
Expand Down Expand Up @@ -500,6 +502,10 @@ public static void checkIntegrity(final StoreFileMetaData md, final Directory di
}
}

/**
* Returns <code>true</code> iff the store has ever failed due
* to index corruption
*/
public boolean isMarkedCorrupted() throws IOException {
ensureOpen();
/* marking a store as corrupted is basically adding a _corrupted to all
Expand All @@ -515,15 +521,30 @@ public boolean isMarkedCorrupted() throws IOException {
}

/**
* Deletes all corruption markers from this store.
* Returns <code>true</code> iff the store has ever failed
*/
public void removeCorruptionMarker() throws IOException {
public boolean isMarkedFailed() throws IOException {
    ensureOpen();
    // A store counts as failed as soon as a single marker file exists, regardless of
    // whether it is a corruption marker or a generic failure marker.
    for (final String name : directory().listAll()) {
        final boolean corruptionMarker = name.startsWith(CORRUPTED);
        if (corruptionMarker || name.startsWith(FAILED)) {
            return true;
        }
    }
    // no marker of either kind was found
    return false;
}

/**
* Deletes all corruption and failure markers from this store.
*/
public void removeFailureMarker() throws IOException {
ensureOpen();
final Directory directory = directory();
IOException firstException = null;
final String[] files = directory.listAll();
for (String file : files) {
if (file.startsWith(CORRUPTED)) {
if (file.startsWith(CORRUPTED) || file.startsWith(FAILED)) {
try {
directory.deleteFile(file);
} catch (IOException ex) {
Expand Down Expand Up @@ -571,6 +592,38 @@ private static final void failIfCorrupted(Directory directory, ShardId shardId)
}
}

/**
* Returns all failures recorded for this store, except for index corruption failures.
* Unlike index corruption failures, these failures still allow the store to be opened
* and used.
*
* @return one {@link EngineException} per persisted non-corruption failure marker;
*         an empty list if no such marker exists
* @throws IOException if listing the directory or reading a failure marker fails
*/
public List<EngineException> getStoreFailures() throws IOException {
ensureOpen();
return getStoreFailures(directory, shardId);
}

/**
 * Reads every non-corruption failure marker (files whose name starts with {@link #FAILED})
 * from the given directory and turns each one into an {@link EngineException} whose message
 * contains the shard id, the marker file name, the persisted failure message and, when the
 * marker format carries one, the persisted stack trace.
 *
 * @param directory the directory to scan for failure markers
 * @param shardId   the shard the directory belongs to; used for the exception and its message
 * @return the failures found, one per marker file; empty if there is no marker
 * @throws IOException if the directory cannot be listed, or a marker cannot be read or fails
 *                     its header/footer validation
 */
private static final List<EngineException> getStoreFailures(Directory directory, ShardId shardId) throws IOException {
    final List<EngineException> failures = new ArrayList<>();
    for (String file : directory.listAll()) {
        if (file.startsWith(FAILED) == false) {
            continue;
        }
        try (ChecksumIndexInput input = directory.openChecksumInput(file, IOContext.READONCE)) {
            final int version = CodecUtil.checkHeader(input, CODEC, VERSION_START, VERSION);
            final StringBuilder builder = new StringBuilder(shardId.toString());
            builder.append(" Preexisting failed index [");
            builder.append(file).append("] caused by: ");
            builder.append(input.readString());
            // The header check accepts every format version since VERSION_START, but only
            // markers written with VERSION_STACK_TRACE or later contain a second string.
            // Reading it unconditionally would consume footer bytes on an older marker.
            if (version >= VERSION_STACK_TRACE) {
                builder.append(System.lineSeparator());
                builder.append(input.readString());
            }
            // validate the checksum before trusting what was read
            CodecUtil.checkFooter(input);
            failures.add(new EngineException(shardId, builder.toString()));
        }
    }
    return failures;
}

/**
* This method deletes every file in this store that is not contained in the given source meta data or is a
* legacy checksum file. After the delete it pulls the latest metadata snapshot from the store and compares it
Expand Down Expand Up @@ -1430,17 +1483,22 @@ public void deleteQuiet(String... files) {
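* Marks this store as failed. This method writes a <tt>corrupted_${uuid}</tt> file for index corruption exceptions,
* or a <tt>failed_${uuid}</tt> file for any other exception, containing the given exception message and stack trace.
* If a store contains a <tt>corrupted_${uuid}</tt> file {@link #isMarkedCorrupted()} will return <code>true</code>;
* if it contains a marker of either kind {@link #isMarkedFailed()} will return <code>true</code>.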
*/
public void markStoreCorrupted(IOException exception) throws IOException {
public void markStoreFailed(Throwable exception) throws IOException {
ensureOpen();
if (!isMarkedCorrupted()) {
String uuid = CORRUPTED + Strings.randomBase64UUID();
if (exception != null) {
final String uuid;
if (Lucene.isCorruptionException(exception)) {
uuid = CORRUPTED + Strings.randomBase64UUID();
} else {
uuid = FAILED + Strings.randomBase64UUID();
}
try (IndexOutput output = this.directory().createOutput(uuid, IOContext.DEFAULT)) {
CodecUtil.writeHeader(output, CODEC, VERSION);
output.writeString(ExceptionsHelper.detailedMessage(exception, true, 0)); // handles null exception
output.writeString(ExceptionsHelper.stackTrace(exception));
CodecUtil.writeFooter(output);
} catch (IOException ex) {
logger.warn("Can't mark store as corrupted", ex);
} catch (IOException e) {
logger.warn("Can't mark store as corrupted", e);
}
directory().sync(Collections.singleton(uuid));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel
// its content on disk if possible.
try {
try {
store.removeCorruptionMarker();
store.removeFailureMarker();
} finally {
Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files
}
Expand Down
58 changes: 57 additions & 1 deletion core/src/test/java/org/elasticsearch/index/store/StoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1263,8 +1263,64 @@ public Directory newDirectory() throws IOException {
}
};
Store store = new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
store.markStoreFailed(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir));
store.close();
}

/**
 * Verifies that failures which are not index corruption are persisted by
 * {@code markStoreFailed} and reported by {@code getStoreFailures} without preventing
 * the index from being opened, whereas a corruption failure makes the index unopenable
 * and is not counted among the store failures.
 */
public void testMarkFailures() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
Path tempDir = createTempDir();
final BaseDirectoryWrapper dir = newFSDirectory(tempDir);
// nothing has been written yet, so there is no index to open
assertFalse(Store.canOpenIndex(logger, tempDir));
// write and commit a single document so that a valid index exists on disk
IndexWriter writer = new IndexWriter(dir, iwc);
Document doc = new Document();
doc.add(new StringField("id", "1", random().nextBoolean() ? Field.Store.YES : Field.Store.NO));
writer.addDocument(doc);
writer.commit();
writer.close();
assertTrue(Store.canOpenIndex(logger, tempDir));

final ShardId shardId = new ShardId(new Index("index"), 1);
// directory service that always hands out the directory populated above
DirectoryService directoryService = new DirectoryService(shardId, Settings.EMPTY) {
@Override
public long throttleTimeInNanos() {
return 0;
}

@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
try (Store store = new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId))) {
// persist non-index corruption failures
assertTrue(store.getStoreFailures().isEmpty());
// an Error (not an IOException) must be accepted and recorded as well
store.markStoreFailed(new OutOfMemoryError("foo"));
// store can still be opened
assertTrue(Store.canOpenIndex(logger, tempDir));
assertThat(store.getStoreFailures().size(), equalTo(1));

store.markStoreFailed(new IOException("bar"));
assertTrue(Store.canOpenIndex(logger, tempDir));
// ensure all non-index corruption failures persist
assertThat(store.getStoreFailures().size(), equalTo(2));

// should not fail, the store was marked with non index corruption exceptions
store.failIfCorrupted();

// a corruption failure, in contrast, must make the index unopenable
store.markStoreFailed(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir));
// store failures still the same
assertThat(store.getStoreFailures().size(), equalTo(2));
// but should fail using the store
// NOTE(review): this repeats the canOpenIndex assertion made right after marking the corruption
assertFalse(Store.canOpenIndex(logger, tempDir));
try {
store.failIfCorrupted();
fail("corrupted store should fail");
} catch (CorruptIndexException ex) {
// expected
}
}
}
}

0 comments on commit 6c6d05c

Please sign in to comment.