Skip to content

Commit

Permalink
DRILL-4081: Handle schema changes in ExternalSort
Browse files Browse the repository at this point in the history
closes #257
  • Loading branch information
StevenMPhillips committed Nov 16, 2015
1 parent cd01107 commit ea8e17d
Show file tree
Hide file tree
Showing 18 changed files with 666 additions and 134 deletions.
Expand Up @@ -18,16 +18,25 @@
package org.apache.drill.exec.expr.fn; package org.apache.drill.exec.expr.fn;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;


import com.google.common.collect.Lists;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.IfExpression.IfCondition;
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions.IntExpression;
import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types; import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.HoldingContainerExpression;
import org.apache.calcite.rel.RelFieldCollation.NullDirection; import org.apache.calcite.rel.RelFieldCollation.NullDirection;


Expand All @@ -46,7 +55,7 @@ public class FunctionGenerationHelper {
* @return * @return
* FunctionHolderExpression containing the found function implementation * FunctionHolderExpression containing the found function implementation
*/ */
public static FunctionHolderExpression getOrderingComparator( public static LogicalExpression getOrderingComparator(
boolean null_high, boolean null_high,
HoldingContainer left, HoldingContainer left,
HoldingContainer right, HoldingContainer right,
Expand All @@ -56,13 +65,20 @@ public static FunctionHolderExpression getOrderingComparator(


if ( ! isComparableType(left.getMajorType() ) if ( ! isComparableType(left.getMajorType() )
|| ! isComparableType(right.getMajorType() ) || ! isComparableType(right.getMajorType() )
|| isUnionType(left.getMajorType()) ){
|| isUnionType(right.getMajorType()) ) {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
formatCanNotCompareMsg(left.getMajorType(), right.getMajorType())); formatCanNotCompareMsg(left.getMajorType(), right.getMajorType()));
} }
return getFunctionExpression(comparator_name, Types.required(MinorType.INT), LogicalExpression comparisonFunctionExpression = getFunctionExpression(comparator_name, Types.required(MinorType.INT),
registry, left, right); registry, left, right);

ErrorCollector collector = new ErrorCollectorImpl();
if (!isUnionType(left.getMajorType()) && !isUnionType(right.getMajorType())) {
return ExpressionTreeMaterializer.materialize(comparisonFunctionExpression, null, collector, registry);
} else {
LogicalExpression typeComparisonFunctionExpression = getTypeComparisonFunction(comparisonFunctionExpression, left, right);
return ExpressionTreeMaterializer.materialize(typeComparisonFunctionExpression, null, collector, registry);
}
} }


private static boolean isUnionType(MajorType majorType) { private static boolean isUnionType(MajorType majorType) {
Expand All @@ -76,16 +92,15 @@ private static boolean isUnionType(MajorType majorType) {
* @param right ... * @param right ...
* @param registry ... * @param registry ...
* @return FunctionHolderExpression containing the function implementation * @return FunctionHolderExpression containing the function implementation
* @see #getComparator
*/ */
public static FunctionHolderExpression getOrderingComparatorNullsHigh( public static LogicalExpression getOrderingComparatorNullsHigh(
HoldingContainer left, HoldingContainer left,
HoldingContainer right, HoldingContainer right,
FunctionImplementationRegistry registry) { FunctionImplementationRegistry registry) {
return getOrderingComparator(true, left, right, registry); return getOrderingComparator(true, left, right, registry);
} }


public static FunctionHolderExpression getFunctionExpression( private static LogicalExpression getFunctionExpression(
String name, MajorType returnType, FunctionImplementationRegistry registry, HoldingContainer... args) { String name, MajorType returnType, FunctionImplementationRegistry registry, HoldingContainer... args) {
List<MajorType> argTypes = new ArrayList<MajorType>(args.length); List<MajorType> argTypes = new ArrayList<MajorType>(args.length);
List<LogicalExpression> argExpressions = new ArrayList<LogicalExpression>(args.length); List<LogicalExpression> argExpressions = new ArrayList<LogicalExpression>(args.length);
Expand All @@ -94,25 +109,34 @@ public static FunctionHolderExpression getFunctionExpression(
argExpressions.add(new HoldingContainerExpression(c)); argExpressions.add(new HoldingContainerExpression(c));
} }


DrillFuncHolder holder = registry.findExactMatchingDrillFunction(name, argTypes, returnType); return new FunctionCall(name, argExpressions, ExpressionPosition.UNKNOWN);
if (holder != null) { }
return holder.getExpr(name, argExpressions, ExpressionPosition.UNKNOWN);
}


StringBuilder sb = new StringBuilder(); /**
sb.append("Failure finding function that runtime code generation expected. Signature: "); * Wraps the comparison function in an If-statement which compares the types first, evaluating the comaprison function only
sb.append(name); * if the types are equivialent
sb.append("( "); *
for(int i =0; i < args.length; i++) { * @param comparisonFunction
MajorType mt = args[i].getMajorType(); * @param args
if (i != 0) { * @return
sb.append(", "); */
} private static LogicalExpression getTypeComparisonFunction(LogicalExpression comparisonFunction, HoldingContainer... args) {
appendType(mt, sb); List<LogicalExpression> argExpressions = Lists.newArrayList();
List<MajorType> argTypes = Lists.newArrayList();
for(HoldingContainer c : args) {
argTypes.add(c.getMajorType());
argExpressions.add(new HoldingContainerExpression(c));
} }
sb.append(" ) returns "); FunctionCall call = new FunctionCall("compareType", argExpressions, ExpressionPosition.UNKNOWN);
appendType(returnType, sb);
throw new UnsupportedOperationException(sb.toString()); List<LogicalExpression> newArgs = Lists.newArrayList();
newArgs.add(call);
newArgs.add(new IntExpression(0, ExpressionPosition.UNKNOWN));
FunctionCall notEqual = new FunctionCall("not_equal", newArgs, ExpressionPosition.UNKNOWN);

IfExpression.IfCondition ifCondition = new IfCondition(notEqual, call);
IfExpression ifExpression = IfExpression.newBuilder().setIfCondition(ifCondition).setElse(comparisonFunction).build();
return ifExpression;
} }


