Skip to content

Commit

Permalink
DRILL-3793: New MergeJoin and add RecordIterator interface
Browse files Browse the repository at this point in the history
This closes #190
  • Loading branch information
amithadke authored and jacques-n committed Nov 5, 2015
1 parent 39582bd commit 2bc16a9
Show file tree
Hide file tree
Showing 14 changed files with 730 additions and 796 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -18,177 +18,82 @@
package org.apache.drill.exec.physical.impl.join; package org.apache.drill.exec.physical.impl.join;


import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.RecordIterator;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.JoinRelType;


/** /**
* The status of the current join. Maintained outside the individually compiled join templates so that we can carry status across multiple schemas. * Maintain join state.
*/ */
public final class JoinStatus { public final class JoinStatus {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinStatus.class);


private static final int OUTPUT_BATCH_SIZE = 32*1024; private static final int OUTPUT_BATCH_SIZE = 32*1024;


public static enum RightSourceMode { public final RecordIterator left;
INCOMING, SV4; public final RecordIterator right;
} private boolean iteratorInitialized;

private static enum InitState {
INIT, // initial state
CHECK, // need to check if batches are empty
READY // read to do work
}

private static final int LEFT_INPUT = 0;
private static final int RIGHT_INPUT = 1;

public final RecordBatch left;
private int leftPosition;
private IterOutcome lastLeft;

public final RecordBatch right;
private int rightPosition;
private int svRightPosition;
private IterOutcome lastRight;


private int outputPosition; private int outputPosition;
public RightSourceMode rightSourceMode = RightSourceMode.INCOMING;
public MergeJoinBatch outputBatch; public MergeJoinBatch outputBatch;
public SelectionVector4 sv4;

private boolean hasIntermediateData;
private int initialRightPosition = -1;
private boolean crossedBatchBoundaries;


private final JoinRelType joinType; private final JoinRelType joinType;
private boolean allowMarking;


public boolean ok = true; public boolean ok = true;
private InitState initialSet = InitState.INIT;
private boolean leftRepeating = false;


public JoinStatus(RecordBatch left, RecordBatch right, MergeJoinBatch output) { public JoinStatus(RecordIterator left, RecordIterator right, MergeJoinBatch output) {
super();
this.left = left; this.left = left;
this.right = right; this.right = right;
this.outputBatch = output; this.outputBatch = output;
this.joinType = output.getJoinType(); this.joinType = output.getJoinType();
this.iteratorInitialized = false;
this.allowMarking = true;
} }


@Override @Override
public String toString() { public String toString() {
return return
super.toString() super.toString()
+ "[" + "["
+ "leftPosition = " + leftPosition + "leftPosition = " + left.getCurrentPosition()
+ ", rightPosition = " + rightPosition + ", rightPosition = " + right.getCurrentPosition()
+ ", svRightPosition = " + svRightPosition
+ ", outputPosition = " + outputPosition + ", outputPosition = " + outputPosition
+ ", lastLeft = " + lastLeft
+ ", lastRight = " + lastRight
+ ", rightSourceMode = " + rightSourceMode
+ ", sv4 = " + sv4
+ ", joinType = " + joinType + ", joinType = " + joinType
+ ", ok = " + ok + ", ok = " + ok
+ ", initialSet = " + initialSet + ", initialSet = " + iteratorInitialized
+ ", leftRepeating = " + leftRepeating
+ ", left = " + left + ", left = " + left
+ ", right = " + right + ", right = " + right
+ ", outputBatch = " + outputBatch + ", outputBatch = " + outputBatch
+ "]"; + "]";
} }


public boolean hasIntermediateData() { // Initialize left and right record iterator. We avoid doing this in constructor.
return hasIntermediateData; // Callers must check state of each iterator after calling ensureInitial.
} public void initialize() {

if (!iteratorInitialized) {
public void resetIntermediateData() { left.next();
hasIntermediateData = false; right.next();
} iteratorInitialized = true;

public void setIntermediateData(int initialRightPosition, boolean crossedBatchBoundaries) {
this.initialRightPosition = initialRightPosition;
this.crossedBatchBoundaries = crossedBatchBoundaries;
this.hasIntermediateData = true;
}

public int getInitialRightPosition() {
return initialRightPosition;
}

public boolean getCrossedBatchBoundaries() {
return crossedBatchBoundaries;
}

private final IterOutcome nextLeft() {
return outputBatch.next(LEFT_INPUT, left);
}

private final IterOutcome nextRight() {
return outputBatch.next(RIGHT_INPUT, right);
}

public final void ensureInitial() {
switch(initialSet) {
case INIT:
this.lastLeft = nextLeft();
this.lastRight = nextRight();
initialSet = InitState.CHECK;
break;
case CHECK:
if (lastLeft != IterOutcome.NONE && left.getRecordCount() == 0) {
this.lastLeft = nextLeft();
}
if (lastRight != IterOutcome.NONE && right.getRecordCount() == 0) {
this.lastRight = nextRight();
}
initialSet = InitState.READY;
// fall through
default:
break;
} }
} }


public final void advanceLeft() { public void prepare() {
leftPosition++; if (!iteratorInitialized) {
} initialize();

public final void advanceRight() {
if (rightSourceMode == RightSourceMode.INCOMING) {
rightPosition++;
} else {
svRightPosition++;
} }
left.prepare();
right.prepare();
} }


public final int getLeftPosition() { public IterOutcome getLeftStatus() { return left.getLastOutcome(); }
return leftPosition;
}

public final int getRightPosition() {
return (rightSourceMode == RightSourceMode.INCOMING) ? rightPosition : svRightPosition;
}

public final int getRightCount() {
return right.getRecordCount();
}

public final void setRightPosition(int pos) {
rightPosition = pos;
}


public IterOutcome getRightStatus() { return right.getLastOutcome(); }


public final int getOutPosition() { public final int getOutPosition() {
return outputPosition; return outputPosition;
} }


public final int fetchAndIncOutputPos() {
return outputPosition++;
}

public final void resetOutputPos() { public final void resetOutputPos() {
outputPosition = 0; outputPosition = 0;
} }
Expand All @@ -198,145 +103,61 @@ public final boolean isOutgoingBatchFull() {
} }


