Guess in-flight chunk sizes in response transformer
It seems the simplistic approach becomes fairly slow when scaled down, so this change once again amplifies the feedback, but with some limits in place.

The main problem in estimating the impact of the additional requests is that we don't know how large the corresponding chunks will be. This change uses a simple exponentially-weighted average over chunk sizes received in the past and assumes that the in-flight chunks are equally large. If the chunk sizes stay consistent, or don't change too dramatically in size, this should work well.
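For illustration only (not part of the commit), here is a minimal sketch of how that exponentially-weighted update behaves with the 0.2 weight used in this change; the starting estimate and the observed chunk sizes below are made up:

    // Standalone sketch of the chunk-size estimate update used in onNext; the
    // observed chunk sizes are hypothetical.
    public class ChunkSizeEstimateSketch {
        private static final float CHUNK_SIZE_EXPONENTIAL_WEIGHT = 0.2f;

        public static void main(String[] args) {
            float approximateChunkSize = 0f;
            int[] observedChunkSizes = {16_384, 16_384, 16_384, 32_768, 32_768};
            for (int remaining : observedChunkSizes) {
                // Move 20% of the way from the current estimate toward the latest observation.
                approximateChunkSize += CHUNK_SIZE_EXPONENTIAL_WEIGHT * (remaining - approximateChunkSize);
                System.out.printf("observed %d -> estimate %.1f%n", remaining, approximateChunkSize);
            }
        }
    }

With a weight of 0.2 the estimate follows a persistent change in chunk size within a handful of chunks while smoothing over individual outliers.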

In the event that the chunk sizes grow significantly we might still overshoot by a lot, so as an additional precaution we don't allow more than 1000 chunks of unknown size in flight. If the chunk sizes change rapidly and grow beyond 32k we could still be in trouble, but that seems very unlikely to happen, and we have to make some assumptions or else we can't request any data at all using the requests abstraction.
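Back-of-the-envelope numbers for that 32k caveat (illustrative, not from the commit): the target buffer size is 1 << 25 bytes, roughly 32 MiB, and 1000 in-flight chunks of 32 KiB each add up to about the same amount, so up to that chunk size the hard limit keeps the worst-case overshoot in the same ballpark as the buffer target:

    // Rough bound on in-flight data under the new limits; the 32 KiB chunk size
    // is an assumed worst case taken from the caveat above.
    public class OvershootBoundSketch {
        public static void main(String[] args) {
            int targetBufferSize = 1 << 25;       // 33,554,432 bytes (~32 MiB)
            int chunksRequestLimit = 1000;        // cap on chunks of unknown size in flight
            int assumedChunkSize = 32 * 1024;     // hypothetical 32 KiB chunks
            long worstCaseInFlight = (long) chunksRequestLimit * assumedChunkSize;
            System.out.println("target buffer size:         " + targetBufferSize);
            System.out.println("worst-case in-flight bytes: " + worstCaseInFlight);
            // Prints 33554432 and 32768000: the unknown in-flight data stays roughly
            // within one extra buffer's worth; much larger chunks would blow past it.
        }
    }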
grddev committed Apr 28, 2020
1 parent 87b22bf commit b9200ae
Showing 1 changed file with 19 additions and 3 deletions.
@@ -18,6 +18,8 @@
 public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer<GetObjectResponse, InputStream>, Subscriber<ByteBuffer> {
     private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0);
     private static int TARGET_BUFFER_SIZE = 1 << 25;
+    private static int CHUNKS_REQUEST_LIMIT = 1000;
+    private static float CHUNK_SIZE_EXPONENTIAL_WEIGHT = 0.2f;

     private final CompletableFuture<InputStream> future;
     private final BlockingQueue<ByteBuffer> chunks;
@@ -28,12 +30,15 @@ public class InputStreamResponseTransformer extends InputStream implements Async
     private Throwable error;
     private AtomicBoolean complete;
     private AtomicInteger approximateBufferSize;
+    private AtomicInteger requests;
+    private volatile float approximateChunkSize;

     public InputStreamResponseTransformer() {
         this.future = new CompletableFuture<>();
         this.chunks = new LinkedBlockingQueue<>();
         this.complete = new AtomicBoolean(false);
         this.approximateBufferSize = new AtomicInteger(0);
+        this.requests = new AtomicInteger(0);
     }

     @Override
@@ -65,24 +70,35 @@ public void exceptionOccurred(Throwable t) {
     public void onSubscribe(Subscription s) {
         subscription = s;
         if (response.contentLength() < TARGET_BUFFER_SIZE) {
+            requests.set(Integer.MAX_VALUE);
             subscription.request(Long.MAX_VALUE);
         } else {
+            requests.set(10);
             subscription.request(10L);
         }
     }

     @Override
     public void onNext(ByteBuffer byteBuffer) {
-        if(byteBuffer.hasRemaining()) {
+        int remaining = byteBuffer.remaining();
+        if (remaining > 0) {
             chunks.offer(byteBuffer);
+            approximateChunkSize += CHUNK_SIZE_EXPONENTIAL_WEIGHT * (remaining - approximateChunkSize);
         }
-        int size = approximateBufferSize.addAndGet(byteBuffer.remaining());
+        requests.decrementAndGet();
+        int size = approximateBufferSize.addAndGet(remaining);
         maybeRequestMore(size);
     }

     private void maybeRequestMore(int currentSize) {
         if (currentSize < TARGET_BUFFER_SIZE) {
-            subscription.request(1L);
+            int newRequests = requests.get() + 10;
+            if (newRequests < CHUNKS_REQUEST_LIMIT) {
+                if (newRequests * approximateChunkSize + currentSize < TARGET_BUFFER_SIZE) {
+                    requests.addAndGet(10);
+                    subscription.request(10);
+                }
+            }
         }
     }

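For context, a hypothetical usage sketch (not part of this commit) of how a transformer like this is typically handed to the AWS SDK v2 async S3 client; the client setup, bucket and key are placeholders:

    // Hypothetical wiring; assumes the standard AWS SDK v2 S3AsyncClient.getObject
    // overload that accepts an AsyncResponseTransformer.
    import java.io.InputStream;
    import java.util.concurrent.CompletableFuture;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;

    public class TransformerUsageSketch {
        public static void main(String[] args) throws Exception {
            S3AsyncClient s3 = S3AsyncClient.create();
            GetObjectRequest request = GetObjectRequest.builder()
                    .bucket("example-bucket")   // placeholder
                    .key("example-key")         // placeholder
                    .build();
            CompletableFuture<InputStream> streamFuture =
                    s3.getObject(request, new InputStreamResponseTransformer());
            try (InputStream stream = streamFuture.join()) {
                byte[] buffer = new byte[8192];
                // Reading drains the queued chunks; the transformer only requests more
                // upstream while its buffered bytes stay below TARGET_BUFFER_SIZE.
                while (stream.read(buffer) != -1) {
                    // consume or process the data here
                }
            }
        }
    }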