private static final void appendType(MajorType mt, StringBuilder sb) { private static final void appendType(MajorType mt, StringBuilder sb) {
Expand All @@ -121,7 +145,7 @@ private static final void appendType(MajorType mt, StringBuilder sb) {
sb.append(mt.getMode().name()); sb.append(mt.getMode().name());
} }


protected static boolean isComparableType(MajorType type) { private static boolean isComparableType(MajorType type) {
if (type.getMinorType() == MinorType.MAP || if (type.getMinorType() == MinorType.MAP ||
type.getMinorType() == MinorType.LIST || type.getMinorType() == MinorType.LIST ||
type.getMode() == TypeProtos.DataMode.REPEATED ) { type.getMode() == TypeProtos.DataMode.REPEATED ) {
Expand Down
Expand Up @@ -17,8 +17,10 @@
*/ */
package org.apache.drill.exec.expr.fn.impl; package org.apache.drill.exec.expr.fn.impl;


import com.google.common.collect.Sets;
import io.netty.buffer.DrillBuf; import io.netty.buffer.DrillBuf;
import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
Expand All @@ -32,16 +34,106 @@
import org.apache.drill.exec.expr.holders.UnionHolder; import org.apache.drill.exec.expr.holders.UnionHolder;
import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder; import org.apache.drill.exec.expr.holders.VarCharHolder;
import org.apache.drill.exec.resolver.TypeCastRules;
import org.apache.drill.exec.vector.complex.impl.UnionReader; import org.apache.drill.exec.vector.complex.impl.UnionReader;
import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.drill.exec.vector.complex.reader.FieldReader;


import javax.inject.Inject; import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;


/** /**
* The class contains additional functions for union types in addition to those in GUnionFunctions * The class contains additional functions for union types in addition to those in GUnionFunctions
*/ */
public class UnionFunctions { public class UnionFunctions {


/**
* Returns zero if the inputs have equivalent types. Two numeric types are considered equivalent, as are a combination
* of date/timestamp. If not equivalent, returns a value determined by the numeric value of the MinorType enum
*/
@FunctionTemplate(names = {"compareType"},
scope = FunctionTemplate.FunctionScope.SIMPLE,
nulls = NullHandling.INTERNAL)
public static class CompareType implements DrillSimpleFunc {

@Param
FieldReader input1;
@Param
FieldReader input2;
@Output
IntHolder out;

public void setup() {}

public void eval() {
org.apache.drill.common.types.TypeProtos.MinorType type1;
if (input1.isSet()) {
type1 = input1.getType().getMinorType();
} else {
type1 = org.apache.drill.common.types.TypeProtos.MinorType.NULL;
}
org.apache.drill.common.types.TypeProtos.MinorType type2;
if (input2.isSet()) {
type2 = input2.getType().getMinorType();
} else {
type2 = org.apache.drill.common.types.TypeProtos.MinorType.NULL;
}

out.value = org.apache.drill.exec.expr.fn.impl.UnionFunctions.compareTypes(type1, type2);
}
}

public static int compareTypes(MinorType type1, MinorType type2) {
int typeValue1 = getTypeValue(type1);
int typeValue2 = getTypeValue(type2);
return typeValue1 - typeValue2;
}

/**
* Gives a type ordering modeled after the behavior of MongoDB
* Numeric types are first, folowed by string types, followed by binary, then boolean, then date, then timestamp
* Any other times will be sorted after that
* @param type
* @return
*/
private static int getTypeValue(MinorType type) {
if (TypeCastRules.isNumericType(type)) {
return 0;
}
switch (type) {
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case UINT1:
case UINT2:
case UINT4:
case UINT8:
case DECIMAL9:
case DECIMAL18:
case DECIMAL28SPARSE:
case DECIMAL38SPARSE:
case FLOAT4:
case FLOAT8:
return 0;
case VARCHAR:
case VAR16CHAR:
return 1;
case VARBINARY:
return 2;
case BIT:
return 3;
case DATE:
return 4;
case TIMESTAMP:
return 5;
default:
return 6 + type.getNumber();
}
}

@FunctionTemplate(names = {"typeOf"}, @FunctionTemplate(names = {"typeOf"},
scope = FunctionTemplate.FunctionScope.SIMPLE, scope = FunctionTemplate.FunctionScope.SIMPLE,
nulls = NullHandling.INTERNAL) nulls = NullHandling.INTERNAL)
Expand Down
Expand Up @@ -19,10 +19,16 @@


import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set;


import com.google.common.collect.Sets;
import io.netty.buffer.DrillBuf; import io.netty.buffer.DrillBuf;
import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.FragmentContext;
Expand All @@ -38,6 +44,7 @@


import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.drill.exec.vector.complex.UnionVector;


public class SortRecordBatchBuilder implements AutoCloseable { public class SortRecordBatchBuilder implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
Expand All @@ -47,10 +54,12 @@ public class SortRecordBatchBuilder implements AutoCloseable {
private int recordCount; private int recordCount;
private long runningBatches; private long runningBatches;
private SelectionVector4 sv4; private SelectionVector4 sv4;
private BufferAllocator allocator;
final PreAllocator svAllocator; final PreAllocator svAllocator;
private boolean svAllocatorUsed = false; private boolean svAllocatorUsed = false;


public SortRecordBatchBuilder(BufferAllocator a) { public SortRecordBatchBuilder(BufferAllocator a) {
this.allocator = a;
this.svAllocator = a.getNewPreAllocator(); this.svAllocator = a.getNewPreAllocator();
} }


Expand Down
Expand Up @@ -24,7 +24,9 @@
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.cache.VectorAccessibleSerializable; import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.SchemaUtil;
import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorAccessible;
Expand All @@ -51,23 +53,36 @@ public class BatchGroup implements VectorAccessible, AutoCloseable {
private FileSystem fs; private FileSystem fs;
private BufferAllocator allocator; private BufferAllocator allocator;
private int spilledBatches = 0; private int spilledBatches = 0;
private OperatorContext context;
private BatchSchema schema;


public BatchGroup(VectorContainer container, SelectionVector2 sv2) { public BatchGroup(VectorContainer container, SelectionVector2 sv2, OperatorContext context) {
this.sv2 = sv2; this.sv2 = sv2;
this.currentContainer = container; this.currentContainer = container;
this.context = context;
} }


public BatchGroup(VectorContainer container, FileSystem fs, String path, BufferAllocator allocator) { public BatchGroup(VectorContainer container, FileSystem fs, String path, OperatorContext context) {
currentContainer = container; currentContainer = container;
this.fs = fs; this.fs = fs;
this.path = new Path(path); this.path = new Path(path);
this.allocator = allocator; this.allocator = context.getAllocator();
this.context = context;
} }


public SelectionVector2 getSv2() { public SelectionVector2 getSv2() {
return sv2; return sv2;
} }


/**
* Updates the schema for this batch group. The current as well as any deserialized batches will be coerced to this schema
* @param schema
*/
public void setSchema(BatchSchema schema) {
currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, context);
this.schema = schema;
}

public void addBatch(VectorContainer newContainer) throws IOException { public void addBatch(VectorContainer newContainer) throws IOException {
assert fs != null; assert fs != null;
assert path != null; assert path != null;
Expand Down Expand Up @@ -96,6 +111,9 @@ private VectorContainer getBatch() throws IOException {
watch.start(); watch.start();
vas.readFromStream(inputStream); vas.readFromStream(inputStream);
VectorContainer c = vas.get(); VectorContainer c = vas.get();
if (schema != null) {
c = SchemaUtil.coerceContainer(c, schema, context);
}
// logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount()); // logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
spilledBatches--; spilledBatches--;
currentContainer.zeroVectors(); currentContainer.zeroVectors();
Expand Down

0 comments on commit ea8e17d

Please sign in to comment.