From d9727cb36c9b1e9f1c045e29d928401a680062cb Mon Sep 17 00:00:00 2001 From: sergey Date: Wed, 28 Feb 2018 16:00:51 -0800 Subject: [PATCH 1/5] ORC-310 better error handling for codec --- .../org/apache/orc/impl/OrcCodecPool.java | 15 +++++- .../src/java/org/apache/orc/impl/OrcTail.java | 4 +- .../org/apache/orc/impl/PhysicalFsWriter.java | 7 ++- .../java/org/apache/orc/impl/ReaderImpl.java | 12 +++-- .../org/apache/orc/impl/RecordReaderImpl.java | 6 ++- .../apache/orc/impl/RecordReaderUtils.java | 54 ++++++++++++++----- .../java/org/apache/orc/impl/WriterImpl.java | 6 ++- 7 files changed, 80 insertions(+), 24 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 56b98969a7..34f0bd4623 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -18,10 +18,8 @@ package org.apache.orc.impl; import java.util.concurrent.ConcurrentHashMap; - import java.util.ArrayList; import java.util.List; - import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; import org.slf4j.Logger; @@ -62,6 +60,19 @@ public static CompressionCodec getCodec(CompressionKind kind) { return codec; } + public static void returnCodecSafely( + CompressionKind kind, CompressionCodec codec, boolean hasError) { + try { + if (!hasError) { + returnCodec(kind, codec); + } else { + codec.close(); + } + } catch (Exception ex) { + LOG.error("Ignoring codec cleanup error", ex); + } + } + public static void returnCodec(CompressionKind kind, CompressionCodec codec) { if (codec == null) { return; diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java index 3c78874dee..01365114b4 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcTail.java +++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java @@ -107,11 +107,13 @@ public List getStripeStatisticsProto() throws IOExcep if (serializedTail == null) return null; if (metadata == null) { CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind()); + boolean isCodecError = true; try { metadata = extractMetadata(serializedTail, 0, (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize()); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(getCompressionKind(), codec); + OrcCodecPool.returnCodecSafely(getCompressionKind(), codec, isCodecError); } // clear does not clear the contents but sets position to 0 and limit = capacity serializedTail.clear(); diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 918fae8d34..5d7f75aaab 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -225,7 +225,12 @@ public long writePostScript(OrcProto.PostScript.Builder builder) throws IOExcept @Override public void close() throws IOException { - OrcCodecPool.returnCodec(compress, codec); + // We don't use the codec directly but do give it out codec in getCompressionCodec; + // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that + // would get rid of this pattern require cross-project interface changes, so just return the + // codec for now. If the codec is broken, reset will usually throw, so this is still the\ + // correct thing to do. + OrcCodecPool.returnCodecSafely(compress, codec, false); rawWriter.close(); } diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index 0daa0ed2ed..94d693b455 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -463,6 +463,7 @@ public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long m CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name()); OrcProto.FileTail.Builder fileTailBuilder; CompressionCodec codec = OrcCodecPool.getCodec(kind); + boolean isCodecError = true; try { OrcProto.Footer footer = extractFooter(buffer, (int) (buffer.position() + ps.getMetadataLength()), @@ -472,8 +473,9 @@ public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long m .setPostscript(ps) .setFooter(footer) .setFileLength(fileLength); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(kind, codec); + OrcCodecPool.returnCodecSafely(kind, codec, isCodecError); } // clear does not clear the contents but sets position to 0 and limit = capacity buffer.clear(); @@ -593,10 +595,12 @@ protected OrcTail extractFileTail(FileSystem fs, Path path, buffer.reset(); OrcProto.Footer footer; CompressionCodec codec = OrcCodecPool.getCodec(compressionKind); + boolean isCodecError = true; try { footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(compressionKind, codec); + OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError); } fileTailBuilder.setFooter(footer); } @@ -782,10 +786,12 @@ public List getOrcProtoFileStatistics() { public List getStripeStatistics() throws IOException { if (metadata == null) { CompressionCodec codec = OrcCodecPool.getCodec(compressionKind); + boolean isCodecError = true; try { metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize); + isCodecError = false; } finally { - OrcCodecPool.returnCodec(compressionKind, codec); + OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError); } } if (stripeStats == null) { diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java index 0dacc70a44..d7722d17aa 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -17,8 +17,9 @@ */ package org.apache.orc.impl; -import org.apache.orc.CompressionKind; +import com.google.common.annotations.VisibleForTesting; +import org.apache.orc.CompressionKind; import java.io.IOException; import java.math.BigDecimal; import java.sql.Date; @@ -29,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; - import org.apache.orc.OrcFile; import org.apache.orc.util.BloomFilter; import org.apache.orc.util.BloomFilterIO; @@ -1353,6 +1353,8 @@ public static int[] mapTranslatedSargColumns( return result; } + // TODO: remove this + @VisibleForTesting public CompressionCodec getCompressionCodec() { return dataReader.getCompressionCodec(); } diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index 1e2d0f129f..b486571598 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -149,6 +149,7 @@ private static class DefaultDataReader implements DataReader { private final Path path; private final boolean useZeroCopy; private final CompressionCodec codec; + private boolean hasCodecError = false; private final int bufferSize; private final int typeCount; private CompressionKind compressionKind; @@ -172,6 +173,7 @@ private DefaultDataReader(DataReaderProperties properties) { public void open() throws IOException { this.file = fs.open(path); if (useZeroCopy) { + // ZCR only uses codec for boolean checks. zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); } else { zcr = null; @@ -228,11 +230,18 @@ public OrcIndex readRowIndex(StripeInformation stripe, ByteBuffer bb = range.getData().duplicate(); bb.position((int) (offset - range.getOffset())); bb.limit((int) (bb.position() + stream.getLength())); - indexes[column] = OrcProto.RowIndex.parseFrom( - InStream.createCodedInputStream("index", - ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), - codec, bufferSize)); + boolean isOk = false; + try { + indexes[column] = OrcProto.RowIndex.parseFrom( + InStream.createCodedInputStream("index", + ReaderImpl.singleton(new BufferChunk(bb, 0)), + stream.getLength(), codec, bufferSize)); + isOk = true; + } finally { + if (!isOk) { + hasCodecError = true; + } + } } break; case BLOOM_FILTER: @@ -241,10 +250,18 @@ public OrcIndex readRowIndex(StripeInformation stripe, ByteBuffer bb = range.getData().duplicate(); bb.position((int) (offset - range.getOffset())); bb.limit((int) (bb.position() + stream.getLength())); - bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom - (InStream.createCodedInputStream("bloom_filter", - ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), codec, bufferSize)); + boolean isOk = false; + try { + bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom + (InStream.createCodedInputStream("bloom_filter", + ReaderImpl.singleton(new BufferChunk(bb, 0)), + stream.getLength(), codec, bufferSize)); + isOk = true; + } finally { + if (!isOk) { + hasCodecError = true; + } + } } break; default: @@ -267,9 +284,18 @@ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws I // read the footer ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", - ReaderImpl.singleton(new BufferChunk(tailBuf, 0)), - tailLength, codec, bufferSize)); + boolean isOk = false; + try { + OrcProto.StripeFooter result = OrcProto.StripeFooter.parseFrom( + InStream.createCodedInputStream("footer", ReaderImpl.singleton( + new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); + isOk = true; + return result; + } finally { + if (!isOk) { + hasCodecError = true; + } + } } @Override @@ -281,7 +307,7 @@ public DiskRangeList readFileData( @Override public void close() throws IOException { if (codec != null) { - OrcCodecPool.returnCodec(compressionKind, codec); + OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError); } if (pool != null) { pool.clear(); @@ -315,6 +341,8 @@ public DataReader clone() { @Override public CompressionCodec getCompressionCodec() { + // Note: see comments in PhysicalFsWriter; we should probably get rid of this usage + // pattern to make error handling for codec pool more robust. return codec; } } diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index 90b410c97a..343f565745 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -18,6 +18,8 @@ package org.apache.orc.impl; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -26,7 +28,6 @@ import java.util.Map; import java.util.TimeZone; import java.util.TreeMap; - import io.airlift.compress.lz4.Lz4Compressor; import io.airlift.compress.lz4.Lz4Decompressor; import io.airlift.compress.lzo.LzoCompressor; @@ -50,7 +51,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; - import com.google.protobuf.ByteString; /** @@ -645,6 +645,8 @@ public ColumnStatistics[] getStatistics() return ReaderImpl.deserializeStats(builder.getStatisticsList()); } + // TODO: remove this + @VisibleForTesting public CompressionCodec getCompressionCodec() { return physicalWriter.getCompressionCodec(); } From 06c93b30b9b075969c984e0c60f783572c6a4a24 Mon Sep 17 00:00:00 2001 From: sergey Date: Thu, 1 Mar 2018 15:41:45 -0800 Subject: [PATCH 2/5] ORC-310 CR 1 --- .../java/org/apache/orc/impl/OrcCodecPool.java | 17 +++++++++++++++-- .../org/apache/orc/impl/RecordReaderImpl.java | 5 +---- .../java/org/apache/orc/impl/WriterImpl.java | 5 +---- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 34f0bd4623..4ae43a2c73 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -60,10 +60,17 @@ public static CompressionCodec getCodec(CompressionKind kind) { return codec; } + /** + * Returns the codec to the pool or closes it, suppressing exceptions. + * @param kind Compression kind. + * @param codec Codec. + * @param observedError Whether the caller has definitely observed an error with the codec. + * The pool will also do basic correctness check on the codec itself. + */ public static void returnCodecSafely( - CompressionKind kind, CompressionCodec codec, boolean hasError) { + CompressionKind kind, CompressionCodec codec, boolean observedError) { try { - if (!hasError) { + if (!observedError) { returnCodec(kind, codec); } else { codec.close(); @@ -73,6 +80,12 @@ public static void returnCodecSafely( } } + /** + * Returns the codec to the pool, potentially failing if the codec cannot be reused, or if + * the codec is unneeded and cannot be closed. + * @param kind Compression kind. + * @param codec Codec. + */ public static void returnCodec(CompressionKind kind, CompressionCodec codec) { if (codec == null) { return; diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java index d7722d17aa..ff8dfd77de 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java @@ -17,8 +17,6 @@ */ package org.apache.orc.impl; -import com.google.common.annotations.VisibleForTesting; - import org.apache.orc.CompressionKind; import java.io.IOException; import java.math.BigDecimal; @@ -1353,8 +1351,7 @@ public static int[] mapTranslatedSargColumns( return result; } - // TODO: remove this - @VisibleForTesting + // TODO: pollutes the interface for tests only - we should remove this. public CompressionCodec getCompressionCodec() { return dataReader.getCompressionCodec(); } diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java index 343f565745..0186445bd8 100644 --- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java +++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java @@ -18,8 +18,6 @@ package org.apache.orc.impl; -import com.google.common.annotations.VisibleForTesting; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -645,8 +643,7 @@ public ColumnStatistics[] getStatistics() return ReaderImpl.deserializeStats(builder.getStatisticsList()); } - // TODO: remove this - @VisibleForTesting + // TODO: pollutes the interface for tests only - we should remove this. public CompressionCodec getCompressionCodec() { return physicalWriter.getCompressionCodec(); } From c6a58c67051b136246030153e798980200b47f1a Mon Sep 17 00:00:00 2001 From: sergey Date: Thu, 8 Mar 2018 17:13:27 -0800 Subject: [PATCH 3/5] update --- .../org/apache/orc/impl/OrcCodecPool.java | 3 +++ .../org/apache/orc/impl/PhysicalFsWriter.java | 8 ++++--- .../apache/orc/impl/RecordReaderUtils.java | 21 ++++++++++++------- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 4ae43a2c73..8269316986 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -69,6 +69,9 @@ public static CompressionCodec getCodec(CompressionKind kind) { */ public static void returnCodecSafely( CompressionKind kind, CompressionCodec codec, boolean observedError) { + if (codec == null) { + return; + } try { if (!observedError) { returnCodec(kind, codec); diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 5d7f75aaab..3f8e4e236f 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -46,7 +46,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private static final int HDFS_BUFFER_SIZE = 256 * 1024; private static final HadoopShims shims = HadoopShimsFactory.get(); - private final FSDataOutputStream rawWriter; + private FSDataOutputStream rawWriter; // the compressed metadata information outStream private OutStream writer = null; // a protobuf outStream around streamFactory @@ -58,7 +58,7 @@ public class PhysicalFsWriter implements PhysicalWriter { private final double paddingTolerance; private final long defaultStripeSize; private final CompressionKind compress; - private final CompressionCodec codec; + private CompressionCodec codec; private final boolean addBlockPadding; // the streams that make up the current stripe @@ -228,10 +228,12 @@ public void close() throws IOException { // We don't use the codec directly but do give it out codec in getCompressionCodec; // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that // would get rid of this pattern require cross-project interface changes, so just return the - // codec for now. If the codec is broken, reset will usually throw, so this is still the\ + // codec for now. If the codec is broken, reset will usually throw, so this is still the // correct thing to do. OrcCodecPool.returnCodecSafely(compress, codec, false); + codec = null; rawWriter.close(); + rawWriter = null; } @Override diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index b486571598..9d9e31aa5d 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -143,12 +143,12 @@ public static DiskRangeList planIndexReading(TypeDescription fileSchema, private static class DefaultDataReader implements DataReader { private FSDataInputStream file = null; - private final ByteBufferAllocatorPool pool; + private ByteBufferAllocatorPool pool; private HadoopShims.ZeroCopyReaderShim zcr = null; private final FileSystem fs; private final Path path; private final boolean useZeroCopy; - private final CompressionCodec codec; + private CompressionCodec codec; private boolean hasCodecError = false; private final int bufferSize; private final int typeCount; @@ -162,11 +162,6 @@ private DefaultDataReader(DataReaderProperties properties) { this.codec = OrcCodecPool.getCodec(compressionKind); this.bufferSize = properties.getBufferSize(); this.typeCount = properties.getTypeCount(); - if (useZeroCopy) { - this.pool = new ByteBufferAllocatorPool(); - } else { - this.pool = null; - } } @Override @@ -174,6 +169,7 @@ public void open() throws IOException { this.file = fs.open(path); if (useZeroCopy) { // ZCR only uses codec for boolean checks. + pool = new ByteBufferAllocatorPool(); zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); } else { zcr = null; @@ -308,6 +304,7 @@ public DiskRangeList readFileData( public void close() throws IOException { if (codec != null) { OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError); + codec = null; } if (pool != null) { pool.clear(); @@ -316,6 +313,7 @@ public void close() throws IOException { try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) { if (file != null) { file.close(); + file = null; } } } @@ -332,8 +330,15 @@ public void releaseBuffer(ByteBuffer buffer) { @Override public DataReader clone() { + if (this.file != null) { + throw new UnsupportedOperationException( + "Cannot clone a DataReader that is already opened"); + } try { - return (DataReader) super.clone(); + DefaultDataReader clone = (DefaultDataReader) super.clone(); + // Make sure we don't share the same codec between two readers. + clone.codec = OrcCodecPool.getCodec(clone.compressionKind); + return clone; } catch (CloneNotSupportedException e) { throw new UnsupportedOperationException("uncloneable", e); } From 127d205af285711333171a9a8f839f9de0a0a3c9 Mon Sep 17 00:00:00 2001 From: sergey Date: Mon, 12 Mar 2018 14:16:54 -0700 Subject: [PATCH 4/5] update 2 --- .../org/apache/orc/impl/RecordReaderUtils.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index 9d9e31aa5d..7e8d650b1d 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -17,13 +17,15 @@ */ package org.apache.orc.impl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; - import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -35,7 +37,6 @@ import org.apache.orc.DataReader; import org.apache.orc.OrcFile; import org.apache.orc.OrcProto; - import org.apache.orc.StripeInformation; import org.apache.orc.TypeDescription; @@ -44,6 +45,7 @@ */ public class RecordReaderUtils { private static final HadoopShims SHIMS = HadoopShimsFactory.get(); + private static final Logger LOG = LoggerFactory.getLogger(RecordReaderUtils.class); static boolean hadBadBloomFilters(TypeDescription.Category category, OrcFile.WriterVersion version) { @@ -331,13 +333,16 @@ public void releaseBuffer(ByteBuffer buffer) { @Override public DataReader clone() { if (this.file != null) { - throw new UnsupportedOperationException( - "Cannot clone a DataReader that is already opened"); + // We should really throw here, but that will cause failures in Hive. + // While Hive uses clone, just log a warning. + LOG.warn("Cloning an opened DataReader; the stream will be reused and closed twice"); } try { DefaultDataReader clone = (DefaultDataReader) super.clone(); - // Make sure we don't share the same codec between two readers. - clone.codec = OrcCodecPool.getCodec(clone.compressionKind); + if (codec != null) { + // Make sure we don't share the same codec between two readers. + clone.codec = OrcCodecPool.getCodec(clone.compressionKind); + } return clone; } catch (CloneNotSupportedException e) { throw new UnsupportedOperationException("uncloneable", e); From 19c1b554c81f84eba5281edb17b547bd43dfdd14 Mon Sep 17 00:00:00 2001 From: sergey Date: Wed, 14 Mar 2018 12:18:57 -0700 Subject: [PATCH 5/5] remove the error flag --- .../org/apache/orc/impl/OrcCodecPool.java | 10 +--- .../src/java/org/apache/orc/impl/OrcTail.java | 4 +- .../org/apache/orc/impl/PhysicalFsWriter.java | 5 +- .../java/org/apache/orc/impl/ReaderImpl.java | 12 ++--- .../apache/orc/impl/RecordReaderUtils.java | 50 +++++-------------- 5 files changed, 20 insertions(+), 61 deletions(-) diff --git a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java index 8269316986..63f30709ec 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java +++ b/java/core/src/java/org/apache/orc/impl/OrcCodecPool.java @@ -64,20 +64,14 @@ public static CompressionCodec getCodec(CompressionKind kind) { * Returns the codec to the pool or closes it, suppressing exceptions. * @param kind Compression kind. * @param codec Codec. - * @param observedError Whether the caller has definitely observed an error with the codec. - * The pool will also do basic correctness check on the codec itself. */ public static void returnCodecSafely( - CompressionKind kind, CompressionCodec codec, boolean observedError) { + CompressionKind kind, CompressionCodec codec) { if (codec == null) { return; } try { - if (!observedError) { - returnCodec(kind, codec); - } else { - codec.close(); - } + returnCodec(kind, codec); } catch (Exception ex) { LOG.error("Ignoring codec cleanup error", ex); } diff --git a/java/core/src/java/org/apache/orc/impl/OrcTail.java b/java/core/src/java/org/apache/orc/impl/OrcTail.java index 01365114b4..3fb7bd0c3c 100644 --- a/java/core/src/java/org/apache/orc/impl/OrcTail.java +++ b/java/core/src/java/org/apache/orc/impl/OrcTail.java @@ -107,13 +107,11 @@ public List getStripeStatisticsProto() throws IOExcep if (serializedTail == null) return null; if (metadata == null) { CompressionCodec codec = OrcCodecPool.getCodec(getCompressionKind()); - boolean isCodecError = true; try { metadata = extractMetadata(serializedTail, 0, (int) fileTail.getPostscript().getMetadataLength(), codec, getCompressionBufferSize()); - isCodecError = false; } finally { - OrcCodecPool.returnCodecSafely(getCompressionKind(), codec, isCodecError); + OrcCodecPool.returnCodecSafely(getCompressionKind(), codec); } // clear does not clear the contents but sets position to 0 and limit = capacity serializedTail.clear(); diff --git a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java index 3f8e4e236f..057179262e 100644 --- a/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java +++ b/java/core/src/java/org/apache/orc/impl/PhysicalFsWriter.java @@ -228,9 +228,8 @@ public void close() throws IOException { // We don't use the codec directly but do give it out codec in getCompressionCodec; // that is used in tests, for boolean checks, and in StreamFactory. Some of the changes that // would get rid of this pattern require cross-project interface changes, so just return the - // codec for now. If the codec is broken, reset will usually throw, so this is still the - // correct thing to do. - OrcCodecPool.returnCodecSafely(compress, codec, false); + // codec for now. + OrcCodecPool.returnCodecSafely(compress, codec); codec = null; rawWriter.close(); rawWriter = null; diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java index 94d693b455..aee8b5e0cb 100644 --- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java +++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java @@ -463,7 +463,6 @@ public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long m CompressionKind kind = CompressionKind.valueOf(ps.getCompression().name()); OrcProto.FileTail.Builder fileTailBuilder; CompressionCodec codec = OrcCodecPool.getCodec(kind); - boolean isCodecError = true; try { OrcProto.Footer footer = extractFooter(buffer, (int) (buffer.position() + ps.getMetadataLength()), @@ -473,9 +472,8 @@ public static OrcTail extractFileTail(ByteBuffer buffer, long fileLength, long m .setPostscript(ps) .setFooter(footer) .setFileLength(fileLength); - isCodecError = false; } finally { - OrcCodecPool.returnCodecSafely(kind, codec, isCodecError); + OrcCodecPool.returnCodecSafely(kind, codec); } // clear does not clear the contents but sets position to 0 and limit = capacity buffer.clear(); @@ -595,12 +593,10 @@ protected OrcTail extractFileTail(FileSystem fs, Path path, buffer.reset(); OrcProto.Footer footer; CompressionCodec codec = OrcCodecPool.getCodec(compressionKind); - boolean isCodecError = true; try { footer = extractFooter(footerBuffer, 0, footerSize, codec, bufferSize); - isCodecError = false; } finally { - OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError); + OrcCodecPool.returnCodecSafely(compressionKind, codec); } fileTailBuilder.setFooter(footer); } @@ -786,12 +782,10 @@ public List getOrcProtoFileStatistics() { public List getStripeStatistics() throws IOException { if (metadata == null) { CompressionCodec codec = OrcCodecPool.getCodec(compressionKind); - boolean isCodecError = true; try { metadata = extractMetadata(tail.getSerializedTail(), 0, metadataSize, codec, bufferSize); - isCodecError = false; } finally { - OrcCodecPool.returnCodecSafely(compressionKind, codec, isCodecError); + OrcCodecPool.returnCodecSafely(compressionKind, codec); } } if (stripeStats == null) { diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java index 7e8d650b1d..c66400d492 100644 --- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -151,7 +151,6 @@ private static class DefaultDataReader implements DataReader { private final Path path; private final boolean useZeroCopy; private CompressionCodec codec; - private boolean hasCodecError = false; private final int bufferSize; private final int typeCount; private CompressionKind compressionKind; @@ -228,18 +227,10 @@ public OrcIndex readRowIndex(StripeInformation stripe, ByteBuffer bb = range.getData().duplicate(); bb.position((int) (offset - range.getOffset())); bb.limit((int) (bb.position() + stream.getLength())); - boolean isOk = false; - try { - indexes[column] = OrcProto.RowIndex.parseFrom( - InStream.createCodedInputStream("index", - ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), codec, bufferSize)); - isOk = true; - } finally { - if (!isOk) { - hasCodecError = true; - } - } + indexes[column] = OrcProto.RowIndex.parseFrom( + InStream.createCodedInputStream("index", + ReaderImpl.singleton(new BufferChunk(bb, 0)), + stream.getLength(), codec, bufferSize)); } break; case BLOOM_FILTER: @@ -248,18 +239,10 @@ public OrcIndex readRowIndex(StripeInformation stripe, ByteBuffer bb = range.getData().duplicate(); bb.position((int) (offset - range.getOffset())); bb.limit((int) (bb.position() + stream.getLength())); - boolean isOk = false; - try { - bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom - (InStream.createCodedInputStream("bloom_filter", - ReaderImpl.singleton(new BufferChunk(bb, 0)), - stream.getLength(), codec, bufferSize)); - isOk = true; - } finally { - if (!isOk) { - hasCodecError = true; - } - } + bloomFilterIndices[column] = OrcProto.BloomFilterIndex.parseFrom + (InStream.createCodedInputStream("bloom_filter", + ReaderImpl.singleton(new BufferChunk(bb, 0)), + stream.getLength(), codec, bufferSize)); } break; default: @@ -282,18 +265,9 @@ public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws I // read the footer ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - boolean isOk = false; - try { - OrcProto.StripeFooter result = OrcProto.StripeFooter.parseFrom( - InStream.createCodedInputStream("footer", ReaderImpl.singleton( - new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); - isOk = true; - return result; - } finally { - if (!isOk) { - hasCodecError = true; - } - } + return OrcProto.StripeFooter.parseFrom( + InStream.createCodedInputStream("footer", ReaderImpl.singleton( + new BufferChunk(tailBuf, 0)), tailLength, codec, bufferSize)); } @Override @@ -305,7 +279,7 @@ public DiskRangeList readFileData( @Override public void close() throws IOException { if (codec != null) { - OrcCodecPool.returnCodecSafely(compressionKind, codec, hasCodecError); + OrcCodecPool.returnCodecSafely(compressionKind, codec); codec = null; } if (pool != null) {