From 825175086bdb0d1b26ca084ba4e741e4dca637c9 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Mon, 7 Nov 2016 14:10:16 +0800 Subject: [PATCH 1/8] HIVE-15112: Implement Parquet vectorization reader for Complex types Refactor UT --- .../vector/VectorizedParquetColumnReader.java | 13 + .../vector/VectorizedParquetMapReader.java | 31 ++ .../vector/VectorizedParquetRecordReader.java | 83 ++- ...a => VectorizedPrimitiveColumnReader.java} | 134 +++-- .../vector/VectorizedStructReader.java | 36 ++ .../parquet/TestVectorizedColumnReader.java | 406 +------------- .../TestVectorizedColumnReaderBase.java | 497 ++++++++++++++++++ ...torizedDictionaryEncodingColumnReader.java | 57 ++ 8 files changed, 821 insertions(+), 436 deletions(-) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java rename ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/{VectorizedColumnReader.java => VectorizedPrimitiveColumnReader.java} (84%) create mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java create mode 100644 ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java new file mode 100644 index 000000000000..ec492793da6d --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java @@ -0,0 +1,13 @@ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; + 
+public interface VectorizedParquetColumnReader { + void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java new file mode 100644 index 000000000000..fbdaa8f49e9a --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java @@ -0,0 +1,31 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; + +public class VectorizedParquetMapReader implements VectorizedParquetColumnReader{ + + @Override + public void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException { + + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index f94c49aaf1e7..bec322ef610e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; @@ -35,7 +37,9 @@ import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +47,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import static 
org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; @@ -73,7 +79,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase * For each request column, the reader to read this column. This is NULL if this column * is missing from the file, in which case we populate the attribute with NULL. */ - private VectorizedColumnReader[] columnReaders; + private VectorizedParquetColumnReader[] columnReaders; /** * The number of rows that have been returned. @@ -278,12 +284,77 @@ private void checkEndOfRowGroup() throws IOException { } List columns = requestedSchema.getColumns(); List types = requestedSchema.getFields(); - columnReaders = new VectorizedColumnReader[columns.size()]; - for (int i = 0; i < columns.size(); ++i) { - columnReaders[i] = - new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)), - skipTimestampConversion, types.get(i)); + columnReaders = new VectorizedParquetColumnReader[columns.size()]; + for (int i = 0; i < columnTypesList.size(); ++i) { + columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages, + requestedSchema, skipTimestampConversion); } totalCountLoadedSoFar += pages.getRowCount(); } + + private List getAllColumnDescriptorByType( + int depth, + Type type, + List columns) { + List res = new ArrayList<>(); + for (ColumnDescriptor descriptor : columns) { + if (type.getName().equals(descriptor.getPath()[depth])) { + res.add(descriptor); + } + } + return res; + } + + // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema + private VectorizedParquetColumnReader buildVectorizedParquetReader( + TypeInfo typeInfo, + Type type, + PageReadStore pages, + MessageType schema, + boolean skipTimestampConversion) throws IOException { + return buildVectorizedParquetReader(typeInfo, type, pages, schema.getColumns(), skipTimestampConversion, + 0); + } + + private VectorizedParquetColumnReader buildVectorizedParquetReader( + TypeInfo typeInfo, + Type 
type, + PageReadStore pages, + List columnDescriptors, + boolean skipTimestampConversion, + int depth) throws IOException { + List descriptors = getAllColumnDescriptorByType(depth, type, columnDescriptors); + switch (typeInfo.getCategory()) { + case PRIMITIVE: + if (columnDescriptors == null || columnDescriptors.isEmpty()) { + return null; + } else { + return new VectorizedPrimitiveColumnReader(descriptors.get(0), + pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); + } + case STRUCT: + StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; + List fieldReaders = new ArrayList<>(); + List fieldTypes = structTypeInfo.getAllStructFieldTypeInfos(); + List types = type.asGroupType().getFields(); + for (int i = 0; i < fieldTypes.size(); i++) { + VectorizedParquetColumnReader r = + buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors, + skipTimestampConversion, depth + 1); + if (r != null) { + fieldReaders.add(r); + } + } + if (fieldReaders.size() > 0) { + return new VectorizedStructReader(fieldReaders); + } else { + return null; + } + case LIST: + case MAP: + case UNION: + default: + throw new RuntimeException("Unsupported category " + typeInfo.getCategory().name()); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java similarity index 84% rename from ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java rename to ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 5a9c7f922e98..5bc97b271d9b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -19,10 +19,14 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; @@ -48,6 +52,7 @@ import java.nio.ByteOrder; import java.sql.Timestamp; import java.util.Arrays; +import java.util.List; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; @@ -57,9 +62,9 @@ * It's column level Parquet reader which is used to read a batch of records for a column, * part of the code is referred from Apache Spark and Apache Parquet. 
*/ -public class VectorizedColumnReader { +public class VectorizedPrimitiveColumnReader implements VectorizedParquetColumnReader{ - private static final Logger LOG = LoggerFactory.getLogger(VectorizedColumnReader.class); + private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class); private boolean skipTimestampConversion = false; @@ -108,7 +113,7 @@ public class VectorizedColumnReader { private final ColumnDescriptor descriptor; private final Type type; - public VectorizedColumnReader( + public VectorizedPrimitiveColumnReader( ColumnDescriptor descriptor, PageReader pageReader, boolean skipTimestampConversion, @@ -133,11 +138,10 @@ public VectorizedColumnReader( } } - void readBatch( + public void readBatch( int total, ColumnVector column, TypeInfo columnType) throws IOException { - int rowId = 0; while (total > 0) { // Compute the number of values we want to read in this page. @@ -155,48 +159,96 @@ void readBatch( decodeDictionaryIds(rowId, num, column, dictionaryIds); } else { // assign values in vector - PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; - switch (primitiveColumnType.getPrimitiveCategory()) { - case INT: - case BYTE: - case SHORT: - readIntegers(num, (LongColumnVector) column, rowId); - break; - case DATE: - case INTERVAL_YEAR_MONTH: - case LONG: - readLongs(num, (LongColumnVector) column, rowId); - break; - case BOOLEAN: - readBooleans(num, (LongColumnVector) column, rowId); - break; - case DOUBLE: - readDoubles(num, (DoubleColumnVector) column, rowId); - break; - case BINARY: - case STRING: - case CHAR: - case VARCHAR: - readBinaries(num, (BytesColumnVector) column, rowId); - break; - case FLOAT: - readFloats(num, (DoubleColumnVector) column, rowId); - break; - case DECIMAL: - readDecimal(num, (DecimalColumnVector) column, rowId); - break; - case INTERVAL_DAY_TIME: - case TIMESTAMP: - default: - throw new IOException( - "Unsupported type category: " + 
primitiveColumnType.getPrimitiveCategory()); - } + readBatchHelper(num, column, columnType, rowId); } rowId += num; total -= num; } } + private void readBatchHelper( + int num, + ColumnVector column, + TypeInfo columnType, + int rowId) throws IOException { + switch (columnType.getCategory()) { + case PRIMITIVE: + PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; + readBatchForPrimitiveType(num, column, primitiveColumnType, rowId); + break; + case LIST: +// ListTypeInfo listColumnType = (ListTypeInfo) columnType; +// ListColumnVector listColumn = (ListColumnVector) column; +// int offset = listColumn.childCount; +// listColumn.offsets[rowId] = offset; +// listColumn.lengths[] +// TypeInfo listTypeInfos = listColumnType.getListElementTypeInfo(); +// listColumn.childCount = ; + break; + case MAP: + MapTypeInfo mapColumnType = (MapTypeInfo) columnType; + MapColumnVector mapColumn = (MapColumnVector) column; +// for(){ +// +// } + readBatchHelper(num, mapColumn.keys, mapColumnType.getMapKeyTypeInfo(), rowId); + readBatchHelper(num, mapColumn.values, mapColumnType.getMapKeyTypeInfo(), rowId); + break; + case STRUCT: + StructTypeInfo structTypeInfo = (StructTypeInfo) columnType; + StructColumnVector structColumn = (StructColumnVector) column; + List typeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + for (int i = 0; i < typeInfos.size(); i++) { + readBatch(num, structColumn.fields[i], typeInfos.get(i)); + } + break; + case UNION: + break; + default: + throw new IOException("Unsupported category " + columnType.getCategory().name()); + } + } + + private void readBatchForPrimitiveType( + int num, + ColumnVector column, + PrimitiveTypeInfo primitiveColumnType, + int rowId) throws IOException { + switch (primitiveColumnType.getPrimitiveCategory()) { + case INT: + case BYTE: + case SHORT: + readIntegers(num, (LongColumnVector) column, rowId); + break; + case DATE: + case INTERVAL_YEAR_MONTH: + case LONG: + readLongs(num, (LongColumnVector) column, 
rowId); + break; + case BOOLEAN: + readBooleans(num, (LongColumnVector) column, rowId); + break; + case DOUBLE: + readDoubles(num, (DoubleColumnVector) column, rowId); + break; + case BINARY: + case STRING: + case CHAR: + case VARCHAR: + readBinaries(num, (BytesColumnVector) column, rowId); + break; + case FLOAT: + readFloats(num, (DoubleColumnVector) column, rowId); + break; + case DECIMAL: + readDecimal(num, (DecimalColumnVector) column, rowId); + case INTERVAL_DAY_TIME: + case TIMESTAMP: + default: + throw new IOException("Unsupported"); + } + } + private void readDictionaryIDs( int total, LongColumnVector c, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java new file mode 100644 index 000000000000..d5f8859c4b42 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java @@ -0,0 +1,36 @@ +package org.apache.hadoop.hive.ql.io.parquet.vector; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +import java.io.IOException; +import java.util.List; + +public class VectorizedStructReader implements VectorizedParquetColumnReader{ + + List fieldReaders; + + public VectorizedStructReader(List fieldReaders) { + this.fieldReaders = fieldReaders; + } + + @Override + public void readBatch( + int total, + ColumnVector column, + TypeInfo columnType) throws IOException { + StructColumnVector structColumnVector = (StructColumnVector) column; + StructTypeInfo structTypeInfo = (StructTypeInfo) columnType; + ColumnVector[] vectors = structColumnVector.fields; + for (int i = 0; i < vectors.length; i++) { + fieldReaders.get(i) + .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i)); + 
structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating; + for (int j = 0; j < vectors[i].isNull.length; j++) { + structColumnVector.isNull[i] = structColumnVector.isNull[i] && vectors[i].isNull[i]; + } + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 276ff1925d6e..40dc53eab86f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -1,429 +1,57 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - package org.apache.hadoop.hive.ql.io.parquet; -import org.apache.commons.lang.ArrayUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.IOConstants; -import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; -import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; -import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapWork; -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapreduce.Job; -import org.apache.parquet.example.data.Group; -import org.apache.parquet.example.data.simple.SimpleGroupFactory; -import org.apache.parquet.hadoop.ParquetInputFormat; -import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.GroupReadSupport; -import org.apache.parquet.hadoop.example.GroupWriteSupport; -import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.MessageType; import org.junit.AfterClass; import 
org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; -import java.util.List; -import java.util.Random; - -import static junit.framework.Assert.assertTrue; -import static junit.framework.TestCase.assertFalse; -import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; -import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; -import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; -import static org.junit.Assert.assertEquals; - -public class TestVectorizedColumnReader { - - private static final int nElements = 2500; - protected static final Configuration conf = new Configuration(); - protected static final Path file = - new Path("target/test/TestParquetVectorReader/testParquetFile"); - private static String[] uniqueStrs = new String[nElements]; - private static boolean[] isNulls = new boolean[nElements]; - private static Random random = new Random(); - protected static final MessageType schema = parseMessageType( - "message test { " - + "required int32 int32_field; " - + "required int64 int64_field; " - + "required int96 int96_field; " - + "required double double_field; " - + "required float float_field; " - + "required boolean boolean_field; " - + "required fixed_len_byte_array(3) flba_field; " - + "optional fixed_len_byte_array(1) some_null_field; " - + "optional fixed_len_byte_array(1) all_null_field; " - + "optional binary binary_field; " - + "optional binary binary_field_non_repeating; " - + "} "); - @AfterClass - public static void cleanup() throws IOException { - FileSystem fs = file.getFileSystem(conf); - if (fs.exists(file)) { - fs.delete(file, true); - } - } +public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase{ + static boolean isDictionaryEncoding = false; @BeforeClass - public static void prepareFile() throws IOException { - cleanup(); - - boolean dictionaryEnabled = 
true; - boolean validating = false; - GroupWriteSupport.setSchema(schema, conf); - SimpleGroupFactory f = new SimpleGroupFactory(schema); - ParquetWriter writer = new ParquetWriter( - file, - new GroupWriteSupport(), - GZIP, 1024*1024, 1024, 1024*1024, - dictionaryEnabled, validating, PARQUET_1_0, conf); - writeData(f, writer); - } - - protected static void writeData(SimpleGroupFactory f, ParquetWriter writer) throws IOException { - initialStrings(uniqueStrs); - for (int i = 0; i < nElements; i++) { - Group group = f.newGroup() - .append("int32_field", i) - .append("int64_field", (long) 2 * i) - .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes())) - .append("double_field", i * 1.0) - .append("float_field", ((float) (i * 2.0))) - .append("boolean_field", i % 5 == 0) - .append("flba_field", "abc"); - - if (i % 2 == 1) { - group.append("some_null_field", "x"); - } - - if (i % 13 != 1) { - int binaryLen = i % 10; - group.append("binary_field", - Binary.fromString(new String(new char[binaryLen]).replace("\0", "x"))); - } - - if (uniqueStrs[i] != null) { - group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i])); - } - writer.write(group); - } - writer.close(); + public static void setup() throws IOException { + removeFile(); + writeData(initWriterFromFile(), isDictionaryEncoding); } - private static String getRandomStr() { - int len = random.nextInt(10); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < len; i++) { - sb.append((char) ('a' + random.nextInt(25))); - } - return sb.toString(); - } - - public static void initialStrings(String[] uniqueStrs) { - for (int i = 0; i < uniqueStrs.length; i++) { - String str = getRandomStr(); - if (!str.isEmpty()) { - uniqueStrs[i] = str; - isNulls[i] = false; - }else{ - isNulls[i] = true; - } - } - } - - private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf) - throws IOException, InterruptedException, HiveException { - 
conf.set(PARQUET_READ_SCHEMA, schemaString); - HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); - HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); - - Job vectorJob = new Job(conf, "read vector"); - ParquetInputFormat.setInputPaths(vectorJob, file); - ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); - InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); - initialVectorizedRowBatchCtx(conf); - return new VectorizedParquetRecordReader(split, new JobConf(conf)); - } - - private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException { - MapWork mapWork = new MapWork(); - VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(createStructObjectInspector(conf), new String[0]); - mapWork.setVectorMode(true); - mapWork.setVectorizedRowBatchCtx(rbCtx); - Utilities.setMapWork(conf, mapWork); - } - - private StructObjectInspector createStructObjectInspector(Configuration conf) { - // Create row related objects - String columnNames = conf.get(IOConstants.COLUMNS); - List columnNamesList = DataWritableReadSupport.getColumnNames(columnNames); - String columnTypes = conf.get(IOConstants.COLUMNS_TYPES); - List columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); - TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); - return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + @AfterClass + public static void cleanup() throws IOException { + removeFile(); } @Test public void testIntRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"int32_field"); - conf.set(IOConstants.COLUMNS_TYPES,"int"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required int32 
int32_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - LongColumnVector vector = (LongColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals(c, vector.vector[i]); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + intRead(isDictionaryEncoding); } @Test public void testLongRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"int64_field"); - conf.set(IOConstants.COLUMNS_TYPES, "bigint"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required int64 int64_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - LongColumnVector vector = (LongColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals(2 * c, vector.vector[i]); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + longRead(isDictionaryEncoding); } @Test public void testDoubleRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"double_field"); - conf.set(IOConstants.COLUMNS_TYPES, "double"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required double double_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while 
(reader.next(NullWritable.get(), previous)) { - DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals(1.0 * c, vector.vector[i], 0); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + doubleRead(isDictionaryEncoding); } @Test public void testFloatRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"float_field"); - conf.set(IOConstants.COLUMNS_TYPES, "float"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required float float_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - assertEquals((float)2.0 * c, vector.vector[i], 0); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + floatRead(isDictionaryEncoding); } @Test public void testBooleanRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"boolean_field"); - conf.set(IOConstants.COLUMNS_TYPES, "boolean"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required boolean boolean_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - try { - long c = 0; - while (reader.next(NullWritable.get(), previous)) { - LongColumnVector vector = 
(LongColumnVector) previous.cols[0]; - assertTrue(vector.noNulls); - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - int e = (c % 5 == 0) ? 1 : 0; - assertEquals(e, vector.vector[i]); - assertFalse(vector.isNull[i]); - c++; - } - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + booleanRead(isDictionaryEncoding); } @Test - public void testBinaryReadDictionaryEncoding() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"binary_field"); - conf.set(IOConstants.COLUMNS_TYPES, "string"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required binary binary_field;}", conf); - VectorizedRowBatch previous = reader.createValue(); - int c = 0; - try { - while (reader.next(NullWritable.get(), previous)) { - BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; - boolean noNull = true; - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - if (c % 13 == 1) { - assertTrue(vector.isNull[i]); - } else { - assertFalse(vector.isNull[i]); - int binaryLen = c % 10; - String expected = new String(new char[binaryLen]).replace("\0", "x"); - String actual = new String(ArrayUtils - .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); - assertEquals("Failed at " + c, expected, actual); - noNull = false; - } - c++; - } - assertEquals("No Null check failed at " + c, noNull, vector.noNulls); - assertFalse(vector.isRepeating); - } - assertEquals(nElements, c); - } finally { - reader.close(); - } + public void testBinaryRead() throws Exception { + binaryRead(isDictionaryEncoding); } @Test - public void testBinaryRead() throws Exception { - Configuration conf = new Configuration(); - conf.set(IOConstants.COLUMNS,"binary_field_non_repeating"); - 
conf.set(IOConstants.COLUMNS_TYPES, "string"); - conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); - conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - VectorizedParquetRecordReader reader = - createParquetReader("message test { required binary binary_field_non_repeating;}", conf); - VectorizedRowBatch previous = reader.createValue(); - int c = 0; - try { - while (reader.next(NullWritable.get(), previous)) { - BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; - boolean noNull = true; - for (int i = 0; i < vector.vector.length; i++) { - if(c == nElements){ - break; - } - String actual; - assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]); - if (!vector.isNull[i]) { - actual = new String(ArrayUtils - .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); - assertEquals("failed at " + c, uniqueStrs[c], actual); - }else{ - noNull = false; - } - c++; - } - assertEquals("No Null check failed at " + c, noNull, vector.noNulls); - assertFalse(vector.isRepeating); - } - assertEquals("It doesn't exit at expected position", nElements, c); - } finally { - reader.close(); - } + public void testStructRead() throws Exception { + structRead(isDictionaryEncoding); } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java new file mode 100644 index 000000000000..0e89feb7e3df --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java @@ -0,0 +1,497 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.parquet; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import java.io.IOException; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.List; +import java.util.Random; + +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertFalse; +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; +import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; + +public class TestVectorizedColumnReaderBase { + + protected final static int nElements = 2500; + protected final static int UNIQUE_NUM = 10; + protected final static int NULL_FREQUENCY = 13; + + protected final static Configuration conf = new Configuration(); + protected final static Path file = new Path("target/test/TestParquetVectorReader/testParquetFile"); + + protected static final MessageType schema = parseMessageType( + "message hive_schema { " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required int96 int96_field; " + + "required double double_field; " + + "required float float_field; " + + "required boolean boolean_field; " + + "required fixed_len_byte_array(3) flba_field; " + + 
"optional fixed_len_byte_array(1) some_null_field; " + + "optional fixed_len_byte_array(1) all_null_field; " + + "required binary binary_field; " + + "optional binary binary_field_some_null; " + + "optional group struct_field {" + + " optional int32 a;\n" + + " optional double b;\n" + + "}\n" + + "optional group map_field (MAP) {\n" + + " repeated group map (MAP_KEY_VALUE) {\n" + + " required binary key;\n" + + " optional binary value;\n" + + " }\n" + + "}\n" + + "optional group array_list (LIST) {\n" + + " repeated group bag {\n" + + " optional int32 array_element;\n" + + " }\n" + + "}\n" + + "} "); + + protected static void removeFile() throws IOException { + FileSystem fs = file.getFileSystem(conf); + if (fs.exists(file)) { + fs.delete(file, true); + } + } + + protected static ParquetWriter initWriterFromFile() throws IOException { + GroupWriteSupport.setSchema(schema, conf); + return new ParquetWriter<>( + file, + new GroupWriteSupport(), + GZIP, 1024 * 1024, 1024, 1024 * 1024, + true, false, PARQUET_1_0, conf); + } + + protected static int getIntValue( + boolean isDictionaryEncoding, + int index) { + return isDictionaryEncoding ? index % UNIQUE_NUM : index; + } + + protected static double getDoubleValue( + boolean isDictionaryEncoding, + int index) { + return isDictionaryEncoding ? index % UNIQUE_NUM : index; + } + + protected static long getLongValue( + boolean isDictionaryEncoding, + int index) { + return isDictionaryEncoding ? (long) 2 * index % UNIQUE_NUM : (long) 2 * index; + } + + protected static float getFloatValue( + boolean isDictionaryEncoding, + int index) { + return (float) (isDictionaryEncoding ? 
index % UNIQUE_NUM * 2.0 : index * 2.0); + } + + protected static boolean getBooleanValue( + float index) { + return (index % 2 == 0); + } + + protected static String getTimestampStr(int index) { + String s = String.valueOf(index); + int l = 4 - s.length(); + for (int i = 0; i < l; i++) { + s = "0" + s; + } + return "99999999" + s; + } + + protected static Binary getTimestamp( + boolean isDictionaryEncoding, + int index) { + String s = isDictionaryEncoding ? getTimestampStr(index % UNIQUE_NUM) : getTimestampStr(index); + return Binary.fromReusedByteArray(s.getBytes()); + } + + protected static String getStr( + boolean isDictionaryEncoding, + int index) { + int binaryLen = isDictionaryEncoding ? index % UNIQUE_NUM : index; + String v = ""; + while (binaryLen > 0) { + char t = (char) ('a' + binaryLen % 26); + binaryLen /= 26; + v = t + v; + } + return v; + } + + protected static Binary getBinaryValue( + boolean isDictionaryEncoding, + int index) { + return Binary.fromString(getStr(isDictionaryEncoding, index)); + } + + protected static boolean isNull(int index) { + return (index % NULL_FREQUENCY == 0); + } + + protected VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf) + throws IOException, InterruptedException, HiveException { + conf.set(PARQUET_READ_SCHEMA, schemaString); + HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true); + HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp"); + + Job vectorJob = new Job(conf, "read vector"); + ParquetInputFormat.setInputPaths(vectorJob, file); + ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class); + InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0); + initialVectorizedRowBatchCtx(conf); + return new VectorizedParquetRecordReader(split, new JobConf(conf)); + } + + protected static void writeData(ParquetWriter writer, boolean isDictionaryEncoding) throws IOException { + SimpleGroupFactory f = new 
SimpleGroupFactory(schema); + for (int i = 0; i < nElements; i++) { + boolean isNull = isNull(i); + int intVal = getIntValue(isDictionaryEncoding, i); + long longVal = getLongValue(isDictionaryEncoding, i); + Binary timeStamp = getTimestamp(isDictionaryEncoding, i); + double doubleVal = getDoubleValue(isDictionaryEncoding, i); + float floatVal = getFloatValue(isDictionaryEncoding, i); + boolean booleanVal = getBooleanValue(i); + Binary binary = getBinaryValue(isDictionaryEncoding, i); + Group group = f.newGroup() + .append("int32_field", intVal) + .append("int64_field", longVal) + .append("int96_field", timeStamp) + .append("double_field", doubleVal) + .append("float_field", floatVal) + .append("boolean_field", booleanVal) + .append("flba_field", "abc"); + + if (!isNull) { + group.append("some_null_field", "x"); + } + + group.append("binary_field", binary); + + if (!isNull) { + group.append("binary_field_some_null", binary); + } + + group.addGroup("struct_field") + .append("a", intVal) + .append("b", doubleVal); + + Group mapGroup = group.addGroup("map_field"); + if (i % 13 != 1) { + mapGroup.addGroup("map").append("key", binary).append("value", "abc"); + } else { + mapGroup.addGroup("map").append("key", binary); + } + + Group arrayGroup = group.addGroup("array_list"); + for (int j = 0; j < i % 4; j++) { + arrayGroup.addGroup("bag").append("array_element", intVal); + } + + writer.write(group); + } + writer.close(); + } + + private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException { + MapWork mapWork = new MapWork(); + VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(createStructObjectInspector(conf), new String[0]); + mapWork.setVectorMode(true); + mapWork.setVectorizedRowBatchCtx(rbCtx); + Utilities.setMapWork(conf, mapWork); + } + + private StructObjectInspector createStructObjectInspector(Configuration conf) { + // Create row related objects + String columnNames = conf.get(IOConstants.COLUMNS); + List 
columnNamesList = DataWritableReadSupport.getColumnNames(columnNames); + String columnTypes = conf.get(IOConstants.COLUMNS_TYPES); + List columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes); + TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList); + return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + } + + protected void intRead(boolean isDictionaryEncoding) throws InterruptedException, HiveException, IOException { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS,"int32_field"); + conf.set(IOConstants.COLUMNS_TYPES,"int"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int32 int32_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if(c == nElements){ + break; + } + assertEquals("Failed at " + c, getIntValue(isDictionaryEncoding, c), vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void longRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "int64_field"); + conf.set(IOConstants.COLUMNS_TYPES, "bigint"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required int64 int64_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + 
LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, getLongValue(isDictionaryEncoding, c), vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void doubleRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "double_field"); + conf.set(IOConstants.COLUMNS_TYPES, "double"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required double double_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, getDoubleValue(isDictionaryEncoding, c), vector.vector[i], + 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void floatRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "float_field"); + conf.set(IOConstants.COLUMNS_TYPES, "float"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required float float_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + 
DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, getFloatValue(isDictionaryEncoding, c), vector.vector[i], + 0); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void booleanRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "boolean_field"); + conf.set(IOConstants.COLUMNS_TYPES, "boolean"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required boolean boolean_field;}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + LongColumnVector vector = (LongColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals("Failed at " + c, (getBooleanValue(c) ? 
1 : 0), vector.vector[i]); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } + + protected void binaryRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "binary_field_some_null"); + conf.set(IOConstants.COLUMNS_TYPES, "string"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message test { required binary binary_field_some_null;}", conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + BytesColumnVector vector = (BytesColumnVector) previous.cols[0]; + boolean noNull = true; + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + String actual; + assertEquals("Null assert failed at " + c, isNull(c), vector.isNull[i]); + if (!vector.isNull[i]) { + actual = new String(ArrayUtils + .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i])); + assertEquals("failed at " + c, getStr(isDictionaryEncoding, c), actual); + } else { + noNull = false; + } + c++; + } + assertEquals("No Null check failed at " + c, noNull, vector.noNulls); + assertFalse(vector.isRepeating); + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void structRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "struct_field"); + conf.set(IOConstants.COLUMNS_TYPES, "struct"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + "group struct_field {\n" + " optional int32 a;\n" + + " optional double b;\n" + 
"}\n" + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector vector = (StructColumnVector) previous.cols[0]; + LongColumnVector cv = (LongColumnVector) vector.fields[0]; + DoubleColumnVector dv = (DoubleColumnVector) vector.fields[1]; + + for (int i = 0; i < cv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]); + assertEquals(getDoubleValue(isDictionaryEncoding, c), dv.vector[i], 0); + assertFalse(vector.isNull[i]); + assertFalse(vector.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java new file mode 100644 index 000000000000..307dac936a28 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -0,0 +1,57 @@ +package org.apache.hadoop.hive.ql.io.parquet; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; + +public class TestVectorizedDictionaryEncodingColumnReader extends TestVectorizedColumnReaderBase { + static boolean isDictionaryEncoding = true; + + @BeforeClass + public static void setup() throws IOException { + removeFile(); + writeData(initWriterFromFile(), isDictionaryEncoding); + } + + @AfterClass + public static void cleanup() throws IOException { + removeFile(); + } + + @Test + public void testIntRead() throws Exception { + intRead(isDictionaryEncoding); + } + + @Test + public void testLongRead() throws Exception { + longRead(isDictionaryEncoding); + } + + @Test + public 
void testDoubleRead() throws Exception { + doubleRead(isDictionaryEncoding); + } + + @Test + public void testFloatRead() throws Exception { + floatRead(isDictionaryEncoding); + } + + @Test + public void testBooleanRead() throws Exception { + booleanRead(isDictionaryEncoding); + } + + @Test + public void testBinaryRead() throws Exception { + binaryRead(isDictionaryEncoding); + } + + @Test + public void testStructRead() throws Exception { + structRead(isDictionaryEncoding); + } +} From d90013b7637fb71254367ae2f88854e62efac76d Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Mon, 21 Nov 2016 06:52:51 +0800 Subject: [PATCH 2/8] Add license header --- .../vector/VectorizedParquetColumnReader.java | 18 ++++++++++++++++++ .../io/parquet/TestVectorizedColumnReader.java | 18 ++++++++++++++++++ ...ctorizedDictionaryEncodingColumnReader.java | 18 ++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java index ec492793da6d..084d556e88b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hive.ql.io.parquet.vector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 40dc53eab86f..2b89bf692701 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hive.ql.io.parquet; import org.junit.AfterClass; diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java index 307dac936a28..690ddcc2750a 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.hadoop.hive.ql.io.parquet; import org.junit.AfterClass; From 0a6e822fc4343491f087e493d285ea555baf5977 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Mon, 21 Nov 2016 14:58:39 +0800 Subject: [PATCH 3/8] Remove list implementation --- .../vector/VectorizedParquetMapReader.java | 31 ------------------- 1 file changed, 31 deletions(-) delete mode 100644 ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java deleted file mode 100644 index fbdaa8f49e9a..000000000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetMapReader.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.io.parquet.vector; - -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; - -import java.io.IOException; - -public class VectorizedParquetMapReader implements VectorizedParquetColumnReader{ - - @Override - public void readBatch( - int total, - ColumnVector column, - TypeInfo columnType) throws IOException { - - } -} From 2c27477eef4aa3fe8660c8f1e3ba24e64637a21a Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Tue, 29 Nov 2016 01:26:22 +0800 Subject: [PATCH 4/8] Refine code --- .../vector/VectorizedParquetRecordReader.java | 10 +- .../VectorizedPrimitiveColumnReader.java | 26 +- ...es_non_dictionary_encoding_vectorization.q | 5 +- ...on_dictionary_encoding_vectorization.q.out | 357 ++++++++++++++++++ 4 files changed, 369 insertions(+), 29 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index bec322ef610e..4014de4939ab 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -74,6 +74,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase private List columnNamesList; private List columnTypesList; private VectorizedRowBatchCtx rbCtx; + private List indexColumnsWanted; /** * For each request column, the reader to read this column. 
This is NULL if this column @@ -204,7 +205,7 @@ public void initialize( columnTypesList); } - List indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); + indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration); if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) { requestedSchema = DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted); @@ -285,9 +286,10 @@ private void checkEndOfRowGroup() throws IOException { List columns = requestedSchema.getColumns(); List types = requestedSchema.getFields(); columnReaders = new VectorizedParquetColumnReader[columns.size()]; - for (int i = 0; i < columnTypesList.size(); ++i) { - columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages, - requestedSchema, skipTimestampConversion); + for (int i = 0; i < types.size(); ++i) { + columnReaders[i] = + buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i), + pages, requestedSchema, skipTimestampConversion); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 5bc97b271d9b..ad9d051938f3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -13,18 +13,15 @@ */ package org.apache.hadoop.hive.ql.io.parquet.vector; -import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -41,7 +38,6 @@ import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder; import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +47,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Timestamp; -import java.util.Arrays; import java.util.List; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; @@ -176,24 +171,6 @@ private void readBatchHelper( PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; readBatchForPrimitiveType(num, column, primitiveColumnType, rowId); break; - case LIST: -// ListTypeInfo listColumnType = (ListTypeInfo) columnType; -// ListColumnVector listColumn = (ListColumnVector) column; -// int offset = listColumn.childCount; -// listColumn.offsets[rowId] = offset; -// listColumn.lengths[] -// TypeInfo listTypeInfos = listColumnType.getListElementTypeInfo(); -// listColumn.childCount = ; - break; - case MAP: - MapTypeInfo mapColumnType = (MapTypeInfo) columnType; - MapColumnVector mapColumn = (MapColumnVector) column; -// for(){ -// -// } - readBatchHelper(num, mapColumn.keys, mapColumnType.getMapKeyTypeInfo(), rowId); - readBatchHelper(num, 
mapColumn.values, mapColumnType.getMapKeyTypeInfo(), rowId); - break; case STRUCT: StructTypeInfo structTypeInfo = (StructTypeInfo) columnType; StructColumnVector structColumn = (StructColumnVector) column; @@ -202,8 +179,9 @@ private void readBatchHelper( readBatch(num, structColumn.fields[i], typeInfos.get(i)); } break; + case LIST: + case MAP: case UNION: - break; default: throw new IOException("Unsupported category " + columnType.getCategory().name()); } diff --git a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q index 7de444fca7cc..abb0526c909f 100644 --- a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q +++ b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q @@ -91,4 +91,7 @@ EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1; EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; -SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; \ No newline at end of file +SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; + +EXPLAIN SELECT st1.c1 FROM parquet_types GROUP BY st1.c1; +SELECT st1.c1 FROM parquet_types GROUP BY st1.c1; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out b/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out index a9f5e480ba4e..55527fe7343f 100644 --- a/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out +++ b/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out @@ -2450,3 +2450,360 @@ FC 1 FD 1 FE 1 FF 1 +PREHOOK: query: EXPLAIN SELECT st1.c1 FROM parquet_types GROUP BY 
st1.c1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Map Reduce + Map Operator Tree: + TableScan + alias: parquet_types + Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE + Select Operator + expressions: st1 (type: struct) + outputColumnNames: st1 + Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE + Group By Operator + keys: st1.c1 (type: int) + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + key expressions: _col0 (type: int) + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE + Reduce Operator Tree: + Group By Operator + keys: KEY._col0 (type: int) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 150 Data size: 2100 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 150 Data size: 2100 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 +PREHOOK: type: QUERY +PREHOOK: Input: default@parquet_types +#### A masked pattern was here #### +POSTHOOK: query: SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@parquet_types +#### A masked pattern was here #### +1000 +1001 +1002 +1003 +1004 +1005 +1006 +1007 +1008 +1009 +1010 +1011 +1012 +1013 +1014 +1015 
+1016 +1017 +1018 +1019 +1020 +1021 +1022 +1023 +1024 +1025 +1026 +1027 +1028 +1029 +1030 +1031 +1032 +1033 +1034 +1035 +1036 +1037 +1038 +1039 +1040 +1041 +1042 +1043 +1044 +1045 +1046 +1047 +1048 +1049 +1050 +1051 +1052 +1053 +1054 +1055 +1056 +1057 +1058 +1059 +1060 +1061 +1062 +1063 +1064 +1065 +1066 +1067 +1068 +1069 +1070 +1071 +1072 +1073 +1074 +1075 +1076 +1077 +1078 +1079 +1080 +1081 +1082 +1083 +1084 +1085 +1086 +1087 +1088 +1089 +1090 +1091 +1092 +1093 +1094 +1095 +1096 +1097 +1098 +1099 +1100 +1101 +1102 +1103 +1104 +1105 +1106 +1107 +1108 +1109 +1110 +1111 +1112 +1113 +1114 +1115 +1116 +1117 +1118 +1119 +1120 +1121 +1122 +1123 +1124 +1125 +1126 +1127 +1128 +1129 +1130 +1131 +1132 +1133 +1134 +1135 +1136 +1137 +1138 +1139 +1140 +1141 +1142 +1143 +1144 +1145 +1146 +1147 +1148 +1149 +1150 +1151 +1152 +1153 +1154 +1155 +1156 +1157 +1158 +1159 +1160 +1161 +1162 +1163 +1164 +1165 +1166 +1167 +1168 +1169 +1170 +1171 +1172 +1173 +1174 +1175 +1176 +1177 +1178 +1179 +1180 +1181 +1182 +1183 +1184 +1185 +1186 +1187 +1188 +1189 +1190 +1191 +1192 +1193 +1194 +1195 +1196 +1197 +1198 +1199 +1200 +1201 +1202 +1203 +1204 +1205 +1206 +1207 +1208 +1209 +1210 +1211 +1212 +1213 +1214 +1215 +1216 +1217 +1218 +1219 +1220 +1221 +1222 +1223 +1224 +1225 +1226 +1227 +1228 +1229 +1230 +1231 +1232 +1233 +1234 +1235 +1236 +1237 +1238 +1239 +1240 +1241 +1242 +1243 +1244 +1245 +1246 +1247 +1248 +1249 +1250 +1251 +1252 +1253 +1254 +1255 +1256 +1257 +1258 +1259 +1260 +1261 +1262 +1263 +1264 +1265 +1266 +1267 +1268 +1269 +1270 +1271 +1272 +1273 +1274 +1275 +1276 +1277 +1278 +1279 +1280 +1281 +1282 +1283 +1284 +1285 +1286 +1287 +1288 +1289 +1290 +1291 +1292 +1293 +1294 +1295 +1296 +1297 +1298 +1299 From cf08512d44b91b2224a6fb2790b850c0e66dedb9 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Wed, 30 Nov 2016 07:46:43 +0800 Subject: [PATCH 5/8] Address comments --- ...eader.java => VectorizedColumnReader.java} | 10 +- .../vector/VectorizedParquetRecordReader.java | 40 +- 
.../VectorizedPrimitiveColumnReader.java | 30 +- .../vector/VectorizedStructReader.java | 6 +- .../parquet/TestVectorizedColumnReader.java | 6 + .../TestVectorizedColumnReaderBase.java | 101 ++++- ...torizedDictionaryEncodingColumnReader.java | 5 + ...es_non_dictionary_encoding_vectorization.q | 5 +- ...on_dictionary_encoding_vectorization.q.out | 357 ------------------ 9 files changed, 147 insertions(+), 413 deletions(-) rename ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/{VectorizedParquetColumnReader.java => VectorizedColumnReader.java} (76%) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java similarity index 76% rename from ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java rename to ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java index 084d556e88b2..e3be982730e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedColumnReader.java @@ -23,7 +23,15 @@ import java.io.IOException; -public interface VectorizedParquetColumnReader { +public interface VectorizedColumnReader { + /** + * read records with specified size and type into the columnVector + * + * @param total number of records to read into the column vector + * @param column column vector where the reader will read data into + * @param columnType the type of column vector + * @throws IOException + */ void readBatch( int total, ColumnVector column, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 4014de4939ab..9bbccccb6969 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -23,13 +23,13 @@ import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.parquet.ParquetRuntimeException; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter2.compat.FilterCompat; @@ -37,9 +37,8 @@ import org.apache.parquet.hadoop.ParquetInputSplit; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.InvalidSchemaException; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,10 +46,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Set; import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; @@ -80,7 +77,7 @@ public class VectorizedParquetRecordReader extends ParquetRecordReaderBase * For each request column, the reader to read this column. This is NULL if this column * is missing from the file, in which case we populate the attribute with NULL. 
*/ - private VectorizedParquetColumnReader[] columnReaders; + private VectorizedColumnReader[] columnReaders; /** * The number of rows that have been returned. @@ -285,7 +282,7 @@ private void checkEndOfRowGroup() throws IOException { } List columns = requestedSchema.getColumns(); List types = requestedSchema.getFields(); - columnReaders = new VectorizedParquetColumnReader[columns.size()]; + columnReaders = new VectorizedColumnReader[columns.size()]; for (int i = 0; i < types.size(); ++i) { columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i), @@ -297,9 +294,12 @@ private void checkEndOfRowGroup() throws IOException { private List getAllColumnDescriptorByType( int depth, Type type, - List columns) { + List columns) throws ParquetRuntimeException { List res = new ArrayList<>(); for (ColumnDescriptor descriptor : columns) { + if (depth > descriptor.getPath().length) { + throw new InvalidSchemaException("Corrupted Parquet schema"); + } if (type.getName().equals(descriptor.getPath()[depth])) { res.add(descriptor); } @@ -308,7 +308,7 @@ private List getAllColumnDescriptorByType( } // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema - private VectorizedParquetColumnReader buildVectorizedParquetReader( + private VectorizedColumnReader buildVectorizedParquetReader( TypeInfo typeInfo, Type type, PageReadStore pages, @@ -318,40 +318,42 @@ private VectorizedParquetColumnReader buildVectorizedParquetReader( 0); } - private VectorizedParquetColumnReader buildVectorizedParquetReader( + private VectorizedColumnReader buildVectorizedParquetReader( TypeInfo typeInfo, Type type, PageReadStore pages, List columnDescriptors, boolean skipTimestampConversion, int depth) throws IOException { - List descriptors = getAllColumnDescriptorByType(depth, type, columnDescriptors); + List descriptors = + getAllColumnDescriptorByType(depth, type, columnDescriptors); switch (typeInfo.getCategory()) { case PRIMITIVE: if 
(columnDescriptors == null || columnDescriptors.isEmpty()) { - return null; + throw new RuntimeException( + "Failed to find related Parquet column descriptor with type " + type); } else { return new VectorizedPrimitiveColumnReader(descriptors.get(0), pages.getPageReader(descriptors.get(0)), skipTimestampConversion, type); } case STRUCT: StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo; - List fieldReaders = new ArrayList<>(); + List fieldReaders = new ArrayList<>(); List fieldTypes = structTypeInfo.getAllStructFieldTypeInfos(); List types = type.asGroupType().getFields(); for (int i = 0; i < fieldTypes.size(); i++) { - VectorizedParquetColumnReader r = + VectorizedColumnReader r = buildVectorizedParquetReader(fieldTypes.get(i), types.get(i), pages, descriptors, skipTimestampConversion, depth + 1); if (r != null) { fieldReaders.add(r); + } else { + throw new RuntimeException( + "Fail to build Parquet vectorized reader based on Hive type " + fieldTypes.get(i) + .getTypeName() + " and Parquet type" + types.get(i).toString()); } } - if (fieldReaders.size() > 0) { - return new VectorizedStructReader(fieldReaders); - } else { - return null; - } + return new VectorizedStructReader(fieldReaders); case LIST: case MAP: case UNION: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index ad9d051938f3..54eea199a883 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -57,7 +58,7 @@ * It's column level Parquet reader which is used to read a batch of records for a column, * part of the code is referred from Apache Spark and Apache Parquet. */ -public class VectorizedPrimitiveColumnReader implements VectorizedParquetColumnReader{ +public class VectorizedPrimitiveColumnReader implements VectorizedColumnReader { private static final Logger LOG = LoggerFactory.getLogger(VectorizedPrimitiveColumnReader.class); @@ -166,32 +167,7 @@ private void readBatchHelper( ColumnVector column, TypeInfo columnType, int rowId) throws IOException { - switch (columnType.getCategory()) { - case PRIMITIVE: - PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; - readBatchForPrimitiveType(num, column, primitiveColumnType, rowId); - break; - case STRUCT: - StructTypeInfo structTypeInfo = (StructTypeInfo) columnType; - StructColumnVector structColumn = (StructColumnVector) column; - List typeInfos = structTypeInfo.getAllStructFieldTypeInfos(); - for (int i = 0; i < typeInfos.size(); i++) { - readBatch(num, structColumn.fields[i], typeInfos.get(i)); - } - break; - case LIST: - case MAP: - case UNION: - default: - throw new IOException("Unsupported category " + columnType.getCategory().name()); - } - } - - private void readBatchForPrimitiveType( - int num, - ColumnVector column, - PrimitiveTypeInfo primitiveColumnType, - int rowId) throws IOException { + PrimitiveTypeInfo primitiveColumnType = (PrimitiveTypeInfo) columnType; switch (primitiveColumnType.getPrimitiveCategory()) { case INT: case BYTE: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java index d5f8859c4b42..ba09b9f70d85 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java @@ -8,11 +8,11 @@ import java.io.IOException; import java.util.List; -public class VectorizedStructReader implements VectorizedParquetColumnReader{ +public class VectorizedStructReader implements VectorizedColumnReader { - List fieldReaders; + List fieldReaders; - public VectorizedStructReader(List fieldReaders) { + public VectorizedStructReader(List fieldReaders) { this.fieldReaders = fieldReaders; } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 2b89bf692701..0252dec792c5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -72,4 +72,10 @@ public void testBinaryRead() throws Exception { public void testStructRead() throws Exception { structRead(isDictionaryEncoding); } + + @Test + public void testNestedStructRead() throws Exception { + nestedStructRead0(isDictionaryEncoding); + nestedStructRead1(isDictionaryEncoding); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java index 0e89feb7e3df..ae2df0c51fc5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java @@ -95,6 +95,13 @@ public class TestVectorizedColumnReaderBase { + " optional int32 a;\n" + " optional double b;\n" + "}\n" + + "optional group nested_struct_field {" + + " optional group nsf {" + + " optional int32 c;\n" + + " optional int32 d;\n" + + " }\n" + + " optional double e;\n" + + "}\n" + "optional 
group map_field (MAP) {\n" + " repeated group map (MAP_KEY_VALUE) {\n" + " required binary key;\n" @@ -240,6 +247,11 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar .append("a", intVal) .append("b", doubleVal); + Group g = group.addGroup("nested_struct_field"); + + g.addGroup("nsf").append("c", intVal).append("d", intVal); + g.append("e", doubleVal); + Group mapGroup = group.addGroup("map_field"); if (i % 13 != 1) { mapGroup.addGroup("map").append("key", binary).append("value", "abc"); @@ -467,8 +479,12 @@ protected void structRead(boolean isDictionaryEncoding) throws Exception { conf.set(IOConstants.COLUMNS_TYPES, "struct"); conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); - String schema = "message hive_schema {\n" + "group struct_field {\n" + " optional int32 a;\n" - + " optional double b;\n" + "}\n" + "}\n"; + String schema = "message hive_schema {\n" + + "group struct_field {\n" + + " optional int32 a;\n" + + " optional double b;\n" + + "}\n" + + "}\n"; VectorizedParquetRecordReader reader = createParquetReader(schema, conf); VectorizedRowBatch previous = reader.createValue(); int c = 0; @@ -494,4 +510,85 @@ protected void structRead(boolean isDictionaryEncoding) throws Exception { reader.close(); } } + + protected void nestedStructRead0(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "nested_struct_field"); + conf.set(IOConstants.COLUMNS_TYPES, "struct,e:double>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group nested_struct_field {\n" + + " optional group nsf {\n" + + " optional int32 c;\n" + + " optional int32 d;\n" + + " }" + + "optional double e;\n" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + 
VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector vector = (StructColumnVector) previous.cols[0]; + StructColumnVector sv = (StructColumnVector) vector.fields[0]; + LongColumnVector cv = (LongColumnVector) sv.fields[0]; + LongColumnVector dv = (LongColumnVector) sv.fields[1]; + DoubleColumnVector ev = (DoubleColumnVector) vector.fields[1]; + + for (int i = 0; i < cv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getIntValue(isDictionaryEncoding, c), cv.vector[i]); + assertEquals(getIntValue(isDictionaryEncoding, c), dv.vector[i]); + assertEquals(getDoubleValue(isDictionaryEncoding, c), ev.vector[i], 0); + assertFalse(vector.isNull[i]); + assertFalse(vector.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + + protected void nestedStructRead1(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "nested_struct_field"); + conf.set(IOConstants.COLUMNS_TYPES, "struct>"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group nested_struct_field {\n" + + " optional group nsf {\n" + + " optional int32 c;\n" + + " }" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector vector = (StructColumnVector) previous.cols[0]; + StructColumnVector sv = (StructColumnVector) vector.fields[0]; + LongColumnVector cv = (LongColumnVector) sv.fields[0]; + + for (int i = 0; i < cv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getIntValue(isDictionaryEncoding, c), 
cv.vector[i]); + assertFalse(vector.isNull[i]); + assertFalse(vector.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java index 690ddcc2750a..79d13fd4fcc1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -72,4 +72,9 @@ public void testBinaryRead() throws Exception { public void testStructRead() throws Exception { structRead(isDictionaryEncoding); } + + @Test + public void testNestedStructRead() throws Exception { + structRead(isDictionaryEncoding); + } } diff --git a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q index abb0526c909f..7de444fca7cc 100644 --- a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q +++ b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q @@ -91,7 +91,4 @@ EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1; EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; -SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; - -EXPLAIN SELECT st1.c1 FROM parquet_types GROUP BY st1.c1; -SELECT st1.c1 FROM parquet_types GROUP BY st1.c1; \ No newline at end of file +SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary; \ No newline at end of file diff --git 
a/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out b/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out index 55527fe7343f..a9f5e480ba4e 100644 --- a/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out +++ b/ql/src/test/results/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q.out @@ -2450,360 +2450,3 @@ FC 1 FD 1 FE 1 FF 1 -PREHOOK: query: EXPLAIN SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 -PREHOOK: type: QUERY -POSTHOOK: query: EXPLAIN SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 -POSTHOOK: type: QUERY -STAGE DEPENDENCIES: - Stage-1 is a root stage - Stage-0 depends on stages: Stage-1 - -STAGE PLANS: - Stage: Stage-1 - Map Reduce - Map Operator Tree: - TableScan - alias: parquet_types - Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE - Select Operator - expressions: st1 (type: struct) - outputColumnNames: st1 - Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE - Group By Operator - keys: st1.c1 (type: int) - mode: hash - outputColumnNames: _col0 - Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE - Reduce Output Operator - key expressions: _col0 (type: int) - sort order: + - Map-reduce partition columns: _col0 (type: int) - Statistics: Num rows: 300 Data size: 4200 Basic stats: COMPLETE Column stats: NONE - Reduce Operator Tree: - Group By Operator - keys: KEY._col0 (type: int) - mode: mergepartial - outputColumnNames: _col0 - Statistics: Num rows: 150 Data size: 2100 Basic stats: COMPLETE Column stats: NONE - File Output Operator - compressed: false - Statistics: Num rows: 150 Data size: 2100 Basic stats: COMPLETE Column stats: NONE - table: - input format: org.apache.hadoop.mapred.SequenceFileInputFormat - output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat - serde: 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe - - Stage: Stage-0 - Fetch Operator - limit: -1 - Processor Tree: - ListSink - -PREHOOK: query: SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 -PREHOOK: type: QUERY -PREHOOK: Input: default@parquet_types -#### A masked pattern was here #### -POSTHOOK: query: SELECT st1.c1 FROM parquet_types GROUP BY st1.c1 -POSTHOOK: type: QUERY -POSTHOOK: Input: default@parquet_types -#### A masked pattern was here #### -1000 -1001 -1002 -1003 -1004 -1005 -1006 -1007 -1008 -1009 -1010 -1011 -1012 -1013 -1014 -1015 -1016 -1017 -1018 -1019 -1020 -1021 -1022 -1023 -1024 -1025 -1026 -1027 -1028 -1029 -1030 -1031 -1032 -1033 -1034 -1035 -1036 -1037 -1038 -1039 -1040 -1041 -1042 -1043 -1044 -1045 -1046 -1047 -1048 -1049 -1050 -1051 -1052 -1053 -1054 -1055 -1056 -1057 -1058 -1059 -1060 -1061 -1062 -1063 -1064 -1065 -1066 -1067 -1068 -1069 -1070 -1071 -1072 -1073 -1074 -1075 -1076 -1077 -1078 -1079 -1080 -1081 -1082 -1083 -1084 -1085 -1086 -1087 -1088 -1089 -1090 -1091 -1092 -1093 -1094 -1095 -1096 -1097 -1098 -1099 -1100 -1101 -1102 -1103 -1104 -1105 -1106 -1107 -1108 -1109 -1110 -1111 -1112 -1113 -1114 -1115 -1116 -1117 -1118 -1119 -1120 -1121 -1122 -1123 -1124 -1125 -1126 -1127 -1128 -1129 -1130 -1131 -1132 -1133 -1134 -1135 -1136 -1137 -1138 -1139 -1140 -1141 -1142 -1143 -1144 -1145 -1146 -1147 -1148 -1149 -1150 -1151 -1152 -1153 -1154 -1155 -1156 -1157 -1158 -1159 -1160 -1161 -1162 -1163 -1164 -1165 -1166 -1167 -1168 -1169 -1170 -1171 -1172 -1173 -1174 -1175 -1176 -1177 -1178 -1179 -1180 -1181 -1182 -1183 -1184 -1185 -1186 -1187 -1188 -1189 -1190 -1191 -1192 -1193 -1194 -1195 -1196 -1197 -1198 -1199 -1200 -1201 -1202 -1203 -1204 -1205 -1206 -1207 -1208 -1209 -1210 -1211 -1212 -1213 -1214 -1215 -1216 -1217 -1218 -1219 -1220 -1221 -1222 -1223 -1224 -1225 -1226 -1227 -1228 -1229 -1230 -1231 -1232 -1233 -1234 -1235 -1236 -1237 -1238 -1239 -1240 -1241 -1242 -1243 -1244 -1245 -1246 -1247 -1248 -1249 -1250 -1251 -1252 -1253 -1254 -1255 
-1256 -1257 -1258 -1259 -1260 -1261 -1262 -1263 -1264 -1265 -1266 -1267 -1268 -1269 -1270 -1271 -1272 -1273 -1274 -1275 -1276 -1277 -1278 -1279 -1280 -1281 -1282 -1283 -1284 -1285 -1286 -1287 -1288 -1289 -1290 -1291 -1292 -1293 -1294 -1295 -1296 -1297 -1298 -1299 From 76717d1488edc5b5269e09c3a9d41d33497824d0 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Tue, 6 Dec 2016 14:41:28 +0800 Subject: [PATCH 6/8] Add Decimal test --- .../vector/VectorizedParquetRecordReader.java | 29 ++--- .../VectorizedPrimitiveColumnReader.java | 32 +++-- .../vector/VectorizedStructReader.java | 24 +++- .../parquet/TestVectorizedColumnReader.java | 14 +- .../TestVectorizedColumnReaderBase.java | 120 ++++++++++++++++-- ...torizedDictionaryEncodingColumnReader.java | 15 ++- 6 files changed, 186 insertions(+), 48 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 9bbccccb6969..48f5fbd84080 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -283,11 +283,20 @@ private void checkEndOfRowGroup() throws IOException { List columns = requestedSchema.getColumns(); List types = requestedSchema.getFields(); columnReaders = new VectorizedColumnReader[columns.size()]; - for (int i = 0; i < types.size(); ++i) { - columnReaders[i] = - buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i), - pages, requestedSchema, skipTimestampConversion); + + if (!ColumnProjectionUtils.isReadAllColumns(jobConf) && !indexColumnsWanted.isEmpty()) { + for (int i = 0; i < types.size(); ++i) { + columnReaders[i] = + buildVectorizedParquetReader(columnTypesList.get(indexColumnsWanted.get(i)), types.get(i), + pages, requestedSchema.getColumns(), skipTimestampConversion, 0); + 
} + } else { + for (int i = 0; i < types.size(); ++i) { + columnReaders[i] = buildVectorizedParquetReader(columnTypesList.get(i), types.get(i), pages, + requestedSchema.getColumns(), skipTimestampConversion, 0); + } } + totalCountLoadedSoFar += pages.getRowCount(); } @@ -297,7 +306,7 @@ private List getAllColumnDescriptorByType( List columns) throws ParquetRuntimeException { List res = new ArrayList<>(); for (ColumnDescriptor descriptor : columns) { - if (depth > descriptor.getPath().length) { + if (depth >= descriptor.getPath().length) { throw new InvalidSchemaException("Corrupted Parquet schema"); } if (type.getName().equals(descriptor.getPath()[depth])) { @@ -308,16 +317,6 @@ private List getAllColumnDescriptorByType( } // Build VectorizedParquetColumnReader via Hive typeInfo and Parquet schema - private VectorizedColumnReader buildVectorizedParquetReader( - TypeInfo typeInfo, - Type type, - PageReadStore pages, - MessageType schema, - boolean skipTimestampConversion) throws IOException { - return buildVectorizedParquetReader(typeInfo, type, pages, schema.getColumns(), skipTimestampConversion, - 0); - } - private VectorizedColumnReader buildVectorizedParquetReader( TypeInfo typeInfo, Type type, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java index 54eea199a883..3d5c6e6a092d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedPrimitiveColumnReader.java @@ -18,13 +18,10 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; @@ -48,7 +45,6 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.sql.Timestamp; -import java.util.List; import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL; import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL; @@ -196,10 +192,11 @@ private void readBatchHelper( break; case DECIMAL: readDecimal(num, (DecimalColumnVector) column, rowId); + break; case INTERVAL_DAY_TIME: case TIMESTAMP: default: - throw new IOException("Unsupported"); + throw new IOException("Unsupported type: " + type); } } @@ -377,8 +374,11 @@ private void readBinaries( /** * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`. 
*/ - private void decodeDictionaryIds(int rowId, int num, ColumnVector column, - LongColumnVector dictionaryIds) { + private void decodeDictionaryIds( + int rowId, + int num, + ColumnVector column, + LongColumnVector dictionaryIds) { System.arraycopy(dictionaryIds.isNull, rowId, column.isNull, rowId, num); if (column.noNulls) { column.noNulls = dictionaryIds.noNulls; @@ -423,9 +423,21 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, break; case BINARY: case FIXED_LEN_BYTE_ARRAY: - for (int i = rowId; i < rowId + num; ++i) { - ((BytesColumnVector) column) - .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + if (column instanceof BytesColumnVector) { + for (int i = rowId; i < rowId + num; ++i) { + ((BytesColumnVector) column) + .setVal(i, dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe()); + } + } else { + DecimalColumnVector decimalColumnVector = ((DecimalColumnVector) column); + decimalColumnVector.precision = + (short) type.asPrimitiveType().getDecimalMetadata().getPrecision(); + decimalColumnVector.scale = (short) type.asPrimitiveType().getDecimalMetadata().getScale(); + for (int i = rowId; i < rowId + num; ++i) { + decimalColumnVector.vector[i] + .set(dictionary.decodeToBinary((int) dictionaryIds.vector[i]).getBytesUnsafe(), + decimalColumnVector.scale); + } } break; default: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java index ba09b9f70d85..9dc1f0758e0e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hive.ql.io.parquet.vector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -10,7 +28,7 @@ public class VectorizedStructReader implements VectorizedColumnReader { - List fieldReaders; + private List fieldReaders; public VectorizedStructReader(List fieldReaders) { this.fieldReaders = fieldReaders; @@ -28,8 +46,10 @@ public void readBatch( fieldReaders.get(i) .readBatch(total, vectors[i], structTypeInfo.getAllStructFieldTypeInfos().get(i)); structColumnVector.isRepeating = structColumnVector.isRepeating && vectors[i].isRepeating; + for (int j = 0; j < vectors[i].isNull.length; j++) { - structColumnVector.isNull[i] = structColumnVector.isNull[i] && vectors[i].isNull[i]; + structColumnVector.isNull[j] = + (i == 0) ? 
vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j]; } } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java index 0252dec792c5..d4b4140b1161 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java @@ -24,7 +24,7 @@ import java.io.IOException; -public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase{ +public class TestVectorizedColumnReader extends TestVectorizedColumnReaderBase { static boolean isDictionaryEncoding = false; @BeforeClass @@ -60,7 +60,7 @@ public void testFloatRead() throws Exception { @Test public void testBooleanRead() throws Exception { - booleanRead(isDictionaryEncoding); + booleanRead(); } @Test @@ -78,4 +78,14 @@ public void testNestedStructRead() throws Exception { nestedStructRead0(isDictionaryEncoding); nestedStructRead1(isDictionaryEncoding); } + + @Test + public void structReadSomeNull() throws Exception { + structReadSomeNull(isDictionaryEncoding); + } + + @Test + public void decimalRead() throws Exception { + decimalRead(isDictionaryEncoding); + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java index ae2df0c51fc5..ea90e2dd71b8 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java @@ -18,9 +18,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.Utilities; import 
org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; @@ -33,6 +35,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; @@ -48,18 +51,9 @@ import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; import java.io.IOException; -import java.sql.Timestamp; -import java.util.Calendar; import java.util.List; -import java.util.Random; import static junit.framework.Assert.assertTrue; import static junit.framework.TestCase.assertFalse; @@ -91,9 +85,10 @@ public class TestVectorizedColumnReaderBase { + "optional fixed_len_byte_array(1) all_null_field; " + "required binary binary_field; " + "optional binary binary_field_some_null; " - + "optional group struct_field {" - + " optional int32 a;\n" - + " optional double b;\n" + + "required binary value (DECIMAL(5,2)); " + + "required group struct_field {" + + " required int32 a;\n" + + " required double b;\n" + "}\n" + "optional group nested_struct_field {" + " optional group nsf {" @@ -102,6 +97,10 @@ public class TestVectorizedColumnReaderBase { + " }\n" + " 
optional double e;\n" + "}\n" + + "optional group struct_field_some_null {" + + " optional int32 f;\n" + + " optional double g;\n" + + "}\n" + "optional group map_field (MAP) {\n" + " repeated group map (MAP_KEY_VALUE) {\n" + " required binary key;\n" @@ -169,6 +168,13 @@ protected static String getTimestampStr(int index) { return "99999999" + s; } + protected static HiveDecimal getDecimal( + boolean isDictionaryEncoding, + int index) { + double d = (isDictionaryEncoding) ? (index % UNIQUE_NUM * 0.01) : (index * 0.01); + return HiveDecimal.create(String.valueOf(d)); + } + protected static Binary getTimestamp( boolean isDictionaryEncoding, int index) { @@ -220,6 +226,7 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar int intVal = getIntValue(isDictionaryEncoding, i); long longVal = getLongValue(isDictionaryEncoding, i); Binary timeStamp = getTimestamp(isDictionaryEncoding, i); + HiveDecimal decimalVal = getDecimal(isDictionaryEncoding, i); double doubleVal = getDoubleValue(isDictionaryEncoding, i); float floatVal = getFloatValue(isDictionaryEncoding, i); boolean booleanVal = getBooleanValue(i); @@ -243,6 +250,9 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar group.append("binary_field_some_null", binary); } + HiveDecimalWritable w = new HiveDecimalWritable(decimalVal); + group.append("value", Binary.fromConstantByteArray(w.getInternalStorage())); + group.addGroup("struct_field") .append("a", intVal) .append("b", doubleVal); @@ -252,6 +262,14 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar g.addGroup("nsf").append("c", intVal).append("d", intVal); g.append("e", doubleVal); + Group some_null_g = group.addGroup("struct_field_some_null"); + if (i % 2 != 0) { + some_null_g.append("f", intVal); + } + if (i % 3 != 0) { + some_null_g.append("g", doubleVal); + } + Group mapGroup = group.addGroup("map_field"); if (i % 13 != 1) { mapGroup.addGroup("map").append("key", binary).append("value", 
"abc"); @@ -406,7 +424,7 @@ protected void floatRead(boolean isDictionaryEncoding) throws Exception { } } - protected void booleanRead(boolean isDictionaryEncoding) throws Exception { + protected void booleanRead() throws Exception { Configuration conf = new Configuration(); conf.set(IOConstants.COLUMNS, "boolean_field"); conf.set(IOConstants.COLUMNS_TYPES, "boolean"); @@ -554,7 +572,6 @@ protected void nestedStructRead0(boolean isDictionaryEncoding) throws Exception } } - protected void nestedStructRead1(boolean isDictionaryEncoding) throws Exception { Configuration conf = new Configuration(); conf.set(IOConstants.COLUMNS, "nested_struct_field"); @@ -591,4 +608,79 @@ protected void nestedStructRead1(boolean isDictionaryEncoding) throws Exception reader.close(); } } + + protected void structReadSomeNull(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "struct_field_some_null"); + conf.set(IOConstants.COLUMNS_TYPES, "struct"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + String schema = "message hive_schema {\n" + + "group struct_field_some_null {\n" + + " optional int32 f;\n" + + " optional double g;\n" + + "}\n"; + VectorizedParquetRecordReader reader = createParquetReader(schema, conf); + VectorizedRowBatch previous = reader.createValue(); + int c = 0; + try { + while (reader.next(NullWritable.get(), previous)) { + StructColumnVector sv = (StructColumnVector) previous.cols[0]; + LongColumnVector fv = (LongColumnVector) sv.fields[0]; + DoubleColumnVector gv = (DoubleColumnVector) sv.fields[1]; + + for (int i = 0; i < fv.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(c % 2 == 0, fv.isNull[i]); + assertEquals(c % 3 == 0, gv.isNull[i]); + assertEquals(c % /* 2*3 = */6 == 0, sv.isNull[i]); + if (!sv.isNull[i]) { + if (!fv.isNull[i]) { + 
assertEquals(getIntValue(isDictionaryEncoding, c), fv.vector[i]); + } + if (!gv.isNull[i]) { + assertEquals(getDoubleValue(isDictionaryEncoding, c), gv.vector[i], 0); + } + } + assertFalse(fv.isRepeating); + c++; + } + } + assertEquals("It doesn't exit at expected position", nElements, c); + } finally { + reader.close(); + } + } + + protected void decimalRead(boolean isDictionaryEncoding) throws Exception { + Configuration conf = new Configuration(); + conf.set(IOConstants.COLUMNS, "value"); + conf.set(IOConstants.COLUMNS_TYPES, "decimal(5,2)"); + conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false); + conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0"); + VectorizedParquetRecordReader reader = + createParquetReader("message hive_schema { required binary value (DECIMAL(5,2));}", conf); + VectorizedRowBatch previous = reader.createValue(); + try { + int c = 0; + while (reader.next(NullWritable.get(), previous)) { + DecimalColumnVector vector = (DecimalColumnVector) previous.cols[0]; + assertTrue(vector.noNulls); + for (int i = 0; i < vector.vector.length; i++) { + if (c == nElements) { + break; + } + assertEquals(getDecimal(isDictionaryEncoding, c), + vector.vector[i].getHiveDecimal()); + assertFalse(vector.isNull[i]); + c++; + } + } + assertEquals(nElements, c); + } finally { + reader.close(); + } + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java index 79d13fd4fcc1..c6677cce2872 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedDictionaryEncodingColumnReader.java @@ -58,11 +58,6 @@ public void testFloatRead() throws Exception { floatRead(isDictionaryEncoding); } - @Test - public void testBooleanRead() throws Exception { - booleanRead(isDictionaryEncoding); - 
} - @Test public void testBinaryRead() throws Exception { binaryRead(isDictionaryEncoding); @@ -77,4 +72,14 @@ public void testStructRead() throws Exception { public void testNestedStructRead() throws Exception { structRead(isDictionaryEncoding); } + + @Test + public void structReadSomeNull() throws Exception { + structReadSomeNull(isDictionaryEncoding); + } + + @Test + public void decimalRead() throws Exception { + decimalRead(isDictionaryEncoding); + } } From e0a00edd44db6ceba0ab1203e4bc454cdf26eb2e Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Thu, 8 Dec 2016 05:25:04 +0800 Subject: [PATCH 7/8] Address further comments --- .../io/parquet/vector/VectorizedParquetRecordReader.java | 2 +- ...tructReader.java => VectorizedStructColumnReader.java} | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) rename ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/{VectorizedStructReader.java => VectorizedStructColumnReader.java} (84%) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java index 48f5fbd84080..699de59b58ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java @@ -352,7 +352,7 @@ private VectorizedColumnReader buildVectorizedParquetReader( .getTypeName() + " and Parquet type" + types.get(i).toString()); } } - return new VectorizedStructReader(fieldReaders); + return new VectorizedStructColumnReader(fieldReaders); case LIST: case MAP: case UNION: diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java similarity index 84% rename from ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java rename to 
ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java index 9dc1f0758e0e..231a121d3281 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java @@ -26,11 +26,11 @@ import java.io.IOException; import java.util.List; -public class VectorizedStructReader implements VectorizedColumnReader { +public class VectorizedStructColumnReader implements VectorizedColumnReader { - private List fieldReaders; + private final List fieldReaders; - public VectorizedStructReader(List fieldReaders) { + public VectorizedStructColumnReader(List fieldReaders) { this.fieldReaders = fieldReaders; } @@ -50,6 +50,8 @@ public void readBatch( for (int j = 0; j < vectors[i].isNull.length; j++) { structColumnVector.isNull[j] = (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j]; + structColumnVector.noNulls = (i == 0) ? 
structColumnVector.isNull[j] : + structColumnVector.noNulls && vectors[i].isNull[j]; } } } From c1c17556e1ec56976dafa80ed3bfe4e1a1fbb7e7 Mon Sep 17 00:00:00 2001 From: Ferdinand Xu Date: Thu, 8 Dec 2016 16:59:51 +0800 Subject: [PATCH 8/8] Fix failed cases --- .../vector/VectorizedStructColumnReader.java | 5 +++-- .../parquet/TestVectorizedColumnReaderBase.java | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java index 231a121d3281..cc6cb2059b65 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedStructColumnReader.java @@ -50,9 +50,10 @@ public void readBatch( for (int j = 0; j < vectors[i].isNull.length; j++) { structColumnVector.isNull[j] = (i == 0) ? vectors[i].isNull[j] : structColumnVector.isNull[j] && vectors[i].isNull[j]; - structColumnVector.noNulls = (i == 0) ? structColumnVector.isNull[j] : - structColumnVector.noNulls && vectors[i].isNull[j]; } + structColumnVector.noNulls = + (i == 0) ? 
vectors[i].noNulls : structColumnVector.noNulls && vectors[i].noNulls; } + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java index ea90e2dd71b8..eecccce43f80 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReaderBase.java @@ -53,6 +53,8 @@ import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.List; import static junit.framework.Assert.assertTrue; @@ -171,8 +173,14 @@ protected static String getTimestampStr(int index) { protected static HiveDecimal getDecimal( boolean isDictionaryEncoding, int index) { - double d = (isDictionaryEncoding) ? (index % UNIQUE_NUM * 0.01) : (index * 0.01); - return HiveDecimal.create(String.valueOf(d)); + int decimalVal = index % 100; + String decimalStr = (decimalVal < 10) ? "0" + String.valueOf(decimalVal) : String.valueOf + (decimalVal); + int intVal = (isDictionaryEncoding) ? 
index % UNIQUE_NUM : index / 100; + String d = String.valueOf(intVal) + decimalStr; + BigInteger bi = new BigInteger(d); + BigDecimal bd = new BigDecimal(bi); + return HiveDecimal.create(bd); } protected static Binary getTimestamp( @@ -226,7 +234,7 @@ protected static void writeData(ParquetWriter writer, boolean isDictionar int intVal = getIntValue(isDictionaryEncoding, i); long longVal = getLongValue(isDictionaryEncoding, i); Binary timeStamp = getTimestamp(isDictionaryEncoding, i); - HiveDecimal decimalVal = getDecimal(isDictionaryEncoding, i); + HiveDecimal decimalVal = getDecimal(isDictionaryEncoding, i).setScale(2); double doubleVal = getDoubleValue(isDictionaryEncoding, i); float floatVal = getFloatValue(isDictionaryEncoding, i); boolean booleanVal = getBooleanValue(i); @@ -672,7 +680,7 @@ protected void decimalRead(boolean isDictionaryEncoding) throws Exception { if (c == nElements) { break; } - assertEquals(getDecimal(isDictionaryEncoding, c), + assertEquals("Check failed at pos " + c, getDecimal(isDictionaryEncoding, c), vector.vector[i].getHiveDecimal()); assertFalse(vector.isNull[i]); c++;