From e5a529d7a276597f2b62cdcb9a1cab2fae8bc52f Mon Sep 17 00:00:00 2001
From: Steven Phillips
Date: Thu, 1 Oct 2015 03:26:34 -0700
Subject: [PATCH 1/2] DRILL-3229: Implement Union type vector

---
 .../common/expression/fn/CastFunctions.java    |   1 +
 .../org/apache/drill/common/types/Types.java   |   5 +
 .../templates/AbstractFieldReader.java         |   4 +
 .../templates/AbstractFieldWriter.java         |  10 +
 .../templates/AbstractRecordWriter.java        |   5 +
 .../main/codegen/templates/BaseReader.java     |   3 +
 .../main/codegen/templates/BaseWriter.java     |   4 +-
 .../main/codegen/templates/ComplexCopier.java  | 134 ++++
 .../codegen/templates/ComplexReaders.java      |   9 +-
 .../templates/EventBasedRecordWriter.java      |   4 +
 .../codegen/templates/HolderReaderImpl.java    |  22 +
 .../main/codegen/templates/ListWriters.java    |   8 +-
 .../main/codegen/templates/MapWriters.java     |  46 +-
 .../main/codegen/templates/NullReader.java     |  10 +-
 .../main/codegen/templates/RecordWriter.java   |   1 +
 .../main/codegen/templates/TypeHelper.java     |  25 +-
 .../codegen/templates/UnionFunctions.java      |  88 ++++
 .../codegen/templates/UnionListWriter.java     | 187 ++++++++
 .../main/codegen/templates/UnionReader.java    | 183 ++++++++
 .../main/codegen/templates/UnionVector.java    | 433 ++++++++++++++++++
 .../main/codegen/templates/UnionWriter.java    | 228 +++++++++
 .../org/apache/drill/exec/ExecConstants.java   |   2 +
 .../exec/expr/ExpressionTreeMaterializer.java  |  83 ++--
 .../drill/exec/expr/GetSetVectorHelper.java    |   4 +
 .../drill/exec/expr/fn/DrillFuncHolder.java    |   2 +-
 .../exec/expr/fn/impl/MappifyUtility.java      |   4 +-
 .../exec/expr/fn/impl/UnionFunctions.java      | 115 +++++
 .../drill/exec/expr/holders/UnionHolder.java   |  37 ++
 .../base/AbstractPhysicalVisitor.java          |   6 +
 .../impl/filter/FilterRecordBatch.java         |   4 +-
 .../impl/flatten/FlattenRecordBatch.java       |   4 +-
 .../impl/project/ProjectRecordBatch.java       |   5 +-
 .../exec/record/AbstractRecordBatch.java       |   9 +
 .../exec/record/SimpleVectorWrapper.java       |  11 +
 .../exec/resolver/ResolverTypePrecedence.java  |   1 +
 .../drill/exec/resolver/TypeCastRules.java     |  10 +
 .../server/options/SystemOptionManager.java    |   1 +
 .../exec/store/avro/MapOrListWriter.java       |   4 +-
 .../store/easy/json/JSONRecordReader.java      |   4 +-
 .../store/easy/json/JsonRecordWriter.java      |  24 +
 .../vector/accessor/UnionSqlAccessor.java      | 129 ++++++
 .../drill/exec/vector/complex/ListVector.java  | 292 ++++++++++++
 .../exec/vector/complex/fn/JsonReader.java     | 147 +++---
 .../exec/vector/complex/fn/JsonWriter.java     |  11 +-
 .../complex/impl/AbstractBaseReader.java       |  17 +
 .../complex/impl/ComplexWriterImpl.java        |  12 +-
 .../vector/complex/impl/UnionListReader.java   |  90 ++++
 .../complex/impl/VectorContainerWriter.java    |   8 +-
 .../drill/exec/store/TestOutputMutator.java    |   4 +
 .../vector/complex/writer/TestJsonReader.java  | 104 +++++
 .../vector/complex/writer/TestRepeated.java    |  24 +-
 .../src/test/resources/jsoninput/union/a.json  |  52 +++
 .../drill/jdbc/test/TestJdbcDistQuery.java     |   6 +
 .../apache/drill/common/types/MinorType.java   |   4 +-
 .../apache/drill/common/types/TypeProtos.java  |  18 +-
 protocol/src/main/protobuf/Types.proto         |   1 +
 56 files changed, 2504 insertions(+), 155 deletions(-)
 create mode 100644 exec/java-exec/src/main/codegen/templates/ComplexCopier.java
 create mode 100644 exec/java-exec/src/main/codegen/templates/UnionFunctions.java
 create mode 100644 exec/java-exec/src/main/codegen/templates/UnionListWriter.java
 create mode 100644 exec/java-exec/src/main/codegen/templates/UnionReader.java
 create mode 100644 exec/java-exec/src/main/codegen/templates/UnionVector.java
 create mode 100644 exec/java-exec/src/main/codegen/templates/UnionWriter.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java
 create mode 100644 exec/java-exec/src/test/resources/jsoninput/union/a.json

diff --git a/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java b/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
index c0eaa9042e9..34997ab796d 100644
--- a/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
+++ b/common/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
@@ -36,6 +36,7 @@ public class CastFunctions {
   private static Map CAST_FUNC_REPLACEMENT_FROM_NULLABLE = new HashMap<>();
 
   static {
+    TYPE2FUNC.put(MinorType.UNION, "castUNION");
     TYPE2FUNC.put(MinorType.BIGINT, "castBIGINT");
     TYPE2FUNC.put(MinorType.INT, "castINT");
     TYPE2FUNC.put(MinorType.BIT, "castBIT");
diff --git a/common/src/main/java/org/apache/drill/common/types/Types.java b/common/src/main/java/org/apache/drill/common/types/Types.java
index 69b1b4c72d0..adc3eef49e2 100644
--- a/common/src/main/java/org/apache/drill/common/types/Types.java
+++ b/common/src/main/java/org/apache/drill/common/types/Types.java
@@ -143,6 +143,7 @@ public static String getSqlTypeName(final MajorType type) {
       case MAP: return "MAP";
       case LATE: return "ANY";
       case NULL: return "NULL";
+      case UNION: return "UNION";
 
       // Internal types not actually used at level of SQL types(?):
 
@@ -228,6 +229,8 @@ public static int getJdbcTypeCode(final MajorType type) {
         return java.sql.Types.VARBINARY;
       case VARCHAR:
         return java.sql.Types.VARCHAR;
+      case UNION:
+        return java.sql.Types.OTHER;
       default:
         // TODO: This isn't really an unsupported-operation/-type case; this
         // is an unexpected, code-out-of-sync-with-itself case, so use an
@@ -290,6 +293,7 @@ public static boolean isJdbcSignedType( final MajorType type ) {
       case LATE:
       case LIST:
       case MAP:
+      case UNION:
       case NULL:
       case TIMETZ:       // SQL TIME WITH TIME ZONE
       case TIMESTAMPTZ:  // SQL TIMESTAMP WITH TIME ZONE
@@ -340,6 +344,7 @@ public static boolean isFixedWidthType(final MajorType type) {
       case VARBINARY:
       case VAR16CHAR:
       case VARCHAR:
+      case UNION:
         return false;
       default:
         return true;
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
index 5420f998434..89afd7c5505 100644
--- a/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
+++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldReader.java
@@ -67,6 +67,10 @@ public void copyAsField(String name, ListWriter writer){
 <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first />
 <#assign boxedType = (minor.boxedType!type.boxedType) />
 
+  public void read(${name}Holder holder){
+    fail("${name}");
+  }
+
   public void read(Nullable${name}Holder holder){
     fail("${name}");
   }
diff --git a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java
index 9b673048e56..2da714111ce 100644
--- 
a/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java +++ b/exec/java-exec/src/main/codegen/templates/AbstractFieldWriter.java @@ -45,6 +45,16 @@ public void end() { throw new IllegalStateException(String.format("You tried to end when you are using a ValueWriter of type %s.", this.getClass().getSimpleName())); } + @Override + public void startList() { + throw new IllegalStateException(String.format("You tried to start when you are using a ValueWriter of type %s.", this.getClass().getSimpleName())); + } + + @Override + public void endList() { + throw new IllegalStateException(String.format("You tried to end when you are using a ValueWriter of type %s.", this.getClass().getSimpleName())); + } + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> <#assign fields = minor.fields!type.fields /> @Override diff --git a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java index 5f1f42f239d..13f74827c0b 100644 --- a/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/AbstractRecordWriter.java @@ -54,6 +54,11 @@ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldRea throw new UnsupportedOperationException("Doesn't support writing Map'"); } + @Override + public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader) { + throw new UnsupportedOperationException("Doesn't support writing Union type'"); + } + @Override public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { throw new UnsupportedOperationException("Doesn't support writing RepeatedMap"); diff --git a/exec/java-exec/src/main/codegen/templates/BaseReader.java b/exec/java-exec/src/main/codegen/templates/BaseReader.java index 116deda8960..4fce409ecdd 100644 --- a/exec/java-exec/src/main/codegen/templates/BaseReader.java +++ b/exec/java-exec/src/main/codegen/templates/BaseReader.java @@ -33,6 +33,9 @@ public interface BaseReader extends Positionable{ MajorType getType(); MaterializedField getField(); void reset(); + void read(UnionHolder holder); + void read(int index, UnionHolder holder); + void copyAsValue(UnionWriter writer); public interface MapReader extends BaseReader, Iterable{ FieldReader reader(String name); diff --git a/exec/java-exec/src/main/codegen/templates/BaseWriter.java b/exec/java-exec/src/main/codegen/templates/BaseWriter.java index 76978801fb3..da27e660f6f 100644 --- a/exec/java-exec/src/main/codegen/templates/BaseWriter.java +++ b/exec/java-exec/src/main/codegen/templates/BaseWriter.java @@ -57,8 +57,8 @@ public interface MapWriter extends BaseWriter { } public interface ListWriter extends BaseWriter { - void start(); - void end(); + void startList(); + void endList(); MapWriter map(); ListWriter list(); void copyReader(FieldReader reader); diff --git a/exec/java-exec/src/main/codegen/templates/ComplexCopier.java b/exec/java-exec/src/main/codegen/templates/ComplexCopier.java new file mode 100644 index 00000000000..c3b5cb56b0e --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/ComplexCopier.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/ComplexCopier.java" /> + + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.complex.impl; + +<#include "/@includes/vv_imports.ftl" /> + +/* + * This class is generated using freemarker and the ${.template_name} template. + */ +@SuppressWarnings("unused") +public class ComplexCopier { + + FieldReader in; + FieldWriter out; + + public ComplexCopier(FieldReader in, FieldWriter out) { + this.in = in; + this.out = out; + } + + public void write() { + writeValue(in, out); + } + + private void writeValue(FieldReader reader, FieldWriter writer) { + final DataMode m = reader.getType().getMode(); + final MinorType mt = reader.getType().getMinorType(); + + switch(m){ + case OPTIONAL: + case REQUIRED: + + + switch (mt) { + + case LIST: + writer.startList(); + while (reader.next()) { + writeValue(reader.reader(), getListWriterForReader(reader.reader(), writer)); + } + writer.endList(); + break; + case MAP: + writer.start(); + if (reader.isSet()) { + for(String name : reader){ + FieldReader childReader = reader.reader(name); + if(childReader.isSet()){ + writeValue(childReader, getMapWriterForReader(childReader, writer, name)); + } + } + } + writer.end(); + break; + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + + case ${name?upper_case}: + if (reader.isSet()) { + Nullable${name}Holder ${uncappedName}Holder = new Nullable${name}Holder(); + reader.read(${uncappedName}Holder); + writer.write${name}(<#list fields as field>${uncappedName}Holder.${field.name}<#if field_has_next>, ); + } + break; + + + + } + break; + } + } + + private FieldWriter getMapWriterForReader(FieldReader reader, MapWriter writer, String name) { + switch (reader.getType().getMinorType()) { + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + case ${name?upper_case}: + return (FieldWriter) writer.<#if name == "Int">integer<#else>${uncappedName}(name); + + + case MAP: + return (FieldWriter) writer.map(name); + case LIST: + return (FieldWriter) writer.list(name); + default: + throw new UnsupportedOperationException(reader.getType().toString()); + } + } + + private FieldWriter getListWriterForReader(FieldReader reader, ListWriter writer) { + switch (reader.getType().getMinorType()) { + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + case ${name?upper_case}: + return (FieldWriter) writer.<#if name 
== "Int">integer<#else>${uncappedName}(); + + + case MAP: + return (FieldWriter) writer.map(); + case LIST: + return (FieldWriter) writer.list(); + default: + throw new UnsupportedOperationException(reader.getType().toString()); + } + } +} diff --git a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java index 068efb4441e..607b71d94c0 100644 --- a/exec/java-exec/src/main/codegen/templates/ComplexReaders.java +++ b/exec/java-exec/src/main/codegen/templates/ComplexReaders.java @@ -119,7 +119,13 @@ public void copyAsField(String name, MapWriter writer){ ${nullMode}${minor.class?cap_first}WriterImpl impl = (${nullMode}${minor.class?cap_first}WriterImpl) writer.${lowerName}(name); impl.vector.copyFromSafe(idx(), impl.idx(), vector); } - + + <#if nullMode != "Nullable"> + public void read(${minor.class?cap_first}Holder h){ + vector.getAccessor().get(idx(), h); + } + + public void read(Nullable${minor.class?cap_first}Holder h){ vector.getAccessor().get(idx(), h); } @@ -157,6 +163,7 @@ public interface ${name}Reader extends BaseReader{ public Object readObject(int arrayIndex); public ${friendlyType} read${safeType}(int arrayIndex); <#else> + public void read(${minor.class?cap_first}Holder h); public void read(Nullable${minor.class?cap_first}Holder h); public Object readObject(); public ${friendlyType} read${safeType}(); diff --git a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java index cf1529d215f..bf447c9c91a 100644 --- a/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/EventBasedRecordWriter.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.planner.physical.WriterPrel; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.complex.impl.UnionReader; import org.apache.drill.exec.vector.complex.reader.FieldReader; import java.io.IOException; @@ -118,6 +119,9 @@ public void endField() throws IOException { } public static FieldConverter getConverter(RecordWriter recordWriter, int fieldId, String fieldName, FieldReader reader) { + if (reader instanceof UnionReader) { + return recordWriter.getNewUnionConverter(fieldId, fieldName, reader); + } switch (reader.getType().getMinorType()) { case MAP: switch (reader.getType().getMode()) { diff --git a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java index 3bf94036c4e..9bb4f82dccd 100644 --- a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java +++ b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java @@ -30,6 +30,7 @@ <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) /> <#assign safeType=friendlyType /> <#if safeType=="byte[]"><#assign safeType="ByteArray" /> +<#assign fields = minor.fields!type.fields /> <@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/${holderMode}${name}HolderReaderImpl.java" /> <#include "/@includes/license.ftl" /> @@ -116,6 +117,22 @@ public boolean isSet() { } +<#if holderMode != "Repeated"> +@Override + public void read(${name}Holder h) { + <#list fields as field> + h.${field.name} = holder.${field.name}; + + } + + @Override + public void read(Nullable${name}Holder h) { + <#list fields as field> + h.${field.name} = holder.${field.name}; + + } + + <#if holderMode == 
"Repeated"> @Override public ${friendlyType} read${safeType}(int index){ @@ -262,6 +279,11 @@ private Object readSingleObject() { } +<#if holderMode != "Repeated" && nullMode != "Nullable"> + public void copyAsValue(${minor.class?cap_first}Writer writer){ + writer.write(holder); + } + } diff --git a/exec/java-exec/src/main/codegen/templates/ListWriters.java b/exec/java-exec/src/main/codegen/templates/ListWriters.java index f2683a3f024..16d41ecf1b8 100644 --- a/exec/java-exec/src/main/codegen/templates/ListWriters.java +++ b/exec/java-exec/src/main/codegen/templates/ListWriters.java @@ -181,7 +181,7 @@ public MaterializedField getField() { <#if mode == "Repeated"> - public void start() { + public void startList() { final RepeatedListVector list = (RepeatedListVector) container; final RepeatedListVector.RepeatedMutator mutator = list.getMutator(); @@ -202,7 +202,7 @@ public void start() { } } - public void end() { + public void endList() { // noop, we initialize state at start rather than end. } <#else> @@ -214,11 +214,11 @@ public void setPosition(int index) { } } - public void start() { + public void startList() { // noop } - public void end() { + public void endList() { // noop } diff --git a/exec/java-exec/src/main/codegen/templates/MapWriters.java b/exec/java-exec/src/main/codegen/templates/MapWriters.java index 968333897c2..42cacabd3ee 100644 --- a/exec/java-exec/src/main/codegen/templates/MapWriters.java +++ b/exec/java-exec/src/main/codegen/templates/MapWriters.java @@ -52,9 +52,18 @@ public class ${mode}MapWriter extends AbstractFieldWriter { private final Map fields = Maps.newHashMap(); <#if mode == "Repeated">private int currentChildIndex = 0; - public ${mode}MapWriter(${containerClass} container, FieldWriter parent) { + private final boolean unionEnabled; + private final boolean unionInternalMap; + + public ${mode}MapWriter(${containerClass} container, FieldWriter parent, boolean unionEnabled, boolean unionInternalMap) { super(parent); this.container = container; + this.unionEnabled = unionEnabled; + this.unionInternalMap = unionInternalMap; + } + + public ${mode}MapWriter(${containerClass} container, FieldWriter parent) { + this(container, parent, false, false); } @Override @@ -70,10 +79,15 @@ public MaterializedField getField() { @Override public MapWriter map(String name) { FieldWriter writer = fields.get(name.toLowerCase()); - if(writer == null) { - int vectorCount = container.size(); - MapVector vector = container.addOrGet(name, MapVector.TYPE, MapVector.class); - writer = new SingleMapWriter(vector, this); + if(writer == null){ + int vectorCount=container.size(); + if(!unionEnabled || unionInternalMap){ + MapVector vector=container.addOrGet(name,MapVector.TYPE,MapVector.class); + writer=new SingleMapWriter(vector,this); + } else { + UnionVector vector = container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class); + writer = new UnionWriter(vector); + } if(vectorCount != container.size()) { writer.allocate(); } @@ -108,8 +122,16 @@ public void clear() { @Override public ListWriter list(String name) { FieldWriter writer = fields.get(name.toLowerCase()); + int vectorCount = container.size(); if(writer == null) { - writer = new SingleListWriter(name, container, this); + if (!unionEnabled){ + writer = new SingleListWriter(name,container,this); + } else{ + writer = new UnionWriter(container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class)); + } + if (container.size() > vectorCount) { + writer.allocate(); + } writer.setPosition(${index}); 
fields.put(name.toLowerCase(), writer); } @@ -191,9 +213,17 @@ public void end() { FieldWriter writer = fields.get(name.toLowerCase()); if(writer == null) { - final ${vectName}Vector vector = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class); + ValueVector vector; + if (unionEnabled){ + UnionVector v = container.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class); + writer = new UnionWriter(v); + vector = v; + } else { + ${vectName}Vector v = container.addOrGet(name, ${upperName}_TYPE, ${vectName}Vector.class); + writer = new ${vectName}WriterImpl(v, this); + vector = v; + } vector.allocateNewSafe(); - writer = new ${vectName}WriterImpl(vector, this); writer.setPosition(${index}); fields.put(name.toLowerCase(), writer); } diff --git a/exec/java-exec/src/main/codegen/templates/NullReader.java b/exec/java-exec/src/main/codegen/templates/NullReader.java index a0e5f508f6e..472dbed557c 100644 --- a/exec/java-exec/src/main/codegen/templates/NullReader.java +++ b/exec/java-exec/src/main/codegen/templates/NullReader.java @@ -56,11 +56,17 @@ public void copyAsValue(MapWriter writer) {} public void copyAsValue(ListWriter writer) {} - <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + public void copyAsValue(UnionWriter writer) {} + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + public void read(${name}Holder holder){ + throw new UnsupportedOperationException("NullReader cannot read into non-nullable holder"); + } + public void read(Nullable${name}Holder holder){ holder.isSet = 0; } - + public void read(int arrayIndex, ${name}Holder holder){ throw new ArrayIndexOutOfBoundsException(); } diff --git a/exec/java-exec/src/main/codegen/templates/RecordWriter.java b/exec/java-exec/src/main/codegen/templates/RecordWriter.java index a37ffa88916..24a94c4c56f 100644 --- a/exec/java-exec/src/main/codegen/templates/RecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/RecordWriter.java @@ -62,6 +62,7 @@ public interface RecordWriter { /** Add the field value given in valueHolder at the given column number fieldId. */ public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader); + public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader); public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader); public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader); diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java index 9c66cb718dd..d9810c79ec9 100644 --- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java +++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java @@ -16,6 +16,8 @@ * limitations under the License. 
*/ +import org.apache.drill.exec.vector.complex.UnionVector; + <@pp.dropOutputFile /> <@pp.changeOutputFile name="/org/apache/drill/exec/expr/TypeHelper.java" /> @@ -71,6 +73,8 @@ public static int getSize(MajorType major) { public static SqlAccessor getSqlAccessor(ValueVector vector){ final MajorType type = vector.getField().getType(); switch(type.getMinorType()){ + case UNION: + return new UnionSqlAccessor((UnionVector) vector); <#list vv.types as type> <#list type.minor as minor> case ${minor.class?upper_case}: @@ -100,8 +104,11 @@ public static ValueVector getNewVector(SchemaPath parentPath, String name, Buffe public static Class getValueVectorClass(MinorType type, DataMode mode){ switch (type) { + case UNION: + return UnionVector.class; case MAP: switch (mode) { + case OPTIONAL: case REQUIRED: return MapVector.class; case REPEATED: @@ -112,6 +119,9 @@ public static Class getValueVectorClass(MinorType type, DataMode mode){ switch (mode) { case REPEATED: return RepeatedListVector.class; + case REQUIRED: + case OPTIONAL: + return ListVector.class; } <#list vv.types as type> @@ -175,6 +185,7 @@ public static Class getReaderClassName( MinorType type, DataMode mode, boolea public static Class getWriterInterface( MinorType type, DataMode mode){ switch (type) { + case UNION: return UnionWriter.class; case MAP: return MapWriter.class; case LIST: return ListWriter.class; <#list vv.types as type> @@ -190,6 +201,8 @@ public static Class getWriterInterface( MinorType type, DataMode mode){ public static Class getWriterImpl( MinorType type, DataMode mode){ switch (type) { + case UNION: + return UnionWriter.class; case MAP: switch (mode) { case REQUIRED: @@ -247,6 +260,8 @@ public static Class getHolderReaderImpl( MinorType type, DataMode mode){ public static JType getHolderType(JCodeModel model, MinorType type, DataMode mode){ switch (type) { + case UNION: + return model._ref(UnionHolder.class); case MAP: case LIST: return model._ref(ComplexHolder.class); @@ -280,10 +295,13 @@ public static ValueVector getNewVector(MaterializedField field, BufferAllocator switch (type.getMinorType()) { - + case UNION: + return new UnionVector(field, allocator, callBack); + case MAP: switch (type.getMode()) { case REQUIRED: + case OPTIONAL: return new MapVector(field, allocator, callBack); case REPEATED: return new RepeatedMapVector(field, allocator, callBack); @@ -292,7 +310,10 @@ public static ValueVector getNewVector(MaterializedField field, BufferAllocator switch (type.getMode()) { case REPEATED: return new RepeatedListVector(field, allocator, callBack); - } + case OPTIONAL: + case REQUIRED: + return new ListVector(field, allocator, callBack); + } <#list vv. types as type> <#list type.minor as minor> case ${minor.class?upper_case}: diff --git a/exec/java-exec/src/main/codegen/templates/UnionFunctions.java b/exec/java-exec/src/main/codegen/templates/UnionFunctions.java new file mode 100644 index 00000000000..41b6b002fa8 --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/UnionFunctions.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionFunctions.java" /> + + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.complex.impl; + +<#include "/@includes/vv_imports.ftl" /> +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.holders.*; +import javax.inject.Inject; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.record.RecordBatch; + +/* + * This class is generated using freemarker and the ${.template_name} template. + */ + +@SuppressWarnings("unused") +public class UnionFunctions { + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + + <#if !minor.class?starts_with("Decimal")> + + @SuppressWarnings("unused") + @FunctionTemplate(name = "as${name}", scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL) + public static class CastUnion${name} implements DrillSimpleFunc { + + @Param UnionHolder in; + @Output Nullable${name}Holder out; + + public void setup() {} + + public void eval() { + if (in.isSet == 1) { + in.reader.read(out); + } else { + out.isSet = 0; + } + } + } + + @SuppressWarnings("unused") + @FunctionTemplate(names = {"castUNION", "castToUnion"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.INTERNAL) + public static class Cast${name}ToUnion implements DrillSimpleFunc { + + @Param Nullable${name}Holder in; + @Output UnionHolder out; + + public void setup() {} + + public void eval() { + out.reader = new org.apache.drill.exec.vector.complex.impl.Nullable${name}HolderReaderImpl(in); + out.isSet = in.isSet; + } + } + + + + + +} diff --git a/exec/java-exec/src/main/codegen/templates/UnionListWriter.java b/exec/java-exec/src/main/codegen/templates/UnionListWriter.java new file mode 100644 index 00000000000..fd7256b06ac --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/UnionListWriter.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionListWriter.java" /> + + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.complex.impl; + +<#include "/@includes/vv_imports.ftl" /> + +/* + * This class is generated using freemarker and the ${.template_name} template. + */ + +@SuppressWarnings("unused") +public class UnionListWriter extends AbstractFieldWriter { + + ListVector vector; + UnionVector data; + UInt4Vector offsets; + private UnionWriter writer; + private boolean inMap = false; + private String mapName; + private int lastIndex = 0; + + public UnionListWriter(ListVector vector) { + super(null); + this.vector = vector; + this.data = (UnionVector) vector.getDataVector(); + this.writer = new UnionWriter(data); + this.offsets = vector.getOffsetVector(); + } + + @Override + public void allocate() { + vector.allocateNew(); + } + + @Override + public void clear() { + vector.clear(); + } + + @Override + public MaterializedField getField() { + return null; + } + + @Override + public int getValueCapacity() { + return vector.getValueCapacity(); + } + + @Override + public void close() throws Exception { + + } + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + + <#if !minor.class?starts_with("Decimal")> + + @Override + public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}() { + return this; + } + + @Override + public ${name}Writer <#if uncappedName == "int">integer<#else>${uncappedName}(String name) { + assert inMap; + mapName = name; + return this; + } + + + + + + @Override + public MapWriter map() { + inMap = true; + return this; + } + + @Override + public ListWriter list() { + final int nextOffset = offsets.getAccessor().get(idx() + 1); + vector.getMutator().setNotNull(idx()); + offsets.getMutator().setSafe(idx() + 1, nextOffset + 1); + writer.setPosition(nextOffset); + return writer; + } + + @Override + public ListWriter list(String name) { + final int nextOffset = offsets.getAccessor().get(idx() + 1); + vector.getMutator().setNotNull(idx()); + data.getMutator().setType(nextOffset, MinorType.MAP); + writer.setPosition(nextOffset); + ListWriter listWriter = writer.list(name); + return listWriter; + } + + @Override + public MapWriter map(String name) { + MapWriter mapWriter = writer.map(name); + return mapWriter; + } + + @Override + public void startList() { + vector.getMutator().startNewValue(idx()); + } + + @Override + public void endList() { + + } + + @Override + public void start() { + assert inMap; + final int nextOffset = offsets.getAccessor().get(idx() + 1); + vector.getMutator().setNotNull(idx()); + data.getMutator().setType(nextOffset, MinorType.MAP); + offsets.getMutator().setSafe(idx() + 1, nextOffset); + writer.setPosition(nextOffset); + } + + @Override + public void end() { + if (inMap) { + inMap = false; + final int nextOffset = offsets.getAccessor().get(idx() + 1); + offsets.getMutator().setSafe(idx() + 1, nextOffset + 1); + } + } + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + + <#if !minor.class?starts_with("Decimal")> + + @Override + public void write${name}(<#list fields as 
field>${field.type} ${field.name}<#if field_has_next>, ) { + if (inMap) { + final int nextOffset = offsets.getAccessor().get(idx() + 1); + vector.getMutator().setNotNull(idx()); + data.getMutator().setType(nextOffset, MinorType.MAP); + writer.setPosition(nextOffset); + ${name}Writer ${uncappedName}Writer = writer.<#if uncappedName == "int">integer<#else>${uncappedName}(mapName); + ${uncappedName}Writer.write${name}(<#list fields as field>${field.name}<#if field_has_next>, ); + } else { + final int nextOffset = offsets.getAccessor().get(idx() + 1); + vector.getMutator().setNotNull(idx()); + writer.setPosition(nextOffset); + writer.write${name}(<#list fields as field>${field.name}<#if field_has_next>, ); + offsets.getMutator().setSafe(idx() + 1, nextOffset + 1); + } + } + + + + + +} diff --git a/exec/java-exec/src/main/codegen/templates/UnionReader.java b/exec/java-exec/src/main/codegen/templates/UnionReader.java new file mode 100644 index 00000000000..38d424786e9 --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/UnionReader.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.vector.complex.impl.NullReader; + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionReader.java" /> + + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.complex.impl; + +<#include "/@includes/vv_imports.ftl" /> + +@SuppressWarnings("unused") +public class UnionReader extends AbstractFieldReader { + + private BaseReader[] readers = new BaseReader[43]; + public UnionVector data; + + public UnionReader(UnionVector data) { + this.data = data; + } + + public MajorType getType() { + return Types.required(MinorType.valueOf(data.getTypeValue(idx()))); + } + + public boolean isSet(){ + return !data.getAccessor().isNull(idx()); + } + + public void read(UnionHolder holder) { + holder.reader = this; + holder.isSet = this.isSet() ? 
1 : 0; + } + + public void read(int index, UnionHolder holder) { + getList().read(index, holder); + } + + private FieldReader getReaderForIndex(int index) { + int typeValue = data.getTypeValue(index); + FieldReader reader = (FieldReader) readers[typeValue]; + if (reader != null) { + return reader; + } + switch (typeValue) { + case 0: + return NullReader.INSTANCE; + case MinorType.MAP_VALUE: + return (FieldReader) getMap(); + case MinorType.LIST_VALUE: + return (FieldReader) getList(); + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + case MinorType.${name?upper_case}_VALUE: + return (FieldReader) get${name}(); + + + default: + throw new UnsupportedOperationException("Unsupported type: " + MinorType.valueOf(typeValue)); + } + } + + private SingleMapReaderImpl mapReader; + + private MapReader getMap() { + if (mapReader == null) { + mapReader = (SingleMapReaderImpl) data.getMap().getReader(); + mapReader.setPosition(idx()); + readers[MinorType.MAP_VALUE] = mapReader; + } + return mapReader; + } + + private UnionListReader listReader; + + private FieldReader getList() { + if (listReader == null) { + listReader = new UnionListReader(data.getList()); + listReader.setPosition(idx()); + readers[MinorType.LIST_VALUE] = listReader; + } + return listReader; + } + + @Override + public java.util.Iterator iterator() { + return getMap().iterator(); + } + + @Override + public void copyAsValue(UnionWriter writer) { + writer.data.copyFrom(idx(), writer.idx(), data); + } + + <#list ["Object", "BigDecimal", "Integer", "Long", "Boolean", + "Character", "DateTime", "Period", "Double", "Float", + "Text", "String", "Byte", "Short", "byte[]"] as friendlyType> + <#assign safeType=friendlyType /> + <#if safeType=="byte[]"><#assign safeType="ByteArray" /> + + @Override + public ${friendlyType} read${safeType}() { + return getReaderForIndex(idx()).read${safeType}(); + } + + + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign uncappedName = name?uncap_first/> + <#assign boxedType = (minor.boxedType!type.boxedType) /> + <#assign javaType = (minor.javaType!type.javaType) /> + <#assign friendlyType = (minor.friendlyType!minor.boxedType!type.boxedType) /> + <#assign safeType=friendlyType /> + <#if safeType=="byte[]"><#assign safeType="ByteArray" /> + <#if !minor.class?starts_with("Decimal")> + + private Nullable${name}ReaderImpl ${uncappedName}Reader; + + private Nullable${name}ReaderImpl get${name}() { + if (${uncappedName}Reader == null) { + ${uncappedName}Reader = new Nullable${name}ReaderImpl(data.get${name}Vector()); + ${uncappedName}Reader.setPosition(idx()); + readers[MinorType.${name?upper_case}_VALUE] = ${uncappedName}Reader; + } + return ${uncappedName}Reader; + } + + public void read(Nullable${name}Holder holder){ + getReaderForIndex(idx()).read(holder); + } + + public void copyAsValue(${name}Writer writer){ + getReaderForIndex(idx()).copyAsValue(writer); + } + + + + @Override + public void setPosition(int index) { + super.setPosition(index); + for (BaseReader reader : readers) { + if (reader != null) { + reader.setPosition(index); + } + } + } + + public FieldReader reader(String name){ + return getMap().reader(name); + } + + public FieldReader reader() { + return getList().reader(); + } + + public boolean next() { + return getReaderForIndex(idx()).next(); + } +} + + + diff --git 
a/exec/java-exec/src/main/codegen/templates/UnionVector.java b/exec/java-exec/src/main/codegen/templates/UnionVector.java new file mode 100644 index 00000000000..6a72757cc85 --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/UnionVector.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.drill.common.types.TypeProtos.MinorType; + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionVector.java" /> + + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.complex.impl; + +<#include "/@includes/vv_imports.ftl" /> +import java.util.Iterator; +import org.apache.drill.exec.vector.complex.impl.ComplexCopier; +import org.apache.drill.exec.util.CallBack; + +/* + * This class is generated using freemarker and the ${.template_name} template. + */ +@SuppressWarnings("unused") + + +public class UnionVector implements ValueVector { + + private MaterializedField field; + private BufferAllocator allocator; + private Accessor accessor = new Accessor(); + private Mutator mutator = new Mutator(); + private int valueCount; + + private MapVector internalMap; + private SingleMapWriter internalMapWriter; + private UInt1Vector typeVector; + + private MapVector mapVector; + private ListVector listVector; + private NullableBigIntVector bigInt; + private NullableVarCharVector varChar; + + private FieldReader reader; + private NullableBitVector bit; + + private State state = State.INIT; + private int singleType = 0; + private ValueVector singleVector; + + private enum State { + INIT, SINGLE, MULTI + } + + public UnionVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) { + this.field = field.clone(); + this.allocator = allocator; + internalMap = new MapVector("internal", allocator, callBack); + internalMapWriter = new SingleMapWriter(internalMap, null, true, true); + this.typeVector = internalMap.addOrGet("types", Types.required(MinorType.UINT1), UInt1Vector.class); + this.field.addChild(internalMap.getField().clone()); + } + + private void updateState(ValueVector v) { + if (state == State.INIT) { + state = State.SINGLE; + singleVector = v; + singleType = v.getField().getType().getMinorType().getNumber(); + } else { + state = State.MULTI; + singleVector = null; + } + } + + public boolean isSingleType() { + return state == State.SINGLE && singleType != MinorType.LIST_VALUE; + } + + public ValueVector getSingleVector() { + assert state != State.MULTI : "Cannot get single vector when there are multiple types"; + assert state != State.INIT : "Cannot get single vector when there are no types"; + return singleVector; + } + + public MapVector getMap() { + if (mapVector == null) { + int vectorCount = internalMap.size(); + 
mapVector = internalMap.addOrGet("map", Types.optional(MinorType.MAP), MapVector.class); + updateState(mapVector); + if (internalMap.size() > vectorCount) { + mapVector.allocateNew(); + } + } + return mapVector; + } + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + + private Nullable${name}Vector ${uncappedName}Vector; + + public Nullable${name}Vector get${name}Vector() { + if (${uncappedName}Vector == null) { + int vectorCount = internalMap.size(); + ${uncappedName}Vector = internalMap.addOrGet("${uncappedName}", Types.optional(MinorType.${name?upper_case}), Nullable${name}Vector.class); + updateState(${uncappedName}Vector); + if (internalMap.size() > vectorCount) { + ${uncappedName}Vector.allocateNew(); + } + } + return ${uncappedName}Vector; + } + + + + + + public ListVector getList() { + if (listVector == null) { + int vectorCount = internalMap.size(); + listVector = internalMap.addOrGet("list", Types.optional(MinorType.LIST), ListVector.class); + updateState(listVector); + if (internalMap.size() > vectorCount) { + listVector.allocateNew(); + } + } + return listVector; + } + + public int getTypeValue(int index) { + return typeVector.getAccessor().get(index); + } + + public UInt1Vector getTypeVector() { + return typeVector; + } + + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + internalMap.allocateNew(); + if (typeVector != null) { + typeVector.zeroVector(); + } + } + + @Override + public boolean allocateNewSafe() { + boolean safe = internalMap.allocateNewSafe(); + if (safe) { + if (typeVector != null) { + typeVector.zeroVector(); + } + } + return safe; + } + + @Override + public void setInitialCapacity(int numRecords) { + } + + @Override + public int getValueCapacity() { + return Math.min(typeVector.getValueCapacity(), internalMap.getValueCapacity()); + } + + @Override + public void close() { + } + + @Override + public void clear() { + internalMap.clear(); + } + + @Override + public MaterializedField getField() { + return field; + } + + @Override + public TransferPair getTransferPair() { + return new TransferImpl(field); + } + + @Override + public TransferPair getTransferPair(FieldReference ref) { + return new TransferImpl(field.withPath(ref)); + } + + @Override + public TransferPair makeTransferPair(ValueVector target) { + return new TransferImpl((UnionVector) target); + } + + public void transferTo(UnionVector target) { + internalMap.makeTransferPair(target.internalMap).transfer(); + target.valueCount = valueCount; + } + + public void copyFrom(int inIndex, int outIndex, UnionVector from) { + from.getReader().setPosition(inIndex); + getWriter().setPosition(outIndex); + ComplexCopier copier = new ComplexCopier(from.reader, mutator.writer); + copier.write(); + } + + public void copyFromSafe(int inIndex, int outIndex, UnionVector from) { + copyFrom(inIndex, outIndex, from); + } + + private class TransferImpl implements TransferPair { + + UnionVector to; + + public TransferImpl(MaterializedField field) { + to = new UnionVector(field, allocator, null); + } + + public TransferImpl(UnionVector to) { + this.to = to; + } + + @Override + public void transfer() { + transferTo(to); + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + + } + + @Override + public ValueVector getTo() { + return to; + } + + @Override + public void copyValueSafe(int from, int to) 
{ + this.to.copyFrom(from, to, UnionVector.this); + } + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + @Override + public Mutator getMutator() { + return mutator; + } + + @Override + public FieldReader getReader() { + if (reader == null) { + reader = new UnionReader(this); + } + return reader; + } + + public FieldWriter getWriter() { + if (mutator.writer == null) { + mutator.writer = new UnionWriter(this); + } + return mutator.writer; + } + + @Override + public UserBitShared.SerializedField getMetadata() { + SerializedField.Builder b = getField() // + .getAsBuilder() // + .setBufferLength(getBufferSize()) // + .setValueCount(valueCount); + + b.addChild(internalMap.getMetadata()); + return b.build(); + } + + @Override + public int getBufferSize() { + return internalMap.getBufferSize(); + } + + @Override + public DrillBuf[] getBuffers(boolean clear) { + return internalMap.getBuffers(clear); + } + + @Override + public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { + valueCount = metadata.getValueCount(); + + internalMap.load(metadata.getChild(0), buffer); + } + + @Override + public Iterator iterator() { + return null; + } + + public class Accessor extends BaseValueVector.BaseAccessor { + + + @Override + public Object getObject(int index) { + int type = typeVector.getAccessor().get(index); + switch (type) { + case 0: + return null; + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + case MinorType.${name?upper_case}_VALUE: + return get${name}Vector().getAccessor().getObject(index); + + + + case MinorType.MAP_VALUE: + return getMap().getAccessor().getObject(index); + case MinorType.LIST_VALUE: + return getList().getAccessor().getObject(index); + default: + throw new UnsupportedOperationException("Cannot support type: " + MinorType.valueOf(type)); + } + } + + public byte[] get(int index) { + return null; + } + + public void get(int index, ComplexHolder holder) { + } + + public void get(int index, UnionHolder holder) { + if (reader == null) { + reader = new UnionReader(UnionVector.this); + } + reader.setPosition(index); + holder.reader = reader; + } + + @Override + public int getValueCount() { + return valueCount; + } + + @Override + public boolean isNull(int index) { + return typeVector.getAccessor().get(index) == 0; + } + + public int isSet(int index) { + return isNull(index) ? 
0 : 1; + } + } + + public class Mutator extends BaseValueVector.BaseMutator { + + UnionWriter writer; + + @Override + public void setValueCount(int valueCount) { + UnionVector.this.valueCount = valueCount; + internalMap.getMutator().setValueCount(valueCount); + } + + public void set(int index, byte[] bytes) { + } + + public void setSafe(int index, UnionHolder holder) { + FieldReader reader = holder.reader; + if (writer == null) { + writer = new UnionWriter(UnionVector.this); + } + writer.setPosition(index); + MinorType type = reader.getType().getMinorType(); + switch (type) { + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + <#if !minor.class?starts_with("Decimal")> + case ${name?upper_case}: + Nullable${name}Holder ${uncappedName}Holder = new Nullable${name}Holder(); + reader.read(${uncappedName}Holder); + if (holder.isSet == 1) { + writer.write${name}(<#list fields as field>${uncappedName}Holder.${field.name}<#if field_has_next>, ); + } + break; + + + case MAP: { + ComplexCopier copier = new ComplexCopier(reader, writer); + copier.write(); + break; + } + case LIST: { + ComplexCopier copier = new ComplexCopier(reader, writer); + copier.write(); + break; + } + default: + throw new UnsupportedOperationException(); + } + } + + public void setType(int index, MinorType type) { + typeVector.getMutator().setSafe(index, type.getNumber()); + } + + @Override + public void reset() { } + + @Override + public void generateTestData(int values) { } + } +} diff --git a/exec/java-exec/src/main/codegen/templates/UnionWriter.java b/exec/java-exec/src/main/codegen/templates/UnionWriter.java new file mode 100644 index 00000000000..9d779999c17 --- /dev/null +++ b/exec/java-exec/src/main/codegen/templates/UnionWriter.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +<@pp.dropOutputFile /> +<@pp.changeOutputFile name="/org/apache/drill/exec/vector/complex/impl/UnionWriter.java" /> + + +<#include "/@includes/license.ftl" /> + +package org.apache.drill.exec.vector.complex.impl; + +<#include "/@includes/vv_imports.ftl" /> + +/* + * This class is generated using freemarker and the ${.template_name} template. 
+ */ +@SuppressWarnings("unused") +public class UnionWriter extends AbstractFieldWriter implements FieldWriter { + + UnionVector data; + private MapWriter mapWriter; + private UnionListWriter listWriter; + private List writers = Lists.newArrayList(); + + public UnionWriter(BufferAllocator allocator) { + super(null); + } + + public UnionWriter(UnionVector vector) { + super(null); + data = vector; + } + + public UnionWriter(UnionVector vector, FieldWriter parent) { + super(null); + data = vector; + } + + @Override + public void setPosition(int index) { + super.setPosition(index); + for (BaseWriter writer : writers) { + writer.setPosition(index); + } + } + + + @Override + public void start() { + data.getMutator().setType(idx(), MinorType.MAP); + getMapWriter(true).start(); + } + + @Override + public void end() { + getMapWriter(false).end(); + } + + @Override + public void startList() { + getListWriter(true).startList(); + data.getMutator().setType(idx(), MinorType.LIST); + } + + @Override + public void endList() { + getListWriter(true).endList(); + } + + private MapWriter getMapWriter(boolean create) { + if (create && mapWriter == null) { + mapWriter = new SingleMapWriter(data.getMap(), null, true, false); + mapWriter.setPosition(idx()); + writers.add(mapWriter); + } + return mapWriter; + } + + public MapWriter asMap() { + data.getMutator().setType(idx(), MinorType.MAP); + return getMapWriter(true); + } + + private ListWriter getListWriter(boolean create) { + if (create && listWriter == null) { + listWriter = new UnionListWriter(data.getList()); + listWriter.setPosition(idx()); + writers.add(listWriter); + } + return listWriter; + } + + public ListWriter asList() { + data.getMutator().setType(idx(), MinorType.LIST); + return getListWriter(true); + } + + <#list vv.types as type><#list type.minor as minor><#assign name = minor.class?cap_first /> + <#assign fields = minor.fields!type.fields /> + <#assign uncappedName = name?uncap_first/> + + <#if !minor.class?starts_with("Decimal")> + + private ${name}Writer ${name?uncap_first}Writer; + + private ${name}Writer get${name}Writer(boolean create) { + if (create && ${uncappedName}Writer == null) { + ${uncappedName}Writer = new Nullable${name}WriterImpl(data.get${name}Vector(), null); + ${uncappedName}Writer.setPosition(idx()); + writers.add(${uncappedName}Writer); + } + return ${uncappedName}Writer; + } + + public ${name}Writer as${name}() { + data.getMutator().setType(idx(), MinorType.${name?upper_case}); + return get${name}Writer(true); + } + + @Override + public void write(${name}Holder holder) { + data.getMutator().setType(idx(), MinorType.${name?upper_case}); + get${name}Writer(true).setPosition(idx()); + get${name}Writer(true).write${name}(<#list fields as field>holder.${field.name}<#if field_has_next>, ); + } + + public void write${minor.class}(<#list fields as field>${field.type} ${field.name}<#if field_has_next>, ) { + data.getMutator().setType(idx(), MinorType.${name?upper_case}); + get${name}Writer(true).setPosition(idx()); + get${name}Writer(true).write${name}(<#list fields as field>${field.name}<#if field_has_next>, ); + } + + + + + public void writeNull() { + } + + @Override + public MapWriter map() { + data.getMutator().setType(idx(), MinorType.LIST); + getListWriter(true).setPosition(idx()); + return getListWriter(true).map(); + } + + @Override + public ListWriter list() { + data.getMutator().setType(idx(), MinorType.LIST); + getListWriter(true).setPosition(idx()); + return getListWriter(true).list(); + } + + @Override + public 
ListWriter list(String name) { + data.getMutator().setType(idx(), MinorType.MAP); + getMapWriter(true).setPosition(idx()); + return getMapWriter(true).list(name); + } + + @Override + public MapWriter map(String name) { + data.getMutator().setType(idx(), MinorType.MAP); + getMapWriter(true).setPosition(idx()); + return getMapWriter(true).map(name); + } + + <#list vv.types as type><#list type.minor as minor> + <#assign lowerName = minor.class?uncap_first /> + <#if lowerName == "int" ><#assign lowerName = "integer" /> + <#assign upperName = minor.class?upper_case /> + <#assign capName = minor.class?cap_first /> + <#if !minor.class?starts_with("Decimal")> + @Override + public ${capName}Writer ${lowerName}(String name) { + data.getMutator().setType(idx(), MinorType.MAP); + getMapWriter(true).setPosition(idx()); + return getMapWriter(true).${lowerName}(name); + } + + @Override + public ${capName}Writer ${lowerName}() { + data.getMutator().setType(idx(), MinorType.LIST); + getListWriter(true).setPosition(idx()); + return getListWriter(true).${lowerName}(); + } + + + + @Override + public void allocate() { + data.allocateNew(); + } + + @Override + public void clear() { + data.clear(); + } + + @Override + public void close() throws Exception { + data.close(); + } + + @Override + public MaterializedField getField() { + return data.getField(); + } + + @Override + public int getValueCapacity() { + return data.getValueCapacity(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index c9554af6a3f..e3063d0d177 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -163,6 +163,8 @@ public interface ExecConstants { public static String MONGO_READER_READ_NUMBERS_AS_DOUBLE = "store.mongo.read_numbers_as_double"; public static OptionValidator MONGO_READER_READ_NUMBERS_AS_DOUBLE_VALIDATOR = new BooleanValidator(MONGO_READER_READ_NUMBERS_AS_DOUBLE, false); + public static BooleanValidator ENABLE_UNION_TYPE = new BooleanValidator("exec.enable_union_type", false); + // TODO: We need to add a feature that enables storage plugins to add their own options. Currently we have to declare // in core which is not right. Move this option and above two mongo plugin related options once we have the feature. 
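+  // The union type is gated behind the "exec.enable_union_type" option added above (default
+  // false). It is registered with SystemOptionManager and consulted by JSONRecordReader and
+  // AbstractRecordBatch later in this patch. A hedged sketch of how the new TestJsonReader
+  // cases drive it from SQL (the SELECT line is illustrative, not taken verbatim from a test):
+  //
+  //   alter session set `exec.enable_union_type` = true;
+  //   select typeOf(field1), field1 from cp.`jsoninput/union/a.json`;
+  //   alter session set `exec.enable_union_type` = false;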
public static String HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS = "store.hive.optimize_scan_with_native_readers"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java index df315b2e07c..bc6d807a611 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java @@ -87,12 +87,12 @@ private ExpressionTreeMaterializer() { }; public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext) { - return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false); + return ExpressionTreeMaterializer.materialize(expr, batch, errorCollector, functionLookupContext, false, false); } public static LogicalExpression materializeAndCheckErrors(LogicalExpression expr, VectorAccessible batch, FunctionLookupContext functionLookupContext) throws SchemaChangeException { ErrorCollector collector = new ErrorCollectorImpl(); - LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext, false); + LogicalExpression e = ExpressionTreeMaterializer.materialize(expr, batch, collector, functionLookupContext, false, false); if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } @@ -100,8 +100,8 @@ public static LogicalExpression materializeAndCheckErrors(LogicalExpression expr } public static LogicalExpression materialize(LogicalExpression expr, VectorAccessible batch, ErrorCollector errorCollector, FunctionLookupContext functionLookupContext, - boolean allowComplexWriterExpr) { - LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr), functionLookupContext); + boolean allowComplexWriterExpr, boolean unionTypeEnabled) { + LogicalExpression out = expr.accept(new MaterializeVisitor(batch, errorCollector, allowComplexWriterExpr, unionTypeEnabled), functionLookupContext); if (!errorCollector.hasErrors()) { out = out.accept(ConditionalExprOptimizer.INSTANCE, null); @@ -190,11 +190,13 @@ private static class MaterializeVisitor extends AbstractExprVisitor g, JBlock sub, String body, H ValueReference parameter = parameters[i]; HoldingContainer inputVariable = inputVariables[i]; - if (parameter.isFieldReader && ! inputVariable.isReader() && ! Types.isComplex(inputVariable.getMajorType())) { + if (parameter.isFieldReader && ! inputVariable.isReader() && ! 
Types.isComplex(inputVariable.getMajorType()) && inputVariable.getMinorType() != MinorType.UNION) { JType singularReaderClass = g.getModel()._ref(TypeHelper.getHolderReaderImpl(inputVariable.getMajorType().getMinorType(), inputVariable.getMajorType().getMode())); JType fieldReadClass = g.getModel()._ref(FieldReader.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java index e27234fdb22..b7877df1693 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java @@ -46,7 +46,7 @@ public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writ throw new DrillRuntimeException("kvgen function only supports Simple maps as input"); } BaseWriter.ListWriter listWriter = writer.rootAsList(); - listWriter.start(); + listWriter.startList(); BaseWriter.MapWriter mapWriter = listWriter.map(); // Iterate over the fields in the map @@ -79,7 +79,7 @@ public static DrillBuf mappify(FieldReader reader, BaseWriter.ComplexWriter writ mapWriter.end(); } - listWriter.end(); + listWriter.endList(); return buffer; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java new file mode 100644 index 00000000000..23a5004e9a3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/UnionFunctions.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.expr.fn.impl; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.holders.UnionHolder; +import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.vector.complex.impl.UnionReader; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +import javax.inject.Inject; + +public class UnionFunctions { + + @FunctionTemplate(names = {"typeString"}, + scope = FunctionTemplate.FunctionScope.SIMPLE, + nulls = NullHandling.NULL_IF_NULL) + public static class FromType implements DrillSimpleFunc { + + @Param + IntHolder in; + @Output + VarCharHolder out; + @Inject + DrillBuf buffer; + + public void setup() {} + + public void eval() { + + VarCharHolder h = org.apache.drill.exec.vector.ValueHolderHelper.getVarCharHolder(buffer, org.apache.drill.common.types.MinorType.valueOf(in.value).toString()); + out.buffer = h.buffer; + out.start = h.start; + out.end = h.end; + } + } + + @FunctionTemplate(names = {"type"}, + scope = FunctionTemplate.FunctionScope.SIMPLE, + nulls = NullHandling.NULL_IF_NULL) + public static class ToType implements DrillSimpleFunc { + + @Param + VarCharHolder input; + @Output + IntHolder out; + + public void setup() {} + + public void eval() { + + out.value = input.getType().getMinorType().getNumber(); + byte[] b = new byte[input.end - input.start]; + input.buffer.getBytes(input.start, b, 0, b.length); + String type = new String(b); + out.value = org.apache.drill.common.types.MinorType.valueOf(type.toUpperCase()).getNumber(); + } + } + + @FunctionTemplate(names = {"typeOf"}, + scope = FunctionTemplate.FunctionScope.SIMPLE, + nulls = NullHandling.INTERNAL) + public static class GetType implements DrillSimpleFunc { + + @Param + FieldReader input; + @Output + IntHolder out; + + public void setup() {} + + public void eval() { + + out.value = input.isSet() ? input.getType().getMinorType().getNumber() : 0; + + } + } + + @SuppressWarnings("unused") + @FunctionTemplate(names = {"castUNION", "castToUnion"}, scope = FunctionTemplate.FunctionScope.SIMPLE, nulls=NullHandling.NULL_IF_NULL) + public static class CastUnionToUnion implements DrillSimpleFunc{ + + @Param FieldReader in; + @Output + UnionHolder out; + + public void setup() {} + + public void eval() { + out.reader = in; + out.isSet = in.isSet() ? 1 : 0; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java new file mode 100644 index 00000000000..84cdefb7bb0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/holders/UnionHolder.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.expr.holders; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +public class UnionHolder implements ValueHolder { + public static final MajorType TYPE = Types.optional(MinorType.UNION); + public FieldReader reader; + public int isSet; + + public MajorType getType() { + return reader.getType(); + } + + public boolean isSet() { + return isSet == 1; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index a3b5f27ec80..d065a6d40aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; +import org.apache.drill.exec.physical.config.UnionTypeReducer; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.config.WindowPOP; @@ -215,6 +216,11 @@ public T visitIteratorValidator(IteratorValidator op, X value) throws E { return visitOp(op, value); } + @Override + public T visitUnionTypeReducer(UnionTypeReducer op, X value) throws E { + return visitOp(op, value); + } + @Override public T visitValues(Values op, X value) throws E { return visitOp(op, value); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 432e06b8e8d..fd16bc84153 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -23,6 +23,7 @@ import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -176,7 +177,8 @@ protected Filterer generateSV2Filterer() throws SchemaChangeException { final List transfers = Lists.newArrayList(); final ClassGenerator cg = CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION2, context.getFunctionRegistry()); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, collector, context.getFunctionRegistry()); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(popConfig.getExpr(), 
incoming, collector, + context.getFunctionRegistry(), false, unionTypeEnabled); if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 491ced34330..53ef426230d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -289,7 +289,7 @@ protected boolean setupNewSchema() throws SchemaChangeException { final IntOpenHashSet transferFieldIds = new IntOpenHashSet(); final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); - final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + final ValueVectorReadExpression vectorRead = (ValueVectorReadExpression)ExpressionTreeMaterializer.materialize(flattenExpr.getExpr(), incoming, collector, context.getFunctionRegistry(), true, false); final TransferPair tp = getFlattenFieldTransferPair(flattenExpr.getRef()); if (tp != null) { @@ -316,7 +316,7 @@ protected boolean setupNewSchema() throws SchemaChangeException { } } - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true, false); final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. 
Errors:\n %s.", collector.toErrorString())); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 5b5c90d47f2..dfca892ff06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -37,6 +37,7 @@ import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -63,7 +64,6 @@ import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.carrotsearch.hppc.IntOpenHashSet; @@ -381,7 +381,8 @@ protected boolean setupNewSchema() throws SchemaChangeException { } } - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, + collector, context.getFunctionRegistry(), true, unionTypeEnabled); final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); if (collector.hasErrors()) { throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. 
Errors:\n %s.", collector.toErrorString())); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index d8f703ed485..aaa6f9e5b77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -22,6 +22,7 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -30,6 +31,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.server.options.OptionValue; public abstract class AbstractRecordBatch implements CloseableRecordBatch { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass()); @@ -39,6 +41,7 @@ public abstract class AbstractRecordBatch implements protected final FragmentContext context; protected final OperatorContext oContext; protected final OperatorStats stats; + protected final boolean unionTypeEnabled; protected BatchState state; @@ -62,6 +65,12 @@ protected AbstractRecordBatch(final T popConfig, final FragmentContext context, } else { state = BatchState.FIRST; } + OptionValue option = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE.getOptionName()); + if (option != null) { + unionTypeEnabled = option.bool_val; + } else { + unionTypeEnabled = false; + } } protected static enum BatchState { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index 6b1d178796e..21b3adc9a91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -19,11 +19,15 @@ import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.MinorType; +import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.exec.vector.complex.impl.UnionVector; public class SimpleVectorWrapper implements VectorWrapper{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleVectorWrapper.class); @@ -103,6 +107,13 @@ public TypedFieldId getFieldIdIfMatches(int id, SchemaPath expectedPath) { } PathSegment seg = expectedPath.getRootSegment(); + if (v instanceof UnionVector) { + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + builder.addId(id).remainder(expectedPath.getRootSegment().getChild()); + builder.finalType(Types.optional(TypeProtos.MinorType.UNION)); + 
builder.intermediateType(Types.optional(TypeProtos.MinorType.UNION)); + return builder.build(); + } else if (v instanceof AbstractContainerVector) { // we're looking for a multi path. AbstractContainerVector c = (AbstractContainerVector) v; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java index 29dd6a53056..8c602b33627 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/ResolverTypePrecedence.java @@ -77,6 +77,7 @@ public class ResolverTypePrecedence { precedenceMap.put(MinorType.INTERVALDAY, i+= 2); precedenceMap.put(MinorType.INTERVALYEAR, i+= 2); precedenceMap.put(MinorType.INTERVAL, i+= 2); + precedenceMap.put(MinorType.UNION, i += 2); MAX_IMPLICIT_CAST_COST = i; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java index 3278d3e6419..7ee8ebed33a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.Set; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; @@ -750,6 +752,8 @@ private static void initTypeRules() { rule.add(MinorType.VARBINARY); rule.add(MinorType.FIXEDBINARY); rules.put(MinorType.VARBINARY, rule); + + rules.put(MinorType.UNION, Sets.newHashSet(MinorType.UNION)); } public static boolean isCastableWithNullHandling(MajorType from, MajorType to, NullHandling nullHandling) { @@ -792,10 +796,16 @@ public static DataMode getLeastRestrictiveDataMode(List dataModes) { public static MinorType getLeastRestrictiveType(List types) { assert types.size() >= 2; MinorType result = types.get(0); + if (result == MinorType.UNION) { + return result; + } int resultPrec = ResolverTypePrecedence.precedenceMap.get(result); for (int i = 1; i < types.size(); i++) { MinorType next = types.get(i); + if (next == MinorType.UNION) { + return next; + } if (next == result) { // both args are of the same type; continue continue; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index c58bc08aefb..ad8d54e7711 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -87,6 +87,7 @@ public class SystemOptionManager extends BaseOptionManager { ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR, + ExecConstants.ENABLE_UNION_TYPE, ExecConstants.TEXT_ESTIMATED_ROW_SIZE, ExecConstants.JSON_EXTENDED_TYPES, ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE_VALIDATOR, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java index 
1a94452a1d2..e74021b94ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/MapOrListWriter.java @@ -49,7 +49,7 @@ void start() { if (map != null) { map.start(); } else { - list.start(); + list.startList(); } } @@ -57,7 +57,7 @@ void end() { if (map != null) { map.end(); } else { - list.end(); + list.endList(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 4d51199eb9d..0e3c9080021 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -59,6 +59,7 @@ public class JSONRecordReader extends AbstractRecordReader { private final FragmentContext fragmentContext; private final boolean enableAllTextMode; private final boolean readNumbersAsDouble; + private final boolean unionEnabled; /** * Create a JSON Record Reader that uses a file based input stream. @@ -108,6 +109,7 @@ private JSONRecordReader(final FragmentContext fragmentContext, final String inp // only enable all text mode if we aren't using embedded content mode. this.enableAllTextMode = embeddedContent == null && fragmentContext.getOptions().getOption(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR); this.readNumbersAsDouble = fragmentContext.getOptions().getOption(ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val; + this.unionEnabled = fragmentContext.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE); setColumns(columns); } @@ -118,7 +120,7 @@ public void setup(final OperatorContext context, final OutputMutator output) thr this.stream = fileSystem.openPossiblyCompressedStream(hadoopPath); } - this.writer = new VectorContainerWriter(output); + this.writer = new VectorContainerWriter(output, unionEnabled); if (isSkipQuery()) { this.jsonReader = new CountingJsonReader(fragmentContext.getManagedBuffer()); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java index ea45653d222..9f7d2b30683 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.vector.complex.fn.BasicJsonOutput; import org.apache.drill.exec.vector.complex.fn.ExtendedJsonOutput; +import org.apache.drill.exec.vector.complex.fn.JsonWriter; import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -128,6 +129,29 @@ public void writeField() throws IOException { } } + @Override + public FieldConverter getNewUnionConverter(int fieldId, String fieldName, FieldReader reader) { + return new UnionJsonConverter(fieldId, fieldName, reader); + } + + public class UnionJsonConverter extends FieldConverter { + + public UnionJsonConverter(int fieldId, String fieldName, FieldReader reader) { + super(fieldId, fieldName, reader); + } + + @Override + public void startField() throws IOException { + gen.writeFieldName(fieldName); + } + + @Override + public void writeField() throws IOException { + 
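+      // Wrap the shared JsonOutput in a JsonWriter and let it dispatch on whatever minor type
+      // the union's reader currently reports, reusing the same write(FieldReader) path as any
+      // other reader (see the JsonWriter changes further down in this patch).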
JsonWriter writer = new JsonWriter(gen); + writer.write(reader); + } + } + @Override public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) { return new RepeatedMapJsonConverter(fieldId, fieldName, reader); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java new file mode 100644 index 00000000000..ecf2596edba --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/UnionSqlAccessor.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.vector.accessor; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.vector.complex.impl.UnionVector; +import org.apache.drill.exec.vector.complex.impl.UnionWriter; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; + +public class UnionSqlAccessor extends AbstractSqlAccessor { + + FieldReader reader; + + public UnionSqlAccessor(UnionVector vector) { + reader = vector.getReader(); + } + + @Override + public boolean isNull(int rowOffset) { + reader.setPosition(rowOffset); + return reader.isSet(); + } + + @Override + public BigDecimal getBigDecimal(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readBigDecimal(); + } + + @Override + public boolean getBoolean(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readBoolean(); + } + + @Override + public byte getByte(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readByte(); + } + + @Override + public byte[] getBytes(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readByteArray(); + } + + @Override + public double getDouble(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readDouble(); + } + + @Override + public float getFloat(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readFloat(); + } + + @Override + public int getInt(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readInteger(); + } + + @Override + public long getLong(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readLong(); + } + + @Override + public short 
getShort(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readShort(); + } + + @Override + public char getChar(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return reader.readCharacter(); + } + + @Override + public String getString(int rowOffset) throws InvalidAccessException{ + reader.setPosition(rowOffset); + return getObject(rowOffset).toString(); + } + + @Override + public Object getObject(int rowOffset) throws InvalidAccessException { + reader.setPosition(rowOffset); + return reader.readObject(); + } + + @Override + public MajorType getType() { + return Types.optional(MinorType.UNION); + } + + @Override + public Class getObjectClass() { + return Object.class; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java new file mode 100644 index 00000000000..ccd6239ec7b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java @@ -0,0 +1,292 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ ******************************************************************************/ +package org.apache.drill.exec.vector.complex; + +import com.google.common.collect.ObjectArrays; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.util.CallBack; +import org.apache.drill.exec.util.JsonStringArrayList; +import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.UInt1Vector; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.VectorDescriptor; +import org.apache.drill.exec.vector.complex.impl.ComplexCopier; +import org.apache.drill.exec.vector.complex.impl.UnionListReader; +import org.apache.drill.exec.vector.complex.impl.UnionListWriter; +import org.apache.drill.exec.vector.complex.impl.UnionVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; +import org.apache.drill.exec.vector.complex.writer.FieldWriter; + +import java.util.List; + +public class ListVector extends BaseRepeatedValueVector { + + UInt4Vector offsets; + protected final UInt1Vector bits; + Mutator mutator = new Mutator(); + Accessor accessor = new Accessor(); + UnionListWriter writer; + UnionListReader reader; + + public ListVector(MaterializedField field, BufferAllocator allocator, CallBack callBack) { + super(field, allocator, new UnionVector(field, allocator, callBack)); + this.bits = new UInt1Vector(MaterializedField.create("$bits$", Types.required(MinorType.UINT1)), allocator); + offsets = getOffsetVector(); + this.field.addChild(getDataVector().getField()); + this.writer = new UnionListWriter(this); + this.reader = new UnionListReader(this); + } + + public UnionListWriter getWriter() { + return writer; + } + + @Override + public void allocateNew() throws OutOfMemoryRuntimeException { + super.allocateNewSafe(); + } + + public void transferTo(ListVector target) { + offsets.makeTransferPair(target.offsets).transfer(); + bits.makeTransferPair(target.bits).transfer(); + getDataVector().makeTransferPair(target.getDataVector()).transfer(); + } + + public void copyFrom(int inIndex, int outIndex, ListVector from) { + FieldReader in = from.getReader(); + in.setPosition(inIndex); + FieldWriter out = getWriter(); + out.setPosition(outIndex); + ComplexCopier copier = new ComplexCopier(in, out); + copier.write(); + } + + @Override + public UnionVector getDataVector() { + return (UnionVector) vector; + } + + @Override + public TransferPair getTransferPair(FieldReference ref) { + return new TransferImpl(field.withPath(ref)); + } + + @Override + public TransferPair makeTransferPair(ValueVector target) { + return new TransferImpl((ListVector) target); + } + + private class TransferImpl implements TransferPair { + + ListVector to; + + public TransferImpl(MaterializedField field) { + to = new ListVector(field, allocator, null); + } + + public TransferImpl(ListVector to) { + this.to = to; + } + + @Override + public void transfer() { + transferTo(to); + } + + @Override + public void splitAndTransfer(int startIndex, int length) { + + } + + @Override + public 
ValueVector getTo() { + return to; + } + + @Override + public void copyValueSafe(int from, int to) { + this.to.copyFrom(from, to, ListVector.this); + } + } + + @Override + public Accessor getAccessor() { + return accessor; + } + + @Override + public Mutator getMutator() { + return mutator; + } + + @Override + public FieldReader getReader() { + return reader; + } + + @Override + public boolean allocateNewSafe() { + /* boolean to keep track if all the memory allocation were successful + * Used in the case of composite vectors when we need to allocate multiple + * buffers for multiple vectors. If one of the allocations failed we need to + * clear all the memory that we allocated + */ + boolean success = false; + try { + if (!offsets.allocateNewSafe()) { + return false; + } + success = vector.allocateNewSafe(); + success = success && bits.allocateNewSafe(); + } finally { + if (!success) { + clear(); + } + } + offsets.zeroVector(); + bits.zeroVector(); + return success; + } + + @Override + protected UserBitShared.SerializedField.Builder getMetadataBuilder() { + return getField().getAsBuilder() + .setValueCount(getAccessor().getValueCount()) + .setBufferLength(getBufferSize()) + .addChild(offsets.getMetadata()) + .addChild(bits.getMetadata()) + .addChild(vector.getMetadata()); + } + + @Override + public int getBufferSize() { + if (getAccessor().getValueCount() == 0) { + return 0; + } + return offsets.getBufferSize() + bits.getBufferSize() + vector.getBufferSize(); + } + + @Override + public void clear() { + offsets.clear(); + vector.clear(); + bits.clear(); + lastSet = 0; + super.clear(); + } + + @Override + public DrillBuf[] getBuffers(boolean clear) { + final DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(false), ObjectArrays.concat(bits.getBuffers(false), + vector.getBuffers(false), DrillBuf.class), DrillBuf.class); + if (clear) { + for (DrillBuf buffer:buffers) { + buffer.retain(); + } + clear(); + } + return buffers; + } + + @Override + public void load(UserBitShared.SerializedField metadata, DrillBuf buffer) { + final UserBitShared.SerializedField offsetMetadata = metadata.getChild(0); + offsets.load(offsetMetadata, buffer); + + final int offsetLength = offsetMetadata.getBufferLength(); + final UserBitShared.SerializedField bitMetadata = metadata.getChild(1); + final int bitLength = bitMetadata.getBufferLength(); + bits.load(bitMetadata, buffer.slice(offsetLength, bitLength)); + + final UserBitShared.SerializedField vectorMetadata = metadata.getChild(2); + if (getDataVector() == DEFAULT_DATA_VECTOR) { + addOrGetVector(VectorDescriptor.create(vectorMetadata.getMajorType())); + } + + final int vectorLength = vectorMetadata.getBufferLength(); + vector.load(vectorMetadata, buffer.slice(offsetLength + bitLength, vectorLength)); + } + + private int lastSet; + + public class Accessor extends BaseRepeatedAccessor { + + @Override + public Object getObject(int index) { + if (bits.getAccessor().isNull(index)) { + return null; + } + final List vals = new JsonStringArrayList<>(); + final UInt4Vector.Accessor offsetsAccessor = offsets.getAccessor(); + final int start = offsetsAccessor.get(index); + final int end = offsetsAccessor.get(index + 1); + final UnionVector.Accessor valuesAccessor = getDataVector().getAccessor(); + for(int i = start; i < end; i++) { + vals.add(valuesAccessor.getObject(i)); + } + return vals; + } + + @Override + public boolean isNull(int index) { + return bits.getAccessor().get(index) == 0; + } + } + + public class Mutator extends BaseRepeatedMutator { + public void 
setNotNull(int index) { + bits.getMutator().setSafe(index, 1); + lastSet = index + 1; + } + + @Override + public void startNewValue(int index) { + for (int i = lastSet; i <= index; i++) { + offsets.getMutator().setSafe(i + 1, offsets.getAccessor().get(i)); + } + setNotNull(index); + lastSet = index + 1; + } + + @Override + public void setValueCount(int valueCount) { + // TODO: populate offset end points + if (valueCount == 0) { + offsets.getMutator().setValueCount(0); + } else { + for (int i = lastSet; i < valueCount; i++) { + offsets.getMutator().setSafe(i + 1, offsets.getAccessor().get(i)); + } + offsets.getMutator().setValueCount(valueCount + 1); + } + final int childValueCount = valueCount == 0 ? 0 : offsets.getAccessor().get(valueCount); + vector.getMutator().setValueCount(childValueCount); + bits.getMutator().setValueCount(valueCount); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 603776daba9..d55b1d39baa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -275,83 +275,86 @@ private void consumeEntireNextValue() throws IOException { private void writeData(MapWriter map, FieldSelection selection, boolean moveForward) throws IOException { // map.start(); - outside: while (true) { - - JsonToken t; - if(moveForward){ - t = parser.nextToken(); - }else{ - t = parser.getCurrentToken(); - moveForward = true; - } + try { + outside: + while (true) { + + JsonToken t; + if (moveForward) { + t = parser.nextToken(); + } else { + t = parser.getCurrentToken(); + moveForward = true; + } - if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) { - return; - } + if (t == JsonToken.NOT_AVAILABLE || t == JsonToken.END_OBJECT) { + return; + } - assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name()); + assert t == JsonToken.FIELD_NAME : String.format("Expected FIELD_NAME but got %s.", t.name()); - final String fieldName = parser.getText(); - this.currentFieldName = fieldName; - FieldSelection childSelection = selection.getChild(fieldName); - if (childSelection.isNeverValid()) { - consumeEntireNextValue(); - continue outside; - } - - switch (parser.nextToken()) { - case START_ARRAY: - writeData(map.list(fieldName)); - break; - case START_OBJECT: - if (!writeMapDataIfTyped(map, fieldName)) { - writeData(map.map(fieldName), childSelection, false); + final String fieldName = parser.getText(); + this.currentFieldName = fieldName; + FieldSelection childSelection = selection.getChild(fieldName); + if (childSelection.isNeverValid()) { + consumeEntireNextValue(); + continue outside; } - break; - case END_OBJECT: - break outside; - case VALUE_FALSE: { - map.bit(fieldName).writeBit(0); - atLeastOneWrite = true; - break; - } - case VALUE_TRUE: { - map.bit(fieldName).writeBit(1); - atLeastOneWrite = true; - break; - } - case VALUE_NULL: - // do nothing as we don't have a type. 
- break; - case VALUE_NUMBER_FLOAT: - map.float8(fieldName).writeFloat8(parser.getDoubleValue()); - atLeastOneWrite = true; - break; - case VALUE_NUMBER_INT: - if (this.readNumbersAsDouble) { - map.float8(fieldName).writeFloat8(parser.getDoubleValue()); + switch (parser.nextToken()) { + case START_ARRAY: + writeData(map.list(fieldName)); + break; + case START_OBJECT: + if (!writeMapDataIfTyped(map, fieldName)) { + writeData(map.map(fieldName), childSelection, false); + } + break; + case END_OBJECT: + break outside; + + case VALUE_FALSE: { + map.bit(fieldName).writeBit(0); + atLeastOneWrite = true; + break; } - else { - map.bigInt(fieldName).writeBigInt(parser.getLongValue()); + case VALUE_TRUE: { + map.bit(fieldName).writeBit(1); + atLeastOneWrite = true; + break; + } + case VALUE_NULL: + // do nothing as we don't have a type. + break; + case VALUE_NUMBER_FLOAT: + map.float8(fieldName).writeFloat8(parser.getDoubleValue()); + atLeastOneWrite = true; + break; + case VALUE_NUMBER_INT: + if (this.readNumbersAsDouble) { + map.float8(fieldName).writeFloat8(parser.getDoubleValue()); + } else { + map.bigInt(fieldName).writeBigInt(parser.getLongValue()); + } + atLeastOneWrite = true; + break; + case VALUE_STRING: + handleString(parser, map, fieldName); + atLeastOneWrite = true; + break; + + default: + throw + getExceptionWithContext( + UserException.dataReadError(), currentFieldName, null) + .message("Unexpected token %s", parser.getCurrentToken()) + .build(logger); } - atLeastOneWrite = true; - break; - case VALUE_STRING: - handleString(parser, map, fieldName); - atLeastOneWrite = true; - break; - default: - throw - getExceptionWithContext( - UserException.dataReadError(), currentFieldName, null) - .message("Unexpected token %s", parser.getCurrentToken()) - .build(logger); } - + } finally { + map.end(); } - map.end(); } @@ -463,8 +466,8 @@ private void handleString(JsonParser parser, ListWriter writer) throws IOExcepti writer.varChar().writeVarChar(0, workingBuffer.prepareVarCharHolder(parser.getText()), workingBuffer.getBuf()); } - private void writeData(ListWriter list) { - list.start(); + private void writeData(ListWriter list) throws IOException { + list.startList(); outside: while (true) { try { switch (parser.nextToken()) { @@ -523,12 +526,12 @@ private void writeData(ListWriter list) { throw getExceptionWithContext(e, this.currentFieldName, null).build(logger); } } - list.end(); + list.endList(); } private void writeDataAllText(ListWriter list) throws IOException { - list.start(); + list.startList(); outside: while (true) { switch (parser.nextToken()) { @@ -562,7 +565,7 @@ private void writeDataAllText(ListWriter list) throws IOException { .build(logger); } } - list.end(); + list.endList(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java index 8309bf34702..6ff7d905f90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonWriter.java @@ -47,6 +47,10 @@ public JsonWriter(OutputStream out, boolean pretty, boolean useExtendedOutput) t } + public JsonWriter(JsonOutput gen) { + this.gen = gen; + } + public void write(FieldReader reader) throws JsonGenerationException, IOException{ writeValue(reader); gen.flush(); @@ -109,7 +113,11 @@ private void writeValue(FieldReader reader) throws JsonGenerationException, IOEx case LIST: // this is a 
pseudo class, doesn't actually contain the real reader so we have to drop down. - writeValue(reader.reader()); + gen.writeStartArray(); + while (reader.next()) { + writeValue(reader.reader()); + } + gen.writeEndArray(); break; case MAP: gen.writeStartObject(); @@ -125,6 +133,7 @@ private void writeValue(FieldReader reader) throws JsonGenerationException, IOEx gen.writeEndObject(); break; case NULL: + case LATE: gen.writeUntypedNull(); break; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java index 22addc97e2d..fea326c4c96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/AbstractBaseReader.java @@ -21,6 +21,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.expr.holders.UnionHolder; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.vector.complex.reader.FieldReader; @@ -71,4 +72,20 @@ public boolean next() { public int size() { throw new IllegalStateException("The current reader doesn't support getting size information."); } + + @Override + public void read(UnionHolder holder) { + holder.reader = this; + holder.isSet = this.isSet() ? 1 : 0; + } + + @Override + public void read(int index, UnionHolder holder) { + throw new IllegalStateException("The current reader doesn't support reading union type"); + } + + @Override + public void copyAsValue(UnionWriter writer) { + throw new IllegalStateException("The current reader doesn't support reading union type"); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java index 88a56f8707e..23bcfcb635d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java @@ -38,13 +38,19 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri Mode mode = Mode.INIT; private final String name; + private final boolean unionEnabled; private enum Mode { INIT, MAP, LIST }; - public ComplexWriterImpl(String name, MapVector container){ + public ComplexWriterImpl(String name, MapVector container, boolean unionEnabled){ super(null); this.name = name; this.container = container; + this.unionEnabled = unionEnabled; + } + + public ComplexWriterImpl(String name, MapVector container){ + this(name, container, false); } @Override @@ -120,7 +126,7 @@ public MapWriter directMap(){ case INIT: MapVector map = (MapVector) container; - mapRoot = new SingleMapWriter(map, this); + mapRoot = new SingleMapWriter(map, this, unionEnabled, false); mapRoot.setPosition(idx()); mode = Mode.MAP; break; @@ -141,7 +147,7 @@ public MapWriter rootAsMap() { case INIT: MapVector map = container.addOrGet(name, Types.required(MinorType.MAP), MapVector.class); - mapRoot = new SingleMapWriter(map, this); + mapRoot = new SingleMapWriter(map, this, unionEnabled, false); mapRoot.setPosition(idx()); mode = Mode.MAP; break; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java new file mode 100644 index 00000000000..fef33259bd3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/UnionListReader.java @@ -0,0 +1,90 @@ +/******************************************************************************* + + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ +package org.apache.drill.exec.vector.complex.impl; + +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.expr.holders.UnionHolder; +import org.apache.drill.exec.vector.UInt4Vector; +import org.apache.drill.exec.vector.complex.ListVector; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +public class UnionListReader extends AbstractFieldReader { + + private ListVector vector; + private UnionVector data; + private UInt4Vector offsets; + private UnionReader reader; + + public UnionListReader(ListVector vector) { + this.vector = vector; + this.data = vector.getDataVector(); + this.offsets = vector.getOffsetVector(); + this.reader = (UnionReader) data.getReader(); + } + + @Override + public boolean isSet() { + return true; + } + + @Override + public MajorType getType() { + return reader.getType(); + } + + private int currentOffset; + private int maxOffset; + + @Override + public void setPosition(int index) { + super.setPosition(index); + currentOffset = offsets.getAccessor().get(index) - 1; + maxOffset = offsets.getAccessor().get(index + 1); + } + + @Override + public FieldReader reader() { + return reader; + } + + @Override + public Object readObject() { + return vector.getAccessor().getObject(idx()); + } + + @Override + public void read(int index, UnionHolder holder) { + setPosition(idx()); + for (int i = -1; i < index; i++) { + next(); + } + holder.reader = reader; + holder.isSet = reader.isSet() ? 
1 : 0; + } + + @Override + public boolean next() { + if (currentOffset + 1 < maxOffset) { + reader.setPosition(++currentOffset); + return true; + } else { + return false; + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java index 95c651f5186..6bc2e05a571 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java @@ -33,11 +33,15 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple private final SpecialMapVector mapVector; private final OutputMutator mutator; - public VectorContainerWriter(OutputMutator mutator) { + public VectorContainerWriter(OutputMutator mutator, boolean unionEnabled) { super(null); this.mutator = mutator; mapVector = new SpecialMapVector(mutator.getCallBack()); - mapRoot = new SingleMapWriter(mapVector, this); + mapRoot = new SingleMapWriter(mapVector, this, unionEnabled, false); + } + + public VectorContainerWriter(OutputMutator mutator) { + this(mutator, false); } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java index e3591b660bf..51b909b366e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestOutputMutator.java @@ -98,4 +98,8 @@ public CallBack getCallBack() { return null; } + public VectorContainer getContainer() { + return container; + } + } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java index bd9cea16ad7..0ae8945c8aa 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java @@ -104,6 +104,7 @@ public void testSplitAndTransferFailure() throws Exception { .baselineValues(listOf(testVal)) .go(); + test("select flatten(config) as flat from cp.`/store/json/null_list_v2.json`"); testBuilder() .sqlQuery("select flatten(config) as flat from cp.`/store/json/null_list_v2.json`") .ordered() @@ -377,5 +378,108 @@ private void testExistentColumns(RecordBatchLoader batchLoader) throws SchemaCha assertEquals("[4,5,6]", vw.getValueVector().getAccessor().getObject(2).toString()); } + @Test + public void testSelectStarWithUnionType() throws Exception { + try { + String query = "select * from cp.`jsoninput/union/a.json`"; + testBuilder() + .sqlQuery(query) + .ordered() + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .baselineColumns("field1", "field2") + .baselineValues( + 1L, 1.2 + ) + .baselineValues( + listOf(2L), 1.2 + ) + .baselineValues( + mapOf("inner1", 3L, "inner2", 4L), listOf(3L, 4.0, "5") + ) + .baselineValues( + mapOf("inner1", 3L, + "inner2", listOf( + mapOf( + "innerInner1", 1L, + "innerInner2", + listOf( + 3L, + "a" + ) + ) + ) + ), + listOf( + mapOf("inner3", 7L), + 4.0, + "5", + mapOf("inner4", 9L), + listOf( + mapOf( + "inner5", 10L, + "inner6", 11L + ), + mapOf( + "inner5", 12L, + "inner7", 13L + ) + ) + ) + ).go(); + } finally { + 
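+      // Reset the session option in the finally block so later tests run with the default
+      // (union type disabled), even if the assertions above fail.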
testNoResult("alter session set `exec.enable_union_type` = false"); + } + } + + @Test + public void testSelectFromListWithCase() throws Exception { + String query = "select a from (select case when typeOf(field2) = type('list') then asBigInt(field2[4][1].inner7) end a from cp.`jsoninput/union/a.json`) where a is not null"; + try { + testBuilder() + .sqlQuery(query) + .ordered() + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .baselineColumns("a") + .baselineValues(13L) + .go(); + } finally { + testNoResult("alter session set `exec.enable_union_type` = false"); + } + } + + @Test + public void testTypeCase() throws Exception { + String query = "select case typeOf(field1) when type('bigint') then asBigInt(field1) when type('list') then asBigInt(field1[0]) when type('map') then asBigInt(t.field1.inner1) end f1 from cp.`jsoninput/union/a.json` t"; + try { + testBuilder() + .sqlQuery(query) + .ordered() + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .baselineColumns("f1") + .baselineValues(1L) + .baselineValues(2L) + .baselineValues(3L) + .baselineValues(3L) + .go(); + } finally { + testNoResult("alter session set `exec.enable_union_type` = false"); + } + } + + @Test + public void testSumWithTypeCase() throws Exception { + String query = "select sum(f1) sum_f1 from (select case typeOf(field1) when type('bigint') then asBigInt(field1) when type('list') then asBigInt(field1[0]) when type('map') then asBigInt(t.field1.inner1) end f1 from cp.`jsoninput/union/a.json` t)"; + try { + testBuilder() + .sqlQuery(query) + .ordered() + .optionSettingQueriesForTestQuery("alter session set `exec.enable_union_type` = true") + .baselineColumns("sum_f1") + .baselineValues(9L) + .go(); + } finally { + testNoResult("alter session set `exec.enable_union_type` = false"); + } + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java index 023dc088a02..51abab305bc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestRepeated.java @@ -139,12 +139,12 @@ public void listOfList() throws IOException { { final MapWriter map = writer.rootAsMap(); final ListWriter list = map.list("a"); - list.start(); + list.startList(); final ListWriter innerList = list.list(); final IntWriter innerInt = innerList.integer(); - innerList.start(); + innerList.startList(); final IntHolder holder = new IntHolder(); @@ -155,16 +155,16 @@ public void listOfList() throws IOException { holder.value = 3; innerInt.write(holder); - innerList.end(); - innerList.start(); + innerList.endList(); + innerList.startList(); holder.value = 4; innerInt.write(holder); holder.value = 5; innerInt.write(holder); - innerList.end(); - list.end(); + innerList.endList(); + list.endList(); final IntWriter numCol = map.integer("nums"); holder.value = 14; @@ -192,12 +192,12 @@ public void listOfList() throws IOException { final MapWriter map = writer.rootAsMap(); final ListWriter list = map.list("a"); - list.start(); + list.startList(); final ListWriter innerList = list.list(); final IntWriter innerInt = innerList.integer(); - innerList.start(); + innerList.startList(); final IntHolder holder = new IntHolder(); @@ -208,16 +208,16 @@ public void listOfList() throws IOException { holder.value = -3; 
innerInt.write(holder); - innerList.end(); - innerList.start(); + innerList.endList(); + innerList.startList(); holder.value = -4; innerInt.write(holder); holder.value = -5; innerInt.write(holder); - innerList.end(); - list.end(); + innerList.endList(); + list.endList(); final IntWriter numCol = map.integer("nums"); holder.value = -28; diff --git a/exec/java-exec/src/test/resources/jsoninput/union/a.json b/exec/java-exec/src/test/resources/jsoninput/union/a.json new file mode 100644 index 00000000000..438dba3c685 --- /dev/null +++ b/exec/java-exec/src/test/resources/jsoninput/union/a.json @@ -0,0 +1,52 @@ +{ + field1: 1, + field2: 1.2 +} +{ + field1: [ + 2 + ], + field2: 1.2 +} +{ + field1: { + inner1: 3, + inner2: 4 + }, + field2: [ + 3, + 4.0, + "5" + ] +} +{ + field1: { + inner1: 3, + inner2: [ + { + innerInner1: 1, + innerInner2: [ + 3, + "a" + ] + } + ] + }, + field2: [ + { + inner3: 7 + }, + 4.0, + "5", + { + inner4: 9 + }, + [{ + inner5: 10, + inner6: 11 + },{ + inner5: 12, + inner7: 13 + }] + ] +} \ No newline at end of file diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java index 35890ec2eb5..6f0a9607cf7 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/TestJdbcDistQuery.java @@ -70,6 +70,12 @@ public void testSimpleQuerySingleFile() throws Exception{ + "from dfs_test.`%s/../../sample-data/regionsSF/`", WORKING_PATH)); } + @Test + public void testUnionType() throws Exception{ + testQuery(String.format("select * " + + "from dfs.tmp.`sample.json`", WORKING_PATH)); + } + @Test public void testSimpleQueryMultiFile() throws Exception{ diff --git a/protocol/src/main/java/org/apache/drill/common/types/MinorType.java b/protocol/src/main/java/org/apache/drill/common/types/MinorType.java index 423ed533329..16bd53e90df 100644 --- a/protocol/src/main/java/org/apache/drill/common/types/MinorType.java +++ b/protocol/src/main/java/org/apache/drill/common/types/MinorType.java @@ -58,7 +58,8 @@ public enum MinorType implements com.dyuproject.protostuff.EnumLite INTERVALYEAR(38), INTERVALDAY(39), LIST(40), - GENERIC_OBJECT(41); + GENERIC_OBJECT(41), + UNION(42); public final int number; @@ -113,6 +114,7 @@ public static MinorType valueOf(int number) case 39: return INTERVALDAY; case 40: return LIST; case 41: return GENERIC_OBJECT; + case 42: return UNION; default: return null; } } diff --git a/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java b/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java index 74ac444485d..be7fe7443d6 100644 --- a/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java +++ b/protocol/src/main/java/org/apache/drill/common/types/TypeProtos.java @@ -317,6 +317,10 @@ public enum MinorType * GENERIC_OBJECT = 41; */ GENERIC_OBJECT(36, 41), + /** + * UNION = 42; + */ + UNION(37, 42), ; /** @@ -606,6 +610,10 @@ public enum MinorType * GENERIC_OBJECT = 41; */ public static final int GENERIC_OBJECT_VALUE = 41; + /** + * UNION = 42; + */ + public static final int UNION_VALUE = 42; public final int getNumber() { return value; } @@ -649,6 +657,7 @@ public static MinorType valueOf(int value) { case 39: return INTERVALDAY; case 40: return LIST; case 41: return GENERIC_OBJECT; + case 42: return UNION; default: return null; } } @@ -1780,7 +1789,7 @@ public Builder clearTimeZone() { "inor_type\030\001 
\001(\0162\021.common.MinorType\022\036\n\004mo" + "de\030\002 \001(\0162\020.common.DataMode\022\r\n\005width\030\003 \001(" + "\005\022\021\n\tprecision\030\004 \001(\005\022\r\n\005scale\030\005 \001(\005\022\020\n\010t" + - "imeZone\030\006 \001(\005*\212\004\n\tMinorType\022\010\n\004LATE\020\000\022\007\n" + + "imeZone\030\006 \001(\005*\225\004\n\tMinorType\022\010\n\004LATE\020\000\022\007\n" + "\003MAP\020\001\022\013\n\007TINYINT\020\003\022\014\n\010SMALLINT\020\004\022\007\n\003INT" + "\020\005\022\n\n\006BIGINT\020\006\022\014\n\010DECIMAL9\020\007\022\r\n\tDECIMAL1" + "8\020\010\022\023\n\017DECIMAL28SPARSE\020\t\022\023\n\017DECIMAL38SPA" + @@ -1793,9 +1802,10 @@ public Builder clearTimeZone() { "\036\022\t\n\005UINT4\020\037\022\t\n\005UINT8\020 \022\022\n\016DECIMAL28DENS" + "E\020!\022\022\n\016DECIMAL38DENSE\020\"\022\010\n\004NULL\020%\022\020\n\014INT" + "ERVALYEAR\020&\022\017\n\013INTERVALDAY\020\'\022\010\n\004LIST\020(\022\022" + - "\n\016GENERIC_OBJECT\020)*4\n\010DataMode\022\014\n\010OPTION" + - "AL\020\000\022\014\n\010REQUIRED\020\001\022\014\n\010REPEATED\020\002B-\n\035org." + - "apache.drill.common.typesB\nTypeProtosH\001" + "\n\016GENERIC_OBJECT\020)\022\t\n\005UNION\020**4\n\010DataMod" + + "e\022\014\n\010OPTIONAL\020\000\022\014\n\010REQUIRED\020\001\022\014\n\010REPEATE" + + "D\020\002B-\n\035org.apache.drill.common.typesB\nTy", + "peProtosH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/protocol/src/main/protobuf/Types.proto b/protocol/src/main/protobuf/Types.proto index d93bcb53674..36899f6502b 100644 --- a/protocol/src/main/protobuf/Types.proto +++ b/protocol/src/main/protobuf/Types.proto @@ -64,6 +64,7 @@ enum MinorType { INTERVALDAY = 39; // Interval type specifying DAY to SECONDS LIST = 40; GENERIC_OBJECT = 41; + UNION = 42; } message MajorType { From a794f961ddc3b17826d7ef6814d37821341e4f37 Mon Sep 17 00:00:00 2001 From: Steven Phillips Date: Thu, 1 Oct 2015 12:00:50 -0700 Subject: [PATCH 2/2] Remove old code that causes compilation failure --- .../drill/exec/physical/base/AbstractPhysicalVisitor.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index d065a6d40aa..a3b5f27ec80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -40,7 +40,6 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; -import org.apache.drill.exec.physical.config.UnionTypeReducer; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.config.WindowPOP; @@ -216,11 +215,6 @@ public T visitIteratorValidator(IteratorValidator op, X value) throws E { return visitOp(op, value); } - @Override - public T visitUnionTypeReducer(UnionTypeReducer op, X value) throws E { - return visitOp(op, value); - } - @Override public T visitValues(Values op, X value) throws E { return visitOp(op, value);
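
Usage note (not part of the patch): the sketch below shows how the UnionListReader introduced in this change can be driven to walk a heterogeneous list, using only the setPosition/next/reader calls defined in the diff above. The class name UnionListReaderExample and the variables listVector and rowIndex are illustrative assumptions, not code from this commit; the ListVector is assumed to have already been populated by an upstream reader such as the JSON reader.

import org.apache.drill.exec.vector.complex.ListVector;
import org.apache.drill.exec.vector.complex.impl.UnionListReader;
import org.apache.drill.exec.vector.complex.reader.FieldReader;

public class UnionListReaderExample {
  // Prints every element of the list stored at rowIndex, whatever its type.
  static void dumpRow(ListVector listVector, int rowIndex) {
    UnionListReader listReader = new UnionListReader(listVector);
    listReader.setPosition(rowIndex);            // establishes the offset window for this row
    while (listReader.next()) {                  // advances the inner UnionReader to the next element
      FieldReader element = listReader.reader(); // shared reader over the underlying union data vector
      System.out.println(element.readObject());  // materializes the current element as a Java object
    }
  }
}

On the SQL side the same data is reached through the pattern exercised in TestJsonReader above: enable the `exec.enable_union_type` session option, branch on typeOf(field) against type('bigint'), type('list') or type('map'), and extract a concrete value with asBigInt(...).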