From 3f72171ee6ceb72c3506a1dcac5a9fe1d6a10265 Mon Sep 17 00:00:00 2001
From: Gopal V
Date: Thu, 20 Jul 2017 00:34:01 -0700
Subject: [PATCH 1/2] ORC-214. Upgrade Aircompressor to 0.8

---
 java/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/java/pom.xml b/java/pom.xml
index 879c6f034f..c0c212aec7 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -364,7 +364,7 @@
       <dependency>
         <groupId>io.airlift</groupId>
         <artifactId>aircompressor</artifactId>
-        <version>0.3</version>
+        <version>0.8</version>
       </dependency>
       <dependency>
         <groupId>org.apache.avro</groupId>

From ebd0e135ed0ade47434b118e9b17570c80e581c0 Mon Sep 17 00:00:00 2001
From: Gopal V
Date: Mon, 24 Jul 2017 21:15:57 -0700
Subject: [PATCH 2/2] ORC-91: Use hdfs v-blocks instead of zero-padding stripes

---
 java/core/pom.xml                               | 14 ++++++
 .../java/org/apache/orc/impl/HadoopShims.java   | 39 +++++++++++++++
 .../apache/orc/impl/HadoopShimsCurrent.java     | 47 +++++++++++++++++++
 .../org/apache/orc/impl/HadoopShims_2_2.java    |  9 ++++
 .../org/apache/orc/impl/PhysicalFsWriter.java   | 22 +++++----
 java/pom.xml                                    | 11 ++++-
 6 files changed, 131 insertions(+), 11 deletions(-)

