Skip to content

Commit

Permalink
[#593][part-1] feat: Codec compress support ByteBuffer (#830)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1. Add `ByteBuffer`-based compression to `Codec`.

### Why are the changes needed?
Fix: #593

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
UT
  • Loading branch information
xumanbu committed Apr 24, 2023
1 parent d1ed98c commit 4a17a58
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,21 @@ public static Codec newInstance(RssConf rssConf) {
*/
public abstract byte[] compress(byte[] src);

/**
 * Compresses the remaining bytes of {@code src} into {@code dest}.
 *
 * <p>The Snappy and Zstd implementations require both buffers to be of the same kind
 * (both direct or both heap). The caller must make sure that
 * {@code dest.remaining() >= maxCompressedLength(src.remaining())}.
 *
 * <p>The position of {@code dest} is moved past the compressed data, while the position
 * of {@code src} is left unchanged.
 *
 * @param src buffer holding the data to compress
 * @param dest buffer that receives the compressed data
 * @return the compressed size in bytes
 */
public abstract int compress(ByteBuffer src, ByteBuffer dest);

/**
 * Returns the maximum size of the compressed data for an input of the given length.
 *
 * @param sourceLength length in bytes of the uncompressed input
 * @return the largest number of bytes the compressed output may occupy
 */
public abstract int maxCompressedLength(int sourceLength);

public enum Type {
LZ4,
ZSTD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import net.jpountz.lz4.LZ4Factory;

import org.apache.uniffle.common.exception.RssException;

public class Lz4Codec extends Codec {

private LZ4Factory lz4Factory;
Expand All @@ -38,4 +40,20 @@ public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int
public byte[] compress(byte[] src) {
return lz4Factory.fastCompressor().compress(src);
}

/**
 * Compresses the remaining bytes of {@code src} into {@code dest} with the LZ4 fast compressor.
 *
 * @param src buffer holding the data to compress; a duplicate is handed to LZ4
 * @param dest buffer that receives the compressed data
 * @return the number of bytes by which the position of {@code dest} advanced
 * @throws RssException if anything goes wrong while compressing
 */
@Override
public int compress(ByteBuffer src, ByteBuffer dest) {
  try {
    final int startPosition = dest.position();
    // Hand LZ4 a duplicate of src so the compressor works on its own position/limit.
    lz4Factory.fastCompressor().compress(src.duplicate(), dest);
    final int endPosition = dest.position();
    return endPosition - startPosition;
  } catch (Exception cause) {
    throw new RssException("Failed to compress by Lz4", cause);
  }
}

/**
 * Returns the LZ4 fast compressor's upper bound for the compressed size.
 *
 * @param sourceLength length of the uncompressed input
 * @return the maximum compressed length reported by the LZ4 fast compressor
 */
@Override
public int maxCompressedLength(int sourceLength) {
  final int upperBound = lz4Factory.fastCompressor().maxCompressedLength(sourceLength);
  return upperBound;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,16 @@ public byte[] compress(byte[] src) {
System.arraycopy(src, 0, dst, 0, src.length);
return dst;
}

/**
 * Copies the remaining bytes of {@code src} into {@code dest} without compressing them.
 *
 * @param src buffer holding the data; a duplicate is read
 * @param dest buffer that receives the copied bytes
 * @return the number of bytes by which the position of {@code dest} advanced
 */
@Override
public int compress(ByteBuffer src, ByteBuffer dest) {
  final int startPosition = dest.position();
  // Read from a duplicate rather than from src itself.
  final ByteBuffer view = src.duplicate();
  dest.put(view);
  final int copied = dest.position() - startPosition;
  return copied;
}

/**
 * Returns the input length unchanged.
 *
 * @param sourceLength length of the uncompressed input
 * @return {@code sourceLength}
 */
@Override
public int maxCompressedLength(int sourceLength) {
  final int sameLength = sourceLength;
  return sameLength;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,32 @@ public byte[] compress(byte[] src) {
throw new RssException("Failed to uncompress by Snappy", e);
}
}

/**
 * Compresses the remaining bytes of {@code src} into {@code dest} with Snappy.
 *
 * <p>Both buffers must be of the same kind: both direct or both heap. The position of
 * {@code dest} is advanced by the compressed size.
 *
 * @param src buffer holding the data to compress
 * @param dest buffer that receives the compressed data
 * @return the compressed size in bytes
 * @throws RssException if compression fails
 * @throws IllegalStateException if one buffer is direct and the other is not
 */
@Override
public int compress(ByteBuffer src, ByteBuffer dest) {
  try {
    if (src.isDirect() && dest.isDirect()) {
      int destOff = dest.position();
      // Work on duplicates so Snappy operates on its own position/limit, then advance
      // dest explicitly so the behavior is consistent with the other codecs.
      int compressedSize = Snappy.compress(src.duplicate(), dest.duplicate());
      dest.position(destOff + compressedSize);
      return compressedSize;
    }
    if (!src.isDirect() && !dest.isDirect()) {
      int destOff = dest.position();
      // Buffer positions are relative to arrayOffset(); add it so that sliced or
      // offset-wrapped heap buffers address the right region of the backing array.
      int compressedSize = Snappy.compress(
          src.array(), src.arrayOffset() + src.position(), src.remaining(),
          dest.array(), dest.arrayOffset() + destOff);
      dest.position(destOff + compressedSize);
      return compressedSize;
    }
  } catch (Exception e) {
    throw new RssException("Failed to compress by Snappy", e);
  }
  throw new IllegalStateException("Snappy only supports the same type of bytebuffer compression.");
}

/**
 * Returns Snappy's upper bound for the compressed size.
 *
 * @param sourceLength length of the uncompressed input
 * @return the maximum compressed length reported by Snappy
 */
@Override
public int maxCompressedLength(int sourceLength) {
  final int upperBound = Snappy.maxCompressedLength(sourceLength);
  return upperBound;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,26 @@ public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int
public byte[] compress(byte[] src) {
return Zstd.compress(src, compressionLevel);
}

/**
 * Compresses the remaining bytes of {@code src} into {@code dest} with Zstd.
 *
 * <p>Both buffers must be of the same kind: both direct or both heap. For heap buffers
 * the position of {@code dest} is advanced by the compressed size here; for direct
 * buffers the update is left to the Zstd library call.
 *
 * @param src buffer holding the data to compress
 * @param dest buffer that receives the compressed data
 * @return the compressed size in bytes
 * @throws RssException if compression fails
 * @throws IllegalStateException if one buffer is direct and the other is not
 */
@Override
public int compress(ByteBuffer src, ByteBuffer dest) {
  try {
    if (src.isDirect() && dest.isDirect()) {
      // NOTE(review): relies on Zstd.compress(ByteBuffer, ByteBuffer, int) moving dest's
      // position itself; src is duplicated so its position stays untouched.
      return Zstd.compress(dest, src.duplicate(), compressionLevel);
    }
    if (!src.isDirect() && !dest.isDirect()) {
      int destOff = dest.position();
      // Buffer positions are relative to arrayOffset(); add it so that sliced or
      // offset-wrapped heap buffers address the right region of the backing array.
      long result = Zstd.compressByteArray(
          dest.array(), dest.arrayOffset() + destOff, dest.remaining(),
          src.array(), src.arrayOffset() + src.position(), src.remaining(),
          compressionLevel);
      // compressByteArray reports failures through its return value, not an exception.
      if (Zstd.isError(result)) {
        throw new IllegalStateException("Zstd returned an error: " + Zstd.getErrorName(result));
      }
      int compressedSize = (int) result;
      // The array-based API knows nothing about the buffer, so advance dest explicitly
      // to honor the contract that dest's position moves past the compressed data.
      dest.position(destOff + compressedSize);
      return compressedSize;
    }
  } catch (Exception e) {
    throw new RssException("Failed to compress by Zstd", e);
  }
  throw new IllegalStateException("Zstd only supports the same type of bytebuffer compression.");
}

/**
 * Returns Zstd's upper bound for the compressed size, narrowed to {@code int}.
 *
 * @param sourceLength length of the uncompressed input
 * @return the value of {@code Zstd.compressBound(sourceLength)} cast to {@code int}
 */
@Override
public int maxCompressedLength(int sourceLength) {
  final long bound = Zstd.compressBound(sourceLength);
  // NOTE(review): the narrowing cast is unchecked; confirm the bound always fits in an int
  // for the source lengths callers pass.
  return (int) bound;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class CompressionTest {

Expand Down Expand Up @@ -79,5 +81,69 @@ public void testCompression(int size, Codec.Type type) {
codec.decompress(ByteBuffer.wrap(compressed), size, recycledDst, 0);
recycledDst.get(res);
assertArrayEquals(data, res);

// case4: use off heap bytebuffer compress
ByteBuffer srcBuffer = ByteBuffer.allocateDirect(size);
ByteBuffer destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size));
testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);

// case5: use on heap bytebuffer compress
srcBuffer = ByteBuffer.allocate(size);
destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size));
testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);

// case6: src buffer is on heap && dest buffer is off heap
srcBuffer = ByteBuffer.allocate(size);
destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size));
testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);

