Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Advance max_seq_no before add operation to Lucene #38879

Merged
merged 2 commits into from
Feb 15, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,7 @@ public IndexResult index(Index index) throws IOException {
indexResult = plan.earlyResultOnPreFlightError.get();
assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
} else if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
localCheckpointTracker.advanceMaxSeqNo(plan.seqNoForIndexing);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Properly setting the max sequence number before indexing might not only be a useful property for Lucene-indexed stuff, but in general. I would prefer not to hide it in this sub-condition.
It bugs me that the planning process is mutating the state of the LocalCheckpointTracker in this way when being primary, but not when being replica. If we want the flow for the primary and replica to be the same, we could do this instead as part of the planning process in planIndexingAsNonPrimary. This will make it so that both primary and replica manage this property in the planning phase (will also require adaptation to FollowingEngine).
As a follow-up, I think we should also investigate whether we can remove this state mutation from the planning process, i.e., not have the planning process assign a sequence number or change the max seq no, but do it as an explicit step here in the index method, given that it is such a fundamental step in processing a request.

indexResult = indexIntoLucene(index, plan);
} else {
indexResult = new IndexResult(
Expand Down Expand Up @@ -1234,6 +1235,7 @@ public DeleteResult delete(Delete delete) throws IOException {
if (plan.earlyResultOnPreflightError.isPresent()) {
deleteResult = plan.earlyResultOnPreflightError.get();
} else if (plan.deleteFromLucene || plan.addStaleOpToLucene) {
localCheckpointTracker.advanceMaxSeqNo(plan.seqNoOfDeletion);
deleteResult = deleteInLucene(delete, plan);
} else {
deleteResult = new DeleteResult(
Expand Down Expand Up @@ -1479,6 +1481,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
} else {
Exception failure = null;
if (softDeleteEnabled) {
localCheckpointTracker.advanceMaxSeqNo(noOp.seqNo());
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ public synchronized long generateSeqNo() {
return nextSeqNo++;
}

/**
* Marks the provided sequence number as seen and updates the max_seq_no if needed.
*/
public synchronized void advanceMaxSeqNo(long seqNo) {
if (seqNo >= nextSeqNo) {
nextSeqNo = seqNo + 1;
}
}

/**
* Marks the processing of the provided sequence number as completed as updates the checkpoint if possible.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5653,4 +5653,46 @@ public void testStoreHonorsLuceneVersion() throws IOException {
}
}
}

public void testMaxSeqNoInCommitUserData() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
Thread rollTranslog = new Thread(() -> {
while (running.get() && engine.getTranslog().currentFileGeneration() < 500) {
engine.rollTranslogGeneration(); // make adding operations to translog slower
}
});
rollTranslog.start();

long maxNumDocs = 1000;
Thread indexing = new Thread(() -> {
long seqNo = 0;
while (running.get() && seqNo <= maxNumDocs) {
try {
ParsedDocument doc = testParsedDocument(Long.toString(seqNo), null, testDocumentWithTextField(), SOURCE, null);
engine.index(replicaIndexForDoc(doc, 1L, seqNo, false));
seqNo++;
} catch (IOException e) {
throw new AssertionError(e);
}
}
});
indexing.start();

int numCommits = between(5, 20);
for (int i = 0; i < numCommits; i++) {
engine.flush(false, true);
}
running.set(false);
indexing.join();
rollTranslog.join();
List<IndexCommit> commits = DirectoryReader.listCommits(store.directory());
for (IndexCommit commit : commits) {
try (DirectoryReader reader = DirectoryReader.open(commit)) {
AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
Lucene.scanSeqNosInReader(reader, 0, maxNumDocs, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get())));
assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we check this property across all of our Engine tests?

greaterThanOrEqualTo(maxSeqNoFromDocs.get()));
}
}
}
}