Skip to content

Commit

Permalink
Remove unnecessary result sorting in SearchPhaseController (#23321)
Browse files Browse the repository at this point in the history
In oder to use lucene's utilities to merge top docs the results
need to be passed in a dense array where the index corresponds to the shard index in
the result list. Yet, we were sorting results before merging them just to order them
in the incoming order again for the above mentioned reason. This change removes the
obsolet sort and prevents unnecessary materializing of results.
  • Loading branch information
s1monw committed Feb 23, 2017
1 parent 771fd1f commit 2f3f9b9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@

public class SearchPhaseController extends AbstractComponent {

private static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = (o1, o2) -> {
int i = o1.value.shardTarget().getIndex().compareTo(o2.value.shardTarget().getIndex());
if (i == 0) {
i = o1.value.shardTarget().getShardId().id() - o2.value.shardTarget().getShardId().id();
}
return i;
};

private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

private final BigArrays bigArrays;
Expand Down Expand Up @@ -150,6 +142,9 @@ private static long optionalSum(long left, long right) {
* named completion suggestion across all shards. If more than one named completion suggestion is specified in the
* request, the suggest docs for a named suggestion are ordered by the suggestion name.
*
* Note: The order of the sorted score docs depends on the shard index in the result array if the merge process needs to disambiguate
* the result. In oder to obtain stable results the shard index (index of the result in the result array) must be the same.
*
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param resultsArr Shard result holder
Expand All @@ -160,26 +155,31 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray<? extends QuerySearch
return EMPTY_DOCS;
}

final QuerySearchResult result;
boolean canOptimize = false;
QuerySearchResult result = null;
int shardIndex = -1;
if (results.size() == 1) {
canOptimize = true;
result = results.get(0).value.queryResult();
shardIndex = results.get(0).index;
} else {
boolean hasResult = false;
QuerySearchResult resultToOptimize = null;
// lets see if we only got hits from a single shard, if so, we can optimize...
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
if (entry.value.queryResult().hasHits()) {
if (result != null) { // we already have one, can't really optimize
if (hasResult) { // we already have one, can't really optimize
canOptimize = false;
break;
}
canOptimize = true;
result = entry.value.queryResult();
hasResult = true;
resultToOptimize = entry.value.queryResult();
shardIndex = entry.index;
}
}
result = canOptimize ? resultToOptimize : results.get(0).value.queryResult();
assert result != null;
}
if (canOptimize) {
int offset = result.from();
Expand Down Expand Up @@ -225,74 +225,62 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray<? extends QuerySearch
return docs;
}

@SuppressWarnings("unchecked")
AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
QuerySearchResultProvider firstResult = sortedResults[0].value;

int topN = firstResult.queryResult().size();
int from = firstResult.queryResult().from();
if (ignoreFrom) {
from = 0;
}
final int topN = result.queryResult().size();
final int from = ignoreFrom ? 0 : result.queryResult().from();

final TopDocs mergedTopDocs;
int numShards = resultsArr.length();
if (firstResult.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) firstResult.queryResult().topDocs();
final int numShards = resultsArr.length();
if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields);

final CollapseTopFieldDocs[] shardTopDocs = new CollapseTopFieldDocs[numShards];
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
if (result.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs
final CollapseTopFieldDocs empty = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
sort.getSort(), new Object[0], Float.NaN);
Arrays.fill(shardTopDocs, empty);
}
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = (CollapseTopFieldDocs) topDocs;
}
// TopDocs#merge can't deal with null shard TopDocs
for (int i = 0; i < shardTopDocs.length; ++i) {
if (shardTopDocs[i] == null) {
shardTopDocs[i] = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
sort.getSort(), new Object[0], Float.NaN);
}
}
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs);
} else if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
} else if (result.queryResult().topDocs() instanceof TopFieldDocs) {
TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields);

final TopFieldDocs[] shardTopDocs = new TopFieldDocs[resultsArr.length()];
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
if (result.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs
final TopFieldDocs empty = new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN);
Arrays.fill(shardTopDocs, empty);
}
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = (TopFieldDocs) topDocs;
}
// TopDocs#merge can't deal with null shard TopDocs
for (int i = 0; i < shardTopDocs.length; ++i) {
if (shardTopDocs[i] == null) {
shardTopDocs[i] = new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN);
}
}
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
} else {
final TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()];
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
if (result.size() != shardTopDocs.length) {
// TopDocs#merge can't deal with null shard TopDocs
Arrays.fill(shardTopDocs, Lucene.EMPTY_TOP_DOCS);
}
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = topDocs;
}
// TopDocs#merge can't deal with null shard TopDocs
for (int i = 0; i < shardTopDocs.length; ++i) {
if (shardTopDocs[i] == null) {
shardTopDocs[i] = Lucene.EMPTY_TOP_DOCS;
}
}
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
}

ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
// group suggestions and assign shard index
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
Suggest shardSuggest = sortedResult.value.queryResult().suggest();
if (shardSuggest != null) {
for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testSort() throws Exception {
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results);
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
Expand All @@ -83,14 +83,26 @@ public void testSort() throws Exception {
assertThat(sortedDocs.length, equalTo(accumulatedLength));
}

public void testSortIsIdempotent() throws IOException {
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
randomBoolean() || true);
boolean ignoreFrom = randomBoolean();
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results);

ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results);
assertArrayEquals(sortedDocs, sortedDocs2);
}

public void testMerge() throws IOException {
List<CompletionSuggestion> suggestions = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
suggestions.add(new CompletionSuggestion(randomAsciiOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize);
AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);

// calculate offsets and score doc array
List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
Expand Down Expand Up @@ -127,7 +139,7 @@ public void testMerge() throws IOException {

private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
List<CompletionSuggestion> suggestions,
int searchHitsSize) {
int searchHitsSize, boolean useConstantScore) {
AtomicArray<QuerySearchResultProvider> queryResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
Expand All @@ -138,7 +150,7 @@ private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
ScoreDoc[] scoreDocs = new ScoreDoc[nDocs];
float maxScore = 0F;
for (int i = 0; i < nDocs; i++) {
float score = Math.abs(randomFloat());
float score = useConstantScore ? 1.0F : Math.abs(randomFloat());
scoreDocs[i] = new ScoreDoc(i, score);
if (score > maxScore) {
maxScore = score;
Expand Down

0 comments on commit 2f3f9b9

Please sign in to comment.