Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 10 additions & 24 deletions lang/java/avro/src/main/java/org/apache/avro/file/BZip2Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
public class BZip2Codec extends Codec {

public static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private final byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];

private ByteArrayOutputStream outputBuffer;

static class Option extends CodecFactory {
Expand All @@ -43,41 +45,27 @@ protected Codec createInstance() {

@Override
public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {

ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos);

try {
outputStream.write(uncompressedData.array(),
uncompressedData.position(),
uncompressedData.remaining());
} finally {
outputStream.close();

try (BZip2CompressorOutputStream outputStream = new BZip2CompressorOutputStream(baos)) {
outputStream.write(uncompressedData.array(), computeOffset(uncompressedData), uncompressedData.remaining());
}

ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
return result;
return ByteBuffer.wrap(baos.toByteArray());
}

@Override
public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array());
BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais);
try {
ByteArrayInputStream bais = new ByteArrayInputStream(compressedData.array(), computeOffset(compressedData), compressedData.remaining());
try(BZip2CompressorInputStream inputStream = new BZip2CompressorInputStream(bais)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];

int readCount = -1;

while ( (readCount = inputStream.read(buffer, compressedData.position(), buffer.length))> 0) {
while ((readCount = inputStream.read(buffer, compressedData.position(), buffer.length)) > 0) {
baos.write(buffer, 0, readCount);
}

ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
return result;
} finally {
inputStream.close();
return ByteBuffer.wrap(baos.toByteArray());
}
}

Expand All @@ -100,6 +88,4 @@ private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
outputBuffer.reset();
return outputBuffer;
}


}
12 changes: 12 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/file/Codec.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

