Skip to content

Commit

Permalink
Always rebuild checkpoint tracker for old indices (#46340)
Browse files Browse the repository at this point in the history
The max_seq_no of Lucene commit of the old indices (before 6.6.2) can be
smaller than seq_no of some documents in the commit (see #38879).
Although we fixed this bug in 6.6.2 and 7.0.0, a problematic index
commit can still affect the newer version after a rolling upgrade or
full cluster restart. In particular, if a FollowingEngine (or an internal 
engine with MSU enabled) restores from a problematic commit, then 
it can apply MSU optimization for existing documents. The symptom
that we see here is the local checkpoint tracker assertion is violated.

Closes #46311
Relates #38879
  • Loading branch information
dnhatn committed Sep 7, 2019
1 parent c7326d2 commit eae6361
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,14 @@ private static LocalCheckpointTracker createLocalCheckpointTracker(EngineConfig
// Thus, we need to restore the LocalCheckpointTracker bit by bit to ensure the consistency between LocalCheckpointTracker and
// Lucene index. This is not the only solution since we can bootstrap max_seq_no_of_updates with max_seq_no of the commit to
// disable the MSU optimization during recovery. Here we prefer to maintain the consistency of LocalCheckpointTracker.
if (localCheckpoint < maxSeqNo && engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
// The max_seq_no of Lucene commit in the old indices might be smaller than seq_no of some documents in the commit.
// We have to rebuild the LocalCheckpointTracker for those indices. See https://github.com/elastic/elasticsearch/pull/38879.
// Note that this bug affects only indices created between 6.5.0 and 6.6.1 with soft-deletes is explicitly enabled.
final boolean mustRebuild = engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_6_6_2);
if (engineConfig.getIndexSettings().isSoftDeleteEnabled() && (localCheckpoint < maxSeqNo || mustRebuild)) {
try (Searcher searcher = searcherSupplier.get()) {
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, maxSeqNo,
tracker::markSeqNoAsCompleted);
final long toSeqNo = mustRebuild ? Long.MAX_VALUE : maxSeqNo;
Lucene.scanSeqNosInReader(searcher.getDirectoryReader(), localCheckpoint + 1, toSeqNo, tracker::markSeqNoAsCompleted);
}
}
return tracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

Expand Down Expand Up @@ -5632,6 +5633,61 @@ public void testRebuildLocalCheckpointTracker() throws Exception {
}
}

/**
* Simulate a bug in https://github.com/elastic/elasticsearch/pull/38879 where the max_seq_no
* of the index commit can be smaller than seq_no of some documents in the commit.
*/
public void testAlwaysRebuildLocalCheckpointForOldIndex() throws Exception {
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_5_0, Version.V_6_6_1))
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
Path translogPath = createTempDir();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean());
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(indexSettings, store, translogPath, NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get);
final List<DocIdSeqNoAndTerm> docs;
try (InternalEngine engine = createEngine(config)) {
for (Engine.Operation op : operations) {
applyOperation(engine, op);
if (randomInt(100) < 10) {
engine.flush();
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
}
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
engine.syncTranslog();
docs = getDocIds(engine, true);
}
trimUnsafeCommits(config);
// Simulate a bug in https://github.com/elastic/elasticsearch/pull/38879 where max_seq_no is smaller than seq_no of some docs.
if (randomBoolean()) {
IndexWriterConfig iwc = new IndexWriterConfig(null)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setCommitOnClose(false)
.setMergePolicy(NoMergePolicy.INSTANCE)
.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
try (IndexWriter writer = new IndexWriter(config.getStore().directory(), iwc)) {
Map<String, String> userData = new HashMap<>();
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userData.entrySet());
long maxSeqNo = randomLongBetween(commitInfo.localCheckpoint, commitInfo.maxSeqNo);
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
writer.setLiveCommitData(userData.entrySet());
writer.commit();
}
}
try (InternalEngine engine = new InternalEngine(config)) {
engine.reinitializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
assertThat(getDocIds(engine, true), equalTo(docs));
}
}
}

public void testOpenSoftDeletesIndexWithSoftDeletesDisabled() throws Exception {
try (Store store = createStore()) {
Path translogPath = createTempDir();
Expand Down

0 comments on commit eae6361

Please sign in to comment.