Skip to content

Commit

Permalink
- Fixed a threading issue in DistributingDownstreams
Browse files Browse the repository at this point in the history
that could cause some queries to return too few rows
- Prevent DistributingDownstreams to send needless
empty requests
  • Loading branch information
Philipp Bogensberger committed Jul 2, 2015
1 parent ccebc33 commit 9e78fe8
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 44 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Changes for Crate
Unreleased
==========

- Fixed a rare race condition that could cause some queries to return
too few rows

- Fix: IS NULL predicate in query could return records that have already been
marked for deletion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,65 +86,31 @@ public boolean setNextRow(Row row) {
return true;
}

protected void sendRequestIfNeeded(int downstreamIdx) {
private void sendRequestIfNeeded(int downstreamIdx) {
int size = bucketBuilder.size(downstreamIdx);
if (size >= Constants.PAGE_SIZE || remainingUpstreams.get() <= 0) {
Downstream downstream = downstreams[downstreamIdx];
downstream.bucketQueue.add(bucketBuilder.build(downstreamIdx));
sendRequest(downstream);
downstream.sendRequest(remainingUpstreams.get() <= 0);
}
}

protected void onAllUpstreamsFinished() {
private void onAllUpstreamsFinished() {
for (int i = 0; i < downstreams.length; i++) {
sendRequestIfNeeded(i);
}
}

private void forwardFailures(Throwable throwable) {
for (Downstream downstream : downstreams) {
downstream.request.throwable(throwable);
sendRequest(downstream.request, downstream);
downstream.sendRequest(throwable);
}
}

private boolean allDownstreamsFinished() {
return finishedDownstreams.get() == downstreams.length;
}

private void sendRequest(Downstream downstream) {
if (downstream.requestPending.compareAndSet(false, true)) {
DistributedResultRequest request = downstream.request;
Deque<Bucket> queue = downstream.bucketQueue;
int size = queue.size();
if (size > 0) {
request.rows(queue.poll());
} else {
request.rows(Bucket.EMPTY);
}
request.isLast(!(size > 1 || remainingUpstreams.get() > 0));
sendRequest(request, downstream);
}
}

private void sendRequest(final DistributedResultRequest request, final Downstream downstream) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[{}] sending distributing collect request to {}, isLast? {} ...",
jobId.toString(),
downstream.node, request.isLast());
}
try {
transportDistributedResultAction.pushResult(
downstream.node,
request,
new DistributedResultResponseActionListener(downstream)
);
} catch (IllegalArgumentException e) {
LOGGER.error(e.getMessage(), e);
downstream.wantMore.set(false);
}
}

@Override
public Bucket doFinish() {
onAllUpstreamsFinished();
Expand All @@ -162,22 +128,73 @@ public Throwable doFail(Throwable t) {
return t;
}

static class Downstream {
private class Downstream {

final AtomicBoolean wantMore = new AtomicBoolean(true);
final AtomicBoolean requestPending = new AtomicBoolean(false);
final Deque<Bucket> bucketQueue = new ConcurrentLinkedDeque<>();
final DistributedResultRequest request;
final Deque<DistributedResultRequest> requestQueue = new ConcurrentLinkedDeque<>();
final String node;

final UUID jobId;
final int targetExecutionNodeId;
final int bucketIdx;
final Streamer<?>[] streamers;

public Downstream(String node,
UUID jobId,
int targetExecutionNodeId,
int bucketIdx,
Streamer<?>[] streamers) {
this.node = node;
this.request = new DistributedResultRequest(jobId, targetExecutionNodeId, bucketIdx, streamers);
this.jobId = jobId;
this.targetExecutionNodeId = targetExecutionNodeId;
this.bucketIdx = bucketIdx;
this.streamers = streamers;
}

public void sendRequest(Throwable t) {
DistributedResultRequest request = buildRequest();
request.throwable(t);
sendRequest(request);
}

public void sendRequest(boolean isLast) {
DistributedResultRequest request = buildRequest();
request.isLast(isLast);
Bucket bucket = bucketQueue.poll();
request.rows(bucket != null ? bucket : Bucket.EMPTY);
synchronized(this.requestQueue) {
if (!requestPending.compareAndSet(false, true)) {
requestQueue.add(request);
return;
}
}
sendRequest(request);
}

private void sendRequest(final DistributedResultRequest request) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("[{}] sending distributing collect request to {}, isLast? {} ...",
jobId.toString(),
node, request.isLast());
}
try {
transportDistributedResultAction.pushResult(
node,
request,
new DistributedResultResponseActionListener(this)
);
} catch (IllegalArgumentException e) {
LOGGER.error(e.getMessage(), e);
wantMore.set(false);
}
}

private DistributedResultRequest buildRequest() {
return new DistributedResultRequest(jobId, targetExecutionNodeId, bucketIdx, streamers);
}

}

private class DistributedResultResponseActionListener implements ActionListener<DistributedResultResponse> {
Expand All @@ -199,12 +216,20 @@ public void onResponse(DistributedResultResponse response) {
downstream.wantMore.set(response.needMore());
if (!response.needMore()) {
finishedDownstreams.incrementAndGet();
downstream.requestQueue.clear();
// clean-up queue because no more rows are wanted
downstream.bucketQueue.clear();
} else {
// send next request or final empty closing one
downstream.requestPending.set(false);
sendRequest(downstream);
DistributedResultRequest request;
synchronized (downstream.requestQueue) {
if (downstream.requestQueue.size() > 0) {
request = downstream.requestQueue.poll();
} else {
downstream.requestPending.set(false);
return;
}
}
downstream.sendRequest(request);
}
}

Expand Down

0 comments on commit 9e78fe8

Please sign in to comment.