From 917afdfe44b9bd5cb36ba69c9e5762c1be5c3251 Mon Sep 17 00:00:00 2001 From: Matthias Wahl Date: Mon, 2 Feb 2015 16:55:07 +0100 Subject: [PATCH] implemented random access paging for query then fetch without caching. every page request will hit the shards --- .../action/sql/query/CrateSearchService.java | 48 +- .../sql/query/QueryShardScrollRequest.java | 11 +- .../sql/query/TransportQueryShardAction.java | 16 +- .../main/java/io/crate/executor/PageInfo.java | 2 + .../executor/transport/TransportExecutor.java | 17 +- .../QueryThenFetchPageableTaskResult.java | 282 ++++++++ .../elasticsearch/QueryThenFetchTask.java | 647 ++---------------- .../join/CollectingPageableTaskIterable.java | 3 +- .../join/SinglePagePageableTaskIterable.java | 1 + .../qtf/QueryThenFetchOperation.java | 637 +++++++++++++++++ .../planner/node/dql/QueryThenFetchNode.java | 16 + .../where/WhereClauseAnalyzerTest.java | 3 +- ...orPagingTest.java => PagingTasksTest.java} | 355 +++++++++- .../transport/TransportExecutorTest.java | 2 + .../elasticsearch/QueryThenFetchTaskTest.java | 41 +- 15 files changed, 1403 insertions(+), 678 deletions(-) create mode 100644 sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchPageableTaskResult.java create mode 100644 sql/src/main/java/io/crate/operation/qtf/QueryThenFetchOperation.java rename sql/src/test/java/io/crate/executor/transport/{TransportExecutorPagingTest.java => PagingTasksTest.java} (66%) diff --git a/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java b/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java index 0cae41eee27c..f6dc8126e66d 100644 --- a/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java +++ b/sql/src/main/java/io/crate/action/sql/query/CrateSearchService.java @@ -67,12 +67,11 @@ import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.dfs.DfsPhase; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; import org.elasticsearch.search.fetch.source.FetchSourceContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.query.QueryPhase; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.ScrollQuerySearchResult; import org.elasticsearch.search.sort.SortParseElement; import org.elasticsearch.threadpool.ThreadPool; @@ -118,45 +117,20 @@ public CrateSearchService(Settings settings, } - public ScrollQueryFetchSearchResult executeScrollPhase(QueryShardScrollRequest request) { + public ScrollQuerySearchResult executeScrollQueryPhase(QueryShardScrollRequest request) { final SearchContext context = findContext(request.id()); - contextProcessing(context); try { - processScroll(request, context); context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); - try { - queryPhase.execute(context); - } catch (Throwable e) { - context.indexShard().searchService().onFailedQueryPhase(context); - throw Throwables.propagate(e); - } - - // set lastEmittedDoc - int size = context.queryResult().topDocs().scoreDocs.length; - if (size > 0) { - context.lastEmittedDoc(context.queryResult().topDocs().scoreDocs[size - 1]); - } - - long time2 = System.nanoTime(); - context.indexShard().searchService().onQueryPhase(context, time2 - time); - context.indexShard().searchService().onPreFetchPhase(context); - try { - shortcutDocIdsToLoad(context); - 
fetchPhase.execute(context); - if (context.scroll() == null) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); - } - } catch (Throwable e) { - context.indexShard().searchService().onFailedFetchPhase(context); - throw Throwables.propagate(e); - } - context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); - return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); + contextProcessing(context); + processScroll(request, context); + queryPhase.execute(context); + contextProcessedSuccessfully(context); + context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); + return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (Throwable e) { - logger.trace("Fetch phase failed", e); + context.indexShard().searchService().onFailedQueryPhase(context); + logger.trace("Query phase failed", e); freeContext(context.id()); throw Throwables.propagate(e); } finally { @@ -167,7 +141,7 @@ public ScrollQueryFetchSearchResult executeScrollPhase(QueryShardScrollRequest r private void processScroll(QueryShardScrollRequest request, SearchContext context) { // process scroll context.size(request.limit()); - context.from(context.from() + context.size()); + context.from(request.from()); context.scroll(request.scroll()); // update the context keep alive based on the new scroll value diff --git a/sql/src/main/java/io/crate/action/sql/query/QueryShardScrollRequest.java b/sql/src/main/java/io/crate/action/sql/query/QueryShardScrollRequest.java index 46b339fa2e14..65a2d22c725e 100644 --- a/sql/src/main/java/io/crate/action/sql/query/QueryShardScrollRequest.java +++ b/sql/src/main/java/io/crate/action/sql/query/QueryShardScrollRequest.java @@ -33,11 +33,14 @@ public class QueryShardScrollRequest extends ActionRequest listener) { + public void executeScrollQuery(String node, final QueryShardScrollRequest request, final ActionListener listener) { Runnable localRunnable = new Runnable() { @Override public void run() { executeScrollOnShard(request, listener); } }; - TransportResponseHandler responseHandler = new DefaultTransportResponseHandler(listener, executorName) { + TransportResponseHandler responseHandler = new DefaultTransportResponseHandler(listener, executorName) { @Override - public ScrollQueryFetchSearchResult newInstance() { - return new ScrollQueryFetchSearchResult(); + public ScrollQuerySearchResult newInstance() { + return new ScrollQuerySearchResult(); } }; executeLocalOrViaTransport(node, localRunnable, request, queryScrollTransportAction, responseHandler); @@ -142,9 +142,9 @@ public ScrollQueryFetchSearchResult newInstance() { } - public void executeScrollOnShard(QueryShardScrollRequest request, ActionListener listener) { + public void executeScrollOnShard(QueryShardScrollRequest request, ActionListener listener) { try { - ScrollQueryFetchSearchResult result = searchService.executeScrollPhase(request); + ScrollQuerySearchResult result = searchService.executeScrollQueryPhase(request); listener.onResponse(result); } catch (Throwable e) { listener.onFailure(e); @@ -160,7 +160,7 @@ public QueryShardScrollRequest newInstance() { @Override public void messageReceived(QueryShardScrollRequest request, TransportChannel channel) throws Exception { - ActionListener listener = ResponseForwarder.forwardTo(channel); + ActionListener listener = ResponseForwarder.forwardTo(channel); executeScrollOnShard(request, listener); } diff 
--git a/sql/src/main/java/io/crate/executor/PageInfo.java b/sql/src/main/java/io/crate/executor/PageInfo.java index 73de4f071e8e..365ce1e9b0ad 100644 --- a/sql/src/main/java/io/crate/executor/PageInfo.java +++ b/sql/src/main/java/io/crate/executor/PageInfo.java @@ -22,6 +22,7 @@ package io.crate.executor; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -33,6 +34,7 @@ public class PageInfo { private final int position; public PageInfo(int position, int size) { + Preconditions.checkArgument(position >= 0 && size >= 0, "invalid page parameters"); this.position = position; this.size = size; } diff --git a/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java b/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java index 4429a42aebc3..cbf10d70c4f3 100644 --- a/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java +++ b/sql/src/main/java/io/crate/executor/transport/TransportExecutor.java @@ -40,6 +40,7 @@ import io.crate.operation.collect.HandlerSideDataCollectOperation; import io.crate.operation.collect.StatsTables; import io.crate.operation.projectors.ProjectionToProjectorVisitor; +import io.crate.operation.qtf.QueryThenFetchOperation; import io.crate.planner.*; import io.crate.planner.node.PlanNode; import io.crate.planner.node.PlanNodeVisitor; @@ -86,6 +87,8 @@ public class TransportExecutor implements Executor, TaskExecutor { private final BigArrays bigArrays; + private final QueryThenFetchOperation queryThenFetchOperation; + @Inject public TransportExecutor(Settings settings, TransportActionProvider transportActionProvider, @@ -99,7 +102,8 @@ public TransportExecutor(Settings settings, ClusterService clusterService, CrateCircuitBreakerService breakerService, CrateResultSorter crateResultSorter, - BigArrays bigArrays) { + BigArrays bigArrays, + QueryThenFetchOperation queryThenFetchOperation) { this.settings = settings; this.transportActionProvider = transportActionProvider; this.handlerSideDataCollectOperation = handlerSideDataCollectOperation; @@ -111,6 +115,7 @@ public TransportExecutor(Settings settings, this.clusterService = clusterService; this.crateResultSorter = crateResultSorter; this.bigArrays = bigArrays; + this.queryThenFetchOperation = queryThenFetchOperation; this.nodeVisitor = new NodeVisitor(); this.planVisitor = new TaskCollectingVisitor(); this.circuitBreaker = breakerService.getBreaker(CrateCircuitBreakerService.QUERY_BREAKER); @@ -273,15 +278,9 @@ public ImmutableList visitMergeNode(MergeNode node, UUID jobId) { public ImmutableList visitQueryThenFetchNode(QueryThenFetchNode node, UUID jobId) { return singleTask(new QueryThenFetchTask( jobId, + queryThenFetchOperation, functions, - node, - clusterService, - transportActionProvider.transportQueryShardAction(), - transportActionProvider.searchServiceTransportAction(), - searchPhaseControllerProvider.get(), - threadPool, - bigArrays, - crateResultSorter)); + node)); } @Override diff --git a/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchPageableTaskResult.java b/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchPageableTaskResult.java new file mode 100644 index 000000000000..245b1e9c76a3 --- /dev/null +++ b/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchPageableTaskResult.java @@ -0,0 +1,282 @@ +/* + * Licensed to CRATE Technology GmbH
("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.executor.transport.task.elasticsearch; + +import com.google.common.base.Function; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.crate.core.bigarray.MultiObjectArrayBigArray; +import io.crate.executor.BigArrayPage; +import io.crate.executor.Page; +import io.crate.executor.PageInfo; +import io.crate.executor.PageableTaskResult; +import io.crate.operation.qtf.QueryThenFetchOperation; +import io.crate.planner.node.dql.QueryThenFetchNode; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.internal.InternalSearchResponse; + +import java.io.IOException; +import java.util.List; + +/** + * pageable taskresult used for paging through the results of a query then fetch + * query + * + * keeps a reference on the query context + * + */ +class QueryThenFetchPageableTaskResult implements PageableTaskResult { + + public static final int MAX_GAP_PAGESIZE = 1024; + + private final ESLogger logger = Loggers.getLogger(this.getClass()); + + private final ObjectArray pageSource; + private final Page page; + private final long startIndexAtPageSource; + private final PageInfo currentPageInfo; + private final List> extractors; + + private final QueryThenFetchOperation operation; + private QueryThenFetchOperation.QueryThenFetchContext ctx; + + public QueryThenFetchPageableTaskResult(QueryThenFetchOperation operation, + QueryThenFetchOperation.QueryThenFetchContext ctx, + List> extractors, + PageInfo pageInfo, + ObjectArray pageSource, + long startIndexAtPageSource) { + this.operation = operation; + this.ctx = ctx; + this.currentPageInfo = pageInfo; + this.pageSource = pageSource; + this.startIndexAtPageSource = startIndexAtPageSource; + this.page = new BigArrayPage(pageSource, startIndexAtPageSource, pageInfo.size()); + this.extractors = extractors; + } + + private void fetchFromSource(int from, int size, final FutureCallback> callback) { + Futures.addCallback( + Futures.transform(operation.executePageQuery(from, size, ctx), + new Function>() { + @javax.annotation.Nullable + @Override + public ObjectArray apply(@Nullable InternalSearchResponse input) { + return 
ctx.toPage(input.hits().hits(), extractors); + } + }), + callback + ); + } + + private ListenableFuture fetchWithNewQTF(final PageInfo pageInfo) { + final SettableFuture future = SettableFuture.create(); + QueryThenFetchNode oldNode = ctx.searchNode(); + Futures.addCallback( + operation.execute( + oldNode, + ctx.outputs(), + Optional.of(pageInfo) + ), + new FutureCallback() { + @Override + public void onSuccess(@Nullable final QueryThenFetchOperation.QueryThenFetchContext newCtx) { + Futures.addCallback( + newCtx.createSearchResponse(), + new FutureCallback() { + @Override + public void onSuccess(@Nullable InternalSearchResponse searchResponse) { + ObjectArray pageSource = newCtx.toPage(searchResponse.hits().hits(), extractors); + newCtx.cleanAfterFirstPage(); + future.set( + new QueryThenFetchPageableTaskResult( + operation, + newCtx, + extractors, + pageInfo, + pageSource, + 0L + ) + ); + closeSafe(); // close old searchcontexts and stuff + } + + @Override + public void onFailure(Throwable t) { + closeSafe(); + future.setException(t); + } + } + ); + } + + @Override + public void onFailure(Throwable t) { + closeSafe(); + future.setException(t); + } + } + ); + return future; + } + + /** + * + * @param pageInfo identifying the page to fetch + * @return a future for a new task result containing the requested page. + */ + @Override + public ListenableFuture fetch(final PageInfo pageInfo) { + final long pageSourceBeginning = currentPageInfo.position() - startIndexAtPageSource; + if (pageInfo.position() < pageSourceBeginning) { + logger.trace("paging backwards for page {}. issue new QTF query", pageInfo); + // backward paging - not optimized + return fetchWithNewQTF(pageInfo); + } + final long pageSourceEnd = currentPageInfo.position() + (pageSource.size()-startIndexAtPageSource); + final long gap = Math.max(0, pageInfo.position() - pageSourceEnd); + + final long restSize; + if (gap == 0) { + restSize = pageSourceEnd - pageInfo.position(); + } else { + restSize = 0; + } + + final SettableFuture future = SettableFuture.create(); + if (restSize >= pageInfo.size()) { + // don't need to fetch nuttin' + logger.trace("can satisfy page {} with current page source", pageInfo); + future.set( + new QueryThenFetchPageableTaskResult( + operation, + ctx, + extractors, + pageInfo, + pageSource, + startIndexAtPageSource + (pageInfo.position() - currentPageInfo.position()) + ) + ); + } else if (restSize <= 0) { + if (gap + pageInfo.size() > MAX_GAP_PAGESIZE) { + // if we have to fetch more than default pagesize, issue another query + logger.trace("issue a new QTF query for page {}. gap is too big.", pageInfo); + return fetchWithNewQTF(pageInfo); + } else { + logger.trace("fetch another page: {}, we only got a small gap.", pageInfo); + fetchFromSource( + (int)(pageInfo.position() - gap), + (int)(pageInfo.size() + gap), + new FutureCallback>() { + @Override + public void onSuccess(@Nullable ObjectArray result) { + future.set( + new QueryThenFetchPageableTaskResult( + operation, + ctx, + extractors, + pageInfo, + result, + Math.min(result.size(), gap) + ) + ); + } + + @Override + public void onFailure(Throwable t) { + closeSafe(); + future.setException(t); + } + } + ); + } + } else if (restSize > 0) { + logger.trace("we got {} docs left for page {}. 
need to fetch {}", restSize, pageInfo, pageInfo.size() - restSize); + // we got a rest, need to combine stuff + fetchFromSource(pageInfo.position(), (int)(pageInfo.size() - restSize), new FutureCallback>() { + @Override + public void onSuccess(@Nullable ObjectArray result) { + + MultiObjectArrayBigArray merged = new MultiObjectArrayBigArray<>( + 0, + pageSource.size() + result.size(), + pageSource, + result + ); + future.set( + new QueryThenFetchPageableTaskResult( + operation, + ctx, + extractors, + pageInfo, + merged, + startIndexAtPageSource + (pageInfo.position() - currentPageInfo.position()) + ) + ); + } + + @Override + public void onFailure(Throwable t) { + closeSafe(); + future.setException(t); + } + }); + } + return future; + } + + @Override + public Page page() { + return page; + } + + @Override + public Object[][] rows() { + throw new UnsupportedOperationException("QTFScrollTaskResult does not support rows()"); + } + + @javax.annotation.Nullable + @Override + public String errorMessage() { + return null; + } + + @Override + public void close() throws IOException { + ctx.close(); + } + + private void closeSafe() { + try { + close(); + } catch (IOException e) { + logger.error("error closing {}",e, getClass().getSimpleName()); + } + } +} diff --git a/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTask.java b/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTask.java index 3a9862b5c19c..9851a0604814 100644 --- a/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTask.java +++ b/sql/src/main/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTask.java @@ -21,137 +21,55 @@ package io.crate.executor.transport.task.elasticsearch; -import com.carrotsearch.hppc.IntArrayList; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import io.crate.action.sql.query.CrateResultSorter; -import io.crate.action.sql.query.QueryShardRequest; -import io.crate.action.sql.query.QueryShardScrollRequest; -import io.crate.action.sql.query.TransportQueryShardAction; -import io.crate.core.bigarray.MultiObjectArrayBigArray; -import io.crate.exceptions.Exceptions; -import io.crate.exceptions.FailedShardsException; import io.crate.executor.*; -import io.crate.metadata.*; +import io.crate.metadata.ColumnIdent; +import io.crate.metadata.Functions; +import io.crate.metadata.ReferenceInfo; import io.crate.metadata.doc.DocSysColumns; +import io.crate.operation.qtf.QueryThenFetchOperation; import io.crate.planner.node.dql.QueryThenFetchNode; -import io.crate.planner.symbol.*; -import org.apache.lucene.search.ScoreDoc; +import io.crate.planner.symbol.Reference; +import io.crate.planner.symbol.Symbol; import org.apache.lucene.util.BytesRef; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.ShardSearchFailure; -import org.elasticsearch.action.support.TransportActions; -import org.elasticsearch.cluster.ClusterService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.block.ClusterBlockLevel; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Nullable; 
-import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.ObjectArray; -import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchShardTarget; -import org.elasticsearch.search.action.SearchServiceListener; -import org.elasticsearch.search.action.SearchServiceTransportAction; -import org.elasticsearch.search.controller.SearchPhaseController; -import org.elasticsearch.search.fetch.FetchSearchResult; -import org.elasticsearch.search.fetch.QueryFetchSearchResult; -import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult; -import org.elasticsearch.search.fetch.ShardFetchSearchRequest; import org.elasticsearch.search.internal.InternalSearchResponse; -import org.elasticsearch.search.query.QueryPhaseExecutionException; -import org.elasticsearch.search.query.QuerySearchResult; -import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; public class QueryThenFetchTask extends JobTask implements PageableTask { - private static final TimeValue DEFAULT_KEEP_ALIVE = TimeValue.timeValueMinutes(5L); - private static final SymbolToFieldExtractor SYMBOL_TO_FIELD_EXTRACTOR = + private static final SymbolToFieldExtractor SYMBOL_TO_FIELD_EXTRACTOR = new SymbolToFieldExtractor<>(new SearchHitFieldExtractorFactory()); private final ESLogger logger = Loggers.getLogger(this.getClass()); - private Optional keepAlive = Optional.absent(); - private int limit; - private int offset; - private final QueryThenFetchNode searchNode; - private final TransportQueryShardAction transportQueryShardAction; - private final SearchServiceTransportAction searchServiceTransportAction; - private final SearchPhaseController searchPhaseController; - private final ThreadPool threadPool; - private final BigArrays bigArrays; - private final CrateResultSorter crateResultSorter; + private final QueryThenFetchOperation operation; - private final SettableFuture result; - private final List> results; + private SettableFuture result; + private List> results; - private final Routing routing; - private final AtomicArray docIdsToLoad; - private final AtomicArray firstResults; - private final AtomicArray fetchResults; - private final DiscoveryNodes nodes; - private final int numColumns; - private final int numShards; - private final ClusterState state; private final List> extractors; - volatile ScoreDoc[] sortedShardList; - private volatile AtomicArray shardFailures; - private final Object shardFailuresMutex = new Object(); - private final Map searchContextIds; - private List> requests; - private List references; - - /** - * dummy request required to re-use the searchService transport - */ - private final static SearchRequest EMPTY_SEARCH_REQUEST = new SearchRequest(); + private List references; public QueryThenFetchTask(UUID jobId, + QueryThenFetchOperation operation, Functions functions, - QueryThenFetchNode searchNode, - ClusterService clusterService, - TransportQueryShardAction transportQueryShardAction, - SearchServiceTransportAction searchServiceTransportAction, - 
SearchPhaseController searchPhaseController, - ThreadPool threadPool, - BigArrays bigArrays, - CrateResultSorter crateResultSorter) { + QueryThenFetchNode searchNode) { super(jobId); + this.operation = operation; this.searchNode = searchNode; - this.transportQueryShardAction = transportQueryShardAction; - this.searchServiceTransportAction = searchServiceTransportAction; - this.searchPhaseController = searchPhaseController; - this.threadPool = threadPool; - this.bigArrays = bigArrays; - this.crateResultSorter = crateResultSorter; - - state = clusterService.state(); - nodes = state.nodes(); - - result = SettableFuture.create(); - results = Arrays.>asList(result); - - routing = searchNode.routing(); - this.limit = searchNode.limit(); - this.offset = searchNode.offset(); SearchHitExtractorContext context = new SearchHitExtractorContext(functions, searchNode.outputs().size(), searchNode.partitionBy()); extractors = new ArrayList<>(searchNode.outputs().size()); @@ -159,13 +77,9 @@ public QueryThenFetchTask(UUID jobId, extractors.add(SYMBOL_TO_FIELD_EXTRACTOR.convert(symbol, context)); } references = context.references(); - numShards = searchNode.routing().numShards(); - searchContextIds = new ConcurrentHashMap<>(numShards); - docIdsToLoad = new AtomicArray<>(numShards); - firstResults = new AtomicArray<>(numShards); - fetchResults = new AtomicArray<>(numShards); - numColumns = searchNode.outputs().size(); + result = SettableFuture.create(); + results = Arrays.>asList(result); } @Override @@ -178,438 +92,53 @@ public void start(PageInfo pageInfo) { doStart(Optional.of(pageInfo)); } - private void doStart(Optional pageInfo) { - // create initial requests - requests = prepareRequests(references, pageInfo); - - if (!routing.hasLocations() || requests.size() == 0) { - result.set( - pageInfo.isPresent() - ? 
PageableTaskResult.EMPTY_PAGABLE_RESULT - : TaskResult.EMPTY_RESULT - ); - } - - state.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); - AtomicInteger totalOps = new AtomicInteger(0); - - int requestIdx = -1; - for (Tuple requestTuple : requests) { - requestIdx++; - state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, requestTuple.v2().index()); - transportQueryShardAction.executeQuery( - requestTuple.v1(), - requestTuple.v2(), - new QueryShardResponseListener(requestIdx, firstResults, totalOps, pageInfo) - ); - } - } - - private List> prepareRequests(List outputs, Optional pageInfo) { - List> requests = new ArrayList<>(); - Map>> locations = searchNode.routing().locations(); - if (locations == null) { - return requests; - } - - int queryLimit; - int queryOffset; - - if (pageInfo.isPresent()) { - // fetch all, including all offset stuff - queryLimit = this.offset + pageInfo.get().position() + pageInfo.get().size(); - queryOffset = 0; - } else { - queryLimit = this.limit; - queryOffset = this.offset; - } - - // only set keepAlive on pages Requests - Optional keepAliveValue = Optional.absent(); - if (pageInfo.isPresent()) { - keepAliveValue = keepAlive.or(Optional.of(DEFAULT_KEEP_ALIVE)); - } - - for (Map.Entry>> entry : locations.entrySet()) { - String node = entry.getKey(); - for (Map.Entry> indexEntry : entry.getValue().entrySet()) { - String index = indexEntry.getKey(); - Set shards = indexEntry.getValue(); - - for (Integer shard : shards) { - requests.add(new Tuple<>( - node, - new QueryShardRequest( - index, - shard, - outputs, - searchNode.orderBy(), - searchNode.reverseFlags(), - searchNode.nullsFirst(), - queryLimit, - queryOffset, // handle offset manually on handler for paged/scrolled calls - searchNode.whereClause(), - searchNode.partitionBy(), - keepAliveValue - ) - )); - } - } - } - return requests; - } - - private void moveToSecondPhase(Optional pageInfo) throws IOException { - ScoreDoc[] lastEmittedDocs = null; - if (pageInfo.isPresent()) { - // first sort to determine the lastEmittedDocs - // no offset and limit should be limit + offset - int sortLimit = this.offset + pageInfo.get().size() + pageInfo.get().position(); - sortedShardList = crateResultSorter.sortDocs(firstResults, 0, sortLimit); - lastEmittedDocs = searchPhaseController.getLastEmittedDocPerShard( - sortedShardList, - numShards); - - int fillOffset = pageInfo.get().position() + this.offset; - - // create a fetchrequest for all documents even those hit by the offset - // to set the lastemitteddoc on the shard - crateResultSorter.fillDocIdsToLoad(docIdsToLoad, sortedShardList, fillOffset); - } else { - sortedShardList = searchPhaseController.sortDocs(false, firstResults); - searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - } - - //searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); - if (docIdsToLoad.asList().isEmpty()) { - finish(pageInfo); - return; - } - - final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); - - for (AtomicArray.Entry entry : docIdsToLoad.asList()) { - QuerySearchResult queryResult = firstResults.get(entry.index); - DiscoveryNode node = nodes.get(queryResult.shardTarget().nodeId()); - ShardFetchSearchRequest fetchRequest = createFetchRequest(queryResult, entry, lastEmittedDocs); - executeFetch(entry.index, queryResult.shardTarget(), counter, fetchRequest, node, pageInfo); - } - } - - private void executeFetch(final int shardIndex, - final SearchShardTarget shardTarget, - final AtomicInteger counter, - 
ShardFetchSearchRequest shardFetchSearchRequest, - DiscoveryNode node, - final Optional pageInfo) { - - searchServiceTransportAction.sendExecuteFetch( - node, - shardFetchSearchRequest, - new SearchServiceListener() { - @Override - public void onResult(FetchSearchResult result) { - result.shardTarget(shardTarget); - fetchResults.set(shardIndex, result); - if (counter.decrementAndGet() == 0) { - finish(pageInfo); - } - } - - @Override - public void onFailure(Throwable t) { - docIdsToLoad.set(shardIndex, null); - addShardFailure(shardIndex, shardTarget, t); - if (counter.decrementAndGet() == 0) { - finish(pageInfo); - } - } - } - ); - } - - private void addShardFailure(int shardIndex, SearchShardTarget shardTarget, Throwable t) { - if (TransportActions.isShardNotAvailableException(t)) { - return; - } - if (shardFailures == null) { - synchronized (shardFailuresMutex) { - if (shardFailures == null) { - shardFailures = new AtomicArray<>(requests.size()); - } - } - } - ShardSearchFailure failure = shardFailures.get(shardIndex); - if (failure == null) { - shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); - } else { - // the failure is already present, try and not override it with an exception that is less meaningless - // for example, getting illegal shard state - if (TransportActions.isReadOverrideException(t)) { - shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget)); - } - } - } - - private ObjectArray toPage(SearchHit[] hits, List> extractors) { - ObjectArray rows = bigArrays.newObjectArray(hits.length); - for (int r = 0; r < hits.length; r++) { - Object[] row = new Object[numColumns]; - for (int c = 0; c < numColumns; c++) { - row[c] = extractors.get(c).extract(hits[r]); - } - rows.set(r, row); - } - return rows; - } - - private Object[][] toRows(SearchHit[] hits, List> extractors) { - Object[][] rows = new Object[hits.length][numColumns]; - - for (int r = 0; r < hits.length; r++) { - rows[r] = new Object[numColumns]; - for (int c = 0; c < numColumns; c++) { - rows[r][c] = extractors.get(c).extract(hits[r]); - } - } - return rows; - } - - private void finish(final Optional pageInfo) { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { + private void doStart(final Optional pageInfo) { + Futures.addCallback( + operation.execute(searchNode, references, pageInfo), + new FutureCallback() { @Override - public void run() { - try { - if(shardFailures != null && shardFailures.length() > 0){ - FailedShardsException ex = new FailedShardsException(shardFailures.toArray( - new ShardSearchFailure[shardFailures.length()])); - result.setException(ex); - return; - } - InternalSearchResponse response = searchPhaseController.merge(sortedShardList, firstResults, fetchResults); - - - if (pageInfo.isPresent()) { - ObjectArray page = toPage(response.hits().hits(), extractors); - result.set(new QTFScrollTaskResult(page, 0, pageInfo.get())); - } else { - final Object[][] rows = toRows(response.hits().hits(), extractors); - result.set(new QueryResult(rows)); - } - } catch (Throwable t) { - result.setException(t); - } finally { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad, pageInfo); - } - } - }); - } catch (EsRejectedExecutionException e) { - try { - releaseIrrelevantSearchContexts(firstResults, docIdsToLoad, pageInfo); - } finally { - result.setException(e); - } - } - } - - class QTFScrollTaskResult implements PageableTaskResult { - - private final ObjectArray pageSource; - private final Page page; - private final long 
startIndexAtPageSource; - private final PageInfo currentPageInfo; - private final AtomicArray queryFetchResults; - - public QTFScrollTaskResult(ObjectArray pageSource, long startIndexAtPageSource, PageInfo pageInfo) { - this.pageSource = pageSource; - this.startIndexAtPageSource = startIndexAtPageSource; - this.currentPageInfo = pageInfo; - this.queryFetchResults = new AtomicArray<>(numShards); - this.page = new BigArrayPage(pageSource, startIndexAtPageSource, currentPageInfo.size()); - } - - private void fetchFromSource(long needToFetch, final FutureCallback> callback) { - final AtomicInteger numOps = new AtomicInteger(numShards); - - final Scroll scroll = new Scroll(keepAlive.or(DEFAULT_KEEP_ALIVE)); - - for (final Map.Entry entry : searchContextIds.entrySet()) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); - - QueryShardScrollRequest request = new QueryShardScrollRequest(entry.getValue(), scroll, (int)needToFetch); - - transportQueryShardAction.executeScroll(node.id(), request, new ActionListener() { - @Override - public void onResponse(ScrollQueryFetchSearchResult scrollQueryFetchSearchResult) { - QueryFetchSearchResult qfsResult = scrollQueryFetchSearchResult.result(); - qfsResult.shardTarget(entry.getKey()); - - int opNum = numOps.decrementAndGet(); - queryFetchResults.set(opNum, qfsResult); - if (opNum == 0) { - try { - threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() { - @Override - public void run() { - try { - ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults); - InternalSearchResponse response = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults); - final SearchHit[] hits = response.hits().hits(); - final ObjectArray page = toPage(hits, extractors); - callback.onSuccess(page); - } catch (Throwable e) { - onFailure(e); - } - } - }); - } catch (EsRejectedExecutionException e) { - logger.error("error merging searchResults of QTFScrollTaskResult", e); - onFailure(e); + public void onSuccess(@Nullable final QueryThenFetchOperation.QueryThenFetchContext context) { + Futures.addCallback(context.createSearchResponse(), new FutureCallback() { + @Override + public void onSuccess(@Nullable InternalSearchResponse searchResponse) { + if (pageInfo.isPresent()) { + ObjectArray pageSource = context.toPage(searchResponse.hits().hits(), extractors); + context.cleanAfterFirstPage(); + result.set(new QueryThenFetchPageableTaskResult(operation, context, extractors, pageInfo.get(), pageSource, 0L)); + } else { + Object[][] rows = context.toRows(searchResponse.hits().hits(), extractors); + try { + context.close(); + result.set(new QueryResult(rows)); + } catch (IOException e) { + onFailure(e); + } } } - } - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - try { - close(); - } catch (IOException e) { - logger.error("error closing QTFScrollTaskResult", e); - } - } - }); - } - } - - @Override - public ListenableFuture fetch(final PageInfo pageInfo) { - Preconditions.checkArgument( - pageInfo.position() == (this.currentPageInfo.size() + this.currentPageInfo.position()), - "QueryThenFetchTask can only page forward without gaps"); - - final long restSize = (pageSource.size()-startIndexAtPageSource) - currentPageInfo.size(); - final SettableFuture future = SettableFuture.create(); - if (restSize >= pageInfo.size()) { - // don't need to fetch nuttin' - future.set( - new QTFScrollTaskResult( - pageSource, - startIndexAtPageSource + currentPageInfo.size(), - pageInfo) - ); - } else if 
(restSize < 0) { - // if restSize is less than 0, we got less than requested - // and in that can safely assume that we're exhausted - future.set(PageableTaskResult.EMPTY_PAGABLE_RESULT); - try { - close(); - } catch (IOException e) { - logger.error("error closing QTFScrollTaskResult", e); - } - } else if (restSize == 0) { - fetchFromSource(pageInfo.size(), new FutureCallback>() { - @Override - public void onSuccess(@Nullable ObjectArray result) { - if (result.size() == 0) { - future.set(PageableTaskResult.EMPTY_PAGABLE_RESULT); + @Override + public void onFailure(Throwable t) { try { - close(); + context.close(); + logger.error("error creating a QueryThenFetch response", t); + result.setException(t); } catch (IOException e) { - logger.error("error closing QTFScrollTaskResult", e); + // ignore in this case + logger.error("error closing QueryThenFetch context", t); + result.setException(e); } - } else { - future.set(new QTFScrollTaskResult(result, 0, pageInfo)); - } - } - - @Override - public void onFailure(Throwable t) { - future.setException(t); - } - }); - } else if (restSize > 0) { - // we got a rest, need to combine stuff - fetchFromSource(pageInfo.size() - restSize, new FutureCallback>() { - @Override - public void onSuccess(@Nullable ObjectArray result) { - - MultiObjectArrayBigArray merged = new MultiObjectArrayBigArray<>( - pageSource.size() - restSize, - pageInfo.size(), - pageSource, - result - ); - future.set(new QTFScrollTaskResult(merged, 0, pageInfo)); - } - - @Override - public void onFailure(Throwable t) { - future.setException(t); - } - }); - } - return future; - } - - @Override - public Page page() { - return page; - } - - @Override - public Object[][] rows() { - throw new UnsupportedOperationException("QTFScrollTaskResult does not support rows()"); - } - - @javax.annotation.Nullable - @Override - public String errorMessage() { - return null; - } - - @Override - public void close() throws IOException { - releaseAllContexts(); - } - } - - private void releaseIrrelevantSearchContexts(AtomicArray firstResults, - AtomicArray docIdsToLoad, - Optional pageInfo) { - // don't release searchcontexts yet, if we use scroll - if (docIdsToLoad == null || pageInfo.isPresent()) { - return; - } - - for (AtomicArray.Entry entry : firstResults.asList()) { - if (docIdsToLoad.get(entry.index) == null) { - DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId()); - if (node != null) { - searchServiceTransportAction.sendFreeContext(node, entry.value.queryResult().id(), EMPTY_SEARCH_REQUEST); + } + }); } - } - } - } - private void releaseAllContexts() { - for (Map.Entry entry : searchContextIds.entrySet()) { - DiscoveryNode node = nodes.get(entry.getKey().nodeId()); - if (node != null) { - searchServiceTransportAction.sendFreeContext(node, entry.getValue(), EMPTY_SEARCH_REQUEST); + @Override + public void onFailure(Throwable t) { + logger.error("error executing a QueryThenFetch query", t); + result.setException(t); + } } - } - } - - protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, - AtomicArray.Entry entry, @Nullable ScoreDoc[] lastEmittedDocsPerShard) { - if (lastEmittedDocsPerShard != null) { - ScoreDoc lastEmittedDoc = lastEmittedDocsPerShard[entry.index]; - return new ShardFetchSearchRequest(EMPTY_SEARCH_REQUEST, queryResult.id(), entry.value, lastEmittedDoc); - } - return new ShardFetchSearchRequest(EMPTY_SEARCH_REQUEST, queryResult.id(), entry.value); + ); } @Override @@ -622,76 +151,6 @@ public void upstreamResult(List> result) { 
throw new UnsupportedOperationException("Can't have upstreamResults"); } - /** - * set the keep alive value for the search context on the shards - */ - public void setKeepAlive(TimeValue keepAlive) { - this.keepAlive = Optional.of(keepAlive); - } - - class QueryShardResponseListener implements ActionListener { - - private final int requestIdx; - private final AtomicArray firstResults; - private final AtomicInteger totalOps; - private final int expectedOps; - private final Optional pageInfo; - - public QueryShardResponseListener(int requestIdx, - AtomicArray firstResults, - AtomicInteger totalOps, Optional pageInfo) { - - this.requestIdx = requestIdx; - this.firstResults = firstResults; - this.totalOps = totalOps; - this.pageInfo = pageInfo; - this.expectedOps = firstResults.length(); - } - - @Override - public void onResponse(QuerySearchResult querySearchResult) { - Tuple requestTuple = requests.get(requestIdx); - QueryShardRequest request = requestTuple.v2(); - - - querySearchResult.shardTarget( - new SearchShardTarget(requestTuple.v1(), request.index(), request.shardId())); - searchContextIds.put(querySearchResult.shardTarget(), querySearchResult.id()); - firstResults.set(requestIdx, querySearchResult); - if (totalOps.incrementAndGet() == expectedOps) { - try { - moveToSecondPhase(pageInfo); - } catch (IOException e) { - raiseEarlyFailure(e); - } - } - } - - @Override - public void onFailure(Throwable e) { - raiseEarlyFailure(e); - } - } - - private void raiseEarlyFailure(Throwable t) { - for (AtomicArray.Entry entry : firstResults.asList()) { - try { - DiscoveryNode node = nodes.get(entry.value.shardTarget().nodeId()); - if (node != null) { - searchServiceTransportAction.sendFreeContext(node, entry.value.id(), EMPTY_SEARCH_REQUEST); - } - } catch (Throwable t1) { - logger.trace("failed to release context", t1); - } - } - t = Exceptions.unwrap(t); - if (t instanceof QueryPhaseExecutionException) { - result.setException(t.getCause()); - return; - } - result.setException(t); - } - static class SearchHitExtractorContext extends SymbolToFieldExtractor.Context { private final List partitionBy; diff --git a/sql/src/main/java/io/crate/operation/join/CollectingPageableTaskIterable.java b/sql/src/main/java/io/crate/operation/join/CollectingPageableTaskIterable.java index 22157a637470..3378c4035e40 100644 --- a/sql/src/main/java/io/crate/operation/join/CollectingPageableTaskIterable.java +++ b/sql/src/main/java/io/crate/operation/join/CollectingPageableTaskIterable.java @@ -45,7 +45,7 @@ */ public class CollectingPageableTaskIterable extends RelationIterable { - private PageableTaskResult currentTaskResult; + private volatile PageableTaskResult currentTaskResult; private List pages; public CollectingPageableTaskIterable(PageableTaskResult taskResult, PageInfo pageInfo) { @@ -83,6 +83,7 @@ public void onSuccess(@Nullable PageableTaskResult result) { if (result == null) { future.setException(new IllegalArgumentException("PageableTaskResult is null")); } else { + currentTaskResult = result; pages.add(result.page()); future.set(null); } diff --git a/sql/src/main/java/io/crate/operation/join/SinglePagePageableTaskIterable.java b/sql/src/main/java/io/crate/operation/join/SinglePagePageableTaskIterable.java index 94c8fcac34c3..69915b434412 100644 --- a/sql/src/main/java/io/crate/operation/join/SinglePagePageableTaskIterable.java +++ b/sql/src/main/java/io/crate/operation/join/SinglePagePageableTaskIterable.java @@ -56,6 +56,7 @@ public ListenableFuture fetchPage(PageInfo pageInfo) throws NoSuchElementE 
public void onSuccess(@Nullable PageableTaskResult result) { if (result == null) { future.setException(new IllegalArgumentException("PageableTaskResult is null")); + return; } currentTaskResult = result; future.set(null); diff --git a/sql/src/main/java/io/crate/operation/qtf/QueryThenFetchOperation.java b/sql/src/main/java/io/crate/operation/qtf/QueryThenFetchOperation.java new file mode 100644 index 000000000000..5dee6ecc3c35 --- /dev/null +++ b/sql/src/main/java/io/crate/operation/qtf/QueryThenFetchOperation.java @@ -0,0 +1,637 @@ +/* + * Licensed to CRATE Technology GmbH ("Crate") under one or more contributor + * license agreements. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. Crate licenses + * this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may + * obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * However, if you have executed another commercial license agreement + * with Crate these terms will supersede the license and you may use the + * software solely pursuant to the terms of the relevant commercial agreement. + */ + +package io.crate.operation.qtf; + +import com.carrotsearch.hppc.IntArrayList; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.crate.action.sql.query.CrateResultSorter; +import io.crate.action.sql.query.QueryShardRequest; +import io.crate.action.sql.query.QueryShardScrollRequest; +import io.crate.action.sql.query.TransportQueryShardAction; +import io.crate.exceptions.Exceptions; +import io.crate.exceptions.FailedShardsException; +import io.crate.executor.PageInfo; +import io.crate.executor.transport.task.elasticsearch.FieldExtractor; +import io.crate.planner.node.dql.QueryThenFetchNode; +import io.crate.planner.symbol.Reference; +import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.support.TransportActions; +import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ObjectArray; +import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchHit; +import 
org.elasticsearch.search.SearchShardTarget; +import org.elasticsearch.search.action.SearchServiceListener; +import org.elasticsearch.search.action.SearchServiceTransportAction; +import org.elasticsearch.search.controller.SearchPhaseController; +import org.elasticsearch.search.fetch.*; +import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.search.query.QueryPhaseExecutionException; +import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.search.query.ScrollQuerySearchResult; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.Closeable; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +public class QueryThenFetchOperation { + + private static final TimeValue DEFAULT_KEEP_ALIVE = TimeValue.timeValueMinutes(5L); + + private final ESLogger logger = Loggers.getLogger(this.getClass()); + + private final TransportQueryShardAction transportQueryShardAction; + private final SearchServiceTransportAction searchServiceTransportAction; + private final SearchPhaseController searchPhaseController; + private final ThreadPool threadPool; + private final BigArrays bigArrays; + private final CrateResultSorter crateResultSorter; + private final ClusterService clusterService; + + /** + * dummy request required to re-use the searchService transport + */ + private final static SearchRequest EMPTY_SEARCH_REQUEST = new SearchRequest(); + private final static SearchScrollRequest EMPTY_SCROLL_REQUEST = new SearchScrollRequest(); + + @Inject + public QueryThenFetchOperation(ClusterService clusterService, + TransportQueryShardAction transportQueryShardAction, + SearchServiceTransportAction searchServiceTransportAction, + SearchPhaseController searchPhaseController, + ThreadPool threadPool, + BigArrays bigArrays, + CrateResultSorter crateResultSorter) { + this.transportQueryShardAction = transportQueryShardAction; + this.searchServiceTransportAction = searchServiceTransportAction; + this.searchPhaseController = searchPhaseController; + this.threadPool = threadPool; + this.bigArrays = bigArrays; + this.crateResultSorter = crateResultSorter; + this.clusterService = clusterService; + } + + public ListenableFuture execute(QueryThenFetchNode searchNode, + List outputs, + Optional pageInfo) { + SettableFuture future = SettableFuture.create(); + // create a context collecting the per-shard results, then one query request per target shard + QueryThenFetchContext ctx = new QueryThenFetchContext(bigArrays, searchNode, outputs, pageInfo); + prepareRequests(ctx); + + if (!searchNode.routing().hasLocations() || ctx.requests.size() == 0) { + future.set(ctx); + } + ClusterState state = clusterService.state(); + state.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); + AtomicInteger totalOps = new AtomicInteger(0); + + int requestIdx = -1; + for (Tuple requestTuple : ctx.requests) { + requestIdx++; + state.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, requestTuple.v2().index()); + transportQueryShardAction.executeQuery( + requestTuple.v1(), + requestTuple.v2(), + new QueryShardResponseListener(requestIdx, totalOps, ctx, future) + ); + } + + return future; + } + + private void prepareRequests(QueryThenFetchContext ctx) { + ctx.requests = new ArrayList<>(); + Map>> locations = ctx.searchNode.routing().locations(); + if (locations == null) { + return; + } + + int queryLimit; + int queryOffset; + // only set keepAlive on paged Requests + Optional keepAliveValue = Optional.absent(); + + if (ctx.pageInfo.isPresent())
{ + // fetch all, including all offset stuff + queryLimit = ctx.searchNode.offset() + Math.min( + ctx.searchNode.limit(), + ctx.pageInfo.get().position() + ctx.pageInfo.get().size()); + queryOffset = 0; + keepAliveValue = Optional.of(DEFAULT_KEEP_ALIVE); + } else { + queryLimit = ctx.searchNode.limit(); + queryOffset = ctx.searchNode.offset(); + } + + + for (Map.Entry>> entry : locations.entrySet()) { + String node = entry.getKey(); + for (Map.Entry> indexEntry : entry.getValue().entrySet()) { + String index = indexEntry.getKey(); + Set shards = indexEntry.getValue(); + + for (Integer shard : shards) { + ctx.requests.add(new Tuple<>( + node, + new QueryShardRequest( + index, + shard, + ctx.outputs, + ctx.searchNode.orderBy(), + ctx.searchNode.reverseFlags(), + ctx.searchNode.nullsFirst(), + queryLimit, + queryOffset, // handle offset manually on handler for paged/scrolled calls + ctx.searchNode.whereClause(), + ctx.searchNode.partitionBy(), + keepAliveValue + ) + )); + } + } + } + } + + public ListenableFuture executePageQuery(final int from, final int size, final QueryThenFetchContext ctx) { + + final SettableFuture future = SettableFuture.create(); + final QueryThenFetchPageContext pageContext = new QueryThenFetchPageContext(ctx); + final Scroll scroll = new Scroll(DEFAULT_KEEP_ALIVE); + + int requestId = 0; + for (final AtomicArray.Entry entry : ctx.queryResults.asList()) { + + DiscoveryNode node = ctx.nodes.get(entry.value.shardTarget().nodeId()); + final int currentId = requestId; + QueryShardScrollRequest request = new QueryShardScrollRequest(entry.value.id(), scroll, from, size); + transportQueryShardAction.executeScrollQuery(node.id(), request, new ActionListener() { + @Override + public void onResponse(ScrollQuerySearchResult scrollQuerySearchResult) { + final QuerySearchResult queryResult = scrollQuerySearchResult.queryResult(); + pageContext.queryResults.set(currentId, queryResult); + + if (pageContext.numOps.decrementAndGet() == 0) { + try { + executePageFetch(pageContext, future); + } catch (Exception e) { + logger.error("error fetching page results for page from={} size={}", e, from ,size); + future.setException(e); + } + } else { + logger.trace("{} queries pending", pageContext.numOps.get()); + } + } + + @Override + public void onFailure(Throwable t) { + logger.error("error querying page results for page from={} size={}", t, from ,size); + future.setException(t); + } + }); + requestId++; + } + return future; + } + + public void executePageFetch(final QueryThenFetchPageContext pageContext, + final SettableFuture future) throws Exception { + + final ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, pageContext.queryResults); + AtomicArray docIdsToLoad = new AtomicArray<>(pageContext.queryResults.length()); + searchPhaseController.fillDocIdsToLoad(docIdsToLoad, sortedShardList); + + if (docIdsToLoad.asList().isEmpty()) { + future.set(InternalSearchResponse.empty()); + return; + } + + final ScoreDoc[] lastEmittedDocPerShard = searchPhaseController.getLastEmittedDocPerShard(sortedShardList, pageContext.queryResults.length()); + + final AtomicInteger counter = new AtomicInteger(docIdsToLoad.asList().size()); + + for (final AtomicArray.Entry entry : docIdsToLoad.asList()) { + IntArrayList docIds = entry.value; + final QuerySearchResult querySearchResult = pageContext.queryResults.get(entry.index); + ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index]; + ShardFetchRequest shardFetchRequest = new ShardFetchRequest(EMPTY_SCROLL_REQUEST, 
querySearchResult.id(), docIds, lastEmittedDoc); + DiscoveryNode node = pageContext.queryThenFetchContext.nodes.get(querySearchResult.shardTarget().nodeId()); + searchServiceTransportAction.sendExecuteFetchScroll(node, shardFetchRequest, new SearchServiceListener() { + @Override + public void onResult(FetchSearchResult result) { + result.shardTarget(querySearchResult.shardTarget()); + pageContext.fetchResults.set(entry.index, result); + if (counter.decrementAndGet() == 0) { + final InternalSearchResponse response = searchPhaseController.merge( + sortedShardList, + pageContext.queryResults, + pageContext.fetchResults); + future.set(response); + } else { + logger.trace("{} fetch results left", counter.get()); + } + } + + @Override + public void onFailure(Throwable t) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to execute paged fetch phase", t); + } + pageContext.successfulFetchOps.decrementAndGet(); + if (counter.decrementAndGet() == 0) { + future.setException(t); + } + } + }); + } + } + + private void moveToSecondPhase(QueryThenFetchContext ctx, SettableFuture future) throws IOException { + ScoreDoc[] lastEmittedDocs = null; + if (ctx.pageInfo.isPresent()) { + PageInfo pageInfo = ctx.pageInfo.get(); + + int sortLimit = ctx.searchNode.offset() + pageInfo.size() + pageInfo.position(); + ctx.sortedShardList = crateResultSorter.sortDocs(ctx.queryResults, 0, sortLimit); + lastEmittedDocs = searchPhaseController.getLastEmittedDocPerShard( + ctx.sortedShardList, + ctx.numShards); + + int fillOffset = pageInfo.position() + ctx.searchNode.offset(); + + // create a fetchrequest for all documents even those hit by the offset + // to set the lastemitteddoc on the shard + crateResultSorter.fillDocIdsToLoad( + ctx.docIdsToLoad, + ctx.sortedShardList, + fillOffset + ); + } else { + ctx.sortedShardList = searchPhaseController.sortDocs(false, ctx.queryResults); + searchPhaseController.fillDocIdsToLoad( + ctx.docIdsToLoad, + ctx.sortedShardList + ); + } + + if (ctx.docIdsToLoad.asList().isEmpty()) { + future.set(ctx); + return; + } + + final AtomicInteger counter = new AtomicInteger(ctx.docIdsToLoad.asList().size()); + + for (final AtomicArray.Entry entry : ctx.docIdsToLoad.asList()) { + QuerySearchResult queryResult = ctx.queryResults.get(entry.index); + DiscoveryNode node = ctx.nodes.get(queryResult.shardTarget().nodeId()); + ShardFetchSearchRequest fetchRequest = createFetchRequest(queryResult, entry, lastEmittedDocs); + executeFetch(ctx, future, entry.index, queryResult.shardTarget(), counter, fetchRequest, node); + } + } + + protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResult, + AtomicArray.Entry entry, @Nullable ScoreDoc[] lastEmittedDocsPerShard) { + if (lastEmittedDocsPerShard != null) { + ScoreDoc lastEmittedDoc = lastEmittedDocsPerShard[entry.index]; + return new ShardFetchSearchRequest(EMPTY_SEARCH_REQUEST, queryResult.id(), entry.value, lastEmittedDoc); + } + return new ShardFetchSearchRequest(EMPTY_SEARCH_REQUEST, queryResult.id(), entry.value); + } + + private void executeFetch(final QueryThenFetchContext ctx, + final SettableFuture future, + final int shardIndex, + final SearchShardTarget shardTarget, + final AtomicInteger counter, + ShardFetchSearchRequest shardFetchSearchRequest, + DiscoveryNode node) { + + searchServiceTransportAction.sendExecuteFetch( + node, + shardFetchSearchRequest, + new SearchServiceListener() { + @Override + public void onResult(FetchSearchResult result) { + result.shardTarget(shardTarget); + 
+                        ctx.fetchResults.set(shardIndex, result);
+                        if (counter.decrementAndGet() == 0) {
+                            future.set(ctx);
+                        }
+                    }
+
+                    @Override
+                    public void onFailure(Throwable t) {
+                        ctx.docIdsToLoad.set(shardIndex, null);
+                        ctx.addShardFailure(shardIndex, shardTarget, t);
+                        if (counter.decrementAndGet() == 0) {
+                            future.set(ctx);
+                        }
+                    }
+                }
+        );
+    }
+
+    private void raiseEarlyFailure(QueryThenFetchContext ctx, SettableFuture<QueryThenFetchContext> future, Throwable t) {
+        ctx.releaseAllContexts();
+        t = Exceptions.unwrap(t);
+        if (t instanceof QueryPhaseExecutionException) {
+            future.setException(t.getCause());
+            return;
+        }
+        future.setException(t);
+    }
+
+    public class QueryThenFetchPageContext {
+        private final QueryThenFetchContext queryThenFetchContext;
+        private final AtomicArray<QuerySearchResult> queryResults;
+        private final AtomicArray<FetchSearchResult> fetchResults;
+        private final AtomicInteger numOps;
+        private final AtomicInteger successfulFetchOps;
+
+        public QueryThenFetchPageContext(QueryThenFetchContext queryThenFetchContext) {
+            this.queryThenFetchContext = queryThenFetchContext;
+            this.queryResults = new AtomicArray<>(queryThenFetchContext.numShards());
+            this.fetchResults = new AtomicArray<>(queryThenFetchContext.numShards());
+            this.numOps = new AtomicInteger(queryThenFetchContext.numShards());
+            this.successfulFetchOps = new AtomicInteger(queryThenFetchContext.numShards());
+        }
+    }
+
+    public class QueryThenFetchContext implements Closeable {
+
+        private final Optional<PageInfo> pageInfo;
+        private final DiscoveryNodes nodes;
+        private final QueryThenFetchNode searchNode;
+        private final List<Symbol> outputs;
+        private final int numShards;
+        private final int numColumns;
+        private final BigArrays bigArrays;
+
+        private final Map<SearchShardTarget, Long> searchContextIds;
+
+        private final AtomicArray<IntArrayList> docIdsToLoad;
+        private final AtomicArray<QuerySearchResult> queryResults;
+        private final AtomicArray<FetchSearchResult> fetchResults;
+
+        private final Object shardFailuresMutex = new Object();
+
+        private volatile ScoreDoc[] sortedShardList;
+        private volatile AtomicArray<ShardSearchFailure> shardFailures;
+        private List<Tuple<String, QueryShardRequest>> requests;
+
+        public QueryThenFetchContext(BigArrays bigArrays,
+                                     QueryThenFetchNode node,
+                                     List<Symbol> outputs,
+                                     Optional<PageInfo> pageInfo) {
+            this.searchNode = node;
+            this.outputs = outputs;
+            this.pageInfo = pageInfo;
+            this.numShards = node.routing().numShards();
+            this.nodes = clusterService.state().nodes();
+            this.bigArrays = bigArrays;
+
+            searchContextIds = new ConcurrentHashMap<>();
+            docIdsToLoad = new AtomicArray<>(numShards);
+            queryResults = new AtomicArray<>(numShards);
+            fetchResults = new AtomicArray<>(numShards);
+            numColumns = node.outputs().size();
+        }
+
+        public ListenableFuture<InternalSearchResponse> createSearchResponse() {
+            final SettableFuture<InternalSearchResponse> future = SettableFuture.create();
+            try {
+                threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            if (shardFailures != null && shardFailures.length() > 0) {
+                                FailedShardsException ex = new FailedShardsException(shardFailures.toArray(
+                                        new ShardSearchFailure[shardFailures.length()]));
+                                future.setException(ex);
+                                return;
+                            }
+                            ScoreDoc[] appliedSortedShardList = sortedShardList;
+                            if (pageInfo.isPresent()) {
+                                int offset = pageInfo.get().position();
+                                appliedSortedShardList = Arrays.copyOfRange(
+                                        sortedShardList,
+                                        Math.min(offset, sortedShardList.length),
+                                        sortedShardList.length
+                                );
+                            }
+                            future.set(searchPhaseController.merge(appliedSortedShardList, queryResults, fetchResults));
+                        } catch (Throwable t) {
+                            future.setException(t);
+                        } finally {
+                            releaseIrrelevantSearchContexts(queryResults, docIdsToLoad, pageInfo);
+                        }
+                    }
+                });
+            } catch (EsRejectedExecutionException e) {
+                try {
+                    releaseIrrelevantSearchContexts(queryResults, docIdsToLoad, pageInfo);
+                } finally {
+                    future.setException(e);
+                }
+            }
+            return future;
+        }
+
+        private void releaseIrrelevantSearchContexts(AtomicArray<QuerySearchResult> firstResults,
+                                                     AtomicArray<IntArrayList> docIdsToLoad,
+                                                     Optional<PageInfo> pageInfo) {
+            // don't release search contexts yet if we use scroll
+            if (docIdsToLoad == null || pageInfo.isPresent()) {
+                return;
+            }
+
+            for (AtomicArray.Entry<QuerySearchResult> entry : firstResults.asList()) {
+                if (docIdsToLoad.get(entry.index) == null) {
+                    DiscoveryNode node = nodes.get(entry.value.queryResult().shardTarget().nodeId());
+                    if (node != null) {
+                        searchServiceTransportAction.sendFreeContext(node, entry.value.queryResult().id(), EMPTY_SEARCH_REQUEST);
+                    }
+                }
+            }
+        }
+
+        private void releaseAllContexts() {
+            for (Map.Entry<SearchShardTarget, Long> entry : searchContextIds.entrySet()) {
+                DiscoveryNode node = nodes.get(entry.getKey().nodeId());
+                if (node != null) {
+                    searchServiceTransportAction.sendFreeContext(node, entry.getValue(), EMPTY_SEARCH_REQUEST);
+                }
+            }
+        }
+
+        private void addShardFailure(int shardIndex, SearchShardTarget shardTarget, Throwable t) {
+            if (TransportActions.isShardNotAvailableException(t)) {
+                return;
+            }
+            if (shardFailures == null) {
+                synchronized (shardFailuresMutex) {
+                    if (shardFailures == null) {
+                        shardFailures = new AtomicArray<>(requests.size());
+                    }
+                }
+            }
+            ShardSearchFailure failure = shardFailures.get(shardIndex);
+            if (failure == null) {
+                shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
+            } else {
+                // the failure is already present, try not to override it with an exception
+                // that is less meaningful, for example one caused by an illegal shard state
+                if (TransportActions.isReadOverrideException(t)) {
+                    shardFailures.set(shardIndex, new ShardSearchFailure(t, shardTarget));
+                }
+            }
+        }
+
+        public int numShards() {
+            return numShards;
+        }
+
+        @Override
+        public void close() throws IOException {
+            releaseAllContexts();
+        }
+
+        public List<Symbol> outputs() {
+            return outputs;
+        }
+
+        public QueryThenFetchNode searchNode() {
+            return searchNode;
+        }
+
+        public ObjectArray<Object[]> toPage(SearchHit[] hits, List<FieldExtractor<SearchHit>> extractors) {
+            ObjectArray<Object[]> rows = bigArrays.newObjectArray(hits.length);
+            for (int r = 0; r < hits.length; r++) {
+                Object[] row = new Object[numColumns];
+                for (int c = 0; c < numColumns; c++) {
+                    row[c] = extractors.get(c).extract(hits[r]);
+                }
+                rows.set(r, row);
+            }
+            return rows;
+        }
+
+        public Object[][] toRows(SearchHit[] hits, List<FieldExtractor<SearchHit>> extractors) {
+            Object[][] rows = new Object[hits.length][numColumns];
+
+            for (int r = 0; r < hits.length; r++) {
+                rows[r] = new Object[numColumns];
+                for (int c = 0; c < numColumns; c++) {
+                    rows[r][c] = extractors.get(c).extract(hits[r]);
+                }
+            }
+            return rows;
+        }
+
+        public void cleanAfterFirstPage() {
+            for (int i = 0; i < docIdsToLoad.length(); i++) {
+                docIdsToLoad.set(i, null);
+            }
+            // we still need the queryResults and searchContextIds
+            for (int i = 0; i < fetchResults.length(); i++) {
+                fetchResults.set(i, null);
+            }
+            if (shardFailures != null) {
+                for (int i = 0; i < shardFailures.length(); i++) {
+                    shardFailures.set(i, null);
+                }
+            }
+            if (sortedShardList != null) {
+                Arrays.fill(sortedShardList, null);
+            }
+            if (requests != null) {
+                requests.clear();
+            }
+        }
+    }
+
+    public class QueryShardResponseListener implements ActionListener<QuerySearchResult> {
+
+        private final int requestIdx;
+        private final AtomicInteger totalOps;
+        private final int expectedOps;
+        private final QueryThenFetchContext ctx;
+        private final SettableFuture<QueryThenFetchContext> future;
+
+        public QueryShardResponseListener(int requestIdx,
+                                          AtomicInteger totalOps,
+                                          QueryThenFetchContext ctx,
+                                          SettableFuture<QueryThenFetchContext> future) {
+
+            this.requestIdx = requestIdx;
+            this.ctx = ctx;
+            this.totalOps = totalOps;
+            this.future = future;
+            this.expectedOps = ctx.queryResults.length();
+        }
+
+        @Override
+        public void onResponse(QuerySearchResult querySearchResult) {
+            Tuple<String, QueryShardRequest> requestTuple = ctx.requests.get(requestIdx);
+            QueryShardRequest request = requestTuple.v2();
+
+            querySearchResult.shardTarget(
+                    new SearchShardTarget(requestTuple.v1(), request.index(), request.shardId()));
+            ctx.searchContextIds.put(querySearchResult.shardTarget(), querySearchResult.id());
+            ctx.queryResults.set(requestIdx, querySearchResult);
+            if (totalOps.incrementAndGet() == expectedOps) {
+                try {
+                    moveToSecondPhase(ctx, future);
+                } catch (IOException e) {
+                    raiseEarlyFailure(ctx, future, e);
+                }
+            }
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+            raiseEarlyFailure(ctx, future, e);
+        }
+    }
+}
diff --git a/sql/src/main/java/io/crate/planner/node/dql/QueryThenFetchNode.java b/sql/src/main/java/io/crate/planner/node/dql/QueryThenFetchNode.java
index bb00a9c105b3..bd8f87cf8efd 100644
--- a/sql/src/main/java/io/crate/planner/node/dql/QueryThenFetchNode.java
+++ b/sql/src/main/java/io/crate/planner/node/dql/QueryThenFetchNode.java
@@ -135,4 +135,20 @@ public String toString() {
                 .add("partitionBy", partitionBy)
                 .toString();
     }
+
+    public static QueryThenFetchNode withLimitAndOffset(QueryThenFetchNode oldNode,
+                                                        int newOffset,
+                                                        int newLimit) {
+        return new QueryThenFetchNode(
+                oldNode.routing(),
+                oldNode.outputs(),
+                oldNode.orderBy(),
+                oldNode.reverseFlags(),
+                oldNode.nullsFirst(),
+                newLimit,
+                newOffset,
+                oldNode.whereClause(),
+                oldNode.partitionBy()
+        );
+    }
 }
diff --git a/sql/src/test/java/io/crate/analyze/where/WhereClauseAnalyzerTest.java b/sql/src/test/java/io/crate/analyze/where/WhereClauseAnalyzerTest.java
index 4d3e35b728af..46090be73506 100644
--- a/sql/src/test/java/io/crate/analyze/where/WhereClauseAnalyzerTest.java
+++ b/sql/src/test/java/io/crate/analyze/where/WhereClauseAnalyzerTest.java
@@ -21,11 +21,10 @@
 package io.crate.analyze.where;
 
-import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.crate.analyze.*;
-import io.crate.analyze.relations.QueriedRelation;
 import io.crate.analyze.relations.TableRelation;
 import io.crate.metadata.*;
 import io.crate.metadata.sys.MetaDataSysModule;
diff --git a/sql/src/test/java/io/crate/executor/transport/TransportExecutorPagingTest.java b/sql/src/test/java/io/crate/executor/transport/PagingTasksTest.java
similarity index 66%
rename from sql/src/test/java/io/crate/executor/transport/TransportExecutorPagingTest.java
rename to sql/src/test/java/io/crate/executor/transport/PagingTasksTest.java
index ec959abd5159..5237a5126321 100644
--- a/sql/src/test/java/io/crate/executor/transport/TransportExecutorPagingTest.java
+++ b/sql/src/test/java/io/crate/executor/transport/PagingTasksTest.java
@@ -23,8 +23,10 @@
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.crate.Constants;
+import io.crate.action.sql.SQLResponse;
 import io.crate.analyze.WhereClause;
 import io.crate.executor.*;
 import io.crate.executor.task.join.NestedLoopTask;
@@ -42,7 +44,7 @@
 import io.crate.testing.TestingHelpers;
 import io.crate.types.DataType;
 import io.crate.types.DataTypes;
-import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
@@ -55,7 +57,7 @@
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.core.Is.is;
 
-public class TransportExecutorPagingTest extends BaseTransportExecutorTest {
+public class PagingTasksTest extends BaseTransportExecutorTest {
 
     static {
         ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(true);
@@ -94,7 +96,6 @@ public void testPagedQueryThenFetch() throws Exception {
         assertThat(tasks.size(), is(1));
         QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
         PageInfo pageInfo = PageInfo.firstPage(2);
-        qtfTask.setKeepAlive(TimeValue.timeValueSeconds(10));
         qtfTask.start(pageInfo);
         List<ListenableFuture<TaskResult>> results = qtfTask.result();
@@ -144,7 +145,6 @@ public void testPagedQueryThenFetchWithOffset() throws Exception {
         assertThat(tasks.size(), is(1));
         QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
         PageInfo pageInfo = new PageInfo(1, 2);
-        qtfTask.setKeepAlive(TimeValue.timeValueSeconds(10));
         qtfTask.start(pageInfo);
         List<ListenableFuture<TaskResult>> results = qtfTask.result();
         assertThat(results.size(), is(1));
@@ -191,7 +191,6 @@ public void testPagedQueryThenFetchWithoutSorting() throws Exception {
         assertThat(tasks.size(), is(1));
         QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
         PageInfo pageInfo = new PageInfo(1, 2);
-        qtfTask.setKeepAlive(TimeValue.timeValueSeconds(10));
         qtfTask.start(pageInfo);
         List<ListenableFuture<TaskResult>> results = qtfTask.result();
         assertThat(results.size(), is(1));
@@ -236,7 +235,6 @@ public void testPagedQueryThenFetch1RowPages() throws Exception {
         assertThat(tasks.size(), is(1));
         QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
         PageInfo pageInfo = new PageInfo(1, 1);
-        qtfTask.setKeepAlive(TimeValue.timeValueSeconds(10));
         qtfTask.start(pageInfo);
         List<ListenableFuture<TaskResult>> results = qtfTask.result();
         assertThat(results.size(), is(1));
@@ -286,7 +284,6 @@ public void testPartitionedPagedQueryThenFetch1RowPages() throws Exception {
         assertThat(tasks.size(), is(1));
         QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
         PageInfo pageInfo = new PageInfo(0, 1);
-        qtfTask.setKeepAlive(TimeValue.timeValueSeconds(10));
         qtfTask.start(pageInfo);
         List<ListenableFuture<TaskResult>> results = qtfTask.result();
         assertThat(results.size(), is(1));
@@ -352,7 +349,6 @@ public void testPagedQueryThenFetchWithQueryOffset() throws Exception {
         assertThat(tasks.size(), is(1));
         QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
         PageInfo pageInfo = new PageInfo(1, 1);
-        qtfTask.setKeepAlive(TimeValue.timeValueSeconds(10));
         qtfTask.start(pageInfo);
         List<ListenableFuture<TaskResult>> results = qtfTask.result();
         assertThat(results.size(), is(1));
@@ -681,4 +677,347 @@ public void testPagedNestedLoopWithProjectionsBothSidesPageable() throws Excepti
         expectedException.expectCause(TestingHelpers.cause(NoSuchElementException.class, "backingArray exceeded"));
         pageableResult.fetch(pageInfo.nextPage()).get();
     }
+
+    @Test
+    public void testQueryThenFetchPageWithGap() throws Exception {
+        setup.setUpCharacters();
+        sqlExecutor.exec("insert into characters (id, name, female) values (?, ?, ?)", new Object[][]{
+                new Object[]{
+                        5, "Matthias Wahl", false,
+                },
+                new Object[]{
+                        6, "Philipp Bogensberger", false,
+
+                },
+                new Object[]{
+                        7, "Sebastian Utz", false
+                }
+        });
+        sqlExecutor.refresh("characters");
+
+        DocTableInfo characters = docSchemaInfo.getTableInfo("characters");
+
+        QueryThenFetchNode qtfNode = new QueryThenFetchNode(
+                characters.getRouting(WhereClause.MATCH_ALL),
+                Arrays.asList(idRef, nameRef, femaleRef),
+                Arrays.asList(idRef),
+                new boolean[]{false},
+                new Boolean[]{null},
+                null,
+                null,
+                WhereClause.MATCH_ALL,
+                null
+        );
+
+        List<Task> tasks = executor.newTasks(qtfNode, UUID.randomUUID());
+        assertThat(tasks.size(), is(1));
+        QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
+        PageInfo pageInfo = PageInfo.firstPage(2);
+
+        qtfTask.start(pageInfo);
+        List<ListenableFuture<TaskResult>> results = qtfTask.result();
+        assertThat(results.size(), is(1));
+        ListenableFuture<TaskResult> resultFuture = results.get(0);
+
+        TaskResult taskResult = resultFuture.get();
+        assertThat(taskResult, instanceOf(PageableTaskResult.class));
+        PageableTaskResult pageableTaskResult = (PageableTaskResult)taskResult;
+        closeMeWhenDone = pageableTaskResult;
+        assertThat(TestingHelpers.printedPage(pageableTaskResult.page()),
+                is("1| Arthur| false\n" +
+                   "2| Ford| false\n")
+        );
+
+        pageInfo = new PageInfo(4, 2);
+        PageableTaskResult gappedResult = pageableTaskResult.fetch(pageInfo).get();
+        closeMeWhenDone = gappedResult;
+        assertThat(TestingHelpers.printedPage(gappedResult.page()),
+                is("5| Matthias Wahl| false\n" +
+                   "6| Philipp Bogensberger| false\n")
+        );
+
+    }
+
+    @Test
+    public void testQueryThenFetchPageWithBigGap() throws Exception {
+        sqlExecutor.exec("create table ids (id long primary key) with (number_of_replicas=0)");
+        sqlExecutor.ensureGreen();
+        Object[][] bulkArgs = new Object[2048][];
+        for (int l = 0; l < 2048; l++) {
+            bulkArgs[l] = new Object[] { l };
+        }
+        sqlExecutor.exec("insert into ids values (?)", bulkArgs);
+        sqlExecutor.refresh("ids");
+        assertThat(
+                (Long) sqlExecutor.exec("select count(*) from ids").rows()[0][0],
+                is(2048L)
+        );
+
+        DocTableInfo ids = docSchemaInfo.getTableInfo("ids");
+
+        QueryThenFetchNode qtfNode = new QueryThenFetchNode(
+                ids.getRouting(WhereClause.MATCH_ALL),
+                Arrays.asList(ref(ids, "id")),
+                Arrays.asList(ref(ids, "id")),
+                new boolean[]{false},
+                new Boolean[]{null},
+                null,
+                null,
+                WhereClause.MATCH_ALL,
+                null
+        );
+
+        List<Task> tasks = executor.newTasks(qtfNode, UUID.randomUUID());
+        assertThat(tasks.size(), is(1));
+        QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
+        PageInfo pageInfo = PageInfo.firstPage(2);
+
+        qtfTask.start(pageInfo);
+        List<ListenableFuture<TaskResult>> results = qtfTask.result();
+        assertThat(results.size(), is(1));
+        ListenableFuture<TaskResult> resultFuture = results.get(0);
+
+        TaskResult taskResult = resultFuture.get();
+        assertThat(taskResult, instanceOf(PageableTaskResult.class));
+        PageableTaskResult pageableTaskResult = (PageableTaskResult)taskResult;
+        closeMeWhenDone = pageableTaskResult;
+        assertThat(TestingHelpers.printedPage(pageableTaskResult.page()),
+                is("0\n" +
+                   "1\n")
+        );
+
+        pageInfo = new PageInfo(1048, 4);
+        PageableTaskResult gappedResult = pageableTaskResult.fetch(pageInfo).get();
+        closeMeWhenDone = gappedResult;
+        assertThat(TestingHelpers.printedPage(gappedResult.page()),
+                is("1048\n" +
+                   "1049\n" +
+                   "1050\n" +
+                   "1051\n")
+        );
+
+        pageInfo = pageInfo.nextPage(10);
+        PageableTaskResult afterGappedResult = gappedResult.fetch(pageInfo).get();
+        closeMeWhenDone = afterGappedResult;
+        assertThat(TestingHelpers.printedPage(afterGappedResult.page()),
+                is("1052\n" +
+                   "1053\n" +
+                   "1054\n" +
+                   "1055\n" +
+                   "1056\n" +
+                   "1057\n" +
+                   "1058\n" +
+                   "1059\n" +
+                   "1060\n" +
+                   "1061\n")
+        );
+
+    }
+
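These gap tests and the random/repeat/backwards tests that follow all drive the same pageable API. For orientation only, here is a condensed sketch of that flow, not part of the patch: `executor` and `qtfNode` stand in for the fixtures of this test class, and `close()` is assumed to release the per-shard search contexts, mirroring the `closeMeWhenDone` teardown used here.

    // Sketch under the assumptions above: random access paging over a
    // query-then-fetch task. Each fetch() issues a fresh scroll-query
    // round trip to the shards (see executePageQuery above).
    QueryThenFetchTask task =
            (QueryThenFetchTask) executor.newTasks(qtfNode, UUID.randomUUID()).get(0);

    PageInfo pageInfo = PageInfo.firstPage(2);          // rows [0, 2)
    task.start(pageInfo);
    PageableTaskResult result =
            (PageableTaskResult) task.result().get(0).get();

    PageableTaskResult jumped = result.fetch(new PageInfo(1048, 4)).get(); // skip ahead
    PageableTaskResult back = jumped.fetch(PageInfo.firstPage(1)).get();   // page backwards

    back.close(); // assumed: frees the search contexts held open for paging
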
+    @TestLogging("io.crate.executor.transport.task.elasticsearch:TRACE")
+    @Test
+    public void testRandomQTFPaging() throws Exception {
+        SQLResponse response = sqlExecutor.exec("create table ids (id long primary key) with (number_of_replicas=0)");
+        assertThat(response.rowCount(), is(1L));
+        sqlExecutor.ensureGreen();
+
+        int numRows = randomIntBetween(1, 4096);
+        Object[][] bulkArgs = new Object[numRows][];
+        for (int l = 0; l < numRows; l++) {
+            bulkArgs[l] = new Object[]{l};
+        }
+        sqlExecutor.exec("insert into ids values (?)", bulkArgs);
+        sqlExecutor.refresh("ids");
+        assertThat(
+                (Long) sqlExecutor.exec("select count(*) from ids").rows()[0][0],
+                is((long) numRows)
+        );
+
+        DocTableInfo ids = docSchemaInfo.getTableInfo("ids");
+
+        QueryThenFetchNode qtfNode = new QueryThenFetchNode(
+                ids.getRouting(WhereClause.MATCH_ALL),
+                Arrays.asList(ref(ids, "id")),
+                Arrays.asList(ref(ids, "id")),
+                new boolean[]{false},
+                new Boolean[]{null},
+                null,
+                null,
+                WhereClause.MATCH_ALL,
+                null
+        );
+
+        List<Task> tasks = executor.newTasks(qtfNode, UUID.randomUUID());
+        assertThat(tasks.size(), is(1));
+        QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
+        PageInfo pageInfo = new PageInfo(
+                randomIntBetween(0, 100),
+                randomIntBetween(1, 100));
+
+        qtfTask.start(pageInfo);
+        List<ListenableFuture<TaskResult>> results = qtfTask.result();
+        assertThat(results.size(), is(1));
+        ListenableFuture<TaskResult> resultFuture = results.get(0);
+
+        TaskResult taskResult = resultFuture.get();
+        assertThat(taskResult, instanceOf(PageableTaskResult.class));
+        PageableTaskResult pageableTaskResult = (PageableTaskResult)taskResult;
+        closeMeWhenDone = pageableTaskResult;
+
+        int run = 0;
+        while (run < 10) {
+            Page page = pageableTaskResult.page();
+            Object[][] actual = Iterables.toArray(page, Object[].class);
+            Object[][] expected = Arrays.copyOfRange(bulkArgs, Math.min(numRows, pageInfo.position()), Math.min(numRows, pageInfo.position() + pageInfo.size()));
+
+            for (int i = 0 ; i
asList(idRef, nameRef, femaleRef),
+                Arrays.asList(idRef),
+                new boolean[]{false},
+                new Boolean[]{null},
+                null,
+                null,
+                WhereClause.MATCH_ALL,
+                null
+        );
+
+        List<Task> tasks = executor.newTasks(qtfNode, UUID.randomUUID());
+        assertThat(tasks.size(), is(1));
+        QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
+        PageInfo pageInfo = PageInfo.firstPage(2);
+
+        qtfTask.start(pageInfo);
+        List<ListenableFuture<TaskResult>> results = qtfTask.result();
+        assertThat(results.size(), is(1));
+        ListenableFuture<TaskResult> resultFuture = results.get(0);
+
+        TaskResult taskResult = resultFuture.get();
+        assertThat(taskResult, instanceOf(PageableTaskResult.class));
+        PageableTaskResult pageableTaskResult = (PageableTaskResult)taskResult;
+        closeMeWhenDone = pageableTaskResult;
+        assertThat(TestingHelpers.printedPage(pageableTaskResult.page()),
+                is("1| Arthur| false" + System.lineSeparator() +
+                   "2| Ford| false" + System.lineSeparator())
+        );
+
+        PageableTaskResult samePageResult = pageableTaskResult.fetch(pageInfo).get();
+        closeMeWhenDone = samePageResult;
+
+        assertThat(TestingHelpers.printedPage(samePageResult.page()),
+                is("1| Arthur| false" + System.lineSeparator() +
+                   "2| Ford| false" + System.lineSeparator())
+        );
+
+        pageInfo = new PageInfo(pageInfo.position(), pageInfo.size() + 2);
+        PageableTaskResult biggerPageResult = pageableTaskResult.fetch(pageInfo).get();
+        assertThat(TestingHelpers.printedPage(biggerPageResult.page()),
+                is("1| Arthur| false" + System.lineSeparator() +
+                   "2| Ford| false" + System.lineSeparator() +
+                   "3| Trillian| true" + System.lineSeparator() +
+                   "4| Arthur| true" + System.lineSeparator())
+        );
+
+    }
+
+    @Test
+    public void testQueryThenFetchPageBackwards() throws Exception {
+        setup.setUpCharacters();
+        sqlExecutor.exec("insert into characters (id, name, female) values (?, ?, ?)", new Object[][]{
+                new Object[]{
+                        5, "Matthias Wahl", false,
+                },
+                new Object[]{
+                        6, "Philipp Bogensberger", false,
+
+                },
+                new Object[]{
+                        7, "Sebastian Utz", false
+                }
+        });
+        sqlExecutor.refresh("characters");
+
+        DocTableInfo characters = docSchemaInfo.getTableInfo("characters");
+
+        QueryThenFetchNode qtfNode = new QueryThenFetchNode(
+                characters.getRouting(WhereClause.MATCH_ALL),
+                Arrays.asList(idRef, nameRef, femaleRef),
+                Arrays.asList(idRef),
+                new boolean[]{false},
+                new Boolean[]{null},
+                null,
+                null,
+                WhereClause.MATCH_ALL,
+                null
+        );
+
+        List<Task> tasks = executor.newTasks(qtfNode, UUID.randomUUID());
+        assertThat(tasks.size(), is(1));
+        QueryThenFetchTask qtfTask = (QueryThenFetchTask)tasks.get(0);
+        PageInfo pageInfo = PageInfo.firstPage(2);
+
+        qtfTask.start(pageInfo);
+        List<ListenableFuture<TaskResult>> results = qtfTask.result();
+        assertThat(results.size(), is(1));
+        ListenableFuture<TaskResult> resultFuture = results.get(0);
+
+        TaskResult taskResult = resultFuture.get();
+        assertThat(taskResult, instanceOf(PageableTaskResult.class));
+        PageableTaskResult pageableTaskResult = (PageableTaskResult)taskResult;
+        closeMeWhenDone = pageableTaskResult;
+        assertThat(TestingHelpers.printedPage(pageableTaskResult.page()),
+                is("1| Arthur| false" + System.lineSeparator() +
+                   "2| Ford| false" + System.lineSeparator())
+        );
+
+        pageInfo = pageInfo.nextPage(4);
+        PageableTaskResult nextResult = pageableTaskResult.fetch(pageInfo).get();
+        closeMeWhenDone = nextResult;
+        assertThat(TestingHelpers.printedPage(nextResult.page()), is(
+                "3| Trillian| true" + System.lineSeparator() +
+                "4| Arthur| true" + System.lineSeparator() +
+                "5| Matthias Wahl| false" + System.lineSeparator() +
+                "6| Philipp Bogensberger| false" + System.lineSeparator()));
+
+
+        pageInfo = PageInfo.firstPage(1);
+        PageableTaskResult backwardResult = nextResult.fetch(pageInfo).get();
+        closeMeWhenDone = backwardResult;
+        assertThat(TestingHelpers.printedPage(backwardResult.page()), is("1| Arthur| false" + System.lineSeparator()));
+    }
 }
diff --git a/sql/src/test/java/io/crate/executor/transport/TransportExecutorTest.java b/sql/src/test/java/io/crate/executor/transport/TransportExecutorTest.java
index 6edf7850ee98..e11216cb9a93 100644
--- a/sql/src/test/java/io/crate/executor/transport/TransportExecutorTest.java
+++ b/sql/src/test/java/io/crate/executor/transport/TransportExecutorTest.java
@@ -70,6 +70,7 @@
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -81,6 +82,7 @@
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.number.OrderingComparison.greaterThan;
 
+@TestLogging("io.crate.executor.transport.task.elasticsearch:TRACE,io.crate.operation.qtf:TRACE")
 public class TransportExecutorTest extends BaseTransportExecutorTest {
 
     static {
diff --git a/sql/src/test/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTaskTest.java b/sql/src/test/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTaskTest.java
index 322c6bcc2b1e..ecc991dd3da6 100644
--- a/sql/src/test/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTaskTest.java
+++ b/sql/src/test/java/io/crate/executor/transport/task/elasticsearch/QueryThenFetchTaskTest.java
@@ -33,6 +33,7 @@
 import io.crate.metadata.Functions;
 import io.crate.metadata.Routing;
 import io.crate.operation.aggregation.impl.CountAggregation;
+import io.crate.operation.qtf.QueryThenFetchOperation;
 import io.crate.planner.node.dql.QueryThenFetchNode;
 import io.crate.planner.symbol.Aggregation;
 import io.crate.planner.symbol.Symbol;
@@ -77,7 +78,7 @@ public class QueryThenFetchTaskTest {
     private CrateResultSorter crateResultSorter;
     private SearchPhaseController searchPhaseController = mock(SearchPhaseController.class);
     private DiscoveryNodes nodes = mock(DiscoveryNodes.class);
-
+    private QueryThenFetchOperation queryThenFetchOperation;
     @Rule
     public ExpectedException expectedException = ExpectedException.none();
@@ -104,17 +105,24 @@ public void prepare() {
         transportQueryShardAction = mock(TransportQueryShardAction.class);
         searchServiceTransportAction = mock(SearchServiceTransportAction.class);
         crateResultSorter = mock(CrateResultSorter.class);
-        queryThenFetchTask = new QueryThenFetchTask(
-                UUID.randomUUID(),
-                mock(Functions.class),
-                searchNode,
+
+        ThreadPool testPool = new ThreadPool(getClass().getName());
+        BigArrays mockedBigarrays = mock(BigArrays.class);
+        queryThenFetchOperation = new QueryThenFetchOperation(
                 clusterService,
                 transportQueryShardAction,
                 searchServiceTransportAction,
                 searchPhaseController,
-                new ThreadPool("testpool"),
-                mock(BigArrays.class),
-                crateResultSorter);
+                testPool,
+                mockedBigarrays,
+                crateResultSorter
+        );
+        queryThenFetchTask = new QueryThenFetchTask(
+                UUID.randomUUID(),
+                queryThenFetchOperation,
+                mock(Functions.class),
+                searchNode
+        );
     }
 
     @Test
@@ -123,6 +131,7 @@ public void testAggregationInOutputs() throws Exception {
         expectedException.expectMessage("Operation not supported with symbol count()");
         new QueryThenFetchTask(
                 UUID.randomUUID(),
+                queryThenFetchOperation,
                 mock(Functions.class),
                 new QueryThenFetchNode(
                         new Routing(),
@@ -135,19 +144,14 @@ public void testAggregationInOutputs() throws Exception {
                         null,
                         WhereClause.MATCH_ALL,
                         null
-                ),
-                clusterService,
-                mock(TransportQueryShardAction.class),
-                mock(SearchServiceTransportAction.class),
-                mock(SearchPhaseController.class),
-                mock(ThreadPool.class),
-                mock(BigArrays.class),
-                crateResultSorter);
+                )
+        );
     }
 
     @Test
     public void testFinishWithErrors() throws Throwable{
-        ArgumentCaptor<QueryThenFetchTask.QueryShardResponseListener> responseListener = ArgumentCaptor.forClass(QueryThenFetchTask.QueryShardResponseListener.class);
+        ArgumentCaptor<QueryThenFetchOperation.QueryShardResponseListener> responseListener = ArgumentCaptor.forClass(
+                QueryThenFetchOperation.QueryShardResponseListener.class);
         doAnswer(new Answer<Void>() {
             @Override
             public Void answer(InvocationOnMock invocation) throws Throwable {
@@ -192,7 +196,8 @@ public void onFailure(Throwable t) {
 
     @Test
     public void testErrorInQueryPhase() throws Throwable {
-        ArgumentCaptor<QueryThenFetchTask.QueryShardResponseListener> responseListener = ArgumentCaptor.forClass(QueryThenFetchTask.QueryShardResponseListener.class);
+        ArgumentCaptor<QueryThenFetchOperation.QueryShardResponseListener> responseListener = ArgumentCaptor.forClass(
+                QueryThenFetchOperation.QueryShardResponseListener.class);
         doAnswer(new Answer<Void>() {
             @Override
             public Void answer(InvocationOnMock invocation) throws Throwable {