From ea8e17d7bb8b82479138d17245f4c7333de430a3 Mon Sep 17 00:00:00 2001 From: Steven Phillips Date: Fri, 13 Nov 2015 11:27:16 -0800 Subject: [PATCH] DRILL-4081: Handle schema changes in ExternalSort closes #257 --- .../expr/fn/FunctionGenerationHelper.java | 74 +++++--- .../exec/expr/fn/impl/UnionFunctions.java | 92 ++++++++++ .../impl/sort/SortRecordBatchBuilder.java | 9 + .../exec/physical/impl/xsort/BatchGroup.java | 24 ++- .../impl/xsort/ExternalSortBatch.java | 49 ++++-- .../physical/impl/xsort/MSortTemplate.java | 2 +- .../impl/xsort/SingleBatchSorter.java | 3 +- .../impl/xsort/SingleBatchSorterTemplate.java | 5 +- .../apache/drill/exec/record/BatchSchema.java | 37 ++++ .../drill/exec/record/HyperVectorWrapper.java | 23 +-- .../apache/drill/exec/record/SchemaUtil.java | 163 ++++++++++++++++++ .../exec/record/SimpleVectorWrapper.java | 58 +------ .../apache/drill/exec/util/BatchPrinter.java | 37 ++++ .../physical/impl/xsort/TestExternalSort.java | 139 +++++++++++++++ .../main/codegen/templates/ComplexCopier.java | 4 +- .../main/codegen/templates/UnionVector.java | 16 +- .../vector/complex/AbstractMapVector.java | 2 +- .../exec/vector/complex/FieldIdUtil.java | 63 +++++++ 18 files changed, 666 insertions(+), 134 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java index 86734c8a362..90b0816556c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionGenerationHelper.java @@ -18,16 +18,25 @@ package org.apache.drill.exec.expr.fn; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import com.google.common.collect.Lists; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.FunctionHolderExpression; +import org.apache.drill.common.expression.IfExpression; +import org.apache.drill.common.expression.IfExpression.IfCondition; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.ValueExpressions.IntExpression; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.calcite.rel.RelFieldCollation.NullDirection; @@ -46,7 +55,7 @@ public class FunctionGenerationHelper { * @return * FunctionHolderExpression containing the found function implementation */ - public static FunctionHolderExpression getOrderingComparator( + public static LogicalExpression getOrderingComparator( boolean null_high, HoldingContainer left, HoldingContainer right, @@ -56,13 +65,20 @@ public static FunctionHolderExpression getOrderingComparator( if ( ! isComparableType(left.getMajorType() ) || ! isComparableType(right.getMajorType() ) - || isUnionType(left.getMajorType()) - || isUnionType(right.getMajorType()) ) { + ){ throw new UnsupportedOperationException( formatCanNotCompareMsg(left.getMajorType(), right.getMajorType())); } - return getFunctionExpression(comparator_name, Types.required(MinorType.INT), + LogicalExpression comparisonFunctionExpression = getFunctionExpression(comparator_name, Types.required(MinorType.INT), registry, left, right); + + ErrorCollector collector = new ErrorCollectorImpl(); + if (!isUnionType(left.getMajorType()) && !isUnionType(right.getMajorType())) { + return ExpressionTreeMaterializer.materialize(comparisonFunctionExpression, null, collector, registry); + } else { + LogicalExpression typeComparisonFunctionExpression = getTypeComparisonFunction(comparisonFunctionExpression, left, right); + return ExpressionTreeMaterializer.materialize(typeComparisonFunctionExpression, null, collector, registry); + } } private static boolean isUnionType(MajorType majorType) { @@ -76,16 +92,15 @@ private static boolean isUnionType(MajorType majorType) { * @param right ... * @param registry ... * @return FunctionHolderExpression containing the function implementation - * @see #getComparator */ - public static FunctionHolderExpression getOrderingComparatorNullsHigh( + public static LogicalExpression getOrderingComparatorNullsHigh( HoldingContainer left, HoldingContainer right, FunctionImplementationRegistry registry) { return getOrderingComparator(true, left, right, registry); } - public static FunctionHolderExpression getFunctionExpression( + private static LogicalExpression getFunctionExpression( String name, MajorType returnType, FunctionImplementationRegistry registry, HoldingContainer... args) { List argTypes = new ArrayList(args.length); List argExpressions = new ArrayList(args.length); @@ -94,25 +109,34 @@ public static FunctionHolderExpression getFunctionExpression( argExpressions.add(new HoldingContainerExpression(c)); } - DrillFuncHolder holder = registry.findExactMatchingDrillFunction(name, argTypes, returnType); - if (holder != null) { - return holder.getExpr(name, argExpressions, ExpressionPosition.UNKNOWN); - } + return new FunctionCall(name, argExpressions, ExpressionPosition.UNKNOWN); + } - StringBuilder sb = new StringBuilder(); - sb.append("Failure finding function that runtime code generation expected. Signature: "); - sb.append(name); - sb.append("( "); - for(int i =0; i < args.length; i++) { - MajorType mt = args[i].getMajorType(); - if (i != 0) { - sb.append(", "); - } - appendType(mt, sb); + /** + * Wraps the comparison function in an If-statement which compares the types first, evaluating the comaprison function only + * if the types are equivialent + * + * @param comparisonFunction + * @param args + * @return + */ + private static LogicalExpression getTypeComparisonFunction(LogicalExpression comparisonFunction, HoldingContainer... args) { + List argExpressions = Lists.newArrayList(); + List argTypes = Lists.newArrayList(); + for(HoldingContainer c : args) { + argTypes.add(c.getMajorType()); + argExpressions.add(new HoldingContainerExpression(c)); } - sb.append(" ) returns "); - appendType(returnType, sb); - throw new UnsupportedOperationException(sb.toString()); + FunctionCall call = new FunctionCall("compareType", argExpressions, ExpressionPosition.UNKNOWN); + + List newArgs = Lists.newArrayList(); + newArgs.add(call); + newArgs.add(new IntExpression(0, ExpressionPosition.UNKNOWN)); + FunctionCall notEqual = new FunctionCall("not_equal", newArgs, ExpressionPosition.UNKNOWN); + + IfExpression.IfCondition ifCondition = new IfCondition(notEqual, call); + IfExpression ifExpression = IfExpression.newBuilder().setIfCondition(ifCondition).setElse(comparisonFunction).build(); + return ifExpression; } private static final void appendType(MajorType mt, StringBuilder sb) { @@ -121,7 +145,7 @@ private static final void appendType(MajorType mt, StringBuilder sb) { sb.append(mt.getMode().name()); } - protected static boolean isComparableType(MajorType type) { + private static boolean isComparableType(MajorType type) { if (type.getMinorType() == MinorType.MAP || type.getMinorType() == MinorType.LIST || type.getMode() == TypeProtos.DataMode.REPEATED ) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java index 26cc43bf115..1ff7a9edb7c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java @@ -17,8 +17,10 @@ */ package org.apache.drill.exec.expr.fn.impl; +import com.google.common.collect.Sets; import io.netty.buffer.DrillBuf; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; @@ -32,16 +34,106 @@ import org.apache.drill.exec.expr.holders.UnionHolder; import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.resolver.TypeCastRules; import org.apache.drill.exec.vector.complex.impl.UnionReader; import org.apache.drill.exec.vector.complex.reader.FieldReader; import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Set; /** * The class contains additional functions for union types in addition to those in GUnionFunctions */ public class UnionFunctions { + /** + * Returns zero if the inputs have equivalent types. Two numeric types are considered equivalent, as are a combination + * of date/timestamp. If not equivalent, returns a value determined by the numeric value of the MinorType enum + */ + @FunctionTemplate(names = {"compareType"}, + scope = FunctionTemplate.FunctionScope.SIMPLE, + nulls = NullHandling.INTERNAL) + public static class CompareType implements DrillSimpleFunc { + + @Param + FieldReader input1; + @Param + FieldReader input2; + @Output + IntHolder out; + + public void setup() {} + + public void eval() { + org.apache.drill.common.types.TypeProtos.MinorType type1; + if (input1.isSet()) { + type1 = input1.getType().getMinorType(); + } else { + type1 = org.apache.drill.common.types.TypeProtos.MinorType.NULL; + } + org.apache.drill.common.types.TypeProtos.MinorType type2; + if (input2.isSet()) { + type2 = input2.getType().getMinorType(); + } else { + type2 = org.apache.drill.common.types.TypeProtos.MinorType.NULL; + } + + out.value = org.apache.drill.exec.expr.fn.impl.UnionFunctions.compareTypes(type1, type2); + } + } + + public static int compareTypes(MinorType type1, MinorType type2) { + int typeValue1 = getTypeValue(type1); + int typeValue2 = getTypeValue(type2); + return typeValue1 - typeValue2; + } + + /** + * Gives a type ordering modeled after the behavior of MongoDB + * Numeric types are first, folowed by string types, followed by binary, then boolean, then date, then timestamp + * Any other times will be sorted after that + * @param type + * @return + */ + private static int getTypeValue(MinorType type) { + if (TypeCastRules.isNumericType(type)) { + return 0; + } + switch (type) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: + case UINT1: + case UINT2: + case UINT4: + case UINT8: + case DECIMAL9: + case DECIMAL18: + case DECIMAL28SPARSE: + case DECIMAL38SPARSE: + case FLOAT4: + case FLOAT8: + return 0; + case VARCHAR: + case VAR16CHAR: + return 1; + case VARBINARY: + return 2; + case BIT: + return 3; + case DATE: + return 4; + case TIMESTAMP: + return 5; + default: + return 6 + type.getNumber(); + } + } + @FunctionTemplate(names = {"typeOf"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls = NullHandling.INTERNAL) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 97a15c4ddcf..e5d1e4e5056 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -19,10 +19,16 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; +import com.google.common.collect.Sets; import io.netty.buffer.DrillBuf; import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -38,6 +44,7 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; +import org.apache.drill.exec.vector.complex.UnionVector; public class SortRecordBatchBuilder implements AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class); @@ -47,10 +54,12 @@ public class SortRecordBatchBuilder implements AutoCloseable { private int recordCount; private long runningBatches; private SelectionVector4 sv4; + private BufferAllocator allocator; final PreAllocator svAllocator; private boolean svAllocatorUsed = false; public SortRecordBatchBuilder(BufferAllocator a) { + this.allocator = a; this.svAllocator = a.getNewPreAllocator(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java index aa3acc8616b..e228305570d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java @@ -24,7 +24,9 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.cache.VectorAccessibleSerializable; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.SchemaUtil; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; @@ -51,23 +53,36 @@ public class BatchGroup implements VectorAccessible, AutoCloseable { private FileSystem fs; private BufferAllocator allocator; private int spilledBatches = 0; + private OperatorContext context; + private BatchSchema schema; - public BatchGroup(VectorContainer container, SelectionVector2 sv2) { + public BatchGroup(VectorContainer container, SelectionVector2 sv2, OperatorContext context) { this.sv2 = sv2; this.currentContainer = container; + this.context = context; } - public BatchGroup(VectorContainer container, FileSystem fs, String path, BufferAllocator allocator) { + public BatchGroup(VectorContainer container, FileSystem fs, String path, OperatorContext context) { currentContainer = container; this.fs = fs; this.path = new Path(path); - this.allocator = allocator; + this.allocator = context.getAllocator(); + this.context = context; } public SelectionVector2 getSv2() { return sv2; } + /** + * Updates the schema for this batch group. The current as well as any deserialized batches will be coerced to this schema + * @param schema + */ + public void setSchema(BatchSchema schema) { + currentContainer = SchemaUtil.coerceContainer(currentContainer, schema, context); + this.schema = schema; + } + public void addBatch(VectorContainer newContainer) throws IOException { assert fs != null; assert path != null; @@ -96,6 +111,9 @@ private VectorContainer getBatch() throws IOException { watch.start(); vas.readFromStream(inputStream); VectorContainer c = vas.get(); + if (schema != null) { + c = SchemaUtil.coerceContainer(c, schema, context); + } // logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount()); spilledBatches--; currentContainer.zeroVectors(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 51aab67433f..1f3c2d0aac1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -58,6 +58,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.SchemaUtil; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -66,6 +67,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; +import org.apache.drill.exec.util.BatchPrinter; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -289,29 +291,42 @@ public IterOutcome innerNext() { case STOP: return upstream; case OK_NEW_SCHEMA: + case OK: + VectorContainer convertedBatch; // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if (!incoming.getSchema().equals(schema)) { + if (upstream == IterOutcome.OK_NEW_SCHEMA && !incoming.getSchema().equals(schema)) { if (schema != null) { - throw new SchemaChangeException(); + if (unionTypeEnabled) { + this.schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema()); + } else { + throw new SchemaChangeException("Schema changes not supported in External Sort. Please enable Union type"); + } + } else { + schema = incoming.getSchema(); + } + convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext); + for (BatchGroup b : batchGroups) { + b.setSchema(schema); + } + for (BatchGroup b : spilledBatchGroups) { + b.setSchema(schema); } - this.schema = incoming.getSchema(); - this.sorter = createNewSorter(context, incoming); + this.sorter = createNewSorter(context, convertedBatch); + } else { + convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext); } - // fall through. - case OK: if (first) { first = false; } - if (incoming.getRecordCount() == 0) { - for (VectorWrapper w : incoming) { + if (convertedBatch.getRecordCount() == 0) { + for (VectorWrapper w : convertedBatch) { w.clear(); } break; } - totalSizeInMemory += getBufferSize(incoming); SelectionVector2 sv2; if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { - sv2 = incoming.getSelectionVector2(); + sv2 = incoming.getSelectionVector2().clone(); if (sv2.getBuffer(false).isRootBuffer()) { oContext.getAllocator().takeOwnership(sv2.getBuffer(false)); } @@ -324,18 +339,16 @@ public IterOutcome innerNext() { throw new OutOfMemoryException(e); } } + totalSizeInMemory += getBufferSize(convertedBatch); int count = sv2.getCount(); totalCount += count; - totalBatches++; - sorter.setup(context, sv2, incoming); + sorter.setup(context, sv2, convertedBatch); sorter.sort(sv2); - RecordBatchData rbd = new RecordBatchData(incoming); + RecordBatchData rbd = new RecordBatchData(convertedBatch); boolean success = false; try { - if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE) { - rbd.setSv2(sv2); - } - batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2())); + rbd.setSv2(sv2); + batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), oContext)); batchesSinceLastSpill++; if (// We have spilled at least once and the current memory used is more than the 75% of peak memory used. (spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) || @@ -531,7 +544,7 @@ public BatchGroup mergeAndSpill(LinkedList batchGroups) throws Schem c1.setRecordCount(count); String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++); - BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator()); + BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext); boolean threw = true; // true if an exception is thrown in the try block below logger.info("Merging and spilling to {}", outputFile); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index d42e8d483d7..6e37a6828b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -70,7 +70,7 @@ public void setup(final FragmentContext context, final BufferAllocator allocator runStarts.add(i); batch = newBatch; } else { - throw new UnsupportedOperationException("Missing batch"); + throw new UnsupportedOperationException(String.format("Missing batch. batch: %d newBatch: %d", batch, newBatch)); } } final DrillBuf drillBuf = allocator.buffer(4 * totalCount); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java index b37229d2e24..b4986ba567f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java @@ -21,10 +21,11 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; public interface SingleBatchSorter { - public void setup(FragmentContext context, SelectionVector2 vector2, RecordBatch incoming) throws SchemaChangeException; + public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; public void sort(SelectionVector2 vector2); public static TemplateClassDefinition TEMPLATE_DEFINITION = new TemplateClassDefinition(SingleBatchSorter.class, SingleBatchSorterTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index 6c32f48ed8e..fb123af2522 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; @@ -36,7 +37,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In private SelectionVector2 vector2; - public void setup(FragmentContext context, SelectionVector2 vector2, RecordBatch incoming) throws SchemaChangeException{ + public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ Preconditions.checkNotNull(vector2); this.vector2 = vector2; try { @@ -71,7 +72,7 @@ public int compare(int leftIndex, int rightIndex) { return doEval(sv1, sv2); } - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") RecordBatch outgoing); public abstract int doEval(@Named("leftIndex") char leftIndex, @Named("rightIndex") char rightIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java index da2e9ebb76f..359114897f4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java @@ -22,6 +22,8 @@ import java.util.List; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.drill.common.types.TypeProtos.MajorType; public class BatchSchema implements Iterable { @@ -113,10 +115,45 @@ public boolean equals(Object obj) { } else if (!fields.equals(other.fields)) { return false; } + for (int i = 0; i < fields.size(); i++) { + MajorType t1 = fields.get(i).getType(); + MajorType t2 = other.fields.get(i).getType(); + if (t1 == null) { + if (t2 != null) { + return false; + } + } else { + if (!majorTypeEqual(t1, t2)) { + return false; + } + } + } if (selectionVectorMode != other.selectionVectorMode) { return false; } return true; } + /** + * We treat fields with same set of Subtypes as equal, even if they are in a different order + * @param t1 + * @param t2 + * @return + */ + private boolean majorTypeEqual(MajorType t1, MajorType t2) { + if (t1.equals(t2)) { + return true; + } + if (!t1.getMinorType().equals(t2.getMinorType())) { + return false; + } + if (!t1.getMode().equals(t2.getMode())) { + return false; + } + if (!Sets.newHashSet(t1.getSubTypeList()).equals(Sets.newHashSet(t2.getSubTypeList()))) { + return false; + } + return true; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 6bfb483e3a7..a1557e6d754 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.FieldIdUtil; import org.apache.drill.exec.vector.complex.MapVector; import com.google.common.base.Preconditions; @@ -110,27 +111,7 @@ public VectorWrapper getChildWrapper(int[] ids) { @Override public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { ValueVector v = vectors[0]; - if (!expectedPath.getRootSegment().segmentEquals(v.getField().getPath().getRootSegment())) { - return null; - } - - if (v instanceof AbstractContainerVector) { - // we're looking for a multi path. - AbstractContainerVector c = (AbstractContainerVector) v; - TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(v.getField().getType()); - builder.hyper(); - builder.addId(id); - return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); - - } else { - return TypedFieldId.newBuilder() // - .intermediateType(v.getField().getType()) // - .finalType(v.getField().getType()) // - .addId(id) // - .hyper() // - .build(); - } + return FieldIdUtil.getFieldId(v, id, expectedPath, true); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java new file mode 100644 index 00000000000..8a0954eba3e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.sort.RecordBatchData; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.UnionVector; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Utility class for dealing with changing schemas + */ +public class SchemaUtil { + + /** + * Returns the merger of schemas. The merged schema will include the union all columns. If there is a type conflict + * between columns with the same schemapath but different types, the merged schema will contain a Union type. + * @param schemas + * @return + */ + public static BatchSchema mergeSchemas(BatchSchema... schemas) { + Map> typeSetMap = Maps.newLinkedHashMap(); + + for (BatchSchema s : schemas) { + for (MaterializedField field : s) { + SchemaPath path = field.getPath(); + Set currentTypes = typeSetMap.get(path); + if (currentTypes == null) { + currentTypes = Sets.newHashSet(); + typeSetMap.put(path, currentTypes); + } + MinorType newType = field.getType().getMinorType(); + if (newType == MinorType.MAP || newType == MinorType.LIST) { + throw new RuntimeException("Schema change not currently supported for schemas with complex types"); + } + if (newType == MinorType.UNION) { + for (MinorType subType : field.getType().getSubTypeList()) { + currentTypes.add(subType); + } + } else { + currentTypes.add(newType); + } + } + } + + List fields = Lists.newArrayList(); + + for (SchemaPath path : typeSetMap.keySet()) { + Set types = typeSetMap.get(path); + if (types.size() > 1) { + MajorType.Builder builder = MajorType.newBuilder().setMinorType(MinorType.UNION).setMode(DataMode.OPTIONAL); + for (MinorType t : types) { + builder.addSubType(t); + } + MaterializedField field = MaterializedField.create(path, builder.build()); + fields.add(field); + } else { + MaterializedField field = MaterializedField.create(path, Types.optional(types.iterator().next())); + fields.add(field); + } + } + + SchemaBuilder schemaBuilder = new SchemaBuilder(); + BatchSchema s = schemaBuilder.addFields(fields).setSelectionVectorMode(schemas[0].getSelectionVectorMode()).build(); + return s; + } + + /** + * Creates a copy a record batch, converting any fields as necessary to coerce it into the provided schema + * @param in + * @param toSchema + * @param context + * @return + */ + public static VectorContainer coerceContainer(VectorAccessible in, BatchSchema toSchema, OperatorContext context) { + int recordCount = in.getRecordCount(); + Map vectorMap = Maps.newHashMap(); + for (VectorWrapper w : in) { + ValueVector v = w.getValueVector(); + vectorMap.put(v.getField().getPath(), v); + } + + VectorContainer c = new VectorContainer(context); + + for (MaterializedField field : toSchema) { + ValueVector v = vectorMap.remove(field.getPath()); + if (v != null) { + int valueCount = v.getAccessor().getValueCount(); + TransferPair tp = v.getTransferPair(); + tp.transfer(); + if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) { + if (field.getType().getMinorType() == MinorType.UNION) { + UnionVector u = (UnionVector) tp.getTo(); + for (MinorType t : field.getType().getSubTypeList()) { + if (u.getField().getType().getSubTypeList().contains(t)) { + continue; + } + u.addSubType(t); + } + } + c.add(tp.getTo()); + } else { + ValueVector newVector = TypeHelper.getNewVector(field, context.getAllocator()); + Preconditions.checkState(field.getType().getMinorType() == MinorType.UNION, "Can only convert vector to Union vector"); + UnionVector u = (UnionVector) newVector; + u.addVector(tp.getTo()); + MinorType type = v.getField().getType().getMinorType(); + for (int i = 0; i < valueCount; i++) { + u.getMutator().setType(i, type); + } + for (MinorType t : field.getType().getSubTypeList()) { + if (u.getField().getType().getSubTypeList().contains(t)) { + continue; + } + u.addSubType(t); + } + u.getMutator().setValueCount(valueCount); + c.add(u); + } + } else { + v = TypeHelper.getNewVector(field, context.getAllocator()); + v.allocateNew(); + v.getMutator().setValueCount(recordCount); + c.add(v); + } + } + c.buildSchema(in.getSchema().getSelectionVectorMode()); + c.setRecordCount(recordCount); + Preconditions.checkState(vectorMap.size() == 0, "Leftover vector from incoming batch"); + return c; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index f767e74ee3d..f1b60d49876 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.complex.FieldIdUtil; import org.apache.drill.exec.vector.complex.ListVector; import org.apache.drill.exec.vector.complex.MapVector; import org.apache.drill.exec.vector.complex.UnionVector; @@ -109,62 +110,7 @@ public VectorWrapper getChildWrapper(int[] ids) { @Override public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { - if (!expectedPath.getRootSegment().segmentEquals(vector.getField().getPath().getRootSegment())) { - return null; - } - PathSegment seg = expectedPath.getRootSegment(); - - if (vector instanceof UnionVector) { - TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.addId(id).remainder(expectedPath.getRootSegment().getChild()); - List minorTypes = ((UnionVector) vector).getSubTypes(); - MajorType.Builder majorTypeBuilder = MajorType.newBuilder().setMinorType(MinorType.UNION); - for (MinorType type : minorTypes) { - majorTypeBuilder.addSubType(type); - } - MajorType majorType = majorTypeBuilder.build(); - builder.intermediateType(majorType); - if (seg.isLastPath()) { - builder.finalType(majorType); - return builder.build(); - } else { - return ((UnionVector) vector).getFieldIdIfMatches(builder, false, seg.getChild()); - } - } else if (vector instanceof ListVector) { - ListVector list = (ListVector) vector; - TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(vector.getField().getType()); - builder.addId(id); - return list.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); - } else - if (vector instanceof AbstractContainerVector) { - // we're looking for a multi path. - AbstractContainerVector c = (AbstractContainerVector) vector; - TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(vector.getField().getType()); - builder.addId(id); - return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); - - } else { - TypedFieldId.Builder builder = TypedFieldId.newBuilder(); - builder.intermediateType(vector.getField().getType()); - builder.addId(id); - builder.finalType(vector.getField().getType()); - if (seg.isLastPath()) { - return builder.build(); - } else { - PathSegment child = seg.getChild(); - if (child.isArray() && child.isLastPath()) { - builder.remainder(child); - builder.withIndex(); - builder.finalType(vector.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build()); - return builder.build(); - } else { - return null; - } - - } - } + return FieldIdUtil.getFieldId(getValueVector(), id, expectedPath, false); } public void transfer(VectorWrapper destination) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java index 40ee63d4261..198c0b5c8ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; @@ -90,4 +91,40 @@ public static void printBatch(VectorAccessible batch) { System.out.printf("|\n"); } } + + public static void printBatch(VectorAccessible batch, SelectionVector2 sv2) { + List columns = Lists.newArrayList(); + List vectors = Lists.newArrayList(); + for (VectorWrapper vw : batch) { + columns.add(vw.getValueVector().getField().getAsSchemaPath().toExpr()); + vectors.add(vw.getValueVector()); + } + int width = columns.size(); + int rows = vectors.get(0).getMetadata().getValueCount(); + for (int i = 0; i < rows; i++) { + if (i%50 == 0) { + System.out.println(StringUtils.repeat("-", width * 17 + 1)); + for (String column : columns) { + System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14)); + } + System.out.printf("|\n"); + System.out.println(StringUtils.repeat("-", width*17 + 1)); + } + int row = sv2.getIndex(i); + for (ValueVector vv : vectors) { + Object o = vv.getAccessor().getObject(row); + String value; + if (o == null) { + value = "null"; + } else + if (o instanceof byte[]) { + value = new String((byte[]) o); + } else { + value = o.toString(); + } + System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14)); + } + System.out.printf("|\n"); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java new file mode 100644 index 00000000000..3e55d9ded74 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestExternalSort.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.xsort; + +import org.apache.drill.BaseTestQuery; +import org.apache.drill.TestBuilder; +import org.junit.Test; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; + +public class TestExternalSort extends BaseTestQuery { + + @Test + public void testNumericTypes() throws Exception { + final int record_count = 10000; + String dfs_temp = getDfsTestTmpSchemaLocation(); + System.out.println(dfs_temp); + File table_dir = new File(dfs_temp, "numericTypes"); + table_dir.mkdir(); + BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "a.json"))); + String format = "{ a : %d }%n"; + for (int i = 0; i <= record_count; i += 2) { + os.write(String.format(format, i).getBytes()); + } + os.close(); + os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "b.json"))); + format = "{ a : %.2f }%n"; + for (int i = 1; i <= record_count; i+=2) { + os.write(String.format(format, (float) i).getBytes()); + } + os.close(); + String query = "select * from dfs_test.tmp.numericTypes order by a desc"; + TestBuilder builder = testBuilder() + .sqlQuery(query) + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .ordered() + .baselineColumns("a"); + for (int i = record_count; i >= 0;) { + builder.baselineValues((long) i--); + if (i >= 0) { + builder.baselineValues((double) i--); + } + } + builder.go(); + } + + @Test + public void testNumericAndStringTypes() throws Exception { + final int record_count = 10000; + String dfs_temp = getDfsTestTmpSchemaLocation(); + System.out.println(dfs_temp); + File table_dir = new File(dfs_temp, "numericAndStringTypes"); + table_dir.mkdir(); + BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "a.json"))); + String format = "{ a : %d }%n"; + for (int i = 0; i <= record_count; i += 2) { + os.write(String.format(format, i).getBytes()); + } + os.close(); + os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "b.json"))); + format = "{ a : \"%05d\" }%n"; + for (int i = 1; i <= record_count; i+=2) { + os.write(String.format(format, i).getBytes()); + } + os.close(); + String query = "select * from dfs_test.tmp.numericAndStringTypes order by a desc"; + TestBuilder builder = testBuilder() + .sqlQuery(query) + .ordered() + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .baselineColumns("a"); + // Strings come first because order by is desc + for (int i = record_count; i >= 0;) { + i--; + if (i >= 0) { + builder.baselineValues(String.format("%05d", i--)); + } + } + for (int i = record_count; i >= 0;) { + builder.baselineValues((long) i--); + i--; + } + builder.go(); + } + + @Test + public void testNewColumns() throws Exception { + final int record_count = 10000; + String dfs_temp = getDfsTestTmpSchemaLocation(); + System.out.println(dfs_temp); + File table_dir = new File(dfs_temp, "newColumns"); + table_dir.mkdir(); + BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "a.json"))); + String format = "{ a : %d, b : %d }%n"; + for (int i = 0; i <= record_count; i += 2) { + os.write(String.format(format, i, i).getBytes()); + } + os.close(); + os = new BufferedOutputStream(new FileOutputStream(new File(table_dir, "b.json"))); + format = "{ a : %d, c : %d }%n"; + for (int i = 1; i <= record_count; i+=2) { + os.write(String.format(format, i, i).getBytes()); + } + os.close(); + String query = "select a, b, c from dfs_test.tmp.newColumns order by a desc"; +// Test framework currently doesn't handle changing schema (i.e. new columns) on the client side + TestBuilder builder = testBuilder() + .sqlQuery(query) + .ordered() + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .baselineColumns("a", "b", "c"); + for (int i = record_count; i >= 0;) { + builder.baselineValues((long) i, (long) i--, null); + if (i >= 0) { + builder.baselineValues((long) i, null, (long) i--); + } + } + builder.go(); + String newQuery = "select * from dfs_test.tmp.newColumns order by a desc"; + test(newQuery); + } +} diff --git a/exec/vector/src/main/codegen/templates/ComplexCopier.java b/exec/vector/src/main/codegen/templates/ComplexCopier.java index b8b76161fb8..8255489d56a 100644 --- a/exec/vector/src/main/codegen/templates/ComplexCopier.java +++ b/exec/vector/src/main/codegen/templates/ComplexCopier.java @@ -80,7 +80,9 @@ private static void writeValue(FieldReader reader, FieldWriter writer) { if (reader.isSet()) { Nullable${name}Holder ${uncappedName}Holder = new Nullable${name}Holder(); reader.read(${uncappedName}Holder); - writer.write${name}(<#list fields as field>${uncappedName}Holder.${field.name}<#if field_has_next>, ); + if (${uncappedName}Holder.isSet == 1) { + writer.write${name}(<#list fields as field>${uncappedName}Holder.${field.name}<#if field_has_next>, ); + } } break; diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java index 2ac4e967c12..cc541e599cc 100644 --- a/exec/vector/src/main/codegen/templates/UnionVector.java +++ b/exec/vector/src/main/codegen/templates/UnionVector.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.vector.complex.impl.ComplexCopier; import org.apache.drill.exec.util.CallBack; import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.exec.expr.BasicTypeHelper; /* * This class is generated using freemarker and the ${.template_name} template. @@ -83,8 +84,12 @@ public List getSubTypes() { return majorType.getSubTypeList(); } - private void addSubType(MinorType type) { + public void addSubType(MinorType type) { + if (majorType.getSubTypeList().contains(type)) { + return; + } majorType = MajorType.newBuilder(this.majorType).addSubType(type).build(); + field = MaterializedField.create(field.getPath(), majorType); if (callBack != null) { callBack.doWork(); } @@ -225,8 +230,11 @@ public void copyFromSafe(int inIndex, int outIndex, UnionVector from) { public void addVector(ValueVector v) { String name = v.getField().getType().getMinorType().name().toLowerCase(); + MajorType type = v.getField().getType(); Preconditions.checkState(internalMap.getChild(name) == null, String.format("%s vector already exists", name)); - internalMap.putChild(name, v); + ValueVector newVector = internalMap.addOrGet(name, type, (Class) BasicTypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); + v.makeTransferPair(newVector).transfer(); + internalMap.putChild(name, newVector); addSubType(v.getField().getType().getMinorType()); } @@ -390,9 +398,7 @@ public void get(int index, ComplexHolder holder) { } public void get(int index, UnionHolder holder) { - if (reader == null) { - reader = new UnionReader(UnionVector.this); - } + FieldReader reader = new UnionReader(UnionVector.this); reader.setPosition(index); holder.reader = reader; } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java index 581a54dd370..65973113811 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractMapVector.java @@ -137,7 +137,7 @@ public T addOrGet(String name, TypeProtos.MajorType type } return vector; } - final String message = "Drill does not support schema change yet. Existing[{}] and desired[{}] vector types mismatch"; + final String message = "Drill does not support schema change yet. Existing[%s] and desired[%s] vector types mismatch"; throw new IllegalStateException(String.format(message, existing.getClass().getSimpleName(), clazz.getSimpleName())); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java index 468a7f30c27..935e93cdebd 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/FieldIdUtil.java @@ -18,11 +18,15 @@ package org.apache.drill.exec.vector.complex; import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.vector.ValueVector; +import java.util.List; + public class FieldIdUtil { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldIdUtil.class); public static TypedFieldId getFieldIdIfMatches(ValueVector vector, TypedFieldId.Builder builder, boolean addToBreadCrumb, PathSegment seg) { @@ -121,4 +125,63 @@ public static TypedFieldId getFieldIdIfMatches(ValueVector vector, TypedFieldId. } } } + + public static TypedFieldId getFieldId(ValueVector vector, int id, SchemaPath expectedPath, boolean hyper) { + if (!expectedPath.getRootSegment().segmentEquals(vector.getField().getPath().getRootSegment())) { + return null; + } + PathSegment seg = expectedPath.getRootSegment(); + + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + if (hyper) { + builder.hyper(); + } + if (vector instanceof UnionVector) { + builder.addId(id).remainder(expectedPath.getRootSegment().getChild()); + List minorTypes = ((UnionVector) vector).getSubTypes(); + MajorType.Builder majorTypeBuilder = MajorType.newBuilder().setMinorType(MinorType.UNION); + for (MinorType type : minorTypes) { + majorTypeBuilder.addSubType(type); + } + MajorType majorType = majorTypeBuilder.build(); + builder.intermediateType(majorType); + if (seg.isLastPath()) { + builder.finalType(majorType); + return builder.build(); + } else { + return ((UnionVector) vector).getFieldIdIfMatches(builder, false, seg.getChild()); + } + } else if (vector instanceof ListVector) { + ListVector list = (ListVector) vector; + builder.intermediateType(vector.getField().getType()); + builder.addId(id); + return list.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); + } else + if (vector instanceof AbstractContainerVector) { + // we're looking for a multi path. + AbstractContainerVector c = (AbstractContainerVector) vector; + builder.intermediateType(vector.getField().getType()); + builder.addId(id); + return c.getFieldIdIfMatches(builder, true, expectedPath.getRootSegment().getChild()); + + } else { + builder.intermediateType(vector.getField().getType()); + builder.addId(id); + builder.finalType(vector.getField().getType()); + if (seg.isLastPath()) { + return builder.build(); + } else { + PathSegment child = seg.getChild(); + if (child.isArray() && child.isLastPath()) { + builder.remainder(child); + builder.withIndex(); + builder.finalType(vector.getField().getType().toBuilder().setMode(DataMode.OPTIONAL).build()); + return builder.build(); + } else { + return null; + } + + } + } + } }