DRILL-649: Reading Impala and Avro generated Parquet files
Implemented dictionary encoding for the non-variable-length (fixed-width) types.
jaltekruse authored and jacques-n committed May 23, 2014
1 parent cd7aeeb commit 28dd76a
Showing 10 changed files with 296 additions and 34 deletions.
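
For background on what the commit implements: Parquet dictionary encoding stores a column chunk's distinct values once in a dictionary page and replaces the values in the data pages with integer indices into that dictionary. A self-contained toy sketch of the idea (hypothetical names, not Drill's or Parquet's API):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration of Parquet-style dictionary encoding for a fixed-width type.
// All names here are hypothetical; Drill's actual decode path is in the diff
// below (a DictionaryValuesReader resolving indices against a Dictionary).
public class DictionaryEncodingSketch {
  public static void main(String[] args) {
    long[] column = {100L, 200L, 100L, 300L, 200L, 100L};

    // Encode: distinct values go to the "dictionary page"; the data page
    // then holds only small integer indices into that dictionary.
    Map<Long, Integer> dict = new LinkedHashMap<>();
    List<Integer> indices = new ArrayList<>();
    for (long v : column) {
      dict.putIfAbsent(v, dict.size());
      indices.add(dict.get(v));
    }
    long[] dictionaryPage = dict.keySet().stream().mapToLong(Long::longValue).toArray();

    // Decode: resolve each index against the dictionary, which is what the
    // new dictionary readers do one value at a time via readLong() and friends.
    for (int idx : indices) {
      System.out.print(dictionaryPage[idx] + " "); // prints the original column
    }
  }
}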
@@ -51,6 +51,7 @@ ColumnDescriptor getColumnDescriptor() {
final PageReadStatus pageReadStatus;

final SchemaElement schemaElement;
boolean usingDictionary;

// quick reference to see if the field is fixed length (as this requires an instanceof)
final boolean isFixedLength;
@@ -28,7 +28,7 @@

import java.io.IOException;

abstract class NullableColumnReader extends ColumnReader{
abstract class NullableColumnReader<V extends ValueVector> extends ColumnReader<V>{

int nullsFound;
// used to skip nulls found
@@ -37,7 +37,7 @@ abstract class NullableColumnReader extends ColumnReader{
int bitsUsed;

NullableColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

@@ -118,7 +118,8 @@ else if (dataTypeLengthInBits < 8){
valuesReadInCurrentPass += recordsReadInThisIteration;
totalValuesRead += recordsReadInThisIteration;
pageReadStatus.valuesRead += recordsReadInThisIteration;
if (readStartInBytes + readLength >= pageReadStatus.byteLength && bitsUsed == 0) {
if ( (readStartInBytes + readLength >= pageReadStatus.byteLength && bitsUsed == 0)
|| pageReadStatus.valuesRead == pageReadStatus.currentPage.getValueCount()) {
if (!pageReadStatus.next()) {
break;
}
@@ -133,4 +134,4 @@ else if (dataTypeLengthInBits < 8){
}

protected abstract void readField(long recordsToRead, ColumnReader firstColumnStatus);
}
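
The change above makes NullableColumnReader generic in its vector type, so subclasses receive a correctly typed valueVec instead of casting down from ValueVector. A minimal sketch of that pattern with stand-in types (hypothetical names, for illustration only):

// Stand-ins for Drill's ValueVector hierarchy, for illustration only.
interface ValueVector {}

class IntVector implements ValueVector {
  private final int[] data = new int[1024];
  void set(int index, int value) { data[index] = value; }
}

// Parameterizing the reader over V gives each subclass a typed field...
abstract class Reader<V extends ValueVector> {
  protected final V valueVec;
  Reader(V v) { this.valueVec = v; }
}

// ...so type-specific calls need no cast, which is exactly what the
// dictionary readers added in this commit rely on.
class IntReader extends Reader<IntVector> {
  IntReader(IntVector v) { super(v); }
  void write(int index, int value) {
    valueVec.set(index, value); // no (IntVector) cast required
  }
}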
@@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.*;

import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableFloat4Vector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;

class NullableFixedByteAlignedReaders {

public static NullableColumnReader getNullableColumnReader(ParquetRecordReader parentReader, int allocateSize,
ColumnDescriptor columnDescriptor,
ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength,
ValueVector valueVec,
SchemaElement schemaElement) throws ExecutionSetupException {
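// the column chunk metadata advertises every encoding used in its pages;
// without PLAIN_DICTIONARY we can fall back to the plain fixed-width reader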
if (! columnChunkMetaData.getEncodings().contains(Encoding.PLAIN_DICTIONARY)) {
return new NullableFixedByteAlignedReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
fixedLength, valueVec, schemaElement);
} else {
if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64) {
return new NullableDictionaryBigIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
fixedLength, (NullableBigIntVector)valueVec, schemaElement);
}
else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32) {
return new NullableDictionaryIntReader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
fixedLength, (NullableIntVector)valueVec, schemaElement);
}
else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.FLOAT) {
return new NullableDictionaryFloat4Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
fixedLength, (NullableFloat4Vector)valueVec, schemaElement);
}
else if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.DOUBLE) {
return new NullableDictionaryFloat8Reader(parentReader, allocateSize, columnDescriptor, columnChunkMetaData,
fixedLength, (NullableFloat8Vector)valueVec, schemaElement);
}
else {
throw new ExecutionSetupException("Unsupported nullable column type " + columnDescriptor.getType().name());
}
}
}

private static class NullableFixedByteAlignedReader extends NullableColumnReader {
private byte[] bytes;

NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
this.recordsReadInThisIteration = recordsToReadInThisPass;

// set up metadata
this.readStartInBytes = pageReadStatus.readPosInBytes;
this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
this.bytes = pageReadStatus.pageDataByteArray;

// fill in data.
vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
}
}

private static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {

NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
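// dictionary-encoded pages store indices into the dictionary; the
// DictionaryValuesReader set up by PageReadStatus resolves each index
// back to the actual integer value here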
if (usingDictionary) {
for (int i = 0; i < recordsToReadInThisPass; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readInteger());
}
}
}
}

private static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {

NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
for (int i = 0; i < recordsToReadInThisPass; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readLong());
}
}
}

private static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {

NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
for (int i = 0; i < recordsToReadInThisPass; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readFloat());
}
}
}

private static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {

NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

// this method is called by its superclass during a read loop
@Override
protected void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {
for (int i = 0; i < recordsToReadInThisPass; i++){
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReadStatus.valueReader.readDouble());
}
}
}

}
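
The factory above picks a reader by checking the chunk's encodings and then the column's primitive type. A toy sketch of the same dispatch (hypothetical names; the real factory uses if/else over PrimitiveTypeName as shown above):

// Toy model of the reader dispatch (hypothetical names): no dictionary pages
// means the plain fixed-width reader; otherwise pick by primitive type.
enum ToyPrimitiveType { INT32, INT64, FLOAT, DOUBLE }

public class ReaderDispatchSketch {
  static String readerFor(ToyPrimitiveType type, boolean hasDictionaryEncoding) {
    if (!hasDictionaryEncoding) {
      return "NullableFixedByteAlignedReader";
    }
    switch (type) {
      case INT32:  return "NullableDictionaryIntReader";
      case INT64:  return "NullableDictionaryBigIntReader";
      case FLOAT:  return "NullableDictionaryFloat4Reader";
      case DOUBLE: return "NullableDictionaryFloat8Reader";
      default:     throw new IllegalArgumentException("Unsupported nullable column type " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(readerFor(ToyPrimitiveType.INT64, true)); // NullableDictionaryBigIntReader
  }
}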
@@ -41,6 +41,7 @@
import parquet.format.PageType;
import parquet.format.Util;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;

// class to keep track of the read position of variable length columns
final class PageReadStatus {
@@ -65,26 +66,33 @@ final class PageReadStatus {
ValuesReader definitionLevels;
ValuesReader valueReader;
Dictionary dictionary;
PageHeader pageHeader = null;

PageReadStatus(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
this.parentColumnReader = parentStatus;

long totalByteLength = columnChunkMetaData.getTotalSize();
long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
long start = columnChunkMetaData.getFirstDataPageOffset();
try {
FSDataInputStream f = fs.open(path);
this.dataReader = new ColumnDataReader(f, start, totalByteLength);
if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
f.seek(columnChunkMetaData.getDictionaryPageOffset());
PageHeader pageHeader = Util.readPageHeader(f);
assert pageHeader.type == PageType.DICTIONARY_PAGE;
DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(f, pageHeader.compressed_page_size)),
BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
.decompress( //
dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
pageHeader.getUncompressed_page_size(), //
parentColumnReader.columnChunkMetaData.getCodec());
DictionaryPage page = new DictionaryPage(
bytesIn,
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
);
this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
}
this.dataReader = new ColumnDataReader(f, start, totalByteLength);
} catch (IOException e) {
throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + path.getName(), e);
}
@@ -102,21 +110,29 @@ public boolean next() throws IOException {

currentPage = null;

if(!dataReader.hasRemainder()) {
// TODO - the metadata for total size appears to be incorrect for impala generated files, need to find cause
// and submit a bug report
if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
return false;
}

// next, we need to decompress the bytes
PageHeader pageHeader = null;
// TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
// I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
do {
pageHeader = dataReader.readPageHeader();
if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
DictionaryPage page = new DictionaryPage(BytesInput.copy(BytesInput.from(dataReader.input, pageHeader.compressed_page_size)),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
.decompress( //
dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
pageHeader.getUncompressed_page_size(), //
parentColumnReader.columnChunkMetaData.getCodec());
DictionaryPage page = new DictionaryPage(
bytesIn,
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
parquet.column.Encoding.valueOf(pageHeader.dictionary_page_header.encoding.name())
);
this.dictionary = page.getEncoding().initDictionary(parentColumnReader.columnDescriptor, page);
}
@@ -157,13 +173,16 @@ public boolean next() throws IOException {
valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
readPosInBytes = definitionLevels.getNextOffset();
valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
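// booleans are bit-packed, so they must be decoded through the plain
// ValuesReader; other plain-encoded fixed-width values are copied into
// the vector directly from the page buffer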
valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
}
} else {
definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, 0);
readPosInBytes = definitionLevels.getNextOffset();
valueReader = new DictionaryValuesReader(dictionary);
valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
this.parentColumnReader.usingDictionary = true;
}
}
return true;
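
The reworked next() above stops on either of two conditions, because byte-length metadata alone proved unreliable for Impala-generated files. A minimal sketch of that double termination guard (hypothetical names):

// Toy model of the page-advance guard (hypothetical names): stop when either
// the byte stream is exhausted or the chunk metadata's value count has been
// reached, so bad total-size metadata cannot cause reading past the chunk.
public class PageAdvanceSketch {
  static boolean hasNextPage(long bytesRemaining, long valuesRead, long expectedValueCount) {
    if (bytesRemaining <= 0 || valuesRead == expectedValueCount) {
      return false; // no more pages for this column chunk
    }
    return true;    // caller goes on to read the next page header
  }

  public static void main(String[] args) {
    // Even with an inflated byte count from bad metadata, the value-count
    // check still terminates the loop at the right point.
    System.out.println(hasNextPage(9999, 1000, 1000)); // false: all values read
    System.out.println(hasNextPage(512, 400, 1000));   // true: keep reading
  }
}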
@@ -0,0 +1,54 @@
/*******************************************************************************
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.drill.exec.store.parquet;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.vector.BigIntVector;
import org.apache.drill.exec.vector.ValueVector;
import parquet.column.ColumnDescriptor;
import parquet.format.ConvertedType;
import parquet.format.SchemaElement;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.schema.PrimitiveType;

public class ParquetFixedWidthDictionaryReader extends ColumnReader{

ParquetFixedWidthDictionaryReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v,
SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

@Override
public void readField(long recordsToReadInThisPass, ColumnReader firstColumnStatus) {

recordsReadInThisIteration = Math.min(pageReadStatus.currentPage.getValueCount()
- pageReadStatus.valuesRead, recordsToReadInThisPass - valuesReadInCurrentPass);
int defLevel;
for (int i = 0; i < recordsReadInThisIteration; i++){
defLevel = pageReadStatus.definitionLevels.readInteger();
// if the value is defined
if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
if (columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64)
((BigIntVector)valueVec).getMutator().set(i + valuesReadInCurrentPass,
pageReadStatus.valueReader.readLong() );
}
// otherwise the value is skipped, because the bit vector indicating nullability is zero filled
}
}
}
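
The readField loop above keys off Parquet definition levels: a level equal to the column's maximum means the value is present; anything lower means null. A self-contained sketch of that decoding for a flat optional column (hypothetical names, not Drill's API):

// Toy sketch of definition-level decoding for a flat optional column
// (max definition level 1). Names are hypothetical, not Drill's API.
public class DefinitionLevelSketch {
  public static void main(String[] args) {
    int maxDefinitionLevel = 1;
    int[] definitionLevels = {1, 0, 1, 1, 0}; // 0 = null, 1 = value present
    long[] packedValues = {10L, 20L, 30L};    // only non-null values are stored
    Long[] out = new Long[definitionLevels.length];

    int valueIndex = 0;
    for (int i = 0; i < definitionLevels.length; i++) {
      if (definitionLevels[i] == maxDefinitionLevel) {
        out[i] = packedValues[valueIndex++]; // defined: consume the next stored value
      } // otherwise leave null, mirroring the zero-filled nullability vector
    }
    System.out.println(java.util.Arrays.toString(out)); // [10, null, 20, 30, null]
  }
}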
