Skip to content
Permalink
Browse files
DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules

This closes #1356
  • Loading branch information
Sorabh Hamirwasia authored and parthchandra committed Jul 2, 2018
1 parent 208733b commit 069c3049f1a500e5ae0b47caeebc5856ab182b73
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 54 deletions.
@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

@@ -67,9 +68,6 @@
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;

import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;
import static org.apache.drill.exec.record.JoinBatchMemoryManager.RIGHT_INDEX;

/**
* This class implements the runtime execution for the Hash-Join operator
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -892,7 +890,8 @@ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);

batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<>());
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}

/**
@@ -20,24 +20,30 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.LateralContract;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.SchemaBuilder;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;

import java.util.HashSet;
import java.util.List;

import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK;
@@ -82,6 +88,8 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// Flag to keep track of new left batch so that update on memory manager is called only once per left batch
private boolean isNewLeftBatch = false;

private final HashSet<String> excludedFieldNames = new HashSet<>();

/* ****************************************************************************************************************
* Public Methods
* ****************************************************************************************************************/
@@ -91,7 +99,9 @@ public LateralJoinBatch(LateralJoinPOP popConfig, FragmentContext context,
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);
final int configOutputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right);
// Prepare Schema Path Mapping
populateExcludedField(popConfig);
batchMemoryManager = new JoinBatchMemoryManager(configOutputBatchSize, left, right, excludedFieldNames);

// Initially it's set to default value of 64K and later for each new output row it will be set to the computed
// row count
@@ -700,6 +710,21 @@ private boolean verifyInputSchema(BatchSchema schema) {
return isValid;
}

/**
 * Builds a version of the given incoming-batch schema with all excluded
 * columns removed. Excluded column names come from {@code excludedFieldNames},
 * which is populated from the LATERAL join's projection push-down information.
 *
 * @param originSchema schema of an incoming (left or right) record batch
 * @return {@code originSchema} unchanged when there is nothing to exclude,
 *         otherwise a newly built schema (same selection-vector mode)
 *         containing only the non-excluded fields
 */
private BatchSchema batchSchemaWithNoExcludedCols(BatchSchema originSchema) {
  // Fast path: nothing to filter out, reuse the incoming schema as-is.
  if (excludedFieldNames.isEmpty()) {
    return originSchema;
  }

  final SchemaBuilder newSchemaBuilder =
    BatchSchema.newBuilder().setSelectionVectorMode(originSchema.getSelectionVectorMode());
  for (MaterializedField field : originSchema) {
    // Copy over only the fields that were not marked for exclusion.
    if (!excludedFieldNames.contains(field.getName())) {
      newSchemaBuilder.addField(field);
    }
  }
  return newSchemaBuilder.build();
}