public final void incOutputPos() { public final void incOutputPos() {
outputPosition++; ++outputPosition;
}

public final void notifyLeftRepeating() {
leftRepeating = true;
outputBatch.resetBatchBuilder();
}

public final void notifyLeftStoppedRepeating() {
leftRepeating = false;
svRightPosition = 0;
} }


public final boolean isLeftRepeating() { public void disableMarking() {
return leftRepeating; allowMarking = false;
} }


public void setDefaultAdvanceMode() { public void enableMarking() {
rightSourceMode = RightSourceMode.INCOMING; allowMarking = true;
} }


public void setSV4AdvanceMode() { public boolean shouldMark() {
rightSourceMode = RightSourceMode.SV4; return allowMarking;
svRightPosition = 0;
} }


/** /**
* Check if the left record position can advance by one. * Return state of join based on status of left and right iterator.
* Side effect: advances to next left batch if current left batch size is exceeded. * @return
* 1. JoinOutcome.NO_MORE_DATA : Join is finished
* 2. JoinOutcome.FAILURE : There is an error during join.
* 3. JoinOutcome.BATCH_RETURNED : one of the side has data
* 4. JoinOutcome.SCHEMA_CHANGED : one of the side has change in schema.
*/ */
public final boolean isLeftPositionAllowed() {
if (lastLeft == IterOutcome.NONE) {
return false;
}
if (!isLeftPositionInCurrentBatch()) {
leftPosition = 0;
releaseData(left);
lastLeft = nextLeft();
return lastLeft == IterOutcome.OK;
}
lastLeft = IterOutcome.OK;
return true;
}

