Skip to content

Commit

Permalink
Use local checkpoint to calculate min translog gen for recovery (#51905)
Browse files Browse the repository at this point in the history
Today we use the translog_generation of the safe commit as the minimum
required translog generation for recovery. This approach has a
limitation, where we won't be able to clean up translog unless we flush.
Reopening an already recovered engine will create a new empty translog,
and we leave it there until we force flush.

This commit removes the translog_generation commit tag and uses the 
local checkpoint of the safe commit to calculate the minimum required
translog generation for recovery instead.

Closes #49970
  • Loading branch information
dnhatn committed Feb 10, 2020
1 parent b85b12d commit ebc4681
Show file tree
Hide file tree
Showing 23 changed files with 316 additions and 531 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
cluster.health:
wait_for_no_initializing_shards: true
wait_for_events: languid
# Before 8.0, an empty shard has two empty translog files as we used the translog_generation commit tag as the minimum required
# translog generation for recovery. Here we force-flush to have a consistent translog stats for both old and new indices.
- do:
indices.flush:
index: test
force: true
wait_if_ongoing: true
- do:
indices.stats:
metric: [ translog ]
Expand Down Expand Up @@ -37,10 +44,9 @@
- do:
indices.stats:
metric: [ translog ]
# after flushing we have one empty translog file while an empty index before flushing has two empty translog files.
- lt: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.operations: 0 }
- lt: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_size_in_bytes: $creation_size }
- match: { indices.test.primaries.translog.uncommitted_operations: 0 }

---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,10 @@ private void updateRetentionPolicy() throws IOException {
assert Thread.holdsLock(this);
logger.debug("Safe commit [{}], last commit [{}]", commitDescription(safeCommit), commitDescription(lastCommit));
assert safeCommit.isDeleted() == false : "The safe commit must not be deleted";
final long minRequiredGen = Long.parseLong(safeCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
assert lastCommit.isDeleted() == false : "The last commit must not be deleted";
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));

assert minRequiredGen <= lastGen : "minRequiredGen must not be greater than lastGen";
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(minRequiredGen);

softDeletesPolicy.setLocalCheckpointOfSafeCommit(
Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)));
final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
}

