DRILL-1410: Move off of Parquet fork
adeneche authored and StevenMPhillips committed Apr 13, 2015
1 parent a6df26a commit ba12f3d
Showing 6 changed files with 20 additions and 15 deletions.
@@ -42,8 +42,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import parquet.column.ColumnWriteStore;
 import parquet.column.ParquetProperties.WriterVersion;
-import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.impl.ColumnWriteStoreV1;
 import parquet.column.page.PageWriteStore;
 import parquet.hadoop.ColumnChunkPageWriteStoreExposer;
 import parquet.hadoop.ParquetFileWriter;
@@ -83,7 +84,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 private long recordCount = 0;
 private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
 
-private ColumnWriteStoreImpl store;
+private ColumnWriteStore store;
 private PageWriteStore pageStore;
 
 private RecordConsumer consumer;
@@ -160,7 +161,7 @@ private void newSchema() throws IOException {
 this.schema,
 initialBlockBufferSize);
 int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
-store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
+store = new ColumnWriteStoreV1(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
 MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(this.schema);
 consumer = columnIO.getRecordWriter(store);
 setUp(schema, consumer);
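Note: parquet 1.6 replaces ColumnWriteStoreImpl with the ColumnWriteStore interface plus the ColumnWriteStoreV1 implementation, which is what the hunk above switches to. A minimal sketch of the write-path wiring, assuming the 1.6.0 signatures used in the hunk; the helper method and its parameter names are illustrative, not part of this commit:

import parquet.column.ColumnWriteStore;
import parquet.column.ParquetProperties.WriterVersion;
import parquet.column.impl.ColumnWriteStoreV1;
import parquet.column.page.PageWriteStore;
import parquet.io.ColumnIOFactory;
import parquet.io.MessageColumnIO;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;

// Illustrative helper: build the record consumer the writer pushes rows through.
static RecordConsumer buildRecordConsumer(MessageType schema, PageWriteStore pageStore,
    int pageSize, int initialPageBufferSize, int dictionaryPageSize,
    boolean enableDictionary, WriterVersion writerVersion, boolean validating) {
  // ColumnWriteStoreV1 replaces the pre-1.6 ColumnWriteStoreImpl; callers keep the interface type.
  ColumnWriteStore store = new ColumnWriteStoreV1(
      pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion);
  MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
  return columnIO.getRecordWriter(store);
}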
@@ -181,7 +182,7 @@ private PrimitiveType getPrimitiveType(MaterializedField field) {
 OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType);
 DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field);
 int length = ParquetTypeHelper.getLengthForMinorType(minorType);
-return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata);
+return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata, null);
 }
 
 private parquet.schema.Type getType(MaterializedField field) {
@@ -217,7 +218,7 @@ private void flush() throws IOException {
 
 private void checkBlockSizeReached() throws IOException {
 if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
-long memSize = store.memSize();
+long memSize = store.getBufferedSize();
 if (memSize > blockSize) {
 logger.debug("Reached block size " + blockSize);
 flush();
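The pre-1.6 memSize() call is gone; as the hunk above shows, the 1.6 ColumnWriteStore interface reports its buffered bytes via getBufferedSize(). A hedged sketch of the same check in isolation (the helper name is illustrative, not from this commit):

import parquet.column.ColumnWriteStore;

// Illustrative: decide whether the buffered column data has outgrown the target block size.
static boolean blockSizeReached(ColumnWriteStore store, long blockSize) {
  return store.getBufferedSize() > blockSize; // getBufferedSize() replaces the older memSize()
}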
@@ -35,8 +35,8 @@
 import parquet.bytes.BytesInput;
 import parquet.column.Dictionary;
 import parquet.column.ValuesType;
+import parquet.column.page.DataPageV1;
 import parquet.column.page.DictionaryPage;
-import parquet.column.page.Page;
 import parquet.column.values.ValuesReader;
 import parquet.column.values.dictionary.DictionaryValuesReader;
 import parquet.format.PageHeader;
@@ -46,14 +46,16 @@
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.PrimitiveType;
 
+import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
+
 // class to keep track of the read position of variable length columns
 final class PageReader {
 static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
 
 private final ColumnReader parentColumnReader;
 private final ColumnDataReader dataReader;
 // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
-Page currentPage;
+DataPageV1 currentPage;
 // buffer to store bytes of current page
 DrillBuf pageDataByteArray;
 
@@ -217,10 +219,11 @@ public boolean next() throws IOException {
 pageHeader.getUncompressed_page_size());
 compressedData.release();
 }
-currentPage = new Page(
+currentPage = new DataPageV1(
 bytesIn,
 pageHeader.data_page_header.num_values,
 pageHeader.uncompressed_page_size,
+fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader.getColumnDescriptor().getType()), // ?
 ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
 ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
 ParquetFormatPlugin.parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
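DataPageV1 in 1.6 carries per-page statistics, which this hunk converts from the Thrift page header via the statically imported fromParquetStatistics (see the import added above). A sketch of that construction factored into a helper, assuming only the signatures already used in the hunk; the helper itself is illustrative, not part of the commit:

import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.page.DataPageV1;
import parquet.column.statistics.Statistics;
import parquet.format.PageHeader;

import static parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;

// Illustrative helper: assemble a v1 data page from an uncompressed buffer and its Thrift header.
static DataPageV1 buildDataPageV1(BytesInput bytes, PageHeader pageHeader, ColumnDescriptor column,
    Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) {
  Statistics stats = fromParquetStatistics(
      pageHeader.data_page_header.getStatistics(), // Thrift statistics carried in the page header
      column.getType());                           // the primitive type picks the Statistics subclass
  return new DataPageV1(
      bytes,
      pageHeader.data_page_header.num_values,
      pageHeader.uncompressed_page_size,
      stats,
      rlEncoding, dlEncoding, valuesEncoding);
}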
Expand Up @@ -55,11 +55,11 @@
import org.apache.hadoop.fs.Path;

import parquet.column.ColumnDescriptor;
import parquet.common.schema.ColumnPath;
import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ColumnChunkIncReadStore;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ColumnIOFactory;
import parquet.io.InvalidRecordException;
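ColumnPath is unchanged in spirit but lives in a new package in 1.6: parquet.common.schema rather than parquet.hadoop.metadata. A small hedged sketch of the kind of lookup that depends on it, assuming the relocated class; the helper is illustrative, not from this commit:

import parquet.column.ColumnDescriptor;
import parquet.common.schema.ColumnPath;
import parquet.hadoop.metadata.ColumnChunkMetaData;

// Illustrative: match a column descriptor against a row group's column chunk metadata.
static boolean describesChunk(ColumnDescriptor column, ColumnChunkMetaData chunk) {
  // ColumnChunkMetaData.getPath() now returns the relocated parquet.common.schema.ColumnPath.
  return ColumnPath.get(column.getPath()).equals(chunk.getPath());
}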
Expand Up @@ -35,8 +35,9 @@

import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.page.DataPage;
import parquet.column.page.DataPageV1;
import parquet.column.page.DictionaryPage;
import parquet.column.page.Page;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.format.PageHeader;
@@ -129,7 +130,7 @@ public long getTotalValueCount() {
 }
 
 @Override
-public Page readPage() {
+public DataPage readPage() {
 PageHeader pageHeader = new PageHeader();
 try {
 if (lastPage != null) {
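readPage() now returns the abstract DataPage rather than the removed Page class. In stock 1.6 a caller that needs the concrete page can either cast (as the test change further down does) or dispatch through DataPage.Visitor; the sketch below assumes the 1.6.0 visitor API and the V1/V2 accessor names, so treat it as illustrative rather than as code from this commit:

import parquet.bytes.BytesInput;
import parquet.column.page.DataPage;
import parquet.column.page.DataPageV1;
import parquet.column.page.DataPageV2;

// Illustrative: pull the encoded values out of either page version without an instanceof cast.
static BytesInput pageData(DataPage page) {
  return page.accept(new DataPage.Visitor<BytesInput>() {
    @Override
    public BytesInput visit(DataPageV1 v1) {
      return v1.getBytes(); // v1 pages pack levels and values into one stream
    }
    @Override
    public BytesInput visit(DataPageV2 v2) {
      return v2.getData();  // v2 pages store repetition/definition levels separately
    }
  });
}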
@@ -159,7 +160,7 @@ public Page readPage() {
 while (buffer.remaining() > 0) {
 CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
 }
-return new Page(
+return new DataPageV1(
 decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
 pageHeader.data_page_header.num_values,
 pageHeader.uncompressed_page_size,
Expand Up @@ -67,7 +67,7 @@
import org.junit.Test;

import parquet.bytes.BytesInput;
import parquet.column.page.Page;
import parquet.column.page.DataPageV1;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.hadoop.CodecFactoryExposer;
@@ -370,7 +370,7 @@ private void validateFooters(final List<Footer> metadata) {
 private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes)
 throws IOException {
 PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path));
-Page page = pageReader.readPage();
+DataPageV1 page = (DataPageV1) pageReader.readPage();
 assertEquals(values, page.getValueCount());
 assertArrayEquals(bytes.toByteArray(), page.getBytes().toByteArray());
 }
2 changes: 1 addition & 1 deletion pom.xml
@@ -33,7 +33,7 @@
 <proto.cas.path>${project.basedir}/src/main/protobuf/</proto.cas.path>
 <dep.junit.version>4.11</dep.junit.version>
 <dep.slf4j.version>1.7.5</dep.slf4j.version>
-<parquet.version>1.5.1-drill-r7</parquet.version>
+<parquet.version>1.6.0rc3-drill-r0.1</parquet.version>
 </properties>
 
 <scm>
