Skip to content

Commit

Permalink
change FetchProjector to implement RowPipe
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Sep 8, 2015
1 parent 55e2f3d commit 0aefc37
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
import io.crate.operation.RowDownstream;
import io.crate.operation.RowDownstreamHandle;
import io.crate.operation.RowUpstream;

Expand All @@ -38,7 +37,7 @@
*/
public class PositionalBucketMerger implements RowUpstream {

private RowDownstreamHandle downstream;
private final RowDownstreamHandle downstream;
private final AtomicInteger upstreamsRemaining = new AtomicInteger(0);
private final int orderingColumnIndex;
private final UpstreamBucket[] remainingBuckets;
Expand All @@ -47,10 +46,10 @@ public class PositionalBucketMerger implements RowUpstream {
private volatile int leastBucketId = -1;
private final AtomicBoolean consumeBuckets = new AtomicBoolean(true);

public PositionalBucketMerger(RowDownstream downstream,
public PositionalBucketMerger(RowDownstreamHandle rowDownstreamHandle,
int numUpstreams,
int orderingColumnIndex) {
downstream(downstream);
this.downstream = rowDownstreamHandle;
this.orderingColumnIndex = orderingColumnIndex;
remainingBuckets = new UpstreamBucket[numUpstreams];
}
Expand Down Expand Up @@ -147,10 +146,6 @@ private boolean emitRow(Row row) {
}


public void downstream(RowDownstream downstream) {
this.downstream = downstream.registerUpstream(this);
}

public PositionalBucketMerger registerUpstream(RowUpstream upstream) {
upstreamsRemaining.incrementAndGet();
return this;
Expand Down
64 changes: 23 additions & 41 deletions sql/src/main/java/io/crate/operation/projectors/FetchProjector.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class FetchProjector extends RowDownstreamAndHandle implements Projector {

public static final int NO_BULK_REQUESTS = -1;

public class FetchProjector extends AbstractRowPipe {

private PositionalBucketMerger downstream;
private final TransportFetchNodeAction transportFetchNodeAction;
Expand All @@ -75,7 +72,6 @@ public class FetchProjector extends RowDownstreamAndHandle implements Projector
private final Object rowDelegateLock = new Object();
private final Row outputRow;
private final Map<String, NodeBucket> nodeBuckets = new HashMap<>();
private final AtomicInteger remainingUpstreams = new AtomicInteger(0);
private final AtomicBoolean consumingRows = new AtomicBoolean(true);
private final List<String> executionNodes;
private final int numNodes;
Expand Down Expand Up @@ -150,18 +146,14 @@ public FetchProjector(TransportFetchNodeAction transportFetchNodeAction,
}

@Override
public void startProjection(ExecutionState executionState) {
public void prepare(ExecutionState executionState) {
this.executionState = executionState;
if (remainingUpstreams.get() <= 0) {
finish();
} else {
// register once to increment the downstream's upstream counter
downstream.registerUpstream(this);
}
// register once to increment the downstream's upstream counter
downstream.registerUpstream(this);
}

@Override
public synchronized boolean setNextRow(Row row) {
public boolean setNextRow(Row row) {
if (!consumingRows.get()) {
return false;
}
Expand All @@ -186,36 +178,28 @@ public synchronized boolean setNextRow(Row row) {
}

@Override
public void downstream(RowDownstream downstream) {
public void downstream(RowDownstreamHandle downstream) {
this.downstream = new PositionalBucketMerger(downstream, numNodes, outputRow.size());
}

@Override
public RowDownstreamHandle registerUpstream(RowUpstream upstream) {
remainingUpstreams.incrementAndGet();
return super.registerUpstream(upstream);
}

@Override
public void finish() {
if (remainingUpstreams.decrementAndGet() == 0) {
// flush all remaining buckets
Iterator<NodeBucket> it = nodeBuckets.values().iterator();
remainingRequests.set(nodeBuckets.size());
while (it.hasNext()) {
flushNodeBucket(it.next());
it.remove();
}
// flush all remaining buckets
Iterator<NodeBucket> it = nodeBuckets.values().iterator();
remainingRequests.set(nodeBuckets.size());
while (it.hasNext()) {
flushNodeBucket(it.next());
it.remove();
}

// no rows consumed (so no fetch requests made), but collect contexts are open, close them.
if (!consumedRows) {
closeContextsAndFinish();
} else {
finishDownstream();
// projector registered itself as an upstream to prevent the downstream from
// flushing rows before all requests have finished.
// release it now, as no new rows are consumed anymore (the downstream will flush all remaining rows)
}
// no rows consumed (so no fetch requests made), but collect contexts are open, close them.
if (!consumedRows) {
closeContextsAndFinish();
} else {
finishDownstream();
// projector registered itself as an upstream to prevent the downstream from
// flushing rows before all requests have finished.
// release it now, as no new rows are consumed anymore (the downstream will flush all remaining rows)
}
}

Expand All @@ -236,9 +220,7 @@ private void finishDownstream() {
@Override
public void fail(Throwable throwable) {
failures.add(throwable);
if (remainingUpstreams.decrementAndGet() == 0) {
closeContextsAndFinish();
}
closeContextsAndFinish();
}

@Nullable
Expand Down Expand Up @@ -301,7 +283,7 @@ public void onResponse(NodeFetchResponse response) {
if (!downstream.setNextBucket(rows, nodeBucket.nodeIdx)) {
consumingRows.set(false);
}
if (remainingRequests.decrementAndGet() <= 0 && remainingUpstreams.get() <= 0) {
if (remainingRequests.decrementAndGet() <= 0) {
closeContextsAndFinish();
} else {
downstream.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Projector visitFetchProjection(FetchProjection projection, Context contex
symbolVisitor.process(projection.docIdSymbol(), ctxDocId);
assert ctxDocId.collectExpressions().size() == 1;

return new FetchProjector(
return new ForwardingProjector(new FetchProjector(
transportActionProvider.transportFetchNodeAction(),
transportActionProvider.transportCloseContextNodeAction(),
symbolVisitor.functions(),
Expand All @@ -355,7 +355,7 @@ public Projector visitFetchProjection(FetchProjection projection, Context contex
projection.partitionedBy(),
projection.jobSearchContextIdToNode(),
projection.jobSearchContextIdToShard(),
projection.executionNodes());
projection.executionNodes()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
import io.crate.core.collections.RowN;
import io.crate.operation.projectors.RowReceiver;
import io.crate.test.integration.CrateUnitTest;
import io.crate.testing.CollectingProjector;
import io.crate.testing.CollectingRowReceiver;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -57,8 +59,8 @@ public void testConcurrentSetNextBucket() throws Exception {
*/
int numUpstreams = 2;

CollectingProjector resultProvider = new CollectingProjector();
final PositionalBucketMerger bucketMerger = new PositionalBucketMerger(resultProvider, numUpstreams, 1);
CollectingRowReceiver rowReceiver = new CollectingRowReceiver();
final PositionalBucketMerger bucketMerger = new PositionalBucketMerger(rowReceiver, numUpstreams, 1);

final List<List<List<Object[]>>> bucketsPerUpstream = new ArrayList<>(numUpstreams);
List<List<Object[]>> upstream1 = new ArrayList<>(2);
Expand Down Expand Up @@ -113,20 +115,7 @@ public void run() {

assertThat(setNextRowExceptions, empty());

final SettableFuture<Bucket> results = SettableFuture.create();
Futures.addCallback(resultProvider.result(), new FutureCallback<Bucket>() {
@Override
public void onSuccess(Bucket result) {
results.set(result);
}

@Override
public void onFailure(Throwable t) {
results.setException(t);
}
});

Bucket result = results.get();
Bucket result = rowReceiver.result();
assertThat(result.size(), is(10));
Iterator<Row> it = result.iterator();
for (int i = 0; i < 10; i++) {
Expand All @@ -140,8 +129,8 @@ public void onFailure(Throwable t) {
public void testOneUpstreamWillFail() throws Exception {
final int numUpstreams = 2;

CollectingProjector resultProvider = new CollectingProjector();
final PositionalBucketMerger bucketMerger = new PositionalBucketMerger(resultProvider, numUpstreams, 1);
CollectingRowReceiver rowReceiver = new CollectingRowReceiver();
final PositionalBucketMerger bucketMerger = new PositionalBucketMerger(rowReceiver, numUpstreams, 1);

final List<List<List<Object[]>>> bucketsPerUpstream = new ArrayList<>(numUpstreams);
List<List<Object[]>> upstream1 = new ArrayList<>(2);
Expand Down Expand Up @@ -200,23 +189,9 @@ public void run() {

assertThat(setNextRowExceptions, empty());

final SettableFuture<Bucket> results = SettableFuture.create();
Futures.addCallback(resultProvider.result(), new FutureCallback<Bucket>() {
@Override
public void onSuccess(Bucket result) {
results.set(result);
}

@Override
public void onFailure(Throwable t) {
results.setException(t);
}
});

expectedException.expect(Throwable.class);
expectedException.expectMessage(String.format("[%d] I'm failing", numUpstreams-1));
results.get();

expectedException.expectMessage(String.format("[%d] I'm failing", numUpstreams - 1));
rowReceiver.result();
executorService.awaitTermination(1, TimeUnit.SECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.crate.planner.symbol.Symbol;
import io.crate.test.integration.CrateUnitTest;
import io.crate.testing.CollectingProjector;
import io.crate.testing.CollectingRowReceiver;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.shard.ShardId;
import org.junit.Test;
Expand All @@ -56,7 +57,7 @@ public void testCloseContextsAndFinishOnFail() throws Exception {
TransportCloseContextNodeAction transportCloseContextNodeAction = mock(TransportCloseContextNodeAction.class);
doThrow(new IllegalArgumentException("Node \"missing-node\" not found in cluster state!"))
.when(transportCloseContextNodeAction).execute(anyString(), any(NodeCloseContextRequest.class), any(ActionListener.class));
CollectingProjector downstream = new CollectingProjector();
RowReceiver rowReceiver = new CollectingRowReceiver();

FetchProjector projector = new FetchProjector(
mock(TransportFetchNodeAction.class),
Expand All @@ -71,8 +72,7 @@ public void testCloseContextsAndFinishOnFail() throws Exception {
IntObjectOpenHashMap.<String>newInstance(),
IntObjectOpenHashMap.<ShardId>newInstance(),
ImmutableSet.of("missing-node"));
projector.registerUpstream(mock(RowUpstream.class));
projector.downstream(downstream);
projector.downstream(rowReceiver);
projector.fail(new Throwable("Something when wrong"));

}
Expand Down

0 comments on commit 0aefc37

Please sign in to comment.