Skip to content

Commit

Permalink
Internal: split internal fetch request used within scroll and search
Browse files Browse the repository at this point in the history
Similar to elastic#7856 but relates to the fetch shard level requests. We currently use the same internal request when we need to fetch within search and scroll. The two original requests though diverged after elastic#6933 as SearchRequest implements IndicesRequest while SearchScrollRequest doesn't. That said, with elastic#7319 we made `FetchSearchRequest` implement IndicesRequest by making it hold the original indices taken from the original request, which are null if the fetch was originated by a search scroll, and that is why original indices are optional there.

This commit introduces a separate fetch request and transport action for scroll, which doesn't hold original indices. The new action is only used against nodes that expose it, the previous action name will be used for nodes older than 1.4.0.Beta1.

As a result, in 1.4 we have a new `indices:data/read/search[phase/fetch/id/scroll]` action that is equivalent to the previous `indices:data/read/search[phase/fetch/id]` whose request implements now IndicesRequest and holds the original indices coming from the original request. The original indices in the latter request can only be null during a rolling upgrade (already existing version checks make sure that serialization is bw compatible), when some nodes are still < 1.4.

Closes elastic#7870
  • Loading branch information
javanna committed Sep 26, 2014
1 parent cac50ce commit d76a455
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 62 deletions.
Expand Up @@ -39,7 +39,7 @@
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.dfs.AggregatedDfs;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
Expand Down Expand Up @@ -158,12 +158,12 @@ void innerExecuteFetchPhase() throws Exception {
for (final AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResult queryResult = queryResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult, entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
Expand All @@ -181,7 +181,7 @@ public void onFailure(Throwable t) {
});
}

void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
Expand Down
Expand Up @@ -23,7 +23,6 @@
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
Expand All @@ -37,7 +36,7 @@
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
Expand Down Expand Up @@ -101,12 +100,12 @@ protected void moveToSecondPhase() throws Exception {
for (AtomicArray.Entry<IntArrayList> entry : docIdsToLoad.asList()) {
QuerySearchResultProvider queryResult = firstResults.get(entry.index);
DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId());
FetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult(), entry, lastEmittedDocPerShard);
executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node);
}
}

void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
void executeFetch(final int shardIndex, final SearchShardTarget shardTarget, final AtomicInteger counter, final ShardFetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
Expand All @@ -126,7 +125,7 @@ public void onFailure(Throwable t) {
});
}

void onFetchFailure(Throwable t, FetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
void onFetchFailure(Throwable t, ShardFetchSearchRequest fetchSearchRequest, int shardIndex, SearchShardTarget shardTarget, AtomicInteger counter) {
if (logger.isDebugEnabled()) {
logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id());
}
Expand Down
Expand Up @@ -21,8 +21,6 @@

import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
Expand All @@ -33,11 +31,10 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.ShardFetchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
Expand Down Expand Up @@ -220,9 +217,9 @@ private void executeFetchPhase() throws Exception {
IntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget());
Expand Down
Expand Up @@ -44,7 +44,7 @@
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -356,12 +356,12 @@ protected void releaseIrrelevantSearchContexts(AtomicArray<? extends QuerySearch
}
}

protected FetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry, ScoreDoc[] lastEmittedDocPerShard) {
protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, AtomicArray.Entry<IntArrayList> entry, ScoreDoc[] lastEmittedDocPerShard) {
if (lastEmittedDocPerShard != null) {
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
return new FetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value, lastEmittedDoc);
} else {
return new FetchSearchRequest(request, queryResult.id(), entry.value);
return new ShardFetchSearchRequest(request, queryResult.id(), entry.value);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/search/SearchService.java
Expand Up @@ -468,7 +468,7 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchReques
}
}

public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
public FetchSearchResult executeFetchPhase(ShardFetchRequest request) throws ElasticsearchException {
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
Expand Down
Expand Up @@ -35,10 +35,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.fetch.*;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
Expand Down Expand Up @@ -67,6 +64,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
public static final String QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query+fetch]";
public static final String QUERY_QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query/query+fetch]";
public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]";
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
public static final String SCAN_ACTION_NAME = "indices:data/read/search[phase/scan]";
public static final String SCAN_SCROLL_ACTION_NAME = "indices:data/read/search[phase/scan/scroll]";
Expand Down Expand Up @@ -132,6 +130,7 @@ public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, Tr
transportService.registerHandler(QUERY_FETCH_ACTION_NAME, new SearchQueryFetchTransportHandler());
transportService.registerHandler(QUERY_QUERY_FETCH_ACTION_NAME, new SearchQueryQueryFetchTransportHandler());
transportService.registerHandler(QUERY_FETCH_SCROLL_ACTION_NAME, new SearchQueryFetchScrollTransportHandler());
transportService.registerHandler(FETCH_ID_SCROLL_ACTION_NAME, new ScrollFetchByIdTransportHandler());
transportService.registerHandler(FETCH_ID_ACTION_NAME, new SearchFetchByIdTransportHandler());
transportService.registerHandler(SCAN_ACTION_NAME, new SearchScanTransportHandler());
transportService.registerHandler(SCAN_SCROLL_ACTION_NAME, new SearchScanScrollTransportHandler());
Expand Down Expand Up @@ -428,7 +427,24 @@ public String executor() {
}
}

