Skip to content

Commit

Permalink
Advance max_seq_no before add operation to Lucene (#38879)
Browse files Browse the repository at this point in the history
Today when processing an operation on a replica engine (or the
following engine), we first add it to Lucene, then add it to translog,
then finally marks its seq_no as completed. If a flush occurs after step1,
but before step-3, the max_seq_no in the commit's user_data will be
smaller than the seq_no of some documents in the Lucene commit.
  • Loading branch information
dnhatn committed Feb 15, 2019
1 parent a3d2310 commit d458ada
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
}
}
}
markSeqNoAsSeen(index.seqNo());
return plan;
}

Expand Down Expand Up @@ -1387,6 +1388,7 @@ assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_0
delete.seqNo(), delete.version());
}
}
markSeqNoAsSeen(delete.seqNo());
return plan;
}

Expand Down Expand Up @@ -1541,6 +1543,7 @@ public void maybePruneDeletes() {
public NoOpResult noOp(final NoOp noOp) {
NoOpResult noOpResult;
try (ReleasableLock ignored = readLock.acquire()) {
markSeqNoAsSeen(noOp.seqNo());
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
Expand Down Expand Up @@ -2520,6 +2523,13 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
*/
protected final void markSeqNoAsSeen(long seqNo) {
localCheckpointTracker.advanceMaxSeqNo(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public synchronized long generateSeqNo() {
return nextSeqNo++;
}

/**
* Marks the provided sequence number as seen and updates the max_seq_no if needed.
*/
public synchronized void advanceMaxSeqNo(long seqNo) {
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
}
}

/**
* Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,7 @@ public void close() {
transport.endConnectMode();
transportService.stop();
transportClientNodesService.close();
try {
terminate(threadPool);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
terminate(threadPool);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5655,4 +5655,42 @@ void assertLuceneOperations(InternalEngine engine, long expectedAppends, long ex
assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates));
assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes));
}

public void testMaxSeqNoInCommitUserData() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
engine.rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();

Thread indexing = new Thread(() -> {
long seqNo = 0;
while (running.get() && seqNo <= 1000) {
try {
String id = Long.toString(between(1, 50));
if (randomBoolean()) {
ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
} else {
engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L));
}
seqNo++;
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
indexing.start();

int numCommits = between(5, 20);
for (int i = 0; i < numCommits; i++) {
engine.flush(false, true);
}
running.set(false);
indexing.join();
rollTranslog.join();
assertMaxSeqNoInCommitUserData(engine);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,7 @@ public void setUp() throws Exception {
@Override
public void tearDown() throws Exception {
super.tearDown();
IOUtils.close(serviceA, serviceB, serviceC, () -> {
try {
terminate(threadPool);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
IOUtils.close(serviceA, serviceB, serviceC, () -> terminate(threadPool));
}

private MockTransportService buildService(final Version version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
Expand Down Expand Up @@ -126,6 +128,7 @@
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public abstract class EngineTestCase extends ESTestCase {
Expand Down Expand Up @@ -254,18 +257,20 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
@After
public void tearDown() throws Exception {
super.tearDown();
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
try {
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(engine);
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(replicaEngine);
}
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}
IOUtils.close(
replicaEngine, storeReplica,
engine, store);
terminate(threadPool);
}


Expand Down Expand Up @@ -1079,6 +1084,21 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

/**
* Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit.
*/
public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception {
List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
for (IndexCommit commit : commits) {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
}
}
}

public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -888,7 +888,7 @@ public static boolean terminate(ExecutorService... services) throws InterruptedE
return terminated;
}

public static boolean terminate(ThreadPool threadPool) throws InterruptedException {
public static boolean terminate(ThreadPool threadPool) {
return ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,7 @@ public void tearDown() throws Exception {
assertNoPendingHandshakes(serviceA.getOriginalTransport());
assertNoPendingHandshakes(serviceB.getOriginalTransport());
} finally {
IOUtils.close(serviceA, serviceB, () -> {
try {
terminate(threadPool);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
IOUtils.close(serviceA, serviceB, () -> terminate(threadPool));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
markSeqNoAsSeen(index.seqNo());
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Expand Down Expand Up @@ -103,6 +104,7 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind
@Override
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
preFlight(delete);
markSeqNoAsSeen(delete.seqNo());
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.stream.Collectors;

import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
import static org.elasticsearch.index.engine.EngineTestCase.getTranslog;
import static org.elasticsearch.index.engine.EngineTestCase.getTranslog;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -660,4 +662,50 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
}
});
}

public void testMaxSeqNoInCommitUserData() throws Exception {
final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine engine = createEngine(store, engineConfig)) {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
while (running.get() && getTranslog(engine).currentFileGeneration() < 500) {
engine.rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();

Thread indexing = new Thread(() -> {
List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(
true, VersionType.EXTERNAL, false, 2, 50, 500, "id");
engine.advanceMaxSeqNoOfUpdatesOrDeletes(ops.stream().mapToLong(Engine.Operation::seqNo).max().getAsLong());
for (Engine.Operation op : ops) {
if (running.get() == false) {
return;
}
try {
EngineTestCase.applyOperation(engine, op);
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
indexing.start();

int numCommits = between(5, 20);
for (int i = 0; i < numCommits; i++) {
engine.flush(false, true);
}
running.set(false);
indexing.join();
rollTranslog.join();
EngineTestCase.assertMaxSeqNoInCommitUserData(engine);
}
}
}
}

0 comments on commit d458ada

Please sign in to comment.