Skip to content

Commit

Permalink
Rewrite search requests on the coordinating nodes (#25814)
Browse files Browse the repository at this point in the history
This change rewrites search requests on the coordinating node before
we send requests to the individual shards. This reduces the rewrite load
and object creation for each rewrite on the executing nodes, and fetches
resources only once instead of N times (once per shard) for queries like the
`terms` query with index lookups (among others, the percolator and geo-shape queries).

Relates to #25791
  • Loading branch information
s1monw committed Jul 21, 2017
1 parent 0d0c103 commit 0e3ad52
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 33 deletions.
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -178,28 +179,39 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);


final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
.getDataNodes().size());
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might already be null and we want to catch
// situations where, due to a bug, it changes to null
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, clusterState.getNodes()
.getDataNodes().size());
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
searchRequest.routing(), remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
+ clusterState.getNodes().getDataNodes().size();
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled);
}, listener::onFailure));
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
remoteClusterIndices, ActionListener.wrap((searchShardsResponses) -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int numNodesInvovled = searchShardsResponses.values().stream().mapToInt(r -> r.getNodes().length).sum()
+ clusterState.getNodes().getDataNodes().size();
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices, remoteShardIterators,
clusterNodeLookup, clusterState, remoteAliasFilters, listener, numNodesInvovled);
}, listener::onFailure));
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
}

Expand Down
Expand Up @@ -1239,6 +1239,9 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, String...
return new AliasFilter(ShardSearchRequest.parseAliasFilter(filterParser, indexMetaData, aliases), aliases);
}

/**
 * Creates a fresh {@link QueryRewriteContext} whose notion of "current time" is
 * supplied lazily by the given {@code nowInMillis} provider.
 */
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
    final QueryRewriteContext rewriteContext = new QueryRewriteContext(xContentRegistry, client, nowInMillis);
    return rewriteContext;
}
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -105,6 +105,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
Expand Down Expand Up @@ -935,6 +936,9 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
* The action listener is guaranteed to be executed on the search thread-pool
*/
private void rewriteShardRequest(ShardSearchRequest request, ActionListener<ShardSearchRequest> listener) {
// we also do the rewrite on the coordinating node (TransportSearchAction) but we need to do it here as well for BWC and for
// AliasFilters that might need to be rewritten. These are edge-cases but we are very efficient doing the rewrite here so it's
// not adding a lot of overhead
Rewriteable.rewriteAndFetch(request.getRewriteable(), indicesService.getRewriteContext(request::nowInMillis),
ActionListener.wrap(r ->
threadPool.executor(Names.SEARCH).execute(new AbstractRunnable() {
Expand All @@ -949,4 +953,11 @@ protected void doRun() throws Exception {
}
}), listener::onFailure));
}

/**
 * Creates a new {@link QueryRewriteContext} bound to the given {@code nowInMillis}
 * provider; the construction itself is delegated to the indices service.
 */
public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
    return indicesService.getRewriteContext(nowInMillis);
}
}
Expand Up @@ -282,7 +282,10 @@ public FailOnRewriteQueryBuilder() {

@Override
protected QueryBuilder doRewrite(QueryRewriteContext context) {
    // A coordinating-node rewrite has no shard context, so it is tolerated; only a
    // shard-level rewrite must trip the failure this test builder exists to provoke.
    if (context.convertToShardContext() == null) {
        return this;
    }
    throw new IllegalStateException("Fail on rewrite phase");
}

@Override
Expand Down
Expand Up @@ -220,10 +220,9 @@ public void testIndexedShapeReferenceSourceDisabled() throws Exception {
client().prepareIndex("shapes", "shape_type", "Big_Rectangle").setSource(jsonBuilder().startObject()
.field("shape", shape).endObject()).setRefreshPolicy(IMMEDIATE).get();

ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> client().prepareSearch("test").setTypes("type1")
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> client().prepareSearch("test").setTypes("type1")
.setQuery(geoIntersectionQuery("location", "Big_Rectangle", "shape_type")).get());
assertThat(e.getRootCause(), instanceOf(IllegalArgumentException.class));
assertThat(e.getRootCause().getMessage(), containsString("source disabled"));
assertThat(e.getMessage(), containsString("source disabled"));
}

