Skip to content

Commit

Permalink
Completion suggestions to be reduced once instead of twice (#39255)
Browse files Browse the repository at this point in the history
We have been calling `reduce` against completion suggestions twice, once
in `SearchPhaseController#reducedQueryPhase` where all suggestions get
reduced, and once more in `SearchPhaseController#sortDocs` where we
add the top completion suggestions to the `TopDocs` so their docs can
be fetched. There is no need to do reduction twice. All suggestions can
be reduced in one call, then we can filter the result and pass only the
already reduced completion suggestions over to `sortDocs`. The small
important detail is that `shardIndex`, which is currently used only
to fetch suggestions hits, needs to be set before the first reduction,
hence outside of `sortDocs` where we have been doing it until now.
  • Loading branch information
javanna committed Feb 26, 2019
1 parent 2b9ee3a commit 23fbf6e
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.Suggest.Suggestion.Entry;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;

import java.util.ArrayList;
Expand Down Expand Up @@ -155,12 +154,12 @@ public AggregatedDfs aggregateDfs(Collection<DfsSearchResult> results) {
* @param size the number of hits to return from the merged top docs
*/
static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPhaseResult> results,
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size) {
final Collection<TopDocs> bufferedTopDocs, final TopDocsStats topDocsStats, int from, int size,
List<CompletionSuggestion> reducedCompletionSuggestions) {
if (results.isEmpty()) {
return SortedTopDocs.EMPTY;
}
final Collection<TopDocs> topDocs = bufferedTopDocs == null ? new ArrayList<>() : bufferedTopDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
for (SearchPhaseResult sortedResult : results) { // TODO we can move this loop into the reduce call to only loop over this once
/* We loop over all results once, group together the completion suggestions if there are any and collect relevant
* top docs results. Each top docs gets it's shard index set on all top docs to simplify top docs merging down the road
Expand All @@ -177,36 +176,22 @@ static SortedTopDocs sortDocs(boolean ignoreFrom, Collection<? extends SearchPha
topDocs.add(td.topDocs);
}
}
if (queryResult.hasSuggestHits()) {
Suggest shardSuggest = queryResult.suggest();
for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
suggestion.setShardIndex(sortedResult.getShardIndex());
List<Suggestion<CompletionSuggestion.Entry>> suggestions =
groupedCompletionSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
suggestions.add(suggestion);
}
}
}
final boolean hasHits = (groupedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false;
final boolean hasHits = (reducedCompletionSuggestions.isEmpty() && topDocs.isEmpty()) == false;
if (hasHits) {
final TopDocs mergedTopDocs = mergeTopDocs(topDocs, size, ignoreFrom ? 0 : from);
final ScoreDoc[] mergedScoreDocs = mergedTopDocs == null ? EMPTY_DOCS : mergedTopDocs.scoreDocs;
ScoreDoc[] scoreDocs = mergedScoreDocs;
if (groupedCompletionSuggestions.isEmpty() == false) {
if (reducedCompletionSuggestions.isEmpty() == false) {
int numSuggestDocs = 0;
List<Suggestion<? extends Entry<? extends Entry.Option>>> completionSuggestions =
new ArrayList<>(groupedCompletionSuggestions.size());
for (List<Suggestion<CompletionSuggestion.Entry>> groupedSuggestions : groupedCompletionSuggestions.values()) {
final CompletionSuggestion completionSuggestion = CompletionSuggestion.reduceTo(groupedSuggestions);
for (CompletionSuggestion completionSuggestion : reducedCompletionSuggestions) {
assert completionSuggestion != null;
numSuggestDocs += completionSuggestion.getOptions().size();
completionSuggestions.add(completionSuggestion);
}
scoreDocs = new ScoreDoc[mergedScoreDocs.length + numSuggestDocs];
System.arraycopy(mergedScoreDocs, 0, scoreDocs, 0, mergedScoreDocs.length);
int offset = mergedScoreDocs.length;
Suggest suggestions = new Suggest(completionSuggestions);
for (CompletionSuggestion completionSuggestion : suggestions.filter(CompletionSuggestion.class)) {
for (CompletionSuggestion completionSuggestion : reducedCompletionSuggestions) {
for (CompletionSuggestion.Entry.Option option : completionSuggestion.getOptions()) {
scoreDocs[offset++] = option.getDoc();
}
Expand Down Expand Up @@ -479,6 +464,10 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
for (Suggestion<? extends Suggestion.Entry<? extends Suggestion.Entry.Option>> suggestion : result.suggest()) {
List<Suggestion> suggestionList = groupedSuggestions.computeIfAbsent(suggestion.getName(), s -> new ArrayList<>());
suggestionList.add(suggestion);
if (suggestion instanceof CompletionSuggestion) {
CompletionSuggestion completionSuggestion = (CompletionSuggestion) suggestion;
completionSuggestion.setShardIndex(result.getShardIndex());
}
}
}
if (consumeAggs) {
Expand All @@ -489,15 +478,24 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
profileResults.put(key, result.consumeProfileResult());
}
}
final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
final Suggest reducedSuggest;
final List<CompletionSuggestion> reducedCompletionSuggestions;
if (groupedSuggestions.isEmpty()) {
reducedSuggest = null;
reducedCompletionSuggestions = Collections.emptyList();
} else {
reducedSuggest = new Suggest(Suggest.reduce(groupedSuggestions));
reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
firstResult.pipelineAggregators(), reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
topDocsStats.timedOut, topDocsStats.terminatedEarly, suggest, aggregations, shardResults, sortedTopDocs,
topDocsStats.timedOut, topDocsStats.terminatedEarly, reducedSuggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static Suggest fromXContent(XContentParser parser) throws IOException {

public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions) {
List<Suggestion<? extends Entry<? extends Option>>> reduced = new ArrayList<>(groupedSuggestions.size());
for (java.util.Map.Entry<String, List<Suggestion>> unmergedResults : groupedSuggestions.entrySet()) {
for (Map.Entry<String, List<Suggestion>> unmergedResults : groupedSuggestions.entrySet()) {
List<Suggestion> value = unmergedResults.getValue();
Class<? extends Suggestion> suggestionClass = null;
for (Suggestion suggestion : value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,8 @@ boolean advanceToNextOption() {
}
}

/**
* Reduces suggestions to a single suggestion containing at most
* top {@link CompletionSuggestion#getSize()} options across <code>toReduce</code>
*/
public static CompletionSuggestion reduceTo(List<Suggest.Suggestion<Entry>> toReduce) {
@Override
public CompletionSuggestion reduce(List<Suggest.Suggestion<Entry>> toReduce) {
if (toReduce.isEmpty()) {
return null;
} else {
Expand Down Expand Up @@ -209,7 +206,7 @@ public static CompletionSuggestion reduceTo(List<Suggest.Suggestion<Entry>> toRe
pq.pop();
}
if (leader.skipDuplicates == false ||
seenSurfaceForms.add(current.getText().toString())) {
seenSurfaceForms.add(current.getText().toString())) {
options.add(current);
if (options.size() >= size) {
break;
Expand All @@ -223,11 +220,6 @@ public static CompletionSuggestion reduceTo(List<Suggest.Suggestion<Entry>> toRe
}
}

@Override
public Suggest.Suggestion<Entry> reduce(List<Suggest.Suggestion<Entry>> toReduce) {
return reduceTo(toReduce);
}

public void setShardIndex(int shardIndex) {
if (entries.isEmpty() == false) {
for (Entry.Option option : getOptions()) {
Expand Down
Loading

0 comments on commit 23fbf6e

Please sign in to comment.