diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 2201476d0f482..17516ba4d5541 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -167,6 +167,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -225,7 +226,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; // ensure happens-before relation between addRefreshListener() and postRecovery() - private final Object postRecoveryMutex = new Object(); + private final Semaphore postRecoveryMutex = new Semaphore(1); private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex private final AtomicReference currentEngineReference = new AtomicReference<>(); @@ -1657,7 +1658,7 @@ public void close(String reason, boolean flushEngine) throws IOException { } } - public void preRecovery(ActionListener listener) { + public void preRecovery(ActionListener listener) throws IndexShardNotRecoveringException { final IndexShardState currentState = this.state; // single volatile read if (currentState == IndexShardState.CLOSED) { throw new IndexShardNotRecoveringException(shardId, currentState); @@ -1667,23 +1668,40 @@ public void preRecovery(ActionListener listener) { } public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { - synchronized (postRecoveryMutex) { - // we need to refresh again to expose all operations that were index until now. Otherwise - // we may not expose operations that were indexed with a refresh listener that was immediately - // responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener - // and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard). - getEngine().refresh("post_recovery"); - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new IndexShardClosedException(shardId); - } - if (state == IndexShardState.STARTED) { - throw new IndexShardStartedException(shardId); + PlainActionFuture listener = PlainActionFuture.newFuture(); + postRecovery(reason, listener); + listener.actionGet(); + } + + public void postRecovery(String reason, ActionListener listener) throws IndexShardStartedException, IndexShardRelocatedException, + IndexShardClosedException { + postRecoveryMutex.acquireUninterruptibly(); + // we need to refresh again to expose all operations that were index until now. Otherwise + // we may not expose operations that were indexed with a refresh listener that was immediately + // responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener + // and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard). + getEngine().externalRefresh("post_recovery", ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Engine.RefreshResult refreshResult) { + synchronized (mutex) { + if (state == IndexShardState.CLOSED) { + listener.onFailure(new IndexShardClosedException(shardId)); + } + if (state == IndexShardState.STARTED) { + listener.onFailure(new IndexShardStartedException(shardId)); + } + recoveryState.setStage(RecoveryState.Stage.DONE); + changeState(IndexShardState.POST_RECOVERY, reason); + listener.onResponse(null); } - recoveryState.setStage(RecoveryState.Stage.DONE); - changeState(IndexShardState.POST_RECOVERY, reason); } - } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + + } + }, postRecoveryMutex::release)); } /** @@ -3895,8 +3913,11 @@ public void addRefreshListener(Translog.Location location, Consumer lis // check again under postRecoveryMutex. this is important to create a happens before relationship // between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond // to a listener before a refresh actually happened that contained that operation. - synchronized (postRecoveryMutex) { + postRecoveryMutex.acquireUninterruptibly(); + try { readAllowed = isReadAllowed(); + } finally { + postRecoveryMutex.release(); } } if (readAllowed) { @@ -3922,8 +3943,11 @@ public void addRefreshListener(long checkpoint, boolean allowUnIssuedSequenceNum // check again under postRecoveryMutex. this is important to create a happens before relationship // between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond // to a listener before a refresh actually happened that contained that operation. - synchronized (postRecoveryMutex) { + postRecoveryMutex.acquireUninterruptibly(); + try { readAllowed = isReadAllowed(); + } finally { + postRecoveryMutex.release(); } } if (readAllowed) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index c3da645d92d6c..71c36711b5e8d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -408,84 +408,94 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi * Recovers the state of the shard from the store. */ private void internalRecoverFromStore(IndexShard indexShard, ActionListener outerListener) { - indexShard.preRecovery(outerListener.delegateFailure((listener, ignored) -> ActionRunnable.run(listener, () -> { - final RecoveryState recoveryState = indexShard.recoveryState(); - final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; - indexShard.prepareForIndexRecovery(); - SegmentInfos si = null; - final Store store = indexShard.store(); - store.incRef(); - try { - try { - store.failIfCorrupted(); + indexShard.preRecovery(outerListener.delegateFailure((listener, ignored) -> { + new ActionRunnable<>(listener) { + @Override + protected void doRun() throws Exception { + final RecoveryState recoveryState = indexShard.recoveryState(); + final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE; + indexShard.prepareForIndexRecovery(); + SegmentInfos si = null; + final Store store = indexShard.store(); + store.incRef(); + boolean success = false; try { - si = store.readLastCommittedSegmentsInfo(); - } catch (Exception e) { - String files = "_unknown_"; try { - files = Arrays.toString(store.directory().listAll()); - } catch (Exception inner) { - files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")"; + store.failIfCorrupted(); + try { + si = store.readLastCommittedSegmentsInfo(); + } catch (Exception e) { + String files = "_unknown_"; + try { + files = Arrays.toString(store.directory().listAll()); + } catch (Exception inner) { + files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")"; + } + if (indexShouldExists) { + throw new IndexShardRecoveryException( + shardId, + "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, + e + ); + } + } + if (si != null && indexShouldExists == false) { + // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) + // its a "new index create" API, we have to do something, so better to clean it than use same data + logger.trace("cleaning existing shard, shouldn't exists"); + Lucene.cleanLuceneIndex(store.directory()); + si = null; + } + } catch (Exception e) { + throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e); } - if (indexShouldExists) { - throw new IndexShardRecoveryException( + if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { + assert indexShouldExists; + bootstrap(indexShard, store); + writeEmptyRetentionLeasesFile(indexShard); + } else if (indexShouldExists) { + if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { + store.bootstrapNewHistory(); + writeEmptyRetentionLeasesFile(indexShard); + } + // since we recover from local, just fill the files and size + final RecoveryState.Index index = recoveryState.getIndex(); + try { + if (si != null) { + addRecoveredFileDetails(si, store, index); + } + } catch (IOException e) { + logger.debug("failed to list file details", e); + } + index.setFileDetailsComplete(); + } else { + store.createEmpty(); + final String translogUUID = Translog.createEmptyTranslog( + indexShard.shardPath().resolveTranslog(), + SequenceNumbers.NO_OPS_PERFORMED, shardId, - "shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files, - e + indexShard.getPendingPrimaryTerm() ); + store.associateIndexWithNewTranslog(translogUUID); + writeEmptyRetentionLeasesFile(indexShard); + indexShard.recoveryState().getIndex().setFileDetailsComplete(); } + indexShard.openEngineAndRecoverFromTranslog(); + indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); + indexShard.finalizeRecovery(); + indexShard.postRecovery("post recovery from shard_store"); + listener.onResponse(null); + success = true; + } catch (EngineException | IOException e) { + throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); + } finally { + store.decRef(); +// if (success == false) { +// } } - if (si != null && indexShouldExists == false) { - // it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling) - // its a "new index create" API, we have to do something, so better to clean it than use same data - logger.trace("cleaning existing shard, shouldn't exists"); - Lucene.cleanLuceneIndex(store.directory()); - si = null; - } - } catch (Exception e) { - throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e); - } - if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) { - assert indexShouldExists; - bootstrap(indexShard, store); - writeEmptyRetentionLeasesFile(indexShard); - } else if (indexShouldExists) { - if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) { - store.bootstrapNewHistory(); - writeEmptyRetentionLeasesFile(indexShard); - } - // since we recover from local, just fill the files and size - final RecoveryState.Index index = recoveryState.getIndex(); - try { - if (si != null) { - addRecoveredFileDetails(si, store, index); - } - } catch (IOException e) { - logger.debug("failed to list file details", e); - } - index.setFileDetailsComplete(); - } else { - store.createEmpty(); - final String translogUUID = Translog.createEmptyTranslog( - indexShard.shardPath().resolveTranslog(), - SequenceNumbers.NO_OPS_PERFORMED, - shardId, - indexShard.getPendingPrimaryTerm() - ); - store.associateIndexWithNewTranslog(translogUUID); - writeEmptyRetentionLeasesFile(indexShard); - indexShard.recoveryState().getIndex().setFileDetailsComplete(); } - indexShard.openEngineAndRecoverFromTranslog(); - indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); - indexShard.finalizeRecovery(); - indexShard.postRecovery("post recovery from shard_store"); - } catch (EngineException | IOException e) { - throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e); - } finally { - store.decRef(); - } - }).run())); + }.run(); + })); } private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {