From ea9da980b3092f532b201bdd747b2d3d9afb70f8 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Fri, 9 Dec 2016 13:30:56 -0800 Subject: [PATCH 1/3] ORC-119. Create an API to separate out layout from the writer. --- .../core/src/java/org/apache/orc/OrcFile.java | 18 + .../java/org/apache/orc/PhysicalWriter.java | 113 ++++ .../org/apache/orc/impl/BitFieldWriter.java | 4 + .../org/apache/orc/impl/IntegerWriter.java | 6 + .../java/org/apache/orc/impl/OutStream.java | 52 +- .../org/apache/orc/impl/PhysicalFsWriter.java | 365 +++++++++++ .../apache/orc/impl/RunLengthByteWriter.java | 4 + .../orc/impl/RunLengthIntegerWriter.java | 4 + .../orc/impl/RunLengthIntegerWriterV2.java | 5 + .../java/org/apache/orc/impl/WriterImpl.java | 567 ++++++------------ .../org/apache/orc/TestVectorOrcFile.java | 4 +- .../org/apache/orc/impl/TestInStream.java | 8 +- .../org/apache/orc/impl/TestOutStream.java | 5 +- .../orc-file-dump-dictionary-threshold.out | 78 +-- 14 files changed, 763 insertions(+), 470 deletions(-) create mode 100644 java/core/src/java/org/apache/orc/PhysicalWriter.java create mode 100644 java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java index 7551c2e3ef..68e49f3d2c 100644 --- a/java/core/src/java/org/apache/orc/OrcFile.java +++ b/java/core/src/java/org/apache/orc/OrcFile.java @@ -298,6 +298,7 @@ public static class WriterOptions { private String bloomFilterColumns; private double bloomFilterFpp; private BloomFilterVersion bloomFilterVersion; + private PhysicalWriter physicalWriter; protected WriterOptions(Properties tableProperties, Configuration conf) { configuration = conf; @@ -486,6 +487,19 @@ public WriterOptions bloomFilterVersion(BloomFilterVersion version) { return this; } + /** + * Change the physical writer of the ORC file. + * + * SHOULD ONLY BE USED BY LLAP. 
+ * + * @param writer the writer to control the layout and persistence + * @return this + */ + public WriterOptions physicalWriter(PhysicalWriter writer) { + this.physicalWriter = writer; + return this; + } + /** * A package local option to set the memory manager. */ @@ -569,6 +583,10 @@ public double getBloomFilterFpp() { public BloomFilterVersion getBloomFilterVersion() { return bloomFilterVersion; } + + public PhysicalWriter getPhysicalWriter() { + return physicalWriter; + } } /** diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java new file mode 100644 index 0000000000..a028db81e2 --- /dev/null +++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.orc.impl.StreamName; + +/** + * This interface separates the physical layout of ORC files from the higher + * level details. + * + * This API is limited to being used by LLAP. + */ +public interface PhysicalWriter { + + /** + * The target of an output stream. 
+ */ + interface OutputReceiver { + /** + * Output the given buffer to the final destination + * + * @param buffer the buffer to output + * @throws IOException + */ + void output(ByteBuffer buffer) throws IOException; + + /** + * Suppress this stream from being written to the stripe. + */ + void suppress(); + } + /** + * Writes the header of the file, which consists of the magic "ORC" bytes. + * @throws IOException + */ + void writeHeader() throws IOException; + + /** + * Create an OutputReceiver for the given name. + * @param name the name of the stream + * @throws IOException + */ + OutputReceiver createDataStream(StreamName name) throws IOException; + + /** + * Flushes the data in all the streams, spills them to disk, and writes out the + * stripe footer. + * @param footer Stripe footer to be updated with relevant data and written out. + * @param dirEntry File metadata entry for the stripe, to be updated with + * relevant data. + */ + void finalizeStripe(OrcProto.StripeFooter.Builder footer, + OrcProto.StripeInformation.Builder dirEntry) throws IOException; + + /** + * Writes out the file metadata. + * @param builder Metadata builder to finalize and write. + */ + void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException; + + /** + * Writes out the file footer. + * @param builder Footer builder to finalize and write. + */ + void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException; + + /** + * Writes out the postscript (including the size byte if needed). + * @param builder Postscript builder to finalize and write. + */ + long writePostScript(OrcProto.PostScript.Builder builder) throws IOException; + + /** + * Closes the writer. + */ + void close() throws IOException; + + /** + * Flushes the writer so that readers can see the preceding postscripts. + */ + void flush() throws IOException; + + /** + * Appends raw stripe data (e.g. for file merger). + * @param stripe Stripe data buffer. 
+ * @param dirEntry File metadata entry for the stripe, to be updated with + * relevant data. + * @throws IOException + */ + void appendRawStripe(ByteBuffer stripe, + OrcProto.StripeInformation.Builder dirEntry + ) throws IOException; + +} diff --git a/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java b/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java index aa5f886e65..3c8070f760 100644 --- a/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java +++ b/java/core/src/java/org/apache/orc/impl/BitFieldWriter.java @@ -70,4 +70,8 @@ public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(8 - bitsLeft); } + + public long estimateMemory() { + return output.estimateMemory(); + } } diff --git a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java index 419054f7ea..70b16d3e9f 100644 --- a/java/core/src/java/org/apache/orc/impl/IntegerWriter.java +++ b/java/core/src/java/org/apache/orc/impl/IntegerWriter.java @@ -44,4 +44,10 @@ public interface IntegerWriter { * @throws IOException */ void flush() throws IOException; + + /** + * Estimate the amount of memory being used. 
+ * @return number of bytes + */ + long estimateMemory(); } diff --git a/java/core/src/java/org/apache/orc/impl/OutStream.java b/java/core/src/java/org/apache/orc/impl/OutStream.java index 81662cc31f..a1131e444a 100644 --- a/java/core/src/java/org/apache/orc/impl/OutStream.java +++ b/java/core/src/java/org/apache/orc/impl/OutStream.java @@ -18,26 +18,16 @@ package org.apache.orc.impl; import org.apache.orc.CompressionCodec; +import org.apache.orc.PhysicalWriter; import java.io.IOException; import java.nio.ByteBuffer; public class OutStream extends PositionedOutputStream { - public interface OutputReceiver { - /** - * Output the given buffer to the final destination - * @param buffer the buffer to output - * @throws IOException - */ - void output(ByteBuffer buffer) throws IOException; - } - public static final int HEADER_SIZE = 3; private final String name; - private final OutputReceiver receiver; - // if enabled the stream will be suppressed when writing stripe - private boolean suppress; + private final PhysicalWriter.OutputReceiver receiver; /** * Stores the uncompressed bytes that have been serialized, but not @@ -69,17 +59,15 @@ public interface OutputReceiver { public OutStream(String name, int bufferSize, CompressionCodec codec, - OutputReceiver receiver) throws IOException { + PhysicalWriter.OutputReceiver receiver) throws IOException { this.name = name; this.bufferSize = bufferSize; this.codec = codec; this.receiver = receiver; - this.suppress = false; } public void clear() throws IOException { flush(); - suppress = false; } /** @@ -258,32 +246,28 @@ public String toString() { @Override public long getBufferSize() { - long result = 0; - if (current != null) { - result += current.capacity(); - } - if (compressed != null) { - result += compressed.capacity(); - } - if (overflow != null) { - result += overflow.capacity(); + if (codec == null) { + return uncompressedBytes + (current == null ? 
0 : current.remaining()); + } else { + long result = 0; + if (current != null) { + result += current.capacity(); + } + if (compressed != null) { + result += compressed.capacity(); + } + if (overflow != null) { + result += overflow.capacity(); + } + return result + compressedBytes; } - return result; } /** * Set suppress flag */ public void suppress() { - suppress = true; - } - - /** - * Returns the state of suppress flag - * @return value of suppress flag - */ - public boolean isSuppressed() { - return suppress; + receiver.suppress(); } } diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java new file mode 100644 index 0000000000..b8e0341d00 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import com.google.protobuf.CodedOutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionCodec; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcProto; +import org.apache.orc.PhysicalWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PhysicalFsWriter implements PhysicalWriter { + private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); + + private static final int HDFS_BUFFER_SIZE = 256 * 1024; + + private final FSDataOutputStream rawWriter; + // the compressed metadata information outStream + private OutStream writer = null; + // a protobuf outStream around streamFactory + private CodedOutputStream protobufWriter = null; + + private final Path path; + private final long blockSize; + private final int bufferSize; + private final double paddingTolerance; + private final long defaultStripeSize; + private final CompressionKind compress; + private final boolean addBlockPadding; + + // the streams that make up the current stripe + private final Map streams = + new TreeMap<>(); + + private long adjustedStripeSize; + private long headerLength; + private long stripeStart; + private int metadataLength; + private int footerLength; + + public PhysicalFsWriter(FileSystem fs, + Path path, + OrcFile.WriterOptions opts) throws IOException { + this.path = path; + this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize(); + this.addBlockPadding = opts.getBlockPadding(); + if (opts.isEnforceBufferSize()) { + this.bufferSize = opts.getBufferSize(); + } else { + this.bufferSize = WriterImpl.getEstimatedBufferSize(defaultStripeSize, + 
opts.getSchema().getMaximumId() + 1, + opts.getBufferSize()); + } + this.compress = opts.getCompress(); + this.paddingTolerance = opts.getPaddingTolerance(); + this.blockSize = opts.getBlockSize(); + LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" + + " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize, + compress, bufferSize); + rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, + fs.getDefaultReplication(path), blockSize); + CompressionCodec codec = WriterImpl.createCodec(compress); + writer = new OutStream("metadata", bufferSize, codec, + new DirectStream(rawWriter)); + protobufWriter = CodedOutputStream.newInstance(writer); + } + + private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException { + this.stripeStart = rawWriter.getPos(); + final long currentStripeSize = indexSize + dataSize + footerSize; + final long available = blockSize - (stripeStart % blockSize); + final long overflow = currentStripeSize - adjustedStripeSize; + final float availRatio = (float) available / (float) defaultStripeSize; + + if (availRatio > 0.0f && availRatio < 1.0f + && availRatio > paddingTolerance) { + // adjust default stripe size to fit into remaining space, also adjust + // the next stripe for correction based on the current stripe size + // and user specified padding tolerance. Since stripe size can overflow + // the default stripe size we should apply this correction to avoid + // writing portion of last stripe to next hdfs block. + double correction = overflow > 0 ? (double) overflow + / (double) adjustedStripeSize : 0.0; + + // correction should not be greater than user specified padding + // tolerance + correction = correction > paddingTolerance ? 
paddingTolerance + : correction; + + // adjust next stripe size based on current stripe estimate correction + adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); + } else if (availRatio >= 1.0) { + adjustedStripeSize = defaultStripeSize; + } + + if (availRatio < paddingTolerance && addBlockPadding) { + long padding = blockSize - (stripeStart % blockSize); + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; + LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", + padding, availRatio, defaultStripeSize)); + stripeStart += padding; + while (padding > 0) { + int writeLen = (int) Math.min(padding, pad.length); + rawWriter.write(pad, 0, writeLen); + padding -= writeLen; + } + adjustedStripeSize = defaultStripeSize; + } else if (currentStripeSize < blockSize + && (stripeStart % blockSize) + currentStripeSize > blockSize) { + // even if you don't pad, reset the default stripe size when crossing a + // block boundary + adjustedStripeSize = defaultStripeSize; + } + } + + /** + * An output receiver that writes the ByteBuffers to the output stream + * as they are received. 
+ */ + private class DirectStream implements OutputReceiver { + private final FSDataOutputStream output; + + DirectStream(FSDataOutputStream output) { + this.output = output; + } + + @Override + public void output(ByteBuffer buffer) throws IOException { + output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), + buffer.remaining()); + } + + @Override + public void suppress() { + throw new UnsupportedOperationException("Can't suppress direct stream"); + } + } + + private void writeStripeFooter(OrcProto.StripeFooter footer, + long dataSize, + long indexSize, + OrcProto.StripeInformation.Builder dirEntry) throws IOException { + footer.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + dirEntry.setOffset(stripeStart); + dirEntry.setFooterLength(rawWriter.getPos() - stripeStart - dataSize - indexSize); + } + + @Override + public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { + long startPosn = rawWriter.getPos(); + OrcProto.Metadata metadata = builder.build(); + metadata.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + this.metadataLength = (int) (rawWriter.getPos() - startPosn); + } + + @Override + public void writeFileFooter(OrcProto.Footer.Builder builder) throws IOException { + long bodyLength = rawWriter.getPos() - metadataLength; + builder.setContentLength(bodyLength); + builder.setHeaderLength(headerLength); + long startPosn = rawWriter.getPos(); + OrcProto.Footer footer = builder.build(); + footer.writeTo(protobufWriter); + protobufWriter.flush(); + writer.flush(); + this.footerLength = (int) (rawWriter.getPos() - startPosn); + } + + @Override + public long writePostScript(OrcProto.PostScript.Builder builder) throws IOException { + builder.setFooterLength(footerLength); + builder.setMetadataLength(metadataLength); + if (compress != CompressionKind.NONE) { + builder.setCompressionBlockSize(bufferSize); + } + OrcProto.PostScript ps = builder.build(); + // need to write this 
uncompressed + long startPosn = rawWriter.getPos(); + ps.writeTo(rawWriter); + long length = rawWriter.getPos() - startPosn; + if (length > 255) { + throw new IllegalArgumentException("PostScript too large at " + length); + } + rawWriter.writeByte((int)length); + return rawWriter.getPos(); + } + + @Override + public void close() throws IOException { + rawWriter.close(); + } + + @Override + public void flush() throws IOException { + rawWriter.hflush(); + } + + @Override + public void appendRawStripe(ByteBuffer buffer, + OrcProto.StripeInformation.Builder dirEntry) throws IOException { + long start = rawWriter.getPos(); + int length = buffer.remaining(); + long availBlockSpace = blockSize - (start % blockSize); + + // see if stripe can fit in the current hdfs block, else pad the remaining + // space in the block + if (length < blockSize && length > availBlockSpace && + addBlockPadding) { + byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; + LOG.info(String.format("Padding ORC by %d bytes while merging..", + availBlockSpace)); + start += availBlockSpace; + while (availBlockSpace > 0) { + int writeLen = (int) Math.min(availBlockSpace, pad.length); + rawWriter.write(pad, 0, writeLen); + availBlockSpace -= writeLen; + } + } + rawWriter.write(buffer.array(), buffer.arrayOffset() + buffer.position(), + length); + dirEntry.setOffset(start); + } + + + /** + * This class is used to hold the contents of streams as they are buffered. + * The TreeWriters write to the outStream and the codec compresses the + * data as buffers fill up and stores them in the output list. When the + * stripe is being written, the whole stream is written to the file. 
+ */ + private static final class BufferedStream implements OutputReceiver { + private boolean isSuppressed = false; + private final List output = new ArrayList<>(); + + @Override + public void output(ByteBuffer buffer) { + if (!isSuppressed) { + output.add(buffer); + } + } + + public void suppress() { + isSuppressed = true; + output.clear(); + } + + /** + * Writes any saved buffers to the OutputStream if needed, and clears all the + * buffers. + */ + void spillToDiskAndClear(FSDataOutputStream raw + ) throws IOException { + if (!isSuppressed) { + for (ByteBuffer buffer: output) { + raw.write(buffer.array(), buffer.arrayOffset() + buffer.position(), + buffer.remaining()); + } + output.clear(); + } + isSuppressed = false; + } + + /** + * Get the number of bytes that will be written to the output. + * + * Assumes the stream writing into this receiver has already been flushed. + * @return number of bytes + */ + public long getOutputSize() { + long result = 0; + for (ByteBuffer buffer: output) { + result += buffer.remaining(); + } + return result; + } + } + + @Override + public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder, + OrcProto.StripeInformation.Builder dirEntry + ) throws IOException { + long indexSize = 0; + long dataSize = 0; + for (Map.Entry pair: streams.entrySet()) { + BufferedStream receiver = pair.getValue(); + if (!receiver.isSuppressed) { + long streamSize = receiver.getOutputSize(); + StreamName name = pair.getKey(); + footerBuilder.addStreams(OrcProto.Stream.newBuilder().setColumn(name.getColumn()) + .setKind(name.getKind()).setLength(streamSize)); + if (StreamName.Area.INDEX == name.getArea()) { + indexSize += streamSize; + } else { + dataSize += streamSize; + } + } + } + dirEntry.setIndexLength(indexSize).setDataLength(dataSize); + + OrcProto.StripeFooter footer = footerBuilder.build(); + // Do we need to pad the file so the stripe doesn't straddle a block boundary? 
+ padStripe(indexSize, dataSize, footer.getSerializedSize()); + + // write out the data streams + for (Map.Entry pair : streams.entrySet()) { + pair.getValue().spillToDiskAndClear(rawWriter); + } + // Write out the footer. + writeStripeFooter(footer, dataSize, indexSize, dirEntry); + } + + @Override + public void writeHeader() throws IOException { + rawWriter.writeBytes(OrcFile.MAGIC); + headerLength = rawWriter.getPos(); + } + + @Override + public BufferedStream createDataStream(StreamName name) { + BufferedStream result = streams.get(name); + if (result == null) { + result = new BufferedStream(); + streams.put(name, result); + } + return result; + } + + @Override + public String toString() { + return path.toString(); + } +} diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java index 09108b2633..c2f1fa74da 100644 --- a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java +++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java @@ -103,4 +103,8 @@ public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(numLiterals); } + + public long estimateMemory() { + return output.getBufferSize() + MAX_LITERAL_SIZE; + } } diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java index 3e5f2e20d6..88b47e6a6d 100644 --- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java +++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java @@ -140,4 +140,8 @@ public void getPosition(PositionRecorder recorder) throws IOException { recorder.addPosition(numLiterals); } + @Override + public long estimateMemory() { + return output.getBufferSize(); + } } diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java 
b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java index 183fd8e81b..29cbebf08b 100644 --- a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java +++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java @@ -818,4 +818,9 @@ public void getPosition(PositionRecorder recorder) throws IOException { output.getPosition(recorder); recorder.addPosition(numLiterals); } + + @Override + public long estimateMemory() { + return output.getBufferSize(); + } } diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index 22ff867085..ba1b326306 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -19,7 +19,6 @@ package org.apache.orc.impl; import java.io.IOException; -import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; @@ -39,23 +38,23 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.orc.BinaryColumnStatistics; import org.apache.orc.ColumnStatistics; -import org.apache.orc.util.BloomFilter; -import org.apache.orc.util.BloomFilterIO; import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; import org.apache.orc.OrcConf; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; import org.apache.orc.OrcUtils; +import org.apache.orc.PhysicalWriter; import org.apache.orc.StringColumnStatistics; import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; +import org.apache.orc.util.BloomFilter; +import org.apache.orc.util.BloomFilterIO; import org.apache.orc.util.BloomFilterUtf8; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hive.common.type.HiveDecimal; @@ -73,7 +72,6 @@ import org.apache.hadoop.io.Text; import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; /** * An ORC file writer. The file is divided into stripes, which is the natural @@ -98,40 +96,28 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class); - private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final int MIN_ROW_INDEX_STRIDE = 1000; // threshold above which buffer size will be automatically resized private static final int COLUMN_COUNT_THRESHOLD = 1000; - private final FileSystem fs; private final Path path; private final long defaultStripeSize; private long adjustedStripeSize; private final int rowIndexStride; private final CompressionKind compress; private final CompressionCodec codec; - private final boolean addBlockPadding; private final int bufferSize; private final long blockSize; - private final double paddingTolerance; private final TypeDescription schema; + private final PhysicalWriter physicalWriter; - // the streams that make up the current stripe - private final Map streams = - new TreeMap(); - - private FSDataOutputStream rawWriter = null; - // the compressed metadata information outStream - private OutStream writer = null; - // a protobuf outStream around streamFactory - private CodedOutputStream protobufWriter = null; - private long headerLength; private int columnCount; private long rowCount = 0; private long rowsInStripe = 0; private long rawDataSize = 0; private int rowsInIndex = 0; + private long lastFlushOffset = 0; private int stripesAtLastFlush = -1; private final List stripes = new ArrayList(); @@ -155,7 +141,6 @@ public class WriterImpl implements Writer, MemoryManager.Callback { public WriterImpl(FileSystem fs, Path path, OrcFile.WriterOptions opts) throws IOException { - this.fs = fs; this.path = path; this.conf = 
opts.getConfiguration(); this.callback = opts.getCallback(); @@ -177,9 +162,7 @@ public Writer getWriter() { this.version = opts.getVersion(); this.encodingStrategy = opts.getEncodingStrategy(); this.compressionStrategy = opts.getCompressionStrategy(); - this.addBlockPadding = opts.getBlockPadding(); this.blockSize = opts.getBlockSize(); - this.paddingTolerance = opts.getPaddingTolerance(); this.compress = opts.getCompress(); this.rowIndexStride = opts.getRowIndexStride(); this.memoryManager = opts.getMemoryManager(); @@ -200,6 +183,9 @@ public Writer getWriter() { OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema); } this.bloomFilterFpp = opts.getBloomFilterFpp(); + this.physicalWriter = opts.getPhysicalWriter() == null ? + new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter(); + physicalWriter.writeHeader(); treeWriter = createTreeWriter(schema, streamFactory, false); if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) { throw new IllegalArgumentException("Row stride must be at least " + @@ -273,10 +259,10 @@ public static CompressionCodec createCodec(CompressionKind kind) { @Override public boolean checkMemory(double newScale) throws IOException { long limit = (long) Math.round(adjustedStripeSize * newScale); - long size = estimateStripeSize(); + long size = treeWriter.estimateMemory(); if (LOG.isDebugEnabled()) { - LOG.debug("ORC writer " + path + " size = " + size + " limit = " + - limit); + LOG.debug("ORC writer " + physicalWriter + " size = " + size + + " limit = " + limit); } if (size > limit) { flushStripe(); @@ -285,116 +271,6 @@ public boolean checkMemory(double newScale) throws IOException { return false; } - /** - * This class is used to hold the contents of streams as they are buffered. - * The TreeWriters write to the outStream and the codec compresses the - * data as buffers fill up and stores them in the output list. When the - * stripe is being written, the whole stream is written to the file. 
- */ - private class BufferedStream implements OutStream.OutputReceiver { - private final OutStream outStream; - private final List output = new ArrayList(); - - BufferedStream(String name, int bufferSize, - CompressionCodec codec) throws IOException { - outStream = new OutStream(name, bufferSize, codec, this); - } - - /** - * Receive a buffer from the compression codec. - * @param buffer the buffer to save - */ - @Override - public void output(ByteBuffer buffer) { - output.add(buffer); - } - - /** - * Get the number of bytes in buffers that are allocated to this stream. - * @return number of bytes in buffers - */ - public long getBufferSize() { - long result = 0; - for(ByteBuffer buf: output) { - result += buf.capacity(); - } - return outStream.getBufferSize() + result; - } - - /** - * Flush the stream to the codec. - * @throws IOException - */ - public void flush() throws IOException { - outStream.flush(); - } - - /** - * Clear all of the buffers. - * @throws IOException - */ - public void clear() throws IOException { - outStream.clear(); - output.clear(); - } - - /** - * Check the state of suppress flag in output stream - * @return value of suppress flag - */ - public boolean isSuppressed() { - return outStream.isSuppressed(); - } - - /** - * Get the number of bytes that will be written to the output. Assumes - * the stream has already been flushed. - * @return the number of bytes - */ - public long getOutputSize() { - long result = 0; - for(ByteBuffer buffer: output) { - result += buffer.remaining(); - } - return result; - } - - /** - * Write the saved compressed buffers to the OutputStream. 
- * @param out the stream to write to - * @throws IOException - */ - void spillTo(OutputStream out) throws IOException { - for(ByteBuffer buffer: output) { - out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), - buffer.remaining()); - } - } - - @Override - public String toString() { - return outStream.toString(); - } - } - - /** - * An output receiver that writes the ByteBuffers to the output stream - * as they are received. - */ - private class DirectStream implements OutStream.OutputReceiver { - private final FSDataOutputStream output; - - DirectStream(FSDataOutputStream output) { - this.output = output; - } - - @Override - public void output(ByteBuffer buffer) throws IOException { - output.write(buffer.array(), buffer.arrayOffset() + buffer.position(), - buffer.remaining()); - } - } - private static class RowIndexPositionRecorder implements PositionRecorder { private final OrcProto.RowIndexEntry.Builder builder; @@ -424,43 +300,38 @@ public OutStream createStream(int column, OrcProto.Stream.Kind kind ) throws IOException { final StreamName name = new StreamName(column, kind); - final EnumSet modifiers; - - switch (kind) { - case BLOOM_FILTER: - case DATA: - case DICTIONARY_DATA: - case BLOOM_FILTER_UTF8: - if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) { - modifiers = EnumSet.of(CompressionCodec.Modifier.FAST, - CompressionCodec.Modifier.TEXT); - } else { - modifiers = EnumSet.of(CompressionCodec.Modifier.DEFAULT, - CompressionCodec.Modifier.TEXT); - } - break; - case LENGTH: - case DICTIONARY_COUNT: - case PRESENT: - case ROW_INDEX: - case SECONDARY: - // easily compressed using the fastest modes - modifiers = EnumSet.of(CompressionCodec.Modifier.FASTEST, - CompressionCodec.Modifier.BINARY); - break; - default: - LOG.warn("Missing ORC compression modifiers for " + kind); - modifiers = null; - break; - } - - BufferedStream result = streams.get(name); - if (result == null) { - result = new BufferedStream(name.toString(), 
bufferSize, - codec == null ? codec : codec.modify(modifiers)); - streams.put(name, result); - } - return result.outStream; + CompressionCodec codec = WriterImpl.this.codec; + if (codec != null) { + switch (kind) { + case BLOOM_FILTER: + case DATA: + case DICTIONARY_DATA: + case BLOOM_FILTER_UTF8: + if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) { + codec = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST, + CompressionCodec.Modifier.TEXT)); + } else { + codec = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT, + CompressionCodec.Modifier.TEXT)); + } + break; + case LENGTH: + case DICTIONARY_COUNT: + case PRESENT: + case ROW_INDEX: + case SECONDARY: + // easily compressed using the fastest modes + codec = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST, + CompressionCodec.Modifier.BINARY)); + break; + default: + LOG.info("Missing ORC compression modifiers for " + kind); + break; + } + } + + return new OutStream(physicalWriter.toString(), bufferSize, codec, + physicalWriter.createDataStream(name)); } /** @@ -899,6 +770,9 @@ void recordPosition(PositionRecorder recorder) throws IOException { */ long estimateMemory() { long result = 0; + if (isPresent != null) { + result = isPresentOutStream.getBufferSize(); + } for (TreeWriter child: childrenWriters) { result += child.estimateMemory(); } @@ -957,6 +831,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); writer.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + writer.estimateMemory(); + } } private static class ByteTreeWriter extends TreeWriter { @@ -1021,6 +900,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); writer.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + writer.estimateMemory(); + } } private static class IntegerTreeWriter extends TreeWriter { @@ -1098,6 
+982,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); writer.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + writer.estimateMemory(); + } } private static class FloatTreeWriter extends TreeWriter { @@ -1165,6 +1054,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); stream.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + stream.getBufferSize(); + } } private static class DoubleTreeWriter extends TreeWriter { @@ -1231,18 +1125,22 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); stream.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + stream.getBufferSize(); + } } private static abstract class StringBaseTreeWriter extends TreeWriter { private static final int INITIAL_DICTIONARY_SIZE = 4096; private final OutStream stringOutput; - private final IntegerWriter lengthOutput; + protected final IntegerWriter lengthOutput; private final IntegerWriter rowOutput; protected final StringRedBlackTree dictionary = new StringRedBlackTree(INITIAL_DICTIONARY_SIZE); protected final DynamicIntArray rows = new DynamicIntArray(); protected final PositionedOutputStream directStreamOutput; - protected final IntegerWriter directLengthOutput; private final List savedRowIndex = new ArrayList(); private final boolean buildIndex; @@ -1261,18 +1159,16 @@ private static abstract class StringBaseTreeWriter extends TreeWriter { boolean nullable) throws IOException { super(columnId, schema, writer, nullable); this.isDirectV2 = isNewWriteFormat(writer); + directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); stringOutput = writer.createStream(id, OrcProto.Stream.Kind.DICTIONARY_DATA); lengthOutput = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.LENGTH), 
false, isDirectV2, writer); - rowOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.DATA), false, isDirectV2, writer); + rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2, + writer); recordPosition(rowIndexPosition); rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); - directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); - directLengthOutput = createIntegerWriter(writer.createStream(id, - OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); Configuration conf = writer.getConfiguration(); dictionaryKeySizeThreshold = OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf); @@ -1315,11 +1211,14 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, // we need to build the rowindex before calling super, since it // writes it out. super.writeStripe(builder, requiredIndexEntries); - stringOutput.flush(); - lengthOutput.flush(); - rowOutput.flush(); - directStreamOutput.flush(); - directLengthOutput.flush(); + if (useDictionaryEncoding) { + stringOutput.flush(); + lengthOutput.flush(); + rowOutput.flush(); + } else { + directStreamOutput.flush(); + lengthOutput.flush(); + } // reset all of the fields to be ready for the next stripe. 
dictionary.clear(); savedRowIndex.clear(); @@ -1374,7 +1273,7 @@ public void visit(StringRedBlackTree.VisitorContext context } else { PositionRecorder posn = new RowIndexPositionRecorder(base); directStreamOutput.getPosition(posn); - directLengthOutput.getPosition(posn); + lengthOutput.getPosition(posn); } rowIndex.addEntry(base.build()); } @@ -1385,7 +1284,7 @@ public void visit(StringRedBlackTree.VisitorContext context } else { dictionary.getText(text, rows.get(i)); directStreamOutput.write(text.getBytes(), 0, text.getLength()); - directLengthOutput.write(text.getLength()); + lengthOutput.write(text.getLength()); } } } @@ -1451,12 +1350,18 @@ void createRowIndexEntry() throws IOException { private void recordDirectStreamPosition() throws IOException { directStreamOutput.getPosition(rowIndexPosition); - directLengthOutput.getPosition(rowIndexPosition); + lengthOutput.getPosition(rowIndexPosition); } @Override long estimateMemory() { - return rows.getSizeInBytes() + dictionary.getSizeInBytes(); + long parent = super.estimateMemory(); + if (useDictionaryEncoding) { + return parent + dictionary.getSizeInBytes() + rows.getSizeInBytes(); + } else { + return parent + lengthOutput.estimateMemory() + + directStreamOutput.getBufferSize(); + } } } @@ -1484,7 +1389,7 @@ void writeBatch(ColumnVector vector, int offset, for(int i=0; i < length; ++i) { directStreamOutput.write(vec.vector[0], vec.start[0], vec.length[0]); - directLengthOutput.write(vec.length[0]); + lengthOutput.write(vec.length[0]); } } indexStatistics.updateString(vec.vector[0], vec.start[0], @@ -1507,7 +1412,7 @@ void writeBatch(ColumnVector vector, int offset, } else { directStreamOutput.write(vec.vector[offset + i], vec.start[offset + i], vec.length[offset + i]); - directLengthOutput.write(vec.length[offset + i]); + lengthOutput.write(vec.length[offset + i]); } indexStatistics.updateString(vec.vector[offset + i], vec.start[offset + i], vec.length[offset + i], 1); @@ -1570,7 +1475,7 @@ void 
writeBatch(ColumnVector vector, int offset, } else { for(int i=0; i < length; ++i) { directStreamOutput.write(ptr, ptrOffset, itemLength); - directLengthOutput.write(itemLength); + lengthOutput.write(itemLength); } } indexStatistics.updateString(ptr, ptrOffset, itemLength, length); @@ -1603,7 +1508,7 @@ void writeBatch(ColumnVector vector, int offset, rows.add(dictionary.add(ptr, ptrOffset, itemLength)); } else { directStreamOutput.write(ptr, ptrOffset, itemLength); - directLengthOutput.write(itemLength); + lengthOutput.write(itemLength); } indexStatistics.updateString(ptr, ptrOffset, itemLength, 1); if (createBloomFilter) { @@ -1653,7 +1558,7 @@ void writeBatch(ColumnVector vector, int offset, for(int i=0; i < length; ++i) { directStreamOutput.write(vec.vector[0], vec.start[0], itemLength); - directLengthOutput.write(itemLength); + lengthOutput.write(itemLength); } } indexStatistics.updateString(vec.vector[0], vec.start[0], @@ -1679,7 +1584,7 @@ void writeBatch(ColumnVector vector, int offset, } else { directStreamOutput.write(vec.vector[offset + i], vec.start[offset + i], itemLength); - directLengthOutput.write(itemLength); + lengthOutput.write(itemLength); } indexStatistics.updateString(vec.vector[offset + i], vec.start[offset + i], itemLength, 1); @@ -1785,6 +1690,12 @@ void recordPosition(PositionRecorder recorder) throws IOException { stream.getPosition(recorder); length.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + stream.getBufferSize() + + length.estimateMemory(); + } } public static long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; @@ -1900,6 +1811,12 @@ void recordPosition(PositionRecorder recorder) throws IOException { seconds.getPosition(recorder); nanos.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + seconds.estimateMemory() + + nanos.estimateMemory(); + } } private static class DateTreeWriter extends TreeWriter { @@ -1977,6 +1894,11 @@ 
OrcProto.ColumnEncoding getEncoding() { return OrcProto.ColumnEncoding.newBuilder() .setKind(OrcProto.ColumnEncoding.Kind.DIRECT).build(); } + + @Override + long estimateMemory() { + return super.estimateMemory() + writer.estimateMemory(); + } } private static class DecimalTreeWriter extends TreeWriter { @@ -2069,6 +1991,12 @@ void recordPosition(PositionRecorder recorder) throws IOException { valueStream.getPosition(recorder); scaleStream.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + valueStream.getBufferSize() + + scaleStream.estimateMemory(); + } } private static class StructTreeWriter extends TreeWriter { @@ -2251,6 +2179,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); lengths.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + lengths.estimateMemory(); + } } private static class MapTreeWriter extends TreeWriter { @@ -2360,6 +2293,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); lengths.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + lengths.estimateMemory(); + } } private static class UnionTreeWriter extends TreeWriter { @@ -2459,6 +2397,11 @@ void recordPosition(PositionRecorder recorder) throws IOException { super.recordPosition(recorder); tags.getPosition(recorder); } + + @Override + long estimateMemory() { + return super.estimateMemory() + tags.estimateMemory(); + } } private static TreeWriter createTreeWriter(TypeDescription schema, @@ -2609,27 +2552,12 @@ private static void writeTypes(OrcProto.Footer.Builder builder, } } - // @VisibleForTesting - public FSDataOutputStream getStream() throws IOException { - if (rawWriter == null) { - rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE, - fs.getDefaultReplication(path), blockSize); - rawWriter.writeBytes(OrcFile.MAGIC); - headerLength = 
rawWriter.getPos(); - writer = new OutStream("metadata", bufferSize, codec, - new DirectStream(rawWriter)); - protobufWriter = CodedOutputStream.newInstance(writer); - } - return rawWriter; - } - private void createRowIndexEntry() throws IOException { treeWriter.createRowIndexEntry(); rowsInIndex = 0; } private void flushStripe() throws IOException { - getStream(); if (buildIndex && rowsInIndex != 0) { createRowIndexEntry(); } @@ -2643,95 +2571,11 @@ private void flushStripe() throws IOException { OrcProto.StripeFooter.Builder builder = OrcProto.StripeFooter.newBuilder(); treeWriter.writeStripe(builder, requiredIndexEntries); - long indexSize = 0; - long dataSize = 0; - for(Map.Entry pair: streams.entrySet()) { - BufferedStream stream = pair.getValue(); - if (!stream.isSuppressed()) { - stream.flush(); - StreamName name = pair.getKey(); - long streamSize = pair.getValue().getOutputSize(); - builder.addStreams(OrcProto.Stream.newBuilder() - .setColumn(name.getColumn()) - .setKind(name.getKind()) - .setLength(streamSize)); - if (StreamName.Area.INDEX == name.getArea()) { - indexSize += streamSize; - } else { - dataSize += streamSize; - } - } - } - OrcProto.StripeFooter footer = builder.build(); - - // Do we need to pad the file so the stripe doesn't straddle a block - // boundary? - long start = rawWriter.getPos(); - final long currentStripeSize = indexSize + dataSize + footer.getSerializedSize(); - final long available = blockSize - (start % blockSize); - final long overflow = currentStripeSize - adjustedStripeSize; - final float availRatio = (float) available / (float) defaultStripeSize; - - if (availRatio > 0.0f && availRatio < 1.0f - && availRatio > paddingTolerance) { - // adjust default stripe size to fit into remaining space, also adjust - // the next stripe for correction based on the current stripe size - // and user specified padding tolerance. 
Since stripe size can overflow - // the default stripe size we should apply this correction to avoid - // writing portion of last stripe to next hdfs block. - double correction = overflow > 0 ? (double) overflow - / (double) adjustedStripeSize : 0.0; - - // correction should not be greater than user specified padding - // tolerance - correction = correction > paddingTolerance ? paddingTolerance - : correction; - - // adjust next stripe size based on current stripe estimate correction - adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); - } else if (availRatio >= 1.0) { - adjustedStripeSize = defaultStripeSize; - } - - if (availRatio < paddingTolerance && addBlockPadding) { - long padding = blockSize - (start % blockSize); - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; - LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", - padding, availRatio, defaultStripeSize)); - start += padding; - while (padding > 0) { - int writeLen = (int) Math.min(padding, pad.length); - rawWriter.write(pad, 0, writeLen); - padding -= writeLen; - } - adjustedStripeSize = defaultStripeSize; - } else if (currentStripeSize < blockSize - && (start % blockSize) + currentStripeSize > blockSize) { - // even if you don't pad, reset the default stripe size when crossing a - // block boundary - adjustedStripeSize = defaultStripeSize; - } - - // write out the data streams - for(Map.Entry pair: streams.entrySet()) { - BufferedStream stream = pair.getValue(); - if (!stream.isSuppressed()) { - stream.spillTo(rawWriter); - } - stream.clear(); - } - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - long footerLength = rawWriter.getPos() - start - dataSize - indexSize; - OrcProto.StripeInformation dirEntry = + OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation.newBuilder() - .setOffset(start) - .setNumberOfRows(rowsInStripe) - .setIndexLength(indexSize) - .setDataLength(dataSize) - 
.setFooterLength(footerLength).build(); - stripes.add(dirEntry); + .setNumberOfRows(rowsInStripe); + physicalWriter.finalizeStripe(builder, dirEntry); + stripes.add(dirEntry.build()); rowCount += rowsInStripe; rowsInStripe = 0; } @@ -2812,30 +2656,33 @@ private void writeFileStatistics(OrcProto.Footer.Builder builder, } } - private int writeMetadata() throws IOException { - getStream(); + private void writeMetadata() throws IOException { OrcProto.Metadata.Builder builder = OrcProto.Metadata.newBuilder(); for(OrcProto.StripeStatistics.Builder ssb : treeWriter.stripeStatsBuilders) { builder.addStripeStats(ssb.build()); } + physicalWriter.writeFileMetadata(builder); + } - long startPosn = rawWriter.getPos(); - OrcProto.Metadata metadata = builder.build(); - metadata.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - return (int) (rawWriter.getPos() - startPosn); + private long writePostScript() throws IOException { + OrcProto.PostScript.Builder builder = + OrcProto.PostScript.newBuilder() + .setCompression(writeCompressionKind(compress)) + .setMagic(OrcFile.MAGIC) + .addVersion(version.getMajor()) + .addVersion(version.getMinor()) + .setWriterVersion(OrcFile.CURRENT_WRITER.getId()); + if (compress != CompressionKind.NONE) { + builder.setCompressionBlockSize(bufferSize); + } + return physicalWriter.writePostScript(builder); } - private int writeFooter(long bodyLength) throws IOException { - getStream(); + private long writeFooter() throws IOException { + writeMetadata(); OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder(); - builder.setContentLength(bodyLength); - builder.setHeaderLength(headerLength); builder.setNumberOfRows(rowCount); builder.setRowIndexStride(rowIndexStride); - // populate raw data size - rawDataSize = computeRawDataSize(); // serialize the types writeTypes(builder, schema); // add the stripe information @@ -2849,45 +2696,8 @@ private int writeFooter(long bodyLength) throws IOException { 
builder.addMetadata(OrcProto.UserMetadataItem.newBuilder() .setName(entry.getKey()).setValue(entry.getValue())); } - long startPosn = rawWriter.getPos(); - OrcProto.Footer footer = builder.build(); - footer.writeTo(protobufWriter); - protobufWriter.flush(); - writer.flush(); - return (int) (rawWriter.getPos() - startPosn); - } - - private int writePostScript(int footerLength, int metadataLength) throws IOException { - OrcProto.PostScript.Builder builder = - OrcProto.PostScript.newBuilder() - .setCompression(writeCompressionKind(compress)) - .setFooterLength(footerLength) - .setMetadataLength(metadataLength) - .setMagic(OrcFile.MAGIC) - .addVersion(version.getMajor()) - .addVersion(version.getMinor()) - .setWriterVersion(OrcFile.CURRENT_WRITER.getId()); - if (compress != CompressionKind.NONE) { - builder.setCompressionBlockSize(bufferSize); - } - OrcProto.PostScript ps = builder.build(); - // need to write this uncompressed - long startPosn = rawWriter.getPos(); - ps.writeTo(rawWriter); - long length = rawWriter.getPos() - startPosn; - if (length > 255) { - throw new IllegalArgumentException("PostScript too large at " + length); - } - return (int) length; - } - - private long estimateStripeSize() { - long result = 0; - for(BufferedStream stream: streams.values()) { - result += stream.getBufferSize(); - } - result += treeWriter.estimateMemory(); - return result; + physicalWriter.writeFileFooter(builder); + return writePostScript(); } @Override @@ -2933,11 +2743,8 @@ public void close() throws IOException { memoryManager.removeWriter(path); // actually close the file flushStripe(); - int metadataLength = writeMetadata(); - int footerLength = writeFooter(rawWriter.getPos() - metadataLength); - rawWriter.writeByte(writePostScript(footerLength, metadataLength)); - rawWriter.close(); - + lastFlushOffset = writeFooter(); + physicalWriter.close(); } /** @@ -2967,13 +2774,11 @@ public long writeIntermediateFooter() throws IOException { if (callback != null) { 
callback.preFooterWrite(callbackContext); } - int metaLength = writeMetadata(); - int footLength = writeFooter(rawWriter.getPos() - metaLength); - rawWriter.writeByte(writePostScript(footLength, metaLength)); + lastFlushOffset = writeFooter(); stripesAtLastFlush = stripes.size(); - rawWriter.hflush(); + physicalWriter.flush(); } - return rawWriter.getPos(); + return lastFlushOffset; } static void checkArgument(boolean expression, String message) { @@ -2993,28 +2798,15 @@ public void appendStripe(byte[] stripe, int offset, int length, checkArgument(stripeStatistics != null, "Stripe statistics must not be null"); - getStream(); - long start = rawWriter.getPos(); - long availBlockSpace = blockSize - (start % blockSize); - - // see if stripe can fit in the current hdfs block, else pad the remaining - // space in the block - if (length < blockSize && length > availBlockSpace && - addBlockPadding) { - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, availBlockSpace)]; - LOG.info(String.format("Padding ORC by %d bytes while merging..", - availBlockSpace)); - start += availBlockSpace; - while (availBlockSpace > 0) { - int writeLen = (int) Math.min(availBlockSpace, pad.length); - rawWriter.write(pad, 0, writeLen); - availBlockSpace -= writeLen; - } - } - - rawWriter.write(stripe); - rowsInStripe = stripeStatistics.getColStats(0).getNumberOfValues(); - rowCount += rowsInStripe; + // update stripe information + OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation + .newBuilder() + .setNumberOfRows(rowsInStripe) + .setIndexLength(stripeInfo.getIndexLength()) + .setDataLength(stripeInfo.getDataLength()) + .setFooterLength(stripeInfo.getFooterLength()); + physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length), + dirEntry); // since we have already written the stripe, just update stripe statistics treeWriter.stripeStatsBuilders.add(stripeStatistics.toBuilder()); @@ -3022,16 +2814,7 @@ public void appendStripe(byte[] stripe, int offset, 
int length, // update file level statistics updateFileStatistics(stripeStatistics); - // update stripe information - OrcProto.StripeInformation dirEntry = OrcProto.StripeInformation - .newBuilder() - .setOffset(start) - .setNumberOfRows(rowsInStripe) - .setIndexLength(stripeInfo.getIndexLength()) - .setDataLength(stripeInfo.getDataLength()) - .setFooterLength(stripeInfo.getFooterLength()) - .build(); - stripes.add(dirEntry); + stripes.add(dirEntry.build()); // reset it after writing the stripe rowsInStripe = 0; diff --git a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java index 2448cb7d73..7df521df34 100644 --- a/java/core/src/test/org/apache/orc/TestVectorOrcFile.java +++ b/java/core/src/test/org/apache/orc/TestVectorOrcFile.java @@ -2117,8 +2117,8 @@ public void testMemoryManagementV12() throws Exception { int i = 0; for(StripeInformation stripe: reader.getStripes()) { i += 1; - assertTrue("stripe " + i + " is too long at " + stripe.getDataLength(), - stripe.getDataLength() < 5000); + assertTrue(testFilePath + " stripe " + i + " is too long at " + + stripe.getDataLength(), stripe.getDataLength() < 5000); } // with HIVE-7832, the dictionaries will be disabled after writing the first // stripe as there are too many distinct values. 
Hence only 3 stripes as diff --git a/java/core/src/test/org/apache/orc/impl/TestInStream.java b/java/core/src/test/org/apache/orc/impl/TestInStream.java index eea2ab23c4..d40676c78f 100644 --- a/java/core/src/test/org/apache/orc/impl/TestInStream.java +++ b/java/core/src/test/org/apache/orc/impl/TestInStream.java @@ -31,11 +31,12 @@ import org.apache.hadoop.hive.common.io.DiskRange; import org.apache.orc.CompressionCodec; +import org.apache.orc.PhysicalWriter; import org.junit.Test; public class TestInStream { - static class OutputCollector implements OutStream.OutputReceiver { + static class OutputCollector implements PhysicalWriter.OutputReceiver { DynamicByteArray buffer = new DynamicByteArray(); @Override @@ -43,6 +44,11 @@ public void output(ByteBuffer buffer) throws IOException { this.buffer.add(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining()); } + + @Override + public void suppress() { + // PASS + } } static class PositionCollector diff --git a/java/core/src/test/org/apache/orc/impl/TestOutStream.java b/java/core/src/test/org/apache/orc/impl/TestOutStream.java index e9614d51f2..77aae065eb 100644 --- a/java/core/src/test/org/apache/orc/impl/TestOutStream.java +++ b/java/core/src/test/org/apache/orc/impl/TestOutStream.java @@ -19,6 +19,7 @@ package org.apache.orc.impl; import org.apache.orc.CompressionCodec; +import org.apache.orc.PhysicalWriter; import org.junit.Test; import org.mockito.Mockito; @@ -30,8 +31,8 @@ public class TestOutStream { @Test public void testFlush() throws Exception { - OutStream.OutputReceiver receiver = - Mockito.mock(OutStream.OutputReceiver.class); + PhysicalWriter.OutputReceiver receiver = + Mockito.mock(PhysicalWriter.OutputReceiver.class); CompressionCodec codec = new ZlibCodec(); OutStream stream = new OutStream("test", 128*1024, codec, receiver); assertEquals(0L, stream.getBufferSize()); diff --git a/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out 
b/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out index 4b0822ff9f..b0315b4428 100644 --- a/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out +++ b/java/tools/src/test/resources/orc-file-dump-dictionary-threshold.out @@ -39,7 +39,7 @@ File Statistics: Column 3: count: 21000 hasNull: false min: Darkness,-230 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936 sum: 6910238 Stripes: - Stripe: offset: 3 data: 163602 rows: 5000 tail: 68 index: 720 + Stripe: offset: 3 data: 163585 rows: 5000 tail: 68 index: 720 Stream: column 0 section ROW_INDEX start: 3 length 17 Stream: column 1 section ROW_INDEX start: 20 length 166 Stream: column 2 section ROW_INDEX start: 186 length 171 @@ -47,7 +47,7 @@ Stripes: Stream: column 1 section DATA start: 723 length 20035 Stream: column 2 section DATA start: 20758 length 40050 Stream: column 3 section DATA start: 60808 length 99226 - Stream: column 3 section LENGTH start: 160034 length 4291 + Stream: column 3 section LENGTH start: 160034 length 4274 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 @@ -70,15 +70,15 @@ Stripes: Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044 max: 
worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660 sum: 75448 positions: 16464,3340,0,1554,14 Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788 sum: 104868 positions: 36532,964,0,2372,90 Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744 sum: 136158 positions: 63067,3432,0,3354,108 - Stripe: offset: 164393 data: 368335 rows: 5000 tail: 69 index: 956 - Stream: column 0 section ROW_INDEX start: 164393 length 17 - Stream: column 1 section ROW_INDEX start: 164410 length 157 - Stream: column 2 section ROW_INDEX start: 164567 length 166 - Stream: column 3 section ROW_INDEX start: 164733 length 616 - Stream: column 1 section DATA start: 165349 length 20035 - Stream: column 2 section DATA start: 185384 length 40050 - Stream: column 3 section DATA start: 225434 length 302715 - Stream: column 3 section LENGTH start: 528149 length 5535 + Stripe: offset: 164376 data: 368332 rows: 5000 tail: 69 index: 956 + Stream: column 0 section ROW_INDEX start: 164376 length 17 + Stream: column 1 section ROW_INDEX start: 164393 length 157 + Stream: column 2 section ROW_INDEX start: 164550 length 166 + Stream: column 3 section ROW_INDEX start: 164716 length 616 + Stream: column 1 section DATA start: 165332 length 20035 + Stream: column 2 section DATA start: 185367 length 40050 + Stream: column 3 section DATA start: 225417 length 302715 + Stream: column 3 section LENGTH start: 528132 length 5532 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 @@ -101,15 +101,15 @@ 
Stripes: Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988 sum: 224740 positions: 94117,3404,0,1945,222 Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984 sum: 252094 positions: 155111,2864,0,3268,48 Entry 4: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938 sum: 281404 positions: 
224570,1006,0,4064,342 - Stripe: offset: 533753 data: 606074 rows: 5000 tail: 69 index: 1427 - Stream: column 0 section ROW_INDEX start: 533753 length 17 - Stream: column 1 section ROW_INDEX start: 533770 length 167 - Stream: column 2 section ROW_INDEX start: 533937 length 168 - Stream: column 3 section ROW_INDEX start: 534105 length 1075 - Stream: column 1 section DATA start: 535180 length 20035 - Stream: column 2 section DATA start: 555215 length 40050 - Stream: column 3 section DATA start: 595265 length 540210 - Stream: column 3 section LENGTH start: 1135475 length 5779 + Stripe: offset: 533733 data: 606071 rows: 5000 tail: 69 index: 1427 + Stream: column 0 section ROW_INDEX start: 533733 length 17 + Stream: column 1 section ROW_INDEX start: 533750 length 167 + Stream: column 2 section ROW_INDEX start: 533917 length 168 + Stream: column 3 section ROW_INDEX start: 534085 length 1075 + Stream: column 1 section DATA start: 535160 length 20035 + Stream: column 2 section DATA start: 555195 length 40050 + Stream: column 3 section DATA start: 595245 length 540210 + Stream: column 3 section LENGTH start: 1135455 length 5776 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 @@ -132,15 +132,15 @@ Stripes: Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022 max: 
worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976 sum: 386538 positions: 185635,3966,0,2077,162 Entry 3: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766 sum: 421660 positions: 295550,1384,0,3369,16 Entry 4: count: 1000 hasNull: false min: 
Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974 sum: 453606 positions: 412768,1156,0,4041,470 - Stripe: offset: 1141323 data: 864001 rows: 5000 tail: 69 index: 1975 - Stream: column 0 section ROW_INDEX start: 1141323 length 17 - Stream: column 1 section ROW_INDEX start: 1141340 length 156 - Stream: column 2 section ROW_INDEX start: 1141496 length 168 - Stream: column 3 section ROW_INDEX start: 1141664 length 1634 - Stream: column 1 section DATA start: 1143298 length 20035 - Stream: column 2 section DATA start: 1163333 length 40050 - Stream: column 3 section DATA start: 1203383 length 798014 - Stream: column 3 section LENGTH start: 2001397 length 5902 + Stripe: offset: 1141300 data: 863962 rows: 5000 tail: 69 index: 1975 + Stream: column 0 section ROW_INDEX start: 1141300 length 17 + Stream: column 1 section ROW_INDEX start: 1141317 length 156 + Stream: column 2 section ROW_INDEX start: 1141473 length 168 + Stream: column 3 section ROW_INDEX start: 1141641 length 1634 + Stream: column 1 section DATA start: 
1143275 length 20035 + Stream: column 2 section DATA start: 1163310 length 40050 + Stream: column 3 section DATA start: 1203360 length 798014 + Stream: column 3 section LENGTH start: 2001374 length 5863 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 @@ -163,15 +163,15 @@ Stripes: Entry 2: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878 sum: 568274 positions: 286457,302,0,1926,462 Entry 3: count: 1000 hasNull: false min: 
Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788 sum: 594578 positions: 447943,3328,0,3444,250 Entry 4: count: 1000 hasNull: false min: 
Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214-18444-18446-18724-18912-18952-19164 max: worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904 sum: 631944 positions: 616471,3986,3778,547,292 - Stripe: offset: 2007368 data: 207295 rows: 1000 tail: 67 index: 841 - Stream: column 0 section ROW_INDEX start: 2007368 length 12 - Stream: column 1 section ROW_INDEX start: 2007380 length 38 - Stream: column 2 section ROW_INDEX start: 2007418 length 41 - Stream: column 3 section ROW_INDEX start: 2007459 length 750 - Stream: column 1 section DATA start: 2008209 length 4007 - Stream: column 2 section DATA start: 2012216 length 8010 - Stream: column 3 section DATA start: 2020226 length 194018 - Stream: column 3 section LENGTH start: 2214244 length 1260 + 
Stripe: offset: 2007306 data: 207282 rows: 1000 tail: 67 index: 841 + Stream: column 0 section ROW_INDEX start: 2007306 length 12 + Stream: column 1 section ROW_INDEX start: 2007318 length 38 + Stream: column 2 section ROW_INDEX start: 2007356 length 41 + Stream: column 3 section ROW_INDEX start: 2007397 length 750 + Stream: column 1 section DATA start: 2008147 length 4007 + Stream: column 2 section DATA start: 2012154 length 8010 + Stream: column 3 section DATA start: 2020164 length 194018 + Stream: column 3 section LENGTH start: 2214182 length 1247 Encoding column 0: DIRECT Encoding column 1: DIRECT_V2 Encoding column 2: DIRECT_V2 @@ -183,7 +183,7 @@ Stripes: Row group indices for column 3: Entry 0: count: 1000 hasNull: false min: Darkness,-230-368-488-586-862-930-1686-2044-2636-2652-2872-3108-3162-3192-3404-3442-3508-3542-3550-3712-3980-4146-4204-4336-4390-4418-4424-4490-4512-4650-4768-4924-4950-5210-5524-5630-5678-5710-5758-5952-6238-6252-6300-6366-6668-6712-6926-6942-7100-7194-7802-8030-8452-8608-8640-8862-8868-9134-9234-9412-9602-9608-9642-9678-9740-9780-10426-10510-10514-10706-10814-10870-10942-11028-11244-11326-11462-11496-11656-11830-12022-12178-12418-12832-13304-13448-13590-13618-13908-14188-14246-14340-14364-14394-14762-14850-14964-15048-15494-15674-15726-16006-16056-16180-16304-16332-16452-16598-16730-16810-16994-17210-17268-17786-17962-18214-18444-18446-18724-18912-18952-19164-19348-19400-19546-19776-19896-20084 max: 
worst-54-290-346-648-908-996-1038-1080-1560-1584-1620-1744-1770-1798-1852-1966-2162-2244-2286-2296-2534-2660-3114-3676-3788-4068-4150-4706-4744-5350-5420-5582-5696-5726-6006-6020-6024-6098-6184-6568-6636-6802-6994-7004-7318-7498-7758-7780-7798-7920-7952-7960-7988-8232-8256-8390-8416-8478-8620-8840-8984-9038-9128-9236-9248-9344-9594-9650-9714-9928-9938-10178-10368-10414-10502-10732-10876-11008-11158-11410-11722-11836-11964-12054-12096-12126-12136-12202-12246-12298-12616-12774-12782-12790-12802-12976-13216-13246-13502-13766-14454-14974-15004-15124-15252-15294-15356-15530-15610-16316-16936-17024-17122-17214-17310-17528-17682-17742-17870-17878-18010-18410-18524-18788-19204-19254-19518-19596-19786-19874-19904-20390-20752-20936 sum: 670762 positions: 0,0,0,0,0 -File length: 2217685 bytes +File length: 2217611 bytes Padding length: 0 bytes Padding ratio: 0% ________________________________________________________________________________________________________________________ From 877b2b81f2fa0e4b2f3346bd34101ecb06dc8e0a Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Mon, 16 Jan 2017 15:12:29 -0800 Subject: [PATCH 2/3] ORC-119 Move the indexes to be serialized by the PhysicalWriter. --- .../java/org/apache/orc/PhysicalWriter.java | 20 ++ .../org/apache/orc/impl/PhysicalFsWriter.java | 21 ++ .../java/org/apache/orc/impl/WriterImpl.java | 239 +++++++++++------- 3 files changed, 193 insertions(+), 87 deletions(-) diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java index a028db81e2..41a1ee9bf4 100644 --- a/java/core/src/java/org/apache/orc/PhysicalWriter.java +++ b/java/core/src/java/org/apache/orc/PhysicalWriter.java @@ -61,6 +61,26 @@ interface OutputReceiver { */ OutputReceiver createDataStream(StreamName name) throws IOException; + /** + * Write an index in the given stream name. 
+ * @param name the name of the stream + * @param bloom the bloom filter to write + * @param codec the compression codec to use + */ + void writeIndex(StreamName name, + OrcProto.RowIndex.Builder bloom, + CompressionCodec codec) throws IOException; + + /** + * Write a bloom filter index in the given stream name. + * @param name the name of the stream + * @param bloom the bloom filter to write + * @param codec the compression codec to use + */ + void writeBloomFilter(StreamName name, + OrcProto.BloomFilterIndex.Builder bloom, + CompressionCodec codec) throws IOException; + /** * Flushes the data in all the streams, spills them to disk, write out stripe * footer. diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index b8e0341d00..46730ab011 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -19,6 +19,7 @@ package org.apache.orc.impl; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -358,6 +359,26 @@ public BufferedStream createDataStream(StreamName name) { return result; } + @Override + public void writeIndex(StreamName name, + OrcProto.RowIndex.Builder index, + CompressionCodec codec) throws IOException { + OutputStream stream = new OutStream(path.toString(), bufferSize, codec, + createDataStream(name)); + index.build().writeTo(stream); + stream.flush(); + } + + @Override + public void writeBloomFilter(StreamName name, + OrcProto.BloomFilterIndex.Builder bloom, + CompressionCodec codec) throws IOException { + OutputStream stream = new OutStream(path.toString(), bufferSize, codec, + createDataStream(name)); + bloom.build().writeTo(stream); + stream.flush(); + } + @Override public String toString() { return path.toString(); diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java 
b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index ba1b326306..c364ca0027 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -284,6 +284,39 @@ public void addPosition(long position) { } } + CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) { + CompressionCodec result = codec; + if (codec != null) { + switch (kind) { + case BLOOM_FILTER: + case DATA: + case DICTIONARY_DATA: + case BLOOM_FILTER_UTF8: + if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) { + result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST, + CompressionCodec.Modifier.TEXT)); + } else { + result = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT, + CompressionCodec.Modifier.TEXT)); + } + break; + case LENGTH: + case DICTIONARY_COUNT: + case PRESENT: + case ROW_INDEX: + case SECONDARY: + // easily compressed using the fastest modes + result = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST, + CompressionCodec.Modifier.BINARY)); + break; + default: + LOG.info("Missing ORC compression modifiers for " + kind); + break; + } + } + return result; + } + /** * Interface from the Writer to the TreeWriters. This limits the visibility * that the TreeWriters have into the Writer. 
@@ -300,35 +333,7 @@ public OutStream createStream(int column, OrcProto.Stream.Kind kind ) throws IOException { final StreamName name = new StreamName(column, kind); - CompressionCodec codec = WriterImpl.this.codec; - if (codec != null) { - switch (kind) { - case BLOOM_FILTER: - case DATA: - case DICTIONARY_DATA: - case BLOOM_FILTER_UTF8: - if (getCompressionStrategy() == OrcFile.CompressionStrategy.SPEED) { - codec = codec.modify(EnumSet.of(CompressionCodec.Modifier.FAST, - CompressionCodec.Modifier.TEXT)); - } else { - codec = codec.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT, - CompressionCodec.Modifier.TEXT)); - } - break; - case LENGTH: - case DICTIONARY_COUNT: - case PRESENT: - case ROW_INDEX: - case SECONDARY: - // easily compressed using the fastest modes - codec = codec.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST, - CompressionCodec.Modifier.BINARY)); - break; - default: - LOG.info("Missing ORC compression modifiers for " + kind); - break; - } - } + CompressionCodec codec = getCustomizedCodec(kind); return new OutStream(physicalWriter.toString(), bufferSize, codec, physicalWriter.createDataStream(name)); @@ -423,6 +428,18 @@ public boolean hasWriterTimeZone() { public OrcFile.BloomFilterVersion getBloomFilterVersion() { return bloomFilterVersion; } + + public void writeIndex(StreamName name, + OrcProto.RowIndex.Builder index) throws IOException { + physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind())); + } + + public void writeBloomFilter(StreamName name, + OrcProto.BloomFilterIndex.Builder bloom + ) throws IOException { + physicalWriter.writeBloomFilter(name, bloom, + getCustomizedCodec(name.getKind())); + } } /** @@ -442,9 +459,6 @@ private abstract static class TreeWriter { protected final RowIndexPositionRecorder rowIndexPosition; private final OrcProto.RowIndex.Builder rowIndex; private final OrcProto.RowIndexEntry.Builder rowIndexEntry; - private final PositionedOutputStream rowIndexStream; - private final 
PositionedOutputStream bloomFilterStream; - private final PositionedOutputStream bloomFilterStreamUtf8; protected final BloomFilter bloomFilter; protected final BloomFilterUtf8 bloomFilterUtf8; protected final boolean createBloomFilter; @@ -484,39 +498,33 @@ private abstract static class TreeWriter { stripeColStatistics = ColumnStatisticsImpl.create(schema); fileStatistics = ColumnStatisticsImpl.create(schema); childrenWriters = new TreeWriter[0]; - rowIndex = OrcProto.RowIndex.newBuilder(); - rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); - rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry); - stripeStatsBuilders = new ArrayList<>(); if (streamFactory.buildIndex()) { - rowIndexStream = streamFactory.createStream(id, OrcProto.Stream.Kind.ROW_INDEX); + rowIndex = OrcProto.RowIndex.newBuilder(); + rowIndexEntry = OrcProto.RowIndexEntry.newBuilder(); + rowIndexPosition = new RowIndexPositionRecorder(rowIndexEntry); } else { - rowIndexStream = null; + rowIndex = null; + rowIndexEntry = null; + rowIndexPosition = null; } + stripeStatsBuilders = new ArrayList<>(); if (createBloomFilter) { bloomFilterEntry = OrcProto.BloomFilter.newBuilder(); if (streamFactory.getBloomFilterVersion() == OrcFile.BloomFilterVersion.ORIGINAL) { bloomFilter = new BloomFilter(streamFactory.getRowIndexStride(), streamFactory.getBloomFilterFPP()); bloomFilterIndex = OrcProto.BloomFilterIndex.newBuilder(); - bloomFilterStream = streamFactory.createStream(id, - OrcProto.Stream.Kind.BLOOM_FILTER);; } else { bloomFilter = null; bloomFilterIndex = null; - bloomFilterStream = null; } bloomFilterUtf8 = new BloomFilterUtf8(streamFactory.getRowIndexStride(), streamFactory.getBloomFilterFPP()); bloomFilterIndexUtf8 = OrcProto.BloomFilterIndex.newBuilder(); - bloomFilterStreamUtf8 = streamFactory.createStream(id, - OrcProto.Stream.Kind.BLOOM_FILTER_UTF8);; } else { bloomFilterEntry = null; bloomFilterIndex = null; bloomFilterIndexUtf8 = null; - bloomFilterStreamUtf8 = null; - 
bloomFilterStream = null; bloomFilter = null; bloomFilterUtf8 = null; } @@ -650,7 +658,7 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, isPresentOutStream.suppress(); // since isPresent bitstream is suppressed, update the index to // remove the positions of the isPresent stream - if (rowIndexStream != null) { + if (rowIndex != null) { removeIsPresentPositions(); } } @@ -669,28 +677,27 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, if (streamFactory.hasWriterTimeZone()) { builder.setWriterTimezone(TimeZone.getDefault().getID()); } - if (rowIndexStream != null) { + if (rowIndex != null) { if (rowIndex.getEntryCount() != requiredIndexEntries) { throw new IllegalArgumentException("Column has wrong number of " + "index entries found: " + rowIndex.getEntryCount() + " expected: " + requiredIndexEntries); } - rowIndex.build().writeTo(rowIndexStream); - rowIndexStream.flush(); + streamFactory.writeIndex(new StreamName(id, OrcProto.Stream.Kind.ROW_INDEX), rowIndex); + rowIndex.clear(); + rowIndexEntry.clear(); } - rowIndex.clear(); - rowIndexEntry.clear(); // write the bloom filter to out stream - if (bloomFilterStream != null) { - bloomFilterIndex.build().writeTo(bloomFilterStream); - bloomFilterStream.flush(); + if (bloomFilterIndex != null) { + streamFactory.writeBloomFilter(new StreamName(id, + OrcProto.Stream.Kind.BLOOM_FILTER), bloomFilterIndex); bloomFilterIndex.clear(); } // write the bloom filter to out stream - if (bloomFilterStreamUtf8 != null) { - bloomFilterIndexUtf8.build().writeTo(bloomFilterStreamUtf8); - bloomFilterStreamUtf8.flush(); + if (bloomFilterIndexUtf8 != null) { + streamFactory.writeBloomFilter(new StreamName(id, + OrcProto.Stream.Kind.BLOOM_FILTER_UTF8), bloomFilterIndexUtf8); bloomFilterIndexUtf8.clear(); } } @@ -791,7 +798,9 @@ private static class BooleanTreeWriter extends TreeWriter { PositionedOutputStream out = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.writer = new BitFieldWriter(out, 1); - 
recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -823,7 +832,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); writer.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -848,7 +859,9 @@ private static class ByteTreeWriter extends TreeWriter { super(columnId, schema, writer, nullable); this.writer = new RunLengthByteWriter(writer.createStream(id, OrcProto.Stream.Kind.DATA)); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -892,7 +905,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); writer.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -920,7 +935,9 @@ private static class IntegerTreeWriter extends TreeWriter { OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -974,7 +991,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); writer.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1001,7 +1020,9 @@ private static class FloatTreeWriter extends TreeWriter { this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + 
recordPosition(rowIndexPosition); + } } @Override @@ -1046,7 +1067,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); stream.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1073,7 +1096,9 @@ private static class DoubleTreeWriter extends TreeWriter { this.stream = writer.createStream(id, OrcProto.Stream.Kind.DATA); this.utils = new SerializationUtils(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1117,7 +1142,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); stream.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1166,7 +1193,9 @@ private static abstract class StringBaseTreeWriter extends TreeWriter { OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); rowOutput = createIntegerWriter(directStreamOutput, false, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } rowIndexValueCount.add(0L); buildIndex = writer.buildIndex(); Configuration conf = writer.getConfiguration(); @@ -1223,7 +1252,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, dictionary.clear(); savedRowIndex.clear(); rowIndexValueCount.clear(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } rowIndexValueCount.add(0L); if (!useDictionaryEncoding) { @@ -1349,8 +1380,10 @@ void createRowIndexEntry() throws IOException { } private void recordDirectStreamPosition() throws IOException { - directStreamOutput.getPosition(rowIndexPosition); - lengthOutput.getPosition(rowIndexPosition); + if 
(rowIndexPosition != null) { + directStreamOutput.getPosition(rowIndexPosition); + lengthOutput.getPosition(rowIndexPosition); + } } @Override @@ -1619,7 +1652,9 @@ private static class BinaryTreeWriter extends TreeWriter { this.isDirectV2 = isNewWriteFormat(writer); this.length = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1681,7 +1716,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, super.writeStripe(builder, requiredIndexEntries); stream.flush(); length.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1720,7 +1757,9 @@ private static class TimestampTreeWriter extends TreeWriter { OrcProto.Stream.Kind.DATA), true, isDirectV2, writer); this.nanos = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.SECONDARY), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } // for unit tests to set different time zones this.base_timestamp = Timestamp.valueOf(BASE_TIMESTAMP_STRING).getTime() / MILLIS_PER_SECOND; writer.useWriterTimeZone(true); @@ -1786,7 +1825,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, super.writeStripe(builder, requiredIndexEntries); seconds.flush(); nanos.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } private static long formatNanos(int nanos) { @@ -1832,7 +1873,9 @@ private static class DateTreeWriter extends TreeWriter { OrcProto.Stream.Kind.DATA); this.isDirectV2 = isNewWriteFormat(writer); this.writer = createIntegerWriter(out, true, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1876,7 
+1919,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, int requiredIndexEntries) throws IOException { super.writeStripe(builder, requiredIndexEntries); writer.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1922,7 +1967,9 @@ private static class DecimalTreeWriter extends TreeWriter { scratchBuffer = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_TO_BYTES]; this.scaleStream = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.SECONDARY), true, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -1982,7 +2029,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, super.writeStripe(builder, requiredIndexEntries); valueStream.flush(); scaleStream.flush(); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2012,7 +2061,9 @@ private static class StructTreeWriter extends TreeWriter { children.get(i), writer, true); } - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2076,7 +2127,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } } @@ -2095,7 +2148,9 @@ private static class ListTreeWriter extends TreeWriter { createTreeWriter(schema.getChildren().get(0), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2171,7 +2226,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, for(TreeWriter child: 
childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2204,7 +2261,9 @@ private static class MapTreeWriter extends TreeWriter { createTreeWriter(children.get(1), writer, true); lengths = createIntegerWriter(writer.createStream(columnId, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2285,7 +2344,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2317,7 +2378,9 @@ private static class UnionTreeWriter extends TreeWriter { tags = new RunLengthByteWriter(writer.createStream(columnId, OrcProto.Stream.Kind.DATA)); - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override @@ -2389,7 +2452,9 @@ void writeStripe(OrcProto.StripeFooter.Builder builder, for(TreeWriter child: childrenWriters) { child.writeStripe(builder, requiredIndexEntries); } - recordPosition(rowIndexPosition); + if (rowIndexPosition != null) { + recordPosition(rowIndexPosition); + } } @Override From 09050ad59eb1a02767af98be950b995589dc0067 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Mon, 16 Jan 2017 15:26:25 -0800 Subject: [PATCH 3/3] ORC-119 fixed copy-paste error --- java/core/src/java/org/apache/orc/PhysicalWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java/core/src/java/org/apache/orc/PhysicalWriter.java b/java/core/src/java/org/apache/orc/PhysicalWriter.java index 41a1ee9bf4..9953d41b88 100644 --- a/java/core/src/java/org/apache/orc/PhysicalWriter.java +++ 
b/java/core/src/java/org/apache/orc/PhysicalWriter.java @@ -64,11 +64,11 @@ interface OutputReceiver { /** * Write an index in the given stream name. * @param name the name of the stream - * @param bloom the bloom filter to write + * @param index the row index to write * @param codec the compression codec to use */ void writeIndex(StreamName name, - OrcProto.RowIndex.Builder bloom, + OrcProto.RowIndex.Builder index, CompressionCodec codec) throws IOException; /**