From 375dd0c3d2dd343273c5f61e26265b08f8187e2c Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Jul 2023 16:15:29 -0600 Subject: [PATCH] Changes --- .../index/CompositeIndexEventListener.java | 69 ++++----- .../elasticsearch/index/shard/IndexShard.java | 18 ++- .../index/shard/StoreRecovery.java | 146 +++++++++--------- .../indices/recovery/RecoveryTarget.java | 23 ++- .../CompositeIndexEventListenerTests.java | 65 ++++++++ 5 files changed, 187 insertions(+), 134 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java index 4fdb6a2c77644..c96c0f149a972 100644 --- a/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java +++ b/server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.function.Consumer; import static org.elasticsearch.core.Strings.format; @@ -243,17 +244,16 @@ public void onStoreClosed(ShardId shardId) { } } - private void iterateBeforeIndexShardRecovery( - final IndexShard indexShard, - final IndexSettings indexSettings, - final Iterator iterator, - final ActionListener outerListener + private void callListeners( + IndexShard indexShard, + Iterator>> iterator, + ActionListener outerListener ) { while (iterator.hasNext()) { final var nextListener = iterator.next(); final var future = new ListenableFuture(); try { - nextListener.beforeIndexShardRecovery(indexShard, indexSettings, future); + nextListener.accept(future); if (future.isDone()) { // common case, not actually async, so just check for an exception and continue on the same thread future.result(); @@ -269,9 +269,7 @@ private void iterateBeforeIndexShardRecovery( outerListener.delegateFailure( (delegate, v) -> indexShard.getThreadPool() .executor(ThreadPool.Names.GENERIC) - .execute( - 
ActionRunnable.wrap(delegate, l -> iterateBeforeIndexShardRecovery(indexShard, indexSettings, iterator, l)) - ) + .execute(ActionRunnable.wrap(delegate, l -> callListeners(indexShard, iterator, l))) ) ); return; @@ -280,38 +278,31 @@ private void iterateBeforeIndexShardRecovery( outerListener.onResponse(null); } - private void iterateAfterIndexShardRecovery( + private void iterateBeforeIndexShardRecovery( final IndexShard indexShard, - final Iterator iterator, + final IndexSettings indexSettings, + final List listeners, final ActionListener outerListener ) { - while (iterator.hasNext()) { - final var nextListener = iterator.next(); - final var future = new ListenableFuture(); - try { - nextListener.afterIndexShardRecovery(indexShard, future); - if (future.isDone()) { - // common case, not actually async, so just check for an exception and continue on the same thread - future.result(); - continue; - } - } catch (Exception e) { - outerListener.onFailure(e); - return; - } - - // future was not completed straight away, but might be done by now, so continue on a fresh thread to avoid stack overflow - future.addListener( - outerListener.delegateFailure( - (delegate, v) -> indexShard.getThreadPool() - .executor(ThreadPool.Names.GENERIC) - .execute(ActionRunnable.wrap(delegate, l -> iterateAfterIndexShardRecovery(indexShard, iterator, l))) - ) - ); - return; - } + callListeners( + indexShard, + listeners.stream() + .map(iel -> (Consumer>) (l) -> iel.beforeIndexShardRecovery(indexShard, indexSettings, l)) + .iterator(), + outerListener + ); + } - outerListener.onResponse(null); + private void iterateAfterIndexShardRecovery( + final IndexShard indexShard, + final List listeners, + final ActionListener outerListener + ) { + callListeners( + indexShard, + listeners.stream().map(iel -> (Consumer>) (l) -> iel.afterIndexShardRecovery(indexShard, l)).iterator(), + outerListener + ); } @Override @@ -320,7 +311,7 @@ public void beforeIndexShardRecovery( final IndexSettings 
indexSettings, final ActionListener outerListener ) { - iterateBeforeIndexShardRecovery(indexShard, indexSettings, listeners.iterator(), outerListener.delegateResponse((l, e) -> { + iterateBeforeIndexShardRecovery(indexShard, indexSettings, listeners, outerListener.delegateResponse((l, e) -> { logger.warn(() -> format("failed to invoke the listener before the shard recovery starts for %s", indexShard.shardId()), e); l.onFailure(e); })); @@ -328,7 +319,7 @@ public void beforeIndexShardRecovery( @Override public void afterIndexShardRecovery(IndexShard indexShard, ActionListener outerListener) { - iterateAfterIndexShardRecovery(indexShard, listeners.iterator(), outerListener.delegateResponse((l, e) -> { + iterateAfterIndexShardRecovery(indexShard, listeners, outerListener.delegateResponse((l, e) -> { logger.warn(() -> format("failed to invoke the listener after the shard recovery for %s", indexShard.shardId()), e); l.onFailure(e); })); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c06ca65266245..40772b4256068 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -227,7 +227,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; // ensure happens-before relation between addRefreshListener() and postRecovery() - private volatile SubscribableListener postRecoveryComplete; + private final SetOnce> postRecoveryComplete = new SetOnce<>(); private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex private final AtomicReference currentEngineReference = new AtomicReference<>(); @@ -1679,9 +1679,9 @@ public void preRecovery(ActionListener 
listener) { public void postRecovery(String reason, ActionListener listener) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { - assert postRecoveryComplete == null; - postRecoveryComplete = new SubscribableListener<>(); - final ActionListener finalListener = ActionListener.runBefore(listener, () -> postRecoveryComplete.onResponse(null)); + SubscribableListener subscribableListener = new SubscribableListener<>(); + postRecoveryComplete.set(subscribableListener); + final ActionListener finalListener = ActionListener.runBefore(listener, () -> subscribableListener.onResponse(null)); try { getEngine().refresh("post_recovery"); // we need to refresh again to expose all operations that were index until now. Otherwise @@ -3906,8 +3906,9 @@ public final void ensureShardSearchActive(Consumer listener) { * false otherwise. */ public void addRefreshListener(Translog.Location location, Consumer listener) { - if (postRecoveryComplete != null) { - postRecoveryComplete.addListener(new ActionListener<>() { + SubscribableListener subscribableListener = postRecoveryComplete.get(); + if (subscribableListener != null) { + subscribableListener.addListener(new ActionListener<>() { @Override public void onResponse(Void unused) { if (isReadAllowed()) { @@ -3936,8 +3937,9 @@ public void onFailure(Exception e) { * @param listener for the refresh. 
*/ public void addRefreshListener(long checkpoint, boolean allowUnIssuedSequenceNumber, ActionListener listener) { - if (postRecoveryComplete != null) { - postRecoveryComplete.addListener(new ActionListener<>() { + SubscribableListener subscribableListener = postRecoveryComplete.get(); + if (subscribableListener != null) { + subscribableListener.addListener(new ActionListener<>() { @Override public void onResponse(Void unused) { if (isReadAllowed()) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 9870a335ca8b0..d49925b6758d3 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -21,7 +21,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -408,91 +407,88 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi * Recovers the state of the shard from the store. 
*/ private void internalRecoverFromStore(IndexShard indexShard, ActionListener outerListener) { - indexShard.preRecovery(outerListener.delegateFailure((listener, ignored) -> new ActionRunnable<>(listener) { - @Override - protected void doRun() { - final RecoveryState recoveryState = indexShard.recoveryState(); - final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; - indexShard.prepareForIndexRecovery(); - SegmentInfos si = null; - final Store store = indexShard.store(); - store.incRef(); - boolean triggeredPostRecovery = false; + indexShard.preRecovery(outerListener.delegateFailureAndWrap((listener, ignored) -> { + final RecoveryState recoveryState = indexShard.recoveryState(); + final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; + indexShard.prepareForIndexRecovery(); + SegmentInfos si = null; + final Store store = indexShard.store(); + store.incRef(); + boolean triggeredPostRecovery = false; + try { try { + store.failIfCorrupted(); try { - store.failIfCorrupted(); + si = store.readLastCommittedSegmentsInfo(); + } catch (Exception e) { + String files = "_unknown_"; try { - si = store.readLastCommittedSegmentsInfo(); - } catch (Exception e) { - String files = "_unknown_"; - try { - files = Arrays.toString(store.directory().listAll()); - } catch (Exception inner) { - files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")"; - } - if (indexShouldExists) { - throw new IndexShardRecoveryException( - shardId, - "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, - e - ); - } + files = Arrays.toString(store.directory().listAll()); + } catch (Exception inner) { + files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")"; } - if (si != null && indexShouldExists == false) { - // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) - // its a "new 
index create" API, we have to do something, so better to clean it than use same data - logger.trace("cleaning existing shard, shouldn't exists"); - Lucene.cleanLuceneIndex(store.directory()); - si = null; + if (indexShouldExists) { + throw new IndexShardRecoveryException( + shardId, + "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, + e + ); } - } catch (Exception e) { - throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e); } - if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { - assert indexShouldExists; - bootstrap(indexShard, store); - writeEmptyRetentionLeasesFile(indexShard); - } else if (indexShouldExists) { - if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { - store.bootstrapNewHistory(); - writeEmptyRetentionLeasesFile(indexShard); - } - // since we recover from local, just fill the files and size - final RecoveryState.Index index = recoveryState.getIndex(); - try { - if (si != null) { - addRecoveredFileDetails(si, store, index); - } - } catch (IOException e) { - logger.debug("failed to list file details", e); - } - index.setFileDetailsComplete(); - } else { - store.createEmpty(); - final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), - SequenceNumbers.NO_OPS_PERFORMED, - shardId, - indexShard.getPendingPrimaryTerm() - ); - store.associateIndexWithNewTranslog(translogUUID); + if (si != null && indexShouldExists == false) { + // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) + // its a "new index create" API, we have to do something, so better to clean it than use same data + logger.trace("cleaning existing shard, shouldn't exists"); + Lucene.cleanLuceneIndex(store.directory()); + si = null; + } + } catch (Exception e) { + throw new IndexShardRecoveryException(shardId, "failed to fetch index version 
after copying it over", e); + } + if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { + assert indexShouldExists; + bootstrap(indexShard, store); + writeEmptyRetentionLeasesFile(indexShard); + } else if (indexShouldExists) { + if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { + store.bootstrapNewHistory(); writeEmptyRetentionLeasesFile(indexShard); - indexShard.recoveryState().getIndex().setFileDetailsComplete(); } - indexShard.openEngineAndRecoverFromTranslog(); - indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); - indexShard.finalizeRecovery(); - indexShard.postRecovery("post recovery from shard_store", ActionListener.runBefore(listener, store::decRef)); - triggeredPostRecovery = true; - } catch (EngineException | IOException e) { - listener.onFailure(new IndexShardRecoveryException(shardId, "failed to recover from gateway", e)); - } finally { - if (triggeredPostRecovery == false) { - store.decRef(); + // since we recover from local, just fill the files and size + final RecoveryState.Index index = recoveryState.getIndex(); + try { + if (si != null) { + addRecoveredFileDetails(si, store, index); + } + } catch (IOException e) { + logger.debug("failed to list file details", e); } + index.setFileDetailsComplete(); + } else { + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + indexShard.getPendingPrimaryTerm() + ); + store.associateIndexWithNewTranslog(translogUUID); + writeEmptyRetentionLeasesFile(indexShard); + indexShard.recoveryState().getIndex().setFileDetailsComplete(); + } + indexShard.openEngineAndRecoverFromTranslog(); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); + indexShard.finalizeRecovery(); + indexShard.postRecovery("post recovery from shard_store", ActionListener.runBefore(listener, store::decRef)); + 
triggeredPostRecovery = true; + } catch (EngineException | IOException e) { + listener.onFailure(new IndexShardRecoveryException(shardId, "failed to recover from gateway", e)); + } finally { + if (triggeredPostRecovery == false) { + store.decRef(); } } - }.run())); + })); } private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 58a2c01c29765..4fabd415c8c02 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -16,7 +16,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.ReleasableBytesReference; @@ -305,17 +304,17 @@ public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) public void markAsDone() { if (finished.compareAndSet(false, true)) { assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed"; - try { - // this might still throw an exception ie. if the shard is CLOSED due to some other event. - // it's safer to decrement the reference in a try finally here. - PlainActionFuture future = PlainActionFuture.newFuture(); - indexShard.postRecovery("peer recovery done", future); - future.actionGet(); - } finally { - // release the initial reference. 
recovery files will be cleaned as soon as ref count goes to zero, potentially now - decRef(); - } - listener.onRecoveryDone(state(), indexShard.getTimestampRange()); + indexShard.postRecovery("peer recovery done", ActionListener.runBefore(new ActionListener<>() { + @Override + public void onResponse(Void unused) { + listener.onRecoveryDone(state(), indexShard.getTimestampRange()); + } + + @Override + public void onFailure(Exception e) { + logger.debug("recovery failed after being marked as done", e); // 'this' here would be the anonymous listener + } + }, this::decRef)); } } diff --git a/server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java b/server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java index a0a5d297deced..10778cfb86e46 100644 --- a/server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/CompositeIndexEventListenerTests.java @@ -109,4 +109,69 @@ private void runStep() { closeShards(shard); } } + + public void testAfterIndexShardRecoveryInOrder() throws Exception { + var shard = newShard(randomBoolean()); + var appender = new MockLogAppender(); + try (var ignored = appender.capturing(CompositeIndexEventListener.class)) { + final var stepNumber = new AtomicInteger(); + final var stepCount = between(0, 20); + final var failAtStep = new AtomicInteger(-1); + final var indexEventListener = new CompositeIndexEventListener( + shard.indexSettings(), + IntStream.range(0, stepCount).mapToObj(step -> new IndexEventListener() { + + @Override + public void afterIndexShardRecovery(IndexShard indexShard, ActionListener listener) { + if (randomBoolean()) { + // throws an exception sometimes + runStep(); + listener.onResponse(null); + } else { + // fails the listener sometimes + shard.getThreadPool() + .executor(randomFrom(ThreadPool.Names.GENERIC, ThreadPool.Names.SAME)) + .execute(ActionRunnable.run(listener, this::runStep)); + } + } + + private void runStep() { + 
assertThat(step, Matchers.lessThanOrEqualTo(failAtStep.get())); + assertTrue(stepNumber.compareAndSet(step, step + 1)); + if (step == failAtStep.get()) { + throw new ElasticsearchException("simulated failure at step " + step); + } + } + }).collect(Collectors.toList()) + ); + + final CheckedRunnable afterIndexShardRecoveryRunner = () -> assertNull( + PlainActionFuture.get(fut -> indexEventListener.afterIndexShardRecovery(shard, fut), 10, TimeUnit.SECONDS) + ); + + failAtStep.set(stepCount); + afterIndexShardRecoveryRunner.run(); + assertEquals(stepCount, stepNumber.getAndSet(0)); + + if (stepCount > 0) { + appender.addExpectation( + new MockLogAppender.SeenEventExpectation( + "warning", + CompositeIndexEventListener.class.getCanonicalName(), + Level.WARN, + "*failed to invoke the listener after the shard recovery for [index][0]" + ) + ); + + failAtStep.set(between(0, stepCount - 1)); + final var rootCause = getRootCause(expectThrows(ElasticsearchException.class, afterIndexShardRecoveryRunner::run)); + assertEquals("simulated failure at step " + failAtStep.get(), rootCause.getMessage()); + assertEquals(failAtStep.get() + 1, stepNumber.getAndSet(0)); + appender.assertAllExpectationsMatched(); + } + + } finally { + closeShards(shard); + } + } }