Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve InputStreamResponseTransformer subscription feedback #19

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -17,7 +17,10 @@

public class InputStreamResponseTransformer extends InputStream implements AsyncResponseTransformer<GetObjectResponse, InputStream>, Subscriber<ByteBuffer> {
private static final ByteBuffer END_MARKER = ByteBuffer.allocate(0);
private static int TARGET_BUFFER_SIZE = 1 << 25;
private static final int TARGET_BUFFER_SIZE = 1 << 25;
private static final int CHUNKS_REQUEST_LIMIT = 1000;
private static final float CHUNK_SIZE_EXPONENTIAL_WEIGHT = 0.2f;
private static final float CHUNK_SIZE_INITIAL_ESTIMATE = 8192f;

private final CompletableFuture<InputStream> future;
private final BlockingQueue<ByteBuffer> chunks;
Expand All @@ -28,12 +31,16 @@ public class InputStreamResponseTransformer extends InputStream implements Async
private Throwable error;
private AtomicBoolean complete;
private AtomicInteger approximateBufferSize;
private AtomicInteger requests;
grddev marked this conversation as resolved.
Show resolved Hide resolved
private volatile float approximateChunkSize;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not initialised anywhere which is dangerous. Setting it to the length of the first chunk received should be a good starting point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nontrivial to initialize this from the first chunk (and also ensure that it is nonzero), so I think the better option is to initialize it to something. I picked 8192 completely arbitrarily, but with the continued adjustment, it shouldn't matter so much, but it should not be too small.


public InputStreamResponseTransformer() {
this.future = new CompletableFuture<>();
this.chunks = new LinkedBlockingQueue<>();
this.complete = new AtomicBoolean(false);
this.approximateBufferSize = new AtomicInteger(0);
this.requests = new AtomicInteger(0);
this.approximateChunkSize = CHUNK_SIZE_INITIAL_ESTIMATE;
}

@Override
Expand Down Expand Up @@ -65,24 +72,35 @@ public void exceptionOccurred(Throwable t) {
public void onSubscribe(Subscription s) {
subscription = s;
if (response.contentLength() < TARGET_BUFFER_SIZE) {
requests.set(Integer.MAX_VALUE);
subscription.request(Long.MAX_VALUE);
} else {
requests.set(10);
subscription.request(10L);
}
}

@Override
public void onNext(ByteBuffer byteBuffer) {
if(byteBuffer.hasRemaining()) {
int chunkSize = byteBuffer.remaining();
if (chunkSize > 0) {
chunks.offer(byteBuffer);
approximateChunkSize += CHUNK_SIZE_EXPONENTIAL_WEIGHT * (chunkSize - approximateChunkSize);
}
int size = approximateBufferSize.addAndGet(byteBuffer.remaining());
requests.decrementAndGet();
int size = approximateBufferSize.addAndGet(chunkSize);
maybeRequestMore(size);
}

private void maybeRequestMore(int currentSize) {
if (currentSize < TARGET_BUFFER_SIZE) {
subscription.request(10L);
int newRequests = requests.get() + 10;
if (newRequests < CHUNKS_REQUEST_LIMIT) {
if (newRequests * approximateChunkSize + currentSize < TARGET_BUFFER_SIZE) {
requests.addAndGet(10);
subscription.request(10);
}
}
}
}

Expand Down