Split internal fetch request used within scroll and search #7870
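This change splits the internal fetch request in two: the scroll fetch phase now sends a new, leaner request (FetchRequest in this commit) over a dedicated transport action, while the regular search fetch phase keeps using FetchSearchRequest, which becomes a subclass of it. A rough sketch of the two call paths after the split, reusing identifiers from this diff; the search-path lines are an assumption, since that caller is not part of the commit shown here:

// Sketch only, not code from this commit; variable names are illustrative.
// Scroll path: build the scroll-only request and send it via the new scroll fetch action.
FetchRequest scrollFetch = new FetchRequest(scrollRequest, querySearchResult.id(), docIds, lastEmittedDoc);
searchService.sendExecuteFetchScroll(node, scrollFetch, listener);

// Regular search path (assumed unchanged): FetchSearchRequest now extends FetchRequest
// and still travels over the existing fetch action name.
FetchSearchRequest searchFetch = new FetchSearchRequest(searchRequest, querySearchResult.id(), docIds, lastEmittedDoc);
searchService.sendExecuteFetch(node, searchFetch, listener);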

@@ -33,7 +33,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -24,7 +24,6 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.search.ReduceSearchPhaseException;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
@@ -33,7 +32,6 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
@@ -21,8 +21,6 @@

import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.*;
import org.elasticsearch.cluster.ClusterService;
@@ -33,11 +31,10 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.action.SearchServiceListener;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
@@ -220,9 +217,9 @@ private void executeFetchPhase() throws Exception {
IntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
FetchSearchRequest fetchSearchRequest = new FetchSearchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
FetchRequest fetchRequest = new FetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetch(node, fetchSearchRequest, new SearchServiceListener<FetchSearchResult>() {
searchService.sendExecuteFetchScroll(node, fetchRequest, new SearchServiceListener<FetchSearchResult>() {
@Override
public void onResult(FetchSearchResult result) {
result.shardTarget(querySearchResult.shardTarget());
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/search/SearchService.java
@@ -468,7 +468,7 @@ public ScrollQueryFetchSearchResult executeFetchPhase(InternalScrollSearchRequest
}
}

public FetchSearchResult executeFetchPhase(FetchSearchRequest request) throws ElasticsearchException {
public FetchSearchResult executeFetchPhase(FetchRequest request) throws ElasticsearchException {
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
@@ -35,10 +35,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.FetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.fetch.*;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
@@ -67,6 +64,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
public static final String QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query+fetch]";
public static final String QUERY_QUERY_FETCH_ACTION_NAME = "indices:data/read/search[phase/query/query+fetch]";
public static final String QUERY_FETCH_SCROLL_ACTION_NAME = "indices:data/read/search[phase/query+fetch/scroll]";
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
public static final String SCAN_ACTION_NAME = "indices:data/read/search[phase/scan]";
public static final String SCAN_SCROLL_ACTION_NAME = "indices:data/read/search[phase/scan/scroll]";
@@ -132,6 +130,7 @@ public SearchServiceTransportAction(Settings settings, ThreadPool threadPool, Tr
transportService.registerHandler(QUERY_FETCH_ACTION_NAME, new SearchQueryFetchTransportHandler());
transportService.registerHandler(QUERY_QUERY_FETCH_ACTION_NAME, new SearchQueryQueryFetchTransportHandler());
transportService.registerHandler(QUERY_FETCH_SCROLL_ACTION_NAME, new SearchQueryFetchScrollTransportHandler());
transportService.registerHandler(FETCH_ID_SCROLL_ACTION_NAME, new ScrollFetchByIdTransportHandler());
transportService.registerHandler(FETCH_ID_ACTION_NAME, new SearchFetchByIdTransportHandler());
transportService.registerHandler(SCAN_ACTION_NAME, new SearchScanTransportHandler());
transportService.registerHandler(SCAN_SCROLL_ACTION_NAME, new SearchScanScrollTransportHandler());
@@ -429,6 +428,23 @@ public String executor() {
}

public void sendExecuteFetch(DiscoveryNode node, final FetchSearchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
sendExecuteFetch(node, FETCH_ID_ACTION_NAME, request, listener);
}

public void sendExecuteFetchScroll(DiscoveryNode node, final FetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
String action;
if (node.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
//use the separate action for scroll when possible
action = FETCH_ID_SCROLL_ACTION_NAME;
} else {
//fallback to the previous action name if the new one is not supported by the node we are talking to.
//Do use the same request since it has the same binary format as the previous FetchSearchRequest (without the OriginalIndices addition).
action = FETCH_ID_ACTION_NAME;
}
sendExecuteFetch(node, action, request, listener);
}

private void sendExecuteFetch(DiscoveryNode node, String action, final FetchRequest request, final SearchServiceListener<FetchSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
execute(new Callable<FetchSearchResult>() {
@Override
@@ -437,7 +453,7 @@ public FetchSearchResult call() throws Exception {
}
}, listener);
} else {
transportService.sendRequest(node, FETCH_ID_ACTION_NAME, request, new BaseTransportResponseHandler<FetchSearchResult>() {
transportService.sendRequest(node, action, request, new BaseTransportResponseHandler<FetchSearchResult>() {

@Override
public FetchSearchResult newInstance() {
@@ -843,15 +859,12 @@ public String executor() {
}
}

private class SearchFetchByIdTransportHandler extends BaseTransportRequestHandler<FetchSearchRequest> {
private abstract class FetchByIdTransportHandler<Request extends FetchRequest> extends BaseTransportRequestHandler<Request> {

@Override
public FetchSearchRequest newInstance() {
return new FetchSearchRequest();
}
public abstract Request newInstance();

@Override
public void messageReceived(FetchSearchRequest request, TransportChannel channel) throws Exception {
public void messageReceived(Request request, TransportChannel channel) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request);
channel.sendResponse(result);
}
@@ -862,6 +875,20 @@ public String executor() {
}
}

private class ScrollFetchByIdTransportHandler extends FetchByIdTransportHandler<FetchRequest> {
@Override
public FetchRequest newInstance() {
return new FetchRequest();
}
}

private class SearchFetchByIdTransportHandler extends FetchByIdTransportHandler<FetchSearchRequest> {
@Override
public FetchSearchRequest newInstance() {
return new FetchSearchRequest();
}
}

private class SearchQueryFetchScrollTransportHandler extends BaseTransportRequestHandler<InternalScrollSearchRequest> {

@Override
124 changes: 124 additions & 0 deletions src/main/java/org/elasticsearch/search/fetch/FetchRequest.java
@@ -0,0 +1,124 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.fetch;

import com.carrotsearch.hppc.IntArrayList;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.Version;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.type.ParsedScrollId;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;

/**
*
*/
public class FetchRequest extends TransportRequest {
Contributor: can we rename it to ShardFetchRequest?

private long id;

private int[] docIds;

private int size;

private ScoreDoc lastEmittedDoc;

public FetchRequest() {
}

public FetchRequest(SearchScrollRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(request);
this.id = id;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
}

protected FetchRequest(TransportRequest originalRequest, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
super(originalRequest);
this.id = id;
this.docIds = list.buffer;
this.size = list.size();
this.lastEmittedDoc = lastEmittedDoc;
}

public long id() {
return id;
}

public int[] docIds() {
return docIds;
}

public int docIdsSize() {
return size;
}

public ScoreDoc lastEmittedDoc() {
return lastEmittedDoc;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
size = in.readVInt();
docIds = new int[size];
for (int i = 0; i < size; i++) {
docIds[i] = in.readVInt();
}
if (in.getVersion().onOrAfter(ParsedScrollId.SCROLL_SEARCH_AFTER_MINIMUM_VERSION)) {
byte flag = in.readByte();
if (flag == 1) {
lastEmittedDoc = Lucene.readFieldDoc(in);
} else if (flag == 2) {
lastEmittedDoc = Lucene.readScoreDoc(in);
} else if (flag != 0) {
throw new IOException("Unknown flag: " + flag);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
out.writeVInt(size);
for (int i = 0; i < size; i++) {
out.writeVInt(docIds[i]);
}
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
if (lastEmittedDoc == null) {
out.writeByte((byte) 0);
} else if (lastEmittedDoc instanceof FieldDoc) {
out.writeByte((byte) 1);
Lucene.writeFieldDoc(out, (FieldDoc) lastEmittedDoc);
} else {
out.writeByte((byte) 2);
Lucene.writeScoreDoc(out, lastEmittedDoc);
}
}
}
}
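The non-scroll path referenced by SearchFetchByIdTransportHandler above keeps using FetchSearchRequest, which now extends this class. Its body is not part of this diff; a minimal sketch, assuming it only layers the original-indices bookkeeping mentioned in sendExecuteFetchScroll on top of the shared fields and wire format:

// Hypothetical sketch of the subclass; the original-indices handling is assumed from the
// comments in this PR and is not shown in this commit.
public class FetchSearchRequest extends FetchRequest {

    public FetchSearchRequest() {
    }

    public FetchSearchRequest(SearchRequest request, long id, IntArrayList list, ScoreDoc lastEmittedDoc) {
        // the base class keeps the binary format of the pre-split FetchSearchRequest,
        // so older nodes handling FETCH_ID_ACTION_NAME can still read it
        super(request, id, list, lastEmittedDoc);
    }

    // readFrom/writeTo would additionally (de)serialize the original indices of the
    // user request, which the scroll-only base class deliberately leaves out.
}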