Skip to content

Commit 72b5348

Browse files
seutmfussenegger
authored andcommitted
implement nested-loop operation
1 parent bbc2c47 commit 72b5348

34 files changed

+1335
-378
lines changed

sql/src/main/java/io/crate/action/job/ContextPreparer.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.crate.executor.transport.distributed.SingleBucketBuilder;
3131
import io.crate.jobs.CountContext;
3232
import io.crate.jobs.JobExecutionContext;
33+
import io.crate.jobs.NestedLoopContext;
3334
import io.crate.jobs.PageDownstreamContext;
3435
import io.crate.operation.PageDownstream;
3536
import io.crate.operation.PageDownstreamFactory;
@@ -46,6 +47,7 @@
4647
import io.crate.planner.node.dql.CollectNode;
4748
import io.crate.planner.node.dql.CountNode;
4849
import io.crate.planner.node.dql.MergeNode;
50+
import io.crate.planner.node.dql.join.NestedLoopNode;
4951
import io.crate.types.DataTypes;
5052
import org.elasticsearch.cluster.ClusterService;
5153
import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -121,8 +123,8 @@ private PreparerContext(UUID jobId,
121123
private class InnerPreparer extends ExecutionNodeVisitor<PreparerContext, Void> {
122124

123125
@Override
124-
public Void visitCountNode(CountNode countNode, PreparerContext context) {
125-
Map<String, Map<String, List<Integer>>> locations = countNode.routing().locations();
126+
public Void visitCountNode(CountNode node, PreparerContext context) {
127+
Map<String, Map<String, List<Integer>>> locations = node.routing().locations();
126128
if (locations == null) {
127129
throw new IllegalArgumentException("locations are empty. Can't start count operation");
128130
}
@@ -137,15 +139,15 @@ public Void visitCountNode(CountNode countNode, PreparerContext context) {
137139
countOperation,
138140
singleBucketBuilder,
139141
indexShardMap,
140-
countNode.whereClause()
142+
node.whereClause()
141143
);
142144
context.directResultFuture = singleBucketBuilder.result();
143-
context.contextBuilder.addSubContext(countNode.executionNodeId(), countContext);
145+
context.contextBuilder.addSubContext(node.executionNodeId(), countContext);
144146
return null;
145147
}
146148

147149
@Override
148-
public Void visitMergeNode(final MergeNode node, final PreparerContext context) {
150+
public Void visitMergeNode(MergeNode node, PreparerContext context) {
149151
RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionNode(circuitBreaker, node);
150152
ResultProvider downstream = resultProviderFactory.createDownstream(node, node.jobId());
151153
Tuple<PageDownstream, FlatProjectorChain> pageDownstreamProjectorChain =
@@ -172,9 +174,9 @@ public Void visitMergeNode(final MergeNode node, final PreparerContext context)
172174
}
173175

174176
@Override
175-
public Void visitCollectNode(final CollectNode node, final PreparerContext context) {
177+
public Void visitCollectNode(CollectNode node, PreparerContext context) {
176178
RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionNode(circuitBreaker, node);
177-
ResultProvider downstream = collectOperation.createDownstream(node);
179+
ResultProvider downstream = resultProviderFactory.createDownstream(node, node.jobId());
178180

179181
if (ExecutionNodes.hasDirectResponseDownstream(node.downstreamNodes())) {
180182
context.directResultFuture = downstream.result();
@@ -189,5 +191,23 @@ public Void visitCollectNode(final CollectNode node, final PreparerContext conte
189191
context.contextBuilder.addSubContext(node.executionNodeId(), jobCollectContext);
190192
return null;
191193
}
194+
195+
@Override
196+
public Void visitNestedLoopNode(NestedLoopNode node, PreparerContext context) {
197+
RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionNode(circuitBreaker, node);
198+
199+
ResultProvider downstream = resultProviderFactory.createDownstream(node, node.jobId());
200+
201+
NestedLoopContext nestedLoopContext = new NestedLoopContext(
202+
node,
203+
downstream,
204+
ramAccountingContext,
205+
pageDownstreamFactory,
206+
threadPool,
207+
streamerVisitor);
208+
209+
context.contextBuilder.addSubContext(node.executionNodeId(), nestedLoopContext);
210+
return null;
211+
}
192212
}
193213
}

sql/src/main/java/io/crate/executor/Job.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public UUID id() {
4343
return id;
4444
}
4545

46-
public void addTasks(Collection<Task> tasks) {
46+
public void addTasks(Collection<? extends Task> tasks) {
4747
this.tasks.addAll(tasks);
4848
}
4949

sql/src/main/java/io/crate/executor/transport/ExecutionNodesTask.java

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ public class ExecutionNodesTask extends JobTask {
6262
private static final ESLogger LOGGER = Loggers.getLogger(ExecutionNodesTask.class);
6363

6464
private final TransportJobAction transportJobAction;
65-
private final List<List<ExecutionNode>> groupedExecutionNodes;
66-
private final List<SettableFuture<TaskResult>> results;
67-
private final boolean hasDirectResponse;
6865
private final ClusterService clusterService;
6966
private ContextPreparer contextPreparer;
7067
private final JobContextService jobContextService;
@@ -73,15 +70,13 @@ public class ExecutionNodesTask extends JobTask {
7370
private TransportCloseContextNodeAction transportCloseContextNodeAction;
7471
private final StreamerVisitor streamerVisitor;
7572
private final CircuitBreaker circuitBreaker;
76-
private List<MergeNode> mergeNodes;
73+
74+
private final List<List<ExecutionNode>> groupedExecutionNodes = new ArrayList<>();
75+
private final List<MergeNode> finalMergeNodes = new ArrayList<>();
76+
private final List<SettableFuture<TaskResult>> results = new ArrayList<>();
77+
private boolean hasDirectResponse;
7778
private boolean rowCountResult = false;
7879

79-
/**
80-
* @param mergeNodes list of mergeNodes for the final merge operation on the handler.
81-
* This may be null in the constructor but then it must be set using the
82-
* {@link #mergeNodes(List)} setter before {@link #start()} is called.
83-
* Multiple merge nodes are only occurring on bulk operations.
84-
*/
8580
protected ExecutionNodesTask(UUID jobId,
8681
ClusterService clusterService,
8782
ContextPreparer contextPreparer,
@@ -91,9 +86,7 @@ protected ExecutionNodesTask(UUID jobId,
9186
TransportJobAction transportJobAction,
9287
TransportCloseContextNodeAction transportCloseContextNodeAction,
9388
StreamerVisitor streamerVisitor,
94-
CircuitBreaker circuitBreaker,
95-
@Nullable List<MergeNode> mergeNodes,
96-
List<List<ExecutionNode>> groupedExecutionNodes) {
89+
CircuitBreaker circuitBreaker) {
9790
super(jobId);
9891
this.clusterService = clusterService;
9992
this.contextPreparer = contextPreparer;
@@ -103,21 +96,31 @@ protected ExecutionNodesTask(UUID jobId,
10396
this.transportCloseContextNodeAction = transportCloseContextNodeAction;
10497
this.streamerVisitor = streamerVisitor;
10598
this.circuitBreaker = circuitBreaker;
106-
this.mergeNodes = mergeNodes;
10799
this.transportJobAction = transportJobAction;
108-
this.groupedExecutionNodes = groupedExecutionNodes;
109-
hasDirectResponse = hasDirectResponse(groupedExecutionNodes);
100+
}
110101

111-
List<SettableFuture<TaskResult>> results = new ArrayList<>(groupedExecutionNodes.size());
112-
for (int i = 0; i < groupedExecutionNodes.size(); i++) {
102+
103+
/**
104+
* @param finalMergeNode a mergeNode for the final merge operation on the handler.
105+
* Multiple merge nodes are only occurring on bulk operations.
106+
*/
107+
public void addFinalMergeNode(MergeNode finalMergeNode) {
108+
finalMergeNode.jobId(jobId());
109+
finalMergeNodes.add(finalMergeNode);
110+
}
111+
112+
public void addExecutionNode(int group, ExecutionNode executionNode) {
113+
executionNode.jobId(jobId());
114+
while (group >= groupedExecutionNodes.size()) {
113115
results.add(SettableFuture.<TaskResult>create());
116+
groupedExecutionNodes.add(new ArrayList<ExecutionNode>());
114117
}
115-
this.results = results;
116-
}
118+
List<ExecutionNode> executionNodes = groupedExecutionNodes.get(group);
117119

118-
public void mergeNodes(List<MergeNode> mergeNodes) {
119-
assert this.mergeNodes == null : "can only overwrite mergeNodes if it was null";
120-
this.mergeNodes = mergeNodes;
120+
if (ExecutionNodes.hasDirectResponseDownstream(executionNode.downstreamNodes())) {
121+
hasDirectResponse = true;
122+
}
123+
executionNodes.add(executionNode);
121124
}
122125

123126
public void rowCountResult(boolean rowCountResult) {
@@ -126,7 +129,7 @@ public void rowCountResult(boolean rowCountResult) {
126129

127130
@Override
128131
public void start() {
129-
assert mergeNodes != null : "mergeNodes must not be null";
132+
assert finalMergeNodes.size() == groupedExecutionNodes.size() : "groupedExecutionNodes and finalMergeNodes sizes must match";
130133

131134
Map<String, Collection<ExecutionNode>> nodesByServer = ExecutionNodeGrouper.groupByServer(clusterService.state().nodes().localNodeId(), groupedExecutionNodes);
132135
RowDownstream rowDownstream;
@@ -135,21 +138,21 @@ public void start() {
135138
} else {
136139
rowDownstream = new QueryResultRowDownstream(results);
137140
}
138-
Streamer<?>[] streamers = streamerVisitor.processExecutionNode(mergeNodes.get(0)).inputStreamers();
141+
Streamer<?>[] streamers = streamerVisitor.processExecutionNode(finalMergeNodes.get(0)).inputStreamers();
139142
List<PageDownstreamContext> pageDownstreamContexts = new ArrayList<>(groupedExecutionNodes.size());
140143

141144
for (int i = 0; i < groupedExecutionNodes.size(); i++) {
142145
RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionNode(
143-
circuitBreaker, mergeNodes.get(i));
146+
circuitBreaker, finalMergeNodes.get(i));
144147

145148
PageDownstreamContext pageDownstreamContext = createPageDownstreamContext(ramAccountingContext, streamers,
146-
mergeNodes.get(i), groupedExecutionNodes.get(i), rowDownstream);
149+
finalMergeNodes.get(i), groupedExecutionNodes.get(i), rowDownstream);
147150
if (nodesByServer.size() == 0) {
148151
pageDownstreamContext.finish();
149152
continue;
150153
}
151154
if (!hasDirectResponse) {
152-
createLocalContextAndStartOperation(pageDownstreamContext, nodesByServer, mergeNodes.get(i).executionNodeId());
155+
createLocalContextAndStartOperation(pageDownstreamContext, nodesByServer, finalMergeNodes.get(i).executionNodeId());
153156
}
154157
pageDownstreamContexts.add(pageDownstreamContext);
155158
}
@@ -262,6 +265,11 @@ public void onFailure(@Nonnull Throwable t) {
262265

263266
@Override
264267
public List<? extends ListenableFuture<TaskResult>> result() {
268+
if (results.size() != groupedExecutionNodes.size()) {
269+
for (int i = 0; i < groupedExecutionNodes.size(); i++) {
270+
results.add(SettableFuture.<TaskResult>create());
271+
}
272+
}
265273
return results;
266274
}
267275

0 commit comments

Comments
 (0)