Skip to content

Commit

Permalink
slightly improved lz4 performance
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jun 26, 2022
1 parent 90e80ff commit cfbfe13
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
Expand Up @@ -26,6 +26,8 @@ public class Lz4InputStream extends AbstractByteArrayInputStream {
private final InputStream stream;
private final byte[] header;

private byte[] compressedBlock;

private boolean readFully(byte[] b, int off, int len) throws IOException {
int n = 0;
while (n < len) {
Expand Down Expand Up @@ -61,7 +63,8 @@ protected int updateBuffer() throws IOException {
// 4 bytes - size of uncompressed data
int uncompressedSize = BinaryStreamUtils.toInt32(header, 21);
int offset = 9;
byte[] block = new byte[compressedSizeWithHeader];
final byte[] block = compressedBlock.length >= compressedSizeWithHeader ? compressedBlock
: (compressedBlock = new byte[compressedSizeWithHeader]);
block[0] = header[16];
BinaryStreamUtils.setInt32(block, 1, compressedSizeWithHeader);
BinaryStreamUtils.setInt32(block, 5, uncompressedSize);
Expand All @@ -70,17 +73,17 @@ protected int updateBuffer() throws IOException {
throw new IOException(ClickHouseUtils.format(ERROR_INCOMPLETE_READ, 0, compressedSizeWithHeader - offset));
}

long[] real = ClickHouseCityHash.cityHash128(block, 0, block.length);
long[] real = ClickHouseCityHash.cityHash128(block, 0, compressedSizeWithHeader);
if (real[0] != BinaryStreamUtils.toInt64(header, 0) || real[1] != BinaryStreamUtils.toInt64(header, 8)) {
throw new IOException("Checksum doesn't match: corrupted data.");
}

buffer = new byte[uncompressedSize];
decompressor.decompress(block, offset, buffer, 0, uncompressedSize);
final byte[] buf = buffer.length >= uncompressedSize ? buffer : (buffer = new byte[uncompressedSize]);
decompressor.decompress(block, offset, buf, 0, uncompressedSize);
if (copyTo != null) {
copyTo.write(buffer);
copyTo.write(buf);
}
return limit = buffer.length;
return limit = uncompressedSize;
}

public Lz4InputStream(InputStream stream) {
Expand All @@ -93,6 +96,8 @@ public Lz4InputStream(ClickHouseFile file, InputStream stream, Runnable postClos
this.decompressor = factory.fastDecompressor();
this.stream = ClickHouseChecker.nonNull(stream, "InputStream");
this.header = new byte[HEADER_LENGTH];

this.compressedBlock = ClickHouseByteBuffer.EMPTY_BYTES;
}

@Override
Expand Down
Expand Up @@ -21,21 +21,23 @@ public class Lz4OutputStream extends AbstractByteArrayOutputStream {

@Override
protected void flushBuffer() throws IOException {
    // Reuse the pre-allocated compressed block. Frame layout:
    // [0..15] CityHash128 checksum, [16] magic byte, [17..20] compressed size
    // incl. 9-byte header, [21..24] uncompressed size, [25..] compressed payload.
    final byte[] out = compressedBlock;
    out[16] = Lz4InputStream.MAGIC;

    // Compress everything buffered so far, writing right after the 25-byte header area.
    final int compressedLength = compressor.compress(buffer, 0, position, out, 25);
    final int sizeWithHeader = compressedLength + 9; // 9 = magic(1) + two int32 size fields(8)

    BinaryStreamUtils.setInt32(out, 17, sizeWithHeader); // compressed size with header
    BinaryStreamUtils.setInt32(out, 21, position); // uncompressed size

    // Checksum covers from the magic byte through the end of the payload.
    final long[] checksum = ClickHouseCityHash.cityHash128(out, 16, sizeWithHeader);
    BinaryStreamUtils.setInt64(out, 0, checksum[0]);
    BinaryStreamUtils.setInt64(out, 8, checksum[1]);

    output.write(out, 0, compressedLength + 25);
    position = 0;
}

@Override
protected void flushBuffer(byte[] bytes, int offset, int length) throws IOException {
int maxLen = compressor.maxCompressedLength(length) + 15;
byte[] block = maxLen < compressedBlock.length ? compressedBlock : new byte[maxLen];
byte[] block = maxLen <= compressedBlock.length ? compressedBlock : new byte[maxLen];
block[16] = Lz4InputStream.MAGIC;

int compressed = compressor.compress(bytes, offset, length, block, 25);
Expand All @@ -61,7 +63,6 @@ public Lz4OutputStream(ClickHouseFile file, OutputStream stream, int maxCompress
compressor = factory.fastCompressor();
// reserve the first 9 bytes for calculating checksum
compressedBlock = new byte[compressor.maxCompressedLength(maxCompressBlockSize) + 15];
compressedBlock[16] = Lz4InputStream.MAGIC;
}

@Override
Expand Down
Expand Up @@ -208,15 +208,24 @@ public void testCompression(ClickHouseFormat format, ClickHouseBufferingMode buf
boolean compressRequest, boolean compressResponse) throws ClickHouseException {
ClickHouseNode server = getServer();
String uuid = UUID.randomUUID().toString();
ClickHouseClient.send(server, "create table if not exists test_compress_decompress(id UUID)engine=Memory");
try (ClickHouseClient client = getClient()) {
ClickHouseRequest<?> request = client.connect(server)
.format(format)
.option(ClickHouseClientOption.RESPONSE_BUFFERING, bufferingMode)
.compressServerResponse(compressResponse)
.decompressClientRequest(compressRequest);
// start with insert
try (ClickHouseResponse resp = request
.query("insert into test_compress_decompress values(:uuid)").params(ClickHouseStringValue.of(uuid))
.executeAndWait()) {
Assert.assertNotNull(resp);
}

boolean hasResult = false;
try (ClickHouseResponse resp = request
.query("select :uuid").params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
.query("select id from test_compress_decompress where id = :uuid")
.params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), uuid);
hasResult = true;
}
Expand Down

0 comments on commit cfbfe13

Please sign in to comment.