/**
* Check if the right record position can advance by one.
* Side effect: advances to next right batch if current right batch size is exceeded
*/
public final boolean isRightPositionAllowed() {
if (rightSourceMode == RightSourceMode.SV4) {
return svRightPosition < sv4.getCount();
}
if (lastRight == IterOutcome.NONE) {
return false;
}
if (!isRightPositionInCurrentBatch()) {
rightPosition = 0;
releaseData(right);
lastRight = nextRight();
return lastRight == IterOutcome.OK;
}
lastRight = IterOutcome.OK;
return true;
}

private void releaseData(RecordBatch b) {
for (VectorWrapper<?> v : b) {
v.clear();
}
if (b.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
b.getSelectionVector2().clear();
}
}

/**
* Check if the left record position can advance by one in the current batch.
*/
public final boolean isLeftPositionInCurrentBatch() {
return leftPosition < left.getRecordCount();
}

/**
* Check if the right record position can advance by one in the current batch.
*/
public final boolean isRightPositionInCurrentBatch() {
return rightPosition < right.getRecordCount();
}

/**
* Check if the next left record position can advance by one in the current batch.
*/
public final boolean isNextLeftPositionInCurrentBatch() {
return leftPosition + 1 < left.getRecordCount();
}

public IterOutcome getLastRight() {
return lastRight;
}

public IterOutcome getLastLeft() {
return lastLeft;
}

/**
* Check if the next left record position can advance by one in the current batch.
*/
public final boolean isNextRightPositionInCurrentBatch() {
return rightPosition + 1 < right.getRecordCount();
}

public JoinOutcome getOutcome() { public JoinOutcome getOutcome() {
if (!ok) { if (!ok) {
return JoinOutcome.FAILURE; return JoinOutcome.FAILURE;
} }
if (bothMatches(IterOutcome.NONE) || if (bothMatches(IterOutcome.NONE) ||
(joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) || (joinType == JoinRelType.INNER && eitherMatches(IterOutcome.NONE)) ||
(joinType == JoinRelType.LEFT && lastLeft == IterOutcome.NONE) || (joinType == JoinRelType.LEFT && getLeftStatus() == IterOutcome.NONE) ||
(joinType == JoinRelType.RIGHT && lastRight == IterOutcome.NONE)) { (joinType == JoinRelType.RIGHT && getRightStatus() == IterOutcome.NONE)) {
return JoinOutcome.NO_MORE_DATA; return JoinOutcome.NO_MORE_DATA;
} }
if (bothMatches(IterOutcome.OK) || if (bothMatches(IterOutcome.OK) ||
(eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) { (eitherMatches(IterOutcome.NONE) && eitherMatches(IterOutcome.OK))) {
return JoinOutcome.BATCH_RETURNED; return JoinOutcome.BATCH_RETURNED;
} }
if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) { if (eitherMatches(IterOutcome.OK_NEW_SCHEMA)) {
return JoinOutcome.SCHEMA_CHANGED; return JoinOutcome.SCHEMA_CHANGED;
} }
// should never see NOT_YET
if (eitherMatches(IterOutcome.NOT_YET)) { if (eitherMatches(IterOutcome.NOT_YET)) {
return JoinOutcome.WAITING; return JoinOutcome.WAITING;
} }
ok = false;
// on STOP, OUT_OF_MEMORY return FAILURE.
return JoinOutcome.FAILURE; return JoinOutcome.FAILURE;
} }


private boolean bothMatches(IterOutcome outcome) { private boolean bothMatches(IterOutcome outcome) {
return lastLeft == outcome && lastRight == outcome; return getLeftStatus() == outcome && getRightStatus() == outcome;
} }


private boolean eitherMatches(IterOutcome outcome) { private boolean eitherMatches(IterOutcome outcome) {
return lastLeft == outcome || lastRight == outcome; return getLeftStatus() == outcome || getRightStatus() == outcome;
} }


} }
Loading

0 comments on commit 2bc16a9

Please sign in to comment.