diff --git a/extensions/resteasy-reactive/rest-client-jaxrs/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java b/extensions/resteasy-reactive/rest-client-jaxrs/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java index 3efc42bcacf01..6ff286f6416f1 100644 --- a/extensions/resteasy-reactive/rest-client-jaxrs/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/rest-client-jaxrs/deployment/src/main/java/io/quarkus/jaxrs/client/reactive/deployment/JaxrsClientReactiveProcessor.java @@ -184,6 +184,8 @@ public class JaxrsClientReactiveProcessor { private static final String MULTI_BYTE_SIGNATURE = "L" + Multi.class.getName().replace('.', '/') + ";"; + private static final String MULTI_BUFFER_SIGNATURE = "L" + Multi.class.getName().replace('.', '/') + + ";"; private static final String FILE_SIGNATURE = "L" + File.class.getName().replace('.', '/') + ";"; private static final String PATH_SIGNATURE = "L" + java.nio.file.Path.class.getName().replace('.', '/') + ";"; private static final String BUFFER_SIGNATURE = "L" + Buffer.class.getName().replace('.', '/') + ";"; @@ -972,6 +974,13 @@ A more full example of generated client (with sub-resource) can is at the bottom getGenericTypeFromArray(methodCreator, methodGenericParametersField, paramIdx), getAnnotationsFromArray(methodCreator, methodParamAnnotationsField, paramIdx)); } else if (param.parameterType == ParameterType.BODY) { + if (param.declaredType.equals(Multi.class.getName())) { + if (!param.signature.equals(MULTI_BUFFER_SIGNATURE)) { + throw new IllegalArgumentException( + "When using Multi as body parameter only Multi is supported"); + } + } + // just store the index of parameter used to create the body, we'll use it later bodyParameterIdx = paramIdx; } else if (param.parameterType == ParameterType.HEADER) { diff --git a/extensions/resteasy-reactive/rest-client/deployment/src/test/java/io/quarkus/rest/client/reactive/SendMultiBufferTest.java b/extensions/resteasy-reactive/rest-client/deployment/src/test/java/io/quarkus/rest/client/reactive/SendMultiBufferTest.java new file mode 100644 index 0000000000000..277792714815f --- /dev/null +++ b/extensions/resteasy-reactive/rest-client/deployment/src/test/java/io/quarkus/rest/client/reactive/SendMultiBufferTest.java @@ -0,0 +1,60 @@ +package io.quarkus.rest.client.reactive; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.FileNotFoundException; +import java.net.URI; + +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.smallrye.mutiny.Multi; +import io.vertx.mutiny.core.buffer.Buffer; + +public class SendMultiBufferTest { + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest(); + + @TestHTTPResource + URI uri; + + @Test + public void test() throws FileNotFoundException { + Multi multi = Multi.createFrom().emitter(e -> { + for (int i = 0; i < 1000; i++) { + e.emit(Buffer.buffer(String.format("%03d", i))); + } + e.complete(); + }); + Client client = RestClientBuilder.newBuilder().baseUri(uri).build(Client.class); + + long result = client.count(multi); + + assertEquals(3000, result); + } + + @Path("test") + public interface Client { + + @POST + @Path("count") + long count(Multi multi); + } + + @Path("test") + public static class Resource { + + @POST + @Path("count") + public long count(String input) { + return input.length(); + } + } +} diff --git a/independent-projects/resteasy-reactive/client/runtime/pom.xml b/independent-projects/resteasy-reactive/client/runtime/pom.xml index efef7d34efad3..835662ecf20a7 100644 --- a/independent-projects/resteasy-reactive/client/runtime/pom.xml +++ b/independent-projects/resteasy-reactive/client/runtime/pom.xml @@ -48,6 +48,10 @@ io.smallrye.common smallrye-common-vertx-context + + io.smallrye.reactive + smallrye-mutiny-vertx-core + org.junit.jupiter diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java index bb22f0624c655..93a15882c887f 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/handlers/ClientSendRequestHandler.java @@ -46,6 +46,7 @@ import io.netty.handler.codec.http.multipart.InterfaceHttpData; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.vertx.ReadStreamSubscriber; import io.smallrye.stork.api.ServiceInstance; import io.vertx.core.AsyncResult; import io.vertx.core.Future; @@ -178,6 +179,20 @@ public void handle(AsyncResult openedAsyncFile) { Vertx.currentContext().owner(), (InputStream) requestContext.getEntity().getEntity(), httpClientRequest)); attachSentHandlers(sent, httpClientRequest, requestContext); + } else if (requestContext.isMultiBufferUpload()) { + MultivaluedMap headerMap = requestContext.getRequestHeaders() + .asMap(); + updateRequestHeadersFromConfig(requestContext, headerMap); + setVertxHeaders(httpClientRequest, headerMap); + Future sent = httpClientRequest.send(ReadStreamSubscriber.asReadStream( + (Multi) requestContext.getEntity().getEntity(), + new Function<>() { + @Override + public Buffer apply(io.vertx.mutiny.core.buffer.Buffer buffer) { + return buffer.getDelegate(); + } + })); + attachSentHandlers(sent, httpClientRequest, requestContext); } else { Future sent; Buffer actualEntity; diff --git a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java index cf488776b086d..134cf0d594313 100644 --- a/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java +++ b/independent-projects/resteasy-reactive/client/runtime/src/main/java/org/jboss/resteasy/reactive/client/impl/RestClientRequestContext.java @@ -42,6 +42,7 @@ import org.jboss.resteasy.reactive.spi.ThreadSetupAction; import io.netty.handler.codec.http.multipart.InterfaceHttpData; +import io.smallrye.mutiny.Multi; import io.smallrye.stork.api.ServiceInstance; import io.vertx.core.Context; import io.vertx.core.MultiMap; @@ -498,6 +499,11 @@ public boolean isInputStreamUpload() { return entity != null && entity.getEntity() instanceof InputStream; } + public boolean isMultiBufferUpload() { + // we don't check the generic because Multi is checked at build time + return entity != null && entity.getEntity() instanceof Multi; + } + public boolean isMultipart() { return entity != null && entity.getEntity() instanceof QuarkusMultipartForm; }