diff --git a/java/core/pom.xml b/java/core/pom.xml
index eaec422f2a..727d552245 100644
--- a/java/core/pom.xml
+++ b/java/core/pom.xml
@@ -49,6 +49,11 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-storage-api</artifactId>
@@ -98,6 +103,15 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <classpathDependencyExcludes>
+            <classpathDependencyExclude>org.apache.hadoop:hadoop-hdfs</classpathDependencyExclude>
+          </classpathDependencyExcludes>
+        </configuration>
+      </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-javadoc-plugin</artifactId>
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims.java b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
index 304737646a..0afc297c81 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShims.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims.java
@@ -19,12 +19,14 @@
 package org.apache.orc.impl;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.VersionInfo;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public interface HadoopShims {
@@ -124,6 +126,43 @@ public interface TextReaderShim {
    */
   public TextReaderShim getTextReaderShim(InputStream input) throws IOException;
 
+  /**
+   * Block filler shim - make sure the DFS block ends at this offset.
+   */
+  public interface BlockFillerShim {
+    /**
+     * Allow block boundaries to be reached by zero-fill or variable length
+     * block markers (in HDFS).
+     * @return the number of bytes written
+     */
+    long fill(OutputStream output, long padding) throws IOException;
+  }
+
+  /**
+   * Default implementation of BlockFillerShim - pads with zero bytes.
+   */
+  public class ZeroFillerShim implements BlockFillerShim {
+    private static final int BUFFER_SIZE = 256 * 1024;
+
+    @Override
+    public long fill(OutputStream output, long padding) throws IOException {
+      final long written = padding;
+      // a freshly allocated array is always zero-filled
+      byte[] pad = new byte[(int) Math.min(BUFFER_SIZE, padding)];
+      while (padding > 0) {
+        int writeLen = (int) Math.min(padding, pad.length);
+        output.write(pad, 0, writeLen);
+        padding -= writeLen;
+      }
+      return written;
+    }
+  }
+
+  /**
+   * Get a filler that can reach the next DFS block boundary, either by
+   * writing zero bytes or by ending the block early (HDFS variable length
+   * blocks).
+   * @param fs the file system the file is being written to
+   * @return a BlockFillerShim suitable for the file system
+   * @throws IOException
+   */
+  public BlockFillerShim getBlockFillerShim(FileSystem fs) throws IOException;
+
   class Factory {
     private static HadoopShims SHIMS = null;
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
index 9f40272e5c..ed1319050a 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShimsCurrent.java
@@ -19,6 +19,11 @@
 package org.apache.orc.impl;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor;
 import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
@@ -27,7 +32,9 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 /**
  * Shims for recent versions of Hadoop
@@ -121,4 +128,44 @@ public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
     return new FastTextReaderShim(in);
   }
 
+  static class VariableBlockFillerShim implements BlockFillerShim {
+    static final EnumSet<SyncFlag> variableBlocksSupported = checkVariableBlocksSupport();
+    static final BlockFillerShim fallback = new ZeroFillerShim();
+
+    private static EnumSet<SyncFlag> checkVariableBlocksSupport() {
+      if (SyncFlag.values().length > 1) {
+        // use valueOf so this also compiles against 2.6.x, where END_BLOCK does not exist
+        return EnumSet.of(SyncFlag.valueOf("END_BLOCK"));
+      }
+      return null;
+    }
+
+    @Override
+    public long fill(OutputStream output, long padding) throws IOException {
+      if (padding == 0) {
+        return 0;
+      }
+      if (output instanceof HdfsDataOutputStream && variableBlocksSupported != null) {
+        ((HdfsDataOutputStream) output).hsync(variableBlocksSupported);
+        return 0; // no padding bytes written - the block ends here
+      }
+      return fallback.fill(output, padding);
+    }
+  }
+
+  @Override
+  public BlockFillerShim getBlockFillerShim(FileSystem fs) throws IOException {
+    // specialized by class name, because a direct reference to DistributedFileSystem
+    // breaks when hadoop-hdfs is not on the classpath
+    if (fs.getClass().getName().equals("org.apache.hadoop.hdfs.DistributedFileSystem")) {
+      try {
+        // load by name so the HDFS client classes are only touched when HDFS is present
+        return (BlockFillerShim) Class.forName(
+            "org.apache.orc.impl.HadoopShimsCurrent$VariableBlockFillerShim").newInstance();
+      } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+        // fall through to the zero-fill implementation
+      }
+    }
+    return new ZeroFillerShim();
+  }
+
 }
diff --git a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
index 501101fff3..0f19fa8dd2 100644
--- a/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
+++ b/java/core/src/java/org/apache/orc/impl/HadoopShims_2_2.java
@@ -19,11 +19,13 @@
 package org.apache.orc.impl;
 
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
 
 /**
@@ -98,4 +100,11 @@ public void read(Text txt, int len) throws IOException {
   public TextReaderShim getTextReaderShim(InputStream in) throws IOException {
     return new BasicTextReaderShim(in);
   }
+
+  @Override
+  public BlockFillerShim getBlockFillerShim(FileSystem fs) {
+    // no special cases - always zero-fill up to the block boundary
+    return new ZeroFillerShim();
+  }
+
 }
diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
index 1769182ab3..bc75e29d41 100644
--- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java
@@ -27,6 +27,7 @@
 import java.util.TreeMap;
 
 import com.google.protobuf.CodedOutputStream;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,6 +36,7 @@
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.PhysicalWriter;
+import org.apache.orc.impl.HadoopShims.BlockFillerShim;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,8 +45,10 @@
 public class PhysicalFsWriter implements PhysicalWriter {
   private static final Logger LOG = LoggerFactory.getLogger(PhysicalFsWriter.class);
 
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
+  private static final HadoopShims shims = HadoopShims.Factory.get();
 
   private final FSDataOutputStream rawWriter;
+  private final BlockFillerShim blockFiller;
   // the compressed metadata information outStream
   private OutStream writer = null;
   // a protobuf outStream around streamFactory
@@ -66,6 +70,7 @@
   private long adjustedStripeSize;
   private long headerLength;
   private long stripeStart;
+  private long blockStart;
   private int metadataLength;
   private int footerLength;
 
@@ -94,6 +99,7 @@ public PhysicalFsWriter(FileSystem fs,
     writer = new OutStream("metadata", bufferSize, codec,
         new DirectStream(rawWriter));
     protobufWriter = CodedOutputStream.newInstance(writer);
+    blockFiller = shims.getBlockFillerShim(fs);
   }
 
   @Override
@@ -104,7 +110,7 @@ public CompressionCodec getCompressionCodec() {
 
   private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
     this.stripeStart = rawWriter.getPos();
     final long currentStripeSize = indexSize + dataSize + footerSize;
-    final long available = blockSize - (stripeStart % blockSize);
+    final long available = blockSize - (stripeStart - blockStart);
     final long overflow = currentStripeSize - adjustedStripeSize;
     final float availRatio = (float) available / (float) defaultStripeSize;
 
@@ -130,22 +136,18 @@ private void padStripe(long indexSize, long dataSize, int footerSize) throws IOException {
     }
 
     if (availRatio < paddingTolerance && addBlockPadding) {
-      long padding = blockSize - (stripeStart % blockSize);
-      byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+      long padding = blockSize - (stripeStart - blockStart);
       LOG.info(String.format("Padding ORC by %d bytes (<= %.2f * %d)",
(<= %.2f * %d)", padding, availRatio, defaultStripeSize)); - stripeStart += padding; - while (padding > 0) { - int writeLen = (int) Math.min(padding, pad.length); - rawWriter.write(pad, 0, writeLen); - padding -= writeLen; - } + stripeStart += blockFiller.fill(rawWriter, padding); + blockStart = stripeStart; // new block adjustedStripeSize = defaultStripeSize; } else if (currentStripeSize < blockSize - && (stripeStart % blockSize) + currentStripeSize > blockSize) { + && (stripeStart - blockStart) + currentStripeSize > blockSize) { // even if you don't pad, reset the default stripe size when crossing a // block boundary adjustedStripeSize = defaultStripeSize; + blockStart = stripeStart + (stripeStart + currentStripeSize) % blockSize; } } diff --git a/java/pom.xml b/java/pom.xml index c0c212aec7..fd06f41f77 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -71,7 +71,7 @@ ${project.basedir}/../../examples 1.8.1 - 2.6.4 + 2.7.3 2.1.1 2.2.1 3.4.6 @@ -246,6 +246,15 @@ 1.7 + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.7 + 1.7 + + org.apache.maven.plugins maven-shade-plugin