HBASE-27019 Minor compression performance improvements #4420

Merged: 1 commit, May 14, 2022
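The diff below applies one pattern to HadoopCompressor, HadoopDecompressor, and BrotliCompressor: the SLF4J logger and every per-call LOG.trace statement are removed from the hot compress/decompress paths, and the aircompressor glue classes (HadoopCompressor, HadoopDecompressor) additionally stop re-creating their wrapped (de)compressor reflectively in reset(). As a minimal sketch of the logging change, taken directly from the finished() hunks below (the first form is removed, the second is added):

// Removed form: computes a local and formats a trace event on every call
@Override
public boolean finished() {
  boolean b = finished && !outBuf.hasRemaining();
  LOG.trace("finished: {}", b);
  return b;
}

// Added form: returns directly, with no logging work on the hot path
@Override
public boolean finished() {
  return finished && !outBuf.hasRemaining();
}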
HadoopCompressor.java
@@ -24,8 +24,6 @@
import org.apache.hadoop.hbase.io.compress.CanReinit;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Hadoop compressor glue for aircompressor compressors.
@@ -34,7 +32,6 @@
public abstract class HadoopCompressor<T extends Compressor>
implements CanReinit, org.apache.hadoop.io.compress.Compressor {

protected static final Logger LOG = LoggerFactory.getLogger(HadoopCompressor.class);
protected T compressor;
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
@@ -56,7 +53,6 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -77,7 +73,6 @@ public int compress(byte[] b, int off, int len) throws IOException {
} else {
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
@@ -89,42 +84,34 @@ public int compress(byte[] b, int off, int len) throws IOException {
final int written = writeBuffer.position() - oldPos;
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
if (!direct) {
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
LOG.trace("compress: {} bytes direct", written);
return written;
}
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}

@Override
public void end() {
LOG.trace("end");
}

@Override
public void finish() {
LOG.trace("finish");
finish = true;
}

@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}

@Override
@@ -139,14 +126,11 @@ public long getBytesWritten() {

@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}

@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Buffer size might have changed
int newBufferSize = getBufferSize(conf);
@@ -159,15 +143,8 @@ public void reinit(Configuration conf) {
reset();
}

@SuppressWarnings("unchecked")
@Override
public void reset() {
LOG.trace("reset");
try {
compressor = (T) compressor.getClass().getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -184,13 +161,11 @@ public void setDictionary(byte[] b, int off, int len) {

@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accommodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);
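Besides the logging removal, the reset() hunk above drops the reflective compressor.getClass().getDeclaredConstructor().newInstance() call, so a pooled HadoopCompressor now reuses its wrapped aircompressor instance and only clears its buffers. reset() sits on a hot path because the codec pool resets a compressor whenever one is returned. A sketch of that calling pattern, assuming the standard org.apache.hadoop.io.compress.CodecPool behavior (the PooledCompressExample class and compressOnce helper are illustrative names, not part of the PR):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;

public final class PooledCompressExample {
  // Borrow a pooled compressor, stream one payload through it, and return it.
  // Returning it to the pool resets the compressor, which is the path this PR cheapens.
  static byte[] compressOnce(CompressionCodec codec, byte[] data) throws IOException {
    Compressor compressor = CodecPool.getCompressor(codec);
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (OutputStream out = codec.createOutputStream(bos, compressor)) {
        out.write(data);
      }
      return bos.toByteArray();
    } finally {
      CodecPool.returnCompressor(compressor);
    }
  }
}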
HadoopDecompressor.java
@@ -22,8 +22,6 @@
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Hadoop decompressor glue for aircompressor decompressors.
@@ -32,7 +30,6 @@
public class HadoopDecompressor<T extends Decompressor>
implements org.apache.hadoop.io.compress.Decompressor {

protected static final Logger LOG = LoggerFactory.getLogger(HadoopDecompressor.class);
protected T decompressor;
protected ByteBuffer inBuf, outBuf;
protected int inLen;
@@ -50,7 +47,6 @@ public int decompress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("decompress: read {} remaining bytes from outBuf", n);
return n;
}
if (inBuf.position() > 0) {
@@ -63,50 +59,36 @@ public int decompress(byte[] b, int off, int len) throws IOException {
inBuf.rewind();
inBuf.limit(inBuf.capacity());
final int written = outBuf.position();
LOG.trace("decompress: decompressed {} -> {}", remaining, written);
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("decompress: {} bytes", n);
return n;
}
LOG.trace("decompress: No output, finished");
finished = true;
return 0;
}

@Override
public void end() {
LOG.trace("end");
}

@Override
public boolean finished() {
LOG.trace("finished");
return finished;
}

@Override
public int getRemaining() {
LOG.trace("getRemaining: {}", inLen);
return inLen;
}

@Override
public boolean needsDictionary() {
LOG.trace("needsDictionary");
return false;
}

@SuppressWarnings("unchecked")
@Override
public void reset() {
LOG.trace("reset");
try {
decompressor = (T) decompressor.getClass().getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
inBuf.rewind();
inBuf.limit(inBuf.capacity());
inLen = 0;
@@ -117,9 +99,7 @@ public void reset() {

@Override
public boolean needsInput() {
boolean b = (inBuf.position() == 0);
LOG.trace("needsInput: {}", b);
return b;
return inBuf.position() == 0;
}

@Override
@@ -129,13 +109,11 @@ public void setDictionary(byte[] b, int off, int len) {

@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accommodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);
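HadoopDecompressor gets the mirror-image treatment above: the logger, every trace call, and the reflective re-instantiation in reset() are removed, while the buffer management is untouched. As a sanity-check sketch of the streaming API these glue classes sit behind (the codec class name is a placeholder for one of the aircompressor codecs and is not taken from this PR):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.ReflectionUtils;

public final class RoundTripCheck {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder codec; any codec backed by these compressor/decompressor classes would do.
    Class<? extends CompressionCodec> codecClass = conf
      .getClassByName("org.apache.hadoop.hbase.io.compress.aircompressor.LzoCodec")
      .asSubclass(CompressionCodec.class);
    CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);

    byte[] original = "hello hbase compression".getBytes(StandardCharsets.UTF_8);

    // Compress through the codec's output stream, which drives the Compressor glue.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    try (OutputStream out = codec.createOutputStream(compressed)) {
      out.write(original);
    }

    // Decompress through the codec's input stream, which drives the Decompressor glue.
    ByteArrayOutputStream restored = new ByteArrayOutputStream();
    try (InputStream in = codec.createInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
      byte[] buf = new byte[4096];
      for (int n; (n = in.read(buf)) != -1; ) {
        restored.write(buf, 0, n);
      }
    }
    // The round trip must reproduce the input byte for byte.
    System.out.println(Arrays.equals(original, restored.toByteArray()) ? "OK" : "MISMATCH");
  }
}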
BrotliCompressor.java
@@ -27,16 +27,13 @@
import org.apache.hadoop.hbase.io.compress.CompressionUtil;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Hadoop compressor glue for Brotli4j
*/
@InterfaceAudience.Private
public class BrotliCompressor implements CanReinit, Compressor {

protected static final Logger LOG = LoggerFactory.getLogger(BrotliCompressor.class);
protected ByteBuffer inBuf, outBuf;
protected int bufferSize;
protected boolean finish, finished;
@@ -64,7 +61,6 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (outBuf.hasRemaining()) {
int remaining = outBuf.remaining(), n = Math.min(remaining, len);
outBuf.get(b, off, n);
LOG.trace("compress: read {} remaining bytes from outBuf", n);
return n;
}
// We don't actually begin compression until our caller calls finish().
@@ -84,7 +80,6 @@ public int compress(byte[] b, int off, int len) throws IOException {
} else {
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
LOG.trace("compress: resize outBuf {}", needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
@@ -96,42 +91,34 @@ public int compress(byte[] b, int off, int len) throws IOException {
final int written = writeBuf.position() - oldPos;
bytesWritten += written;
inBuf.clear();
LOG.trace("compress: compressed {} -> {}", uncompressed, written);
finished = true;
if (!direct) {
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
LOG.trace("compress: {} bytes", n);
return n;
} else {
LOG.trace("compress: {} bytes direct", written);
return written;
}
} else {
finished = true;
}
}
LOG.trace("No output");
return 0;
}

@Override
public void end() {
LOG.trace("end");
}

@Override
public void finish() {
LOG.trace("finish");
finish = true;
}

@Override
public boolean finished() {
boolean b = finished && !outBuf.hasRemaining();
LOG.trace("finished: {}", b);
return b;
return finished && !outBuf.hasRemaining();
}

@Override
@@ -146,14 +133,11 @@ public long getBytesWritten() {

@Override
public boolean needsInput() {
boolean b = !finished();
LOG.trace("needsInput: {}", b);
return b;
return !finished();
}

@Override
public void reinit(Configuration conf) {
LOG.trace("reinit");
if (conf != null) {
// Quality or window settings might have changed
params.setQuality(BrotliCodec.getLevel(conf));
@@ -171,7 +155,6 @@ public void reinit(Configuration conf) {

@Override
public void reset() {
LOG.trace("reset");
inBuf.clear();
outBuf.clear();
outBuf.position(outBuf.capacity());
@@ -188,13 +171,11 @@ public void setDictionary(byte[] b, int off, int len) {

@Override
public void setInput(byte[] b, int off, int len) {
LOG.trace("setInput: off={} len={}", off, len);
if (inBuf.remaining() < len) {
// Get a new buffer that can accommodate the accumulated input plus the additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
LOG.trace("setInput: resize inBuf {}", needed);
ByteBuffer newBuf = ByteBuffer.allocate(needed);
inBuf.flip();
newBuf.put(inBuf);
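BrotliCompressor follows the same pattern as the aircompressor glue: the logger and the per-call trace statements are removed, and nothing else about the buffer or state handling changes. For reference, this is the one-shot calling sequence that exercises the edited methods, sketched against the plain org.apache.hadoop.io.compress.Compressor interface (the CompressorDriver class and oneShot method are made-up names for illustration):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.Compressor;

public final class CompressorDriver {
  // Drives any Hadoop Compressor through the sequence used by these glue classes:
  // setInput -> finish -> compress until finished(). Nothing is compressed until
  // finish() has been called, as the comment in the diff above notes.
  static byte[] oneShot(Compressor compressor, byte[] input) throws IOException {
    compressor.setInput(input, 0, input.length);
    compressor.finish();
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    while (!compressor.finished()) {
      int n = compressor.compress(buf, 0, buf.length); // previously emitted a trace per call
      out.write(buf, 0, n);
    }
    return out.toByteArray();
  }
}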