public void sendExecuteFetch(DiscoveryNode node, final FetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
public void sendExecuteFetch(DiscoveryNode node, final ShardFetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
}

public void sendExecuteFetchScroll(DiscoveryNode node, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
String action;
if (node.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
//use the separate action for scroll when possible
action = FETCH_ID_SCROLL_ACTION_NAME;
} else {
//fallback to the previous action name if the new one is not supported by the node we are talking to.
//Do use the same request since it has the same binary format as the previous FetchSearchRequest (without the OriginalIndices addition).
action = FETCH_ID_ACTION_NAME;
}
sendExecuteFetch(node, action, request, listener);
}

private void sendExecuteFetch(DiscoveryNode node, String action, final ShardFetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<FetchSearchResult>() {
@Override
Expand All @@ -437,7 +453,7 @@ public FetchSearchResult call() throws Exception {
}
}, listener);
} else {
transportService.sendRequest(node, FETCH_ID_ACTION_NAME, request, new BaseTransportResponseHandler<FetchSearchResult>() {
transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {

@Override
public FetchSearchResult newInstance() {
Expand Down Expand Up @@ -843,15 +859,12 @@ public String executor() {
}
}

private class SearchFetchByIdTransportHandler extends BaseTransportRequestHandler<FetchSearchRequest> {
private abstract class FetchByIdTransportHandler<Request extends ShardFetchRequest> extends BaseTransportRequestHandler<Request> {

@Override
public FetchSearchRequest newInstance() {
return new FetchSearchRequest();
}
public abstract Request newInstance();

@Override
public void messageReceived(FetchSearchRequest request, TransportChannel channel) throws Exception {
public void messageReceived(Request request, TransportChannel channel) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
Expand All @@ -862,6 +875,20 @@ public String executor() {
}
}

private class ScrollFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchRequest> {
@Override
public ShardFetchRequest newInstance() {
return new ShardFetchRequest();
}
}

private class SearchFetchByIdTransportHandler extends FetchByIdTransportHandler<ShardFetchSearchRequest> {
@Override
public ShardFetchSearchRequest newInstance() {
return new ShardFetchSearchRequest();
}
}

private class SearchQueryFetchScrollTransportHandler extends BaseTransportRequestHandler<InternalScrollSearchRequest> {

@Override
Expand Down
Expand Up @@ -23,12 +23,8 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.Version;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
Expand All @@ -37,9 +33,10 @@
import java.io.IOException;

/**
*
* Shard level fetch base request. Holds all the info needed to execute a fetch.
* Used with search scroll as the original request doesn't hold indices.
*/
public class FetchSearchRequest extends TransportRequest implements IndicesRequest {
public class ShardFetchRequest extends TransportRequest {

private long id;

Expand All @@ -49,31 +46,23 @@ public class FetchSearchRequest extends TransportRequest implements IndicesReque

private ScoreDoc lastEmittedDoc;

private OriginalIndices originalIndices;

public FetchSearchRequest() {
}

public FetchSearchRequest(SearchRequest request, long id, IntArrayList list) {
this(request, id, list, null);
public ShardFetchRequest() {
}

public FetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
public ShardFetchRequest(SearchScrollRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(request);
this.id = id;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
this.originalIndices = new OriginalIndices(request);
}

public FetchSearchRequest(SearchScrollRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(request);
protected ShardFetchRequest(TransportRequest originalRequest, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(originalRequest);
this.id = id;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
this.originalIndices = OriginalIndices.EMPTY;
}

public long id() {
Expand All @@ -92,16 +81,6 @@ public ScoreDoc lastEmittedDoc() {
return lastEmittedDoc;
}

@Override
public String[] indices() {
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return originalIndices.indicesOptions();
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -121,7 +100,6 @@ public void readFrom(StreamInput in) throws IOException {
throw new IOException("Unknown flag: " + flag);
}
}
originalIndices = OriginalIndices.readOptionalOriginalIndices(in);
}

@Override
Expand All @@ -143,6 +121,5 @@ public void writeTo(StreamOutput out) throws IOException {
Lucene.writeScoreDoc(out, lastEmittedDoc);
}
}
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
}
}

0 comments on commit d76a455

Please sign in to comment.