diff --git a/feign-reactor-core/pom.xml b/feign-reactor-core/pom.xml index a89a76f0..dc994a8a 100644 --- a/feign-reactor-core/pom.xml +++ b/feign-reactor-core/pom.xml @@ -26,6 +26,11 @@ 3.2.4-SNAPSHOT + + 1.5.30 + 1.5.2 + + feign-reactor-core jar Feign Reactive Core @@ -63,6 +68,23 @@ true + + org.jetbrains.kotlin + kotlin-stdlib-jdk8 + ${kotlin.version} + + + + org.jetbrains.kotlin + kotlin-reflect + + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + ${kotlinx.coroutines.version} + + io.projectreactor @@ -185,6 +207,37 @@ + + kotlin-maven-plugin + org.jetbrains.kotlin + ${kotlin.version} + + + compile + + compile + + + + ${project.basedir}/src/main/kotlin + ${project.basedir}/src/main/java + + + + + test-compile + + test-compile + + + + ${project.basedir}/src/test/kotlin + ${project.basedir}/src/test/java + + + + + diff --git a/feign-reactor-core/src/main/java/reactivefeign/MethodKt.kt b/feign-reactor-core/src/main/java/reactivefeign/MethodKt.kt new file mode 100644 index 00000000..b9a8ae93 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/MethodKt.kt @@ -0,0 +1,14 @@ +@file:JvmName("MethodKt") + +package reactivefeign + +import java.lang.reflect.Method +import java.lang.reflect.Type +import kotlin.reflect.jvm.javaType +import kotlin.reflect.jvm.kotlinFunction + +internal fun Method.isSuspend(): Boolean = + kotlinFunction?.isSuspend ?: false + +internal val Method.kotlinMethodReturnType: Type? + get() = kotlinFunction?.returnType?.javaType diff --git a/feign-reactor-core/src/main/java/reactivefeign/MonoKt.kt b/feign-reactor-core/src/main/java/reactivefeign/MonoKt.kt new file mode 100644 index 00000000..2456e923 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/MonoKt.kt @@ -0,0 +1,18 @@ +@file:JvmName("MonoKt") + +package reactivefeign + +import kotlinx.coroutines.reactor.awaitSingle +import reactivefeign.client.ReactiveHttpResponse +import reactor.core.publisher.Mono + +internal suspend fun Mono<*>.awaitReactiveHttpResponse(): Any { + val result = awaitSingle() + if (result is ReactiveHttpResponse<*>) { + val body = result.body() + require(body is Mono<*>) { "Only Mono type is allowed for suspend method" } + return body.awaitSingle() + } + + return result +} diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java index 60f0cd43..5a738c19 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java @@ -48,7 +48,13 @@ public List parseAndValidateMetadata(final Class targetType) for (final MethodMetadata metadata : methodsMetadata) { final Type type = metadata.returnType(); - if (!isReactorType(type)) { + + boolean isSuspend = MethodKt.isSuspend(metadata.method()); + if (isSuspend) { + modifySuspendMethodMetadata(metadata); + } + + if (!isReactorType(type) && !isSuspend) { throw new IllegalArgumentException(String.format( "Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux", metadata.configKey(), targetType.getSimpleName())); @@ -64,6 +70,23 @@ public List parseAndValidateMetadata(final Class targetType) return methodsMetadata; } + private static void modifySuspendMethodMetadata(MethodMetadata metadata) { + Type kotlinMethodReturnType = MethodKt.getKotlinMethodReturnType(metadata.method()); + if (kotlinMethodReturnType == null) { + throw new IllegalArgumentException(String.format( + "Method %s can't have continuation argument, only kotlin method is allowed", + metadata.configKey())); + } + metadata.returnType(kotlinMethodReturnType); + + int continuationIndex = metadata.method().getParameterCount() - 1; + metadata.ignoreParamater(continuationIndex); + + if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) { + metadata.bodyIndex(null); + } + } + private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class)); private boolean isReactorType(final Type type) { diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java index 3771a53c..12b45ff2 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java @@ -287,7 +287,9 @@ public static PublisherHttpClient retry( MethodMetadata methodMetadata, Retry retry) { Type returnPublisherType = returnPublisherType(methodMetadata); - if(returnPublisherType == Mono.class){ + if (MethodKt.isSuspend(methodMetadata.method())) { + return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry); + } else if (returnPublisherType == Mono.class) { return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry); } else if(returnPublisherType == Flux.class) { return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retry); @@ -297,7 +299,9 @@ public static PublisherHttpClient retry( } protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, MethodMetadata methodMetadata){ - if(isResponsePublisher(methodMetadata.returnType())){ + if (MethodKt.isSuspend(methodMetadata.method())) { + return new ResponsePublisherHttpClient(reactiveHttpClient); + } else if (isResponsePublisher(methodMetadata.returnType())) { return new ResponsePublisherHttpClient(reactiveHttpClient); } diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveInvocationHandler.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveInvocationHandler.java index 45b0e564..f8361300 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveInvocationHandler.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveInvocationHandler.java @@ -16,10 +16,13 @@ import feign.InvocationHandlerFactory; import feign.InvocationHandlerFactory.MethodHandler; import feign.Target; +import kotlin.coroutines.Continuation; +import reactor.core.publisher.Mono; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.Arrays; import java.util.Map; import static feign.Util.checkNotNull; @@ -61,6 +64,13 @@ private void defineObjectMethodsHandlers() { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { + if (MethodKt.isSuspend(method)) { + Object[] newArgs = Arrays.copyOfRange(args, 0, args.length - 1); + Mono result = (Mono) dispatch.get(method).invoke(newArgs); + Continuation continuation = (Continuation) args[args.length - 1]; + return MonoKt.awaitReactiveHttpResponse(result, continuation); + } + return dispatch.get(method).invoke(args); } diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java index f7996dd8..dc43a6a0 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java @@ -2,6 +2,7 @@ import feign.MethodMetadata; import feign.Target; +import reactivefeign.MethodKt; import reactivefeign.publisher.PublisherClientFactory; import reactivefeign.publisher.ResponsePublisherHttpClient; import reactor.core.publisher.Flux; @@ -35,7 +36,9 @@ public MethodHandler create(MethodMetadata metadata) { MethodHandler methodHandler = new PublisherClientMethodHandler( target, metadata, publisherClientFactory.create(metadata)); - if(isResponsePublisher(metadata.returnType())){ + if (MethodKt.isSuspend(metadata.method())) { + return new MonoMethodHandler(methodHandler); + } else if (isResponsePublisher(metadata.returnType())) { return new MonoMethodHandler(methodHandler); } @@ -53,7 +56,9 @@ public MethodHandler create(MethodMetadata metadata) { public MethodHandler createDefault(Method method) { MethodHandler defaultMethodHandler = new DefaultMethodHandler(method); - if(method.getReturnType() == Mono.class){ + if (MethodKt.isSuspend(method)) { + return new MonoMethodHandler(defaultMethodHandler); + } else if (method.getReturnType() == Mono.class) { return new MonoMethodHandler(defaultMethodHandler); } else if(method.getReturnType() == Flux.class) { return new FluxMethodHandler(defaultMethodHandler); diff --git a/feign-reactor-core/src/main/java/reactivefeign/utils/FeignUtils.java b/feign-reactor-core/src/main/java/reactivefeign/utils/FeignUtils.java index 44018cd0..134df0e6 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/utils/FeignUtils.java +++ b/feign-reactor-core/src/main/java/reactivefeign/utils/FeignUtils.java @@ -64,6 +64,10 @@ public static Type returnActualType(MethodMetadata methodMetadata) { } public static Type returnActualType(Type returnType) { + if (!(returnType instanceof ParameterizedType)) { + return returnType; + } + Class publisher = (Class)((ParameterizedType) returnType).getRawType(); Type typeInPublisher = resolveLastTypeParameter(returnType, publisher); if(isResponsePublisher(publisher, typeInPublisher)){ diff --git a/feign-reactor-core/src/test/java/reactivefeign/SuspendTest.kt b/feign-reactor-core/src/test/java/reactivefeign/SuspendTest.kt new file mode 100644 index 00000000..400a5f87 --- /dev/null +++ b/feign-reactor-core/src/test/java/reactivefeign/SuspendTest.kt @@ -0,0 +1,66 @@ +package reactivefeign + +import com.fasterxml.jackson.core.JsonProcessingException +import kotlinx.coroutines.runBlocking +import org.assertj.core.api.Assertions.assertThat +import org.junit.AfterClass +import org.junit.BeforeClass +import org.junit.Test +import reactivefeign.resttemplate.client.RestTemplateFakeReactiveFeign +import reactivefeign.testcase.SuspendIceCreamServiceApi +import reactivefeign.testcase.domain.OrderGenerator +import reactor.core.publisher.Mono +import reactor.netty.DisposableServer +import reactor.netty.http.HttpProtocol +import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRequest +import reactor.netty.http.server.HttpServerResponse +import reactor.netty.http.server.HttpServerRoutes +import java.time.Duration + +class SuspendTest { + companion object { + private lateinit var server: DisposableServer + private const val DELAY_IN_MILLIS: Long = 500L + private val cannedValue = OrderGenerator().generate(1) + + @BeforeClass + @JvmStatic + @Throws(JsonProcessingException::class) + fun startServer() { + val data = TestUtils.MAPPER.writeValueAsString(cannedValue).toByteArray() + server = HttpServer.create() + .protocol(HttpProtocol.HTTP11, HttpProtocol.H2C) + .route { routes: HttpServerRoutes -> + routes[ + "/icecream/orders/1", { req: HttpServerRequest?, res: HttpServerResponse -> + res.header("Content-Type", "application/json") + Mono.delay(Duration.ofMillis(DELAY_IN_MILLIS)) + .thenEmpty(res.sendByteArray(Mono.just(data))) + } + ] + } + .bindNow() + } + + @JvmStatic + @AfterClass + fun stopServer() { + server.disposeNow() + } + } + + @Test + fun shouldRun(): Unit = runBlocking { + val client = RestTemplateFakeReactiveFeign + .builder() + .target( + SuspendIceCreamServiceApi::class.java, + "http://localhost:" + server.port() + ) + + val firstOrder = client.findOrder(orderId = 1) + + assertThat(firstOrder).usingRecursiveComparison().isEqualTo(cannedValue) + } +} diff --git a/feign-reactor-core/src/test/java/reactivefeign/resttemplate/client/RestTemplateFakeReactiveHttpClient.java b/feign-reactor-core/src/test/java/reactivefeign/resttemplate/client/RestTemplateFakeReactiveHttpClient.java index 56e68913..41d079ec 100644 --- a/feign-reactor-core/src/test/java/reactivefeign/resttemplate/client/RestTemplateFakeReactiveHttpClient.java +++ b/feign-reactor-core/src/test/java/reactivefeign/resttemplate/client/RestTemplateFakeReactiveHttpClient.java @@ -24,6 +24,7 @@ import org.springframework.web.client.HttpStatusCodeException; import org.springframework.web.client.ResourceAccessException; import org.springframework.web.client.RestTemplate; +import reactivefeign.MethodKt; import reactivefeign.client.*; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -51,7 +52,11 @@ public class RestTemplateFakeReactiveHttpClient implements ReactiveHttpClient { this.restTemplate = restTemplate; this.acceptGzip = acceptGzip; - returnPublisherType = returnPublisherType(methodMetadata); + if (MethodKt.isSuspend(methodMetadata.method())) { + returnPublisherType = Mono.class; + } else { + returnPublisherType = returnPublisherType(methodMetadata); + } returnActualType = forType(returnActualType(methodMetadata)); } diff --git a/feign-reactor-core/src/test/java/reactivefeign/testcase/SuspendIceCreamServiceApi.kt b/feign-reactor-core/src/test/java/reactivefeign/testcase/SuspendIceCreamServiceApi.kt new file mode 100644 index 00000000..6306f7de --- /dev/null +++ b/feign-reactor-core/src/test/java/reactivefeign/testcase/SuspendIceCreamServiceApi.kt @@ -0,0 +1,10 @@ +package reactivefeign.testcase + +import feign.Param +import feign.RequestLine +import reactivefeign.testcase.domain.IceCreamOrder + +interface SuspendIceCreamServiceApi { + @RequestLine("GET /icecream/orders/{orderId}") + suspend fun findOrder(@Param("orderId") orderId: Int): IceCreamOrder +}