/**
* Helps to create the outgoing container vectors based on known left and right batch schemas
* @throws SchemaChangeException
@@ -711,8 +736,8 @@ private void setupNewSchema() throws SchemaChangeException {

// Clear up the container
container.clear();
leftSchema = left.getSchema();
rightSchema = right.getSchema();
leftSchema = batchSchemaWithNoExcludedCols(left.getSchema());
rightSchema = batchSchemaWithNoExcludedCols(right.getSchema());

if (!verifyInputSchema(leftSchema)) {
throw new SchemaChangeException("Invalid Schema found for left incoming batch");
@@ -724,12 +749,20 @@ private void setupNewSchema() throws SchemaChangeException {

// Setup LeftSchema in outgoing container
for (final VectorWrapper<?> vectorWrapper : left) {
container.addOrGet(vectorWrapper.getField());
final MaterializedField leftField = vectorWrapper.getField();
if (excludedFieldNames.contains(leftField.getName())) {
continue;
}
container.addOrGet(leftField);
}

// Setup RightSchema in the outgoing container
for (final VectorWrapper<?> vectorWrapper : right) {
MaterializedField rightField = vectorWrapper.getField();
if (excludedFieldNames.contains(rightField.getName())) {
continue;
}

TypeProtos.MajorType rightFieldType = vectorWrapper.getField().getType();

// make right input schema optional if we have LEFT join
@@ -846,15 +879,28 @@ private void copyDataToOutputVectors(int fromRowIndex, int toRowIndex, RecordBat
// Get the vectors using field index rather than Materialized field since input batch field can be different from
// output container field in case of Left Join. As we rebuild the right Schema field to be optional for output
// container.
int inputIndex = 0;
for (int i = startVectorIndex; i < endVectorIndex; ++i) {
// Get input vector
final Class<?> inputValueClass = batch.getSchema().getColumn(i).getValueClass();
final ValueVector inputVector = batch.getValueAccessorById(inputValueClass, i).getValueVector();

// Get output vector
final int outputVectorIndex = i + baseVectorIndex;
final Class<?> outputValueClass = this.getSchema().getColumn(outputVectorIndex).getValueClass();
final ValueVector outputVector = this.getValueAccessorById(outputValueClass, outputVectorIndex).getValueVector();
final String outputFieldName = outputVector.getField().getName();

ValueVector inputVector;
Class<?> inputValueClass;
String inputFieldName;
do {
// Get input vector
inputValueClass = batch.getSchema().getColumn(inputIndex).getValueClass();
inputVector = batch.getValueAccessorById(inputValueClass, inputIndex).getValueVector();
inputFieldName = inputVector.getField().getName();
++inputIndex;
} while (excludedFieldNames.contains(inputFieldName));

Preconditions.checkArgument(outputFieldName.equals(inputFieldName),
new IllegalStateException(String.format("Non-excluded Input and output container fields are not in same order" +
". Output Schema:%s and Input Schema:%s", this.getSchema(), batch.getSchema())));

logger.trace("Copying data from incoming batch vector to outgoing batch vector. [IncomingBatch: " +
"(RowIndex: {}, VectorType: {}), OutputBatch: (RowIndex: {}, VectorType: {}) and Other: (TimeEachValue: {}," +
@@ -938,4 +984,13 @@ private void updateMemoryManager(int inputIndex) {
maxOutputRowCount = newOutputRowCount;
}
}

/**
 * Records the root names of all columns that the planner marked for exclusion
 * on this LATERAL join, so they can be skipped when building the outgoing
 * container and when the memory manager sizes output batches.
 *
 * @param lateralPop the LATERAL join physical operator carrying the
 *                   (possibly null) excluded-column list
 */
private void populateExcludedField(PhysicalOperator lateralPop) {
  final List<SchemaPath> excludedCols = ((LateralJoinPOP) lateralPop).getExcludedColumns();
  if (excludedCols == null) {
    return;
  }
  for (final SchemaPath excludedPath : excludedCols) {
    excludedFieldNames.add(excludedPath.rootName());
  }
}
}
@@ -62,6 +62,7 @@
import org.apache.drill.exec.vector.complex.AbstractContainerVector;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;

import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
@@ -108,7 +109,7 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
private class MergeJoinMemoryManager extends JoinBatchMemoryManager {

MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
super(outputBatchSize, leftBatch, rightBatch);
super(outputBatchSize, leftBatch, rightBatch, new HashSet<>());
}

/**
@@ -17,29 +17,44 @@
*/
package org.apache.drill.exec.record;

import java.util.Set;

public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class);

private int rowWidth[];
private RecordBatch recordBatch[];
private Set<String> columnsToExclude;

private static final int numInputs = 2;
public static final int LEFT_INDEX = 0;
public static final int RIGHT_INDEX = 1;

public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) {
public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch,
RecordBatch rightBatch, Set<String> excludedColumns) {
super(numInputs, outputBatchSize);
recordBatch = new RecordBatch[numInputs];
recordBatch[LEFT_INDEX] = leftBatch;
recordBatch[RIGHT_INDEX] = rightBatch;
rowWidth = new int[numInputs];
this.columnsToExclude = excludedColumns;
}

private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();

final int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];
// Reduce the width of excluded columns from actual rowWidth
for (String columnName : columnsToExclude) {
final RecordBatchSizer.ColumnSize currentColSizer = getColumnSize(inputIndex, columnName);
if (currentColSizer == null) {
continue;
}
rowWidth[inputIndex] -= currentColSizer.getAllocSizePerEntry();
}

// Get final net outgoing row width after reducing the excluded columns width
int newOutgoingRowWidth = rowWidth[LEFT_INDEX] + rowWidth[RIGHT_INDEX];

// If outgoing row width is 0 or there is no change in outgoing row width, just return.
// This is possible for empty batches or

0 comments on commit 069c304

Please sign in to comment.