From 28dd76af303169a2fd4269d24c7b6cfa505c25eb Mon Sep 17 00:00:00 2001
From: Jason Altekruse
Date: Fri, 16 May 2014 23:19:51 -0500
Subject: [PATCH] DRILL-649: Reading Impala- and Avro-generated Parquet files

Implemented dictionary encoding for the non-variable-length (fixed-width) types.
---
 .../exec/store/parquet/ColumnReader.java           |   1 +
 .../store/parquet/NullableColumnReader.java        |   9 +-
 .../NullableFixedByteAlignedReaders.java           | 170 ++++++++++++++++++
 .../exec/store/parquet/PageReadStatus.java         |  39 ++--
 .../ParquetFixedWidthDictionaryReader.java         |  54 ++++++
 .../store/parquet/ParquetRecordReader.java         |  20 ++-
 .../store/parquet/VarLenBinaryReader.java          |   2 +-
 .../store/parquet/VarLengthColumnReaders.java      |   2 -
 .../parquet/ParquetRecordReaderTest.java           |  31 +++-
 .../store/parquet/ParquetResultListener.java       |   2 +-
 10 files changed, 296 insertions(+), 34 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
index b9faafe3534..43f27a693f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnReader.java
@@ -51,6 +51,7 @@ ColumnDescriptor getColumnDescriptor() {
   final PageReadStatus pageReadStatus;

   final SchemaElement schemaElement;
+  boolean usingDictionary;

   // quick reference to see if the field is fixed length (as this requires an instanceof)
   final boolean isFixedLength;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
index 6040c67c926..687b373dbd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableColumnReader.java
@@ -28,7 +28,7 @@
 import java.io.IOException;

-abstract class NullableColumnReader extends ColumnReader{
+abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader{

   int nullsFound;
   // used to skip nulls found
@@ -37,7 +37,7 @@ abstract class NullableColumnReader extends ColumnReader{
   int bitsUsed;

   NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
-               boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
+               boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
   }

@@ -118,7 +118,8 @@ else if (dataTypeLengthInBits < 8){
       valuesReadInCurrentPass += recordsReadInThisIteration;
       totalValuesRead += recordsReadInThisIteration;
       pageReadStatus.valuesRead += recordsReadInThisIteration;
-      if (readStartInBytes + readLength >= pageReadStatus.byteLength && bitsUsed == 0) {
+      if ( (readStartInBytes + readLength >= pageReadStatus.byteLength && bitsUsed == 0)
+          || pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
         if (!pageReadStatus.next()) {
           break;
         }
@@ -133,4 +134,4 @@ else if (dataTypeLengthInBits < 8){
   }

   protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
new file mode 100644
index 00000000000..6dd96de087c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableFixedByteAlignedReaders.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.*;
+
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableFloat4Vector;
+import org.apache.drill.exec.vector.NullableFloat8Vector;
+import org.apache.drill.exec.vector.NullableIntVector;
+import parquet.column.ColumnDescriptor;
+import parquet.column.Encoding;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+class NullableFixedByteAlignedReaders {
+
+  public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
+                                                             ColumnDescriptor columnDescriptor,
+                                                             ColumnChunkMetaData columnChunkMetaData,
+                                                             boolean fixedLength,
+                                                             ValueVector valueVec,
+                                                             SchemaElement schemaElement) throws ExecutionSetupException {
+    if (!columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+      return new NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+          fixedLength, valueVec, schemaElement);
+    } else {
+      if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
+        return new NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableBigIntVector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return new NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableIntVector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return new NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
+      }
+      else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
+        return new NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
+            fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
+      }
+      else {
+        throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name());
+      }
+    }
+  }
+
+  private static class NullableFixedByteAlignedReader extends NullableColumnReader {
+    private byte[] bytes;
+
+    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+                      SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+      this.recordsReadInThisIteration = recordsToReadInThisPass;
+
+      // set up metadata
+      this.readStartInBytes = pageReadStatus.readPosInBytes;
+      this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
+      this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
+      this.bytes = pageReadStatus.pageDataByteArray;
+
+      // fill in data
+      vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
+    }
+  }
+
+  private static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
+                   SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+      if (usingDictionary) {
+        for (int i = 0; i < recordsToReadInThisPass; i++){
+          valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readInteger());
+        }
+      }
+    }
+  }
+
+  private static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
+                      SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readLong());
+      }
+    }
+  }
+
+  private static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
+                      SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readFloat());
+      }
+    }
+  }
+
+  private static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
+
+    private byte[] bytes;
+
+    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
+                      SchemaElement schemaElement) throws ExecutionSetupException {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+    }
+
+    // this method is called by its superclass during a read loop
+    @Override
+    protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+      for (int i = 0; i < recordsToReadInThisPass; i++){
+        valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readDouble());
+      }
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
index 021b62285f6..20bf3e947f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/PageReadStatus.java
@@ -41,6 +41,7 @@
 import parquet.format.PageType;
 import parquet.format.Util;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;

 // class to keep track of the read position of variable length columns
 final class PageReadStatus {
@@ -65,26 +66,33 @@ final class PageReadStatus {
   ValuesReader definitionLevels;
   ValuesReader valueReader;
   Dictionary dictionary;
+  PageHeader pageHeader = null;

   PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;

-    long totalByteLength = columnChunkMetaData.getTotalSize();
+    long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
       FSDataInputStream f = fs.open(path);
+      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
       if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
         f.seek(columnChunkMetaData.getDictionaryPageOffset());
         PageHeader pageHeader = Util.readPageHeader(f);
         assert pageHeader.type == PageType.DICTIONARY_PAGE;
-        DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(f, pageHeader.compressed_page_size)),
+        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress( //
+                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+                pageHeader.getUncompressed_page_size(), //
+                parentColumnReader.columnChunkMetaData.getCodec());
+        DictionaryPage page = new DictionaryPage(
+            bytesIn,
             pageHeader.uncompressed_page_size,
             pageHeader.dictionary_page_header.num_values,
             parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
         );
         this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
       }
-      this.dataReader = new ColumnDataReader(f, start, totalByteLength);
     } catch (IOException e) {
       throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + path.getName(), e);
     }
@@ -102,21 +110,29 @@ public boolean next() throws IOException {

     currentPage = null;

-    if(!dataReader.hasRemainder()) {
+    // TODO - the metadata for total size appears to be incorrect for Impala-generated files; need to find the cause
+    // and submit a bug report
+    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
       return false;
     }

     // next, we need to decompress the bytes
-    PageHeader pageHeader = null;
+    // TODO - figure out if we need multiple dictionary pages; I believe it may be limited to one.
+    // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary.
     do {
       pageHeader = dataReader.readPageHeader();
       if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(dataReader.input, pageHeader.compressed_page_size)),
-            pageHeader.uncompressed_page_size,
-            pageHeader.dictionary_page_header.num_values,
-            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
+        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress( //
+                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
+                pageHeader.getUncompressed_page_size(), //
+                parentColumnReader.columnChunkMetaData.getCodec());
+        DictionaryPage page = new DictionaryPage(
+            bytesIn,
+            pageHeader.uncompressed_page_size,
+            pageHeader.dictionary_page_header.num_values,
+            parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
         );
         this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
       }
@@ -157,13 +173,16 @@ public boolean next() throws IOException {
       valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
       definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
       readPosInBytes = definitionLevels.getNextOffset();
-      valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
+        valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      }
     } else {
       definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
       definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
       readPosInBytes = definitionLevels.getNextOffset();
       valueReader = new DictionaryValuesReader(dictionary);
       valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      this.parentColumnReader.usingDictionary = true;
     }
   }
   return true;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
new file mode 100644
index 00000000000..c0720a9fb77
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFixedWidthDictionaryReader.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import parquet.column.ColumnDescriptor;
+import parquet.format.ConvertedType;
+import parquet.format.SchemaElement;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.schema.PrimitiveType;
+
+public class ParquetFixedWidthDictionaryReader extends ColumnReader {
+
+  ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
+                    ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
+                    SchemaElement schemaElement) throws ExecutionSetupException {
+    super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
+  }
+
+  @Override
+  public void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
+
+    recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
+        - pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
+    int defLevel;
+    for (int i = 0; i < recordsReadInThisIteration; i++){
+      defLevel = pageReadStatus.definitionLevels.readInteger();
+      // if the value is defined
+      if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
+        if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
+          ((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
+              pageReadStatus.valueReader.readLong());
+      }
+      // otherwise the value is skipped, because the bit vector indicating nullability is zero filled
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 9cdd205c095..09966209770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -28,8 +28,6 @@
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -59,8 +57,6 @@
 import parquet.hadoop.CodecFactoryExposer;
 import parquet.hadoop.ParquetFileWriter;
 import parquet.column.Encoding;
-import parquet.hadoop.CodecFactoryExposer;
-import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.schema.PrimitiveType;
@@ -325,9 +321,15 @@ private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
       } else if (length <= 16) {
         columnStatuses.add(new Decimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
       }
-    } else {
-      columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
-          fixedLength, v, schemaElement));
+    }
+    else {
+      if (columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
+        columnStatuses.add(new ParquetFixedWidthDictionaryReader(this, allocateSize, descriptor, columnChunkMetaData,
+            fixedLength, v, schemaElement));
+      } else {
+        columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
+            fixedLength, v, schemaElement));
+      }
     }
     return true;
   }
@@ -343,8 +345,8 @@ private boolean createFixedColumnReader(boolean fixedLength, ColumnDescriptor descriptor,
           columnStatuses.add(new NullableDecimal38Reader(this, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement));
         }
       } else {
-        columnStatuses.add(new NullableFixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
-            fixedLength, v, schemaElement));
+        columnStatuses.add(NullableFixedByteAlignedReaders.getNullableColumnReader(this, allocateSize, descriptor,
+            columnChunkMetaData, fixedLength, v, schemaElement));
       }
       return true;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
index c217e800406..91719e76683 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -97,7 +97,6 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus)
       }
       if (columnReader.pageReadStatus.currentPage == null
           || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
-        columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
         if (!columnReader.pageReadStatus.next()) {
           rowGroupFinished = true;
           break;
@@ -159,6 +158,7 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus)
       columnReader.pageReadStatus.valuesRead++;
       columnReader.valuesReadInCurrentPass++;
       if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
+        columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
         columnReader.pageReadStatus.next();
       }
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
index bbc669de8db..fe9e574b599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLengthColumnReaders.java
@@ -40,7 +40,6 @@ public class VarLengthColumnReaders {

   public static abstract class VarLengthColumn extends ColumnReader {

-    boolean usingDictionary;
     Binary currDictVal;

     VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
@@ -70,7 +69,6 @@ public static abstract class NullableVarLengthColumn extends ColumnReader {

     int nullsRead;
     boolean currentValNull = false;
-    boolean usingDictionary;
     Binary currDictVal;

     NullableVarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index e594441d9e5..dec4b153b94 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;

+import static org.apache.drill.exec.store.parquet.TestFileGenerator.intVals;
 import static org.apache.drill.exec.store.parquet.TestFileGenerator.populateFieldInfoMap;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -291,7 +292,8 @@ public void testNullableColumnsVarLen() throws Exception {
     testParquetFullEngineEventBased(false, "/parquet/parquet_nullable_varlen.json", "/tmp/nullable_varlen.parquet", 1, props);
     fields.clear();
     // pass strings instead of byte arrays
-    Object[] boolVals2 = { "b", "b2", "b3"};
+    Object[] boolVals2 = { new org.apache.hadoop.io.Text("b"), new org.apache.hadoop.io.Text("b2"),
+        new org.apache.hadoop.io.Text("b3")};
     props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props));
     testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
         "\"/tmp/varLen.parquet/a\"", "unused", 1, props);
@@ -301,12 +303,27 @@ public void testNullableColumnsVarLen() throws Exception {

   @Test
   public void testDictionaryEncoding() throws Exception {
     HashMap<String, FieldInfo> fields = new HashMap<>();
-    ParquetTestProperties props = new ParquetTestProperties(1, 300000, DEFAULT_BYTES_PER_PAGE, fields);
-    Object[] boolVals2 = { "b", "b2", "b3"};
-    props.fields.put("a", new FieldInfo("boolean", "a", 1, boolVals2, TypeProtos.MinorType.BIT, props));
-    // test dictionary encoding
-    testParquetFullEngineEventBased(false, "/parquet/parquet_scan_screen_read_entry_replace.json",
-        "\"/tmp/dictionary_pig.parquet/a\"", "unused", 1, props);
+    ParquetTestProperties props = new ParquetTestProperties(1, 25, DEFAULT_BYTES_PER_PAGE, fields);
+    Object[] boolVals = null;
+    props.fields.put("n_name", null);
+    props.fields.put("n_nationkey", null);
+    props.fields.put("n_regionkey", null);
+    props.fields.put("n_comment", null);
+    testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/nation_dictionary_fail.parquet\"", "unused", 1, props, true);
+
+    fields = new HashMap<>();
+    props = new ParquetTestProperties(1, 5, DEFAULT_BYTES_PER_PAGE, fields);
+    props.fields.put("employee_id", null);
+    props.fields.put("name", null);
+    props.fields.put("role", null);
+    props.fields.put("phone", null);
+    props.fields.put("password_hash", null);
+    props.fields.put("gender_male", null);
+    props.fields.put("height", null);
+    props.fields.put("hair_thickness", null);
+    testParquetFullEngineEventBased(false, false, "/parquet/parquet_scan_screen_read_entry_replace.json",
+        "\"/tmp/employees_5_16_14.parquet\"", "unused", 1, props, true);
   }

   @Test
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
index 04197bc274e..a4ccbcc1d18 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetResultListener.java
@@ -113,7 +113,7 @@ synchronized public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle)
       ValueVector vv = vw.getValueVector();
       currentField = props.fields.get(vv.getField().getAsSchemaPath().getRootSegment().getPath());
       if (ParquetRecordReaderTest.VERBOSE_DEBUG){
-        System.out.println("\n" + (String) currentField.name);
+        System.out.println("\n" + vv.getField().getAsSchemaPath().getRootSegment().getPath());
       }
       if ( ! valuesChecked.containsKey(vv.getField().getAsSchemaPath().getRootSegment().getPath())){
         valuesChecked.put(vv.getField().getAsSchemaPath().getRootSegment().getPath(), 0);
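
Reviewer note: once PageReadStatus.next() sees a dictionary-encoded data page, it swaps pageReadStatus.valueReader for a DictionaryValuesReader built from the dictionary page and sets usingDictionary on the parent reader, so the readField() loops above only ever see already-decoded primitives. The self-contained sketch below illustrates the decode step those readers rely on; it is a minimal model using only JDK types, and the class and method names are illustrative, not Drill or Parquet APIs.

import java.util.Arrays;

/**
 * Minimal model of PLAIN_DICTIONARY decoding for a fixed-width column:
 * the data page stores small integer ids, and each id is resolved against
 * a table of distinct values read once from the (decompressed) dictionary page.
 */
public class DictionaryDecodeSketch {

  // Stand-in for the dictionary built from the dictionary page.
  private final long[] dictionary;

  DictionaryDecodeSketch(long[] dictionary) {
    this.dictionary = dictionary;
  }

  // Stand-in for the per-value loop in readField(): each id read from the
  // page is looked up in the dictionary and written at the next position,
  // analogous to valueVec.getMutator().setSafe(i, valueReader.readLong()).
  long[] decode(int[] ids) {
    long[] out = new long[ids.length];
    for (int i = 0; i < ids.length; i++) {
      out[i] = dictionary[ids[i]];
    }
    return out;
  }

  public static void main(String[] args) {
    DictionaryDecodeSketch sketch = new DictionaryDecodeSketch(new long[]{100L, 200L, 300L});
    System.out.println(Arrays.toString(sketch.decode(new int[]{0, 2, 2, 1, 0})));
    // prints [100, 300, 300, 200, 100]
  }
}

This is also why the fixed-width dictionary path needs none of the byte-offset bookkeeping that NullableFixedByteAlignedReader does: the dictionary-backed reader hands back fully decoded values, so the column reader only tracks how many values it has consumed from the current page.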