Skip to content

Commit

Permalink
Enable sort optimization in query Lucene changes (#77907)
Browse files Browse the repository at this point in the history
This change enables the sort optimization in LuceneChangesSnapshot
to speed up CCR and peer recoveries.

Backport of #77907
  • Loading branch information
dnhatn committed Sep 21, 2021
1 parent 6523f02 commit 3b1d7fe
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ public abstract Translog.Snapshot readHistoryOperations(String reason, HistorySo
* Returns the estimated number of history operations whose seq# at least {@code startingSeqNo}(inclusive) in this engine.
*/
public abstract int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException;
long startingSeqNo) throws IOException;

/**
* Checks if this engine has every operations since {@code startingSeqNo}(inclusive) in its history (either Lucene or translog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
MapperService mapperService, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false);
return newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false, true);
} else {
return getTranslog().newSnapshot(startingSeqNo, Long.MAX_VALUE);
}
Expand All @@ -568,13 +568,13 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
* Returns the estimated number of history operations whose seq# at least the provided seq# in this engine.
*/
@Override
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) throws IOException {
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource, long startingSeqNo) throws IOException {
if (historySource == HistorySource.INDEX) {
ensureSoftDeletesEnabled();
try (Translog.Snapshot snapshot =
newChangesSnapshot(reason, mapperService, Math.max(0, startingSeqNo), Long.MAX_VALUE, false, false)) {
return snapshot.totalOperations();
ensureOpen();
refresh(reason, SearcherScope.INTERNAL, true);
try (Searcher searcher = acquireSearcher(reason, SearcherScope.INTERNAL)) {
return LuceneChangesSnapshot.countOperations(searcher, startingSeqNo, Long.MAX_VALUE);
}
} else {
return getTranslog().estimateTotalOperationsFromMinSeq(startingSeqNo);
Expand Down Expand Up @@ -2710,15 +2710,15 @@ private void ensureSoftDeletesEnabled() {
}
}

Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
boolean requiredFullRange, boolean singleConsumer) throws IOException {
final Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService, long fromSeqNo, long toSeqNo,
boolean requiredFullRange, boolean singleConsumer, boolean accessStats) throws IOException {
ensureSoftDeletesEnabled();
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
try {
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, requiredFullRange, singleConsumer);
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
fromSeqNo, toSeqNo, requiredFullRange, singleConsumer, accessStats);
searcher = null;
return snapshot;
} catch (Exception e) {
Expand All @@ -2736,7 +2736,7 @@ Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
@Override
public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperService,
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange, true);
return newChangesSnapshot(source, mapperService, fromSeqNo, toSeqNo, requiredFullRange, true, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -60,6 +62,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
private final IndexSearcher indexSearcher;
private final MapperService mapperService;
private int docIndex = 0;
private final boolean accessStats;
private final int totalHits;
private ScoreDoc[] scoreDocs;
private final ParallelArray parallelArray;
Expand All @@ -80,9 +83,11 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
* @param singleConsumer true if the snapshot is accessed by a single thread that creates the snapshot
* @param accessStats true if the stats of the snapshot can be accessed via {@link #totalOperations()}
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService, int searchBatchSize,
long fromSeqNo, long toSeqNo, boolean requiredFullRange, boolean singleConsumer) throws IOException {
long fromSeqNo, long toSeqNo, boolean requiredFullRange,
boolean singleConsumer, boolean accessStats) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
Expand All @@ -104,10 +109,11 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.lastSeenSeqNo = fromSeqNo - 1;
this.requiredFullRange = requiredFullRange;
this.singleConsumer = singleConsumer;
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher = newIndexSearcher(engineSearcher);
this.indexSearcher.setQueryCache(null);
this.accessStats = accessStats;
this.parallelArray = new ParallelArray(this.searchBatchSize);
final TopDocs topDocs = searchOperations(null);
final TopDocs topDocs = searchOperations(null, accessStats);
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
this.scoreDocs = topDocs.scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
Expand All @@ -122,6 +128,9 @@ public void close() throws IOException {
@Override
public int totalOperations() {
assert assertAccessingThread();
if (accessStats == false) {
throw new IllegalStateException("Access stats of a snapshot created with [access_stats] is false");
}
return totalHits;
}

Expand Down Expand Up @@ -176,7 +185,7 @@ private int nextDocIndex() throws IOException {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
docIndex = 0;
}
Expand Down Expand Up @@ -243,14 +252,34 @@ private static boolean hasSequentialAccess(ScoreDoc[] scoreDocs) {
return true;
}

private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), BooleanClause.Occur.MUST)
private static IndexSearcher newIndexSearcher(Engine.Searcher engineSearcher) throws IOException {
return new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
}

private static Query rangeQuery(long fromSeqNo, long toSeqNo) {
return new BooleanQuery.Builder()
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
// exclude non-root nested documents
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
.build();
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
}

static int countOperations(Engine.Searcher engineSearcher, long fromSeqNo, long toSeqNo) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
return newIndexSearcher(engineSearcher).count(rangeQuery(fromSeqNo, toSeqNo));
}

private TopDocs searchOperations(FieldDoc after, boolean accurateTotalHits) throws IOException {
final Query rangeQuery = rangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
assert accurateTotalHits == false || after == null : "accurate total hits is required by the first batch only";
final SortField sortBySeqNo = new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG);
sortBySeqNo.setCanUsePoints();
final TopFieldCollector collector =
TopFieldCollector.create(new Sort(sortBySeqNo), searchBatchSize, after, accurateTotalHits ? Integer.MAX_VALUE : 0);
indexSearcher.search(rangeQuery, collector);
return collector.topDocs();
}

private Translog.Operation readDocAsOp(int docIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,7 @@ public Translog.Snapshot readHistoryOperations(String reason, HistorySource hist
}

@Override
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource,
MapperService mapperService, long startingSeqNo) {
public int estimateNumberOfHistoryOperations(String reason, HistorySource historySource, long startingSeqNo) {
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2124,7 +2124,7 @@ public Closeable acquireHistoryRetentionLock(Engine.HistorySource source) {
* Returns the estimated number of history operations whose seq# at least the provided seq# in this shard.
*/
public int estimateNumberOfHistoryOperations(String reason, Engine.HistorySource source, long startingSeqNo) throws IOException {
return getEngine().estimateNumberOfHistoryOperations(reason, source, mapperService, startingSeqNo);
return getEngine().estimateNumberOfHistoryOperations(reason, source, startingSeqNo);
}

/**
Expand Down

0 comments on commit 3b1d7fe

Please sign in to comment.