Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advance max_seq_no before add operation to Lucene #38879

Merged
merged 2 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
}
}
}
markSeqNoAsSeen(index.seqNo());
return plan;
}

Expand Down Expand Up @@ -1306,6 +1307,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
delete.seqNo(), delete.version());
}
}
markSeqNoAsSeen(delete.seqNo());
return plan;
}

Expand Down Expand Up @@ -1460,6 +1462,7 @@ public void maybePruneDeletes() {
public NoOpResult noOp(final NoOp noOp) {
NoOpResult noOpResult;
try (ReleasableLock ignored = readLock.acquire()) {
markSeqNoAsSeen(noOp.seqNo());
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e);
Expand Down Expand Up @@ -2439,6 +2442,13 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
localCheckpointTracker.waitForOpsToComplete(seqNo);
}

/**
* Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value.
*/
protected final void markSeqNoAsSeen(long seqNo) {
localCheckpointTracker.advanceMaxSeqNo(seqNo);
}

/**
* Checks if the given operation has been processed in this engine or not.
* @return true if the given operation was processed; otherwise false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public synchronized long generateSeqNo() {
return nextSeqNo++;
}

/**
* Marks the provided sequence number as seen and updates the max_seq_no if needed.
*/
public synchronized void advanceMaxSeqNo(long seqNo) {
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
}
}

/**
* Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5653,4 +5653,42 @@ public void testStoreHonorsLuceneVersion() throws IOException {
}
}
}

public void testMaxSeqNoInCommitUserData() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
engine.rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();

Thread indexing = new Thread(() -> {
long seqNo = 0;
while (running.get() && seqNo <= 1000) {
try {
String id = Long.toString(between(1, 50));
if (randomBoolean()) {
ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
} else {
engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L));
}
seqNo++;
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
indexing.start();

int numCommits = between(5, 20);
for (int i = 0; i < numCommits; i++) {
engine.flush(false, true);
}
running.set(false);
indexing.join();
rollTranslog.join();
assertMaxSeqNoInCommitUserData(engine);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
Expand Down Expand Up @@ -126,6 +128,7 @@
import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;

public abstract class EngineTestCase extends ESTestCase {
Expand Down Expand Up @@ -254,18 +257,20 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
@After
public void tearDown() throws Exception {
super.tearDown();
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
try {
if (engine != null && engine.isClosed.get() == false) {
engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(engine);
}
if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
assertMaxSeqNoInCommitUserData(replicaEngine);
}
} finally {
IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool));
}
IOUtils.close(
replicaEngine, storeReplica,
engine, store);
terminate(threadPool);
}


Expand Down Expand Up @@ -1067,6 +1072,21 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e
}
}

/**
* Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit.
*/
public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception {
List<IndexCommit> commits = DirectoryReader.listCommits(engine.store.directory());
for (IndexCommit commit : commits) {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
}
}
}

public static MapperService createMapperService(String type) throws IOException {
IndexMetaData indexMetaData = IndexMetaData.builder("test")
.settings(Settings.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ private void preFlight(final Operation operation) {
@Override
protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
preFlight(index);
markSeqNoAsSeen(index.seqNo());
// NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
Expand Down Expand Up @@ -103,6 +104,7 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind
@Override
protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
preFlight(delete);
markSeqNoAsSeen(delete.seqNo());
if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
import static org.elasticsearch.index.engine.EngineTestCase.getTranslog;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
Expand Down Expand Up @@ -659,4 +660,49 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
}
});
}

public void testMaxSeqNoInCommitUserData() throws Exception {
final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true)
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
try (Store store = createStore(shardId, indexSettings, newDirectory())) {
final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
try (FollowingEngine engine = createEngine(store, engineConfig)) {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
while (running.get() && getTranslog(engine).currentFileGeneration() < 500) {
engine.rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();

Thread indexing = new Thread(() -> {
List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, VersionType.EXTERNAL, 2, 50, 500, "id");
engine.advanceMaxSeqNoOfUpdatesOrDeletes(ops.stream().mapToLong(Engine.Operation::seqNo).max().getAsLong());
for (Engine.Operation op : ops) {
if (running.get() == false) {
return;
}
try {
EngineTestCase.applyOperation(engine, op);
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
indexing.start();

int numCommits = between(5, 20);
for (int i = 0; i < numCommits; i++) {
engine.flush(false, true);
}
running.set(false);
indexing.join();
rollTranslog.join();
EngineTestCase.assertMaxSeqNoInCommitUserData(engine);
}
}
}
}