diff --git a/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java b/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java index d2c7103..eda318c 100644 --- a/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java +++ b/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java @@ -13,11 +13,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer, Subscriber { private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0); - private static int TARGET_BUFFER_SIZE = 1 << 25; private final CompletableFuture future; private final BlockingQueue chunks; @@ -27,13 +25,11 @@ public class InputStreamResponseTransformer extends InputStream implements Async private ByteBuffer readChunk; private Throwable error; private AtomicBoolean complete; - private AtomicInteger approximateBufferSize; public InputStreamResponseTransformer() { this.future = new CompletableFuture<>(); this.chunks = new LinkedBlockingQueue<>(); this.complete = new AtomicBoolean(false); - this.approximateBufferSize = new AtomicInteger(0); } @Override @@ -64,24 +60,14 @@ public void exceptionOccurred(Throwable t) { @Override public void onSubscribe(Subscription s) { subscription = s; - if (response.contentLength() < TARGET_BUFFER_SIZE) { - subscription.request(Long.MAX_VALUE); - } else { - subscription.request(10L); - } + subscription.request(30L); } @Override public void onNext(ByteBuffer byteBuffer) { if(byteBuffer.hasRemaining()) { chunks.offer(byteBuffer); - } - int size = approximateBufferSize.addAndGet(byteBuffer.remaining()); - maybeRequestMore(size); - } - - private void maybeRequestMore(int currentSize) { - if (currentSize < TARGET_BUFFER_SIZE) { + } else { subscription.request(1L); } } @@ -121,8 +107,7 @@ private boolean ensureChunk() throws IOException { if (readChunk == END_MARKER) { return false; } else { - int size = approximateBufferSize.addAndGet(-readChunk.remaining()); - maybeRequestMore(size); + subscription.request(1L); } } catch (InterruptedException e) { Thread.currentThread().interrupt();