// case7: src buffer is off heap && dest buffer is on heap
srcBuffer = ByteBuffer.allocateDirect(size);
destBuffer = ByteBuffer.allocate(codec.maxCompressedLength(size));
testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, 0);

// case8: use src&dest bytebuffer with offset
int destOffset = 10;
srcBuffer = ByteBuffer.allocateDirect(size + destOffset);
destBuffer = ByteBuffer.allocateDirect(codec.maxCompressedLength(size) + destOffset);
testCompressWithByteBuffer(codec, data, srcBuffer, destBuffer, destOffset);
}

/**
 * Fills {@code srcBuffer} with {@code originData} starting at {@code destOffset}, compresses it
 * into {@code destBuffer} (also starting at {@code destOffset}) and verifies the outcome.
 *
 * <p>For Snappy and Zstd with buffers of different kinds an {@link IllegalStateException} is
 * required. Otherwise the source position must be unchanged and the compressed data must
 * decompress back to {@code originData}.
 *
 * @param codec codec under test
 * @param originData uncompressed payload
 * @param srcBuffer buffer used as compression input
 * @param destBuffer buffer used as compression output
 * @param destOffset start position applied to both buffers
 */
private void testCompressWithByteBuffer(Codec codec, byte[] originData, ByteBuffer srcBuffer, ByteBuffer destBuffer,
    int destOffset) {
  // Place the payload at [destOffset, destOffset + originData.length) and make it readable.
  srcBuffer.position(destOffset);
  srcBuffer.put(originData);
  srcBuffer.flip();
  srcBuffer.position(destOffset);
  destBuffer.position(destOffset);
  if (!isSameType(srcBuffer, destBuffer) && (codec instanceof SnappyCodec || codec instanceof ZstdCodec)) {
    // assertThrows also fails the test when no exception is raised at all, which a bare
    // try/catch would silently accept.
    assertThrows(IllegalStateException.class, () -> codec.compress(srcBuffer, destBuffer));
  } else {
    codec.compress(srcBuffer, destBuffer);
    // compress must not move the source position.
    assertEquals(destOffset, srcBuffer.position());
    // Expose exactly the compressed bytes, then reuse srcBuffer as the decompression target.
    destBuffer.flip();
    destBuffer.position(destOffset);
    srcBuffer.clear();
    checkCompressedData(codec, originData, srcBuffer, destBuffer);
  }
}