public void testReusableBuilder() throws IOException {
Expand Down
Expand Up @@ -19,9 +19,7 @@
package org.elasticsearch.percolator;

import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -280,12 +278,11 @@ public void testPercolatorQueryExistingDocumentSourceDisabled() throws Exception
client().admin().indices().prepareRefresh().get();

logger.info("percolating empty doc with source disabled");
Throwable e = expectThrows(SearchPhaseExecutionException.class, () -> {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> {
client().prepareSearch()
.setQuery(new PercolateQueryBuilder("query", "test", "type", "1", null, null, null))
.get();
}).getRootCause();
assertThat(e, instanceOf(IllegalArgumentException.class));
});
assertThat(e.getMessage(), containsString("source disabled"));
}

Expand Down Expand Up @@ -650,9 +647,7 @@ public void testPercolatorQueryViaMultiSearch() throws Exception {
item = response.getResponses()[5];
assertThat(item.getResponse(), nullValue());
assertThat(item.getFailureMessage(), notNullValue());
assertThat(item.getFailureMessage(), equalTo("all shards failed"));
assertThat(ExceptionsHelper.unwrapCause(item.getFailure().getCause()).getMessage(),
containsString("[test/type/6] couldn't be found"));
assertThat(item.getFailureMessage(), containsString("[test/type/6] couldn't be found"));
}

}
@@ -0,0 +1,83 @@
---
# Regression test: a `terms` lookup query must be rewritten on the coordinating
# node so the referenced document is fetched exactly once, not once per shard.
# The capture of this file lost its indentation; it is reconstructed here in the
# conventional Elasticsearch REST-test layout.
"Ensure that we fetch the document only once":
  - skip:
      version: " - 5.99.99"
      reason: this was added in 6.0.0

  - do:
      indices.create:
        index: search_index
        body:
          settings:
            number_of_shards: 5
          mappings:
            doc:
              properties:
                user:
                  type: keyword
  - do:
      index:
        index: search_index
        type: doc
        id: 1
        body: { "user": "1" }

  - do:
      index:
        index: search_index
        type: doc
        id: 2
        body: { "user": "2" }

  - do:
      index:
        index: search_index
        type: doc
        id: 3
        body: { "user": "3" }

  - do:
      indices.refresh: {}

  # The lookup index does not exist yet: the coordinating-node rewrite should
  # surface the missing-index error up front.
  - do:
      catch: /no such index/
      search:
        index: "search_index"
        body: { "size" : 0, "query" : { "terms" : { "user" : { "index": "lookup_index", "type": "doc", "id": "1", "path": "followers"} } } }
  - do:
      indices.create:
        index: lookup_index
        body:
          settings:
            number_of_shards: 1
          mappings:
            doc:
              properties:
                followers:
                  type: keyword
  - do:
      index:
        index: lookup_index
        type: doc
        id: 1
        body: { "followers" : ["1", "3"] }
  - do:
      indices.refresh: {}

  - do:
      search:
        index: "search_index"
        body: { "size" : 0, "query" : { "terms" : { "user" : { "index": "lookup_index", "type": "doc", "id": "1", "path": "followers"} } } }

  - match: { _shards.total: 5 }
  - match: { _shards.successful: 5 }
  - match: { _shards.skipped: 0 }
  - match: { _shards.failed: 0 }
  - match: { hits.total: 2 }

  # Exactly one GET against the lookup index proves the terms lookup document
  # was fetched once on the coordinating node rather than once per shard.
  - do:
      indices.stats: { index: 'lookup_index', "metric": "get"}

  - match: { indices.lookup_index.total.get.total: 1 }


0 comments on commit 0e3ad52

Please sign in to comment.