Use global checkpoint as starting seq in ops-based recovery #43463

Merged
Changes from all commits
23 commits
f8c0b15
Use global checkpoint as base for seq based recovery
dnhatn Jun 19, 2019
72458c9
Revert "Use global checkpoint as base for seq based recovery"
dnhatn Jun 25, 2019
d6fe637
Merge branch 'master' into recover-to-global-checkpoint
dnhatn Jun 25, 2019
ef454e2
Recover locally first
dnhatn Jun 25, 2019
800b329
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jun 27, 2019
c05d70d
add comment
dnhatn Jun 26, 2019
c535d5f
fix tests
dnhatn Jun 27, 2019
ed024e7
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jun 28, 2019
0a476b7
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jul 12, 2019
3f35450
introduce prepare step
dnhatn Jul 13, 2019
0a67c49
Merge branch 'peer-recovery-retention-leases' into HEAD
dnhatn Jul 15, 2019
d465b43
verify index
dnhatn Jul 15, 2019
7e45622
fix test
dnhatn Jul 15, 2019
4cf88eb
Do not add new state
dnhatn Jul 19, 2019
3280e64
combine with performRecoveryRestart
dnhatn Jul 22, 2019
30ef0c6
more feedback
dnhatn Jul 22, 2019
a40b5dd
log the global checkpoint
dnhatn Jul 22, 2019
ed5d302
adjust the total local to reflect the exact count
dnhatn Jul 22, 2019
731af0e
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jul 22, 2019
e6923e8
fix translog recovery stats
dnhatn Jul 22, 2019
6ec4e80
do not adjust
dnhatn Jul 23, 2019
7a26f32
Revert "do not adjust"
dnhatn Jul 23, 2019
1deddbe
Merge branch 'peer-recovery-retention-leases' into recover-to-global-…
dnhatn Jul 23, 2019
123 changes: 106 additions & 17 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -159,6 +159,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -1359,6 +1360,81 @@ public void prepareForIndexRecovery() {
assert currentEngineReference.get() == null;
}

/**
* A best effort to bring up this shard to the global checkpoint using the local translog before performing a peer recovery.
*
* @return a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
*/
public long recoverLocallyUpToGlobalCheckpoint() {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
final Optional<SequenceNumbers.CommitInfo> safeCommit;
final long globalCheckpoint;
try {
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
globalCheckpoint = Translog.readGlobalCheckpoint(translogConfig.getTranslogPath(), translogUUID);
safeCommit = store.findSafeIndexCommit(globalCheckpoint);
} catch (org.apache.lucene.index.IndexNotFoundException e) {
logger.trace("skip local recovery as no index commit found");
return UNASSIGNED_SEQ_NO;
} catch (Exception e) {
logger.debug("skip local recovery as failed to find the safe commit", e);
return UNASSIGNED_SEQ_NO;
}
if (safeCommit.isPresent() == false) {
logger.trace("skip local recovery as no safe commit found");
return UNASSIGNED_SEQ_NO;
}
assert safeCommit.get().localCheckpoint <= globalCheckpoint : safeCommit.get().localCheckpoint + " > " + globalCheckpoint;
try {
maybeCheckIndex(); // check index here and won't do it again if ops-based recovery occurs
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
if (safeCommit.get().localCheckpoint == globalCheckpoint) {
logger.trace("skip local recovery as the safe commit is up to date; safe commit {} global checkpoint {}",
safeCommit.get(), globalCheckpoint);
recoveryState.getTranslog().totalLocal(0);
return globalCheckpoint + 1;
}
try {
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
final int recoveredOps = runTranslogRecovery(engine, snapshot, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY,
recoveryState.getTranslog()::incrementRecoveredOperations);
recoveryState.getTranslog().totalLocal(recoveredOps); // adjust the total local to reflect the actual count
return recoveredOps;
};
innerOpenEngineAndTranslog();
getEngine().recoverFromTranslog(translogRecoveryRunner, globalCheckpoint);
logger.trace("shard locally recovered up to {}", getEngine().getSeqNoStats(globalCheckpoint));
} finally {
synchronized (mutex) {
IOUtils.close(currentEngineReference.getAndSet(null));
}
}
} catch (Exception e) {
logger.debug(new ParameterizedMessage("failed to recover shard locally up to global checkpoint {}", globalCheckpoint), e);
return UNASSIGNED_SEQ_NO;
}
try {
// we need to find the safe commit again as we should have created a new one during the local recovery
final Optional<SequenceNumbers.CommitInfo> newSafeCommit = store.findSafeIndexCommit(globalCheckpoint);
assert newSafeCommit.isPresent() : "no safe commit found after local recovery";
return newSafeCommit.get().localCheckpoint + 1;
} catch (Exception e) {
if (Assertions.ENABLED) {
throw new AssertionError(
"failed to find the safe commit after recovering shard locally up to global checkpoint " + globalCheckpoint, e);
}
logger.debug(new ParameterizedMessage(
"failed to find the safe commit after recovering shard locally up to global checkpoint {}", globalCheckpoint), e);
return UNASSIGNED_SEQ_NO;
}
}

public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
}
@@ -1462,6 +1538,9 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
* Operations from the translog will be replayed to bring lucene up to date.
**/
public void openEngineAndRecoverFromTranslog() throws IOException {
assert recoveryState.getStage() == RecoveryState.Stage.INDEX : "unexpected recovery stage [" + recoveryState.getStage() + "]";
maybeCheckIndex();
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
final RecoveryState.Translog translogRecoveryStats = recoveryState.getTranslog();
final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
translogRecoveryStats.totalOperations(snapshot.totalOperations());
@@ -1478,6 +1557,8 @@ public void openEngineAndRecoverFromTranslog() throws IOException {
* The translog is kept but its operations won't be replayed.
*/
public void openEngineAndSkipTranslogRecovery() throws IOException {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "unexpected recovery stage [" + recoveryState.getStage() + "]";
innerOpenEngineAndTranslog();
getEngine().skipTranslogRecovery();
}
@@ -1486,17 +1567,6 @@ private void innerOpenEngineAndTranslog() throws IOException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
// also check here, before we apply the translog
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
throw new RecoveryFailedException(recoveryState, "check index failed", ex);
}
}
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);

final EngineConfig config = newEngineConfig();

// we disable deletes since we allow for operations to be executed against the shard while recovering
@@ -1552,14 +1622,22 @@ private void onNewEngine(Engine newEngine) {
*/
public void performRecoveryRestart() throws IOException {
synchronized (mutex) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
assert refreshListeners.pendingCount() == 0 : "we can't restart with pending listeners";
final Engine engine = this.currentEngineReference.getAndSet(null);
IOUtils.close(engine);
recoveryState().setStage(RecoveryState.Stage.INIT);
IOUtils.close(currentEngineReference.getAndSet(null));
resetRecoveryStage();
}
}

/**
* If a file-based recovery occurs, a recovery target calls this method to reset the recovery stage.
*/
public void resetRecoveryStage() {
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
assert currentEngineReference.get() == null;
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
recoveryState().setStage(RecoveryState.Stage.INIT);
}

/**
@@ -2296,6 +2374,17 @@ public void noopUpdate(String type) {
internalIndexingStats.noopUpdate(type);
}

public void maybeCheckIndex() {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
if (Booleans.isTrue(checkIndexOnStartup) || "checksum".equals(checkIndexOnStartup)) {
try {
checkIndex();
} catch (IOException ex) {
throw new RecoveryFailedException(recoveryState, "check index failed", ex);
}
}
}

void checkIndex() throws IOException {
if (store.tryIncRef()) {
try {
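For readers skimming the diff, here is a minimal, self-contained sketch of the decision logic that `recoverLocallyUpToGlobalCheckpoint` adds above. It is not the actual `IndexShard` code: the `Shard` interface, `readGlobalCheckpointFromTranslog`, and `replayLocalTranslogUpTo` are hypothetical stand-ins for the store/translog/engine plumbing, and the recovery-stage handling, assertions, `maybeCheckIndex`, and logging are omitted.

```java
// Minimal, self-contained sketch of the local-recovery decision added in this PR.
// The Shard interface and its helpers are hypothetical stand-ins, not Elasticsearch types.
import java.util.Optional;

final class LocalRecoverySketch {

    static final long UNASSIGNED_SEQ_NO = -2; // value of SequenceNumbers.UNASSIGNED_SEQ_NO

    /** Stand-in for SequenceNumbers.CommitInfo: the seq-no bounds recorded in a Lucene commit. */
    record CommitInfo(long localCheckpoint, long maxSeqNo) {}

    interface Shard {
        long readGlobalCheckpointFromTranslog() throws Exception;             // hypothetical helper
        Optional<CommitInfo> findSafeIndexCommit(long globalCheckpoint);      // mirrors Store#findSafeIndexCommit
        void replayLocalTranslogUpTo(long globalCheckpoint) throws Exception; // hypothetical helper
    }

    /**
     * Best effort to bring the shard up to the global checkpoint from its own translog, then
     * return the sequence number an ops-based peer recovery can start from, or UNASSIGNED_SEQ_NO
     * to fall back to the previous behaviour (the source may have to copy files).
     */
    static long recoverLocallyUpToGlobalCheckpoint(Shard shard) {
        final long globalCheckpoint;
        final Optional<CommitInfo> safeCommit;
        try {
            globalCheckpoint = shard.readGlobalCheckpointFromTranslog();
            safeCommit = shard.findSafeIndexCommit(globalCheckpoint);
        } catch (Exception e) {
            return UNASSIGNED_SEQ_NO; // no usable commit or translog -> fall back
        }
        if (safeCommit.isEmpty()) {
            return UNASSIGNED_SEQ_NO; // no safe commit -> fall back
        }
        if (safeCommit.get().localCheckpoint() == globalCheckpoint) {
            return globalCheckpoint + 1; // already up to date, nothing to replay locally
        }
        try {
            shard.replayLocalTranslogUpTo(globalCheckpoint);
            // a fresh safe commit should now exist; start after its local checkpoint
            return shard.findSafeIndexCommit(globalCheckpoint)
                .map(commit -> commit.localCheckpoint() + 1)
                .orElse(UNASSIGNED_SEQ_NO);
        } catch (Exception e) {
            return UNASSIGNED_SEQ_NO; // local replay failed -> fall back
        }
    }
}
```

The key property is that every early return of `UNASSIGNED_SEQ_NO` simply restores the previous behaviour: the recovery source plans the recovery as if no local replay had happened.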
17 changes: 17 additions & 0 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -96,6 +96,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1560,6 +1561,22 @@ public void trimUnsafeCommits(final long lastSyncedGlobalCheckpoint, final long
}
}

/**
* Returns a {@link org.elasticsearch.index.seqno.SequenceNumbers.CommitInfo} of the safe commit if exists.
*/
public Optional<SequenceNumbers.CommitInfo> findSafeIndexCommit(long globalCheckpoint) throws IOException {
final List<IndexCommit> commits = DirectoryReader.listCommits(directory);
assert commits.isEmpty() == false : "no commit found";
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(commits, globalCheckpoint);
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(safeCommit.getUserData().entrySet());
// all operations of the safe commit must be at most the global checkpoint.
if (commitInfo.maxSeqNo <= globalCheckpoint) {
return Optional.of(commitInfo);
} else {
return Optional.empty();
}
}

private static void updateCommitData(IndexWriter writer, Map<String, String> keysToUpdate) throws IOException {
final Map<String, String> userData = getUserData(writer);
userData.putAll(keysToUpdate);
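The new `Store#findSafeIndexCommit` above delegates to `DirectoryReader.listCommits` and `CombinedDeletionPolicy.findSafeCommitPoint` and then re-checks the max sequence number. A rough stand-alone equivalent, with commits modelled as plain `(localCheckpoint, maxSeqNo)` pairs and assuming the safe-commit search scans from the newest commit backwards (names here are hypothetical, not the Elasticsearch classes):

```java
// Stand-alone illustration of the safe-commit lookup; commits are modelled as plain
// (localCheckpoint, maxSeqNo) pairs instead of Lucene IndexCommit user data.
import java.util.List;
import java.util.Optional;

final class SafeCommitSketch {

    record CommitInfo(long localCheckpoint, long maxSeqNo) {}

    /**
     * Returns the newest commit whose operations all sit at or below the global checkpoint
     * (maxSeqNo <= globalCheckpoint), i.e. a commit that is safe to use as the base of an
     * operation-based recovery; empty if no commit qualifies.
     */
    static Optional<CommitInfo> findSafeIndexCommit(List<CommitInfo> commits, long globalCheckpoint) {
        assert commits.isEmpty() == false : "no commit found";
        for (int i = commits.size() - 1; i >= 0; i--) { // newest commit last, scan backwards
            if (commits.get(i).maxSeqNo() <= globalCheckpoint) {
                return Optional.of(commits.get(i));
            }
        }
        return Optional.empty(); // every commit contains operations above the global checkpoint
    }

    public static void main(String[] args) {
        List<CommitInfo> commits = List.of(new CommitInfo(5, 5), new CommitInfo(8, 12));
        // With global checkpoint 10 the newest commit (maxSeqNo 12) is unsafe, so the older one wins:
        System.out.println(findSafeIndexCommit(commits, 10)); // Optional[CommitInfo[localCheckpoint=5, maxSeqNo=5]]
    }
}
```

Returning `Optional.empty()` rather than an unsafe commit is what lets the caller answer `UNASSIGNED_SEQ_NO` and leave the source to fall back to a file-based recovery.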
server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
@@ -22,8 +22,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.RateLimiter;
import org.elasticsearch.ElasticsearchException;
@@ -44,18 +42,14 @@
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CombinedDeletionPolicy;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -68,12 +62,11 @@
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;

/**
* The recovery target handles recoveries of peer shards of the shard+node to recover to.
@@ -178,9 +171,12 @@ private void doRecovery(final long recoveryId) {
cancellableThreads = recoveryTarget.cancellableThreads();
try {
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
request = getStartRecoveryRequest(recoveryTarget);
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
recoveryTarget.indexShard().prepareForIndexRecovery();
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
} catch (final Exception e) {
// this will be logged as warning later on...
logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);
@@ -319,7 +315,7 @@ public RecoveryResponse read(StreamInput in) throws IOException {
* @param recoveryTarget the target of the recovery
* @return a snapshot of the store metadata
*/
private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
private static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
return recoveryTarget.indexShard().snapshotStoreMetadata();
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -335,89 +331,32 @@ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget rec
/**
* Prepare the start recovery request.
*
* @param logger the logger
* @param localNode the local node of the recovery target
* @param recoveryTarget the target of the recovery
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
* This is the first operation after the local checkpoint of the safe commit if exists.
* @return a start recovery request
*/
private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recoveryTarget) {
public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,
RecoveryTarget recoveryTarget, long startingSeqNo) {
final StartRecoveryRequest request;
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());

final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());

final long startingSeqNo;
if (metadataSnapshot.size() > 0) {
startingSeqNo = getStartingSeqNo(logger, recoveryTarget);
} else {
startingSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
}

if (startingSeqNo == SequenceNumbers.UNASSIGNED_SEQ_NO) {
logger.trace("{} preparing for file-based recovery from [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
} else {
logger.trace(
"{} preparing for sequence-number-based recovery starting at sequence number [{}] from [{}]",
recoveryTarget.shardId(),
startingSeqNo,
recoveryTarget.sourceNode());
}

request = new StartRecoveryRequest(
recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(),
recoveryTarget.sourceNode(),
clusterService.localNode(),
localNode,
metadataSnapshot,
recoveryTarget.state().getPrimary(),
recoveryTarget.recoveryId(),
startingSeqNo);
return request;
}

/**
* Get the starting sequence number for a sequence-number-based request.
*
* @param recoveryTarget the target of the recovery
* @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
* failed
*/
public static long getStartingSeqNo(final Logger logger, final RecoveryTarget recoveryTarget) {
try {
final Store store = recoveryTarget.store();
final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY);
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), translogUUID);
final List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
final IndexCommit safeCommit = CombinedDeletionPolicy.findSafeCommitPoint(existingCommits, globalCheckpoint);
final SequenceNumbers.CommitInfo seqNoStats = Store.loadSeqNoInfo(safeCommit);
if (logger.isTraceEnabled()) {
final StringJoiner descriptionOfExistingCommits = new StringJoiner(",");
for (IndexCommit commit : existingCommits) {
descriptionOfExistingCommits.add(CombinedDeletionPolicy.commitDescription(commit));
}
logger.trace("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]",
globalCheckpoint, CombinedDeletionPolicy.commitDescription(safeCommit), descriptionOfExistingCommits);
}
if (seqNoStats.maxSeqNo <= globalCheckpoint) {
assert seqNoStats.localCheckpoint <= globalCheckpoint;
/*
* Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
* checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
* after the local checkpoint stored in the commit.
*/
return seqNoStats.localCheckpoint + 1;
} else {
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}
} catch (final TranslogCorruptedException | IOException e) {
/*
* This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
* translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
* proceeds to attempt a sequence-number-based recovery.
*/
return SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}

public interface RecoveryListener {
void onRecoveryDone(RecoveryState state);

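Putting the two halves together, the target-side flow in `doRecovery` now looks roughly like the following sketch. `RecoveryTargetLike` and `StartRecoveryRequestLike` are illustrative stand-ins; cancellation, error handling, logging, and the transport call to the recovery source are omitted.

```java
// Condensed, hypothetical view of the target-side flow in doRecovery after this change.
final class PeerRecoveryFlowSketch {

    static final long UNASSIGNED_SEQ_NO = -2; // value of SequenceNumbers.UNASSIGNED_SEQ_NO

    interface RecoveryTargetLike {
        void prepareForIndexRecovery();
        long recoverLocallyUpToGlobalCheckpoint(); // the new step added by this PR
        Object snapshotStoreMetadata();            // file listing used for a file-based recovery
    }

    record StartRecoveryRequestLike(Object metadataSnapshot, long startingSeqNo) {}

    static StartRecoveryRequestLike startRecovery(RecoveryTargetLike target) {
        target.prepareForIndexRecovery();
        // best effort: replay the local translog up to the global checkpoint before asking the source
        long startingSeqNo = target.recoverLocallyUpToGlobalCheckpoint();
        // the request always carries the starting seq no; UNASSIGNED_SEQ_NO signals that an
        // ops-based recovery is not possible and the source falls back to copying files
        return new StartRecoveryRequestLike(target.snapshotStoreMetadata(), startingSeqNo);
    }
}
```

With the starting sequence number computed after the local replay, the source only needs to ship operations above the global checkpoint instead of everything above the local checkpoint of the target's last safe commit.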