From 0e28052f606a682191fe0ad2480f2f5f0312f41b Mon Sep 17 00:00:00 2001 From: Gustav Munkby Date: Wed, 22 Apr 2020 10:46:18 +0200 Subject: [PATCH] Simplify InputStreamResponseTransformer feedback The previous approach attempted to throttle the S3 download by measuring the bytes received, since we can only request chunks of unknown size from the subscription, rather than a specified number of bytes. While the previous commit avoided the 10x buffering, the ramp-up in capacity becomes slow, and doing something better would require some additional bookkeeping and guesswork. Instead of trying to track the number of in-flight bytes, this settles for just keeping 30 chunks in flight. When running both locally and in our production environment, the typical chunk size seems to align around 1500 bytes per chunk, which would mean that this effectively aims for a target buffer of about 45k in flight. This is much smaller than the previous 32MB, but given that we want to process a CSV result line by line, it seems like 45k should typically be enough to cover at least a bunch of lines. It is also not clear how the chunk size is derived, or whether it will stay consistent; if it were bumped to 8k instead, the 30 in-flight requests would correspond to about 240k instead. 
--- .../s3/InputStreamResponseTransformer.java | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java b/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java index d2c7103..eda318c 100644 --- a/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java +++ b/src/main/java/io/burt/athena/result/s3/InputStreamResponseTransformer.java @@ -13,11 +13,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer, Subscriber { private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0); - private static int TARGET_BUFFER_SIZE = 1 << 25; private final CompletableFuture future; private final BlockingQueue chunks; @@ -27,13 +25,11 @@ public class InputStreamResponseTransformer extends InputStream implements Async private ByteBuffer readChunk; private Throwable error; private AtomicBoolean complete; - private AtomicInteger approximateBufferSize; public InputStreamResponseTransformer() { this.future = new CompletableFuture<>(); this.chunks = new LinkedBlockingQueue<>(); this.complete = new AtomicBoolean(false); - this.approximateBufferSize = new AtomicInteger(0); } @Override @@ -64,24 +60,14 @@ public void exceptionOccurred(Throwable t) { @Override public void onSubscribe(Subscription s) { subscription = s; - if (response.contentLength() < TARGET_BUFFER_SIZE) { - subscription.request(Long.MAX_VALUE); - } else { - subscription.request(10L); - } + subscription.request(30L); } @Override public void onNext(ByteBuffer byteBuffer) { if(byteBuffer.hasRemaining()) { chunks.offer(byteBuffer); - } - int size = approximateBufferSize.addAndGet(byteBuffer.remaining()); - maybeRequestMore(size); - } - - 
private void maybeRequestMore(int currentSize) { - if (currentSize < TARGET_BUFFER_SIZE) { + } else { subscription.request(1L); } } @@ -121,8 +107,7 @@ private boolean ensureChunk() throws IOException { if (readChunk == END_MARKER) { return false; } else { - int size = approximateBufferSize.addAndGet(-readChunk.remaining()); - maybeRequestMore(size); + subscription.request(1L); } } catch (InterruptedException e) { Thread.currentThread().interrupt();