diff --git a/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java b/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java index d2c7103..a546d60 100644 --- a/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java +++ b/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java @@ -18,6 +18,8 @@ public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer, Subscriber { private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0); private static int TARGET_BUFFER_SIZE = 1 << 25; + private static int CHUNKS_REQUEST_LIMIT = 1000; + private static float CHUNK_SIZE_EXPONENTIAL_WEIGHT = 0.2f; private final CompletableFuture future; private final BlockingQueue chunks; @@ -28,12 +30,15 @@ public class InputStreamResponseTransformer extends InputStream implements Async private Throwable error; private AtomicBoolean complete; private AtomicInteger approximateBufferSize; + private AtomicInteger requests; + private volatile float approximateChunkSize; public InputStreamResponseTransformer() { this.future = new CompletableFuture<>(); this.chunks = new LinkedBlockingQueue<>(); this.complete = new AtomicBoolean(false); this.approximateBufferSize = new AtomicInteger(0); + this.requests = new AtomicInteger(0); } @Override @@ -65,24 +70,35 @@ public void exceptionOccurred(Throwable t) { public void onSubscribe(Subscription s) { subscription = s; if (response.contentLength() < TARGET_BUFFER_SIZE) { + requests.set(Integer.MAX_VALUE); subscription.request(Long.MAX_VALUE); } else { + requests.set(10); subscription.request(10L); } } @Override public void onNext(ByteBuffer byteBuffer) { - if(byteBuffer.hasRemaining()) { + int remaining = byteBuffer.remaining(); + if (remaining > 0) { chunks.offer(byteBuffer); + approximateChunkSize += CHUNK_SIZE_EXPONENTIAL_WEIGHT * (remaining - approximateChunkSize); } - int size = approximateBufferSize.addAndGet(byteBuffer.remaining()); + requests.decrementAndGet(); + int size = approximateBufferSize.addAndGet(remaining); maybeRequestMore(size); } private void maybeRequestMore(int currentSize) { if (currentSize < TARGET_BUFFER_SIZE) { - subscription.request(1L); + int newRequests = requests.get() + 10; + if (newRequests < CHUNKS_REQUEST_LIMIT) { + if (newRequests * approximateChunkSize + currentSize < TARGET_BUFFER_SIZE) { + requests.addAndGet(10); + subscription.request(10); + } + } } }