Skip to content

Commit

Permalink
clean up job action handler
Browse files Browse the repository at this point in the history
- simplified MapSideDataCollectOperation handling (replaced MapSideCollectOperationDispatcher with direct use of MapSideDataCollectOperation)
  • Loading branch information
seut committed Apr 15, 2015
1 parent c275749 commit df11233
Show file tree
Hide file tree
Showing 22 changed files with 158 additions and 401 deletions.
21 changes: 1 addition & 20 deletions sql/src/main/java/io/crate/action/job/JobRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,18 @@

public class JobRequest extends TransportRequest {

public static final int NO_DIRECT_RETURN = -1;

private UUID jobId;
private List<ExecutionNode> executionNodes;
private int returnResultFromNode = NO_DIRECT_RETURN;

protected JobRequest() {
}

public JobRequest(UUID jobId, List<ExecutionNode> executionNodes) {
// TODO: assert that only 1 DIRECT_RETURN_DOWNSTREAM_NODE on all execution nodes is set
this.jobId = jobId;
this.executionNodes = executionNodes;
}

public JobRequest(UUID jobId, List<ExecutionNode> executionNodes, int returnResultFromNode) {
assert returnResultFromNode < executionNodes.size() : "invalid returnResultFromNode index";
this.jobId = jobId;
this.executionNodes = executionNodes;
this.returnResultFromNode = returnResultFromNode;
}

public UUID jobId() {
return jobId;
}
Expand All @@ -64,14 +55,6 @@ public Collection<ExecutionNode> executionNodes() {
return this.executionNodes;
}

/**
* @return the index of the executionNode from which the result should be
* directly returned in the {@link JobResponse}.
*/
public int returnResultFromNode() {
return returnResultFromNode;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
Expand All @@ -84,7 +67,6 @@ public void readFrom(StreamInput in) throws IOException {
ExecutionNode node = ExecutionNodes.fromStream(in);
executionNodes.add(node);
}
returnResultFromNode = in.readInt();
}

@Override
Expand All @@ -98,6 +80,5 @@ public void writeTo(StreamOutput out) throws IOException {
for (ExecutionNode executionNode : executionNodes) {
ExecutionNodes.toStream(out, executionNode);
}
out.writeInt(returnResultFromNode);
}
}
6 changes: 1 addition & 5 deletions sql/src/main/java/io/crate/action/job/JobResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ public class JobResponse extends TransportResponse {
public JobResponse() {
}

public JobResponse(@Nonnull Bucket bucket, @Nonnull Streamer<?>[] streamers) {
public JobResponse(@Nonnull Bucket bucket) {
this.directResponse = Optional.of(bucket);
if (bucket instanceof StreamBucket) {
((StreamBucket) bucket).streamers(streamers);
}
this.streamers = streamers;
}

public Optional<Bucket> directResponse() {
Expand Down

This file was deleted.

59 changes: 46 additions & 13 deletions sql/src/main/java/io/crate/action/job/TransportJobAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
import io.crate.breaker.CrateCircuitBreakerService;
import io.crate.breaker.RamAccountingContext;
import io.crate.core.collections.Bucket;
import io.crate.exceptions.Exceptions;
import io.crate.executor.transport.DefaultTransportResponseHandler;
import io.crate.executor.transport.ResponseForwarder;
import io.crate.operation.collect.MapSideDataCollectOperation;
import io.crate.operation.collect.StatsTables;
import io.crate.operation.projectors.ResultProvider;
import io.crate.planner.node.ExecutionNode;
import io.crate.planner.node.ExecutionNodeVisitor;
import io.crate.planner.node.StreamerVisitor;
Expand Down Expand Up @@ -71,18 +75,21 @@ public class TransportJobAction {
private final CircuitBreaker circuitBreaker;
private final StreamerVisitor streamerVisitor;
private final ExecutionNodesExecutingVisitor executionNodeVisitor;
private final MapSideCollectOperationDispatcher collectOperationHandler;
private final MapSideDataCollectOperation collectOperationHandler;
private final StatsTables statsTables;

@Inject
public TransportJobAction(TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
CrateCircuitBreakerService breakerService,
StreamerVisitor streamerVisitor,
MapSideCollectOperationDispatcher collectOperationHandler) {
StatsTables statsTables,
MapSideDataCollectOperation collectOperationHandler) {
this.threadPool = threadPool;
this.circuitBreaker = breakerService.getBreaker(CrateCircuitBreakerService.QUERY_BREAKER);
this.clusterService = clusterService;
this.statsTables = statsTables;
this.collectOperationHandler = collectOperationHandler;
this.transportService = transportService;
transportService.registerHandler(ACTION_NAME, new JobInitHandler());
Expand Down Expand Up @@ -130,6 +137,7 @@ private void nodeOperation(final JobRequest request, final ActionListener<JobRes
final RamAccountingContext ramAccountingContext =
new RamAccountingContext(ramAccountingContextId, circuitBreaker);

/*
if (request.returnResultFromNode() == i) {
LOGGER.trace("handling execution Node #{} and return its result", i);
// execution node whose result shall be returned
Expand All @@ -141,11 +149,12 @@ private void nodeOperation(final JobRequest request, final ActionListener<JobRes
request.jobId()
));
} else {
executionFutures.add(executionNodeVisitor.handle(executionNode,
*/
executionFutures.add(executionNodeVisitor.handle(
executionNode,
ramAccountingContext,
request.jobId()
));
}
} catch (Throwable t) {
LOGGER.error("error starting ExecutionNode {}", t, executionNode);
actionListener.onFailure(t);
Expand All @@ -159,14 +168,15 @@ private void nodeOperation(final JobRequest request, final ActionListener<JobRes
@Override
public void onSuccess(@Nullable List<Bucket> buckets) {
assert buckets != null;
if (request.returnResultFromNode() == JobRequest.NO_DIRECT_RETURN) {
if (buckets.isEmpty()) {
actionListener.onResponse(new JobResponse());
} else {
assert finalDirectResultStreamer != null;
Bucket directResultBucket = buckets.get(request.returnResultFromNode());
//assert finalDirectResultStreamer != null;
assert buckets.size() == 1;
Bucket directResultBucket = buckets.get(0);
LOGGER.trace("direct result ready: {}", directResultBucket);
actionListener.onResponse(
new JobResponse(directResultBucket, finalDirectResultStreamer)
new JobResponse(directResultBucket)
);
}
}
Expand Down Expand Up @@ -224,11 +234,34 @@ public Void visitCollectNode(final CollectNode collectNode, final VisitorContext
threadPool.executor(COLLECT_EXECUTOR).execute(new Runnable() {
@Override
public void run() {
collectOperationHandler.executeCollect(
context.jobId,
collectNode,
context.ramAccountingContext,
context.directResultFuture);
final UUID operationId = UUID.randomUUID();
statsTables.operationStarted(operationId, context.jobId, collectNode.name());
ResultProvider downstream = collectOperationHandler.createDownstream(collectNode);
Futures.addCallback(downstream.result(), new FutureCallback<Bucket>() {
@Override
public void onSuccess(@Nullable Bucket result) {
statsTables.operationFinished(operationId, null, context.ramAccountingContext.totalBytes());
context.ramAccountingContext.close();
if (result == null) {
context.directResultFuture.set(Bucket.EMPTY);
} else {
context.directResultFuture.set(result);
}
}

@Override
public void onFailure(Throwable t) {
statsTables.operationFinished(operationId, Exceptions.messageOf(t),
context.ramAccountingContext.totalBytes());
context.ramAccountingContext.close();
context.directResultFuture.setException(t);
}
});
try {
collectOperationHandler.collect(collectNode, downstream, context.ramAccountingContext);
} catch (Throwable t) {
downstream.fail(t);
}
}
});
return null;
Expand Down
Loading

0 comments on commit df11233

Please sign in to comment.