From 7ce8306bc5bace5fc741b5e16932de6d39520ca1 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 5 May 2014 11:04:37 +0200 Subject: [PATCH] Remove search operation threading option Search operation threading is an option that is not really used, and current non default implementations are flawed. Handling it also creates quite the complexity in the search handling codebase... This is a breaking change, but one that is actually a good one, since I haven't seen/heard anybody use it, and if its used, its problematic... closes #6042 --- .../search/SearchOperationThreading.java | 82 -------------- .../action/search/SearchRequest.java | 39 +------ .../action/search/SearchRequestBuilder.java | 17 --- .../action/search/SearchScrollRequest.java | 27 ++--- .../search/SearchScrollRequestBuilder.java | 8 -- .../action/search/TransportSearchAction.java | 11 -- ...TransportSearchDfsQueryAndFetchAction.java | 50 +-------- ...ransportSearchDfsQueryThenFetchAction.java | 106 ++---------------- .../TransportSearchQueryThenFetchAction.java | 52 +-------- ...nsportSearchScrollQueryAndFetchAction.java | 58 +--------- ...sportSearchScrollQueryThenFetchAction.java | 57 +--------- .../type/TransportSearchScrollScanAction.java | 62 +--------- .../type/TransportSearchTypeAction.java | 69 +----------- .../rest/action/search/RestSearchAction.java | 9 -- .../action/search/RestSearchScrollAction.java | 18 +-- .../action/SearchServiceTransportAction.java | 56 +++++---- 16 files changed, 74 insertions(+), 647 deletions(-) delete mode 100644 src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java diff --git a/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java b/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java deleted file mode 100644 index 23380384040aa..0000000000000 --- a/src/main/java/org/elasticsearch/action/search/SearchOperationThreading.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.action.search; - -import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.common.Nullable; - -/** - * Controls the operation threading model for search operation that are performed - * locally on the executing node. - * - * - */ -public enum SearchOperationThreading { - /** - * No threads are used, all the local shards operations will be performed on the calling - * thread. - */ - NO_THREADS((byte) 0), - /** - * The local shards operations will be performed in serial manner on a single forked thread. - */ - SINGLE_THREAD((byte) 1), - /** - * Each local shard operation will execute on its own thread. - */ - THREAD_PER_SHARD((byte) 2); - - private final byte id; - - SearchOperationThreading(byte id) { - this.id = id; - } - - public byte id() { - return this.id; - } - - public static SearchOperationThreading fromId(byte id) { - if (id == 0) { - return NO_THREADS; - } - if (id == 1) { - return SINGLE_THREAD; - } - if (id == 2) { - return THREAD_PER_SHARD; - } - throw new ElasticsearchIllegalArgumentException("No type matching id [" + id + "]"); - } - - public static SearchOperationThreading fromString(String value, @Nullable SearchOperationThreading defaultValue) { - if (value == null) { - return defaultValue; - } - if ("no_threads".equals(value) || "noThreads".equals(value)) { - return NO_THREADS; - } else if ("single_thread".equals(value) || "singleThread".equals(value)) { - return SINGLE_THREAD; - } else if ("thread_per_shard".equals(value) || "threadPerShard".equals(value)) { - return THREAD_PER_SHARD; - } - throw new ElasticsearchIllegalArgumentException("No value for search operation threading matching [" + value + "]"); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 20efe76c79bab..7929cdac5cda8 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -83,8 +83,6 @@ public class SearchRequest extends ActionRequest { private String[] types = Strings.EMPTY_ARRAY; - private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; - private IndicesOptions indicesOptions = IndicesOptions.strict(); public SearchRequest() { @@ -133,12 +131,6 @@ public void beforeStart() { } } - /** - * Internal. - */ - public void beforeLocalFork() { - } - /** * Sets the indices the search will be executed on. */ @@ -156,29 +148,6 @@ public SearchRequest indices(String... indices) { return this; } - /** - * Controls the the search operation threading model. - */ - public SearchOperationThreading operationThreading() { - return this.operationThreading; - } - - /** - * Controls the the search operation threading model. - */ - public SearchRequest operationThreading(SearchOperationThreading operationThreading) { - this.operationThreading = operationThreading; - return this; - } - - /** - * Sets the string representation of the operation threading model. Can be one of - * "no_threads", "single_thread" and "thread_per_shard". - */ - public SearchRequest operationThreading(String operationThreading) { - return operationThreading(SearchOperationThreading.fromString(operationThreading, this.operationThreading)); - } - public IndicesOptions indicesOptions() { return indicesOptions; } @@ -509,7 +478,9 @@ public SearchRequest scroll(String keepAlive) { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operationThreading = SearchOperationThreading.fromId(in.readByte()); + if (in.getVersion().before(Version.V_1_2_0)) { + in.readByte(); // backward comp. for operation threading + } searchType = SearchType.fromId(in.readByte()); indices = new String[in.readVInt()]; @@ -546,7 +517,9 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(operationThreading.id()); + if (out.getVersion().before(Version.V_1_2_0)) { + out.writeByte((byte) 2); // operation threading + } out.writeByte(searchType.id()); out.writeVInt(indices.length); diff --git a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java index 4a316cefb7b04..25a25439ed5c5 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchRequestBuilder.java @@ -155,23 +155,6 @@ public SearchRequestBuilder setPreference(String preference) { return this; } - /** - * Controls the the search operation threading model. - */ - public SearchRequestBuilder setOperationThreading(SearchOperationThreading operationThreading) { - request.operationThreading(operationThreading); - return this; - } - - /** - * Sets the string representation of the operation threading model. Can be one of - * "no_threads", "single_thread" and "thread_per_shard". - */ - public SearchRequestBuilder setOperationThreading(String operationThreading) { - request.operationThreading(operationThreading); - return this; - } - /** * Specifies what type of requested indices to ignore and wildcard indices expressions. * diff --git a/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java b/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java index deea52990dc7e..fbee4624a1fdd 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java +++ b/src/main/java/org/elasticsearch/action/search/SearchScrollRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.io.stream.StreamInput; @@ -37,11 +38,8 @@ public class SearchScrollRequest extends ActionRequest { private String scrollId; - private Scroll scroll; - private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD; - public SearchScrollRequest() { } @@ -58,21 +56,6 @@ public ActionRequestValidationException validate() { return validationException; } - /** - * Controls the the search operation threading model. - */ - public SearchOperationThreading operationThreading() { - return this.operationThreading; - } - - /** - * Controls the the search operation threading model. - */ - public SearchScrollRequest operationThreading(SearchOperationThreading operationThreading) { - this.operationThreading = operationThreading; - return this; - } - /** * The scroll id used to scroll the search. */ @@ -117,7 +100,9 @@ public SearchScrollRequest scroll(String keepAlive) { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - operationThreading = SearchOperationThreading.fromId(in.readByte()); + if (in.getVersion().before(Version.V_1_2_0)) { + in.readByte(); // backward comp. for operation threading + } scrollId = in.readString(); if (in.readBoolean()) { scroll = readScroll(in); @@ -127,7 +112,9 @@ public void readFrom(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeByte(operationThreading.id()); + if (out.getVersion().before(Version.V_1_2_0)) { + out.writeByte((byte) 2); // operation threading + } out.writeString(scrollId); if (scroll == null) { out.writeBoolean(false); diff --git a/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java b/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java index fa75149a0a9cc..ab05c95d3bb7a 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/search/SearchScrollRequestBuilder.java @@ -39,14 +39,6 @@ public SearchScrollRequestBuilder(Client client, String scrollId) { super((InternalClient) client, new SearchScrollRequest(scrollId)); } - /** - * Controls the the search operation threading model. - */ - public SearchScrollRequestBuilder setOperationThreading(SearchOperationThreading operationThreading) { - request.operationThreading(operationThreading); - return this; - } - /** * Should the listener be called on a separate thread if needed. */ diff --git a/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ed6ec8c2a6ba7..8ad2c0f86f5ab 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -43,19 +43,12 @@ public class TransportSearchAction extends TransportAction { private final ClusterService clusterService; - private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction; - private final TransportSearchQueryThenFetchAction queryThenFetchAction; - private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction; - private final TransportSearchQueryAndFetchAction queryAndFetchAction; - private final TransportSearchScanAction scanAction; - private final TransportSearchCountAction countAction; - private final boolean optimizeSingleShard; @Inject @@ -128,10 +121,6 @@ public SearchRequest newInstance() { public void messageReceived(SearchRequest request, final TransportChannel channel) throws Exception { // no need for a threaded listener request.listenerThreaded(false); - // we don't spawn, so if we get a request with no threading, change it to single threaded - if (request.operationThreading() == SearchOperationThreading.NO_THREADS) { - request.operationThreading(SearchOperationThreading.SINGLE_THREAD); - } execute(request, new ActionListener() { @Override public void onResponse(SearchResponse result) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index d29355d851fef..5ea504c8d1935 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -21,7 +21,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -82,56 +81,11 @@ protected void moveToSecondPhase() { final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - int localOperations = 0; for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : firstResults.asList()) { - final DfsSearchResult dfsResult = entry.value; - final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - }); - } else { - executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); - } - } catch (Throwable t) { - onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter); - } - } - } - } + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 802333338da54..2ceb7e363bbae 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -22,7 +22,10 @@ import com.carrotsearch.hppc.IntArrayList; import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -87,58 +90,11 @@ protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest requ protected void moveToSecondPhase() { final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults); final AtomicInteger counter = new AtomicInteger(firstResults.asList().size()); - - int localOperations = 0; for (final AtomicArray.Entry entry : firstResults.asList()) { DfsSearchResult dfsResult = entry.value; DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : firstResults.asList()) { - DfsSearchResult dfsResult = entry.value; - DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : firstResults.asList()) { - final DfsSearchResult dfsResult = entry.value; - final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - }); - } else { - executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); - } - } catch (Throwable t) { - onQueryFailure(t, querySearchRequest, entry.index, dfsResult, counter); - } - } - } - } + QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs); + executeQuery(entry.index, dfsResult, counter, querySearchRequest, node); } } @@ -196,57 +152,11 @@ void innerExecuteFetchPhase() throws Exception { request, sortedShardList, firstResults.length() ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - int localOperations = 0; for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = queryResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = queryResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - final QuerySearchResult queryResult = queryResults.get(entry.index); - final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } catch (Throwable t) { - onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); - } - } - } - } + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index a122cad2af1af..b4ddd623f8ae2 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -23,7 +23,6 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ReduceSearchPhaseException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; @@ -95,58 +94,11 @@ protected void moveToSecondPhase() throws Exception { request, sortedShardList, firstResults.length() ); final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - int localOperations = 0; for (AtomicArray.Entry entry : docIdsToLoad.asList()) { QuerySearchResult queryResult = firstResults.get(entry.index); DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - localOperations++; - } else { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { - final QuerySearchResult queryResult = firstResults.get(entry.index); - final DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - if (node.id().equals(nodes.localNodeId())) { - final FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - }); - } else { - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); - } - } catch (Throwable t) { - docIdsToLoad.set(entry.index, null); // clear it, we didn't manage to do anything with it - onFetchFailure(t, fetchSearchRequest, entry.index, queryResult.shardTarget(), counter); - } - } - } - } + FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard); + executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node); } } diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index d22a2ac39082f..7cdc3eccd95ab 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.action.search.type; import org.apache.lucene.search.ScoreDoc; -import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.*; import org.elasticsearch.cluster.ClusterService; @@ -37,7 +36,6 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -49,19 +47,14 @@ */ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollQueryAndFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollQueryAndFetchAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -128,7 +121,6 @@ public void start() { return; } - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; @@ -137,11 +129,7 @@ public void start() { if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) { useSlowScroll = true; } - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executePhase(i, node, target.v2()); - } + executePhase(i, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -153,48 +141,6 @@ public void start() { } } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(i, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); - } - } catch (Throwable t) { - onPhaseFailure(t, target.v2(), shardIndex); - } - } - } - } - } - for (Tuple target : scrollId.getContext()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index 473725f8c2194..91a76d4901bb5 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -39,7 +39,6 @@ import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -51,19 +50,14 @@ */ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollQueryThenFetchAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollQueryThenFetchAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -134,7 +128,6 @@ public void start() { } final AtomicInteger counter = new AtomicInteger(scrollId.getContext().length); - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; @@ -143,11 +136,7 @@ public void start() { if (node.getVersion().before(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) { useSlowScroll = true; } - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executeQueryPhase(i, counter, node, target.v2()); - } + executeQueryPhase(i, counter, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -163,48 +152,6 @@ public void start() { } } } - - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executeQueryPhase(i, counter, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - }); - } else { - executeQueryPhase(shardIndex, counter, node, target.v2()); - } - } catch (Throwable t) { - onQueryPhaseFailure(shardIndex, counter, target.v2(), t); - } - } - } - } - } } private void executeQueryPhase(final int shardIndex, final AtomicInteger counter, DiscoveryNode node, final long searchId) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index eeb456a14ca54..915bc61dbe2a6 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -21,7 +21,10 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -36,7 +39,6 @@ import org.elasticsearch.search.fetch.QueryFetchSearchResult; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.List; @@ -49,19 +51,14 @@ */ public class TransportSearchScrollScanAction extends AbstractComponent { - private final ThreadPool threadPool; - private final ClusterService clusterService; - private final SearchServiceTransportAction searchService; - private final SearchPhaseController searchPhaseController; @Inject - public TransportSearchScrollScanAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, + public TransportSearchScrollScanAction(Settings settings, ClusterService clusterService, SearchServiceTransportAction searchService, SearchPhaseController searchPhaseController) { super(settings); - this.threadPool = threadPool; this.clusterService = clusterService; this.searchService = searchService; this.searchPhaseController = searchPhaseController; @@ -127,17 +124,12 @@ public void start() { return; } - int localOperations = 0; Tuple[] context = scrollId.getContext(); for (int i = 0; i < context.length; i++) { Tuple target = context[i]; DiscoveryNode node = nodes.get(target.v1()); if (node != null) { - if (nodes.localNodeId().equals(node.id())) { - localOperations++; - } else { - executePhase(i, node, target.v2()); - } + executePhase(i, node, target.v2()); } else { if (logger.isDebugEnabled()) { logger.debug("Node [" + target.v1() + "] not available for scroll request [" + scrollId.getSource() + "]"); @@ -149,48 +141,6 @@ public void start() { } } - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - Tuple target = context1[i]; - DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - executePhase(i, node, target.v2()); - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - Tuple[] context1 = scrollId.getContext(); - for (int i = 0; i < context1.length; i++) { - final Tuple target = context1[i]; - final int shardIndex = i; - final DiscoveryNode node = nodes.get(target.v1()); - if (node != null && nodes.localNodeId().equals(node.id())) { - try { - if (localAsync) { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - executePhase(shardIndex, node, target.v2()); - } - }); - } else { - executePhase(shardIndex, node, target.v2()); - } - } catch (Throwable t) { - onPhaseFailure(t, target.v2(), shardIndex); - } - } - } - } - } - for (Tuple target : scrollId.getContext()) { DiscoveryNode node = nodes.get(target.v1()); if (node == null) { diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 3c9aa834ffff7..3101305d5ab62 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -145,75 +145,17 @@ public void start() { return; } request.beforeStart(); - // count the local operations, and perform the non local ones - int localOperations = 0; int shardIndex = -1; for (final ShardIterator shardIt : shardsIts) { shardIndex++; - final ShardRouting shard = shardIt.firstOrNull(); + final ShardRouting shard = shardIt.nextOrNull(); if (shard != null) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - localOperations++; - } else { - // do the remote operation here, the localAsync flag is not relevant - performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull()); - } + performFirstPhase(shardIndex, shardIt, shard); } else { // really, no shards active in this group onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())); } } - // we have local operations, perform them now - if (localOperations > 0) { - if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) { - request.beforeLocalFork(); - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - int shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final ShardRouting shard = shardIt.firstOrNull(); - if (shard != null) { - if (shard.currentNodeId().equals(nodes.localNodeId())) { - performFirstPhase(shardIndex, shardIt, shardIt.nextOrNull()); - } - } - } - } - }); - } else { - boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD; - if (localAsync) { - request.beforeLocalFork(); - } - shardIndex = -1; - for (final ShardIterator shardIt : shardsIts) { - shardIndex++; - final int fShardIndex = shardIndex; - ShardRouting first = shardIt.firstOrNull(); - if (first != null) { - if (first.currentNodeId().equals(nodes.localNodeId())) { - final ShardRouting shard = shardIt.nextOrNull(); - if (localAsync) { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - performFirstPhase(fShardIndex, shardIt, shard); - } - }); - } catch (Throwable t) { - onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t); - } - } else { - performFirstPhase(fShardIndex, shardIt, shard); - } - } - } - } - } - } } void performFirstPhase(final int shardIndex, final ShardIterator shardIt, final ShardRouting shard) { @@ -305,12 +247,7 @@ void onFirstPhaseResult(final int shardIndex, @Nullable ShardRouting shard, @Nul } if (!lastShard) { try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - performFirstPhase(shardIndex, shardIt, nextShard); - } - }); + performFirstPhase(shardIndex, shardIt, nextShard); } catch (Throwable t1) { onFirstPhaseResult(shardIndex, shard, shard.currentNodeId(), shardIt, t1); } diff --git a/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index b18fc1bef80a5..7df3726a91508 100644 --- a/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -20,7 +20,6 @@ package org.elasticsearch.rest.action.search; import org.elasticsearch.ElasticsearchIllegalArgumentException; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; @@ -72,14 +71,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel) SearchRequest searchRequest; searchRequest = RestSearchAction.parseSearchRequest(request); searchRequest.listenerThreaded(false); - SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null); - if (operationThreading != null) { - if (operationThreading == SearchOperationThreading.NO_THREADS) { - // since we don't spawn, don't allow no_threads, but change it to a single thread - operationThreading = SearchOperationThreading.SINGLE_THREAD; - } - searchRequest.operationThreading(operationThreading); - } client.search(searchRequest, new RestToXContentListener(channel)); } diff --git a/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java b/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java index 0c41da18357b1..8d2e001e6b98f 100644 --- a/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java +++ b/src/main/java/org/elasticsearch/rest/action/search/RestSearchScrollAction.java @@ -19,21 +19,19 @@ package org.elasticsearch.rest.action.search; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchOperationThreading; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.rest.*; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestToXContentListener; import org.elasticsearch.search.Scroll; -import java.io.IOException; - import static org.elasticsearch.common.unit.TimeValue.parseTimeValue; import static org.elasticsearch.rest.RestRequest.Method.GET; import static org.elasticsearch.rest.RestRequest.Method.POST; @@ -65,14 +63,6 @@ public void handleRequest(final RestRequest request, final RestChannel channel) if (scroll != null) { searchScrollRequest.scroll(new Scroll(parseTimeValue(scroll, null))); } - SearchOperationThreading operationThreading = SearchOperationThreading.fromString(request.param("operation_threading"), null); - if (operationThreading != null) { - if (operationThreading == SearchOperationThreading.NO_THREADS) { - // since we don't spawn, don't allow no_threads, but change it to a single thread - operationThreading = SearchOperationThreading.SINGLE_THREAD; - } - searchScrollRequest.operationThreading(operationThreading); - } client.searchScroll(searchScrollRequest, new RestToXContentListener(channel)); } } diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 890c9f406e496..19fdd8ffd11da 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -68,37 +68,16 @@ public void handleException(TransportException exp) { } } - private static void execute(Callable callable, SearchServiceListener listener) { - // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on - // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure - T result = null; - Throwable error = null; - try { - result = callable.call(); - } catch (Throwable t) { - error = t; - } finally { - if (result == null) { - assert error != null; - listener.onFailure(error); - } else { - assert error == null : error; - listener.onResult(result); - } - } - } - + private final ThreadPool threadPool; private final TransportService transportService; - private final ClusterService clusterService; - private final SearchService searchService; - private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger); @Inject - public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) { + public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterService clusterService, SearchService searchService) { super(settings); + this.threadPool = threadPool; this.transportService = transportService; this.clusterService = clusterService; this.searchService = searchService; @@ -523,6 +502,35 @@ public String executor() { } } + private void execute(final Callable callable, final SearchServiceListener listener) { + try { + threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + @Override + public void run() { + // Listeners typically do counting on errors and successes, and the decision to move to second phase, etc. is based on + // these counts so we need to be careful here to never propagate exceptions thrown by onResult to onFailure + T result = null; + Throwable error = null; + try { + result = callable.call(); + } catch (Throwable t) { + error = t; + } finally { + if (result == null) { + assert error != null; + listener.onFailure(error); + } else { + assert error == null : error; + listener.onResult(result); + } + } + } + }); + } catch (Throwable t) { + listener.onFailure(t); + } + } + class SearchFreeContextRequest extends TransportRequest { private long id;