Skip to content

Commit

Permalink
unify bucketmerger implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed Mar 13, 2015
1 parent 2f8fed7 commit aa4cef0
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 75 deletions.
12 changes: 2 additions & 10 deletions sql/src/main/java/io/crate/operation/merge/BucketMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,18 @@

package io.crate.operation.merge;

import com.google.common.util.concurrent.ListenableFuture;
import io.crate.core.collections.Bucket;
import io.crate.operation.PageDownstream;
import io.crate.operation.RowDownstream;
import io.crate.operation.RowUpstream;

import java.util.List;

/**
* Bridge between Pages transmitted over the network containing buckets and Projectors
* which are {@linkplain io.crate.core.collections.Row} based.
*
* Merges rows from a list of buckets to a single stream of rows which are fed to
* the downstream projector.
*/
public interface BucketMerger extends RowUpstream {
public void merge(List<ListenableFuture<Bucket>> buckets);

public void finish();

public void fail(Throwable t);
public interface BucketMerger extends PageDownstream, RowUpstream {

/**
* sets the downstream to send rows to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class MergeOperation implements DownstreamOperation {

private final int numUpstreams;
private final FlatProjectorChain projectorChain;
private final NonSortingBucketMerger bucketMerger;
private final BucketMerger bucketMerger;

public MergeOperation(ClusterService clusterService,
Settings settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* and just emits a stream of rows, whose order is undeterministic
* as it is not guaranteed which row from which bucket ends up in the stream at which position.
*/
public class NonSortingBucketMerger implements PageDownstream, RowUpstream {
public class NonSortingBucketMerger implements BucketMerger {

private static final ESLogger LOGGER = Loggers.getLogger(NonSortingBucketMerger.class);

Expand Down Expand Up @@ -122,6 +122,7 @@ public void fail(Throwable t) {
}
}

@Override
public void downstream(RowDownstream downstream) {
assert downstream != null : "downstream must not be null";
this.downstream = downstream.registerUpstream(this);
Expand Down
62 changes: 45 additions & 17 deletions sql/src/main/java/io/crate/operation/merge/SortingBucketMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import io.crate.core.collections.Bucket;
import io.crate.core.collections.BucketPage;
import io.crate.core.collections.Row;
import io.crate.operation.PageConsumeListener;
import io.crate.operation.RowDownstream;
import io.crate.operation.RowDownstreamHandle;
import io.crate.operation.projectors.NoOpProjector;
Expand Down Expand Up @@ -95,28 +98,53 @@ public SortingBucketMerger(int numBuckets,
*
* see private emitBuckets(...) for more details on how this works
*/
public void merge(List<ListenableFuture<Bucket>> buckets) {
assert buckets.size() == numBuckets :
@Override
public void nextPage(BucketPage page, final PageConsumeListener listener) {
assert page.buckets().size() == numBuckets :
"number of buckets received in merge call must match the number given in the constructor";
final AtomicBoolean listenerNotified = new AtomicBoolean(false);
Futures.addCallback(Futures.allAsList(page.buckets()), new FutureCallback<List<Bucket>>() {
@Override
public void onSuccess(List<Bucket> buckets) {
try {
if (numBuckets == 1) {
emitSingleBucket(buckets.get(0));
} else {
ArrayList<Iterator<Row>> bucketIts = new ArrayList<>(numBuckets);
for (int i = 0; i < buckets.size(); i++) {
Iterator<Row> remainingBucketIt = remainingBucketIts[i];
if (remainingBucketIt == null) {
bucketIts.add(buckets.get(i).iterator());
} else {
bucketIts.add(Iterators.concat(remainingBucketIt, buckets.get(i).iterator()));
}
}
emitBuckets(bucketIts);
}
} catch (Throwable t) {
downstream.fail(t);
} finally {
notifyListener();
}
}

try {
if (numBuckets == 1) {
emitSingleBucket(buckets.get(0).get());
} else {
ArrayList<Iterator<Row>> bucketIts = new ArrayList<>(numBuckets);
for (int i = 0; i < buckets.size(); i++) {
Iterator<Row> remainingBucketIt = remainingBucketIts[i];
if (remainingBucketIt == null) {
bucketIts.add(buckets.get(i).get().iterator());
private void notifyListener() {
if (!listenerNotified.getAndSet(true)) {
if (wantMore.get()) {
listener.needMore();
} else {
bucketIts.add(Iterators.concat(remainingBucketIt, new FutureBackedRowIterator(buckets.get(i))));
listener.finish();
}
}
emitBuckets(bucketIts);
}
} catch (Throwable t) {
downstream.fail(t);
}

@Override
public void onFailure(Throwable t) {
wantMore.set(false);
fail(t);
notifyListener();
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void finish() {
merger.finish();
}
});
} else {
merger.finish();
}
return collectingProjector.result().get();
}
Expand Down
Loading

0 comments on commit aa4cef0

Please sign in to comment.