From c5b4b42e633c3d2498c100c9185c16b27f5dcb40 Mon Sep 17 00:00:00 2001 From: Jongyoung Park Date: Mon, 27 Jul 2015 18:13:24 +0900 Subject: [PATCH] basic update --- tajo-storage/tajo-storage-hdfs/pom.xml | 16 +- .../tajo/storage/parquet/ParquetAppender.java | 6 +- .../storage/parquet/TajoParquetReader.java | 4 +- .../storage/parquet/TajoParquetWriter.java | 4 +- .../tajo/storage/parquet/TajoReadSupport.java | 10 +- .../storage/parquet/TajoRecordConverter.java | 12 +- .../parquet/TajoRecordMaterializer.java | 6 +- .../storage/parquet/TajoSchemaConverter.java | 10 +- .../storage/parquet/TajoWriteSupport.java | 12 +- .../thirdparty/parquet/CodecFactory.java | 190 ------- .../parquet/ColumnChunkPageWriteStore.java | 206 -------- .../parquet/InternalParquetRecordReader.java | 190 ------- .../parquet/InternalParquetRecordWriter.java | 160 ------ .../thirdparty/parquet/ParquetFileWriter.java | 492 ------------------ .../thirdparty/parquet/ParquetReader.java | 146 ------ .../thirdparty/parquet/ParquetWriter.java | 224 -------- .../storage/parquet/TestSchemaConverter.java | 4 +- 17 files changed, 36 insertions(+), 1656 deletions(-) delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java delete mode 100644 tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml index bfa5707996..1784ab32d5 100644 --- a/tajo-storage/tajo-storage-hdfs/pom.xml +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@ -34,8 +34,6 @@ UTF-8 UTF-8 - 1.5.0 - 2.1.0 @@ -334,19 +332,9 @@ test - com.twitter - parquet-column - ${parquet.version} - - - com.twitter + org.apache.parquet parquet-hadoop - ${parquet.version} - - - com.twitter - parquet-format - ${parquet.format.version} + 1.8.1 io.netty diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index 45960aabea..4a8b256cee 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -21,8 +21,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.storage.StorageConstants; -import parquet.hadoop.ParquetOutputFormat; -import parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -133,7 +133,7 @@ public void close() throws IOException { } public long getEstimatedOutputSize() throws IOException { - return writer.getEstimatedWrittenSize(); + return writer.getDataSize(); } /** diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java index a765f489fc..1a6545f662 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java @@ -19,10 +19,10 @@ package org.apache.tajo.storage.parquet; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetReader; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.parquet.ParquetReader; -import parquet.filter.UnboundRecordFilter; +import org.apache.parquet.filter.UnboundRecordFilter; import java.io.IOException; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java index 5f220c5467..8e6ae3e1da 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java @@ -19,10 +19,10 @@ package org.apache.tajo.storage.parquet; import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetWriter; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter; -import parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; import java.io.IOException; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java index a64e9871f9..4a3300cb65 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java @@ -21,11 +21,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.Tuple; -import parquet.Log; -import parquet.hadoop.api.InitContext; -import parquet.hadoop.api.ReadSupport; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; +import org.apache.parquet.Log; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; import java.util.Map; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java index 7f236b65d3..43c55e1df6 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java @@ -27,12 +27,12 @@ import org.apache.tajo.datum.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; -import parquet.io.api.Binary; -import parquet.io.api.Converter; -import parquet.io.api.GroupConverter; -import parquet.io.api.PrimitiveConverter; -import parquet.schema.GroupType; -import parquet.schema.Type; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.Type; import java.nio.ByteBuffer; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java index 436159ce81..f7628206dd 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java @@ -21,9 +21,9 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.Tuple; -import parquet.io.api.GroupConverter; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; /** * Materializes a Tajo Tuple from a stream of Parquet data. diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java index 555b623f1c..e0cf64b00d 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java @@ -21,11 +21,11 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.schema.Type; import java.util.ArrayList; import java.util.List; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java index de2a1e3613..9613a25a87 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java @@ -24,12 +24,12 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.exception.ValueTooLongForTypeCharactersException; import org.apache.tajo.storage.Tuple; -import parquet.hadoop.api.WriteSupport; -import parquet.io.api.Binary; -import parquet.io.api.RecordConsumer; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import java.util.HashMap; import java.util.List; diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java deleted file mode 100644 index 4ba47c1379..0000000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/CodecFactory.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.*; -import org.apache.hadoop.util.ReflectionUtils; -import parquet.bytes.BytesInput; -import parquet.hadoop.BadConfigurationException; -import parquet.hadoop.metadata.CompressionCodecName; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; - -class CodecFactory { - - public static class BytesDecompressor { - - private final CompressionCodec codec; - private final Decompressor decompressor; - - public BytesDecompressor(CompressionCodec codec) { - this.codec = codec; - if (codec != null) { - decompressor = CodecPool.getDecompressor(codec); - } else { - decompressor = null; - } - } - - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - final BytesInput decompressed; - if (codec != null) { - decompressor.reset(); - InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor); - decompressed = BytesInput.from(is, uncompressedSize); - } else { - decompressed = bytes; - } - return decompressed; - } - - private void release() { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } - } - - /** - * Encapsulates the logic around hadoop compression - * - * @author Julien Le Dem - * - */ - public static class BytesCompressor { - - private final CompressionCodec codec; - private final Compressor compressor; - private final ByteArrayOutputStream compressedOutBuffer; - private final CompressionCodecName codecName; - - public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) { - this.codecName = codecName; - this.codec = codec; - if (codec != null) { - this.compressor = CodecPool.getCompressor(codec); - this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); - } else { - this.compressor = null; - this.compressedOutBuffer = null; - } - } - - public BytesInput compress(BytesInput bytes) throws IOException { - final BytesInput compressedBytes; - if (codec == null) { - compressedBytes = bytes; - } else { - compressedOutBuffer.reset(); - if (compressor != null) { - // null compressor for non-native gzip - compressor.reset(); - } - CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor); - bytes.writeAllTo(cos); - cos.finish(); - cos.close(); - compressedBytes = BytesInput.from(compressedOutBuffer); - } - return compressedBytes; - } - - private void release() { - if (compressor != null) { - CodecPool.returnCompressor(compressor); - } - } - - public CompressionCodecName getCodecName() { - return codecName; - } - - } - - private final Map compressors = new HashMap(); - private final Map decompressors = new HashMap(); - private final Map codecByName = new HashMap(); - private final Configuration configuration; - - public CodecFactory(Configuration configuration) { - this.configuration = configuration; - } - - /** - * - * @param codecName the requested codec - * @return the corresponding hadoop codec. null if UNCOMPRESSED - */ - private CompressionCodec getCodec(CompressionCodecName codecName) { - String codecClassName = codecName.getHadoopCompressionCodecClassName(); - if (codecClassName == null) { - return null; - } - CompressionCodec codec = codecByName.get(codecClassName); - if (codec != null) { - return codec; - } - - try { - Class codecClass = Class.forName(codecClassName); - codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); - codecByName.put(codecClassName, codec); - return codec; - } catch (ClassNotFoundException e) { - throw new BadConfigurationException("Class " + codecClassName + " was not found", e); - } - } - - public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) { - BytesCompressor comp = compressors.get(codecName); - if (comp == null) { - CompressionCodec codec = getCodec(codecName); - comp = new BytesCompressor(codecName, codec, pageSize); - compressors.put(codecName, comp); - } - return comp; - } - - public BytesDecompressor getDecompressor(CompressionCodecName codecName) { - BytesDecompressor decomp = decompressors.get(codecName); - if (decomp == null) { - CompressionCodec codec = getCodec(codecName); - decomp = new BytesDecompressor(codec); - decompressors.put(codecName, decomp); - } - return decomp; - } - - public void release() { - for (BytesCompressor compressor : compressors.values()) { - compressor.release(); - } - compressors.clear(); - for (BytesDecompressor decompressor : decompressors.values()) { - decompressor.release(); - } - decompressors.clear(); - } -} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java deleted file mode 100644 index 91d4748adc..0000000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ColumnChunkPageWriteStore.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import parquet.Log; -import parquet.bytes.BytesInput; -import parquet.bytes.CapacityByteArrayOutputStream; -import parquet.column.ColumnDescriptor; -import parquet.column.Encoding; -import parquet.column.page.DictionaryPage; -import parquet.column.page.PageWriteStore; -import parquet.column.page.PageWriter; -import parquet.column.statistics.BooleanStatistics; -import parquet.column.statistics.Statistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; - -import java.io.IOException; -import java.util.*; - -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.DEBUG; - -class ColumnChunkPageWriteStore implements PageWriteStore { - private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class); - - private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); - - private static final class ColumnChunkPageWriter implements PageWriter { - - private final ColumnDescriptor path; - private final BytesCompressor compressor; - - private final CapacityByteArrayOutputStream buf; - private DictionaryPage dictionaryPage; - - private long uncompressedLength; - private long compressedLength; - private long totalValueCount; - private int pageCount; - - private Set encodings = new HashSet(); - - private Statistics totalStatistics; - - private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int initialSize) { - this.path = path; - this.compressor = compressor; - this.buf = new CapacityByteArrayOutputStream(initialSize); - this.totalStatistics = Statistics.getStatsBasedOnType(this.path.getType()); - } - - @Deprecated - @Override - public void writePage(BytesInput bytes, - int valueCount, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - BooleanStatistics statistics = new BooleanStatistics(); // dummy stats object - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public void writePage(BytesInput bytes, - int valueCount, - Statistics statistics, - Encoding rlEncoding, - Encoding dlEncoding, - Encoding valuesEncoding) throws IOException { - long uncompressedSize = bytes.size(); - BytesInput compressedBytes = compressor.compress(bytes); - long compressedSize = compressedBytes.size(); - parquetMetadataConverter.writeDataPageHeader( - (int)uncompressedSize, - (int)compressedSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - buf); - this.uncompressedLength += uncompressedSize; - this.compressedLength += compressedSize; - this.totalValueCount += valueCount; - this.pageCount += 1; - this.totalStatistics.mergeStatistics(statistics); - compressedBytes.writeAllTo(buf); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); - } - - @Override - public long getMemSize() { - return buf.size(); - } - - public void writeToFileWriter(ParquetFileWriter writer) throws IOException { - writer.startColumn(path, totalValueCount, compressor.getCodecName()); - if (dictionaryPage != null) { - writer.writeDictionaryPage(dictionaryPage); - encodings.add(dictionaryPage.getEncoding()); - } - writer.writeDataPages(BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics, new ArrayList(encodings)); - writer.endColumn(); - if (DEBUG) { - LOG.debug( - String.format( - "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); - } - encodings.clear(); - pageCount = 0; - } - - @Override - public long allocatedSize() { - return buf.getCapacity(); - } - - @Override - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - if (this.dictionaryPage != null) { - throw new ParquetEncodingException("Only one dictionary page is allowed"); - } - BytesInput dictionaryBytes = dictionaryPage.getBytes(); - int uncompressedSize = (int)dictionaryBytes.size(); - BytesInput compressedBytes = compressor.compress(dictionaryBytes); - this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding()); - } - - @Override - public String memUsageString(String prefix) { - return buf.memUsageString(prefix + " ColumnChunkPageWriter"); - } - } - - private final Map writers = new HashMap(); - private final MessageType schema; - private final BytesCompressor compressor; - private final int initialSize; - - public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int initialSize) { - this.compressor = compressor; - this.schema = schema; - this.initialSize = initialSize; - } - - @Override - public PageWriter getPageWriter(ColumnDescriptor path) { - if (!writers.containsKey(path)) { - writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSize)); - } - return writers.get(path); - } - - public void flushToFileWriter(ParquetFileWriter writer) throws IOException { - List columns = schema.getColumns(); - for (ColumnDescriptor columnDescriptor : columns) { - ColumnChunkPageWriter pageWriter = writers.get(columnDescriptor); - pageWriter.writeToFileWriter(writer); - } - } - -} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java deleted file mode 100644 index 10ac6de1f5..0000000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import parquet.Log; -import parquet.column.ColumnDescriptor; -import parquet.column.page.PageReadStore; -import parquet.filter.UnboundRecordFilter; -import parquet.hadoop.ParquetFileReader; -import parquet.hadoop.api.ReadSupport; -import parquet.hadoop.metadata.BlockMetaData; -import parquet.hadoop.util.counters.BenchmarkCounter; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.io.ParquetDecodingException; -import parquet.io.api.RecordMaterializer; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import static java.lang.String.format; -import static parquet.Log.DEBUG; - -class InternalParquetRecordReader { - private static final Log LOG = Log.getLog(InternalParquetRecordReader.class); - - private final ColumnIOFactory columnIOFactory = new ColumnIOFactory(); - - private MessageType requestedSchema; - private MessageType fileSchema; - private int columnCount; - private final ReadSupport readSupport; - - private RecordMaterializer recordConverter; - - private T currentValue; - private long total; - private int current = 0; - private int currentBlock = -1; - private ParquetFileReader reader; - private parquet.io.RecordReader recordReader; - private UnboundRecordFilter recordFilter; - - private long totalTimeSpentReadingBytes; - private long totalTimeSpentProcessingRecords; - private long startedAssemblingCurrentBlockAt; - - private long totalCountLoadedSoFar = 0; - - private Path file; - - /** - * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. - */ - public InternalParquetRecordReader(ReadSupport readSupport) { - this(readSupport, null); - } - - /** - * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro. - * @param filter Optional filter for only returning matching records. - */ - public InternalParquetRecordReader(ReadSupport readSupport, UnboundRecordFilter - filter) { - this.readSupport = readSupport; - this.recordFilter = filter; - } - - private void checkRead() throws IOException { - if (current == totalCountLoadedSoFar) { - if (current != 0) { - long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt; - totalTimeSpentProcessingRecords += timeAssembling; - if (DEBUG) LOG.debug("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms"); - long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes; - long percentReading = 100 * totalTimeSpentReadingBytes / totalTime; - long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime; - if (DEBUG) LOG.debug("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)"); - } - - if (DEBUG) LOG.debug("at row " + current + ". reading next block"); - long t0 = System.currentTimeMillis(); - PageReadStore pages = reader.readNextRowGroup(); - if (pages == null) { - throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total); - } - long timeSpentReading = System.currentTimeMillis() - t0; - totalTimeSpentReadingBytes += timeSpentReading; - BenchmarkCounter.incrementTime(timeSpentReading); - if (DEBUG) { - LOG.debug("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount()); - LOG.debug("initializing Record assembly with requested schema " + requestedSchema); - } - MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema); - recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter); - startedAssemblingCurrentBlockAt = System.currentTimeMillis(); - totalCountLoadedSoFar += pages.getRowCount(); - ++ currentBlock; - } - } - - public void close() throws IOException { - reader.close(); - } - - public Void getCurrentKey() throws IOException, InterruptedException { - return null; - } - - public T getCurrentValue() throws IOException, - InterruptedException { - return currentValue; - } - - public float getProgress() throws IOException, InterruptedException { - return (float) current / total; - } - - public void initialize(MessageType requestedSchema, MessageType fileSchema, - Map extraMetadata, Map readSupportMetadata, - Path file, List blocks, Configuration configuration) - throws IOException { - this.requestedSchema = requestedSchema; - this.fileSchema = fileSchema; - this.file = file; - this.columnCount = this.requestedSchema.getPaths().size(); - this.recordConverter = readSupport.prepareForRead( - configuration, extraMetadata, fileSchema, - new ReadSupport.ReadContext(requestedSchema, readSupportMetadata)); - - List columns = requestedSchema.getColumns(); - reader = new ParquetFileReader(configuration, file, blocks, columns); - for (BlockMetaData block : blocks) { - total += block.getRowCount(); - } - if (DEBUG) LOG.debug("RecordReader initialized will read a total of " + total + " records."); - } - - private boolean contains(GroupType group, String[] path, int index) { - if (index == path.length) { - return false; - } - if (group.containsField(path[index])) { - Type type = group.getType(path[index]); - if (type.isPrimitive()) { - return index + 1 == path.length; - } else { - return contains(type.asGroupType(), path, index + 1); - } - } - return false; - } - - public boolean nextKeyValue() throws IOException, InterruptedException { - if (current < total) { - try { - checkRead(); - currentValue = recordReader.read(); - if (DEBUG) LOG.debug("read value: " + currentValue); - current ++; - } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); - } - return true; - } - return false; - } -} diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java deleted file mode 100644 index da57745660..0000000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java +++ /dev/null @@ -1,160 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import parquet.Log; -import parquet.column.ParquetProperties.WriterVersion; -import parquet.column.impl.ColumnWriteStoreImpl; -import parquet.hadoop.api.WriteSupport; -import parquet.io.ColumnIOFactory; -import parquet.io.MessageColumnIO; -import parquet.schema.MessageType; - -import java.io.IOException; -import java.util.Map; - -import static java.lang.Math.max; -import static java.lang.Math.min; -import static java.lang.String.format; -import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; -import static parquet.Log.DEBUG; -import static parquet.Preconditions.checkNotNull; - -class InternalParquetRecordWriter { - private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); - - private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; - private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; - private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; - - private final ParquetFileWriter w; - private final WriteSupport writeSupport; - private final MessageType schema; - private final Map extraMetaData; - private final int blockSize; - private final int pageSize; - private final BytesCompressor compressor; - private final int dictionaryPageSize; - private final boolean enableDictionary; - private final boolean validating; - private final WriterVersion writerVersion; - - private long recordCount = 0; - private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; - - private ColumnWriteStoreImpl store; - private ColumnChunkPageWriteStore pageStore; - - /** - * @param w the file to write to - * @param writeSupport the class to convert incoming records - * @param schema the schema of the records - * @param extraMetaData extra meta data to write in the footer of the file - * @param blockSize the size of a block in the file (this will be approximate) - * @param codec the codec used to compress - */ - public InternalParquetRecordWriter( - ParquetFileWriter w, - WriteSupport writeSupport, - MessageType schema, - Map extraMetaData, - int blockSize, - int pageSize, - BytesCompressor compressor, - int dictionaryPageSize, - boolean enableDictionary, - boolean validating, - WriterVersion writerVersion) { - this.w = w; - this.writeSupport = checkNotNull(writeSupport, "writeSupport"); - this.schema = schema; - this.extraMetaData = extraMetaData; - this.blockSize = blockSize; - this.pageSize = pageSize; - this.compressor = compressor; - this.dictionaryPageSize = dictionaryPageSize; - this.enableDictionary = enableDictionary; - this.validating = validating; - this.writerVersion = writerVersion; - initStore(); - } - - private void initStore() { - // we don't want this number to be too small - // ideally we divide the block equally across the columns - // it is unlikely all columns are going to be the same size. - int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); - pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); - // we don't want this number to be too small either - // ideally, slightly bigger than the page size, but not bigger than the block buffer - int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); - store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); - MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); - writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); - } - - public void close() throws IOException, InterruptedException { - flushStore(); - w.end(extraMetaData); - } - - public void write(T value) throws IOException, InterruptedException { - writeSupport.write(value); - ++ recordCount; - checkBlockSizeReached(); - } - - private void checkBlockSizeReached() throws IOException { - if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. - long memSize = store.memSize(); - if (memSize > blockSize) { - if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); - flushStore(); - initStore(); - recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); - } else { - float recordSize = (float) memSize / recordCount; - recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); - if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); - } - } - } - - public long getEstimatedWrittenSize() throws IOException { - return w.getPos() + store.memSize(); - } - - private void flushStore() - throws IOException { - if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); - if (store.allocatedSize() > 3 * blockSize) { - LOG.warn("Too much memory used: " + store.memUsageString()); - } - w.startBlock(recordCount); - store.flush(); - pageStore.flushToFileWriter(w); - recordCount = 0; - w.endBlock(); - store = null; - pageStore = null; - } -} \ No newline at end of file diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java deleted file mode 100644 index ac1c421404..0000000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetFileWriter.java +++ /dev/null @@ -1,492 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.thirdparty.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import parquet.Log; -import parquet.Version; -import parquet.bytes.BytesInput; -import parquet.bytes.BytesUtils; -import parquet.column.ColumnDescriptor; -import parquet.column.page.DictionaryPage; -import parquet.column.statistics.Statistics; -import parquet.format.converter.ParquetMetadataConverter; -import parquet.hadoop.Footer; -import parquet.hadoop.metadata.*; -import parquet.io.ParquetEncodingException; -import parquet.schema.MessageType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; - -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.*; -import java.util.Map.Entry; - -import static parquet.Log.DEBUG; -import static parquet.format.Util.writeFileMetaData; - -/** - * Internal implementation of the Parquet file writer as a block container - * - * @author Julien Le Dem - * - */ -public class ParquetFileWriter { - private static final Log LOG = Log.getLog(ParquetFileWriter.class); - - public static final String PARQUET_METADATA_FILE = "_metadata"; - public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII")); - public static final int CURRENT_VERSION = 1; - - private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); - - private final MessageType schema; - private final FSDataOutputStream out; - private BlockMetaData currentBlock; - private ColumnChunkMetaData currentColumn; - private long currentRecordCount; - private List blocks = new ArrayList(); - private long uncompressedLength; - private long compressedLength; - private Set currentEncodings; - - private CompressionCodecName currentChunkCodec; - private ColumnPath currentChunkPath; - private PrimitiveTypeName currentChunkType; - private long currentChunkFirstDataPage; - private long currentChunkDictionaryPageOffset; - private long currentChunkValueCount; - - private Statistics currentStatistics; - - /** - * Captures the order in which methods should be called - * - * @author Julien Le Dem - * - */ - private enum STATE { - NOT_STARTED { - STATE start() { - return STARTED; - } - }, - STARTED { - STATE startBlock() { - return BLOCK; - } - STATE end() { - return ENDED; - } - }, - BLOCK { - STATE startColumn() { - return COLUMN; - } - STATE endBlock() { - return STARTED; - } - }, - COLUMN { - STATE endColumn() { - return BLOCK; - }; - STATE write() { - return this; - } - }, - ENDED; - - STATE start() throws IOException { return error(); } - STATE startBlock() throws IOException { return error(); } - STATE startColumn() throws IOException { return error(); } - STATE write() throws IOException { return error(); } - STATE endColumn() throws IOException { return error(); } - STATE endBlock() throws IOException { return error(); } - STATE end() throws IOException { return error(); } - - private final STATE error() throws IOException { - throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name()); - } - } - - private STATE state = STATE.NOT_STARTED; - - /** - * - * @param configuration Configuration - * @param schema the schema of the data - * @param file the file to write to - * @throws java.io.IOException if the file can not be created - */ - public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException { - super(); - this.schema = schema; - FileSystem fs = file.getFileSystem(configuration); - this.out = fs.create(file, false); - } - - /** - * start the file - * @throws java.io.IOException - */ - public void start() throws IOException { - state = state.start(); - if (DEBUG) LOG.debug(out.getPos() + ": start"); - out.write(MAGIC); - } - - /** - * start a block - * @param recordCount the record count in this block - * @throws java.io.IOException - */ - public void startBlock(long recordCount) throws IOException { - state = state.startBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": start block"); -// out.write(MAGIC); // TODO: add a magic delimiter - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; - } - - /** - * start a column inside a block - * @param descriptor the column descriptor - * @param valueCount the value count in this column - * @param statistics the statistics in this column - * @param compressionCodecName - * @throws java.io.IOException - */ - public void startColumn(ColumnDescriptor descriptor, - long valueCount, - CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount); - currentEncodings = new HashSet(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = out.getPos(); - compressedLength = 0; - uncompressedLength = 0; - // need to know what type of stats to initialize to - // better way to do this? - currentStatistics = Statistics.getStatsBasedOnType(currentChunkType); - } - - /** - * writes a dictionary page page - * @param dictionaryPage the dictionary page - */ - public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values"); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out); - long headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); - currentEncodings.add(dictionaryPage.getEncoding()); - } - - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - @Deprecated - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a single page - * @param valueCount count of values - * @param uncompressedPageSize the size of the data once uncompressed - * @param bytes the compressed data for the page without header - * @param rlEncoding encoding of the repetition level - * @param dlEncoding encoding of the definition level - * @param valuesEncoding encoding of values - */ - public void writeDataPage( - int valueCount, int uncompressedPageSize, - BytesInput bytes, - Statistics statistics, - parquet.column.Encoding rlEncoding, - parquet.column.Encoding dlEncoding, - parquet.column.Encoding valuesEncoding) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values"); - int compressedPageSize = (int)bytes.size(); - metadataConverter.writeDataPageHeader( - uncompressedPageSize, compressedPageSize, - valueCount, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - out); - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); - bytes.writeAllTo(out); - currentStatistics.mergeStatistics(statistics); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); - } - - /** - * writes a number of pages at once - * @param bytes bytes to be written including page headers - * @param uncompressedTotalPageSize total uncompressed size (without page headers) - * @param compressedTotalPageSize total compressed size (without page headers) - * @throws java.io.IOException - */ - void writeDataPages(BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - List encodings) throws IOException { - state = state.write(); - if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); - bytes.writeAllTo(out); - currentEncodings.addAll(encodings); - currentStatistics = totalStats; - } - - /** - * end a column (once all rep, def and data have been written) - * @throws java.io.IOException - */ - public void endColumn() throws IOException { - state = state.endColumn(); - if (DEBUG) LOG.debug(out.getPos() + ": end column"); - currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength)); - if (DEBUG) LOG.info("ended Column chumk: " + currentColumn); - currentColumn = null; - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - this.compressedLength = 0; - } - - /** - * ends a block once all column chunks have been written - * @throws java.io.IOException - */ - public void endBlock() throws IOException { - state = state.endBlock(); - if (DEBUG) LOG.debug(out.getPos() + ": end block"); - currentBlock.setRowCount(currentRecordCount); - blocks.add(currentBlock); - currentBlock = null; - } - - /** - * ends a file once all blocks have been written. - * closes the file. - * @param extraMetaData the extra meta data to write in the footer - * @throws java.io.IOException - */ - public void end(Map extraMetaData) throws IOException { - state = state.end(); - if (DEBUG) LOG.debug(out.getPos() + ": end"); - ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out); - out.close(); - } - - private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException { - long footerIndex = out.getPos(); - parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer); - writeFileMetaData(parquetMetadata, out); - if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); - BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); - out.write(MAGIC); - } - - /** - * writes a _metadata file - * @param configuration the configuration to use to get the FileSystem - * @param outputPath the directory to write the _metadata file to - * @param footers the list of footers to merge - * @throws java.io.IOException - */ - public static void writeMetadataFile(Configuration configuration, Path outputPath, List