Skip to content

Commit

Permalink
Change
Browse files Browse the repository at this point in the history
  • Loading branch information
Tim-Brooks committed Jun 30, 2023
1 parent ffe3877 commit 8967983
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 89 deletions.
62 changes: 43 additions & 19 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -225,7 +226,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
// ensure happens-before relation between addRefreshListener() and postRecovery()
private final Object postRecoveryMutex = new Object();
private final Semaphore postRecoveryMutex = new Semaphore(1);
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
private final Object engineMutex = new Object(); // lock ordering: engineMutex -> mutex
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
Expand Down Expand Up @@ -1657,7 +1658,7 @@ public void close(String reason, boolean flushEngine) throws IOException {
}
}

public void preRecovery(ActionListener<Void> listener) {
public void preRecovery(ActionListener<Void> listener) throws IndexShardNotRecoveringException {
final IndexShardState currentState = this.state; // single volatile read
if (currentState == IndexShardState.CLOSED) {
throw new IndexShardNotRecoveringException(shardId, currentState);
Expand All @@ -1667,23 +1668,40 @@ public void preRecovery(ActionListener<Void> listener) {
}

public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
synchronized (postRecoveryMutex) {
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
// and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard).
getEngine().refresh("post_recovery");
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new IndexShardClosedException(shardId);
}
if (state == IndexShardState.STARTED) {
throw new IndexShardStartedException(shardId);
PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
postRecovery(reason, listener);
listener.actionGet();
}

public void postRecovery(String reason, ActionListener<Void> listener) throws IndexShardStartedException, IndexShardRelocatedException,
IndexShardClosedException {
postRecoveryMutex.acquireUninterruptibly();
// we need to refresh again to expose all operations that were index until now. Otherwise
// we may not expose operations that were indexed with a refresh listener that was immediately
// responded to in addRefreshListener. The refresh must happen under the same mutex used in addRefreshListener
// and before moving this shard to POST_RECOVERY state (i.e., allow to read from this shard).
getEngine().externalRefresh("post_recovery", ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Engine.RefreshResult refreshResult) {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
listener.onFailure(new IndexShardClosedException(shardId));
}
if (state == IndexShardState.STARTED) {
listener.onFailure(new IndexShardStartedException(shardId));
}
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
listener.onResponse(null);
}
recoveryState.setStage(RecoveryState.Stage.DONE);
changeState(IndexShardState.POST_RECOVERY, reason);
}
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);

}
}, postRecoveryMutex::release));
}

/**
Expand Down Expand Up @@ -3895,8 +3913,11 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
// check again under postRecoveryMutex. this is important to create a happens before relationship
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
// to a listener before a refresh actually happened that contained that operation.
synchronized (postRecoveryMutex) {
postRecoveryMutex.acquireUninterruptibly();
try {
readAllowed = isReadAllowed();
} finally {
postRecoveryMutex.release();
}
}
if (readAllowed) {
Expand All @@ -3922,8 +3943,11 @@ public void addRefreshListener(long checkpoint, boolean allowUnIssuedSequenceNum
// check again under postRecoveryMutex. this is important to create a happens before relationship
// between the switch to POST_RECOVERY + associated refresh. Otherwise we may respond
// to a listener before a refresh actually happened that contained that operation.
synchronized (postRecoveryMutex) {
postRecoveryMutex.acquireUninterruptibly();
try {
readAllowed = isReadAllowed();
} finally {
postRecoveryMutex.release();
}
}
if (readAllowed) {
Expand Down
150 changes: 80 additions & 70 deletions server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,84 +408,94 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
* Recovers the state of the shard from the store.
*/
private void internalRecoverFromStore(IndexShard indexShard, ActionListener<Void> outerListener) {
indexShard.preRecovery(outerListener.delegateFailure((listener, ignored) -> ActionRunnable.run(listener, () -> {
final RecoveryState recoveryState = indexShard.recoveryState();
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
indexShard.prepareForIndexRecovery();
SegmentInfos si = null;
final Store store = indexShard.store();
store.incRef();
try {
try {
store.failIfCorrupted();
indexShard.preRecovery(outerListener.delegateFailure((listener, ignored) -> {
new ActionRunnable<>(listener) {
@Override
protected void doRun() throws Exception {
final RecoveryState recoveryState = indexShard.recoveryState();
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
indexShard.prepareForIndexRecovery();
SegmentInfos si = null;
final Store store = indexShard.store();
store.incRef();
boolean success = false;
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Exception inner) {
files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")";
store.failIfCorrupted();
try {
si = store.readLastCommittedSegmentsInfo();
} catch (Exception e) {
String files = "_unknown_";
try {
files = Arrays.toString(store.directory().listAll());
} catch (Exception inner) {
files += " (failure=" + ExceptionsHelper.stackTrace(inner) + ")";
}
if (indexShouldExists) {
throw new IndexShardRecoveryException(
shardId,
"shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files,
e
);
}
}
if (si != null && indexShouldExists == false) {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
Lucene.cleanLuceneIndex(store.directory());
si = null;
}
} catch (Exception e) {
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
if (indexShouldExists) {
throw new IndexShardRecoveryException(
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
bootstrap(indexShard, store);
writeEmptyRetentionLeasesFile(indexShard);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
index.setFileDetailsComplete();
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
"shard allocated for local recovery (post api), should exist, but doesn't, current files: " + files,
e
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
listener.onResponse(null);
success = true;
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e);
} finally {
store.decRef();
// if (success == false) {
// }
}
if (si != null && indexShouldExists == false) {
// it exists on the directory, but shouldn't exist on the FS, its a leftover (possibly dangling)
// its a "new index create" API, we have to do something, so better to clean it than use same data
logger.trace("cleaning existing shard, shouldn't exists");
Lucene.cleanLuceneIndex(store.directory());
si = null;
}
} catch (Exception e) {
throw new IndexShardRecoveryException(shardId, "failed to fetch index version after copying it over", e);
}
if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
assert indexShouldExists;
bootstrap(indexShard, store);
writeEmptyRetentionLeasesFile(indexShard);
} else if (indexShouldExists) {
if (recoveryState.getRecoverySource().shouldBootstrapNewHistoryUUID()) {
store.bootstrapNewHistory();
writeEmptyRetentionLeasesFile(indexShard);
}
// since we recover from local, just fill the files and size
final RecoveryState.Index index = recoveryState.getIndex();
try {
if (si != null) {
addRecoveredFileDetails(si, store, index);
}
} catch (IOException e) {
logger.debug("failed to list file details", e);
}
index.setFileDetailsComplete();
} else {
store.createEmpty();
final String translogUUID = Translog.createEmptyTranslog(
indexShard.shardPath().resolveTranslog(),
SequenceNumbers.NO_OPS_PERFORMED,
shardId,
indexShard.getPendingPrimaryTerm()
);
store.associateIndexWithNewTranslog(translogUUID);
writeEmptyRetentionLeasesFile(indexShard);
indexShard.recoveryState().getIndex().setFileDetailsComplete();
}
indexShard.openEngineAndRecoverFromTranslog();
indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm());
indexShard.finalizeRecovery();
indexShard.postRecovery("post recovery from shard_store");
} catch (EngineException | IOException e) {
throw new IndexShardRecoveryException(shardId, "failed to recover from gateway", e);
} finally {
store.decRef();
}
}).run()));
}.run();
}));
}

private static void writeEmptyRetentionLeasesFile(IndexShard indexShard) throws IOException {
Expand Down

0 comments on commit 8967983

Please sign in to comment.