Skip to content

Commit

Permalink
Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Jul 20, 2023
1 parent aed19d3 commit 375dd0c
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

import static org.elasticsearch.core.Strings.format;

Expand Down Expand Up @@ -243,17 +244,16 @@ public void onStoreClosed(ShardId shardId) {
}
}

private void iterateBeforeIndexShardRecovery(
final IndexShard indexShard,
final IndexSettings indexSettings,
final Iterator<IndexEventListener> iterator,
final ActionListener<Void> outerListener
private void callListeners(
IndexShard indexShard,
Iterator<Consumer<ActionListener<Void>>> iterator,
ActionListener<Void> outerListener
) {
while (iterator.hasNext()) {
final var nextListener = iterator.next();
final var future = new ListenableFuture<Void>();
try {
nextListener.beforeIndexShardRecovery(indexShard, indexSettings, future);
nextListener.accept(future);
if (future.isDone()) {
// common case, not actually async, so just check for an exception and continue on the same thread
future.result();
Expand All @@ -269,9 +269,7 @@ private void iterateBeforeIndexShardRecovery(
outerListener.delegateFailure(
(delegate, v) -> indexShard.getThreadPool()
.executor(ThreadPool.Names.GENERIC)
.execute(
ActionRunnable.wrap(delegate, l -> iterateBeforeIndexShardRecovery(indexShard, indexSettings, iterator, l))
)
.execute(ActionRunnable.wrap(delegate, l -> callListeners(indexShard, iterator, l)))
)
);
return;
Expand All @@ -280,38 +278,31 @@ private void iterateBeforeIndexShardRecovery(
outerListener.onResponse(null);
}

private void iterateAfterIndexShardRecovery(
private void iterateBeforeIndexShardRecovery(
final IndexShard indexShard,
final Iterator<IndexEventListener> iterator,
final IndexSettings indexSettings,
final List<IndexEventListener> listeners,
final ActionListener<Void> outerListener
) {
while (iterator.hasNext()) {
final var nextListener = iterator.next();
final var future = new ListenableFuture<Void>();
try {
nextListener.afterIndexShardRecovery(indexShard, future);
if (future.isDone()) {
// common case, not actually async, so just check for an exception and continue on the same thread
future.result();
continue;
}
} catch (Exception e) {
outerListener.onFailure(e);
return;
}

// future was not completed straight away, but might be done by now, so continue on a fresh thread to avoid stack overflow
future.addListener(
outerListener.delegateFailure(
(delegate, v) -> indexShard.getThreadPool()
.executor(ThreadPool.Names.GENERIC)
.execute(ActionRunnable.wrap(delegate, l -> iterateAfterIndexShardRecovery(indexShard, iterator, l)))
)
);
return;
}
callListeners(
indexShard,
listeners.stream()
.map(iel -> (Consumer<ActionListener<Void>>) (l) -> iel.beforeIndexShardRecovery(indexShard, indexSettings, l))
.iterator(),
outerListener
);
}

outerListener.onResponse(null);
/**
 * Notifies every given listener that recovery of {@code indexShard} has finished.
 * Each {@link IndexEventListener} is adapted into a step that accepts the per-step completion listener and forwards
 * it to {@code afterIndexShardRecovery}; the steps are then handed to {@code callListeners}.
 *
 * @param indexShard    the shard whose recovery has completed
 * @param listeners     the event listeners to notify
 * @param outerListener passed through to {@code callListeners} unchanged
 */
private void iterateAfterIndexShardRecovery(
    final IndexShard indexShard,
    final List<IndexEventListener> listeners,
    final ActionListener<Void> outerListener
) {
    // the stream is only pulled through its iterator, so each adapter is created when callListeners asks for it
    final Iterator<Consumer<ActionListener<Void>>> steps = listeners.stream()
        .<Consumer<ActionListener<Void>>>map(
            eventListener -> stepListener -> eventListener.afterIndexShardRecovery(indexShard, stepListener)
        )
        .iterator();
    callListeners(indexShard, steps, outerListener);
}

@Override
Expand All @@ -320,15 +311,15 @@ public void beforeIndexShardRecovery(
final IndexSettings indexSettings,
final ActionListener<Void> outerListener
) {
iterateBeforeIndexShardRecovery(indexShard, indexSettings, listeners.iterator(), outerListener.delegateResponse((l, e) -> {
iterateBeforeIndexShardRecovery(indexShard, indexSettings, listeners, outerListener.delegateResponse((l, e) -> {
logger.warn(() -> format("failed to invoke the listener before the shard recovery starts for %s", indexShard.shardId()), e);
l.onFailure(e);
}));
}

@Override
public void afterIndexShardRecovery(IndexShard indexShard, ActionListener<Void> outerListener) {
iterateAfterIndexShardRecovery(indexShard, listeners.iterator(), outerListener.delegateResponse((l, e) -> {
iterateAfterIndexShardRecovery(indexShard, listeners, outerListener.delegateResponse((l, e) -> {
logger.warn(() -> format("failed to invoke the listener after the shard recovery for %s", indexShard.shardId()), e);
l.onFailure(e);
}));
Expand Down
18 changes: 10 additions & 8 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
// ensure happens-before relation between addRefreshListener() and postRecovery()
private volatile SubscribableListener<Void> postRecoveryComplete;
private final SetOnce<SubscribableListener<Void>> postRecoveryComplete = new SetOnce<>();
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
Expand Down Expand Up @@ -1679,9 +1679,9 @@ public void preRecovery(ActionListener<Void> listener) {

public void postRecovery(String reason, ActionListener<Void> listener) throws IndexShardStartedException, IndexShardRelocatedException,
IndexShardClosedException {
assert postRecoveryComplete == null;
postRecoveryComplete = new SubscribableListener<>();
final ActionListener<Void> finalListener = ActionListener.runBefore(listener, () -> postRecoveryComplete.onResponse(null));
SubscribableListener<Void> subscribableListener = new SubscribableListener<>();
postRecoveryComplete.set(subscribableListener);
final ActionListener<Void> finalListener = ActionListener.runBefore(listener, () -> subscribableListener.onResponse(null));
try {
getEngine().refresh("post_recovery");
// we need to refresh again to expose all operations that were indexed until now. Otherwise
Expand Down Expand Up @@ -3906,8 +3906,9 @@ public final void ensureShardSearchActive(Consumer<Boolean> listener) {
* false otherwise.
*/
public void addRefreshListener(Translog.Location location, Consumer<Boolean> listener) {
if (postRecoveryComplete != null) {
postRecoveryComplete.addListener(new ActionListener<>() {
SubscribableListener<Void> subscribableListener = postRecoveryComplete.get();
if (subscribableListener != null) {
subscribableListener.addListener(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
if (isReadAllowed()) {
Expand Down Expand Up @@ -3936,8 +3937,9 @@ public void onFailure(Exception e) {
* @param listener for the refresh.
*/
public void addRefreshListener(long checkpoint, boolean allowUnIssuedSequenceNumber, ActionListener<Void> listener) {
if (postRecoveryComplete != null) {
postRecoveryComplete.addListener(new ActionListener<>() {
SubscribableListener<Void> subscribableListener = postRecoveryComplete.get();
if (subscribableListener != null) {
subscribableListener.addListener(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
if (isReadAllowed()) {
Expand Down
146 changes: 71 additions & 75 deletions server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -408,91 +407,88 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
* Recovers the state of the shard from the store.
*/
private void internalRecoverFromStore(IndexShard indexShard, ActionListener<Void> outerListener) {
// NOTE(review): this span comes from a rendered diff without +/- markers, so two variants of the body are interleaved
// line by line: one that wraps the work in an anonymous ActionRunnable (doRun() ... .run()) and one that passes a
// plain lambda to delegateFailureAndWrap. Which line belongs to which variant is inferred from structure only -
// confirm against the real file before relying on this text.
indexShard.preRecovery(outerListener.delegateFailure((listener, ignored) -> new ActionRunnable<>(listener) {
@Override
protected void doRun() {
final RecoveryState recoveryState = indexShard.recoveryState();
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
indexShard.prepareForIndexRecovery();
SegmentInfos si = null;
final Store store = indexShard.store();
store.incRef();
boolean triggeredPostRecovery = false;
indexShard.preRecovery(outerListener.delegateFailureAndWrap((listener, ignored) -> {
final RecoveryState recoveryState = indexShard.recoveryState();
// every recovery source type other than EMPTY_STORE is expected to find an existing index in the store
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
indexShard.prepareForIndexRecovery();
SegmentInfos si = null;
final Store store = indexShard.store();
// take a reference on the store; it is given back either by store::decRef attached to the post-recovery listener
// or by the finally block at the end when postRecovery was not triggered
store.incRef();
boolean triggeredPostRecovery = false;
try {
try {
store.failIfCorrupted();
try {
store.failIfCorrupted();
si = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
String files = "_unknown_";
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Exception inner) {
files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")";
}
if (indexShouldExists) {
throw new IndexShardRecoveryException(
shardId,
"shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files,
e
);
}
files = Arrays.toString(store.directory().listAll());
} catch (Exception inner) {
files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")";
}
if (si != null && indexShouldExists == false) {
// it exists on the directory, but shouldn't exist on the FS, it's a leftover (possibly dangling)
// it's a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
Lucene.cleanLuceneIndex(store.directory());
si = null;
if (indexShouldExists) {
throw new IndexShardRecoveryException(
shardId,
"shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files,
e
);
}
} catch (Exception e) {
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
bootstrap(indexShard, store);
writeEmptyRetentionLeasesFile(indexShard);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
index.setFileDetailsComplete();
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
if (si != null && indexShouldExists == false) {
// it exists on the directory, but shouldn't exist on the FS, it's a leftover (possibly dangling)
// it's a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
Lucene.cleanLuceneIndex(store.directory());
si = null;
}
} catch (Exception e) {
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
bootstrap(indexShard, store);
writeEmptyRetentionLeasesFile(indexShard);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store", ActionListener.runBefore(listener, store::decRef));
triggeredPostRecovery = true;
} catch (EngineException | IOException e) {
listener.onFailure(new IndexShardRecoveryException(shardId, "failed to recover from gateway", e));
} finally {
if (triggeredPostRecovery == false) {
store.decRef();
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
}
} catch (IOException e) {
// failing to list file details is only logged; the recovery itself continues
logger.debug("failed to list file details", e);
}
index.setFileDetailsComplete();
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
// the store reference is handed over to the listener wrapper here: store::decRef is attached via
// ActionListener.runBefore instead of being called directly
indexShard.postRecovery("post recovery from shard_store", ActionListener.runBefore(listener, store::decRef));
// only set once postRecovery has returned normally, so the finally block below knows not to release the store itself
triggeredPostRecovery = true;
} catch (EngineException | IOException e) {
// these two exception types are reported to the listener, wrapped in an IndexShardRecoveryException
listener.onFailure(new IndexShardRecoveryException(shardId, "failed to recover from gateway", e));
} finally {
// postRecovery was never triggered, so nothing else will release the reference taken by store.incRef() above
if (triggeredPostRecovery == false) {
store.decRef();
}
}
}.run()));
}));
}

private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
Expand Down Expand Up @@ -305,17 +304,17 @@ public void notifyListener(RecoveryFailedException e, boolean sendShardFailure)
/**
 * Marks this recovery as done. Only the first call has any effect: the {@code finished} flag is flipped with a
 * compare-and-set, and later calls fall through without doing anything.
 *
 * NOTE(review): this span comes from a rendered diff without +/- markers, so two variants of the body appear back
 * to back: a blocking variant that waits on a {@code PlainActionFuture}, and a callback variant that passes an
 * {@code ActionListener} to {@code postRecovery}. They are presumably the removed and the added lines respectively -
 * confirm against the real file before relying on this text.
 */
public void markAsDone() {
if (finished.compareAndSet(false, true)) {
assert multiFileWriter.tempFileNames.isEmpty() : "not all temporary files are renamed";
// blocking variant: run postRecovery, wait for its future, then notify the listener on the calling thread
try {
// this might still throw an exception i.e. if the shard is CLOSED due to some other event.
// it's safer to decrement the reference in a try finally here.
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
indexShard.postRecovery("peer recovery done", future);
future.actionGet();
} finally {
// release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now
decRef();
}
listener.onRecoveryDone(state(), indexShard.getTimestampRange());
// callback variant: completion is reported through the listener given to postRecovery; this::decRef is attached via
// ActionListener.runBefore (presumably run before the wrapped callbacks are notified - confirm against runBefore)
indexShard.postRecovery("peer recovery done", ActionListener.runBefore(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
listener.onRecoveryDone(state(), indexShard.getTimestampRange());
}

@Override
public void onFailure(Exception e) {
// NOTE(review): inside this anonymous class `this` is the ActionListener itself, not the enclosing recovery
// object, so the "{}" placeholder will not print the recovery. The exception `e` is neither logged nor
// forwarded to `listener`. Both look unintended - confirm.
logger.debug("{} recovery failed after being marked as done", this);
}
}, this::decRef));
}
}

Expand Down
Loading

0 comments on commit 375dd0c

Please sign in to comment.