Skip to content

Commit

Permalink
DRILL-3874: flattening large JSON objects uses too much direct memory…
Browse files Browse the repository at this point in the history
… - add getBufferSizeFor() to ValueVector interface - add implementations of getBufferSizeFor() for all ValueVector derivatives - add adaptive algorithm for adjusting batch size to flatten operator
  • Loading branch information
cwestin authored and parthchandra committed Oct 2, 2015
1 parent af0aff8 commit a3b27c8
Show file tree
Hide file tree
Showing 16 changed files with 311 additions and 66 deletions.
Expand Up @@ -59,6 +59,14 @@ public FieldReader getReader(){
return reader; return reader;
} }


@Override
// Returns the number of bytes of buffer space needed to hold valueCount
// values in this fixed-width vector. This is a FreeMarker template:
// ${type.width} expands to the per-value byte width of the generated type.
public int getBufferSizeFor(final int valueCount) {
if (valueCount == 0) {
// No values require no backing buffer.
return 0;
}
// Fixed-width storage: every value occupies exactly ${type.width} bytes.
return valueCount * ${type.width};
}

@Override @Override
public int getValueCapacity(){ public int getValueCapacity(){
return (int) (data.capacity() *1.0 / ${type.width}); return (int) (data.capacity() *1.0 / ${type.width});
Expand Down
Expand Up @@ -102,6 +102,16 @@ public int getBufferSize(){
return values.getBufferSize() + bits.getBufferSize(); return values.getBufferSize() + bits.getBufferSize();
} }


@Override
public int getBufferSizeFor(final int valueCount) {
// An empty vector needs no backing storage at all.
if (valueCount == 0) {
return 0;
}

// A nullable vector is backed by two buffers: the values buffer and
// the validity ("bits") buffer; the total is the sum of both.
final int valuesBytes = values.getBufferSizeFor(valueCount);
final int validityBytes = bits.getBufferSizeFor(valueCount);
return valuesBytes + validityBytes;
}

@Override @Override
public DrillBuf getBuffer() { public DrillBuf getBuffer() {
return values.getBuffer(); return values.getBuffer();
Expand Down
Expand Up @@ -88,6 +88,16 @@ public int getBufferSize(){
return offsetVector.getBufferSize() + data.writerIndex(); return offsetVector.getBufferSize() + data.writerIndex();
} }


@Override
public int getBufferSizeFor(final int valueCount) {
// An empty vector needs no backing storage at all.
if (valueCount == 0) {
return 0;
}

// The offset entry at index valueCount marks the end of the last value's
// data, i.e. the total number of data bytes used by valueCount values.
final int dataBytes = offsetVector.getAccessor().get(valueCount);
// valueCount variable-width values require valueCount + 1 offset entries.
final int offsetBytes = offsetVector.getBufferSizeFor(valueCount + 1);
return offsetBytes + dataBytes;
}

@Override @Override
public int getValueCapacity(){ public int getValueCapacity(){
return Math.max(offsetVector.getValueCapacity() - 1, 0); return Math.max(offsetVector.getValueCapacity() - 1, 0);
Expand Down Expand Up @@ -302,6 +312,7 @@ public boolean allocateNewSafe() {
try { try {
final int requestedSize = (int)curAllocationSize; final int requestedSize = (int)curAllocationSize;
data = allocator.buffer(requestedSize); data = allocator.buffer(requestedSize);
allocationSizeInBytes = requestedSize;
offsetVector.allocateNew(); offsetVector.allocateNew();
} catch (OutOfMemoryRuntimeException e) { } catch (OutOfMemoryRuntimeException e) {
clear(); clear();
Expand Down
Expand Up @@ -51,18 +51,6 @@ public class TopLevelAllocator implements BufferAllocator {
private final DrillBuf empty; private final DrillBuf empty;
private final DrillConfig config; private final DrillConfig config;


/* TODO(cwestin) remove
@Deprecated
TopLevelAllocator() {
this(DrillConfig.getMaxDirectMemory());
}
@Deprecated
TopLevelAllocator(long maximumAllocation) {
this(null, maximumAllocation, true);
}
*/

private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){ private TopLevelAllocator(DrillConfig config, long maximumAllocation, boolean errorOnLeak){
MAXIMUM_DIRECT_MEMORY = maximumAllocation; MAXIMUM_DIRECT_MEMORY = maximumAllocation;
this.config=(config!=null) ? config : DrillConfig.create(); this.config=(config!=null) ? config : DrillConfig.create();
Expand Down
Expand Up @@ -21,17 +21,16 @@
import java.util.List; import java.util.List;


import com.carrotsearch.hppc.IntOpenHashSet; import com.carrotsearch.hppc.IntOpenHashSet;

import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.DrillFuncHolderExpr; import org.apache.drill.exec.expr.DrillFuncHolderExpr;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
Expand Down Expand Up @@ -61,29 +60,32 @@
// TODO - handle the case where a user tries to flatten a scalar, should just act as a project all of the columns exactly // TODO - handle the case where a user tries to flatten a scalar, should just act as a project all of the columns exactly
// as they come in // as they come in
public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenRecordBatch.class);


private Flattener flattener; private Flattener flattener;
private List<ValueVector> allocationVectors; private List<ValueVector> allocationVectors;
private List<ComplexWriter> complexWriters; private List<ComplexWriter> complexWriters;
private boolean hasRemainder = false; private boolean hasRemainder = false;
private int remainderIndex = 0; private int remainderIndex = 0;
private int recordCount; private int recordCount;
// the buildSchema method is always called first by a short circuit path to return schema information to the client
// this information is not entirely accurate as Drill determines schema on the fly, so here it needs to have modified private final Flattener.Monitor monitor = new Flattener.Monitor() {
// behavior for that call to setup the schema for the flatten operation @Override
private boolean fastSchemaCalled; public int getBufferSizeFor(int recordCount) {
int bufferSize = 0;
for(final ValueVector vv : allocationVectors) {
bufferSize += vv.getBufferSizeFor(recordCount);
}
return bufferSize;
}
};


private static final String EMPTY_STRING = ""; private static final String EMPTY_STRING = "";


private class ClassifierResult { private class ClassifierResult {
public boolean isStar = false;
public List<String> outputNames; public List<String> outputNames;
public String prefix = "";


private void clear() { private void clear() {
isStar = false;
prefix = "";
if (outputNames != null) { if (outputNames != null) {
outputNames.clear(); outputNames.clear();
} }
Expand All @@ -94,7 +96,6 @@ private void clear() {


public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming); super(pop, context, incoming);
fastSchemaCalled = false;
} }


@Override @Override
Expand Down Expand Up @@ -150,7 +151,7 @@ protected IterOutcome doWork() {
setFlattenVector(); setFlattenVector();


int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount(); int childCount = incomingRecordCount == 0 ? 0 : flattener.getFlattenField().getAccessor().getInnerValueCount();
int outputRecords = flattener.flattenRecords(incomingRecordCount, 0); int outputRecords = flattener.flattenRecords(incomingRecordCount, 0, monitor);
// TODO - change this to be based on the repeated vector length // TODO - change this to be based on the repeated vector length
if (outputRecords < childCount) { if (outputRecords < childCount) {
setValueCount(outputRecords); setValueCount(outputRecords);
Expand Down Expand Up @@ -181,7 +182,7 @@ private void handleRemainder() {
return; return;
} }


int projRecords = flattener.flattenRecords(remainingRecordCount, 0); int projRecords = flattener.flattenRecords(remainingRecordCount, 0, monitor);
if (projRecords < remainingRecordCount) { if (projRecords < remainingRecordCount) {
setValueCount(projRecords); setValueCount(projRecords);
this.recordCount = projRecords; this.recordCount = projRecords;
Expand Down Expand Up @@ -243,9 +244,7 @@ private void setValueCount(int count) {
} }


private FieldReference getRef(NamedExpression e) { private FieldReference getRef(NamedExpression e) {
FieldReference ref = e.getRef(); final FieldReference ref = e.getRef();
PathSegment seg = ref.getRootSegment();

return ref; return ref;
} }


Expand All @@ -261,7 +260,7 @@ private FieldReference getRef(NamedExpression e) {
*/ */
private TransferPair getFlattenFieldTransferPair(FieldReference reference) { private TransferPair getFlattenFieldTransferPair(FieldReference reference) {
final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn()); final TypedFieldId fieldId = incoming.getValueVectorId(popConfig.getColumn());
final Class vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass(); final Class<?> vectorClass = incoming.getSchema().getColumn(fieldId.getFieldIds()[0]).getValueClass();
final ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector(); final ValueVector flattenField = incoming.getValueAccessorById(vectorClass, fieldId.getFieldIds()).getValueVector();


TransferPair tp = null; TransferPair tp = null;
Expand Down Expand Up @@ -338,7 +337,7 @@ protected boolean setupNewSchema() throws SchemaChangeException {
allocationVectors.add(vector); allocationVectors.add(vector);
TypedFieldId fid = container.add(vector); TypedFieldId fid = container.add(vector);
ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
HoldingContainer hc = cg.addExpr(write); cg.addExpr(write);


logger.debug("Added eval for project expression."); logger.debug("Added eval for project expression.");
} }
Expand Down Expand Up @@ -369,5 +368,4 @@ private List<NamedExpression> getExpressionList() {
} }
return exprs; return exprs;
} }

} }

0 comments on commit a3b27c8

Please sign in to comment.