/**
 * Tells whether both buffers are of the same kind (both direct or both non-direct).
 *
 * @param srcBuffer first buffer, may be {@code null}
 * @param destBuffer second buffer, may be {@code null}
 * @return {@code false} if either buffer is {@code null}, otherwise whether their
 *     {@code isDirect()} flags agree
 */
private boolean isSameType(ByteBuffer srcBuffer, ByteBuffer destBuffer) {
  final boolean bothPresent = srcBuffer != null && destBuffer != null;
  if (!bothPresent) {
    return false;
  }
  return srcBuffer.isDirect() == destBuffer.isDirect();
}

/**
 * Decompresses {@code src} into {@code dest} and asserts that the result equals {@code originData}.
 *
 * @param codec codec used for decompression
 * @param originData expected uncompressed payload
 * @param dest buffer that receives the decompressed bytes (written at offset 0)
 * @param src buffer holding the compressed bytes
 */
private void checkCompressedData(Codec codec, byte[] originData, ByteBuffer dest, ByteBuffer src) {
  final int expectedLength = originData.length;
  codec.decompress(src, expectedLength, dest, 0);
  final byte[] restored = new byte[expectedLength];
  dest.get(restored);
  assertArrayEquals(originData, restored);
}

}

0 comments on commit 4a17a58

Please sign in to comment.