Post filter phrase suggestions
This implementation has a bunch of problems that'll need to be worked
out before it is a valid candidate for merging.  I don't have time to
rebase it right now but would still love feedback on the problems.  The
ones I remember:

1.  It performs the filtering by blocking the suggesting thread.
2.  Because there is no "exists" query type it uses a limit.  I now know
that isn't as efficient as just using a count, but it might be worth
implementing an exists query type for it anyway.
3.  It feels like there are a lot of plumbing changes required for this
feature.  My guess is that's because I'm going about it wrong.  This
correlates with #1 pretty well.
4.  I have to wrap the filter through the map nodes and parse it during
the reduce step.  That feels silly.  (A sketch of the request syntax
this adds follows this list.)
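
For reference, a minimal sketch of a suggest request using the new filter
option, as PhraseSuggestParser below reads it. The "filter", "type", and
"extra" field names come from the parser; the suggestion name, index, field,
the term filter inside "extra", and the "match_phrase" string for "type" are
made up for illustration:

curl -XPOST 'localhost:9200/articles/_suggest' -d '{
  "my_suggestion": {
    "text": "noble prize",
    "phrase": {
      "field": "body",
      "filter": {
        "type": "match_phrase",
        "extra": { "term": { "user": "kimchy" } }
      }
    }
  }
}'

When "type" is omitted but "extra" is present, the parser defaults the filter
type to MATCH_PHRASE.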

Closes elastic#3482
nik9000 committed Nov 4, 2013
1 parent fddb742 commit 53cbce7
Showing 20 changed files with 419 additions and 35 deletions.
@@ -73,7 +73,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ
@Override
protected void moveToSecondPhase() throws Exception {
// no need to sort, since we know we have no hits back
-final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
+final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty(), request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
@@ -180,7 +180,7 @@ void finishHim() {

void innerFinishHim() throws Exception {
sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
-final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
+final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
@@ -289,7 +289,7 @@ void finishHim() {
}

void innerFinishHim() throws Exception {
-final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
@@ -86,7 +86,7 @@ protected void moveToSecondPhase() throws Exception {

private void innerFinishHim() throws IOException {
sortedShardList = searchPhaseController.sortDocs(firstResults);
-final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults);
+final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, firstResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);
@@ -190,7 +190,7 @@ void finishHim() {
}

void innerFinishHim() throws Exception {
-InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults);
+InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults, fetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
@@ -70,7 +70,7 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ

@Override
protected void moveToSecondPhase() throws Exception {
-final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty());
+final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults, (AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty(), request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, ImmutableMap.of("total_hits", Long.toString(internalResponse.hits().totalHits())));
@@ -242,7 +242,7 @@ private void finishHim() {

private void innerFinishHim() {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(queryFetchResults);
-final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
+final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults, null);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
@@ -273,7 +273,7 @@ private void finishHim() {
}

private void innerFinishHim() {
-InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
+InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, null);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();
@@ -260,7 +260,7 @@ private void innerFinishHim() throws IOException {
docs[counter++] = scoreDoc;
}
}
-final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults);
+final InternalSearchResponse internalResponse = searchPhaseController.merge(docs, queryFetchResults, queryFetchResults, null);
((InternalSearchHits) internalResponse.hits()).totalHits = Long.parseLong(this.scrollId.getAttributes().get("total_hits"));


@@ -25,6 +25,7 @@
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAction;
+import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -62,13 +63,16 @@ public class TransportSuggestAction extends TransportBroadcastOperationAction<Su
private final IndicesService indicesService;

private final SuggestPhase suggestPhase;

+private final Client client;

@Inject
public TransportSuggestAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
-IndicesService indicesService, SuggestPhase suggestPhase) {
+IndicesService indicesService, SuggestPhase suggestPhase, Client client) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.suggestPhase = suggestPhase;
+this.client = client;
}

@Override
@@ -142,7 +146,10 @@ protected SuggestResponse newResponse(SuggestRequest request, AtomicReferenceArr
}
}

-return new SuggestResponse(new Suggest(Suggest.reduce(groupedSuggestions)), shardsResponses.length(), successfulShards, failedShards, shardFailures);
+Suggest.ReduceContext reduceContext = new Suggest.ReduceContext(client, request.indices());
+reduceContext.setPreference(request.preference());
+reduceContext.setRouting(request.routing());
+return new SuggestResponse(new Suggest(Suggest.reduce(groupedSuggestions, reduceContext)), shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override
@@ -20,6 +20,7 @@
package org.elasticsearch.index.query;

import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.geo.ShapeRelation;
import org.elasticsearch.common.geo.builders.ShapeBuilder;
@@ -553,6 +554,10 @@ public static WrapperFilterBuilder wrapperFilter(String filter) {
public static WrapperFilterBuilder wrapperFilter(byte[] data, int offset, int length) {
return new WrapperFilterBuilder(data, offset, length);
}

+public static WrapperFilterBuilder wrapperFilter(BytesReference bytes) {
+return new WrapperFilterBuilder(bytes);
+}

private FilterBuilders() {

@@ -25,7 +25,8 @@
* Time: 11:30
*/

-import com.google.common.base.Charsets;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
@@ -36,27 +37,24 @@
* query builders.
*/
public class WrapperFilterBuilder extends BaseFilterBuilder {

-private final byte[] source;
-private final int offset;
-private final int length;
+private final BytesReference bytes;

public WrapperFilterBuilder(String source) {
-this.source = source.getBytes(Charsets.UTF_8);
-this.offset = 0;
-this.length = this.source.length;
+this(new BytesArray(source));
}

public WrapperFilterBuilder(byte[] source, int offset, int length) {
-this.source = source;
-this.offset = offset;
-this.length = length;
+this(new BytesArray(source, offset, length));
}

+public WrapperFilterBuilder(BytesReference bytes) {
+this.bytes = bytes;
+}

@Override
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(WrapperFilterParser.NAME);
builder.field("filter", source, offset, length);
builder.field("filter", bytes);
builder.endObject();
}
}
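
A small sketch of the new BytesReference overload in use; the filter JSON
here is a made-up example:

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.WrapperFilterBuilder;

// Wrap raw filter bytes (e.g. bytes copied out of a parsed request)
// without re-encoding them through a String.
BytesReference raw = new BytesArray("{\"term\":{\"user\":\"kimchy\"}}");
WrapperFilterBuilder filter = FilterBuilders.wrapperFilter(raw);

This is presumably how the opaque filterExtra bytes carried through the map
nodes get turned back into a filter at reduce time.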
@@ -23,7 +23,9 @@
import org.apache.lucene.index.Term;
import org.apache.lucene.search.*;
import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cache.recycler.CacheRecycler;
+import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.XMaps;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@@ -66,12 +68,14 @@ public int compare(AtomicArray.Entry<? extends QuerySearchResultProvider> o1, At

private final CacheRecycler cacheRecycler;
private final boolean optimizeSingleShard;
+private final Client client;

@Inject
-public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler) {
+public SearchPhaseController(Settings settings, CacheRecycler cacheRecycler, Client client) {
super(settings);
this.cacheRecycler = cacheRecycler;
this.optimizeSingleShard = componentSettings.getAsBoolean("optimize_single_shard", true);
+this.client = client;
}

public boolean optimizeSingleShard() {
@@ -296,7 +300,7 @@ public void fillDocIdsToLoad(AtomicArray<ExtTIntArrayList> docsIdsToLoad, ScoreD
}
}

-public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr) {
+public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends QuerySearchResultProvider> queryResultsArr, AtomicArray<? extends FetchSearchResultProvider> fetchResultsArr, SearchRequest request) {

List<? extends AtomicArray.Entry<? extends QuerySearchResultProvider>> queryResults = queryResultsArr.asList();
List<? extends AtomicArray.Entry<? extends FetchSearchResultProvider>> fetchResults = fetchResultsArr.asList();
@@ -411,7 +415,14 @@ public InternalSearchResponse merge(ScoreDoc[] sortedDocs, AtomicArray<? extends
Suggest.group(groupedSuggestions, shardResult);
}

-suggest = hasSuggestions ? new Suggest(Suggest.Fields.SUGGEST, Suggest.reduce(groupedSuggestions)) : null;
+Suggest.ReduceContext reduceContext = null;
+if (request != null) {
+reduceContext = new Suggest.ReduceContext(client, request.indices());
+reduceContext.setPreference(request.preference());
+reduceContext.setRouting(request.routing());
+reduceContext.setTypes(request.types());
+}
+suggest = hasSuggestions ? new Suggest(Suggest.Fields.SUGGEST, Suggest.reduce(groupedSuggestions, reduceContext)) : null;
}

InternalSearchHits searchHits = new InternalSearchHits(hits.toArray(new InternalSearchHit[hits.size()]), totalHits, maxScore);
62 changes: 60 additions & 2 deletions src/main/java/org/elasticsearch/search/suggest/Suggest.java
@@ -21,6 +21,9 @@
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.Version;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -176,11 +179,12 @@ public static Map<String, List<Suggest.Suggestion>> group(Map<String, List<Sugge
return groupedSuggestions;
}

-public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions) {
+public static List<Suggestion<? extends Entry<? extends Option>>> reduce(Map<String, List<Suggest.Suggestion>> groupedSuggestions, ReduceContext context) {
List<Suggestion<? extends Entry<? extends Option>>> reduced = new ArrayList<Suggestion<? extends Entry<? extends Option>>>(groupedSuggestions.size());
for (java.util.Map.Entry<String, List<Suggestion>> unmergedResults : groupedSuggestions.entrySet()) {
List<Suggestion> value = unmergedResults.getValue();
Suggestion reduce = value.get(0).reduce(value);
+reduce.filter(context);
reduce.trim();
reduced.add(reduce);
}
@@ -265,7 +269,14 @@ public Suggestion<T> reduce(List<Suggestion<T>> toReduce) {
protected Comparator<Option> sortComparator() {
return COMPARATOR;
}


+/**
+ * Filter options after they've been reduced.
+ */
+protected void filter(ReduceContext context) {
+// Default implementation is a noop
+}

/**
* Trims the number of options per suggest text term to the requested size.
* For internal usage.
Expand Down Expand Up @@ -647,4 +658,51 @@ public static Sort fromId(byte id) {
}
}
}

+public static class ReduceContext {
+private final Client client;
+private final String[] indecies;
+@Nullable
+private String routing;
+@Nullable
+private String preference;
+private String[] types = Strings.EMPTY_ARRAY;
+
+public ReduceContext(Client client, String[] indecies) {
+this.client = client;
+this.indecies = indecies;
+}
+
+public Client getClient() {
+return client;
+}
+
+public String[] getIndecies() {
+return indecies;
+}
+
+public void setRouting(String routing) {
+this.routing = routing;
+}
+
+public String getRouting() {
+return routing;
+}
+
+public void setPreference(String preference) {
+this.preference = preference;
+}
+
+public String getPreference() {
+return preference;
+}
+
+public void setTypes(String[] types) {
+this.types = types;
+}
+
+public String[] getTypes() {
+return types;
+}
+}
}
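
To make ReduceContext's intent concrete, here is a hedged sketch of what a
filter(ReduceContext) override could look like in a suggestion that
post-filters its options. It assumes the subclass can reach its entries via
getEntries() and that getOptions() returns a mutable list; the
matchPhraseQuery choice and the "body" field name are illustrative (a real
implementation would use the suggestion's own field, which PhraseSuggester
now carries on the response). It also shows exactly problems 1 and 2 from
the commit message, the blocking actionGet() and a size-1 search standing in
for a missing "exists" query:

import java.util.Iterator;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;

@Override
protected void filter(ReduceContext context) {
    for (Entry<? extends Option> entry : getEntries()) {
        Iterator<? extends Option> options = entry.getOptions().iterator();
        while (options.hasNext()) {
            Option option = options.next();
            // One search per option: problem 1, this blocks the reducing thread.
            SearchResponse response = context.getClient()
                    .prepareSearch(context.getIndecies())
                    .setTypes(context.getTypes())
                    .setRouting(context.getRouting())
                    .setPreference(context.getPreference())
                    .setQuery(QueryBuilders.matchPhraseQuery("body", option.getText().string()))
                    .setSize(1) // problem 2: a limit standing in for an "exists" query
                    .execute().actionGet();
            if (response.getHits().getTotalHits() == 0) {
                options.remove(); // drop suggestions that match no documents
            }
        }
    }
}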
@@ -23,6 +23,8 @@
import org.apache.lucene.index.Terms;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.index.analysis.ShingleTokenFilterFactory;
@@ -46,7 +48,8 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
PhraseSuggestionContext suggestion = new PhraseSuggestionContext(suggester);
XContentParser.Token token;
String fieldName = null;
-boolean gramSizeSet = false;
+boolean gramSizeSet = false;
+boolean filterTypeSet = false;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
@@ -124,6 +127,25 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
}
}
}
} else if ("filter".equals(fieldName)) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
fieldName = parser.currentName();
} else if (token.isValue()) {
if ("type".equals(fieldName)) {
suggestion.setFilterType(PhraseSuggestion.FilterType.fromString(parser.text()));
filterTypeSet = true;
} else if ("extra".equals(fieldName)) {
// Copy the filter data
XContentBuilder copier = XContentFactory.contentBuilder(parser.contentType());
copier.copyCurrentStructure(parser);
suggestion.setFilterExtra(copier.bytes());
} else {
throw new ElasticSearchIllegalArgumentException(
"suggester[phrase][highlight] doesn't support field [" + fieldName + "]");
}
}
}
} else {
throw new ElasticSearchIllegalArgumentException("suggester[phrase] doesn't support array field [" + fieldName + "]");
}
@@ -166,7 +188,10 @@ public SuggestionSearchContext.SuggestionContext parse(XContentParser parser, Ma
}
}


+// filterType defaults to MATCH_PHRASE if filterExtra is set but filterType isn't
+if (!filterTypeSet && suggestion.getFilterExtra().length() > 0) {
+suggestion.setFilterType(PhraseSuggestion.FilterType.MATCH_PHRASE);
+}

return suggestion;
}
@@ -55,6 +55,9 @@ public Suggestion<? extends Entry<? extends Option>> innerExecute(String name, P
IndexReader indexReader, CharsRef spare) throws IOException {
double realWordErrorLikelihood = suggestion.realworldErrorLikelyhood();
final PhraseSuggestion response = new PhraseSuggestion(name, suggestion.getSize());
+response.setFilterType(suggestion.getFilterType());
+response.setFilterExtra(suggestion.getFilterExtra());
+response.setField(new StringText(suggestion.getField()));

List<PhraseSuggestionContext.DirectCandidateGenerator> generators = suggestion.generators();
final int numGenerators = generators.size();