protected int getDocCountOfCommit(IndexCommit indexCommit) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ public int restoreLocalHistoryFromTranslog(TranslogRecoveryRunner translogRecove
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
try (Translog.Snapshot snapshot = getTranslog().newSnapshotFromMinSeqNo(localCheckpoint + 1)) {
try (Translog.Snapshot snapshot = getTranslog().newSnapshot(localCheckpoint + 1, Long.MAX_VALUE)) {
return translogRecoveryRunner.run(this, snapshot);
}
}
Expand Down Expand Up @@ -458,23 +458,24 @@ public void skipTranslogRecovery() {
}

private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
Translog.TranslogGeneration translogGeneration = translog.getGeneration();
final int opsRecovered;
final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
final long localCheckpoint = getProcessedLocalCheckpoint();
if (localCheckpoint < recoverUpToSeqNo) {
try (Translog.Snapshot snapshot = translog.newSnapshot(localCheckpoint + 1, recoverUpToSeqNo)) {
opsRecovered = translogRecoveryRunner.run(this, snapshot);
} catch (Exception e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
} else {
opsRecovered = 0;
}
// flush if we recovered something or if we have references to older translogs
// note: if opsRecovered == 0 and we have older translogs it means they are corrupted or 0 length.
assert pendingTranslogRecovery.get() : "translogRecovery is not pending but should be";
pendingTranslogRecovery.set(false); // we are good - now we can commit
if (opsRecovered > 0) {
logger.trace("flushing post recovery from translog. ops recovered [{}]. committed translog id [{}]. current id [{}]",
opsRecovered, translogGeneration == null ? null :
translogGeneration.translogFileGeneration, translog.currentFileGeneration());
logger.trace("flushing post recovery from translog: ops recovered [{}], current translog generation [{}]",
opsRecovered, translog.currentFileGeneration());
commitIndexWriter(indexWriter, translog);
refreshLastCommittedSegmentInfos();
refresh("translog_recovery");
Expand All @@ -486,7 +487,8 @@ private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {

final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final String translogUUID = loadTranslogUUIDFromLastCommit();
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong!
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier,
engineConfig.getPrimaryTermSupplier(), persistedSequenceNumberConsumer);
Expand Down Expand Up @@ -551,18 +553,6 @@ public long getWritingBytes() {
return indexWriter.getFlushingBytes() + versionMap.getRefreshingBytes();
}

/**
* Reads the current stored translog ID from the last commit data.
*/
@Nullable
private String loadTranslogUUIDFromLastCommit() throws IOException {
final Map<String, String> commitUserData = store.readLastCommittedSegmentsInfo().getUserData();
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
return commitUserData.get(Translog.TRANSLOG_UUID_KEY);
}

/**
* Reads the current stored history ID from the IW commit data.
*/
Expand Down Expand Up @@ -1588,8 +1578,10 @@ public boolean shouldPeriodicallyFlush() {
if (shouldPeriodicallyFlushAfterBigMerge.get()) {
return true;
}
final long localCheckpointOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final long translogGenerationOfLastCommit =
Long.parseLong(lastCommittedSegmentInfos.userData.get(Translog.TRANSLOG_GENERATION_KEY));
translog.getMinGenerationForSeqNo(localCheckpointOfLastCommit + 1).translogFileGeneration;
final long flushThreshold = config().getIndexSettings().getFlushThresholdSize().getBytes();
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
return false;
Expand Down Expand Up @@ -2281,11 +2273,6 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
ensureCanFlush();
try {
final long localCheckpoint = localCheckpointTracker.getProcessedCheckpoint();
final Translog.TranslogGeneration translogGeneration = translog.getMinGenerationForSeqNo(localCheckpoint + 1);
final String translogFileGeneration = Long.toString(translogGeneration.translogFileGeneration);
final String translogUUID = translogGeneration.translogUUID;
final String localCheckpointValue = Long.toString(localCheckpoint);

writer.setLiveCommitData(() -> {
/*
* The user data captured above (e.g. local checkpoint) contains data that must be evaluated *before* Lucene flushes
Expand All @@ -2296,10 +2283,9 @@ protected void commitIndexWriter(final IndexWriter writer, final Translog transl
* {@link IndexWriter#commit()} call flushes all documents, we defer computation of the maximum sequence number to the time
* of invocation of the commit data iterator (which occurs after all documents have been flushed to Lucene).
*/
final Map<String, String> commitData = new HashMap<>(7);
commitData.put(Translog.TRANSLOG_GENERATION_KEY, translogFileGeneration);
commitData.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpointValue);
final Map<String, String> commitData = new HashMap<>(6);
commitData.put(Translog.TRANSLOG_UUID_KEY, translog.getTranslogUUID());
commitData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, Long.toString(localCheckpoint));
commitData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(localCheckpointTracker.getMaxSeqNo()));
commitData.put(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID, Long.toString(maxUnsafeAutoIdTimestamp.get()));
commitData.put(HISTORY_UUID_KEY, historyUUID);
Expand Down
31 changes: 12 additions & 19 deletions server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.lucene.store.Directory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -137,31 +138,23 @@ public void trimUnreferencedTranslogFiles() {
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
final List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
if (commits.size() == 1) {
if (commits.size() == 1 && translogStats.getTranslogSizeInBytes() > translogStats.getUncommittedSizeInBytes()) {
final Map<String, String> commitUserData = getLastCommittedSegmentInfos().getUserData();
final String translogUuid = commitUserData.get(Translog.TRANSLOG_UUID_KEY);
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) {
throw new IllegalStateException("commit doesn't contain translog generation id");
}
final long lastCommitGeneration = Long.parseLong(commitUserData.get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = engineConfig.getTranslogConfig();
final long minTranslogGeneration = Translog.readMinTranslogGeneration(translogConfig.getTranslogPath(), translogUuid);

if (minTranslogGeneration < lastCommitGeneration) {
// a translog deletion policy that retains nothing but the last translog generation from safe commit
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastCommitGeneration);
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastCommitGeneration);

try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
}
final long localCheckpoint = Long.parseLong(commitUserData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy,
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier(), seqNo -> {})) {
translog.trimUnreferencedReaders();
// refresh the translog stats
this.translogStats = translog.stats();
assert translog.currentFileGeneration() == translog.getMinFileGeneration() : "translog was not trimmed "
+ " current gen " + translog.currentFileGeneration() + " != min gen " + translog.getMinFileGeneration();
}
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,10 @@ private static TranslogStats translogStats(final EngineConfig config, final Segm
if (translogUuid == null) {
throw new IllegalStateException("commit doesn't contain translog unique id");
}
final long translogGenOfLastCommit = Long.parseLong(infos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
final TranslogConfig translogConfig = config.getTranslogConfig();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
translogDeletionPolicy.setTranslogGenerationOfLastCommit(translogGenOfLastCommit);

final long localCheckpoint = Long.parseLong(infos.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
translogDeletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
try (Translog translog = new Translog(translogConfig, translogUuid, translogDeletionPolicy, config.getGlobalCheckpointSupplier(),
config.getPrimaryTermSupplier(), seqNo -> {})
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1437,10 +1437,7 @@ public void associateIndexWithNewTranslog(final String translogUUID) throws IOEx
if (translogUUID.equals(getUserData(writer).get(Translog.TRANSLOG_UUID_KEY))) {
throw new IllegalArgumentException("a new translog uuid can't be equal to existing one. got [" + translogUUID + "]");
}
final Map<String, String> map = new HashMap<>();
map.put(Translog.TRANSLOG_GENERATION_KEY, "1");
map.put(Translog.TRANSLOG_UUID_KEY, translogUUID);
updateCommitData(writer, map);
updateCommitData(writer, Map.of(Translog.TRANSLOG_UUID_KEY, translogUUID));
} finally {
metadataLock.writeLock().unlock();
}
Expand Down
Loading

0 comments on commit ebc4681

Please sign in to comment.