Skip to content

Commit

Permalink
Cleanup search sub transport actions and collapse o.e.action.search.t…
Browse files Browse the repository at this point in the history
…ype package into o.e.action.search

TransportSearchTypeAction and subclasses are not actually transport actions, but just support classes useful for their inner async actions that can easily be extracted out so that we get rid of one too many level of abstraction.

Same pattern can be applied to TransportSearchScrollQueryAndFetchAction & TransportSearchScrollQueryThenFetchAction which we could remove in favour of keeping only their inner classes named SearchScrollQueryAndFetchAsyncAction and SearchScrollQueryThenFetchAsyncAction.

Remove org.elasticsearch.action.search.type package, collapsed remaining classes into existing org.elasticsearch.action.search package

Make also ParsedScrollId ScrollIdForNode and TransportSearchHelper classes and their methods package private.

Closes #11710
  • Loading branch information
javanna committed Feb 29, 2016
1 parent 65f8fcc commit b4f9906
Show file tree
Hide file tree
Showing 29 changed files with 1,836 additions and 2,054 deletions.
22 changes: 2 additions & 20 deletions core/src/main/java/org/elasticsearch/action/ActionModule.java
Expand Up @@ -169,14 +169,6 @@
import org.elasticsearch.action.search.TransportMultiSearchAction;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchDfsQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScanAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryAndFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollQueryThenFetchAction;
import org.elasticsearch.action.search.type.TransportSearchScrollScanAction;
import org.elasticsearch.action.suggest.SuggestAction;
import org.elasticsearch.action.suggest.TransportSuggestAction;
import org.elasticsearch.action.support.ActionFilter;
Expand Down Expand Up @@ -326,18 +318,8 @@ protected void configure() {
TransportShardMultiGetAction.class);
registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
TransportShardBulkAction.class);
registerAction(SearchAction.INSTANCE, TransportSearchAction.class,
TransportSearchDfsQueryThenFetchAction.class,
TransportSearchQueryThenFetchAction.class,
TransportSearchDfsQueryAndFetchAction.class,
TransportSearchQueryAndFetchAction.class,
TransportSearchScanAction.class
);
registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class,
TransportSearchScrollScanAction.class,
TransportSearchScrollQueryThenFetchAction.class,
TransportSearchScrollQueryAndFetchAction.class
);
registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
Expand Down
Expand Up @@ -17,12 +17,12 @@
* under the License.
*/

package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;

/**
* Base implementation for an async action.
*/
public class AbstractAsyncAction {
abstract class AbstractAsyncAction {

private final long startTime;

Expand All @@ -46,4 +46,5 @@ protected final long buildTookInMillis() {
return Math.max(1, System.currentTimeMillis() - startTime);
}

abstract void start();
}

Large diffs are not rendered by default.

Expand Up @@ -17,14 +17,14 @@
* under the License.
*/

package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;

import java.util.Map;

/**
*
*/
public class ParsedScrollId {
class ParsedScrollId {

public static final String QUERY_THEN_FETCH_TYPE = "queryThenFetch";

Expand Down
Expand Up @@ -17,9 +17,9 @@
* under the License.
*/

package org.elasticsearch.action.search.type;
package org.elasticsearch.action.search;

public class ScrollIdForNode {
class ScrollIdForNode {
private final String node;
private final long scrollId;

Expand Down
@@ -0,0 +1,67 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

class SearchCountAsyncAction extends AbstractSearchAsyncAction<QuerySearchResultProvider> {

SearchCountAsyncAction(ESLogger logger, SearchServiceTransportAction searchService, ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool, SearchRequest request,
ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool,
request, listener);
}

@Override
protected String firstPhaseName() {
return "query";
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, ActionListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}

@Override
protected void moveToSecondPhase() throws Exception {
// no need to sort, since we know we have no hits back
final InternalSearchResponse internalResponse = searchPhaseController.merge(SearchPhaseController.EMPTY_DOCS, firstResults,
(AtomicArray<? extends FetchSearchResultProvider>) AtomicArray.empty(), request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(), buildTookInMillis(), buildShardFailures()));
}
}
@@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.search;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

class SearchDfsQueryAndFetchAsyncAction extends AbstractSearchAsyncAction<DfsSearchResult> {

private final AtomicArray<QueryFetchSearchResult> queryFetchResults;

SearchDfsQueryAndFetchAsyncAction(ESLogger logger, SearchServiceTransportAction searchService,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
SearchPhaseController searchPhaseController, ThreadPool threadPool,
SearchRequest request, ActionListener<SearchResponse> listener) {
super(logger, searchService, clusterService, indexNameExpressionResolver, searchPhaseController, threadPool, request, listener);
queryFetchResults = new AtomicArray<>(firstResults.length());
}

@Override
protected String firstPhaseName() {
return "dfs";
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request,
ActionListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}

@Override
protected void moveToSecondPhase() {
final AggregatedDfs dfs = searchPhaseController.aggregateDfs(firstResults);
final AtomicInteger counter = new AtomicInteger(firstResults.asList().size());

for (final AtomicArray.Entry<DfsSearchResult> entry : firstResults.asList()) {
DfsSearchResult dfsResult = entry.value;
DiscoveryNode node = nodes.get(dfsResult.shardTarget().nodeId());
QuerySearchRequest querySearchRequest = new QuerySearchRequest(request, dfsResult.id(), dfs);
executeSecondPhase(entry.index, dfsResult, counter, node, querySearchRequest);
}
}

void executeSecondPhase(final int shardIndex, final DfsSearchResult dfsResult, final AtomicInteger counter,
final DiscoveryNode node, final QuerySearchRequest querySearchRequest) {
searchService.sendExecuteFetch(node, querySearchRequest, new ActionListener<QueryFetchSearchResult>() {
@Override
public void onResponse(QueryFetchSearchResult result) {
result.shardTarget(dfsResult.shardTarget());
queryFetchResults.set(shardIndex, result);
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

@Override
public void onFailure(Throwable t) {
try {
onSecondPhaseFailure(t, querySearchRequest, shardIndex, dfsResult, counter);
} finally {
// the query might not have been executed at all (for example because thread pool rejected execution)
// and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
sendReleaseSearchContext(querySearchRequest.id(), node);
}
}
});
}

void onSecondPhaseFailure(Throwable t, QuerySearchRequest querySearchRequest, int shardIndex, DfsSearchResult dfsResult,
AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id());
}
this.addShardFailure(shardIndex, dfsResult.shardTarget(), t);
successfulOps.decrementAndGet();
if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
threadPool.executor(ThreadPool.Names.SEARCH).execute(new ActionRunnable<SearchResponse>(listener) {
@Override
public void doRun() throws IOException {
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults, request);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);
}
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successfulOps.get(),
buildTookInMillis(), buildShardFailures()));
}

@Override
public void onFailure(Throwable t) {
ReduceSearchPhaseException failure = new ReduceSearchPhaseException("query_fetch", "", t, buildShardFailures());
if (logger.isDebugEnabled()) {
logger.debug("failed to reduce search", failure);
}
super.onFailure(t);
}
});

}
}

0 comments on commit b4f9906

Please sign in to comment.