Skip to content

Commit

Permalink
Exclude nested documents in LuceneChangesSnapshot (#51279)
Browse files Browse the repository at this point in the history
LuceneChangesSnapshot can be slow if nested documents are heavily used. 
Also, it estimates the number of operations to be recovered in peer
recoveries inaccurately. With this change, we prefer excluding the
nested non-root documents in a Lucene query instead.
  • Loading branch information
dnhatn committed Jan 22, 2020
1 parent 1dc9dd4 commit 1511486
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ final class CombinedDocValues {
long docVersion(int segmentDocId) throws IOException {
assert versionDV.docID() < segmentDocId;
if (versionDV.advanceExact(segmentDocId) == false) {
assert false : "DocValues for field [" + VersionFieldMapper.NAME + "] is not found";
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
}
return versionDV.longValue();
Expand All @@ -55,19 +56,18 @@ long docVersion(int segmentDocId) throws IOException {
long docSeqNo(int segmentDocId) throws IOException {
assert seqNoDV.docID() < segmentDocId;
if (seqNoDV.advanceExact(segmentDocId) == false) {
assert false : "DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found";
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
}
return seqNoDV.longValue();
}

long docPrimaryTerm(int segmentDocId) throws IOException {
if (primaryTermDV == null) {
return -1L;
}
// We exclude non-root nested documents when querying changes, every returned document must have primary term.
assert primaryTermDV.docID() < segmentDocId;
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
if (primaryTermDV.advanceExact(segmentDocId) == false) {
return -1;
assert false : "DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found";
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.PRIMARY_TERM_NAME + "] is not found");
}
return primaryTermDV.longValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
Expand All @@ -63,6 +65,7 @@
import org.elasticsearch.common.lucene.LoggerInfoStream;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
Expand Down Expand Up @@ -2731,8 +2734,12 @@ private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean a
private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryReader) throws IOException {
final IndexSearcher searcher = new IndexSearcher(directoryReader);
searcher.setQueryCache(null);
final Query query = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE);
final Weight weight = searcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 1.0f);
final Query query = new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(
SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE), BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
.build();
final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1.0f);
for (LeafReaderContext leaf : directoryReader.leaves()) {
final Scorer scorer = weight.scorer(leaf);
if (scorer == null) {
Expand All @@ -2744,9 +2751,6 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead
int docId;
while ((docId = iterator.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
final long primaryTerm = dv.docPrimaryTerm(docId);
if (primaryTerm == -1L) {
continue; // skip children docs which do not have primary term
}
final long seqNo = dv.docSeqNo(docId);
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
localCheckpointTracker.markSeqNoAsPersisted(seqNo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
Expand All @@ -33,6 +35,7 @@
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
import org.elasticsearch.index.mapper.IdFieldMapper;
Expand Down Expand Up @@ -210,7 +213,10 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
}

private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
final Query rangeQuery = new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST) // exclude non-root nested documents
.build();
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
}
Expand All @@ -219,11 +225,7 @@ private Translog.Operation readDocAsOp(int docIndex) throws IOException {
final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
final long primaryTerm = parallelArray.primaryTerm[docIndex];
// We don't have to read the nested child documents - those docs don't have primary terms.
if (primaryTerm == -1) {
skippedOperations++;
return null;
}
assert primaryTerm > 0 : "nested child document must be excluded";
final long seqNo = parallelArray.seqNo[docIndex];
// Only pick the first seen seq#
if (seqNo == lastSeenSeqNo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1551,12 +1551,12 @@ public void run() {

public void testVersioningCreateExistsException() throws IOException {
ParsedDocument doc = testParsedDocument("1", null, testDocument(), B_1, null);
Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
Engine.Index create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult indexResult = engine.index(create);
assertThat(indexResult.getVersion(), equalTo(1L));

create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_DELETED,
create = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_DELETED,
VersionType.INTERNAL, PRIMARY, 0, -1, false, UNASSIGNED_SEQ_NO, 0);
indexResult = engine.index(create);
assertThat(indexResult.getResultType(), equalTo(Engine.Result.Type.FAILURE));
Expand Down Expand Up @@ -2796,7 +2796,7 @@ public void testTranslogReplay() throws IOException {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
Expand All @@ -2823,7 +2823,7 @@ public void testTranslogReplay() throws IOException {
final boolean flush = randomBoolean();
int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
ParsedDocument doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 1,
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 1,
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
Expand All @@ -2833,7 +2833,7 @@ public void testTranslogReplay() throws IOException {
}

doc = testParsedDocument(Integer.toString(randomId), null, testDocument(), new BytesArray("{}"), null);
Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, 2,
Engine.Index idxRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, 2,
VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult result = engine.index(idxRequest);
engine.refresh("test");
Expand Down Expand Up @@ -2869,7 +2869,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
final int numDocs = randomIntBetween(1, 10);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), new BytesArray("{}"), null);
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult index = engine.index(firstIndexRequest);
assertThat(index.getVersion(), equalTo(1L));
Expand Down Expand Up @@ -3402,7 +3402,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
boolean isRetry = false;
long autoGeneratedIdTimestamp = 0;

Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
Engine.Index index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult indexResult = engine.index(index);
Expand All @@ -3414,7 +3414,7 @@ public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOExcep
assertThat(indexResult.getVersion(), equalTo(1L));

isRetry = true;
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL,
index = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, Versions.MATCH_ANY, VersionType.INTERNAL,
PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
indexResult = engine.index(index);
assertThat(indexResult.getVersion(), equalTo(1L));
Expand Down Expand Up @@ -3443,7 +3443,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
boolean isRetry = true;
long autoGeneratedIdTimestamp = 0;

Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
Engine.Index firstIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
randomBoolean() ? Versions.MATCH_DELETED : Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(),
autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO, 0);
Engine.IndexResult result = engine.index(firstIndexRequest);
Expand All @@ -3455,7 +3455,7 @@ public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs()
assertThat(indexReplicaResult.getVersion(), equalTo(1L));

isRetry = false;
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0,
Engine.Index secondIndexRequest = new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1,
Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry, UNASSIGNED_SEQ_NO,
0);
Engine.IndexResult indexResult = engine.index(secondIndexRequest);
Expand Down Expand Up @@ -3485,7 +3485,7 @@ public Engine.Index randomAppendOnly(ParsedDocument doc, boolean retry, final lo
}

public Engine.Index appendOnlyPrimary(ParsedDocument doc, boolean retry, final long autoGeneratedIdTimestamp, boolean create) {
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 0, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
return new Engine.Index(newUid(doc), doc, UNASSIGNED_SEQ_NO, 1, create ? Versions.MATCH_DELETED : Versions.MATCH_ANY,
VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, retry,
UNASSIGNED_SEQ_NO, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,9 @@ searcher, mapperService, between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), f
public void testSkipNonRootOfNestedDocuments() throws Exception {
Map<Long, Long> seqNoToTerm = new HashMap<>();
List<Engine.Operation> operations = generateHistoryOnReplica(between(1, 100), randomBoolean(), randomBoolean(), randomBoolean());
int totalOps = 0;
for (Engine.Operation op : operations) {
if (engine.getLocalCheckpointTracker().hasProcessed(op.seqNo()) == false) {
seqNoToTerm.put(op.seqNo(), op.primaryTerm());
if (op instanceof Engine.Index) {
totalOps += ((Engine.Index) op).docs().size();
} else {
totalOps++;
}
}
applyOperation(engine, op);
if (rarely()) {
Expand All @@ -182,14 +176,12 @@ public void testSkipNonRootOfNestedDocuments() throws Exception {
engine.refresh("test");
Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, between(1, 100), 0, maxSeqNo, false)) {
searcher = null;
assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size()));
Translog.Operation op;
while ((op = snapshot.next()) != null) {
assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo())));
}
assertThat(snapshot.skippedOperations(), equalTo(totalOps - seqNoToTerm.size()));
} finally {
IOUtils.close(searcher);
assertThat(snapshot.skippedOperations(), equalTo(0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public abstract class EngineTestCase extends ESTestCase {
protected Path primaryTranslogDir;
protected Path replicaTranslogDir;
// A default primary term is used by engine instances created in this test.
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(1L);

protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
Expand Down

0 comments on commit 1511486

Please sign in to comment.