Skip to content

Commit

Permalink
Support sending Multi<io.vertx.mutiny.core.buffer.Buffer> in REST Client
Browse files Browse the repository at this point in the history
  • Loading branch information
geoand committed Mar 8, 2024
1 parent b497488 commit dad1a42
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@
public class JaxrsClientReactiveProcessor {

private static final String MULTI_BYTE_SIGNATURE = "L" + Multi.class.getName().replace('.', '/') + "<Ljava/lang/Byte;>;";
private static final String MULTI_BUFFER_SIGNATURE = "L" + Multi.class.getName().replace('.', '/')
+ "<Lio/vertx/mutiny/core/buffer/Buffer;>;";
private static final String FILE_SIGNATURE = "L" + File.class.getName().replace('.', '/') + ";";
private static final String PATH_SIGNATURE = "L" + java.nio.file.Path.class.getName().replace('.', '/') + ";";
private static final String BUFFER_SIGNATURE = "L" + Buffer.class.getName().replace('.', '/') + ";";
Expand Down Expand Up @@ -985,6 +987,13 @@ A more full example of generated client (with sub-resource) can is at the bottom
getGenericTypeFromArray(methodCreator, methodGenericParametersField, paramIdx),
getAnnotationsFromArray(methodCreator, methodParamAnnotationsField, paramIdx));
} else if (param.parameterType == ParameterType.BODY) {
if (param.declaredType.equals(Multi.class.getName())) {
if (!param.signature.equals(MULTI_BUFFER_SIGNATURE)) {
throw new IllegalArgumentException(
"When using Multi as body parameter only Multi<io.vertx.mutiny.core.buffer.Buffer> is supported");
}
}

// just store the index of parameter used to create the body, we'll use it later
bodyParameterIdx = paramIdx;
} else if (param.parameterType == ParameterType.HEADER) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.quarkus.rest.client.reactive;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.FileNotFoundException;
import java.net.URI;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.test.QuarkusUnitTest;
import io.quarkus.test.common.http.TestHTTPResource;
import io.smallrye.mutiny.Multi;
import io.vertx.mutiny.core.buffer.Buffer;

public class SendMultiBufferTest {

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest();

@TestHTTPResource
URI uri;

@Test
public void test() throws FileNotFoundException {
Multi<io.vertx.mutiny.core.buffer.Buffer> multi = Multi.createFrom().emitter(e -> {
for (int i = 0; i < 1000; i++) {
e.emit(Buffer.buffer(String.format("%03d", i)));
}
e.complete();
});
Client client = RestClientBuilder.newBuilder().baseUri(uri).build(Client.class);

long result = client.count(multi);

assertEquals(3000, result);
}

@Path("test")
public interface Client {

@POST
@Path("count")
long count(Multi<io.vertx.mutiny.core.buffer.Buffer> multi);
}

@Path("test")
public static class Resource {

@POST
@Path("count")
public long count(String input) {
return input.length();
}
}
}
4 changes: 4 additions & 0 deletions independent-projects/resteasy-reactive/client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
<groupId>io.smallrye.common</groupId>
<artifactId>smallrye-common-vertx-context</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-core</artifactId>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.ReadStreamSubscriber;
import io.smallrye.stork.api.ServiceInstance;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
Expand Down Expand Up @@ -177,6 +178,20 @@ public void handle(AsyncResult<AsyncFile> openedAsyncFile) {
Vertx.currentContext().owner(), (InputStream) requestContext.getEntity().getEntity(),
httpClientRequest));
attachSentHandlers(sent, httpClientRequest, requestContext);
} else if (requestContext.isMultiBufferUpload()) {
MultivaluedMap<String, String> headerMap = requestContext.getRequestHeaders()
.asMap();
updateRequestHeadersFromConfig(requestContext, headerMap);
setVertxHeaders(httpClientRequest, headerMap);
Future<HttpClientResponse> sent = httpClientRequest.send(ReadStreamSubscriber.asReadStream(
(Multi<io.vertx.mutiny.core.buffer.Buffer>) requestContext.getEntity().getEntity(),
new Function<>() {
@Override
public Buffer apply(io.vertx.mutiny.core.buffer.Buffer buffer) {
return buffer.getDelegate();
}
}));
attachSentHandlers(sent, httpClientRequest, requestContext);
} else {
Future<HttpClientResponse> sent;
Buffer actualEntity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.jboss.resteasy.reactive.spi.ThreadSetupAction;

import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.smallrye.mutiny.Multi;
import io.smallrye.stork.api.ServiceInstance;
import io.vertx.core.Context;
import io.vertx.core.MultiMap;
Expand Down Expand Up @@ -498,6 +499,11 @@ public boolean isInputStreamUpload() {
return entity != null && entity.getEntity() instanceof InputStream;
}

public boolean isMultiBufferUpload() {
// we don't check the generic because Multi<Buffer> is checked at build time
return entity != null && entity.getEntity() instanceof Multi;
}

public boolean isMultipart() {
return entity != null && entity.getEntity() instanceof QuarkusMultipartForm;
}
Expand Down

0 comments on commit dad1a42

Please sign in to comment.