From 837a1bd7c4c9a5c82b7525f65ed8eaa60a5a1b52 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Mon, 30 Apr 2018 14:10:07 -0700 Subject: [PATCH] ORC-248. PhysicalFsWriter will sometimes pass a negative amount of requested padding to the shims. Fixes #259 Signed-off-by: Owen O'Malley --- .../core/src/java/org/apache/orc/OrcConf.java | 5 +- .../core/src/java/org/apache/orc/OrcFile.java | 37 ++ .../org/apache/orc/impl/PhysicalFsWriter.java | 96 +++-- .../apache/orc/impl/TestPhysicalFsWriter.java | 331 ++++++++++++++++++ .../java/org/apache/orc/impl/HadoopShims.java | 9 +- .../apache/orc/impl/HadoopShimsCurrent.java | 12 +- .../apache/orc/impl/HadoopShimsPre2_3.java | 17 +- .../apache/orc/impl/HadoopShimsPre2_6.java | 5 +- .../apache/orc/impl/HadoopShimsPre2_7.java | 5 +- site/_docs/hive-config.md | 7 + 10 files changed, 441 insertions(+), 83 deletions(-) create mode 100644 java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java index 443da878ea..d92f776086 100644 --- a/java/core/src/java/org/apache/orc/OrcConf.java +++ b/java/core/src/java/org/apache/orc/OrcConf.java @@ -152,7 +152,10 @@ public enum OrcConf { OVERWRITE_OUTPUT_FILE("orc.overwrite.output.file", "orc.overwrite.output.file", false, "A boolean flag to enable overwriting of the output file if it already exists.\n"), IS_SCHEMA_EVOLUTION_CASE_SENSITIVE("orc.schema.evolution.case.sensitive", "orc.schema.evolution.case.sensitive", true, - "A boolean flag to determine if the comparision of field names in schema evolution is case sensitive .\n") + "A boolean flag to determine if the comparision of field names in schema evolution is case sensitive .\n"), + WRITE_VARIABLE_LENGTH_BLOCKS("orc.write.variable.length.blocks", null, false, + "A boolean flag as to whether the ORC writer should write variable length\n" + + "HDFS blocks.") ; private final String attribute; diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java index 3ff7666adf..c9262e9bb3 100644 --- a/java/core/src/java/org/apache/orc/OrcFile.java +++ b/java/core/src/java/org/apache/orc/OrcFile.java @@ -30,6 +30,8 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.orc.impl.HadoopShims; +import org.apache.orc.impl.HadoopShimsFactory; import org.apache.orc.impl.MemoryManagerImpl; import org.apache.orc.impl.OrcTail; import org.apache.orc.impl.ReaderImpl; @@ -389,6 +391,8 @@ public static class WriterOptions implements Cloneable { private PhysicalWriter physicalWriter; private WriterVersion writerVersion = CURRENT_WRITER; private boolean overwrite; + private boolean writeVariableLengthBlocks; + private HadoopShims shims; protected WriterOptions(Properties tableProperties, Configuration conf) { configuration = conf; @@ -428,6 +432,9 @@ protected WriterOptions(Properties tableProperties, Configuration conf) { BloomFilterVersion.fromString( OrcConf.BLOOM_FILTER_WRITE_VERSION.getString(tableProperties, conf)); + shims = HadoopShimsFactory.get(); + writeVariableLengthBlocks = + OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties,conf); } /** @@ -621,6 +628,28 @@ public WriterOptions memory(MemoryManager value) { return this; } + /** + * Should the ORC file writer use HDFS variable length blocks, if they + * are available? 
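+   * <p>Illustrative builder call (a sketch only; {@code conf}, {@code schema}
+   * and {@code path} are assumed to exist in the caller):
+   * <pre>{@code
+   * Writer w = OrcFile.createWriter(path,
+   *     OrcFile.writerOptions(conf)
+   *         .setSchema(schema)
+   *         .writeVariableLengthBlocks(true));
+   * }</pre>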
+ * @param value the new value + * @return this + */ + public WriterOptions writeVariableLengthBlocks(boolean value) { + writeVariableLengthBlocks = value; + return this; + } + + /** + * Set the HadoopShims to use. + * This is only for testing. + * @param value the new value + * @return this + */ + public WriterOptions setShims(HadoopShims value) { + this.shims = value; + return this; + } + /** * Manually set the writer version. * This is an internal API. @@ -722,6 +751,14 @@ public PhysicalWriter getPhysicalWriter() { public WriterVersion getWriterVersion() { return writerVersion; } + + public boolean getWriteVariableLengthBlocks() { + return writeVariableLengthBlocks; + } + + public HadoopShims getHadoopShims() { + return shims; + } } /** diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 2521e6d0ab..7a2be5bf08 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -44,31 +44,32 @@ public class PhysicalFsWriter implements PhysicalWriter { private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); private static final int HDFS_BUFFER_SIZE = 256 * 1024; - private static final HadoopShims shims = HadoopShimsFactory.get(); private FSDataOutputStream rawWriter; // the compressed metadata information outStream - private OutStream writer = null; + private OutStream writer; // a protobuf outStream around streamFactory - private CodedOutputStream protobufWriter = null; + private CodedOutputStream protobufWriter; private final Path path; + private final HadoopShims shims; private final long blockSize; private final int bufferSize; - private final double paddingTolerance; - private final long defaultStripeSize; + private final int maxPadding; private final CompressionKind compress; private CompressionCodec codec; private final boolean addBlockPadding; + private final boolean writeVariableLengthBlocks; // the streams that make up the current stripe private final Map streams = new TreeMap<>(); - private long adjustedStripeSize; private long headerLength; private long stripeStart; - private long blockStart; + // The position of the last time we wrote a short block, which becomes the + // natural blocks + private long blockOffset; private int metadataLength; private int footerLength; @@ -76,7 +77,7 @@ public PhysicalFsWriter(FileSystem fs, Path path, OrcFile.WriterOptions opts) throws IOException { this.path = path; - this.defaultStripeSize = this.adjustedStripeSize = opts.getStripeSize(); + long defaultStripeSize = opts.getStripeSize(); this.addBlockPadding = opts.getBlockPadding(); if (opts.isEnforceBufferSize()) { this.bufferSize = opts.getBufferSize(); @@ -86,17 +87,20 @@ public PhysicalFsWriter(FileSystem fs, opts.getBufferSize()); } this.compress = opts.getCompress(); - this.paddingTolerance = opts.getPaddingTolerance(); + this.maxPadding = (int) (opts.getPaddingTolerance() * defaultStripeSize); this.blockSize = opts.getBlockSize(); LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" + " compression: {} bufferSize: {}", path, defaultStripeSize, blockSize, compress, bufferSize); rawWriter = fs.create(path, opts.getOverwrite(), HDFS_BUFFER_SIZE, fs.getDefaultReplication(path), blockSize); + blockOffset = 0; codec = OrcCodecPool.getCodec(compress); writer = new OutStream("metadata", bufferSize, codec, new DirectStream(rawWriter)); protobufWriter = 
CodedOutputStream.newInstance(writer); + writeVariableLengthBlocks = opts.getWriteVariableLengthBlocks(); + shims = opts.getHadoopShims(); } @Override @@ -130,49 +134,41 @@ public long getFileBytes(final int column) { return size; } - private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException { - this.stripeStart = rawWriter.getPos(); - final long currentStripeSize = indexSize + dataSize + footerSize; - final long available = blockSize - (stripeStart - blockStart); - final long overflow = currentStripeSize - adjustedStripeSize; - final float availRatio = (float) available / (float) defaultStripeSize; - - if (availRatio > 0.0f && availRatio < 1.0f - && availRatio > paddingTolerance) { - // adjust default stripe size to fit into remaining space, also adjust - // the next stripe for correction based on the current stripe size - // and user specified padding tolerance. Since stripe size can overflow - // the default stripe size we should apply this correction to avoid - // writing portion of last stripe to next hdfs block. - double correction = overflow > 0 ? (double) overflow - / (double) adjustedStripeSize : 0.0; - - // correction should not be greater than user specified padding - // tolerance - correction = correction > paddingTolerance ? paddingTolerance - : correction; - - // adjust next stripe size based on current stripe estimate correction - adjustedStripeSize = (long) ((1.0f - correction) * (availRatio * defaultStripeSize)); - } else if (availRatio >= 1.0) { - adjustedStripeSize = defaultStripeSize; + private static final byte[] ZEROS = new byte[64*1024]; + + private static void writeZeros(OutputStream output, + long remaining) throws IOException { + while (remaining > 0) { + long size = Math.min(ZEROS.length, remaining); + output.write(ZEROS, 0, (int) size); + remaining -= size; } + } - if (addBlockPadding) { - if (availRatio < paddingTolerance) { - long padding = blockSize - (stripeStart - blockStart); - LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", - padding, availRatio, defaultStripeSize)); - stripeStart += shims.padStreamToBlock(rawWriter, padding); - blockStart = stripeStart; // new block - adjustedStripeSize = defaultStripeSize; + /** + * Do any required shortening of the HDFS block or padding to avoid stradling + * HDFS blocks. This is called before writing the current stripe. 
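+   * <p>Worked example with the sizes used in TestPhysicalFsWriter (stripe
+   * footer lengths are approximate): a 64 KiB block size, a requested stripe
+   * size of 32 KiB and the default padding tolerance of 0.05 give
+   * maxPadding = 0.05 * 32768 = 1638 bytes. A stripe that would start about
+   * 1 KiB before the block boundary and cross it is padded with zeros up to
+   * the boundary (1 KiB <= 1638 bytes), while a stripe that would need about
+   * 2 KiB of padding exceeds maxPadding and is written without padding. If
+   * variable length blocks are enabled and the shim supports them, the
+   * current block is ended early instead and no zeros are written.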
+ * @param stripeSize the number of bytes in the current stripe + */ + private void padStripe(long stripeSize) throws IOException { + this.stripeStart = rawWriter.getPos(); + long previousBytesInBlock = (stripeStart - blockOffset) % blockSize; + // We only have options if this isn't the first stripe in the block + if (previousBytesInBlock > 0) { + if (previousBytesInBlock + stripeSize >= blockSize) { + // Try making a short block + if (writeVariableLengthBlocks && + shims.endVariableLengthBlock(rawWriter)) { + blockOffset = stripeStart; + } else if (addBlockPadding) { + // if we cross the block boundary, figure out what we should do + long padding = blockSize - previousBytesInBlock; + if (padding <= maxPadding) { + writeZeros(rawWriter, padding); + stripeStart += padding; + } + } } - } else if (currentStripeSize < blockSize - && (stripeStart - blockStart) + currentStripeSize > blockSize) { - // even if you don't intend to pad, reset the default stripe size when crossing a - // block boundary - adjustedStripeSize = defaultStripeSize; - blockStart = stripeStart + (stripeStart + currentStripeSize) % blockSize; } } @@ -370,7 +366,7 @@ public void finalizeStripe(OrcProto.StripeFooter.Builder footerBuilder, OrcProto.StripeFooter footer = footerBuilder.build(); // Do we need to pad the file so the stripe doesn't straddle a block boundary? - padStripe(indexSize, dataSize, footer.getSerializedSize()); + padStripe(indexSize + dataSize + footer.getSerializedSize()); // write out the data streams for (Map.Entry pair : streams.entrySet()) { diff --git a/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java new file mode 100644 index 0000000000..6d2d298801 --- /dev/null +++ b/java/core/src/test/org/apache/orc/impl/TestPhysicalFsWriter.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.orc.impl; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.util.Progressable; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.OrcProto; +import org.apache.orc.PhysicalWriter; +import org.apache.orc.TypeDescription; +import org.junit.Test; +import java.io.IOException; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class TestPhysicalFsWriter { + + final Configuration conf = new Configuration(); + + static class MemoryOutputStream extends OutputStream { + private final List contents; + + MemoryOutputStream(List contents) { + this.contents = contents; + } + + @Override + public void write(int b) throws IOException { + contents.add(new byte[]{(byte) b}); + } + + @Override + public void write(byte[] a, int offset, int length) { + byte[] buffer = new byte[length]; + System.arraycopy(a, offset, buffer, 0, length); + contents.add(buffer); + } + } + + static class MemoryFileSystem extends FileSystem { + + @Override + public URI getUri() { + try { + return new URI("test:///"); + } catch (URISyntaxException e) { + throw new IllegalStateException("bad url", e); + } + } + + @Override + public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return null; + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, + short replication, long blockSize, + Progressable progress) throws IOException { + List contents = new ArrayList<>(); + fileContents.put(f, contents); + return new FSDataOutputStream(new MemoryOutputStream(contents)); + } + + @Override + public FSDataOutputStream append(Path f, int bufferSize, + Progressable progress) throws IOException { + throw new UnsupportedOperationException("append not supported"); + } + + @Override + public boolean rename(Path src, Path dst) throws IOException { + boolean result = fileContents.containsKey(src) && + !fileContents.containsKey(dst); + if (result) { + List contents = fileContents.remove(src); + fileContents.put(dst, contents); + } + return result; + } + + @Override + public boolean delete(Path f, boolean recursive) throws IOException { + boolean result = fileContents.containsKey(f); + fileContents.remove(f); + return result; + } + + @Override + public FileStatus[] listStatus(Path f) throws IOException { + return new FileStatus[]{getFileStatus(f)}; + } + + @Override + public void setWorkingDirectory(Path new_dir) { + currentWorkingDirectory = new_dir; + } + + @Override + public Path getWorkingDirectory() { + return currentWorkingDirectory; + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return false; + } + + @Override + public FileStatus getFileStatus(Path f) throws IOException { + List contents = fileContents.get(f); + if (contents != null) { + long sum = 0; + for(byte[] b: contents) { + sum += b.length; + } + return new FileStatus(sum, false, 1, 256 * 1024, 0, f); + } + return null; + } + + private final Map> 
fileContents = new HashMap<>(); + private Path currentWorkingDirectory = new Path("/"); + } + + @Test + public void testStripePadding() throws IOException { + TypeDescription schema = TypeDescription.fromString("int"); + OrcFile.WriterOptions opts = + OrcFile.writerOptions(conf) + .stripeSize(32 * 1024) + .blockSize(64 * 1024) + .compress(CompressionKind.NONE) + .setSchema(schema); + MemoryFileSystem fs = new MemoryFileSystem(); + PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"), + opts); + writer.writeHeader(); + StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA); + PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0); + byte[] buffer = new byte[1024]; + for(int i=0; i < buffer.length; ++i) { + buffer[i] = (byte) i; + } + for(int i=0; i < 63; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder(); + OrcProto.StripeInformation.Builder dirEntry = + OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // check to make sure that it laid it out without padding + assertEquals(0L, dirEntry.getIndexLength()); + assertEquals(63 * 1024L, dirEntry.getDataLength()); + assertEquals(3, dirEntry.getOffset()); + for(int i=0; i < 62; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + footer = OrcProto.StripeFooter.newBuilder(); + dirEntry = OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // the second one should pad + assertEquals(64 * 1024, dirEntry.getOffset()); + assertEquals(62 * 1024, dirEntry.getDataLength()); + long endOfStripe = dirEntry.getOffset() + dirEntry.getIndexLength() + + dirEntry.getDataLength() + dirEntry.getFooterLength(); + + for(int i=0; i < 3; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + footer = OrcProto.StripeFooter.newBuilder(); + dirEntry = OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // the third one should be over the padding limit + assertEquals(endOfStripe, dirEntry.getOffset()); + assertEquals(3 * 1024, dirEntry.getDataLength()); + } + + @Test + public void testNoStripePadding() throws IOException { + TypeDescription schema = TypeDescription.fromString("int"); + OrcFile.WriterOptions opts = + OrcFile.writerOptions(conf) + .blockPadding(false) + .stripeSize(32 * 1024) + .blockSize(64 * 1024) + .compress(CompressionKind.NONE) + .setSchema(schema); + MemoryFileSystem fs = new MemoryFileSystem(); + PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"), + opts); + writer.writeHeader(); + StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA); + PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0); + byte[] buffer = new byte[1024]; + for(int i=0; i < buffer.length; ++i) { + buffer[i] = (byte) i; + } + for(int i=0; i < 63; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder(); + OrcProto.StripeInformation.Builder dirEntry = + OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // check to make sure that it laid it out without padding + assertEquals(0L, dirEntry.getIndexLength()); + assertEquals(63 * 1024L, dirEntry.getDataLength()); + assertEquals(3, dirEntry.getOffset()); + long endOfStripe = dirEntry.getOffset() + dirEntry.getDataLength() + + dirEntry.getFooterLength(); + for(int i=0; i < 62; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + footer = 
OrcProto.StripeFooter.newBuilder(); + dirEntry = OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // no padding, because we turned it off + assertEquals(endOfStripe, dirEntry.getOffset()); + assertEquals(62 * 1024, dirEntry.getDataLength()); + } + + static class MockHadoopShim implements HadoopShims { + long lastShortBlock = -1; + + @Override + public DirectDecompressor getDirectDecompressor(DirectCompressionType codec) { + return null; + } + + @Override + public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException { + return null; + } + + @Override + public boolean endVariableLengthBlock(OutputStream output) throws IOException { + if (output instanceof FSDataOutputStream) { + lastShortBlock = ((FSDataOutputStream) output).getPos(); + return true; + } + return false; + } + + @Override + public KeyProvider getKeyProvider(Configuration conf, Random random) throws IOException { + return null; + } + } + + @Test + public void testShortBlock() throws IOException { + MockHadoopShim shim = new MockHadoopShim(); + TypeDescription schema = TypeDescription.fromString("int"); + OrcFile.WriterOptions opts = + OrcFile.writerOptions(conf) + .blockPadding(false) + .stripeSize(32 * 1024) + .blockSize(64 * 1024) + .compress(CompressionKind.NONE) + .setSchema(schema) + .setShims(shim) + .writeVariableLengthBlocks(true); + MemoryFileSystem fs = new MemoryFileSystem(); + PhysicalFsWriter writer = new PhysicalFsWriter(fs, new Path("test1.orc"), + opts); + writer.writeHeader(); + StreamName stream0 = new StreamName(0, OrcProto.Stream.Kind.DATA); + PhysicalWriter.OutputReceiver output = writer.createDataStream(stream0); + byte[] buffer = new byte[1024]; + for(int i=0; i < buffer.length; ++i) { + buffer[i] = (byte) i; + } + for(int i=0; i < 63; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + OrcProto.StripeFooter.Builder footer = OrcProto.StripeFooter.newBuilder(); + OrcProto.StripeInformation.Builder dirEntry = + OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // check to make sure that it laid it out without padding + assertEquals(0L, dirEntry.getIndexLength()); + assertEquals(63 * 1024L, dirEntry.getDataLength()); + assertEquals(3, dirEntry.getOffset()); + long endOfStripe = dirEntry.getOffset() + dirEntry.getDataLength() + + dirEntry.getFooterLength(); + for(int i=0; i < 62; ++i) { + output.output(ByteBuffer.wrap(buffer)); + } + footer = OrcProto.StripeFooter.newBuilder(); + dirEntry = OrcProto.StripeInformation.newBuilder(); + writer.finalizeStripe(footer, dirEntry); + // we should get a short block and no padding + assertEquals(endOfStripe, dirEntry.getOffset()); + assertEquals(62 * 1024, dirEntry.getDataLength()); + assertEquals(endOfStripe, shim.lastShortBlock); + } +} diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java index 6ba786641f..cdc43ac949 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java @@ -116,11 +116,12 @@ ByteBuffer readBuffer(int maxLength, } /** - * Allow block boundaries to be reached by zero-fill or variable length block - * markers (in HDFS). - * @return the number of bytes written + * End the OutputStream's current block at the current location. + * This is only available on HDFS on Hadoop >= 2.7, but will return false + * otherwise. + * @return was a variable length block created? 
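+   * <p>Sketch of the Hadoop >= 2.7 implementation added in this change
+   * (earlier shims simply return false):
+   * <pre>{@code
+   * if (output instanceof HdfsDataOutputStream) {
+   *   ((HdfsDataOutputStream) output).hsync(
+   *       EnumSet.of(HdfsDataOutputStream.SyncFlag.END_BLOCK));
+   *   return true;
+   * }
+   * return false;
+   * }</pre>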
*/ - long padStreamToBlock(OutputStream output, long padding) throws IOException; + boolean endVariableLengthBlock(OutputStream output) throws IOException; /** * A source of crypto keys. This is usually backed by a Ranger KMS. diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java index 95c254bbe0..ff11159ff6 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java @@ -49,15 +49,13 @@ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, } @Override - public long padStreamToBlock(OutputStream output, - long padding) throws IOException { + public boolean endVariableLengthBlock(OutputStream output) throws IOException { if (output instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) output).hsync( - EnumSet.of(HdfsDataOutputStream.SyncFlag.END_BLOCK)); - return 0; // no padding - } else { - return HadoopShimsPre2_3.padStream(output, padding); + HdfsDataOutputStream hdfs = (HdfsDataOutputStream) output; + hdfs.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.END_BLOCK)); + return true; } + return false; } @Override diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java index c76c4b971a..1d5514791c 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java @@ -49,22 +49,9 @@ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, return null; } - private static final int BUFFER_SIZE = 256 * 1024; - - static long padStream(OutputStream output, - long padding) throws IOException { - byte[] pad = new byte[(int) Math.min(BUFFER_SIZE, padding)]; // always clear - while (padding > 0) { - int writeLen = (int) Math.min(padding, pad.length); - output.write(pad, 0, writeLen); - padding -= writeLen; - } - return padding; - } - @Override - public long padStreamToBlock(OutputStream output, long padding) throws IOException { - return padStream(output, padding); + public boolean endVariableLengthBlock(OutputStream output) { + return false; } @Override diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java index 775ce0d349..618e4c85c6 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java @@ -124,9 +124,8 @@ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, } @Override - public long padStreamToBlock(OutputStream output, - long padding) throws IOException { - return HadoopShimsPre2_3.padStream(output, padding); + public boolean endVariableLengthBlock(OutputStream output) { + return false; } @Override diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java index f405f2864b..17296abb33 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java @@ -60,9 +60,8 @@ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, } @Override - public long padStreamToBlock(OutputStream output, - long padding) throws IOException { - return HadoopShimsPre2_3.padStream(output, padding); + public boolean endVariableLengthBlock(OutputStream output) { + return false; } static String 
buildKeyVersionName(KeyMetadata key) {
diff --git a/site/_docs/hive-config.md b/site/_docs/hive-config.md
index 7945cdf85f..b463a6b2ef 100644
--- a/site/_docs/hive-config.md
+++ b/site/_docs/hive-config.md
@@ -179,4 +179,11 @@ There are many Hive configuration properties related to ORC files:
     the compression level of higher level compression codec. Value can be
     SPEED or COMPRESSION.
   </td>
 </tr>
+<tr>
+  <td><code>orc.write.variable.length.blocks</code></td>
+  <td>false</td>
+  <td>Should the ORC writer use HDFS variable length blocks, if they are
+    available? If the new stripe would straddle a block, Hadoop is ≥ 2.7,
+    and this is enabled, it will end the block before the new stripe.</td>
+</tr>
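Example: a minimal sketch (not part of the patch) of turning the new option on
from Java. The output path, schema, and use of a fresh Configuration are
illustrative; the property name and the writeVariableLengthBlocks() builder
method come from this change, everything else is standard ORC writer setup.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class VariableLengthBlockExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Option 1: set the configuration/table property; WriterOptions reads
        // it through OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.
        conf.setBoolean("orc.write.variable.length.blocks", true);

        TypeDescription schema = TypeDescription.fromString("struct<x:int>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf)
                .setSchema(schema)
                // Option 2: request it explicitly on the options builder.
                .writeVariableLengthBlocks(true));
        writer.close();
      }
    }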