/**
* Interface for Avro-supported compression codecs for data files.
*
* Note that Codec objects may maintain internal state (e.g. buffers)
* and are not thread safe.
*/
public abstract class Codec {
/** Name of the codec; written to the file's metadata. */
Expand All @@ -37,12 +40,21 @@ public abstract class Codec {
**/
@Override
public abstract boolean equals(Object other);

/**
* Codecs must implement a hashCode() method that is consistent with equals().*/
@Override
public abstract int hashCode();

@Override
public String toString() {
return getName();
}

// Codecs often reference the array inside a ByteBuffer. Compute the offset
// to the start of data correctly in the case that our ByteBuffer
// is a slice() of another.
protected static int computeOffset(ByteBuffer data) {
return data.arrayOffset() + data.position();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ public static CodecFactory zstandardCodec() {
public static final int DEFAULT_XZ_LEVEL = LZMA2Options.PRESET_DEFAULT;

static {
addCodec("null", nullCodec());
addCodec("deflate", deflateCodec(DEFAULT_DEFLATE_LEVEL));
addCodec("snappy", snappyCodec());
addCodec("bzip2", bzip2Codec());
addCodec("xz", xzCodec(DEFAULT_XZ_LEVEL));
addCodec("zstandard", zstandardCodec());
addCodec(DataFileConstants.NULL_CODEC, nullCodec());
addCodec(DataFileConstants.DEFLATE_CODEC, deflateCodec(DEFAULT_DEFLATE_LEVEL));
addCodec(DataFileConstants.SNAPPY_CODEC, snappyCodec());
addCodec(DataFileConstants.BZIP2_CODEC, bzip2Codec());
addCodec(DataFileConstants.XZ_CODEC, xzCodec(DEFAULT_XZ_LEVEL));
addCodec(DataFileConstants.ZSTANDARD_CODEC, zstandardCodec());
}

/** Maps a codec name into a CodecFactory.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* {@link Inflater} and {@link Deflater}, is using
* RFC1951.
*/
class DeflateCodec extends Codec {
public class DeflateCodec extends Codec {

static class Option extends CodecFactory {
private int compressionLevel;
Expand Down Expand Up @@ -70,30 +70,19 @@ public String getName() {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
DeflaterOutputStream ios = new DeflaterOutputStream(baos, getDeflater());
writeAndClose(data, ios);
ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
return result;
try(OutputStream outputStream = new DeflaterOutputStream(baos, getDeflater())) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
}
return ByteBuffer.wrap(baos.toByteArray());
}

@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
InflaterOutputStream ios = new InflaterOutputStream(baos, getInflater());
writeAndClose(data, ios);
ByteBuffer result = ByteBuffer.wrap(baos.toByteArray());
return result;
}

private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
byte[] input = data.array();
int offset = data.arrayOffset() + data.position();
int length = data.remaining();
try {
to.write(input, offset, length);
} finally {
to.close();
try(OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
}
return ByteBuffer.wrap(baos.toByteArray());
}

// get and initialize the inflater for use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.xerial.snappy.Snappy;

/** * Implements Snappy compression and decompression. */
class SnappyCodec extends Codec {
public class SnappyCodec extends Codec {
private CRC32 crc32 = new CRC32();

static class Option extends CodecFactory {
Expand All @@ -40,12 +40,13 @@ private SnappyCodec() {}

@Override
public ByteBuffer compress(ByteBuffer in) throws IOException {
int offset = computeOffset(in);
ByteBuffer out =
ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4);
int size = Snappy.compress(in.array(), in.position(), in.remaining(),
int size = Snappy.compress(in.array(), offset, in.remaining(),
out.array(), 0);
crc32.reset();
crc32.update(in.array(), in.position(), in.remaining());
crc32.update(in.array(), offset, in.remaining());
out.putInt(size, (int)crc32.getValue());

out.limit(size+4);
Expand All @@ -55,9 +56,10 @@ public ByteBuffer compress(ByteBuffer in) throws IOException {

@Override
public ByteBuffer decompress(ByteBuffer in) throws IOException {
int offset = computeOffset(in);
ByteBuffer out = ByteBuffer.allocate
(Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
(Snappy.uncompressedLength(in.array(), offset, in.remaining()-4));
int size = Snappy.uncompress(in.array(), offset, in.remaining()-4,
out.array(), 0);
out.limit(size);

Expand All @@ -79,5 +81,4 @@ public boolean equals(Object obj) {
return false;
return true;
}

}
24 changes: 6 additions & 18 deletions lang/java/avro/src/main/java/org/apache/avro/file/XZCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public String getName() {
@Override
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
OutputStream ios = new XZCompressorOutputStream(baos, compressionLevel);
writeAndClose(data, ios);
try (OutputStream outputStream = new XZCompressorOutputStream(baos, compressionLevel)) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
}
return ByteBuffer.wrap(baos.toByteArray());
}

Expand All @@ -69,28 +70,15 @@ public ByteBuffer decompress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
InputStream bytesIn = new ByteArrayInputStream(
data.array(),
data.arrayOffset() + data.position(),
computeOffset(data),
data.remaining());
InputStream ios = new XZCompressorInputStream(bytesIn);
try {

try (InputStream ios = new XZCompressorInputStream(bytesIn)) {
IOUtils.copy(ios, baos);
} finally {
ios.close();
}
return ByteBuffer.wrap(baos.toByteArray());
}

private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
byte[] input = data.array();
int offset = data.arrayOffset() + data.position();
int length = data.remaining();
try {
to.write(input, offset, length);
} finally {
to.close();
}
}

// get and initialize the output buffer for use.
private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
if (null == outputBuffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ public String getName() {
}

@Override
public ByteBuffer compress(ByteBuffer uncompressedData) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(uncompressedData.remaining());
OutputStream outputStream = new ZstdCompressorOutputStream(baos);
writeAndClose(uncompressedData, outputStream);
public ByteBuffer compress(ByteBuffer data) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(data.remaining());
try (OutputStream outputStream = new ZstdCompressorOutputStream(baos)) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
}
return ByteBuffer.wrap(baos.toByteArray());
}

Expand All @@ -58,28 +59,14 @@ public ByteBuffer decompress(ByteBuffer compressedData) throws IOException {
ByteArrayOutputStream baos = getOutputBuffer(compressedData.remaining());
InputStream bytesIn = new ByteArrayInputStream(
compressedData.array(),
compressedData.arrayOffset() + compressedData.position(),
computeOffset(compressedData),
compressedData.remaining());
InputStream ios = new ZstdCompressorInputStream(bytesIn);
try {
IOUtils.copy(ios, baos);
} finally {
ios.close();
try (InputStream ios = new ZstdCompressorInputStream(bytesIn)) {
IOUtils.copy(ios, baos);
}
return ByteBuffer.wrap(baos.toByteArray());
}

private void writeAndClose(ByteBuffer data, OutputStream to) throws IOException {
byte[] input = data.array();
int offset = data.arrayOffset() + data.position();
int length = data.remaining();
try {
to.write(input, offset, length);
} finally {
to.close();
}
}

// get and initialize the output buffer for use.
private ByteArrayOutputStream getOutputBuffer(int suggestedLength) {
if (outputBuffer == null) {
Expand Down