From 1ec69a5fdf15dc2a5daca1b92d7e5d526fd9dc98 Mon Sep 17 00:00:00 2001 From: Jacob Tolar Date: Thu, 18 Oct 2018 11:49:08 -0500 Subject: [PATCH] Java compression codec fixes. --- .../java/org/apache/avro/file/BZip2Codec.java | 34 ++++++------------- .../main/java/org/apache/avro/file/Codec.java | 12 +++++++ .../org/apache/avro/file/CodecFactory.java | 12 +++---- .../org/apache/avro/file/DeflateCodec.java | 27 +++++---------- .../org/apache/avro/file/SnappyCodec.java | 13 +++---- .../java/org/apache/avro/file/XZCodec.java | 24 ++++--------- .../org/apache/avro/file/ZstandardCodec.java | 29 +++++----------- 7 files changed, 57 insertions(+), 94 deletions(-) diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java index e2dbc094fe1..1d1f4ed2677 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java @@ -29,6 +29,8 @@ public class BZip2Codec extends Codec { public static final int DEFAULT_BUFFER_SIZE = 64 * 1024; + private final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + private ByteArrayOutputStream outputBuffer; static class Option extends CodecFactory { @@ -43,41 +45,27 @@ protected Codec createInstance() { @Override public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException { - ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining()); - BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos); - - try { - outputStream.write(uncompressedData.array(), - uncompressedData.position(), - uncompressedData.remaining()); - } finally { - outputStream.close(); + + try (BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos)) { + outputStream.write(uncompressedData.array(), computeOffset(uncompressedData), uncompressedData.remaining()); } - ByteBuffer result = ByteBuffer.wrap(baos.toByteArray()); - return result; + return ByteBuffer.wrap(baos.toByteArray()); } @Override public ByteBuffer decompress(ByteBuffer compressedData) throws IOException { - ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array()); - BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais); - try { + ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData), compressedData.remaining()); + try(BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais)) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; - int readCount = -1; - - while ( (readCount = inputStream.read(buffer, compressedData.position(), buffer.length))> 0) { + while ((readCount = inputStream.read(buffer, compressedData.position(), buffer.length)) > 0) { baos.write(buffer, 0, readCount); } - ByteBuffer result = ByteBuffer.wrap(baos.toByteArray()); - return result; - } finally { - inputStream.close(); + return ByteBuffer.wrap(baos.toByteArray()); } } @@ -100,6 +88,4 @@ private ByteArrayOutputStream getOutputBuffer(int suggestedLength) { outputBuffer.reset(); return outputBuffer; } - - } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java index bd335c9256e..d4621390f68 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/Codec.java @@ -22,6 +22,9 @@ /** * Interface for Avro-supported compression codecs for data files. + * + * Note that Codec objects may maintain internal state (e.g. buffers) + * and are not thread safe. */ public abstract class Codec { /** Name of the codec; written to the file's metadata. */ @@ -37,12 +40,21 @@ public abstract class Codec { **/ @Override public abstract boolean equals(Object other); + /** * Codecs must implement a hashCode() method that is consistent with equals().*/ @Override public abstract int hashCode(); + @Override public String toString() { return getName(); } + + // Codecs often reference the array inside a ByteBuffer. Compute the offset + // to the start of data correctly in the case that our ByteBuffer + // is a slice() of another. + protected static int computeOffset(ByteBuffer data) { + return data.arrayOffset() + data.position(); + } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java index db51fc69316..238e8a4d788 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/CodecFactory.java @@ -83,12 +83,12 @@ public static CodecFactory zstandardCodec() { public static final int DEFAULT_XZ_LEVEL = LZMA2Options.PRESET_DEFAULT; static { - addCodec("null", nullCodec()); - addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL)); - addCodec("snappy", snappyCodec()); - addCodec("bzip2", bzip2Codec()); - addCodec("xz", xzCodec(DEFAULT_XZ_LEVEL)); - addCodec("zstandard", zstandardCodec()); + addCodec(DataFileConstants.NULL_CODEC, nullCodec()); + addCodec(DataFileConstants.DEFLATE_CODEC, deflateCodec(DEFAULT_DEFLATE_LEVEL)); + addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec()); + addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec()); + addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL)); + addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec()); } /** Maps a codec name into a CodecFactory. diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java index 7080d65ead6..5a3f9271811 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java @@ -36,7 +36,7 @@ * {@link Inflater} and {@link Deflater}, is using * RFC1951. */ -class DeflateCodec extends Codec { +public class DeflateCodec extends Codec { static class Option extends CodecFactory { private int compressionLevel; @@ -70,30 +70,19 @@ public String getName() { @Override public ByteBuffer compress(ByteBuffer data) throws IOException { ByteArrayOutputStream baos = getOutputBuffer(data.remaining()); - DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater()); - writeAndClose(data, ios); - ByteBuffer result = ByteBuffer.wrap(baos.toByteArray()); - return result; + try(OutputStream outputStream = new DeflaterOutputStream(baos, getDeflater())) { + outputStream.write(data.array(), computeOffset(data), data.remaining()); + } + return ByteBuffer.wrap(baos.toByteArray()); } @Override public ByteBuffer decompress(ByteBuffer data) throws IOException { ByteArrayOutputStream baos = getOutputBuffer(data.remaining()); - InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater()); - writeAndClose(data, ios); - ByteBuffer result = ByteBuffer.wrap(baos.toByteArray()); - return result; - } - - private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException { - byte[] input = data.array(); - int offset = data.arrayOffset() + data.position(); - int length = data.remaining(); - try { - to.write(input, offset, length); - } finally { - to.close(); + try(OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) { + outputStream.write(data.array(), computeOffset(data), data.remaining()); } + return ByteBuffer.wrap(baos.toByteArray()); } // get and initialize the inflater for use. diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java index 3c75bb70fed..04f7218610e 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java @@ -24,7 +24,7 @@ import org.xerial.snappy.Snappy; /** * Implements Snappy compression and decompression. */ -class SnappyCodec extends Codec { +public class SnappyCodec extends Codec { private CRC32 crc32 = new CRC32(); static class Option extends CodecFactory { @@ -40,12 +40,13 @@ private SnappyCodec() {} @Override public ByteBuffer compress(ByteBuffer in) throws IOException { + int offset = computeOffset(in); ByteBuffer out = ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4); - int size = Snappy.compress(in.array(), in.position(), in.remaining(), + int size = Snappy.compress(in.array(), offset, in.remaining(), out.array(), 0); crc32.reset(); - crc32.update(in.array(), in.position(), in.remaining()); + crc32.update(in.array(), offset, in.remaining()); out.putInt(size, (int)crc32.getValue()); out.limit(size+4); @@ -55,9 +56,10 @@ public ByteBuffer compress(ByteBuffer in) throws IOException { @Override public ByteBuffer decompress(ByteBuffer in) throws IOException { + int offset = computeOffset(in); ByteBuffer out = ByteBuffer.allocate - (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4)); - int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4, + (Snappy.uncompressedLength(in.array(), offset, in.remaining()-4)); + int size = Snappy.uncompress(in.array(), offset, in.remaining()-4, out.array(), 0); out.limit(size); @@ -79,5 +81,4 @@ public boolean equals(Object obj) { return false; return true; } - } diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java index 6586818fa8b..92a742a561d 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java @@ -59,8 +59,9 @@ public String getName() { @Override public ByteBuffer compress(ByteBuffer data) throws IOException { ByteArrayOutputStream baos = getOutputBuffer(data.remaining()); - OutputStream ios = new XZCompressorOutputStream(baos, compressionLevel); - writeAndClose(data, ios); + try (OutputStream outputStream = new XZCompressorOutputStream(baos, compressionLevel)) { + outputStream.write(data.array(), computeOffset(data), data.remaining()); + } return ByteBuffer.wrap(baos.toByteArray()); } @@ -69,28 +70,15 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException { ByteArrayOutputStream baos = getOutputBuffer(data.remaining()); InputStream bytesIn = new ByteArrayInputStream( data.array(), - data.arrayOffset() + data.position(), + computeOffset(data), data.remaining()); - InputStream ios = new XZCompressorInputStream(bytesIn); - try { + + try (InputStream ios = new XZCompressorInputStream(bytesIn)) { IOUtils.copy(ios, baos); - } finally { - ios.close(); } return ByteBuffer.wrap(baos.toByteArray()); } - private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException { - byte[] input = data.array(); - int offset = data.arrayOffset() + data.position(); - int length = data.remaining(); - try { - to.write(input, offset, length); - } finally { - to.close(); - } - } - // get and initialize the output buffer for use. private ByteArrayOutputStream getOutputBuffer(int suggestedLength) { if (null == outputBuffer) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java index 4ec84335f67..deea4b88ed9 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/ZstandardCodec.java @@ -46,10 +46,11 @@ public String getName() { } @Override - public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException { - ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining()); - OutputStream outputStream = new ZstdCompressorOutputStream(baos); - writeAndClose(uncompressedData, outputStream); + public ByteBuffer compress(ByteBuffer data) throws IOException { + ByteArrayOutputStream baos = getOutputBuffer(data.remaining()); + try (OutputStream outputStream = new ZstdCompressorOutputStream(baos)) { + outputStream.write(data.array(), computeOffset(data), data.remaining()); + } return ByteBuffer.wrap(baos.toByteArray()); } @@ -58,28 +59,14 @@ public ByteBuffer decompress(ByteBuffer compressedData) throws IOException { ByteArrayOutputStream baos = getOutputBuffer(compressedData.remaining()); InputStream bytesIn = new ByteArrayInputStream( compressedData.array(), - compressedData.arrayOffset() + compressedData.position(), + computeOffset(compressedData), compressedData.remaining()); - InputStream ios = new ZstdCompressorInputStream(bytesIn); - try { - IOUtils.copy(ios, baos); - } finally { - ios.close(); + try (InputStream ios = new ZstdCompressorInputStream(bytesIn)) { + IOUtils.copy(ios, baos); } return ByteBuffer.wrap(baos.toByteArray()); } - private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException { - byte[] input = data.array(); - int offset = data.arrayOffset() + data.position(); - int length = data.remaining(); - try { - to.write(input, offset, length); - } finally { - to.close(); - } - } - // get and initialize the output buffer for use. private ByteArrayOutputStream getOutputBuffer(int suggestedLength) { if (outputBuffer == null) {