From 7b9dd1b3e80cb54fee711d1247c1ecf7dabc5a56 Mon Sep 17 00:00:00 2001 From: Sebastian Utz Date: Wed, 15 Apr 2015 12:13:52 +0200 Subject: [PATCH] make distributed downstream pageable --- .../executor/transport/StreamBucket.java | 9 ++++ .../distributed/DistributingDownstream.java | 49 +++++++++++++++---- .../distributed/MultiBucketBuilder.java | 24 +++++++-- 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/sql/src/main/java/io/crate/executor/transport/StreamBucket.java b/sql/src/main/java/io/crate/executor/transport/StreamBucket.java index 0cfc1154f7a4..01cfe0328402 100644 --- a/sql/src/main/java/io/crate/executor/transport/StreamBucket.java +++ b/sql/src/main/java/io/crate/executor/transport/StreamBucket.java @@ -79,6 +79,10 @@ public void add(Row row) throws IOException { } } + public int size() { + return size; + } + public void writeToStream(StreamOutput output) throws IOException { output.writeVInt(size); if (size > 0) { @@ -92,6 +96,11 @@ public StreamBucket build() throws IOException { sb.bytes = out.bytes(); return sb; } + + public void reset() { + out.reset(); + size = 0; + } } public StreamBucket(@Nullable Streamer[] streamers) { diff --git a/sql/src/main/java/io/crate/executor/transport/distributed/DistributingDownstream.java b/sql/src/main/java/io/crate/executor/transport/distributed/DistributingDownstream.java index a362fba232cd..1b80e4d7ba0c 100644 --- a/sql/src/main/java/io/crate/executor/transport/distributed/DistributingDownstream.java +++ b/sql/src/main/java/io/crate/executor/transport/distributed/DistributingDownstream.java @@ -21,6 +21,7 @@ package io.crate.executor.transport.distributed; +import io.crate.Constants; import io.crate.Streamer; import io.crate.core.collections.Bucket; import io.crate.core.collections.Row; @@ -47,6 +48,8 @@ public class DistributingDownstream extends ResultProviderBase { private final DistributedResultRequest[] requests; private List downstreams; private final TransportService transportService; + private final Boolean[] downstreamWantMore; + private int finishedDownstreams = 0; public DistributingDownstream(UUID jobId, @@ -58,6 +61,7 @@ public DistributingDownstream(UUID jobId, this.downstreams = downstreams; this.transportService = transportService; this.bucketBuilder = new MultiBucketBuilder(streamers, downstreams.size()); + this.downstreamWantMore = new Boolean[downstreams.size()]; this.jobId = jobId; this.requests = new DistributedResultRequest[downstreams.size()]; for (int i = 0, length = downstreams.size(); i < length; i++) { @@ -67,8 +71,17 @@ public DistributingDownstream(UUID jobId, @Override public synchronized boolean setNextRow(Row row) { + if (allDownstreamsFinished()) { + return false; + } try { - bucketBuilder.setNextRow(row); + int downstreamIdx = bucketBuilder.getBucket(row); + // only collect if downstream want more rows, otherwise just ignore the row + if (downstreamWantMore[downstreamIdx] == null + || downstreamWantMore[downstreamIdx]) { + bucketBuilder.setNextRow(downstreamIdx, row); + sendRequestIfNeeded(downstreamIdx); + } } catch (IOException e) { fail(e); return false; @@ -76,18 +89,20 @@ public synchronized boolean setNextRow(Row row) { return true; } + protected void sendRequestIfNeeded(int downstreamIdx) { + if (bucketBuilder.size(downstreamIdx) >= Constants.PAGE_SIZE) { + DistributedResultRequest request = this.requests[downstreamIdx]; + request.rows(bucketBuilder.build(downstreamIdx)); + sendRequest(request, downstreamIdx); + } + } + protected void onAllUpstreamsFinished() { int i = 0; for (Bucket rows : bucketBuilder.build()) { DistributedResultRequest request = this.requests[i]; request.rows(rows); - final DiscoveryNode node = downstreams.get(i); - if (logger.isTraceEnabled()) { - logger.trace("[{}] sending distributing collect request to {} ...", - jobId.toString(), - node.id()); - } - sendRequest(request, node); + sendRequest(request, i); i++; } } @@ -96,12 +111,22 @@ private void forwardFailures(Throwable throwable) { int idx = 0; for (DistributedResultRequest request : requests) { request.throwable(throwable); - sendRequest(request, downstreams.get(idx)); + sendRequest(request, idx); idx++; } } - private void sendRequest(final DistributedResultRequest request, final DiscoveryNode node) { + private boolean allDownstreamsFinished() { + return finishedDownstreams == downstreams.size(); + } + + private void sendRequest(final DistributedResultRequest request, final int downstreamIdx) { + final DiscoveryNode node = downstreams.get(downstreamIdx); + if (logger.isTraceEnabled()) { + logger.trace("[{}] sending distributing collect request to {} ...", + jobId.toString(), + node.id()); + } transportService.submitRequest( node, TransportDistributedResultAction.DISTRIBUTED_RESULT_ACTION, @@ -120,6 +145,10 @@ public void handleResponse(DistributedResultResponse response) { jobId.toString(), node.id()); } + downstreamWantMore[downstreamIdx] = response.needMore(); + if (!downstreamWantMore[downstreamIdx]) { + finishedDownstreams++; + } } @Override diff --git a/sql/src/main/java/io/crate/executor/transport/distributed/MultiBucketBuilder.java b/sql/src/main/java/io/crate/executor/transport/distributed/MultiBucketBuilder.java index 0f03e87e8557..9a11573cceb9 100644 --- a/sql/src/main/java/io/crate/executor/transport/distributed/MultiBucketBuilder.java +++ b/sql/src/main/java/io/crate/executor/transport/distributed/MultiBucketBuilder.java @@ -21,6 +21,7 @@ package io.crate.executor.transport.distributed; +import com.google.common.base.Throwables; import com.google.common.collect.Lists; import io.crate.Streamer; import io.crate.core.collections.Bucket; @@ -51,10 +52,27 @@ public List build() { return Lists.transform(bucketBuilders, StreamBucket.Builder.BUILD_FUNCTION); } + public Bucket build(int bucketIdx) { + Bucket bucket = null; + try { + StreamBucket.Builder builder = bucketBuilders.get(bucketIdx); + bucket = builder.build(); + builder.reset(); + } catch (IOException e) { + Throwables.propagate(e); + } + + return bucket; + } + + public int size(int bucketIdx) { + return bucketBuilders.get(bucketIdx).size(); + } + /** * get bucket number by doing modulo hashcode of first row-element */ - protected int getBucket(Row row) { + public int getBucket(Row row) { int hash = hashCode(row.get(0)); if (hash == Integer.MIN_VALUE) { hash = 0; // Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE @@ -77,8 +95,8 @@ private static int hashCode(@Nullable Object value) { return value.hashCode(); } - public synchronized void setNextRow(Row row) throws IOException { - bucketBuilders.get(getBucket(row)).add(row); + public synchronized void setNextRow(int index, Row row) throws IOException { + bucketBuilders.get(index).add(row); } }