Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that max seq # is equal to the global checkpoint when creating ReadOnlyEngines #37426

Merged
merged 5 commits into from
Jan 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.Lock;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.core.internal.io.IOUtils;
Expand Down Expand Up @@ -98,7 +100,25 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
indexWriterLock = obtainLock ? directory.obtainLock(IndexWriter.WRITE_LOCK_NAME) : null;
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
if (seqNoStats == null) {
seqNoStats = buildSeqNoStats(lastCommittedSegmentInfos);
// During a peer-recovery the global checkpoint is not known and up to date when the engine
// is created, so we only check the max seq no / global checkpoint coherency when the global
// checkpoint is different from the unassigned sequence number value.
// In addition to that we only execute the check if the index the engine belongs to has been
// created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction
// that guarantee that all operations have been flushed to Lucene.
final long globalCheckpoint = engineConfig.getGlobalCheckpointSupplier().getAsLong();
if (globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO
&& engineConfig.getIndexSettings().getIndexVersionCreated().onOrAfter(Version.V_6_7_0)) {
if (seqNoStats.getMaxSeqNo() != globalCheckpoint) {
assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), globalCheckpoint);
throw new IllegalStateException("Maximum sequence number [" + seqNoStats.getMaxSeqNo()
tlrx marked this conversation as resolved.
Show resolved Hide resolved
+ "] from last commit does not match global checkpoint [" + globalCheckpoint + "]");
}
}
}
this.seqNoStats = seqNoStats;
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
reader = open(indexCommit);
reader = wrapReader(reader, readerWrapperFunction);
Expand All @@ -116,6 +136,12 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
}
}

protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
if (Assertions.ENABLED) {
assert false : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]";
}
}

protected final DirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void testReadOnlyEngine() throws Exception {
lastDocIds = getDocIds(engine, true);
assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint()));
assertThat(readOnlyEngine.getSeqNoStats(globalCheckpoint.get()).getMaxSeqNo(), equalTo(lastSeqNoStats.getMaxSeqNo()));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
assertThat(getDocIds(readOnlyEngine, false), equalTo(lastDocIds));
for (int i = 0; i < numDocs; i++) {
if (randomBoolean()) {
String delId = Integer.toString(i);
Expand Down Expand Up @@ -126,7 +126,7 @@ public void testFlushes() throws IOException {
if (rarely()) {
engine.flush();
}
globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), engine.getLocalCheckpoint()));
globalCheckpoint.set(i);
}
engine.syncTranslog();
engine.flushAndClose();
Expand All @@ -139,6 +139,40 @@ public void testFlushes() throws IOException {
}
}

public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException {
IOUtils.close(engine, store);
Engine readOnlyEngine = null;
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
final int numDocs = scaledRandomIntBetween(10, 100);
try (InternalEngine engine = createEngine(config)) {
long maxSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
engine.index(new Engine.Index(newUid(doc), doc, i, primaryTerm.get(), 1, null, Engine.Operation.Origin.REPLICA,
System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0));
maxSeqNo = engine.getLocalCheckpoint();
}
globalCheckpoint.set(engine.getLocalCheckpoint() - 1);
engine.syncTranslog();
engine.flushAndClose();

IllegalStateException exception = expectThrows(IllegalStateException.class,
() -> new ReadOnlyEngine(engine.engineConfig, null, null, true, Function.identity()) {
@Override
protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) {
// we don't want the assertion to trip in this test
}
});
assertThat(exception.getMessage(), equalTo("Maximum sequence number [" + maxSeqNo
+ "] from last commit does not match global checkpoint [" + globalCheckpoint.get() + "]"));
} finally {
IOUtils.close(readOnlyEngine);
}
}
}

public void testReadOnly() throws IOException {
IOUtils.close(engine, store);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Expand Down