Skip to content

Commit

Permalink
implemented random access paging for query then fetch
Browse files Browse the repository at this point in the history
without caching. every page request will hit the shards
  • Loading branch information
msbt committed Feb 5, 2015
1 parent a9106f7 commit 917afdf
Show file tree
Hide file tree
Showing 15 changed files with 1,403 additions and 678 deletions.
48 changes: 11 additions & 37 deletions sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.dfs.DfsPhase;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.sort.SortParseElement;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -118,45 +117,20 @@ public CrateSearchService(Settings settings,
}


public ScrollQueryFetchSearchResult executeScrollPhase(QueryShardScrollRequest request) {
public ScrollQuerySearchResult executeScrollQueryPhase(QueryShardScrollRequest request) {
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
processScroll(request, context);
context.indexShard().searchService().onPreQueryPhase(context);
long time = System.nanoTime();
try {
queryPhase.execute(context);
} catch (Throwable e) {
context.indexShard().searchService().onFailedQueryPhase(context);
throw Throwables.propagate(e);
}

// set lastEmittedDoc
int size = context.queryResult().topDocs().scoreDocs.length;
if (size > 0) {
context.lastEmittedDoc(context.queryResult().topDocs().scoreDocs[size - 1]);
}

long time2 = System.nanoTime();
context.indexShard().searchService().onQueryPhase(context, time2 - time);
context.indexShard().searchService().onPreFetchPhase(context);
try {
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
if (context.scroll() == null) {
freeContext(request.id());
} else {
contextProcessedSuccessfully(context);
}
} catch (Throwable e) {
context.indexShard().searchService().onFailedFetchPhase(context);
throw Throwables.propagate(e);
}
context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
contextProcessing(context);
processScroll(request, context);
queryPhase.execute(context);
contextProcessedSuccessfully(context);
context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
context.indexShard().searchService().onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
freeContext(context.id());
throw Throwables.propagate(e);
} finally {
Expand All @@ -167,7 +141,7 @@ public ScrollQueryFetchSearchResult executeScrollPhase(QueryShardScrollRequest r
private void processScroll(QueryShardScrollRequest request, SearchContext context) {
// process scroll
context.size(request.limit());
context.from(context.from() + context.size());
context.from(request.from());

context.scroll(request.scroll());
// update the context keep alive based on the new scroll value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@ public class QueryShardScrollRequest extends ActionRequest<QueryShardScrollReque

private long id;
private Scroll scroll;
private int from;
private int limit;

public QueryShardScrollRequest(long id, Scroll scroll, int limit) {

public QueryShardScrollRequest(long id, Scroll scroll, int from, int limit) {
this.id = id;
this.scroll = scroll;
this.from = from;
this.limit = limit;
}

Expand All @@ -57,6 +60,10 @@ public int limit() {
return limit;
}

public int from() {
return from;
}

@Override
public ActionRequestValidationException validate() {
return null;
Expand All @@ -67,6 +74,7 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
scroll = Scroll.readScroll(in);
from = in.readInt();
limit = in.readInt();
}

Expand All @@ -75,6 +83,7 @@ public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
scroll.writeTo(out);
out.writeInt(from);
out.writeInt(limit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
Expand Down Expand Up @@ -124,27 +124,27 @@ public String executor() {
}
}

public void executeScroll(String node, final QueryShardScrollRequest request, final ActionListener<ScrollQueryFetchSearchResult> listener) {
public void executeScrollQuery(String node, final QueryShardScrollRequest request, final ActionListener<ScrollQuerySearchResult> listener) {
Runnable localRunnable = new Runnable() {
@Override
public void run() {
executeScrollOnShard(request, listener);
}
};
TransportResponseHandler<?> responseHandler = new DefaultTransportResponseHandler<ScrollQueryFetchSearchResult>(listener, executorName) {
TransportResponseHandler<?> responseHandler = new DefaultTransportResponseHandler<ScrollQuerySearchResult>(listener, executorName) {
@Override
public ScrollQueryFetchSearchResult newInstance() {
return new ScrollQueryFetchSearchResult();
public ScrollQuerySearchResult newInstance() {
return new ScrollQuerySearchResult();
}
};
executeLocalOrViaTransport(node, localRunnable, request, queryScrollTransportAction, responseHandler);


}

public void executeScrollOnShard(QueryShardScrollRequest request, ActionListener<ScrollQueryFetchSearchResult> listener) {
public void executeScrollOnShard(QueryShardScrollRequest request, ActionListener<ScrollQuerySearchResult> listener) {
try {
ScrollQueryFetchSearchResult result = searchService.executeScrollPhase(request);
ScrollQuerySearchResult result = searchService.executeScrollQueryPhase(request);
listener.onResponse(result);
} catch (Throwable e) {
listener.onFailure(e);
Expand All @@ -160,7 +160,7 @@ public QueryShardScrollRequest newInstance() {

@Override
public void messageReceived(QueryShardScrollRequest request, TransportChannel channel) throws Exception {
ActionListener<ScrollQueryFetchSearchResult> listener = ResponseForwarder.forwardTo(channel);
ActionListener<ScrollQuerySearchResult> listener = ResponseForwarder.forwardTo(channel);
executeScrollOnShard(request, listener);
}

Expand Down
2 changes: 2 additions & 0 deletions sql/src/main/java/io/crate/executor/PageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package io.crate.executor;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -33,6 +34,7 @@ public class PageInfo {
private final int position;

public PageInfo(int position, int size) {
Preconditions.checkArgument(position >= 0 && size >= 0, "invalid page paramaters");
this.position = position;
this.size = size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.crate.operation.collect.HandlerSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.operation.projectors.ProjectionToProjectorVisitor;
import io.crate.operation.qtf.QueryThenFetchOperation;
import io.crate.planner.*;
import io.crate.planner.node.PlanNode;
import io.crate.planner.node.PlanNodeVisitor;
Expand Down Expand Up @@ -86,6 +87,8 @@ public class TransportExecutor implements Executor, TaskExecutor {

private final BigArrays bigArrays;

private final QueryThenFetchOperation queryThenFetchOperation;

@Inject
public TransportExecutor(Settings settings,
TransportActionProvider transportActionProvider,
Expand All @@ -99,7 +102,8 @@ public TransportExecutor(Settings settings,
ClusterService clusterService,
CrateCircuitBreakerService breakerService,
CrateResultSorter crateResultSorter,
BigArrays bigArrays) {
BigArrays bigArrays,
QueryThenFetchOperation queryThenFetchOperation) {
this.settings = settings;
this.transportActionProvider = transportActionProvider;
this.handlerSideDataCollectOperation = handlerSideDataCollectOperation;
Expand All @@ -111,6 +115,7 @@ public TransportExecutor(Settings settings,
this.clusterService = clusterService;
this.crateResultSorter = crateResultSorter;
this.bigArrays = bigArrays;
this.queryThenFetchOperation = queryThenFetchOperation;
this.nodeVisitor = new NodeVisitor();
this.planVisitor = new TaskCollectingVisitor();
this.circuitBreaker = breakerService.getBreaker(CrateCircuitBreakerService.QUERY_BREAKER);
Expand Down Expand Up @@ -273,15 +278,9 @@ public ImmutableList<Task> visitMergeNode(MergeNode node, UUID jobId) {
public ImmutableList<Task> visitQueryThenFetchNode(QueryThenFetchNode node, UUID jobId) {
return singleTask(new QueryThenFetchTask(
jobId,
queryThenFetchOperation,
functions,
node,
clusterService,
transportActionProvider.transportQueryShardAction(),
transportActionProvider.searchServiceTransportAction(),
searchPhaseControllerProvider.get(),
threadPool,
bigArrays,
crateResultSorter));
node));
}

@Override
Expand Down

0 comments on commit 917afdf

Please sign in to comment.