diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java index c784f8034367e..fb6031c076722 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/QuarkusMultipartFormUpload.java @@ -16,6 +16,7 @@ import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.impl.headers.HeadersAdaptor; +import io.vertx.core.streams.Pipe; import io.vertx.core.streams.ReadStream; import io.vertx.core.streams.impl.InboundBuffer; import java.io.File; @@ -231,4 +232,10 @@ public synchronized QuarkusMultipartFormUpload endHandler(Handler handler) return this; } + @Override + public Pipe pipe() { + pause(); + return new RequestTrackingPipe<>(this, 16); + } + } diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/RequestTrackingPipe.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/RequestTrackingPipe.java new file mode 100644 index 0000000000000..21de9d8b06da2 --- /dev/null +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/multipart/RequestTrackingPipe.java @@ -0,0 +1,171 @@ +package org.jboss.resteasy.reactive.client.impl.multipart; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.VertxException; +import io.vertx.core.streams.Pipe; +import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.WriteStream; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Based on {@link io.vertx.core.streams.impl.PipeImpl}. + * We can't use the aforementioned class as it is not open enough for us to add the read tracking we want. + */ +class RequestTrackingPipe implements Pipe { + + private final long maxInFlightReads; + + private final Promise result; + private final ReadStream src; + private boolean endOnSuccess = true; + private boolean endOnFailure = true; + private WriteStream dst; + + private boolean manuallyPaused = false; + + private final AtomicLong inFlightReads = new AtomicLong(0); + + public RequestTrackingPipe(ReadStream src, long maxInFlightReads) { + this.src = src; + this.maxInFlightReads = maxInFlightReads; + this.result = Promise.promise(); + + // Set handlers now + src.endHandler(result::tryComplete); + src.exceptionHandler(result::tryFail); + } + + @Override + public synchronized Pipe endOnFailure(boolean end) { + endOnFailure = end; + return this; + } + + @Override + public synchronized Pipe endOnSuccess(boolean end) { + endOnSuccess = end; + return this; + } + + @Override + public synchronized Pipe endOnComplete(boolean end) { + endOnSuccess = end; + endOnFailure = end; + return this; + } + + private void handleWriteResult(AsyncResult ack) { + long currentInFlightReads = inFlightReads.decrementAndGet(); + if (currentInFlightReads <= (maxInFlightReads / 2)) { + synchronized (RequestTrackingPipe.this) { + if (manuallyPaused) { + manuallyPaused = false; + src.resume(); + } + } + } + if (ack.failed()) { + result.tryFail(new WriteException(ack.cause())); + } + } + + @Override + public void to(WriteStream ws, Handler> completionHandler) { + if (ws == null) { + throw new NullPointerException(); + } + boolean endOnSuccess; + boolean endOnFailure; + synchronized (RequestTrackingPipe.this) { + if (dst != null) { + throw new IllegalStateException(); + } + dst = ws; + endOnSuccess = this.endOnSuccess; + endOnFailure = this.endOnFailure; + } + Handler drainHandler = v -> src.resume(); + src.handler(item -> { + ws.write(item, this::handleWriteResult); + long currentInFlightReads = inFlightReads.incrementAndGet(); + if (ws.writeQueueFull()) { + src.pause(); + ws.drainHandler(drainHandler); + } else if (currentInFlightReads > maxInFlightReads) { + synchronized (RequestTrackingPipe.this) { + src.pause(); + manuallyPaused = true; + } + } + }); + src.resume(); + result.future().onComplete(ar -> { + try { + src.handler(null); + } catch (Exception ignore) { + } + try { + src.exceptionHandler(null); + } catch (Exception ignore) { + } + try { + src.endHandler(null); + } catch (Exception ignore) { + } + if (ar.succeeded()) { + handleSuccess(completionHandler); + } else { + Throwable err = ar.cause(); + if (err instanceof WriteException) { + src.resume(); + err = err.getCause(); + } + handleFailure(err, completionHandler); + } + }); + } + + private void handleSuccess(Handler> completionHandler) { + if (endOnSuccess) { + dst.end(completionHandler); + } else { + completionHandler.handle(Future.succeededFuture()); + } + } + + private void handleFailure(Throwable cause, Handler> completionHandler) { + Future res = Future.failedFuture(cause); + if (endOnFailure) { + dst.end(ignore -> { + completionHandler.handle(res); + }); + } else { + completionHandler.handle(res); + } + } + + public void close() { + synchronized (this) { + src.exceptionHandler(null); + src.handler(null); + if (dst != null) { + dst.drainHandler(null); + dst.exceptionHandler(null); + } + } + VertxException err = new VertxException("Pipe closed", true); + if (result.tryFail(err)) { + src.resume(); + } + } + + private static class WriteException extends VertxException { + private WriteException(Throwable cause) { + super(cause, true); + } + } + +}