Skip to content

Commit

Permalink
fixup! use ExecutionNodesTask for QAF execution
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Apr 21, 2015
1 parent 21c7175 commit a0b33f2
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
import io.crate.executor.JobTask;
import io.crate.executor.TaskResult;
import io.crate.executor.callbacks.OperationFinishedStatsTablesCallback;
import io.crate.executor.transport.distributed.SingleBucketBuilder;
import io.crate.jobs.JobContextService;
import io.crate.jobs.PageDownstreamContext;
import io.crate.metadata.table.TableInfo;
import io.crate.operation.*;
import io.crate.operation.collect.HandlerSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.operation.projectors.CollectingProjector;
import io.crate.operation.projectors.FlatProjectorChain;
import io.crate.operation.projectors.ProjectionToProjectorVisitor;
import io.crate.planner.node.ExecutionNode;
Expand Down Expand Up @@ -152,7 +152,7 @@ public void start() {
Collection<ExecutionNode> executionNodes = entry.getValue();

if (serverNodeId.equals(TableInfo.NULL_NODE_ID)) {
handlerSideCollect(executionNodes, pageDownstreamContext, streamers);
handlerSideCollect(executionNodes, pageDownstreamContext);
} else {
JobRequest request = new JobRequest(jobId(), executionNodes);
if (hasDirectResponse) {
Expand All @@ -166,17 +166,15 @@ public void start() {
}

private void handlerSideCollect(Collection<ExecutionNode> executionNodes,
final PageDownstreamContext pageDownstreamContext,
Streamer<?>[] streamers) {
final PageDownstreamContext pageDownstreamContext) {
assert executionNodes.size() == 1 : "handlerSideCollect is only possible with 1 collectNode";
ExecutionNode onlyElement = Iterables.getOnlyElement(executionNodes);
assert onlyElement instanceof CollectNode : "handlerSideCollect is only possible with 1 collectNode";

CollectNode collectNode = ((CollectNode) onlyElement);

RamAccountingContext ramAccountingContext = trackOperation(collectNode, "handlerSide collect");
SingleBucketBuilder singleBucketBuilder = new SingleBucketBuilder(streamers);
Futures.addCallback(singleBucketBuilder.result(), new FutureCallback<Bucket>() {
CollectingProjector collectingProjector = new CollectingProjector();
Futures.addCallback(collectingProjector.result(), new FutureCallback<Bucket>() {
@Override
public void onSuccess(Bucket result) {
// TODO: change bucketIdx once the logic for bucketIdxs has been changed
Expand All @@ -199,13 +197,13 @@ public void onFailure(@Nonnull Throwable t) {
}
});

RowDownstream rowDownstream = singleBucketBuilder;
RowDownstream rowDownstream = collectingProjector;
if (!collectNode.projections().isEmpty()) {
FlatProjectorChain flatProjectorChain = FlatProjectorChain.withAttachedDownstream(
projectionToProjectorVisitor,
ramAccountingContext,
collectNode.projections(),
singleBucketBuilder);
collectingProjector);
flatProjectorChain.startProjections();
rowDownstream = flatProjectorChain.firstProjector();
}
Expand Down

0 comments on commit a0b33f2

Please sign in to comment.