From fb3f096e6724811bb9604c016b008519d34288b8 Mon Sep 17 00:00:00 2001 From: Gopal V Date: Mon, 24 Jul 2017 21:15:57 -0700 Subject: [PATCH 1/2] ORC-91: Use hdfs v-blocks instead of zero-padding stripes --- java/core/pom.xml | 5 + .../apache/orc/impl/HadoopShimsFactory.java | 6 +- .../org/apache/orc/impl/HadoopShims_2_2.java | 101 ---------------- .../org/apache/orc/impl/PhysicalFsWriter.java | 21 ++-- java/pom.xml | 11 +- java/shims/pom.xml | 5 + .../java/org/apache/orc/impl/HadoopShims.java | 43 +++---- .../apache/orc/impl/HadoopShimsCurrent.java | 97 +++------------ .../apache/orc/impl/HadoopShimsPre2_3.java | 65 ++++++++++ .../apache/orc/impl/HadoopShimsPre2_7.java | 113 ++++++++++++++++++ 10 files changed, 242 insertions(+), 225 deletions(-) delete mode 100644 java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java create mode 100644 java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java create mode 100644 java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java diff --git a/java/core/pom.xml b/java/core/pom.xml index 3bd5fa07d8..32b2c15807 100644 --- a/java/core/pom.xml +++ b/java/core/pom.xml @@ -55,6 +55,11 @@ org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-hdfs + provided + org.apache.hive hive-storage-api diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsFactory.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsFactory.java index 3dffdef6a8..090dc98b4c 100644 --- a/java/core/src/java/org/apache/orc/impl/HadoopShimsFactory.java +++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsFactory.java @@ -26,6 +26,8 @@ public class HadoopShimsFactory { private static final String CURRENT_SHIM_NAME = "org.apache.orc.impl.HadoopShimsCurrent"; + private static final String PRE_2_7_SHIM_NAME = + "org.apache.orc.impl.HadoopShimsPre2_7"; private static HadoopShims SHIMS = null; @@ -49,7 +51,9 @@ public static synchronized HadoopShims get() { int major = Integer.parseInt(versionParts[0]); int minor = 
Integer.parseInt(versionParts[1]); if (major < 2 || (major == 2 && minor < 3)) { - SHIMS = new HadoopShims_2_2(); + SHIMS = new HadoopShimsPre2_3(); + } else if (major == 2 && minor < 7) { + SHIMS = createShimByName(PRE_2_7_SHIM_NAME); } else { SHIMS = createShimByName(CURRENT_SHIM_NAME); } diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java deleted file mode 100644 index 501101fff3..0000000000 --- a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.impl; - -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.Text; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.lang.reflect.Method; - -/** - * Shims for versions of Hadoop up to and including 2.2.x - */ -public class HadoopShims_2_2 implements HadoopShims { - - final boolean zeroCopy; - final boolean fastRead; - - HadoopShims_2_2() { - boolean zcr = false; - try { - Class.forName("org.apache.hadoop.fs.CacheFlag", false, - HadoopShims_2_2.class.getClassLoader()); - zcr = true; - } catch (ClassNotFoundException ce) { - } - zeroCopy = zcr; - boolean fastRead = false; - if (zcr) { - for (Method m : Text.class.getMethods()) { - if ("readWithKnownLength".equals(m.getName())) { - fastRead = true; - } - } - } - this.fastRead = fastRead; - } - - public DirectDecompressor getDirectDecompressor( - DirectCompressionType codec) { - return null; - } - - @Override - public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, - ByteBufferPoolShim pool - ) throws IOException { - if(zeroCopy) { - return ZeroCopyShims.getZeroCopyReader(in, pool); - } - /* not supported */ - return null; - } - - private static final class BasicTextReaderShim implements TextReaderShim { - private final InputStream in; - - public BasicTextReaderShim(InputStream in) { - this.in = in; - } - - @Override - public void read(Text txt, int len) throws IOException { - int offset = 0; - byte[] bytes = new byte[len]; - while (len > 0) { - int written = in.read(bytes, offset, len); - if (written < 0) { - throw new EOFException("Can't finish read from " + in + " read " - + (offset) + " bytes out of " + bytes.length); - } - len -= written; - offset += written; - } - txt.set(bytes); - } - } - - @Override - public TextReaderShim getTextReaderShim(InputStream in) throws IOException { - return new BasicTextReaderShim(in); - } -} diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java 
b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 06e762ff8a..dd7ec6f886 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -27,6 +27,7 @@ import java.util.TreeMap; import com.google.protobuf.CodedOutputStream; + import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class); private static final int HDFS_BUFFER_SIZE = 256 * 1024; + private static final HadoopShims shims = HadoopShimsFactory.get(); private final FSDataOutputStream rawWriter; // the compressed metadata information outStream @@ -66,6 +68,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private long adjustedStripeSize; private long headerLength; private long stripeStart; + private long blockStart; private int metadataLength; private int footerLength; @@ -104,7 +107,7 @@ public CompressionCodec getCompressionCodec() { private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException { this.stripeStart = rawWriter.getPos(); final long currentStripeSize = indexSize + dataSize + footerSize; - final long available = blockSize - (stripeStart % blockSize); + final long available = blockSize - (stripeStart - blockStart); final long overflow = currentStripeSize - adjustedStripeSize; final float availRatio = (float) available / (float) defaultStripeSize; @@ -130,22 +133,18 @@ private void padStripe(long indexSize, long dataSize, int footerSize) throws IOE } if (availRatio < paddingTolerance && addBlockPadding) { - long padding = blockSize - 
(stripeStart % blockSize); - byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)]; + long padding = blockSize - (stripeStart - blockStart); LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)", padding, availRatio, defaultStripeSize)); - stripeStart += padding; - while (padding > 0) { - int writeLen = (int) Math.min(padding, pad.length); - rawWriter.write(pad, 0, writeLen); - padding -= writeLen; - } + stripeStart += shims.padStreamToBlock(rawWriter, padding); + blockStart = stripeStart; // new block adjustedStripeSize = defaultStripeSize; } else if (currentStripeSize < blockSize - && (stripeStart % blockSize) + currentStripeSize > blockSize) { + && (stripeStart - blockStart) + currentStripeSize > blockSize) { // even if you don't pad, reset the default stripe size when crossing a // block boundary adjustedStripeSize = defaultStripeSize; + blockStart = stripeStart + (stripeStart + currentStripeSize) % blockSize; } } diff --git a/java/pom.xml b/java/pom.xml index bc5f98951c..99738d026d 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -73,7 +73,7 @@ 1.8.1 2.2.0 - 2.6.4 + 2.7.3 2.1.1 2.2.1 3.4.6 @@ -244,6 +244,15 @@ 1.7 + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + org.apache.maven.plugins maven-shade-plugin diff --git a/java/shims/pom.xml b/java/shims/pom.xml index 07c9f995bc..a4b58c2eb1 100644 --- a/java/shims/pom.xml +++ b/java/shims/pom.xml @@ -41,6 +41,11 @@ hadoop-common ${hadoop.version} + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java index 35cad81d65..3ba3b43185 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java @@ -19,12 +19,10 @@ package org.apache.orc.impl; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.util.VersionInfo; import 
java.io.Closeable; import java.io.IOException; -import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; public interface HadoopShims { @@ -44,15 +42,15 @@ interface DirectDecompressor { /** * Get a direct decompressor codec, if it is available - * @param codec - * @return + * @param codec the kind of decompressor that we need + * @return a direct decompressor or null, if it isn't available */ DirectDecompressor getDirectDecompressor(DirectCompressionType codec); /** * a hadoop.io ByteBufferPool shim. */ - public interface ByteBufferPoolShim { + interface ByteBufferPoolShim { /** * Get a new ByteBuffer from the pool. The pool can provide this from * removing a buffer from its internal cache, or by allocating a @@ -78,50 +76,37 @@ public interface ByteBufferPoolShim { /** * Provides an HDFS ZeroCopyReader shim. * @param in FSDataInputStream to read from (where the cached/mmap buffers are tied to) - * @param in ByteBufferPoolShim to allocate fallback buffers with + * @param pool ByteBufferPoolShim to allocate fallback buffers with * * @return returns null if not supported */ - public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException; + ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, ByteBufferPoolShim pool) throws IOException; - public interface ZeroCopyReaderShim extends Closeable { + interface ZeroCopyReaderShim extends Closeable { /** * Get a ByteBuffer from the FSDataInputStream - this can be either a HeapByteBuffer or an MappedByteBuffer. * Also move the in stream by that amount. The data read can be small than maxLength. * * @return ByteBuffer read from the stream, */ - public ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException; + ByteBuffer readBuffer(int maxLength, boolean verifyChecksums) throws IOException; /** * Release a ByteBuffer obtained from a read on the * Also move the in stream by that amount. 
The data read can be small than maxLength. * */ - public void releaseBuffer(ByteBuffer buffer); + void releaseBuffer(ByteBuffer buffer); /** * Close the underlying stream. - * @throws IOException */ - public void close() throws IOException; + void close() throws IOException; } /** - * Read data into a Text object in the fastest way possible + * Allow block boundaries to be reached by zero-fill or variable length block + * markers (in HDFS). + * @return the number of bytes written */ - public interface TextReaderShim { - /** - * @param txt - * @param size - * @throws IOException - */ - void read(Text txt, int size) throws IOException; - } - - /** - * Wrap a TextReaderShim around an input stream. The reader shim will not - * buffer any reads from the underlying stream and will only consume bytes - * which are required for TextReaderShim.read() input. - */ - public TextReaderShim getTextReaderShim(InputStream input) throws IOException; + long padStreamToBlock(OutputStream output, long padding) throws IOException; } diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java index 9f40272e5c..d9c9b6ed1f 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsCurrent.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,81 +19,19 @@ package org.apache.orc.impl; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.snappy.SnappyDecompressor; -import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor; -import org.apache.hadoop.io.compress.zlib.ZlibDecompressor; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; +import java.io.OutputStream; +import java.util.EnumSet; /** * Shims for recent versions of Hadoop */ public class HadoopShimsCurrent implements HadoopShims { - private static class SnappyDirectDecompressWrapper implements DirectDecompressor { - private final SnappyDirectDecompressor root; - - SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) { - this.root = root; - } - - public void decompress(ByteBuffer input, ByteBuffer output) throws IOException { - root.decompress(input, output); - } - - @Override - public void reset() { - root.reset(); - } - - @Override - public void end() { - root.end(); - } - } - - private static class ZlibDirectDecompressWrapper implements DirectDecompressor { - private final ZlibDecompressor.ZlibDirectDecompressor root; - - ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) { - this.root = root; - } - - public void decompress(ByteBuffer input, ByteBuffer output) throws IOException { - root.decompress(input, output); - } - - @Override - public void reset() { - root.reset(); - } - - @Override - public void end() { - root.end(); - } - } - - public DirectDecompressor getDirectDecompressor( - DirectCompressionType codec) { - switch (codec) { - case ZLIB: - return new ZlibDirectDecompressWrapper - (new ZlibDecompressor.ZlibDirectDecompressor()); - case ZLIB_NOHEADER: - return new ZlibDirectDecompressWrapper - (new 
ZlibDecompressor.ZlibDirectDecompressor - (ZlibDecompressor.CompressionHeader.NO_HEADER, 0)); - case SNAPPY: - return new SnappyDirectDecompressWrapper - (new SnappyDecompressor.SnappyDirectDecompressor()); - default: - return null; - } + public DirectDecompressor getDirectDecompressor(DirectCompressionType codec) { + return HadoopShimsPre2_7.getDecompressor(codec); } @Override @@ -103,22 +41,17 @@ public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, return ZeroCopyShims.getZeroCopyReader(in, pool); } - private static final class FastTextReaderShim implements TextReaderShim { - private final DataInputStream din; - - public FastTextReaderShim(InputStream in) { - this.din = new DataInputStream(in); - } - - @Override - public void read(Text txt, int len) throws IOException { - txt.readWithKnownLength(din, len); + @Override + public long padStreamToBlock(OutputStream output, + long padding) throws IOException { + if (output instanceof HdfsDataOutputStream) { + ((HdfsDataOutputStream) output).hsync( + EnumSet.of(HdfsDataOutputStream.SyncFlag.END_BLOCK)); + return 0; // no padding + } else { + return HadoopShimsPre2_3.padStream(output, padding); } } - @Override - public TextReaderShim getTextReaderShim(InputStream in) throws IOException { - return new FastTextReaderShim(in); - } } diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java new file mode 100644 index 0000000000..3c676913a0 --- /dev/null +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_3.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Shims for versions of Hadoop up to and including 2.2.x
+ */
+public class HadoopShimsPre2_3 implements HadoopShims {
+  HadoopShimsPre2_3() {
+  }
+
+  public DirectDecompressor getDirectDecompressor(DirectCompressionType codec) {
+    return null;
+  }
+
+  @Override
+  public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in,
+                                              ByteBufferPoolShim pool) throws IOException {
+    /* not supported */
+    return null;
+  }
+
+  private static final int BUFFER_SIZE = 256 * 1024;
+
+  /**
+   * Write {@code padding} zero bytes to {@code output}.
+   * @return the number of bytes written, so the caller can advance its offset
+   */
+  static long padStream(OutputStream output, long padding) throws IOException {
+    byte[] pad = new byte[(int) Math.min(BUFFER_SIZE, padding)]; // always clear
+    // count down a copy so the requested length is still available to return
+    for (long remaining = padding; remaining > 0; ) {
+      int writeLen = (int) Math.min(remaining, pad.length);
+      output.write(pad, 0, writeLen);
+      remaining -= writeLen;
+    }
+    return padding;
+  }
+
+  @Override
+  public long padStreamToBlock(OutputStream output, long padding) throws IOException {
+    return padStream(output, padding);
+  }
+}
diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
new file mode 100644
index 0000000000..a2103538f6
--- /dev/null
+++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_7.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.io.compress.snappy.SnappyDecompressor; +import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor; +import org.apache.hadoop.io.compress.zlib.ZlibDecompressor; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Shims for versions of Hadoop less than 2.7 + */ +public class HadoopShimsPre2_7 implements HadoopShims { + + static class SnappyDirectDecompressWrapper implements DirectDecompressor { + private final SnappyDirectDecompressor root; + + SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) { + this.root = root; + } + + public void decompress(ByteBuffer input, ByteBuffer output) throws IOException { + root.decompress(input, output); + } + + @Override + public void reset() { + root.reset(); + } + + @Override + public void end() { + root.end(); + } + } + + static class ZlibDirectDecompressWrapper implements DirectDecompressor { + private final ZlibDecompressor.ZlibDirectDecompressor root; + + ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) { + this.root = root; + } + + public void decompress(ByteBuffer input, ByteBuffer output) throws IOException { + 
root.decompress(input, output); + } + + @Override + public void reset() { + root.reset(); + } + + @Override + public void end() { + root.end(); + } + } + + static DirectDecompressor getDecompressor( DirectCompressionType codec) { + switch (codec) { + case ZLIB: + return new ZlibDirectDecompressWrapper + (new ZlibDecompressor.ZlibDirectDecompressor()); + case ZLIB_NOHEADER: + return new ZlibDirectDecompressWrapper + (new ZlibDecompressor.ZlibDirectDecompressor + (ZlibDecompressor.CompressionHeader.NO_HEADER, 0)); + case SNAPPY: + return new SnappyDirectDecompressWrapper + (new SnappyDecompressor.SnappyDirectDecompressor()); + default: + return null; + } + } + + public DirectDecompressor getDirectDecompressor( DirectCompressionType codec) { + return getDecompressor(codec); + } + + @Override + public ZeroCopyReaderShim getZeroCopyReader(FSDataInputStream in, + ByteBufferPoolShim pool + ) throws IOException { + return ZeroCopyShims.getZeroCopyReader(in, pool); + } + + @Override + public long padStreamToBlock(OutputStream output, + long padding) throws IOException { + return HadoopShimsPre2_3.padStream(output, padding); + } + +} From 98e7fab4e8492b01f43d162bbdf4a5a801d4bae9 Mon Sep 17 00:00:00 2001 From: Owen O'Malley Date: Tue, 5 Sep 2017 09:30:01 -0700 Subject: [PATCH 2/2] ORC-235: Update the exclusion of recursive dependencies. 
Signed-off-by: Owen O'Malley --- java/pom.xml | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/java/pom.xml b/java/pom.xml index 99738d026d..ed3607ed99 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -357,6 +357,12 @@ com.opencsv opencsv 3.9 + + + commons-beanutils + commons-beanutils + + commons-cli @@ -377,6 +383,12 @@ io.airlift aircompressor 0.8 + + + io.airlift + slice + + org.apache.avro @@ -407,6 +419,10 @@ com.sun.jersey jersey-json + + commons-beanutils + commons-beanutils-core + commons-daemon commons-daemon @@ -431,6 +447,10 @@ net.java.dev.jets3t jets3t + + org.tukaani + xz + org.apache.curator curator-recipes @@ -533,6 +553,10 @@ org.apache.avro avro + + org.fusesource.leveldbjni + leveldbjni-all + org.mortbay.jetty jetty