Skip to content

Commit

Permalink
fixup! fixup! implement nested-loop operation
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Jun 16, 2015
1 parent f3c0431 commit 23e6203
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ public Void visitNestedLoop(NestedLoop plan, Context context) {
if (isRootPlan) {
context.isRootPlan = false;
}
process(plan.left(), context);
process(plan.right(), context);
process(plan.left().plan(), context);
process(plan.right().plan(), context);

if (plan.nestedLoopNode().leftMergeNode() != null) {
plan.nestedLoopNode().leftMergeNode().jobId(context.executionNodesTask.jobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class NestedLoopOperation implements RowUpstream, RowDownstream {

Expand All @@ -44,6 +45,7 @@ public class NestedLoopOperation implements RowUpstream, RowDownstream {
private final AtomicBoolean rightFinished = new AtomicBoolean(false);
private final ArrayBlockingQueue<Row> innerRowsQ = new ArrayBlockingQueue<>(1);
private final Object finishedLock = new Object();
private final AtomicInteger numUpstreams = new AtomicInteger(0);

private final static ESLogger LOGGER = Loggers.getLogger(NestedLoopOperation.class);
private final static Row SENTINEL = new Row() {
Expand All @@ -64,7 +66,6 @@ public Object[] materialize() {
};

private RowDownstreamHandle downstream;
private volatile int numUpstreams = 0;


public NestedLoopOperation() {
Expand Down Expand Up @@ -179,11 +180,10 @@ public void fail(Throwable throwable) {

@Override
public RowDownstreamHandle registerUpstream(RowUpstream upstream) {
numUpstreams++;
assert numUpstreams <= 2: "Only 2 upstreams supported";
if (numUpstreams == 1) {
if (numUpstreams.incrementAndGet() == 1) {
return leftDownstreamHandle;
} else {
assert numUpstreams.get() <= 2: "Only 2 upstreams supported";
return rightDownstreamHandle;
}
}
Expand Down
12 changes: 6 additions & 6 deletions sql/src/main/java/io/crate/planner/node/dql/join/NestedLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
public class NestedLoop extends PlanAndPlannedAnalyzedRelation {


private final PlanAndPlannedAnalyzedRelation left;
private final PlanAndPlannedAnalyzedRelation right;
private final PlannedAnalyzedRelation left;
private final PlannedAnalyzedRelation right;
private final NestedLoopNode nestedLoopNode;
@Nullable
private MergeNode localMergeNode;
Expand Down Expand Up @@ -90,8 +90,8 @@ public class NestedLoop extends PlanAndPlannedAnalyzedRelation {
* a | 3
* b | 3
*/
public NestedLoop(PlanAndPlannedAnalyzedRelation left,
PlanAndPlannedAnalyzedRelation right,
public NestedLoop(PlannedAnalyzedRelation left,
PlannedAnalyzedRelation right,
NestedLoopNode nestedLoopNode,
boolean leftOuterLoop) {
this.leftOuterLoop = leftOuterLoop;
Expand All @@ -100,11 +100,11 @@ public NestedLoop(PlanAndPlannedAnalyzedRelation left,
this.nestedLoopNode = nestedLoopNode;
}

public PlanAndPlannedAnalyzedRelation left() {
public PlannedAnalyzedRelation left() {
return left;
}

public PlanAndPlannedAnalyzedRelation right() {
public PlannedAnalyzedRelation right() {
return right;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.crate.Constants;
import io.crate.analyze.OrderBy;
import io.crate.analyze.WhereClause;
import io.crate.analyze.relations.PlannedAnalyzedRelation;
import io.crate.core.collections.Bucket;
import io.crate.executor.Job;
import io.crate.executor.RowCountResult;
Expand Down Expand Up @@ -847,7 +848,7 @@ public void testNestedLoopWithOrderedQAF() throws Exception {
innerRightOrderBy,
ctx);

PlanAndPlannedAnalyzedRelation innerPlan = new NestedLoop(
PlannedAnalyzedRelation innerPlan = new NestedLoop(
new QueryThenFetch(innerLeftCollectNode, null),
new QueryThenFetch(innerRightCollectNode, null),
innerNestedLoopNode,
Expand Down

0 comments on commit 23e6203

Please sign in to comment.