Skip to content
Permalink
Browse files
Merge pull request #2322 from apache/change_transport_resize_policy
Allow compression buffer to oversize maxLength during compression and shrink it later
  • Loading branch information
jt2594838 committed Jan 12, 2021
2 parents ee293e6 + 488511d commit d7b466c32e3cd554fb069b329090df26e38a911e
Showing 4 changed files with 61 additions and 33 deletions.
@@ -44,6 +44,12 @@ public class RpcUtils {
* How big is the largest allowable frame? Defaults to 16MB.
*/
public static final int DEFAULT_MAX_LENGTH = 16384000;
/**
* It is used to prevent the size of the parsing package from being too large and allocating the
* buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
* reading. The default value is 512MB
*/
public static final int FRAME_HARD_MAX_LENGTH = 536870912;

private RpcUtils() {
// util class
@@ -30,9 +30,14 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
private TByteBuffer writeCompressBuffer;
private TByteBuffer readCompressBuffer;

private static final long MIN_SHRINK_INTERVAL = 60_000L;
private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
private long lastShrinkTime;
private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;

protected TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
int maxLength) {
super(underlying, initialBufferCapacity, maxLength);
int softMaxLength) {
super(underlying, initialBufferCapacity, softMaxLength);
writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
}
@@ -64,14 +69,30 @@ protected void readFrame() throws TTransportException {
}
}

private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
double expandFactor = 1.5;
double loadFactor = 0.6;
if (byteBuffer.getByteBuffer().capacity() < size) {
int newCap = (int) Math.min(size * expandFactor, maxLength);
byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
} else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer)
throws TTransportException {
if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
"Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+ ")!");
}

final int currentCapacity = byteBuffer.getByteBuffer().capacity();
final double loadFactor = 0.6;
if (currentCapacity < size) {
// Increase by a factor of 1.5x
int growCapacity = currentCapacity + (currentCapacity >> 1);
int newCapacity = Math.max(growCapacity, size);
byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCapacity));
bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
} else if (currentCapacity > softMaxLength && currentCapacity * loadFactor > size
&& bufTooLargeCounter-- <= 0
&& System.currentTimeMillis() - lastShrinkTime > MIN_SHRINK_INTERVAL) {
// do not shrink beneath the initial size and do not shrink too often
byteBuffer = new TByteBuffer(ByteBuffer.allocate(size + (currentCapacity - size) / 2));
lastShrinkTime = System.currentTimeMillis();
bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
}
return byteBuffer;
}
@@ -95,8 +116,8 @@ public void flush() throws TTransportException {
}

writeBuffer.reset();
if (maxLength < length) {
writeBuffer.resizeIfNecessary(maxLength);
if (softMaxLength < length) {
writeBuffer.resizeIfNecessary(softMaxLength);
}
underlying.flush();
}
@@ -28,17 +28,10 @@

public class TElasticFramedTransport extends TTransport {

/**
* It is used to prevent the size of the parsing package from being too large and allocating the
* buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
* reading. The default value is 512MB
*/
private static final int PROTECT_MAX_LENGTH = 536870912;

public static class Factory extends TTransportFactory {

protected final int initialCapacity;
protected final int maxLength;
protected final int softMaxLength;

public Factory() {
this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
@@ -48,29 +41,36 @@ public Factory(int initialCapacity) {
this(initialCapacity, DEFAULT_MAX_LENGTH);
}

public Factory(int initialCapacity, int maxLength) {
public Factory(int initialCapacity, int softMaxLength) {
this.initialCapacity = initialCapacity;
this.maxLength = maxLength;
this.softMaxLength = softMaxLength;
}

@Override
public TTransport getTransport(TTransport trans) {
return new TElasticFramedTransport(trans, initialCapacity, maxLength);
return new TElasticFramedTransport(trans, initialCapacity, softMaxLength);
}
}

public TElasticFramedTransport(TTransport underlying) {
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
}

public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int softMaxLength) {
this.underlying = underlying;
this.maxLength = maxLength;
this.softMaxLength = softMaxLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
}

protected final int maxLength;
/**
* The capacity of the underlying buffer is allowed to exceed maxSoftLength, but if adjacent
* requests all have sizes smaller than maxSoftLength, the underlying buffer will be shrunk
* beneath maxSoftLength.
* The shrinking is limited at most once per minute to reduce overhead when maxSoftLength is
* set unreasonably or the workload naturally contains both ver large and very small requests.
*/
protected final int softMaxLength;
protected final TTransport underlying;
protected AutoScalingBufferReadTransport readBuffer;
protected AutoScalingBufferWriteTransport writeBuffer;
@@ -113,14 +113,15 @@ protected void readFrame() throws TTransportException {
"Read a negative frame size (" + size + ")!");
}

if (size > PROTECT_MAX_LENGTH) {
if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
"Frame size (" + size + ") larger than protect max length (" + PROTECT_MAX_LENGTH + ")!");
"Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+ ")!");
}

if (size < maxLength) {
readBuffer.resizeIfNecessary(maxLength);
if (size < softMaxLength) {
readBuffer.resizeIfNecessary(softMaxLength);
}
readBuffer.fill(underlying, size);
}
@@ -132,8 +133,8 @@ public void flush() throws TTransportException {
underlying.write(i32buf, 0, 4);
underlying.write(writeBuffer.getBuf().array(), 0, length);
writeBuffer.reset();
if (length > maxLength) {
writeBuffer.resizeIfNecessary(maxLength);
if (length > softMaxLength) {
writeBuffer.resizeIfNecessary(softMaxLength);
}
underlying.flush();
}
@@ -43,7 +43,7 @@ public Factory(int initialCapacity, int maxLength) {

@Override
public TTransport getTransport(TTransport trans) {
return new TSnappyElasticFramedTransport(trans, initialCapacity, maxLength);
return new TSnappyElasticFramedTransport(trans, initialCapacity, softMaxLength);
}
}

0 comments on commit d7b466c

Please sign in to comment.