Skip to content

Commit

Permalink
Improvements after code review
Browse files Browse the repository at this point in the history
  • Loading branch information
grddev committed Apr 28, 2020
1 parent b9200ae commit 71cfc51
Showing 1 changed file with 9 additions and 7 deletions.
Expand Up @@ -17,9 +17,10 @@

public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer<GetObjectResponse, InputStream>, Subscriber<ByteBuffer> {
private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0);
private static int TARGET_BUFFER_SIZE = 1 << 25;
private static int CHUNKS_REQUEST_LIMIT = 1000;
private static float CHUNK_SIZE_EXPONENTIAL_WEIGHT = 0.2f;
private static final int TARGET_BUFFER_SIZE = 1 << 25;
private static final int CHUNKS_REQUEST_LIMIT = 1000;
private static final float CHUNK_SIZE_EXPONENTIAL_WEIGHT = 0.2f;
private static final float CHUNK_SIZE_INITIAL_ESTIMATE = 8192f;

private final CompletableFuture<InputStream> future;
private final BlockingQueue<ByteBuffer> chunks;
Expand All @@ -39,6 +40,7 @@ public InputStreamResponseTransformer() {
this.complete = new AtomicBoolean(false);
this.approximateBufferSize = new AtomicInteger(0);
this.requests = new AtomicInteger(0);
this.approximateChunkSize = CHUNK_SIZE_INITIAL_ESTIMATE;
}

@Override
Expand Down Expand Up @@ -80,13 +82,13 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(ByteBuffer byteBuffer) {
int remaining = byteBuffer.remaining();
if (remaining > 0) {
int chunkSize = byteBuffer.remaining();
if (chunkSize > 0) {
chunks.offer(byteBuffer);
approximateChunkSize += CHUNK_SIZE_EXPONENTIAL_WEIGHT * (remaining - approximateChunkSize);
approximateChunkSize += CHUNK_SIZE_EXPONENTIAL_WEIGHT * (chunkSize - approximateChunkSize);
}
requests.decrementAndGet();
int size = approximateBufferSize.addAndGet(remaining);
int size = approximateBufferSize.addAndGet(chunkSize);
maybeRequestMore(size);
}

Expand Down

0 comments on commit 71cfc51

Please sign in to comment.