Skip to content

Commit

Permalink
fixup! fixup! implement sort merge join as a YProjector
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed Mar 2, 2015
1 parent 60ed644 commit e1cd7ff
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import javax.annotation.Nullable;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -74,17 +72,93 @@ public class SortMergeProjector implements YProjector, ProjectorUpstream {

private final ESLogger logger = Loggers.getLogger(getClass());

private static enum Side {
LEFT,
RIGHT,
BOTH;
private static interface Side {
public void finish();
public boolean isFinished();

public boolean matches(Side side) {
return side == this || this == BOTH;
@Nullable
public List<Object[]> currentRows();
public void currentRows(List<Object[]> rows);

public boolean matches(Side other);
public Side other();

static Side BOTH = new Side() {
@Override
public void finish() {

}

@Override
public boolean isFinished() {
return false;
}

@Nullable
@Override
public List<Object[]> currentRows() {
return null;
}

@Override
public void currentRows(List<Object[]> rows) {

}

@Override
public boolean matches(Side other) {
return true;
}

@Override
public Side other() {
return null;
}
};
}

private static abstract class AbstractSide implements Side {
private final AtomicBoolean finished = new AtomicBoolean(false);
private final AtomicReference<List<Object[]>> currentRows = new AtomicReference<>();
private final String name;

protected AbstractSide(String name) {
this.name = name;
}

@Override
public void finish() {
finished.set(true);
}

@Override
public boolean isFinished() {
return finished.get();
}

@Nullable
@Override
public List<Object[]> currentRows() {
return currentRows.get();
}

@Override
public void currentRows(List<Object[]> rows) {
this.currentRows.set(rows);
}

@Override
public boolean matches(Side other) {
return this == other || other == Side.BOTH;
}

@Override
public String toString() {
return name;
}
}

private static List<Object[]> SENTINEL = Collections.emptyList();
private static List<Object[]> SENTINEL = new ArrayList<>(0);

private final Projector leftProjector;
private final Projector rightProjector;
Expand All @@ -96,21 +170,19 @@ public boolean matches(Side side) {
private final AtomicInteger rowsSkipped;
private final AtomicInteger rowsProduced;

private final AtomicBoolean leftFinished;
private final AtomicBoolean rightFinished;

private final CollectExpression[] leftCollectExpressions;
private final CollectExpression[] rightCollectExpressions;

private final Ordering[] comparators;

private final AtomicReference<List<Object[]>> currentRightRows;
private final AtomicReference<List<Object[]>> currentLeftRows;
private final ReentrantLock lock;
private final Condition leftCanContinue;
private final Condition rightCanContinue;
private final AtomicBoolean projectionStarted;

private final Side left;
private final Side right;

public SortMergeProjector(int offset,
int limit,
CollectExpression[] leftCollectExpressions,
Expand All @@ -128,34 +200,41 @@ public SortMergeProjector(int offset,
this.wantMore = new AtomicBoolean(true);

// marker that internal projectors are finished
this.leftFinished = new AtomicBoolean(false);
this.rightFinished = new AtomicBoolean(false);

this.leftCollectExpressions = leftCollectExpressions;
this.rightCollectExpressions = rightCollectExpressions;
this.currentRightRows = new AtomicReference<>();
this.currentLeftRows = new AtomicReference<>();
this.lock = new ReentrantLock();
this.leftCanContinue = lock.newCondition();
this.rightCanContinue = lock.newCondition();
this.left = new AbstractSide("left") {
@Override
public Side other() {
return right;
}
};
this.right = new AbstractSide("right") {
@Override
public Side other() {
return left;
}
};

this.leftProjector = new InternalProjector(leftCollectExpressions) {
@Override
boolean doSetNextRows(List<Object[]> rows) {
return setNextLeftRows(rows);
return setNextEqualRows(left, rows);
}
};
this.rightProjector = new InternalProjector(rightCollectExpressions) {
@Override
boolean doSetNextRows(List<Object[]> rows) {
return setNextRightRows(rows);
return setNextEqualRows(right, rows);
}
};
this.projectionStarted = new AtomicBoolean(false);
}

private void onProjectorFinished() {
if (rightFinished.get() && leftFinished.get()) {
if (left.isFinished() && right.isFinished()) {
downstream.upstreamFinished();
}
}
Expand Down Expand Up @@ -205,22 +284,15 @@ public Projector rightProjector() {
return rightProjector;
}

private void removeFromLeft() throws InterruptedException {
currentLeftRows.set(null);
if (leftFinished.get()) {
onProjectorFinished();
}
}

private void removeFromRight() throws InterruptedException {
currentRightRows.set(null);
if (rightFinished.get()) {
private void removeFromSide(Side side) throws InterruptedException {
side.currentRows(null);
if (side.isFinished()) {
onProjectorFinished();
}
}

private boolean internalProjectorsFinished() {
return rightFinished.get() || leftFinished.get();
return right.isFinished() || left.isFinished();
}

/**
Expand All @@ -229,7 +301,7 @@ private boolean internalProjectorsFinished() {
*/
private boolean waitForSide(Side side) throws InterruptedException{
assert side != Side.BOTH;
Condition condition = side.matches(Side.LEFT) ? rightCanContinue : leftCanContinue;
Condition condition = side.matches(left) ? rightCanContinue : leftCanContinue;
do {
logger.trace("waiting for {} to proceed", side);
// releases the lock
Expand All @@ -241,68 +313,32 @@ private boolean waitForSide(Side side) throws InterruptedException{
return false;
}

private boolean setNextLeftRows(List<Object[]> leftRows) {
private boolean setNextEqualRows(Side source, List<Object[]> rows) {
try {
lock.lockInterruptibly();
try {
if (leftRows == SENTINEL) {
leftFinished.set(true);
if (rows == SENTINEL) {
source.finish();
this.wantMore.set(false);
onProjectorFinished();
} else if (!internalProjectorsFinished()) {
if (logger.isTraceEnabled()) {
logger.trace("get left rows {}", Arrays.deepToString(leftRows.toArray()));
logger.trace("get {} rows {}", source, Arrays.deepToString(rows.toArray()));
}
currentLeftRows.set(leftRows);
List<Object[]> rightRows = currentRightRows.get();
if (rightRows == null) {
if (!waitForSide(Side.RIGHT)) {
source.currentRows(rows);
List<Object[]> otherRows = source.other().currentRows();
if (otherRows == null) {
if (!waitForSide(source.other())) {
// remove and optionally finish this projector if we're done
removeFromLeft();
removeFromSide(source);
}
} else {
// join and wait for right if we need more from it
Side toProceed = consumeRows(leftRows, rightRows);
if (!toProceed.matches(Side.LEFT)) {
waitForSide(Side.RIGHT);
}
}
}
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
// TODO: really propagate?
logger.trace("left interrupted", e);
downstream.upstreamFailed(e);
Thread.currentThread().interrupt();
}
return wantMore.get();
}

private boolean setNextRightRows(List<Object[]> rightRows) {
try {
// ignore null or empty groups
lock.lockInterruptibly();
try {
if (rightRows == SENTINEL) {
rightFinished.set(true);
this.wantMore.set(false);
onProjectorFinished();
} else if (!internalProjectorsFinished()) {
if (logger.isTraceEnabled()) {
logger.trace("get right rows {}", Arrays.deepToString(rightRows.toArray()));
}
currentRightRows.set(rightRows);
List<Object[]> leftRows = currentLeftRows.get();
if (leftRows == null) {
if (!waitForSide(Side.LEFT)) {
removeFromRight();
}
} else {
Side toProceed = consumeRows(leftRows, rightRows);
if (!toProceed.matches(Side.RIGHT)) {
waitForSide(Side.LEFT);
Side toProceed = (source == left
? consumeRows(rows, otherRows)
: consumeRows(otherRows, rows));
if (!toProceed.matches(source)) {
waitForSide(source.other());
}
}
}
Expand All @@ -311,7 +347,7 @@ private boolean setNextRightRows(List<Object[]> rightRows) {
}
} catch (InterruptedException e) {
// TODO: really propagate?
logger.trace("right interrupted", e);
logger.trace("{} interrupted", source, e);
downstream.upstreamFailed(e);
Thread.currentThread().interrupt();
}
Expand All @@ -333,9 +369,9 @@ private Side consumeRows(List<Object[]> leftRows, List<Object[]> rightRows) thro
}
if (compared < 0) {
// left rows are smaller than right, skip to next left set
removeFromLeft();
removeFromSide(left);
leftCanContinue.signal();
return Side.LEFT;
return left;
} else if (compared == 0) {
// both groups have same join conditions
// NESTEDLOOP FTW
Expand All @@ -352,15 +388,15 @@ private Side consumeRows(List<Object[]> leftRows, List<Object[]> rightRows) thro
}
}
}
removeFromLeft();
removeFromRight();
removeFromSide(left);
removeFromSide(right);
leftCanContinue.signal();
rightCanContinue.signal();
} else {
// right rows are smaller than left, skip to next right set
removeFromRight();
removeFromSide(right);
rightCanContinue.signal();
return Side.RIGHT;
return right;
}
}
return Side.BOTH;
Expand All @@ -384,21 +420,20 @@ private InternalProjector(CollectExpression[] collectExpressions) {
this.collectExpressions = collectExpressions;
}

private int compareFromSameRelation(Object[] row, Object[] otherRow) {
Object[] buf = new Object[row.length];
private Object[] fillBufferFromCollectExpressions(Object[] source) {
Object[] buf = new Object[source.length];
int i = 0;
for (CollectExpression collectExpression : collectExpressions) {
collectExpression.setNextRow(row);
collectExpression.setNextRow(source);
buf[i] = collectExpression.value();
i++;
}
i = 0;
Object[] otherBuf = new Object[otherRow.length];
for (CollectExpression collectExpression : collectExpressions) {
collectExpression.setNextRow(otherRow);
otherBuf[i] = collectExpression.value();
i++;
}
return buf;
}

private int compareFromSameRelation(Object[] row, Object[] otherRow) {
Object[] buf = fillBufferFromCollectExpressions(row);
Object[] otherBuf = fillBufferFromCollectExpressions(otherRow);

int compared = 0;
for (int j = 0, size=comparators.length; j < size; j++) {
Expand Down
Loading

0 comments on commit e1cd7ff

Please sign in to comment.