Skip to content
Permalink
Browse files
DRILL-6503: Performance improvements in lateral
closes #1328
  • Loading branch information
sohami committed Jun 27, 2018
1 parent 4b028fe commit 779edf880a1e92608b68108f18e79eff6eb4afa5
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 67 deletions.
@@ -43,6 +43,10 @@ public LateralJoinPOP(
@JsonProperty("right") PhysicalOperator right,
@JsonProperty("joinType") JoinRelType joinType) {
super(left, right, joinType, null, null);
Preconditions.checkArgument(joinType != JoinRelType.FULL,
"Full outer join is currently not supported with Lateral Join");
Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
"Right join is currently not supported with Lateral Join");
}

@Override
@@ -52,11 +52,6 @@
public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> implements LateralContract {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinBatch.class);

// Input indexes to correctly update the stats
private static final int LEFT_INPUT = 0;

private static final int RIGHT_INPUT = 1;

// Maximum number of records in the outgoing batch
private int maxOutputRowCount;

@@ -81,8 +76,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Keep track if any matching right record was found for current left index record
private boolean matchedRecordFound;

// Used only for testing
private boolean useMemoryManager = true;

// Flag to keep track of new left batch so that update on memory manager is called only once per left batch
private boolean isNewLeftBatch = false;

/* ****************************************************************************************************************
* Public Methods
* ****************************************************************************************************************/
@@ -147,9 +146,16 @@ public IterOutcome innerNext() {
// Setup the references of left, right and outgoing container in generated operator
state = BatchState.NOT_FIRST;

// Update the memory manager
updateMemoryManager(LEFT_INPUT);
updateMemoryManager(RIGHT_INPUT);
// Update the memory manager only if it's a brand new incoming batch, i.e. leftJoinIndex and rightJoinIndex are 0.
// Otherwise, while filling the last output batch, some records from the previous left or right batch may still be
// waiting to be sent in the output, and we would count that batch twice. The actual checks are done in
// updateMemoryManager
updateMemoryManager(LEFT_INDEX);

// We have to call update on the memory manager for empty batches (rightJoinIndex = -1) as well, since otherwise
// allocating memory for the vectors below can fail: in that case colSize will not have any info on the right side
// vectors and an NPE is thrown. The actual checks are done in updateMemoryManager
updateMemoryManager(RIGHT_INDEX);

// allocate space for the outgoing batch
allocateVectors();
@@ -161,21 +167,25 @@ public IterOutcome innerNext() {
public void close() {
updateBatchMemoryManagerStats();

logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
"record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
"record count : {}", batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, " +
"record count : {}", batchMemoryManager.getNumOutgoingBatches(),
batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(),
batchMemoryManager.getTotalOutputRecords());
}

super.close();
}
@@ -238,6 +248,7 @@ protected boolean prefetchFirstBatchFromBothSides() {
boolean validBatch = setBatchState(leftUpstream);

if (validBatch) {
isNewLeftBatch = true;
rightUpstream = next(1, right);
validBatch = setBatchState(rightUpstream);
}
@@ -266,10 +277,6 @@ protected void buildSchema() throws SchemaChangeException {
}
Preconditions.checkState(right.getRecordCount() == 0, "Unexpected non-empty first right batch received");

// Update the record memory manager
updateMemoryManager(LEFT_INPUT);
updateMemoryManager(RIGHT_INPUT);

// Setup output container schema based on known left and right schema
setupNewSchema();

@@ -337,7 +344,12 @@ private IterOutcome processLeftBatch() {

// If left batch is empty
while (needLeftBatch) {
leftUpstream = !processLeftBatchInFuture ? next(LEFT_INPUT, left) : leftUpstream;

if (!processLeftBatchInFuture) {
leftUpstream = next(LEFT_INDEX, left);
isNewLeftBatch = true;
}

final boolean emptyLeftBatch = left.getRecordCount() <=0;
logger.trace("Received a left batch and isEmpty: {}", emptyLeftBatch);

@@ -418,7 +430,7 @@ private IterOutcome processRightBatch() {
// will be a valid index. When all records are consumed it will be set to -1.
boolean needNewRightBatch = (leftJoinIndex >= 0) && (rightJoinIndex == -1);
while (needNewRightBatch) {
rightUpstream = next(RIGHT_INPUT, right);
rightUpstream = next(RIGHT_INDEX, right);
switch (rightUpstream) {
case OK_NEW_SCHEMA:
// We should not get OK_NEW_SCHEMA multiple times for the same left incoming batch. So there won't be a
@@ -503,7 +515,8 @@ private IterOutcome produceOutputBatch() {
if (rightUpstream == EMIT) {
if (!matchedRecordFound && JoinRelType.LEFT == popConfig.getJoinType()) {
// copy left side in case of LEFT join
emitLeft(leftJoinIndex, outputIndex++);
emitLeft(leftJoinIndex, outputIndex, 1);
++outputIndex;
}
++leftJoinIndex;
// Reset matchedRecord for next left index record
@@ -557,7 +570,7 @@ private IterOutcome produceOutputBatch() {
}

// Update the batch memory manager to use new left incoming batch
updateMemoryManager(LEFT_INPUT);
updateMemoryManager(LEFT_INDEX);
}
}

@@ -577,7 +590,7 @@ private IterOutcome produceOutputBatch() {
}

// Update the batch memory manager to use new right incoming batch
updateMemoryManager(RIGHT_INPUT);
updateMemoryManager(RIGHT_INDEX);
}
} // output batch is full to its max capacity

@@ -615,9 +628,11 @@ private void finalizeOutputContainer() {

batchMemoryManager.updateOutgoingStats(outputIndex);

logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]", outputIndex,
container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing:\n {}", new RecordBatchSizer(this));
logger.debug("Number of records emitted: {} and Allocator Stats: [AllocatedMem: {}, PeakMem: {}]",
outputIndex, container.getAllocator().getAllocatedMemory(), container.getAllocator().getPeakMemoryAllocation());
}

// Update the output index for next output batch to zero
outputIndex = 0;
@@ -745,8 +760,6 @@ private boolean setBatchState(IterOutcome outcome) {
* this left index. The right container is copied starting from rightIndex until number of records in the container.
*/
private void crossJoinAndOutputRecords() {
logger.trace("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {} and outputIndex: {}",
leftJoinIndex, rightJoinIndex, right.getRecordCount(), outputIndex);
final int rightRecordCount = right.getRecordCount();

// If there is no record in right batch just return current index in output batch
@@ -756,16 +769,30 @@ private void crossJoinAndOutputRecords() {

// Check if right batch is empty since we have to handle left join case
Preconditions.checkState(rightJoinIndex != -1, "Right batch record count is >0 but index is -1");
// For every record in right side just emit left and right records in output container
for (int i = rightJoinIndex; i < rightRecordCount; ++i) {
emitLeft(leftJoinIndex, outputIndex);
emitRight(i, outputIndex);
++outputIndex;

if (isOutgoingBatchFull()) {
break;
}

int currentOutIndex = outputIndex;
// Number of rows that can be copied in output batch
final int maxAvailableRowSlot = maxOutputRowCount - currentOutIndex;
// The number of rows that can be copied into the output batch is the minimum of the available slots in the
// output batch and the rows still available to copy from the right side. The right batch may be partially
// consumed, with only a few rows left to copy, while the output batch still has more room to fill.
final int rowsToCopy = Math.min(maxAvailableRowSlot, (rightRecordCount - rightJoinIndex));

if (logger.isDebugEnabled()) {
logger.debug("Producing output for leftIndex: {}, rightIndex: {}, rightRecordCount: {}, outputIndex: {} and " +
"availableSlotInOutput: {}", leftJoinIndex, rightJoinIndex, rightRecordCount, outputIndex, maxAvailableRowSlot);
logger.debug("Output Batch stats before copying new data: {}", new RecordBatchSizer(this));
}

// First copy all the left vectors' data. It is done this way because the same left row is copied over repeatedly,
// so we may get a performance gain from the JVM
emitLeft(leftJoinIndex, currentOutIndex, rowsToCopy);

// Copy all the right side vectors data
emitRight(rightJoinIndex, currentOutIndex, rowsToCopy);

// Update outputIndex
outputIndex += rowsToCopy;
}

/**
@@ -779,9 +806,14 @@ private void crossJoinAndOutputRecords() {
* @param startVectorIndex - start index of vector inside source record batch
* @param endVectorIndex - end index of vector inside source record batch
* @param baseVectorIndex - base index to be added to startVectorIndex to get corresponding vector in outgoing batch
* @param numRowsToCopy - Number of rows to copy into output batch
* @param moveFromIndex - boolean to indicate whether fromIndex should also be incremented. When copying data from
* the left vector, fromIndex stays constant, whereas when copying data from the right
* vector, fromIndex moves along with the output index.
*/
private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBatch batch,
int startVectorIndex, int endVectorIndex, int baseVectorIndex) {
int startVectorIndex, int endVectorIndex, int baseVectorIndex,
int numRowsToCopy, boolean moveFromIndex) {
// Get the vectors using field index rather than Materialized field since input batch field can be different from
// output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
// container.
@@ -796,10 +828,14 @@ private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBat
final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();

logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
"(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and BaseIndex: {}]",
fromRowIndex, inputValueClass, toRowIndex, outputValueClass, baseVectorIndex);
// Copy data from input vector to output vector
outputVector.copyEntry(toRowIndex, inputVector, fromRowIndex);
"(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
" NumBaseIndex: {}) ]",
fromRowIndex, inputValueClass, toRowIndex, outputValueClass, numRowsToCopy, baseVectorIndex);

// Copy data from input vector to output vector for numRowsToCopy times.
for (int j = 0; j < numRowsToCopy; ++j) {
outputVector.copyEntry(toRowIndex + j, inputVector, (moveFromIndex) ? fromRowIndex + j : fromRowIndex);
}
}
}

@@ -809,8 +845,9 @@ private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBat
* @param leftIndex - index to copy data from left incoming batch vectors
* @param outIndex - index to copy data to in outgoing batch vectors
*/
private void emitLeft(int leftIndex, int outIndex) {
copyDataToOutputVectors(leftIndex, outIndex, left, 0, leftSchema.getFieldCount(), 0);
private void emitLeft(int leftIndex, int outIndex, int numRowsToCopy) {
copyDataToOutputVectors(leftIndex, outIndex, left, 0,
leftSchema.getFieldCount(), 0, numRowsToCopy, false);
}

/**
@@ -819,8 +856,9 @@ private void emitLeft(int leftIndex, int outIndex) {
* @param rightIndex - index to copy data from right incoming batch vectors
* @param outIndex - index to copy data to in outgoing batch vectors
*/
private void emitRight(int rightIndex, int outIndex) {
copyDataToOutputVectors(rightIndex, outIndex, right, 0, rightSchema.getFieldCount(), leftSchema.getFieldCount());
private void emitRight(int rightIndex, int outIndex, int numRowsToCopy) {
copyDataToOutputVectors(rightIndex, outIndex, right, 0,
rightSchema.getFieldCount(), leftSchema.getFieldCount(), numRowsToCopy, true);
}

/**
@@ -847,12 +885,24 @@ private boolean isOutgoingBatchFull() {
}

private void updateMemoryManager(int inputIndex) {

if (inputIndex == LEFT_INDEX && isNewLeftBatch) {
// reset state and continue to update
isNewLeftBatch = false;
} else if (inputIndex == RIGHT_INDEX && (rightJoinIndex == 0 || rightJoinIndex == -1)) {
// continue to update
} else {
return;
}

// In cases where all the previous input was consumed and sent with the previous output batch, and we are now
// building a new output batch with the new incoming batch, this will not cause any problem since outputIndex will be 0
final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == 0 ? "left" : "right", batchMemoryManager.getRecordBatchSizer(inputIndex));
logger.debug("BATCH_STATS, incoming {}:\n {}", inputIndex == LEFT_INDEX ? "left" : "right",
batchMemoryManager.getRecordBatchSizer(inputIndex));
logger.debug("Previous OutputRowCount: {}, New OutputRowCount: {}", maxOutputRowCount, newOutputRowCount);
}

if (useMemoryManager) {

0 comments on commit 779edf8

Please sign in to comment.