Skip to content

Commit

Permalink
make distributed downstream pageable
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Apr 15, 2015
1 parent 038313c commit 7b9dd1b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public void add(Row row) throws IOException {
}
}

public int size() {
return size;
}

public void writeToStream(StreamOutput output) throws IOException {
output.writeVInt(size);
if (size > 0) {
Expand All @@ -92,6 +96,11 @@ public StreamBucket build() throws IOException {
sb.bytes = out.bytes();
return sb;
}

public void reset() {
out.reset();
size = 0;
}
}

public StreamBucket(@Nullable Streamer<?>[] streamers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package io.crate.executor.transport.distributed;

import io.crate.Constants;
import io.crate.Streamer;
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
Expand All @@ -47,6 +48,8 @@ public class DistributingDownstream extends ResultProviderBase {
private final DistributedResultRequest[] requests;
private List<DiscoveryNode> downstreams;
private final TransportService transportService;
private final Boolean[] downstreamWantMore;
private int finishedDownstreams = 0;


public DistributingDownstream(UUID jobId,
Expand All @@ -58,6 +61,7 @@ public DistributingDownstream(UUID jobId,
this.downstreams = downstreams;
this.transportService = transportService;
this.bucketBuilder = new MultiBucketBuilder(streamers, downstreams.size());
this.downstreamWantMore = new Boolean[downstreams.size()];
this.jobId = jobId;
this.requests = new DistributedResultRequest[downstreams.size()];
for (int i = 0, length = downstreams.size(); i < length; i++) {
Expand All @@ -67,27 +71,38 @@ public DistributingDownstream(UUID jobId,

@Override
public synchronized boolean setNextRow(Row row) {
if (allDownstreamsFinished()) {
return false;
}
try {
bucketBuilder.setNextRow(row);
int downstreamIdx = bucketBuilder.getBucket(row);
// only collect if downstream want more rows, otherwise just ignore the row
if (downstreamWantMore[downstreamIdx] == null
|| downstreamWantMore[downstreamIdx]) {
bucketBuilder.setNextRow(downstreamIdx, row);
sendRequestIfNeeded(downstreamIdx);
}
} catch (IOException e) {
fail(e);
return false;
}
return true;
}

protected void sendRequestIfNeeded(int downstreamIdx) {
if (bucketBuilder.size(downstreamIdx) >= Constants.PAGE_SIZE) {
DistributedResultRequest request = this.requests[downstreamIdx];
request.rows(bucketBuilder.build(downstreamIdx));
sendRequest(request, downstreamIdx);
}
}

protected void onAllUpstreamsFinished() {
int i = 0;
for (Bucket rows : bucketBuilder.build()) {
DistributedResultRequest request = this.requests[i];
request.rows(rows);
final DiscoveryNode node = downstreams.get(i);
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending distributing collect request to {} ...",
jobId.toString(),
node.id());
}
sendRequest(request, node);
sendRequest(request, i);
i++;
}
}
Expand All @@ -96,12 +111,22 @@ private void forwardFailures(Throwable throwable) {
int idx = 0;
for (DistributedResultRequest request : requests) {
request.throwable(throwable);
sendRequest(request, downstreams.get(idx));
sendRequest(request, idx);
idx++;
}
}

private void sendRequest(final DistributedResultRequest request, final DiscoveryNode node) {
private boolean allDownstreamsFinished() {
return finishedDownstreams == downstreams.size();
}

private void sendRequest(final DistributedResultRequest request, final int downstreamIdx) {
final DiscoveryNode node = downstreams.get(downstreamIdx);
if (logger.isTraceEnabled()) {
logger.trace("[{}] sending distributing collect request to {} ...",
jobId.toString(),
node.id());
}
transportService.submitRequest(
node,
TransportDistributedResultAction.DISTRIBUTED_RESULT_ACTION,
Expand All @@ -120,6 +145,10 @@ public void handleResponse(DistributedResultResponse response) {
jobId.toString(),
node.id());
}
downstreamWantMore[downstreamIdx] = response.needMore();
if (!downstreamWantMore[downstreamIdx]) {
finishedDownstreams++;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

package io.crate.executor.transport.distributed;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.crate.Streamer;
import io.crate.core.collections.Bucket;
Expand Down Expand Up @@ -51,10 +52,27 @@ public List<Bucket> build() {
return Lists.transform(bucketBuilders, StreamBucket.Builder.BUILD_FUNCTION);
}

public Bucket build(int bucketIdx) {
Bucket bucket = null;
try {
StreamBucket.Builder builder = bucketBuilders.get(bucketIdx);
bucket = builder.build();
builder.reset();
} catch (IOException e) {
Throwables.propagate(e);
}

return bucket;
}

public int size(int bucketIdx) {
return bucketBuilders.get(bucketIdx).size();
}

/**
* get bucket number by doing modulo hashcode of first row-element
*/
protected int getBucket(Row row) {
public int getBucket(Row row) {
int hash = hashCode(row.get(0));
if (hash == Integer.MIN_VALUE) {
hash = 0; // Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE
Expand All @@ -77,8 +95,8 @@ private static int hashCode(@Nullable Object value) {
return value.hashCode();
}

public synchronized void setNextRow(Row row) throws IOException {
bucketBuilders.get(getBucket(row)).add(row);
public synchronized void setNextRow(int index, Row row) throws IOException {
bucketBuilders.get(index).add(row);
}

}

0 comments on commit 7b9dd1b

Please sign in to comment.