Skip to content

Commit

Permalink
LUCENE-8939: Introduce Shared Count Early Termination In Parallel Sea…
Browse files Browse the repository at this point in the history
…rch (#823)

This commit introduces a strategy to early terminate for sorted
collections during parallel search when requested number of
hits have been collected but the total hits threshold has not
yet been reached.
  • Loading branch information
atris authored and jpountz committed Sep 5, 2019
1 parent a3cb9cb commit 4d82665
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 37 deletions.
5 changes: 5 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ Improvements
docs on equal scores. Also, remove the ability of TopDocs.merge to set shard
indices (Atri Sharma, Adrien Grand, Simon Willnauer)

* LUCENE-8939: Introduce shared count based early termination across multiple slices
(Atri Sharma)

* LUCENE-8958: Shared count early termination for relevance sorted indices (Atri Sharma)

* LUCENE-8937: Avoid agressive stemming on numbers in the FrenchMinimalStemmer.
(Adrien Gallou via Tomoko Uchida)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.lucene.search;

import java.util.concurrent.atomic.AtomicLong;

/**
* Used for defining custom algorithms to allow searches to early terminate
*/
abstract class HitsThresholdChecker {
/**
* Implementation of HitsThresholdChecker which allows global hit counting
*/
private static class GlobalHitsThresholdChecker extends HitsThresholdChecker {
private final int totalHitsThreshold;
private final AtomicLong globalHitCount;

public GlobalHitsThresholdChecker(int totalHitsThreshold) {

if (totalHitsThreshold < 0) {
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
}

this.totalHitsThreshold = totalHitsThreshold;
this.globalHitCount = new AtomicLong();
}

@Override
public void incrementHitCount() {
globalHitCount.incrementAndGet();
}

@Override
public boolean isThresholdReached(){
return globalHitCount.getAcquire() > totalHitsThreshold;
}

@Override
public ScoreMode scoreMode() {
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
}

@Override
public int getHitsThreshold() {
return totalHitsThreshold;
}
}

/**
* Default implementation of HitsThresholdChecker to be used for single threaded execution
*/
private static class LocalHitsThresholdChecker extends HitsThresholdChecker {
private final int totalHitsThreshold;
private int hitCount;

public LocalHitsThresholdChecker(int totalHitsThreshold) {

if (totalHitsThreshold < 0) {
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
}

this.totalHitsThreshold = totalHitsThreshold;
}

@Override
public void incrementHitCount() {
++hitCount;
}

@Override
public boolean isThresholdReached() {
return hitCount > totalHitsThreshold;
}

@Override
public ScoreMode scoreMode() {
return totalHitsThreshold == Integer.MAX_VALUE ? ScoreMode.COMPLETE : ScoreMode.TOP_SCORES;
}

@Override
public int getHitsThreshold() {
return totalHitsThreshold;
}
}

/*
* Returns a threshold checker that is useful for single threaded searches
*/
public static HitsThresholdChecker create(final int totalHitsThreshold) {
return new LocalHitsThresholdChecker(totalHitsThreshold);
}

/*
* Returns a threshold checker that is based on a shared counter
*/
public static HitsThresholdChecker createShared(final int totalHitsThreshold) {
return new GlobalHitsThresholdChecker(totalHitsThreshold);
}

public abstract void incrementHitCount();
public abstract ScoreMode scoreMode();
public abstract int getHitsThreshold();
public abstract boolean isThresholdReached();
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,11 @@ public TopDocs searchAfter(ScoreDoc after, Query query, int numHits) throws IOEx

final CollectorManager<TopScoreDocCollector, TopDocs> manager = new CollectorManager<TopScoreDocCollector, TopDocs>() {

private final HitsThresholdChecker hitsThresholdChecker = (executor == null || leafSlices.length <= 1) ? HitsThresholdChecker.create(TOTAL_HITS_THRESHOLD) :
HitsThresholdChecker.createShared(TOTAL_HITS_THRESHOLD);
@Override
public TopScoreDocCollector newCollector() throws IOException {
return TopScoreDocCollector.create(cappedNumHits, after, TOTAL_HITS_THRESHOLD);
return TopScoreDocCollector.create(cappedNumHits, after, hitsThresholdChecker);
}

@Override
Expand Down Expand Up @@ -595,10 +597,13 @@ private TopFieldDocs searchAfter(FieldDoc after, Query query, int numHits, Sort

final CollectorManager<TopFieldCollector, TopFieldDocs> manager = new CollectorManager<TopFieldCollector, TopFieldDocs>() {

private final HitsThresholdChecker hitsThresholdChecker = (executor == null || leafSlices.length <= 1) ? HitsThresholdChecker.create(TOTAL_HITS_THRESHOLD) :
HitsThresholdChecker.createShared(TOTAL_HITS_THRESHOLD);

@Override
public TopFieldCollector newCollector() throws IOException {
// TODO: don't pay the price for accurate hit counts by default
return TopFieldCollector.create(rewrittenSort, cappedNumHits, after, TOTAL_HITS_THRESHOLD);
return TopFieldCollector.create(rewrittenSort, cappedNumHits, after, hitsThresholdChecker);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -96,12 +97,12 @@ private static boolean canEarlyTerminateOnPrefix(Sort searchSort, Sort indexSort
* document scores and maxScore.
*/
private static class SimpleFieldCollector extends TopFieldCollector {

final Sort sort;
final FieldValueHitQueue<Entry> queue;

public SimpleFieldCollector(Sort sort, FieldValueHitQueue<Entry> queue, int numHits, int totalHitsThreshold) {
super(queue, numHits, totalHitsThreshold, sort.needsScores());
public SimpleFieldCollector(Sort sort, FieldValueHitQueue<Entry> queue, int numHits,
HitsThresholdChecker hitsThresholdChecker) {
super(queue, numHits, hitsThresholdChecker, sort.needsScores());
this.sort = sort;
this.queue = queue;
}
Expand All @@ -128,13 +129,14 @@ public void setScorer(Scorable scorer) throws IOException {
@Override
public void collect(int doc) throws IOException {
++totalHits;
hitsThresholdChecker.incrementHitCount();
if (queueFull) {
if (collectedAllCompetitiveHits || reverseMul * comparator.compareBottom(doc) <= 0) {
// since docs are visited in doc Id order, if compare is 0, it means
// this document is largest than anything else in the queue, and
// therefore not competitive.
if (canEarlyTerminate) {
if (totalHits > totalHitsThreshold) {
if (hitsThresholdChecker.isThresholdReached()) {
totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
throw new CollectionTerminatedException();
} else {
Expand Down Expand Up @@ -181,15 +183,13 @@ private final static class PagingFieldCollector extends TopFieldCollector {
int collectedHits;
final FieldValueHitQueue<Entry> queue;
final FieldDoc after;
final int totalHitsThreshold;

public PagingFieldCollector(Sort sort, FieldValueHitQueue<Entry> queue, FieldDoc after, int numHits,
int totalHitsThreshold) {
super(queue, numHits, totalHitsThreshold, sort.needsScores());
HitsThresholdChecker hitsThresholdChecker) {
super(queue, numHits, hitsThresholdChecker, sort.needsScores());
this.sort = sort;
this.queue = queue;
this.after = after;
this.totalHitsThreshold = totalHitsThreshold;

FieldComparator<?>[] comparators = queue.comparators;
// Tell all comparators their top value:
Expand Down Expand Up @@ -221,6 +221,7 @@ public void collect(int doc) throws IOException {
//System.out.println(" collect doc=" + doc);

totalHits++;
hitsThresholdChecker.incrementHitCount();

if (queueFull) {
// Fastmatch: return if this hit is no better than
Expand All @@ -230,7 +231,7 @@ public void collect(int doc) throws IOException {
// this document is largest than anything else in the queue, and
// therefore not competitive.
if (canEarlyTerminate) {
if (totalHits > totalHitsThreshold) {
if (hitsThresholdChecker.isThresholdReached()) {
totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
throw new CollectionTerminatedException();
} else {
Expand Down Expand Up @@ -282,7 +283,7 @@ public void collect(int doc) throws IOException {
private static final ScoreDoc[] EMPTY_SCOREDOCS = new ScoreDoc[0];

final int numHits;
final int totalHitsThreshold;
final HitsThresholdChecker hitsThresholdChecker;
final FieldComparator.RelevanceComparator firstComparator;
final boolean canSetMinScore;
final int numComparators;
Expand All @@ -297,17 +298,18 @@ public void collect(int doc) throws IOException {
// internal versions. If someone will define a constructor with any other
// visibility, then anyone will be able to extend the class, which is not what
// we want.
private TopFieldCollector(FieldValueHitQueue<Entry> pq, int numHits, int totalHitsThreshold, boolean needsScores) {
private TopFieldCollector(FieldValueHitQueue<Entry> pq, int numHits,
HitsThresholdChecker hitsThresholdChecker, boolean needsScores) {
super(pq);
this.needsScores = needsScores;
this.numHits = numHits;
this.totalHitsThreshold = totalHitsThreshold;
this.hitsThresholdChecker = hitsThresholdChecker;
this.numComparators = pq.getComparators().length;
FieldComparator<?> fieldComparator = pq.getComparators()[0];
int reverseMul = pq.reverseMul[0];
if (fieldComparator.getClass().equals(FieldComparator.RelevanceComparator.class)
&& reverseMul == 1 // if the natural sort is preserved (sort by descending relevance)
&& totalHitsThreshold != Integer.MAX_VALUE) {
&& hitsThresholdChecker.getHitsThreshold() != Integer.MAX_VALUE) {
firstComparator = (FieldComparator.RelevanceComparator) fieldComparator;
scoreMode = ScoreMode.TOP_SCORES;
canSetMinScore = true;
Expand All @@ -324,7 +326,7 @@ public ScoreMode scoreMode() {
}

protected void updateMinCompetitiveScore(Scorable scorer) throws IOException {
if (canSetMinScore && totalHits > totalHitsThreshold && queueFull) {
if (canSetMinScore && hitsThresholdChecker.isThresholdReached() && queueFull) {
assert bottom != null && firstComparator != null;
float minScore = firstComparator.value(bottom.slot);
scorer.setMinCompetitiveScore(minScore);
Expand Down Expand Up @@ -382,8 +384,19 @@ public static TopFieldCollector create(Sort sort, int numHits, int totalHitsThre
* @return a {@link TopFieldCollector} instance which will sort the results by
* the sort criteria.
*/
public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
int totalHitsThreshold) {
public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after, int totalHitsThreshold) {
if (totalHitsThreshold < 0) {
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
}

return create(sort, numHits, after, HitsThresholdChecker.create(totalHitsThreshold));
}

/**
* Same as above with an additional parameter to allow passing in the threshold checker
*/
static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
HitsThresholdChecker hitsThresholdChecker) {

if (sort.fields.length == 0) {
throw new IllegalArgumentException("Sort must contain at least one field");
Expand All @@ -393,14 +406,14 @@ public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
throw new IllegalArgumentException("numHits must be > 0; please use TotalHitCountCollector if you just need the total hit count");
}

if (totalHitsThreshold < 0) {
throw new IllegalArgumentException("totalHitsThreshold must be >= 0, got " + totalHitsThreshold);
if (hitsThresholdChecker == null) {
throw new IllegalArgumentException("hitsThresholdChecker should not be null");
}

FieldValueHitQueue<Entry> queue = FieldValueHitQueue.create(sort.fields, numHits);

if (after == null) {
return new SimpleFieldCollector(sort, queue, numHits, totalHitsThreshold);
return new SimpleFieldCollector(sort, queue, numHits, hitsThresholdChecker);
} else {
if (after.fields == null) {
throw new IllegalArgumentException("after.fields wasn't set; you must pass fillFields=true for the previous search");
Expand All @@ -410,10 +423,36 @@ public static TopFieldCollector create(Sort sort, int numHits, FieldDoc after,
throw new IllegalArgumentException("after.fields has " + after.fields.length + " values but sort has " + sort.getSort().length);
}

return new PagingFieldCollector(sort, queue, after, numHits, totalHitsThreshold);
return new PagingFieldCollector(sort, queue, after, numHits, hitsThresholdChecker);
}
}

/**
* Create a CollectorManager which uses a shared hit counter to maintain number of hits
*/
public static CollectorManager<TopFieldCollector, TopFieldDocs> createSharedManager(Sort sort, int numHits, FieldDoc after,
int totalHitsThreshold) {
return new CollectorManager<>() {

private final HitsThresholdChecker hitsThresholdChecker = HitsThresholdChecker.createShared(totalHitsThreshold);

@Override
public TopFieldCollector newCollector() throws IOException {
return create(sort, numHits, after, hitsThresholdChecker);
}

@Override
public TopFieldDocs reduce(Collection<TopFieldCollector> collectors) throws IOException {
final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
int i = 0;
for (TopFieldCollector collector : collectors) {
topDocs[i++] = collector.topDocs();
}
return TopDocs.merge(sort, 0, numHits, topDocs);
}
};
}

/**
* Populate {@link ScoreDoc#score scores} of the given {@code topDocs}.
* @param topDocs the top docs to populate
Expand Down

0 comments on commit 4d82665

Please sign in to comment.