Skip to content

Commit

Permalink
DRILL-6688 Data batches for Project operator exceed the maximum speci…
Browse files Browse the repository at this point in the history
…fied (#1442)


This change separates the metadata width from the data width of a variable-width column, so that only the data width is used in intermediate calculations; the metadata width is added at the end, when the column's width is accumulated into the total width.
  • Loading branch information
Karthikeyan Manivannan authored and Boaz Ben-Zvi committed Aug 25, 2018
1 parent b895b28 commit ddb35ce
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 24 deletions.
Expand Up @@ -59,7 +59,7 @@ public int getOutputWidth(List<FixedLenExpr> args) {
throw new IllegalArgumentException();
}
for (FixedLenExpr expr : args) {
outputSize += expr.getWidth();
outputSize += expr.getDataWidth();
}
outputSize = adjustOutputWidth(outputSize, "ConcatOutputWidthCalculator:");
return outputSize;
Expand All @@ -85,7 +85,7 @@ public int getOutputWidth(List<FixedLenExpr> args) {
if (args == null || args.size() < 1) {
throw new IllegalArgumentException();
}
outputSize = args.get(0).getWidth();
outputSize = args.get(0).getDataWidth();
outputSize = adjustOutputWidth(outputSize, "CloneOutputWidthCalculator:");
return outputSize;
}
Expand Down
Expand Up @@ -133,11 +133,18 @@ public <T, V, E extends Exception> T accept(AbstractExecExprVisitor<T, V, E> vis
*/

public static class FixedLenExpr extends OutputWidthExpression {
int fixedWidth;
/**
* Only the width of the payload is saved in fixedDataWidth.
* Metadata width is added when the final output row size is calculated.
* This prevents an {@link OutputWidthCalculator} from using
* metadata width in its calculations.
*/
private int fixedDataWidth;

public FixedLenExpr(int fixedWidth) {
this.fixedWidth = fixedWidth;
this.fixedDataWidth = fixedWidth;
}
public int getWidth() { return fixedWidth;}
public int getDataWidth() { return fixedDataWidth;}

@Override
public <T, V, E extends Exception> T accept(AbstractExecExprVisitor<T, V, E> visitor, V value) throws E {
Expand Down
Expand Up @@ -212,7 +212,7 @@ public OutputWidthExpression visitVarLenReadExpr(VarLenReadExpr varLenReadExpr,
}
final RecordBatchSizer.ColumnSize columnSize = state.manager.getColumnSize(columnName);

int columnWidth = columnSize.getNetSizePerEntry();
int columnWidth = columnSize.getDataSizePerEntry();
return new FixedLenExpr(columnWidth);
}

Expand Down Expand Up @@ -256,12 +256,12 @@ public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr ifElseWidthExp
throws RuntimeException {
OutputWidthExpression ifReducedExpr = ifElseWidthExpr.expressions[0].accept(this, state);
assert ifReducedExpr instanceof FixedLenExpr;
int ifWidth = ((FixedLenExpr)ifReducedExpr).getWidth();
int ifWidth = ((FixedLenExpr)ifReducedExpr).getDataWidth();
int elseWidth = -1;
if (ifElseWidthExpr.expressions[1] != null) {
OutputWidthExpression elseReducedExpr = ifElseWidthExpr.expressions[1].accept(this, state);
assert elseReducedExpr instanceof FixedLenExpr;
elseWidth = ((FixedLenExpr)elseReducedExpr).getWidth();
elseWidth = ((FixedLenExpr)elseReducedExpr).getDataWidth();
}
int outputWidth = Math.max(ifWidth, elseWidth);
return new FixedLenExpr(outputWidth);
Expand All @@ -270,8 +270,10 @@ public OutputWidthExpression visitIfElseWidthExpr(IfElseWidthExpr ifElseWidthExp
private OutputWidthExpression getFixedLenExpr(MajorType majorType) {
MajorType type = majorType;
if (Types.isFixedWidthType(type)) {
int fixedWidth = ProjectMemoryManager.getWidthOfFixedWidthType(type);
return new OutputWidthExpression.FixedLenExpr(fixedWidth);
// Use only the width of the data. Metadata width will be accounted for at the end
// This is to avoid using metadata size in intermediate calculations
int fixedDataWidth = ProjectMemoryManager.getDataWidthOfFixedWidthType(type);
return new OutputWidthExpression.FixedLenExpr(fixedDataWidth);
}
return null;
}
Expand Down
Expand Up @@ -20,17 +20,21 @@
import com.google.common.base.Preconditions;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.VarLenReadExpr;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.physical.impl.project.OutputWidthExpression.FixedLenExpr;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -93,15 +97,18 @@ class ColumnWidthInfo {
int width;
WidthType widthType;
OutputColumnType outputColumnType;
ValueVector outputVV; // for transfers, this is the transfer src


ColumnWidthInfo(OutputWidthExpression outputWidthExpression,
OutputColumnType outputColumnType,
WidthType widthType,
int fieldWidth) {
int fieldWidth, ValueVector outputVV) {
this.outputExpression = outputWidthExpression;
this.width = fieldWidth;
this.outputColumnType = outputColumnType;
this.widthType = widthType;
this.outputVV = outputVV;
}

public OutputWidthExpression getOutputExpression() { return outputExpression; }
Expand Down Expand Up @@ -151,13 +158,12 @@ public ValueVector getOutgoingValueVector(TypedFieldId fieldId) {
/**
 * Tells whether the given vector is a {@link FixedWidthVector}.
 *
 * @param vv the vector to test
 * @return true when {@code vv} is an instance of {@link FixedWidthVector}
 */
static boolean isFixedWidth(ValueVector vv) {
  return vv instanceof FixedWidthVector;
}


static int getWidthOfFixedWidthType(ValueVector vv) {
static int getNetWidthOfFixedWidthType(ValueVector vv) {
assert isFixedWidth(vv);
return ((FixedWidthVector)vv).getValueWidth();
}

public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
DataMode mode = majorType.getMode();
public static int getDataWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
MinorType minorType = majorType.getMinorType();
final boolean isVariableWidth = (minorType == MinorType.VARCHAR || minorType == MinorType.VAR16CHAR
|| minorType == MinorType.VARBINARY);
Expand All @@ -166,12 +172,11 @@ public static int getWidthOfFixedWidthType(TypeProtos.MajorType majorType) {
throw new IllegalArgumentException("getWidthOfFixedWidthType() cannot handle variable width types");
}

final boolean isOptional = (mode == DataMode.OPTIONAL);
final boolean isRepeated = (mode == DataMode.REPEATED);
final boolean isRepeatedList = false; // repeated
final Map<String, RecordBatchSizer.ColumnSize> children = null;
if (minorType == MinorType.NULL) {
return 0;
}

return RecordBatchSizer.getStdNetSizePerEntryCommon(majorType, isOptional, isRepeated, isRepeatedList, children);
return TypeHelper.getSize(majorType);
}


Expand All @@ -196,11 +201,13 @@ private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpr
OutputColumnType outputColumnType, String inputColumnName, String outputColumnName) {
variableWidthColumnCount++;
ColumnWidthInfo columnWidthInfo;
logger.trace("addVariableWidthField(): vv {} totalCount: {} outputColumnType: {}",
printVV(vv), variableWidthColumnCount, outputColumnType);
//Variable width transfers
if(outputColumnType == OutputColumnType.TRANSFER) {
VarLenReadExpr readExpr = new VarLenReadExpr(inputColumnName);
columnWidthInfo = new ColumnWidthInfo(readExpr, outputColumnType,
WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the RecordBatchSizer
WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the RecordBatchSizer
} else if (isComplex(vv.getField().getType())) {
addComplexField(vv);
return;
Expand All @@ -209,25 +216,37 @@ private void addVariableWidthField(ValueVector vv, LogicalExpression logicalExpr
OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression outputWidthExpression = logicalExpression.accept(new OutputWidthVisitor(), state);
columnWidthInfo = new ColumnWidthInfo(outputWidthExpression, outputColumnType,
WidthType.VARIABLE, -1); //fieldWidth has to be obtained from the OutputWidthExpression
WidthType.VARIABLE, -1, vv); //fieldWidth has to be obtained from the OutputWidthExpression
}
ColumnWidthInfo existingInfo = outputColumnSizes.put(outputColumnName, columnWidthInfo);
Preconditions.checkState(existingInfo == null);
}

/**
 * Builds a short description of a value vector for use in log messages.
 *
 * @param vv the vector to describe; may be null
 * @return the literal {@code "null"} when {@code vv} is null, otherwise the
 *         field name and the field type separated by a single space
 */
public static String printVV(ValueVector vv) {
  if (vv == null) {
    return "null";
  }
  return vv.getField().getName() + " " + vv.getField().getType();
}

/**
 * Accounts for one complex-typed output column.
 * Complex types are not yet supported, so a fixed estimate
 * ({@code OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE}) is charged
 * for the column instead of a measured width.
 *
 * @param vv the complex vector being added; a null vector is tolerated by
 *           the assertion and by the trace output
 */
void addComplexField(ValueVector vv) {
  assert vv == null || isComplex(vv.getField().getType());
  // The width is only a guess: every complex column adds the same constant.
  totalComplexColumnWidth += OutputSizeEstimateConstants.COMPLEX_FIELD_ESTIMATE;
  complexColumnsCount++;
  logger.trace("addComplexField(): vv {} totalCount: {} totalComplexColumnWidth: {}",
      printVV(vv), complexColumnsCount, totalComplexColumnWidth);
}

/**
 * Accounts for one fixed-width output column: bumps the fixed-width column
 * count and adds the column's width, as reported by
 * {@code getNetWidthOfFixedWidthType(vv)}, to the running fixed-width total.
 *
 * @param vv the vector being added; asserted to be a fixed-width vector
 */
void addFixedWidthField(ValueVector vv) {
  assert isFixedWidth(vv);
  fixedWidthColumnCount++;
  int fixedFieldWidth = getNetWidthOfFixedWidthType(vv);
  totalFixedWidthColumnWidth += fixedFieldWidth;
  // The label names the value that is actually logged; it previously read
  // "totalComplexColumnWidth", copied from addComplexField().
  logger.trace("addFixedWidthField(): vv {} totalCount: {} totalFixedWidthColumnWidth: {}",
      printVV(vv), fixedWidthColumnCount, totalFixedWidthColumnWidth);
}

public void init(RecordBatch incomingBatch, ProjectRecordBatch outgoingBatch) {
Expand Down Expand Up @@ -267,8 +286,12 @@ public void update() {
OutputWidthExpression savedWidthExpr = columnWidthInfo.getOutputExpression();
OutputWidthVisitorState state = new OutputWidthVisitorState(this);
OutputWidthExpression reducedExpr = savedWidthExpr.accept(new OutputWidthVisitor(), state);
width = ((FixedLenExpr)reducedExpr).getWidth();
width = ((FixedLenExpr)reducedExpr).getDataWidth();
Preconditions.checkState(width >= 0);
int metadataWidth = getMetadataWidth(columnWidthInfo.outputVV);
logger.trace("update(): fieldName {} width: {} metadataWidth: {}",
columnWidthInfo.outputVV.getField().getName(), width, metadataWidth);
width += metadataWidth;
}
totalVariableColumnWidth += width;
}
Expand Down Expand Up @@ -301,4 +324,21 @@ public void update() {
logger.debug("BATCH_STATS, incoming: {}", getRecordBatchSizer());
updateIncomingStats();
}

/**
 * Computes the per-entry metadata width of a vector, i.e. the bytes that
 * are not part of the data payload itself.
 * <p>
 * The result is the sum of the following contributions, each taken as the
 * payload byte count for a single value:
 * <ul>
 *   <li>the bits vector, when the vector is a {@link NullableVector};</li>
 *   <li>the offset vector, when the vector is a {@link VariableWidthVector};</li>
 *   <li>the offset vector, when the vector is a {@link BaseRepeatedValueVector},
 *       plus the metadata width of its data vector multiplied by
 *       {@code RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD}.</li>
 * </ul>
 *
 * @param vv the vector to inspect; a vector matching none of the types above
 *           (including null) contributes nothing
 * @return the accumulated metadata width
 */
public static int getMetadataWidth(ValueVector vv) {
  int metadataBytes = 0;

  if (vv instanceof NullableVector) {
    final NullableVector nullable = (NullableVector) vv;
    metadataBytes += nullable.getBitsVector().getPayloadByteCount(1);
  }

  if (vv instanceof VariableWidthVector) {
    final VariableWidthVector variable = (VariableWidthVector) vv;
    metadataBytes += variable.getOffsetVector().getPayloadByteCount(1);
  }

  if (vv instanceof BaseRepeatedValueVector) {
    final BaseRepeatedValueVector repeated = (BaseRepeatedValueVector) vv;
    metadataBytes += repeated.getOffsetVector().getPayloadByteCount(1);
    // Each record is assumed to hold DEFAULT_REPEAT_PER_RECORD elements, so
    // the element-level metadata is scaled by that factor.
    metadataBytes += (getMetadataWidth(repeated.getDataVector()) * RepeatedValueVector.DEFAULT_REPEAT_PER_RECORD);
  }

  return metadataBytes;
}
}

0 comments on commit ddb35ce

Please sign in to comment.