
Commit

remove RowDownstream from Projector and merge Projector/RowPipe
mfussenegger committed Sep 9, 2015
1 parent 0aefc37 commit ac12925
Showing 96 changed files with 1,270 additions and 1,767 deletions.
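The thrust of the commit, visible across the diffs below: the fan-in RowDownstream/RowDownstreamHandle pair (many upstreams register themselves and push rows through a handle) is replaced by a single-upstream RowReceiver. A minimal before/after sketch inferred from this excerpt alone; the stub types and the setNextRow/finish methods are assumptions, not taken from the diff:

```java
// Stubs so the sketch is self-contained; in Crate these are real types.
interface Row {}
interface ExecutionState {}
interface RowUpstream { void pause(); void resume(boolean async); }

// Before: fan-in. Every upstream registers itself and gets a handle back,
// so downstreams must ref-count active upstreams to know when input ends.
interface RowDownstreamHandle {
    boolean setNextRow(Row row);  // assumed, not shown in this excerpt
    void finish();
    void fail(Throwable t);
}
interface RowDownstream {
    RowDownstreamHandle registerUpstream(RowUpstream upstream);
}

// After: one upstream per receiver, injected once via setUpstream; the handle
// methods move onto the receiver and per-upstream bookkeeping disappears.
interface RowReceiver {
    boolean setNextRow(Row row);  // assumed, not shown in this excerpt
    void finish();
    void fail(Throwable t);
    void prepare(ExecutionState executionState);
    void setUpstream(RowUpstream upstream);
}
```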
29 changes: 16 additions & 13 deletions sql/src/main/java/io/crate/action/job/ContextPreparer.java
@@ -25,6 +25,7 @@
 import com.google.common.base.Optional;
 import io.crate.breaker.CrateCircuitBreakerService;
 import io.crate.breaker.RamAccountingContext;
+import io.crate.executor.RowCountResult;
 import io.crate.jobs.CountContext;
 import io.crate.jobs.ExecutionSubContext;
 import io.crate.jobs.JobExecutionContext;
@@ -36,6 +37,7 @@
 import io.crate.operation.count.CountOperation;
 import io.crate.operation.projectors.FlatProjectorChain;
 import io.crate.operation.projectors.RowDownstreamFactory;
+import io.crate.operation.projectors.RowReceiver;
 import io.crate.planner.distribution.UpstreamPhase;
 import io.crate.planner.node.ExecutionPhase;
 import io.crate.planner.node.ExecutionPhaseVisitor;
@@ -89,35 +91,36 @@ public ContextPreparer(MapSideDataCollectOperation collectOperation,
     public void prepare(UUID jobId,
                         NodeOperation nodeOperation,
                         JobExecutionContext.Builder contextBuilder,
-                        @Nullable RowDownstream rowDownstream) {
-        PreparerContext preparerContext = new PreparerContext(jobId, nodeOperation, rowDownstream);
+                        @Nullable RowReceiver rowReceiver) {
+        PreparerContext preparerContext = new PreparerContext(jobId, nodeOperation, rowReceiver);
         ExecutionSubContext subContext = innerPreparer.process(nodeOperation.executionPhase(), preparerContext);
         contextBuilder.addSubContext(nodeOperation.executionPhase().executionPhaseId(), subContext);
     }

     @SuppressWarnings("unchecked")
-    public <T extends ExecutionSubContext> T prepare(UUID jobId, ExecutionPhase executionPhase, RowDownstream rowDownstream) {
-        PreparerContext preparerContext = new PreparerContext(jobId, null, rowDownstream);
+    public <T extends ExecutionSubContext> T prepare(UUID jobId, ExecutionPhase executionPhase, RowReceiver rowReceiver) {
+        PreparerContext preparerContext = new PreparerContext(jobId, null, rowReceiver);
         return (T) innerPreparer.process(executionPhase, preparerContext);
     }

     private static class PreparerContext {

         private final UUID jobId;
+        @Nullable
+        private final RowReceiver rowReceiver;
         private final NodeOperation nodeOperation;
-        private final RowDownstream rowDownstream;

-        private PreparerContext(UUID jobId, NodeOperation nodeOperation, @Nullable RowDownstream rowDownstream) {
+        private PreparerContext(UUID jobId, NodeOperation nodeOperation, @Nullable RowReceiver rowReceiver) {
             this.nodeOperation = nodeOperation;
             this.jobId = jobId;
-            this.rowDownstream = rowDownstream;
+            this.rowReceiver = rowReceiver;
         }
     }

     private class InnerPreparer extends ExecutionPhaseVisitor<PreparerContext, ExecutionSubContext> {

-        RowDownstream getDownstream(PreparerContext context, UpstreamPhase upstreamPhase, int pageSize) {
-            if (context.rowDownstream == null) {
+        RowReceiver getDownstream(PreparerContext context, UpstreamPhase upstreamPhase, int pageSize) {
+            if (context.rowReceiver == null) {
                 assert context.nodeOperation != null : "nodeOperation shouldn't be null if context.rowDownstream hasn't been set";
                 return rowDownstreamFactory.createDownstream(
                         context.nodeOperation,
@@ -126,7 +129,7 @@ RowDownstream getDownstream(PreparerContext context, UpstreamPhase upstreamPhase
                         pageSize);
             }

-            return context.rowDownstream;
+            return context.rowReceiver;
         }

         @Override
@@ -143,7 +146,7 @@ public ExecutionSubContext visitCountPhase(CountPhase phase, PreparerContext con

             return new CountContext(
                     countOperation,
-                    context.rowDownstream,
+                    context.rowReceiver,
                     indexShardMap,
                     phase.whereClause()
             );
@@ -153,7 +156,7 @@ public ExecutionSubContext visitCountPhase(CountPhase phase, PreparerContext con
         public ExecutionSubContext visitMergePhase(final MergePhase phase, final PreparerContext context) {
             RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionPhase(circuitBreaker, phase);

-            RowDownstream downstream = getDownstream(context, phase,
+            RowReceiver downstream = getDownstream(context, phase,
                     Paging.getWeightedPageSize(Paging.PAGE_SIZE, 1.0d / phase.executionNodes().size()));
             Tuple<PageDownstream, FlatProjectorChain> pageDownstreamProjectorChain =
                     pageDownstreamFactory.createMergeNodePageDownstream(
@@ -187,7 +190,7 @@ public ExecutionSubContext visitCollectPhase(final CollectPhase phase, final Pre
             LOGGER.trace("{} setting node page size to: {}, numShards in total: {} shards on node: {}",
                     localNodeId, pageSize, numTotalShards, numShardsOnNode);

-            RowDownstream downstream = getDownstream(context, phase, pageSize);
+            RowReceiver downstream = getDownstream(context, phase, pageSize);
             return new JobCollectContext(
                     context.jobId,
                     phase,
(second changed file — name not captured in this excerpt)

@@ -23,6 +23,7 @@

 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import io.crate.action.job.ContextPreparer;
@@ -35,6 +36,7 @@
 import io.crate.jobs.*;
 import io.crate.metadata.table.TableInfo;
 import io.crate.operation.*;
+import io.crate.operation.projectors.RowReceiver;
 import io.crate.planner.node.ExecutionPhases;
 import io.crate.planner.node.NodeOperationGrouper;
 import org.elasticsearch.action.ActionListener;
@@ -97,38 +99,44 @@ public Iterable<? extends NodeOperation> apply(NodeOperationTree input) {
         Map<String, Collection<NodeOperation>> operationByServer = NodeOperationGrouper.groupByServer(
                 clusterService.state().nodes().localNodeId(), nodeOperations);

-        RowDownstream rowDownstream;
-
+        List<PageDownstreamContext> pageDownstreamContexts = new ArrayList<>(nodeOperationTrees.size());
+
         if (nodeOperationTrees.size() > 1) {
             // bulk Operation with rowCountResult
-            rowDownstream = new RowCountResultRowDownstream(results);
+            for (int i = 0; i < nodeOperationTrees.size(); i++) {
+                RowCountResultRowDownstream rowDownstream = new RowCountResultRowDownstream(results.get(i));
+                setupContext(operationByServer, pageDownstreamContexts, rowDownstream, nodeOperationTrees.get(i));
+            }
         } else {
-            rowDownstream = new QueryResultRowDownstream(results);
+            QueryResultRowDownstream downstream = new QueryResultRowDownstream(Iterables.getOnlyElement(results));
+            setupContext(operationByServer, pageDownstreamContexts, downstream, Iterables.getOnlyElement(nodeOperationTrees));
         }

-        List<PageDownstreamContext> pageDownstreamContexts = new ArrayList<>(nodeOperationTrees.size());
-        for (NodeOperationTree nodeOperationTree : nodeOperationTrees) {
-            DownstreamExecutionSubContext executionSubContext =
-                    contextPreparer.prepare(jobId(), nodeOperationTree.leaf(), rowDownstream);
-            if (operationByServer.isEmpty()) {
-                executionSubContext.close();
-                continue;
-            }
-            if (hasDirectResponse) {
-                executionSubContext.start();
-                pageDownstreamContexts.add(executionSubContext.pageDownstreamContext((byte) 0));
-            } else {
-                createLocalContextAndStartOperation(
-                        executionSubContext,
-                        operationByServer,
-                        nodeOperationTree.leaf().executionPhaseId());
-            }
-        }
         if (operationByServer.isEmpty()) {
             return;
         }
         sendJobRequests(pageDownstreamContexts, operationByServer);
     }

+    private void setupContext(Map<String, Collection<NodeOperation>> operationByServer, List<PageDownstreamContext> pageDownstreamContexts, RowReceiver rowDownstream, NodeOperationTree nodeOperationTree) {
+        DownstreamExecutionSubContext executionSubContext =
+                contextPreparer.prepare(jobId(), nodeOperationTree.leaf(), rowDownstream);
+        if (operationByServer.isEmpty()) {
+            executionSubContext.close();
+            return;
+        }
+        if (hasDirectResponse) {
+            executionSubContext.start();
+            pageDownstreamContexts.add(executionSubContext.pageDownstreamContext((byte) 0));
+        } else {
+            createLocalContextAndStartOperation(
+                    executionSubContext,
+                    operationByServer,
+                    nodeOperationTree.leaf().executionPhaseId());
+        }
+    }
+
     private void sendJobRequests(List<PageDownstreamContext> pageDownstreamContexts,
                                  Map<String, Collection<NodeOperation>> operationsByServer) {
         int idx = 0;
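Because a RowReceiver serves exactly one upstream chain, the bulk path above can no longer share a single RowCountResultRowDownstream across all operation trees; each tree now gets its own receiver bound to one result slot (results.get(i)). A self-contained sketch of that one-receiver-per-operation wiring, using hypothetical stand-ins (CompletableFuture instead of Crate's result futures, RowCountReceiver instead of RowCountResultRowDownstream):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in for RowCountResultRowDownstream: completes exactly one result slot.
class RowCountReceiver {
    private final CompletableFuture<Long> result;
    RowCountReceiver(CompletableFuture<Long> result) { this.result = result; }
    void finish(long rowCount) { result.complete(rowCount); }
}

public class BulkWiringSketch {
    public static void main(String[] args) {
        int numOperations = 3;  // one NodeOperationTree per bulk operation
        List<CompletableFuture<Long>> results = new ArrayList<>();
        List<RowCountReceiver> receivers = new ArrayList<>();
        for (int i = 0; i < numOperations; i++) {
            CompletableFuture<Long> slot = new CompletableFuture<>();
            results.add(slot);
            receivers.add(new RowCountReceiver(slot));  // one receiver per tree
        }
        // each execution finishes independently and completes only its own slot
        receivers.get(1).finish(42L);
        System.out.println("operation 1 row count: " + results.get(1).join());
    }
}
```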
(next file: DistributingDownstream.java — full path not captured in this excerpt)

@@ -25,22 +25,20 @@
 import io.crate.Streamer;
 import io.crate.core.collections.Bucket;
 import io.crate.core.collections.Row;
-import io.crate.operation.RowDownstream;
-import io.crate.operation.RowDownstreamHandle;
+import io.crate.jobs.ExecutionState;
 import io.crate.operation.RowUpstream;
+import io.crate.operation.projectors.RowReceiver;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;

-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;

-public class DistributingDownstream implements RowDownstream, RowDownstreamHandle {
+public class DistributingDownstream implements RowReceiver {

     private final static ESLogger LOGGER = Loggers.getLogger(DistributingDownstream.class);

@@ -65,8 +63,7 @@ public void onFailure(Throwable e) {
     private final TransportDistributedResultAction transportDistributedResultAction;
     private final Streamer<?>[] streamers;
     private final int pageSize;
-    private final List<RowUpstream> upstreams = new ArrayList<>(1);
-    private final AtomicInteger numActiveUpstreams = new AtomicInteger();
+    private RowUpstream upstream;
     private final AtomicReference<Throwable> failure = new AtomicReference<>();
     private final AtomicInteger requestsPending = new AtomicInteger(0);
     private final Downstream[] downstreams;
@@ -75,6 +72,7 @@ public void onFailure(Throwable e) {
     private final Bucket[] buckets;

     private volatile boolean gatherMoreRows = true;
+    private boolean hasUpstreamFinished = false;

     public DistributingDownstream(UUID jobId,
                                   MultiBucketBuilder multiBucketBuilder,
@@ -129,15 +127,11 @@ private void sendRequests(boolean isLast) {
     }

     private void pause() {
-        for (RowUpstream upstream : upstreams) {
-            upstream.pause();
-        }
+        upstream.pause();
     }

     private void resume() {
-        for (RowUpstream upstream : upstreams) {
-            upstream.resume(true);
-        }
+        upstream.resume(true);
     }

     @Override
@@ -156,36 +150,35 @@ public void fail(Throwable throwable) {
         upstreamFinished();
     }

-    @Override
-    public RowDownstreamHandle registerUpstream(RowUpstream upstream) {
-        upstreams.add(upstream);
-        numActiveUpstreams.incrementAndGet();
-        return this;
-    }
-
     private void upstreamFinished() {
-        final int numUpstreams = numActiveUpstreams.decrementAndGet();
-        if (numUpstreams == 0) {
-            final Throwable throwable = failure.get();
-            if (throwable == null) {
-                if (requestsPending.get() == 0) {
-                    LOGGER.trace("all upstreams finished. Sending last requests");
-                    multiBucketBuilder.build(buckets);
-                    sendRequests(true);
-                } else if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("all upstreams finished. Doing nothing since there are pending requests");
-                }
-            } else if (!(throwable instanceof CancellationException)) { // no need to forward kill - downstream will receive it too
-                LOGGER.trace("all upstreams finished; forwarding failure");
-                for (Downstream downstream : downstreams) {
-                    downstream.forwardFailure(throwable);
-                }
-            }
-        } else {
-            LOGGER.trace("Upstream finished, {} remaining", numUpstreams);
-        }
+        hasUpstreamFinished = true;
+        final Throwable throwable = failure.get();
+        if (throwable == null) {
+            if (requestsPending.get() == 0) {
+                LOGGER.trace("all upstreams finished. Sending last requests");
+                multiBucketBuilder.build(buckets);
+                sendRequests(true);
+            } else if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace("all upstreams finished. Doing nothing since there are pending requests");
+            }
+        } else if (!(throwable instanceof CancellationException)) { // no need to forward kill - downstream will receive it too
+            LOGGER.trace("all upstreams finished; forwarding failure");
+            for (Downstream downstream : downstreams) {
+                downstream.forwardFailure(throwable);
+            }
+        }
     }

+    @Override
+    public void prepare(ExecutionState executionState) {
+
+    }
+
+    @Override
+    public void setUpstream(RowUpstream rowUpstream) {
+        upstream = rowUpstream;
+    }
+
     private class Downstream implements ActionListener<DistributedResultResponse> {

         private final String node;
@@ -230,14 +223,13 @@ public void onFailure(Throwable e) {
         private void onResponse(boolean needMore) {
             finished = !needMore;
             final int numPending = requestsPending.decrementAndGet();
-            final int activeUpstreams = numActiveUpstreams.get();

-            LOGGER.trace("Received response from downstream: {}; requires more: {}, other pending requests: {}, activeUpstreams: {}",
-                    node, needMore, numPending, activeUpstreams);
+            LOGGER.trace("Received response from downstream: {}; requires more: {}, other pending requests: {}, finished: {}",
+                    node, needMore, numPending, hasUpstreamFinished);
             if (numPending > 0) {
                 return;
             }
-            if (activeUpstreams == 0) {
+            if (hasUpstreamFinished) {
                 // upstreams (e.g. collector(s)) finished (after the requests have been sent)
                 // send request with isLast=true with remaining buckets to downstream nodes
                 multiBucketBuilder.build(buckets);
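With exactly one upstream, the pause/resume fan-out and the numActiveUpstreams countdown in DistributingDownstream collapse into a plain field and a boolean flag. A runnable sketch of the simplified pattern; DemoUpstream and the printouts are hypothetical, and only the direct pause()/resume(true) calls and the finished flag mirror the diff:

```java
// Hypothetical upstream; the real RowUpstream offers pause()/resume(boolean).
class DemoUpstream {
    void pause() { System.out.println("upstream paused"); }
    void resume(boolean async) { System.out.println("upstream resumed, async=" + async); }
}

public class SingleUpstreamSketch {
    private DemoUpstream upstream;             // was: List<RowUpstream> + AtomicInteger
    private boolean hasUpstreamFinished = false;

    void setUpstream(DemoUpstream u) { upstream = u; }

    void pause()  { upstream.pause(); }        // direct call, no loop over upstreams
    void resume() { upstream.resume(true); }

    void upstreamFinished() { hasUpstreamFinished = true; }  // a flag, not a countdown

    public static void main(String[] args) {
        SingleUpstreamSketch receiver = new SingleUpstreamSketch();
        receiver.setUpstream(new DemoUpstream());
        receiver.pause();
        receiver.resume();
        receiver.upstreamFinished();
        System.out.println("finished=" + receiver.hasUpstreamFinished);
    }
}
```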

(one of the changed files was deleted — its name and contents are not shown here)

(diffs for the remaining changed files were not loaded)
