Skip to content

Commit

Permalink
Remove search operation threading option
Browse files Browse the repository at this point in the history
Search operation threading is an option that is not really used, and current non default implementations are flawed. Handling it also creates quite the complexity in the search handling codebase...
This is a breaking change, but one that is actually a good one, since I haven't seen/heard anybody use it, and if its used, its problematic...
closes #6042
  • Loading branch information
kimchy committed May 5, 2014
1 parent cea2d21 commit 7ce8306
Show file tree
Hide file tree
Showing 16 changed files with 74 additions and 647 deletions.

This file was deleted.

39 changes: 6 additions & 33 deletions src/main/java/org/elasticsearch/action/search/SearchRequest.java
Expand Up @@ -83,8 +83,6 @@ public class SearchRequest extends ActionRequest<SearchRequest> {

private String[] types = Strings.EMPTY_ARRAY;

private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD;

private IndicesOptions indicesOptions = IndicesOptions.strict();

public SearchRequest() {
Expand Down Expand Up @@ -133,12 +131,6 @@ public void beforeStart() {
}
}

/**
* Internal.
*/
public void beforeLocalFork() {
}

/**
* Sets the indices the search will be executed on.
*/
Expand All @@ -156,29 +148,6 @@ public SearchRequest indices(String... indices) {
return this;
}

/**
* Controls the the search operation threading model.
*/
public SearchOperationThreading operationThreading() {
return this.operationThreading;
}

/**
* Controls the the search operation threading model.
*/
public SearchRequest operationThreading(SearchOperationThreading operationThreading) {
this.operationThreading = operationThreading;
return this;
}

/**
* Sets the string representation of the operation threading model. Can be one of
* "no_threads", "single_thread" and "thread_per_shard".
*/
public SearchRequest operationThreading(String operationThreading) {
return operationThreading(SearchOperationThreading.fromString(operationThreading, this.operationThreading));
}

public IndicesOptions indicesOptions() {
return indicesOptions;
}
Expand Down Expand Up @@ -509,7 +478,9 @@ public SearchRequest scroll(String keepAlive) {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operationThreading = SearchOperationThreading.fromId(in.readByte());
if (in.getVersion().before(Version.V_1_2_0)) {
in.readByte(); // backward comp. for operation threading
}
searchType = SearchType.fromId(in.readByte());

indices = new String[in.readVInt()];
Expand Down Expand Up @@ -546,7 +517,9 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(operationThreading.id());
if (out.getVersion().before(Version.V_1_2_0)) {
out.writeByte((byte) 2); // operation threading
}
out.writeByte(searchType.id());

out.writeVInt(indices.length);
Expand Down
Expand Up @@ -155,23 +155,6 @@ public SearchRequestBuilder setPreference(String preference) {
return this;
}

/**
* Controls the the search operation threading model.
*/
public SearchRequestBuilder setOperationThreading(SearchOperationThreading operationThreading) {
request.operationThreading(operationThreading);
return this;
}

/**
* Sets the string representation of the operation threading model. Can be one of
* "no_threads", "single_thread" and "thread_per_shard".
*/
public SearchRequestBuilder setOperationThreading(String operationThreading) {
request.operationThreading(operationThreading);
return this;
}

/**
* Specifies what type of requested indices to ignore and wildcard indices expressions.
*
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -37,11 +38,8 @@
public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {

private String scrollId;

private Scroll scroll;

private SearchOperationThreading operationThreading = SearchOperationThreading.THREAD_PER_SHARD;

public SearchScrollRequest() {
}

Expand All @@ -58,21 +56,6 @@ public ActionRequestValidationException validate() {
return validationException;
}

/**
* Controls the the search operation threading model.
*/
public SearchOperationThreading operationThreading() {
return this.operationThreading;
}

/**
* Controls the the search operation threading model.
*/
public SearchScrollRequest operationThreading(SearchOperationThreading operationThreading) {
this.operationThreading = operationThreading;
return this;
}

/**
* The scroll id used to scroll the search.
*/
Expand Down Expand Up @@ -117,7 +100,9 @@ public SearchScrollRequest scroll(String keepAlive) {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
operationThreading = SearchOperationThreading.fromId(in.readByte());
if (in.getVersion().before(Version.V_1_2_0)) {
in.readByte(); // backward comp. for operation threading
}
scrollId = in.readString();
if (in.readBoolean()) {
scroll = readScroll(in);
Expand All @@ -127,7 +112,9 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(operationThreading.id());
if (out.getVersion().before(Version.V_1_2_0)) {
out.writeByte((byte) 2); // operation threading
}
out.writeString(scrollId);
if (scroll == null) {
out.writeBoolean(false);
Expand Down
Expand Up @@ -39,14 +39,6 @@ public SearchScrollRequestBuilder(Client client, String scrollId) {
super((InternalClient) client, new SearchScrollRequest(scrollId));
}

/**
* Controls the the search operation threading model.
*/
public SearchScrollRequestBuilder setOperationThreading(SearchOperationThreading operationThreading) {
request.operationThreading(operationThreading);
return this;
}

/**
* Should the listener be called on a separate thread if needed.
*/
Expand Down
Expand Up @@ -43,19 +43,12 @@
public class TransportSearchAction extends TransportAction<SearchRequest, SearchResponse> {

private final ClusterService clusterService;

private final TransportSearchDfsQueryThenFetchAction dfsQueryThenFetchAction;

private final TransportSearchQueryThenFetchAction queryThenFetchAction;

private final TransportSearchDfsQueryAndFetchAction dfsQueryAndFetchAction;

private final TransportSearchQueryAndFetchAction queryAndFetchAction;

private final TransportSearchScanAction scanAction;

private final TransportSearchCountAction countAction;

private final boolean optimizeSingleShard;

@Inject
Expand Down Expand Up @@ -128,10 +121,6 @@ public SearchRequest newInstance() {
public void messageReceived(SearchRequest request, final TransportChannel channel) throws Exception {
// no need for a threaded listener
request.listenerThreaded(false);
// we don't spawn, so if we get a request with no threading, change it to single threaded
if (request.operationThreading() == SearchOperationThreading.NO_THREADS) {
request.operationThreading(SearchOperationThreading.SINGLE_THREAD);
}
execute(request, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse result) {
Expand Down
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchOperationThreading;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -82,56 +81,11 @@ protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());

int localOperations = 0;
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
localOperations++;
} else {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
if (localOperations > 0) {
if (request.operationThreading() == SearchOperationThreading.SINGLE_THREAD) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}
}
});
} else {
boolean localAsync = request.operationThreading() == SearchOperationThreading.THREAD_PER_SHARD;
for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
final DfsSearchResult dfsResult = entry.value;
final DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
if (node.id().equals(nodes.localNodeId())) {
final QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
try {
if (localAsync) {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override
public void run() {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
});
} else {
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
} catch (Throwable t) {
onSecondPhaseFailure(t, querySearchRequest, entry.index, dfsResult, counter);
}
}
}
}
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}

Expand Down

0 comments on commit 7ce8306

Please sign in to comment.