From c27542909078e62d340086f93fca01741ac246c7 Mon Sep 17 00:00:00 2001 From: Ryan Schmitt Date: Tue, 2 Oct 2018 13:37:10 -0700 Subject: [PATCH] Don't drop ReactiveDataConsumer window updates It is possible, in principle, for the ReactiveDataConsumer's update channel to be null when a flush occurs. If this happens, the window update increment will be dropped, permanently decreasing the size of the transfer window. With this change, window update increments will accumulate across flushes until the update channel is set. --- .../core5/reactive/ReactiveDataConsumer.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java index 8027be4b3e..50f31068b7 100644 --- a/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java +++ b/httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveDataConsumer.java @@ -61,6 +61,7 @@ final class ReactiveDataConsumer implements AsyncDataConsumer, Publisher buffers = new LinkedBlockingQueue<>(); private final AtomicBoolean flushInProgress = new AtomicBoolean(false); + private final AtomicInteger windowScalingIncrement = new AtomicInteger(0); private volatile boolean cancelled = false; private volatile boolean completed = false; private volatile Exception exception; @@ -125,21 +126,23 @@ private void flushToSubscriber() { s.onError(exception); return; } - int windowScalingIncrement = 0; ByteBuffer next; while (requests.get() > 0 && ((next = buffers.poll()) != null)) { final int bytesFreed = next.remaining(); remainingBufferSpace.addAndGet(bytesFreed); s.onNext(next); requests.decrementAndGet(); - windowScalingIncrement += bytesFreed; + windowScalingIncrement.addAndGet(bytesFreed); } - if (capacityChannel != null && windowScalingIncrement > 0) { - try { - capacityChannel.update(windowScalingIncrement); - } catch (final IOException ex) { - failed(ex); - return; + if (capacityChannel != null) { + final int increment = windowScalingIncrement.getAndSet(0); + if (increment > 0) { + try { + capacityChannel.update(increment); + } catch (final IOException ex) { + failed(ex); + return; + } } } if (completed && buffers.isEmpty()) {