Engine: back port fix to a potential dead lock when failing engine during COMMIT_TRANSLOG flush

This was fixed as part of elastic#9211

Closes elastic#9501
bleskes committed Feb 9, 2015
1 parent f24f385 commit 20aa5fb
Showing 2 changed files with 53 additions and 71 deletions.
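
The deadlock being back-ported here can largely be read off the diff: in the old code failEngine() ended by calling close() (see the removed close() call in its finally block below), and close() acquires the engine's write lock; the COMMIT_TRANSLOG flush path, however, commits while holding the read lock (not visible in this diff, but implied by the removed assertion that the read lock must not be held when failing the engine). A failure raised inside such a flush therefore tried to upgrade a held read lock to the write lock and hung. The fix marks the engine unusable through the new closedOrFailed flag and rolls the IndexWriter back under the read lock only. The sketch below is a minimal, self-contained illustration of that pattern, not the actual InternalEngine code; the names (EngineDeadlockSketch, flushCommitTranslog) and the simplified bodies are assumptions made for the example.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Minimal sketch of the locking pattern, not the real InternalEngine.
class EngineDeadlockSketch {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock readLock = rwl.readLock();
    private final Lock writeLock = rwl.writeLock();
    private final Lock failEngineLock = new ReentrantLock();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    volatile boolean closedOrFailed = false;
    volatile Throwable failedEngine;

    // COMMIT_TRANSLOG-style flush: the commit runs while this thread holds the read lock.
    void flushCommitTranslog(Runnable commit) {
        readLock.lock();
        try {
            commit.run();
        } catch (RuntimeException e) {
            failEngine("flush failed", e); // called while this thread still holds the read lock
            throw e;
        } finally {
            readLock.unlock();
        }
    }

    // close() needs the write lock. The old failEngine() ended by calling close(); when the
    // failure happened inside flushCommitTranslog() the calling thread still held the read
    // lock, and ReentrantReadWriteLock cannot upgrade read to write, so close() blocked forever.
    void close() {
        writeLock.lock();
        try {
            if (isClosed.compareAndSet(false, true)) {
                closedOrFailed = true;
                // ... release searcher manager, translog, index writer ...
            }
        } finally {
            writeLock.unlock();
        }
    }

    // Fixed shape: never go through close() or the write lock. Mark the engine unusable and
    // roll the writer back under the read lock, which is reentrant for the failing thread.
    void failEngine(String reason, Throwable failure) {
        if (failEngineLock.tryLock() == false) {
            return; // another thread is already failing the engine
        }
        try {
            if (failedEngine == null) {
                failedEngine = failure;
                // ... notify FailedEngineListeners here ...
            }
        } finally {
            closedOrFailed = true;
            readLock.lock();
            try {
                // indexWriter.rollback() in the real engine
            } finally {
                readLock.unlock();
                failEngineLock.unlock();
            }
        }
    }

    public static void main(String[] args) {
        EngineDeadlockSketch engine = new EngineDeadlockSketch();
        try {
            engine.flushCommitTranslog(() -> {
                throw new RuntimeException("simulated commit failure");
            });
        } catch (RuntimeException expected) {
            // With the fixed failEngine() this returns instead of hanging in close().
            System.out.println("engine failed cleanly: " + engine.failedEngine);
        }
    }
}

Running the sketch, the simulated commit failure propagates and failEngine() completes; routing the failure through close() instead would block forever on writeLock.lock().
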
@@ -134,7 +134,11 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private final SearcherFactory searcherFactory = new SearchFactory();
private volatile SearcherManager searcherManager;

private volatile boolean closed = false;
// a boolean indicating whether the engine is usable, i.e. was started but didn't fail or closed
private volatile boolean closedOrFailed = true;

// this is a marker to prevent double closing
private final AtomicBoolean isClosed = new AtomicBoolean(false);

// flag indicating if a dirty operation has occurred since the last refresh
private volatile boolean dirty = false;
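
The hunk above replaces the single closed flag with two fields: a volatile closedOrFailed that hot paths such as ensureOpen() can check without locking, and an isClosed AtomicBoolean whose compare-and-set makes the teardown in close() run exactly once even if close() and failEngine() race. A small sketch of that split, with simplified placeholder bodies rather than the engine's real start/close logic:

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the two-flag pattern; bodies are placeholders, not the engine's actual teardown.
class CloseOnce {
    // usable only between a successful start() and close()/failure; volatile so other
    // threads observe the transition without taking a lock
    private volatile boolean closedOrFailed = true;
    // guards the teardown so it cannot run twice
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    void start() {
        // ... open index writer, searcher manager, translog ...
        closedOrFailed = false; // only now is the component usable
    }

    void ensureOpen() {
        if (closedOrFailed) {
            throw new IllegalStateException("closed or failed");
        }
    }

    void close() {
        if (isClosed.compareAndSet(false, true)) { // only the first caller gets in
            closedOrFailed = true;
            // ... release resources exactly once ...
        }
    }
}
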
@@ -162,7 +166,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private final ApplySettings applySettings = new ApplySettings();

private volatile boolean failOnMergeFailure;
private Throwable failedEngine = null;
private volatile Throwable failedEngine = null;
private final Lock failEngineLock = new ReentrantLock();
private final CopyOnWriteArrayList<FailedEngineListener> failedEngineListeners = new CopyOnWriteArrayList<>();

@@ -263,7 +267,7 @@ public void start() throws EngineException {
if (indexWriter != null) {
throw new EngineAlreadyStartedException(shardId);
}
if (closed) {
if (isClosed.get()) {
throw new EngineClosedException(shardId);
}
if (logger.isDebugEnabled()) {
@@ -309,6 +313,7 @@ public void start() throws EngineException {
this.searcherManager = buildSearchManager(indexWriter);
versionMap.setManager(searcherManager);
readLastCommittedSegmentsInfo();
closedOrFailed = false;
} catch (IOException e) {
maybeFailEngine(e, "start");
try {
@@ -534,7 +539,7 @@ private void checkVersionMapRefresh() {
// TODO: we force refresh when versionMap is using > 25% of IW's RAM buffer; should we make this separately configurable?
if (versionMap.ramBytesUsedForRefresh() > 0.25 * indexingBufferSize.bytes() && versionMapRefreshPending.getAndSet(true) == false) {
try {
if (closed) {
if (closedOrFailed) {
// no point...
return;
}
@@ -962,7 +967,7 @@ public void flush(Flush flush) throws EngineException {
ensureOpen();
readLastCommittedSegmentsInfo();
} catch (Throwable e) {
if (!closed) {
if (closedOrFailed == false) {
logger.warn("failed to read latest segment infos on flush", e);
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
@@ -979,7 +984,7 @@ public void flush(Flush flush) throws EngineException {
}

private void ensureOpen() {
if (indexWriter == null) {
if (closedOrFailed) {
throw new EngineClosedException(shardId, failedEngine);
}
}
@@ -990,8 +995,10 @@ private void ensureOpen() {
* @throws EngineClosedException if the engine is already closed
*/
private IndexWriter currentIndexWriter() {
ensureOpen();
final IndexWriter writer = indexWriter;
if (writer == null) {
assert closedOrFailed : "Engine is not closed but writer is null";
throw new EngineClosedException(shardId, failedEngine);
}
return writer;
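
currentIndexWriter() also changes shape: the field is copied into a local (final IndexWriter writer = indexWriter) before the null check, so the value that was checked is the value that gets returned even if the field is cleared or replaced concurrently. A generic sketch of the difference, using made-up names and a plain Object in place of the IndexWriter:

// Illustrative only: why a shared, concurrently-cleared reference is read into a local first.
class SingleReadSketch {
    private volatile Object resource = new Object();

    Object racyGet() {
        if (resource != null) {   // first read sees a live resource ...
            return resource;      // ... second read may return null after a concurrent close()
        }
        throw new IllegalStateException("closed");
    }

    Object safeGet() {
        final Object local = resource; // single read: the checked value is the returned value
        if (local == null) {
            throw new IllegalStateException("closed");
        }
        return local;
    }

    void close() {
        resource = null;
    }
}
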
@@ -1123,9 +1130,7 @@ public void recover(RecoveryHandler recoveryHandler) throws EngineException {
// take a write lock here so it won't happen while a flush is in progress
// this means that next commits will not be allowed once the lock is released
try (InternalLock _ = writeLock.acquire()) {
if (closed) {
throw new EngineClosedException(shardId);
}
ensureOpen();
onGoingRecoveries.startRecovery();
}

@@ -1194,7 +1199,7 @@ private boolean maybeFailEngine(Throwable t, String source) {
}

private Throwable wrapIfClosed(Throwable t) {
if (closed) {
if (closedOrFailed) {
return new EngineClosedException(shardId, t);
}
return t;
@@ -1306,9 +1311,9 @@ public int compare(Segment o1, Segment o2) {
@Override
public void close() throws ElasticsearchException {
try (InternalLock _ = writeLock.acquire()) {
if (!closed) {
if (isClosed.compareAndSet(false, true)) {
try {
closed = true;
closedOrFailed = true;
indexSettingsService.removeListener(applySettings);
this.versionMap.clear();
this.failedEngineListeners.clear();
@@ -1360,32 +1365,43 @@ public void failEngine(String reason, Throwable failure) {
assert failure != null;
if (failEngineLock.tryLock()) {
try {
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
assert !readLock.assertLockIsHeld() : "readLock is held by a thread that tries to fail the engine";
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
try {
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
// the shard is initializing
if (Lucene.isCorruptionException(failure)) {
try {
store.markStoreCorrupted(ExceptionsHelper.unwrap(failure, CorruptIndexException.class));
} catch (IOException e) {
logger.warn("Couldn't marks store corrupted", e);
}
}
} finally {
if (failedEngine != null) {
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
return;
}
logger.warn("failed engine [{}]", failure, reason);
// we must set a failure exception, generate one if not supplied
failedEngine = failure;
for (FailedEngineListener listener : failedEngineListeners) {
listener.onFailedEngine(shardId, reason, failure);
}
} finally {
close();
}
} catch (Throwable t) {
// don't bubble up these exceptions up
logger.warn("failEngine threw exception", t);
} finally {
closedOrFailed = true;
try (InternalLock _ = readLock.acquire()) {
// we take the readlock here to ensure nobody replaces this IW concurrently.
if (indexWriter != null) {
indexWriter.rollback();
}
} catch (Throwable t) {
logger.warn("Rolling back indexwriter on engine failure failed", t);
// to be on the safe side we just rollback the IW
}
}
} else {
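
Beyond dropping the close() call, the rewritten failEngine() above relies on nested try/finally blocks to pin down ordering: the store is marked corrupted first, the failure is recorded and listeners are notified even if that marking throws, any exception from the whole block is swallowed rather than propagated to the failing caller, and the IndexWriter rollback in the outer finally runs last in every case, after closedOrFailed has been set so concurrent callers of ensureOpen() fail fast. A bare sketch of that ordering with placeholder step names, not the real calls:

// Sketch of the nested try/finally ordering in the new failEngine(): each later step
// still runs if an earlier one throws.
class FailureOrdering {
    static void failEngine(RuntimeException failure) {
        try {
            try {
                markStoreCorrupted(failure);                  // may itself throw
            } finally {
                recordFailureAndNotifyListeners(failure);     // runs regardless
            }
        } catch (Throwable t) {
            System.err.println("failEngine threw: " + t);     // never propagated to the caller
        } finally {
            rollbackWriter();                                 // always reached, even if a listener threw
        }
    }

    static void markStoreCorrupted(RuntimeException failure) { /* placeholder */ }
    static void recordFailureAndNotifyListeners(RuntimeException failure) { /* placeholder */ }
    static void rollbackWriter() { /* placeholder */ }
}
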
@@ -1458,7 +1474,7 @@ public void warm(AtomicReader reader) throws IOException {
}
} catch (Throwable t) {
// Don't fail a merge if the warm-up failed
if (!closed) {
if (!closedOrFailed) {
logger.warn("Warm-up failed", t);
}
if (t instanceof Error) {
@@ -1653,7 +1669,7 @@ public IndexSearcher newSearcher(IndexReader reader) throws IOException {
}
warmer.warmTopReader(new IndicesWarmer.WarmerContext(shardId, new SimpleSearcher("warmer", searcher)));
} catch (Throwable e) {
if (!closed) {
if (!closedOrFailed) {
logger.warn("failed to prepare/warm", e);
}
} finally {
@@ -1703,51 +1719,21 @@ public void close() throws ElasticsearchException {
}

private static final class InternalLock implements Releasable {
private final ThreadLocal<AtomicInteger> lockIsHeld;
private final Lock lock;

InternalLock(Lock lock) {
ThreadLocal<AtomicInteger> tl = null;
assert (tl = new ThreadLocal<>()) != null;
lockIsHeld = tl;
this.lock = lock;
}

@Override
public void close() {
lock.unlock();
assert onAssertRelease();
}

InternalLock acquire() throws EngineException {
lock.lock();
assert onAssertLock();
return this;
}


protected boolean onAssertRelease() {
AtomicInteger count = lockIsHeld.get();
if (count.decrementAndGet() == 0) {
lockIsHeld.remove();
}
return true;
}

protected boolean onAssertLock() {
AtomicInteger count = lockIsHeld.get();
if (count == null) {
count = new AtomicInteger(0);
lockIsHeld.set(count);
}
count.incrementAndGet();
return true;
}

boolean assertLockIsHeld() {
AtomicInteger count = lockIsHeld.get();
return count != null && count.get() > 0;
}
}
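
The InternalLock changes follow directly from the failEngine() rework: the ThreadLocal counter existed only to back the old assert !readLock.assertLockIsHeld() in failEngine(), and since the new failure path takes the read lock deliberately, the onAssertLock/onAssertRelease/assertLockIsHeld bookkeeping is deleted and the class shrinks to a plain try-with-resources wrapper around a Lock. A standalone sketch of what remains, assuming only that Releasable is AutoCloseable (which the try (InternalLock _ = ...) usage elsewhere in the diff implies); the name LockWrapper is illustrative:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Thin Releasable-style wrapper so a java.util.concurrent Lock fits try-with-resources.
final class LockWrapper implements AutoCloseable {
    private final Lock lock;

    LockWrapper(Lock lock) {
        this.lock = lock;
    }

    LockWrapper acquire() {
        lock.lock();
        return this;
    }

    @Override
    public void close() {
        lock.unlock();
    }

    public static void main(String[] args) {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        LockWrapper readLock = new LockWrapper(rwl.readLock());
        try (LockWrapper ignored = readLock.acquire()) {
            System.out.println("read lock held inside try-with-resources");
        } // unlocked automatically when the block exits
    }
}
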


@@ -22,9 +22,8 @@
import com.google.common.base.Predicate;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.LogManager;

import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;
@@ -33,12 +32,10 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.index.*;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -58,7 +55,6 @@
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
@@ -666,7 +662,7 @@ public void testFailEngineOnCorruption() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), Lucene.STANDARD_ANALYZER, B_1, false);
engine.create(new Engine.Create(null, newUid("1"), doc));
engine.flush(new Engine.Flush());
final boolean failEngine = defaultSettings.getAsBoolean(InternalEngine.INDEX_FAIL_ON_CORRUPTION, false);
final boolean failEngine = defaultSettings.getAsBoolean(InternalEngine.INDEX_FAIL_ON_CORRUPTION, true);
final int failInPhase = randomIntBetween(1,3);
try {
engine.recover(new Engine.RecoveryHandler() {
