Re-structure collate option in PhraseSuggester to only collate on local shard.

Previously, the collate feature was executed on all shards of an index using the client.
This led to a deadlock when concurrent collate requests were run from the _search API,
because both the external request and the internal collate requests use the same
search threadpool.

As phrase suggestions are generated from the terms of the local shard, in most cases a
generated suggestion that does not yield a hit for the collate query on the local shard
would not yield a hit for the collate query on non-local shards either.

Instead of using the client for collating suggestions, the collate query is now executed
against the ContextIndexSearcher. This PR also removes the ability to specify a preference
for a collate query, as the collate query is only run on the local shard.
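For illustration only, here is a minimal Lucene-level sketch of the per-shard check this
change moves to: build a query from the candidate suggestion and count hits on the local
shard's searcher, pruning candidates with no local match. The class, field name, and
whitespace tokenization are assumptions for the example (Lucene 5.x era API); the actual
implementation renders the collate template query and runs it through the
ContextIndexSearcher.

[source,java]
--------------------------------------------------
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TotalHitCountCollector;

import java.io.IOException;

final class LocalCollateSketch {

    /** Returns true if the suggested phrase matches at least one doc on this shard. */
    static boolean collatesLocally(IndexSearcher shardSearcher, String field, String suggestion)
            throws IOException {
        BooleanQuery query = new BooleanQuery();       // direct construction, Lucene 5.1 era style
        for (String token : suggestion.split("\\s+")) {
            query.add(new TermQuery(new Term(field, token)), BooleanClause.Occur.MUST);
        }
        TotalHitCountCollector collector = new TotalHitCountCollector();
        shardSearcher.search(query, collector);        // runs on the local shard only, no client
        return collector.getTotalHits() > 0;           // prune the candidate when no local hit
    }
}
--------------------------------------------------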

closes elastic#9377
areek committed May 14, 2015
1 parent 33fd250 commit 6bcedd9
Showing 16 changed files with 110 additions and 148 deletions.
32 changes: 14 additions & 18 deletions docs/reference/search/suggesters/phrase-suggest.asciidoc
@@ -163,20 +163,18 @@ can contain misspellings (See parameter descriptions below).

`collate`::
Checks each suggestion against the specified `query` or `filter` to
prune suggestions for which no matching docs exist in the index. Either
a `query` or a `filter` must be specified, and it is run as a
<<query-dsl-template-query,`template` query>>. The current suggestion is
automatically made available as the `{{suggestion}}` variable, which
should be used in your query/filter. You can still specify your own
template `params` -- the `suggestion` value will be added to the
variables you specify. You can specify a `preference` to control
on which shards the query is executed (see <<search-request-preference>>).
The default value is `_only_local`. Additionally, you can specify
a `prune` to control if all phrase suggestions will be
returned, when set to `true` the suggestions will have an additional
option `collate_match`, which will be `true` if matching documents
for the phrase was found, `false` otherwise. The default value for
`prune` is `false`.
prune suggestions for which no matching docs exist in the index.
The collate query for a suggestion is run only on the local shard from which
the suggestion was generated. Either a `query` or a `filter` must be specified,
and it is run as a <<query-dsl-template-query,`template` query>>.
The current suggestion is automatically made available as the `{{suggestion}}`
variable, which should be used in your query/filter. You can still specify
your own template `params` -- the `suggestion` value will be added to the
variables you specify. Additionally, you can specify `prune` to control
whether all phrase suggestions are returned; when set to `true`, the suggestions
will have an additional option `collate_match`, which will be `true` if
matching documents for the phrase were found, `false` otherwise.
The default value for `prune` is `false`.

[source,js]
--------------------------------------------------
@@ -199,8 +197,7 @@ curl -XPOST 'localhost:9200/_search' -d {
}
},
"params": {"field_name" : "title"}, <3>
"preference": "_primary", <4>
"prune": true <5>
"prune": true <4>
}
}
}
@@ -212,8 +209,7 @@ curl -XPOST 'localhost:9200/_search' -d {
of each suggestion.
<3> An additional `field_name` variable has been specified in
`params` and is used by the `match` query.
<4> The default `preference` has been changed to `_primary`.
<5> All suggestions will be returned with an extra `collate_match`
<4> All suggestions will be returned with an extra `collate_match`
option indicating whether the generated phrase matched any
document.
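To make the template mechanics concrete, the sketch below (illustrative only, not
Elasticsearch's actual mustache rendering) shows how the automatically injected
`suggestion` variable combines with the user-supplied `field_name` param to produce the
collate query that is run on the local shard for each candidate phrase; the class and
method names are assumptions for the example.

[source,java]
--------------------------------------------------
import java.util.HashMap;
import java.util.Map;

final class CollateTemplateSketch {

    // Naive {{var}} substitution, standing in for the real template (mustache) engine.
    static String render(String template, Map<String, Object> vars) {
        String rendered = template;
        for (Map.Entry<String, Object> e : vars.entrySet()) {
            rendered = rendered.replace("{{" + e.getKey() + "}}", String.valueOf(e.getValue()));
        }
        return rendered;
    }

    public static void main(String[] args) {
        String template = "{\"match\": {\"{{field_name}}\": \"{{suggestion}}\"}}";
        Map<String, Object> vars = new HashMap<>();
        vars.put("field_name", "title");        // user-supplied via "params"
        vars.put("suggestion", "nobel prize");  // injected automatically per candidate
        // prints: {"match": {"title": "nobel prize"}}
        System.out.println(render(template, vars));
    }
}
--------------------------------------------------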

@@ -130,27 +130,26 @@ protected SuggestResponse newResponse(SuggestRequest request, AtomicReferenceArr
protected ShardSuggestResponse shardOperation(ShardSuggestRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
final Engine.Searcher searcher = indexShard.acquireSearcher("suggest");
ShardSuggestService shardSuggestService = indexShard.shardSuggestService();
shardSuggestService.preSuggest();
long startTime = System.nanoTime();
XContentParser parser = null;
try {
try (Engine.Searcher searcher = indexShard.acquireSearcher("suggest")) {
BytesReference suggest = request.suggest();
if (suggest != null && suggest.length() > 0) {
parser = XContentFactory.xContent(suggest).createParser(suggest);
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("suggest content missing");
}
final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(parser, indexService.mapperService(), request.shardId().getIndex(), request.shardId().id());
final Suggest result = suggestPhase.execute(context, searcher.reader());
final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(parser, indexService.mapperService(),
indexService.queryParserService(), request.shardId().getIndex(), request.shardId().id());
final Suggest result = suggestPhase.execute(context, searcher.searcher());
return new ShardSuggestResponse(request.shardId(), result);
}
return new ShardSuggestResponse(request.shardId(), new Suggest());
} catch (Throwable ex) {
throw new ElasticsearchException("failed to execute suggest", ex);
} finally {
searcher.close();
if (parser != null) {
parser.close();
}
@@ -22,8 +22,9 @@

import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;

public interface SuggestContextParser {
public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, MapperService mapperService) throws IOException;
public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, MapperService mapperService, IndexQueryParserService queryParserService) throws IOException;

}
@@ -22,6 +22,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.suggest.SuggestionSearchContext.SuggestionContext;
@@ -44,11 +45,11 @@ public SuggestParseElement(Suggesters suggesters) {

@Override
public void parse(XContentParser parser, SearchContext context) throws Exception {
SuggestionSearchContext suggestionSearchContext = parseInternal(parser, context.mapperService(), context.shardTarget().index(), context.shardTarget().shardId());
SuggestionSearchContext suggestionSearchContext = parseInternal(parser, context.mapperService(), context.queryParserService(), context.shardTarget().index(), context.shardTarget().shardId());
context.suggest(suggestionSearchContext);
}

public SuggestionSearchContext parseInternal(XContentParser parser, MapperService mapperService, String index, int shardId) throws IOException {
public SuggestionSearchContext parseInternal(XContentParser parser, MapperService mapperService, IndexQueryParserService queryParserService, String index, int shardId) throws IOException {
SuggestionSearchContext suggestionSearchContext = new SuggestionSearchContext();
BytesRef globalText = null;
String fieldName = null;
@@ -86,7 +87,7 @@ public SuggestionSearchContext parseInternal(XContentParser parser, MapperServic
throw new IllegalArgumentException("Suggester[" + fieldName + "] not supported");
}
final SuggestContextParser contextParser = suggesters.get(fieldName).getContextParser();
suggestionContext = contextParser.parse(parser, mapperService);
suggestionContext = contextParser.parse(parser, mapperService, queryParserService);
}
}
if (suggestionContext != null) {
@@ -19,7 +19,7 @@
package org.elasticsearch.search.suggest;

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.CharsRefBuilder;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.component.AbstractComponent;
@@ -71,18 +71,18 @@ public void execute(SearchContext context) {
if (suggest == null) {
return;
}
context.queryResult().suggest(execute(suggest, context.searcher().getIndexReader()));
context.queryResult().suggest(execute(suggest, context.searcher()));
}

public Suggest execute(SuggestionSearchContext suggest, IndexReader reader) {
public Suggest execute(SuggestionSearchContext suggest, IndexSearcher searcher) {
try {
CharsRefBuilder spare = new CharsRefBuilder();
final List<Suggestion<? extends Entry<? extends Option>>> suggestions = new ArrayList<>(suggest.suggestions().size());

for (Map.Entry<String, SuggestionSearchContext.SuggestionContext> entry : suggest.suggestions().entrySet()) {
SuggestionSearchContext.SuggestionContext suggestion = entry.getValue();
Suggester<SuggestionContext> suggester = suggestion.getSuggester();
Suggestion<? extends Entry<? extends Option>> result = suggester.execute(entry.getKey(), suggestion, reader, spare);
Suggestion<? extends Entry<? extends Option>> result = suggester.execute(entry.getKey(), suggestion, searcher, spare);
if (result != null) {
assert entry.getKey().equals(result.name);
suggestions.add(result);
11 changes: 6 additions & 5 deletions src/main/java/org/elasticsearch/search/suggest/Suggester.java
@@ -19,27 +19,28 @@

package org.elasticsearch.search.suggest;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.util.CharsRefBuilder;

import java.io.IOException;

public abstract class Suggester<T extends SuggestionSearchContext.SuggestionContext> {

protected abstract Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>
innerExecute(String name, T suggestion, IndexReader indexReader, CharsRefBuilder spare) throws IOException;
innerExecute(String name, T suggestion, IndexSearcher searcher, CharsRefBuilder spare) throws IOException;

public abstract String[] names();

public abstract SuggestContextParser getContextParser();

public Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>>
execute(String name, T suggestion, IndexReader indexReader, CharsRefBuilder spare) throws IOException {
execute(String name, T suggestion, IndexSearcher searcher, CharsRefBuilder spare) throws IOException {
// #3469 We want to ignore empty shards
if (indexReader.numDocs() == 0) {

if (searcher.getIndexReader().numDocs() == 0) {
return null;
}
return innerExecute(name, suggestion, indexReader, spare);
return innerExecute(name, suggestion, searcher, spare);
}

}
@@ -26,6 +26,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.core.CompletionFieldMapper;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.search.suggest.SuggestContextParser;
import org.elasticsearch.search.suggest.SuggestionSearchContext;
import org.elasticsearch.search.suggest.context.ContextMapping.ContextQuery;
@@ -48,7 +49,7 @@ public CompletionSuggestParser(CompletionSuggester completionSuggester) {
}

@Override
public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, MapperService mapperService) throws IOException {
public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, MapperService mapperService, IndexQueryParserService queryParserService) throws IOException {
XContentParser.Token token;
String fieldName = null;
CompletionSuggestionContext suggestion = new CompletionSuggestionContext(completionSuggester);
@@ -23,6 +23,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.suggest.Lookup;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.lucene.util.CollectionUtil;
@@ -48,11 +49,11 @@ public class CompletionSuggester extends Suggester<CompletionSuggestionContext>

@Override
protected Suggest.Suggestion<? extends Suggest.Suggestion.Entry<? extends Suggest.Suggestion.Entry.Option>> innerExecute(String name,
CompletionSuggestionContext suggestionContext, IndexReader indexReader, CharsRefBuilder spare) throws IOException {
CompletionSuggestionContext suggestionContext, IndexSearcher searcher, CharsRefBuilder spare) throws IOException {
if (suggestionContext.mapper() == null || !(suggestionContext.mapper() instanceof CompletionFieldMapper)) {
throw new ElasticsearchException("Field [" + suggestionContext.getField() + "] is not a completion suggest field");
}

final IndexReader indexReader = searcher.getIndexReader();
CompletionSuggestion completionSuggestion = new CompletionSuggestion(name, suggestionContext.getSize());
spare.copyUTF8Bytes(suggestionContext.getText());

@@ -60,7 +60,7 @@ public NoisyChannelSpellChecker(double nonErrorLikelihood, boolean requireUnigra
}

public Result getCorrections(TokenStream stream, final CandidateGenerator generator,
float maxErrors, int numCorrections, IndexReader reader, WordScorer wordScorer, BytesRef separator, float confidence, int gramSize) throws IOException {
float maxErrors, int numCorrections, WordScorer wordScorer, float confidence, int gramSize) throws IOException {

final List<CandidateSet> candidateSetsList = new ArrayList<>();
SuggestUtils.analyze(stream, new SuggestUtils.TokenConsumer() {
@@ -134,7 +134,7 @@ public void end() {
public Result getCorrections(Analyzer analyzer, BytesRef query, CandidateGenerator generator,
float maxErrors, int numCorrections, IndexReader reader, String analysisField, WordScorer scorer, float confidence, int gramSize) throws IOException {

return getCorrections(tokenStream(analyzer, query, new CharsRefBuilder(), analysisField), generator, maxErrors, numCorrections, reader, scorer, new BytesRef(" "), confidence, gramSize);
return getCorrections(tokenStream(analyzer, query, new CharsRefBuilder(), analysisField), generator, maxErrors, numCorrections, scorer, confidence, gramSize);

}

@@ -28,6 +28,7 @@
import org.elasticsearch.index.analysis.ShingleTokenFilterFactory;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
@@ -49,8 +50,9 @@ public PhraseSuggestParser(PhraseSuggester suggester) {
}

@Override
public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, MapperService mapperService) throws IOException {
public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, MapperService mapperService, IndexQueryParserService queryParserService) throws IOException {
PhraseSuggestionContext suggestion = new PhraseSuggestionContext(suggester);
suggestion.setQueryParserService(queryParserService);
XContentParser.Token token;
String fieldName = null;
boolean gramSizeSet = false;
@@ -159,8 +161,6 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
} else {
suggestion.setCollateFilterScript(compiledScript);
}
} else if ("preference".equals(fieldName)) {
suggestion.setPreference(parser.text());
} else if ("params".equals(fieldName)) {
suggestion.setCollateScriptParams(parser.map());
} else if ("prune".equals(fieldName)) {
