Skip to content

Commit

Permalink
Advance max_seq_no before add operation to Lucene
Browse files Browse the repository at this point in the history
This commit fixes the issue of potential mismatches between the max_seq_no in
the commit's user_data and the seq_no of some documents in the Lucene commit.
The mismatch could arise when processing an operation on a replica engine, as
we first added it to Lucene, then to the translog, to finally mark seq_no as
completed. If a flush occurred after step1, but before the marking, then the
max_seq_no in the commit's user_data would be smaller than the seq_no of some
documents in the Lucene commit.

Port of elastic/elasticsearch#38879
  • Loading branch information
marregui authored and mergify[bot] committed Nov 18, 2019
1 parent b469ea9 commit b126ec5
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
}
}
}
markSeqNoAsSeen(index.seqNo());
return plan;
}

Expand Down Expand Up @@ -1346,6 +1347,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
delete.seqNo(), delete.version());
}
}
markSeqNoAsSeen(delete.seqNo());
return plan;
}

Expand Down Expand Up @@ -2482,6 +2484,13 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForProcessedOpsToComplete(seqNo);
}

/**
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
*/
protected final void markSeqNoAsSeen(long seqNo) {
localCheckpointTracker.advanceMaxSeqNo(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
Expand Down Expand Up @@ -4038,7 +4039,7 @@ public void testRestoreLocalHistoryFromTranslog() throws IOException {
engine.flush();
}
}
globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getProcessedLocalCheckpoint()));
globalCheckpoint.set(randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, engine.getPersistedLocalCheckpoint()));
engine.syncTranslog();
prevSeqNoStats = engine.getSeqNoStats(globalCheckpoint.get());
prevDocs = getDocIds(engine, true);
Expand Down Expand Up @@ -5214,6 +5215,44 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
}
}

public void testMaxSeqNoInCommitUserData() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
engine.rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();

Thread indexing = new Thread(() -> {
long seqNo = 0;
while (running.get() && seqNo <= 1000) {
try {
String id = Long.toString(between(1, 50));
if (randomBoolean()) {
ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
} else {
engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L));
}
seqNo++;
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
indexing.start();

int numCommits = between(5, 20);
for (int i = 0; i < numCommits; i++) {
engine.flush(false, true);
}
running.set(false);
indexing.join();
rollTranslog.join();
assertMaxSeqNoInCommitUserData(engine);
}

@Test
public void testNoOpOnClosingEngine() throws Exception {
engine.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
Expand All @@ -48,6 +50,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -113,6 +116,7 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public abstract class EngineTestCase extends ESTestCase {
Expand Down Expand Up @@ -225,18 +229,22 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
@After
public void tearDown() throws Exception {
super.tearDown();
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
try {
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(engine);
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(replicaEngine);
}
assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
assertThat(replicaEngine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L));
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}
IOUtils.close(
replicaEngine, storeReplica,
engine, store);
terminate(threadPool);
}


Expand Down Expand Up @@ -975,6 +983,21 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

/**
* Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit.
*/
public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception {
List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
for (IndexCommit commit : commits) {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
}
}
}

public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
Expand Down

0 comments on commit b126ec5

Please sign in to comment.