Take refresh IOExceptions into account when catching ACE in InternalEngine (#20546)

Since #19975 we aggressively fail with an AssertionError when we catch an ACE
inside the InternalEngine. We treat everything that is not a tragic event on
the IndexWriter or the Translog as a bug and throw an AssertionError. Yet, if the
engine hits an IOException of some sort on refresh and the IW doesn't realize it, since
the refresh is not fully under its control, we fail the engine, but neither IW nor Translog is
marked as failed by a tragic event even though they are already closed.
This change takes the `failedEngine` exception into account: if it is set, we know
that the engine failed due to some event other than a tragic one and can continue.
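
In essence, the engine now records its failure in a write-once slot before tearing anything down, and the ACE handler consults that slot. A condensed sketch of the pattern (the `FailureAwareComponent` class and its boolean parameters are invented for illustration; the tragic-event booleans stand in for the real checks on IndexWriter and Translog, and the locking the real `failEngine` does is omitted):

```java
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;

class FailureAwareComponent {
    // write-once: a second set() would throw, so fail() checks first
    private final SetOnce<Exception> failedEngine = new SetOnce<>();

    void fail(String reason, Exception cause) {
        if (failedEngine.get() != null) {
            return; // already failed, nothing to do
        }
        // record the failure BEFORE closing IndexWriter/Translog, so a concurrent
        // thread that trips over an AlreadyClosedException can tell it's expected
        failedEngine.set(cause != null ? cause : new IllegalStateException(reason));
        // ... close IndexWriter / Translog here ...
    }

    void onAlreadyClosed(AlreadyClosedException ex, boolean writerTragedy, boolean translogTragedy) {
        if (writerTragedy || translogTragedy) {
            // expected: a tragic event closed the underlying resource
        } else if (failedEngine.get() == null) {
            // neither a tragic event nor an explicit engine failure -> a bug
            throw new AssertionError("Unexpected AlreadyClosedException", ex);
        }
        // otherwise the engine was already failed for a non-tragic reason
        // (e.g. an IOException during refresh) and the ACE is benign
    }
}
```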

This change also uses the `ReferenceManager#RefreshListener` interface in the engine rather
than its concrete implementation.
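
`ReferenceManager.RefreshListener` is Lucene's two-method refresh callback; with this change any implementation can be supplied to `EngineConfig`, not only the concrete `RefreshListeners` class. A hypothetical listener for illustration (the `LoggingRefreshListener` class below is invented, not part of the commit):

```java
import java.io.IOException;
import org.apache.lucene.search.ReferenceManager;

class LoggingRefreshListener implements ReferenceManager.RefreshListener {
    @Override
    public void beforeRefresh() throws IOException {
        // invoked before the SearcherManager attempts to open a new reader
    }

    @Override
    public void afterRefresh(boolean didRefresh) throws IOException {
        // didRefresh is true only if a new reference was actually installed
        if (didRefresh) {
            System.out.println("refresh installed a new searcher");
        }
    }
}
```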

Relates to #19975
s1monw committed Sep 19, 2016
1 parent a530399 commit 817d55c
Showing 8 changed files with 106 additions and 31 deletions.
17 changes: 10 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -42,6 +42,7 @@
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Accountables;
+import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -101,7 +102,7 @@ public abstract class Engine implements Closeable {
     protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
     protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
     protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
-    protected volatile Exception failedEngine = null;
+    protected final SetOnce<Exception> failedEngine = new SetOnce<>();
     /*
      * on <tt>lastWriteNanos</tt> we use System.nanoTime() to initialize this since:
      *  - we use the value for figuring out if the shard / engine is active so if we startup and no write has happened yet we still consider it active
@@ -377,7 +378,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {

     protected void ensureOpen() {
         if (isClosed.get()) {
-            throw new EngineClosedException(shardId, failedEngine);
+            throw new EngineClosedException(shardId, failedEngine.get());
         }
     }

@@ -670,17 +671,19 @@ public void failEngine(String reason, @Nullable Exception failure) {
         if (failEngineLock.tryLock()) {
             store.incRef();
             try {
+                if (failedEngine.get() != null) {
+                    logger.warn((Supplier<?>) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure);
+                    return;
+                }
+                // this must happen before we close IW or Translog such that we can check this state to opt out of failing the engine
+                // again on any caught AlreadyClosedException
+                failedEngine.set((failure != null) ? failure : new IllegalStateException(reason));
                 try {
                     // we just go and close this engine - no way to recover
                     closeNoLock("engine failed on: [" + reason + "]");
                 } finally {
-                    if (failedEngine != null) {
-                        logger.debug((Supplier<?>) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure);
-                        return;
-                    }
                     logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed engine [{}]", reason), failure);
-                    // we must set a failure exception, generate one if not supplied
-                    failedEngine = (failure != null) ? failure : new IllegalStateException(reason);
                     // we first mark the store as corrupted before we notify any listeners
                     // this must happen first otherwise we might try to reallocate so quickly
                     // on the same node that we don't see the corrupted marker file when
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -24,6 +24,7 @@
 import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
+import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.similarities.Similarity;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
@@ -34,7 +35,6 @@
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.codec.CodecService;
-import org.elasticsearch.index.shard.RefreshListeners;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
 import org.elasticsearch.index.store.Store;
@@ -68,7 +68,7 @@ public final class EngineConfig {
     private final QueryCachingPolicy queryCachingPolicy;
     private final long maxUnsafeAutoIdTimestamp;
     @Nullable
-    private final RefreshListeners refreshListeners;
+    private final ReferenceManager.RefreshListener refreshListeners;

     /**
      * Index setting to change the low level lucene codec used for writing new segments.
@@ -112,7 +112,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
                        MergePolicy mergePolicy, Analyzer analyzer,
                        Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
                        TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
-                       TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners,
+                       TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
                        long maxUnsafeAutoIdTimestamp) {
         if (openMode == null) {
             throw new IllegalArgumentException("openMode must not be null");
@@ -322,9 +322,9 @@ public enum OpenMode {
     }

     /**
-     * {@linkplain RefreshListeners} instance to configure.
+     * {@linkplain ReferenceManager.RefreshListener} instance to configure.
      */
-    public RefreshListeners getRefreshListeners() {
+    public ReferenceManager.RefreshListener getRefreshListeners() {
         return refreshListeners;
     }

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -82,9 +82,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;

-/**
- *
- */
 public class InternalEngine extends Engine {
     /**
      * When we last pruned expired tombstones from versionMap.deletes:
@@ -170,7 +167,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
             allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
             if (engineConfig.getRefreshListeners() != null) {
                 searcherManager.addListener(engineConfig.getRefreshListeners());
-                engineConfig.getRefreshListeners().setTranslog(translog);
             }
             success = true;
         } finally {
@@ -951,7 +947,7 @@ private void failOnTragicEvent(AlreadyClosedException ex) {
             failEngine("already closed by tragic event on the index writer", tragedy);
         } else if (translog.isOpen() == false && translog.getTragicException() != null) {
             failEngine("already closed by tragic event on the translog", translog.getTragicException());
-        } else {
+        } else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet?
             // this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
             // a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
             throw new AssertionError("Unexpected AlreadyClosedException", ex);
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -992,14 +992,18 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
         // but we need to make sure we don't loose deletes until we are done recovering
         config.setEnableGcDeletes(false);
         Engine newEngine = createNewEngine(config);
+        onNewEngine(newEngine);
         verifyNotClosed();
         if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
             // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
             // we still give sync'd flush a chance to run:
             active.set(true);
             newEngine.recoverFromTranslog();
         }
     }

+    protected void onNewEngine(Engine newEngine) {
+        refreshListeners.setTranslog(newEngine.getTranslog());
+    }
+
     /**
core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
@@ -114,4 +114,9 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
     public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
         throw new UnsupportedOperationException("can't snapshot the directory as the primary may change it underneath us");
     }
+
+    @Override
+    protected void onNewEngine(Engine newEngine) {
+        // nothing to do here - the superclass sets the translog on some listeners but we don't have such a thing
+    }
 }
core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -59,7 +59,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -112,7 +111,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC

     // the list of translog readers is guaranteed to be in order of translog generation
     private final List<TranslogReader> readers = new ArrayList<>();
-    private volatile ScheduledFuture<?> syncScheduler;
     // this is a concurrent set and is not protected by any of the locks. The main reason
     // is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed)
     private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
@@ -312,7 +310,6 @@ public void close() throws IOException {
                 closeFilesIfNoPendingViews();
             }
         } finally {
-            FutureUtils.cancel(syncScheduler);
             logger.debug("translog closed");
         }
     }
