Simplify InputStreamResponseTransformer feedback
The previous approach attempted to throttle the S3 download by measuring the bytes received, since we can only request chunks of unknown size from the subscription, rather than a specified number of bytes. While the previous commit avoided the 10x buffering, the ramp-up in capacity became slow, and doing something better would require additional bookkeeping and guesswork.

Instead of trying to track the number of in-flight bytes, this settles for just keeping 30 chunks in flight. Both locally and in our production environment, the typical chunk size seems to be around 1500 bytes, which means that this effectively aims for a target buffer of about 45k in flight. This is much smaller than the previous 32MB, but given that we want to process a CSV result line by line, 45k should typically be enough to cover at least a bunch of lines. It is also not clear how the chunk size is derived, or whether it will stay consistent; if it were bumped to 8k, the 30 in-flight requests would correspond to about 240k instead.
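The buffer estimate above is simply the request window multiplied by the chunk size. A minimal sketch of that arithmetic follows, assuming the roughly 1500-byte chunks reported in the message and reading the hypothetical "8k" as 8 * 1024 bytes. The class and method names are illustrative and not part of this commit.

```java
// Rough upper bound on buffered bytes when a fixed number of chunks is in flight.
// Illustrative only: InFlightEstimate and approxBufferBytes are hypothetical names.
final class InFlightEstimate {
    // Window size used by this commit: subscription.request(30L).
    static final long CHUNKS_IN_FLIGHT = 30L;

    // Chunks in flight times bytes per chunk. The chunk size is an observation,
    // not something the subscription guarantees.
    static long approxBufferBytes(long chunkSizeBytes) {
        return CHUNKS_IN_FLIGHT * chunkSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(approxBufferBytes(1_500L));     // 45000 bytes, about 45k
        System.out.println(approxBufferBytes(8L * 1024L)); // 245760 bytes, i.e. 240 KiB
        System.out.println(1L << 25);                      // 33554432 bytes, the old 32MB target
    }
}
```

Because the window is counted in chunks, the byte figure scales with whatever chunk size the publisher happens to deliver.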
grddev committed Apr 22, 2020
1 parent 27327c1 commit 0e28052
Showing 1 changed file with 3 additions and 18 deletions.
@@ -13,11 +13,9 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;

 public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer<GetObjectResponse, InputStream>, Subscriber<ByteBuffer> {
     private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0);
-    private static int TARGET_BUFFER_SIZE = 1 << 25;

     private final CompletableFuture<InputStream> future;
     private final BlockingQueue<ByteBuffer> chunks;
@@ -27,13 +25,11 @@ public class InputStreamResponseTransformer extends InputStream implements Async
     private ByteBuffer readChunk;
     private Throwable error;
     private AtomicBoolean complete;
-    private AtomicInteger approximateBufferSize;

     public InputStreamResponseTransformer() {
         this.future = new CompletableFuture<>();
         this.chunks = new LinkedBlockingQueue<>();
         this.complete = new AtomicBoolean(false);
-        this.approximateBufferSize = new AtomicInteger(0);
     }

     @Override
@@ -64,24 +60,14 @@ public void exceptionOccurred(Throwable t) {
     @Override
     public void onSubscribe(Subscription s) {
         subscription = s;
-        if (response.contentLength() < TARGET_BUFFER_SIZE) {
-            subscription.request(Long.MAX_VALUE);
-        } else {
-            subscription.request(10L);
-        }
+        subscription.request(30L);
     }

     @Override
     public void onNext(ByteBuffer byteBuffer) {
         if(byteBuffer.hasRemaining()) {
             chunks.offer(byteBuffer);
-        }
-        int size = approximateBufferSize.addAndGet(byteBuffer.remaining());
-        maybeRequestMore(size);
-    }
-
-    private void maybeRequestMore(int currentSize) {
-        if (currentSize < TARGET_BUFFER_SIZE) {
+        } else {
             subscription.request(1L);
         }
     }
@@ -121,8 +107,7 @@ private boolean ensureChunk() throws IOException {
                 if (readChunk == END_MARKER) {
                     return false;
                 } else {
-                    int size = approximateBufferSize.addAndGet(-readChunk.remaining());
-                    maybeRequestMore(size);
+                    subscription.request(1L);
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
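
The net effect of the change is a fixed request window: 30 chunks are requested up front, and each chunk the reader consumes (or each empty chunk that is dropped) triggers a request for exactly one more. A minimal, self-contained sketch of that pattern follows. It is not the code from this commit. It assumes the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the `Subscriber` and `Subscription` types the real class implements, and `WindowedChunkBuffer` and `CountingSubscription` are hypothetical names introduced only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: Flow.Subscriber stands in for the subscriber interface of the real class.
final class WindowedChunkBuffer implements Flow.Subscriber<ByteBuffer> {
    static final long WINDOW = 30L; // same constant as subscription.request(30L)

    private final BlockingQueue<ByteBuffer> chunks = new LinkedBlockingQueue<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(WINDOW); // open the whole window once, up front
    }

    @Override
    public void onNext(ByteBuffer chunk) {
        if (chunk.hasRemaining()) {
            chunks.offer(chunk);
        } else {
            // An empty chunk never reaches the reader, so replace it right away.
            subscription.request(1L);
        }
    }

    @Override public void onError(Throwable t) { }
    @Override public void onComplete() { }

    // Reader side: every consumed chunk frees one slot in the window.
    ByteBuffer take() throws InterruptedException {
        ByteBuffer chunk = chunks.take();
        subscription.request(1L);
        return chunk;
    }

    int buffered() { return chunks.size(); }

    public static void main(String[] args) throws InterruptedException {
        WindowedChunkBuffer buffer = new WindowedChunkBuffer();
        CountingSubscription sub = new CountingSubscription();
        buffer.onSubscribe(sub);
        for (int i = 0; i < WINDOW; i++) sub.deliver(buffer, 1500);
        buffer.take();
        System.out.println(buffer.buffered() + " buffered, " + sub.outstanding + " requested");
    }
}

// Hypothetical test double that records how much demand is still outstanding.
final class CountingSubscription implements Flow.Subscription {
    long outstanding;

    @Override public void request(long n) { outstanding += n; }
    @Override public void cancel() { }

    // Simulate the publisher satisfying one unit of demand with a chunk of the given size.
    void deliver(Flow.Subscriber<ByteBuffer> to, int size) {
        outstanding--;
        to.onNext(ByteBuffer.allocate(size));
    }
}
```

In this sketch the sum of buffered chunks and outstanding demand stays at the window size, which is what bounds memory without any byte counting.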
