Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support kotlin coroutines #486

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 53 additions & 0 deletions feign-reactor-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<version>3.2.4-SNAPSHOT</version>
</parent>

<properties>
<kotlin.version>1.5.30</kotlin.version>
<kotlinx.coroutines.version>1.5.2</kotlinx.coroutines.version>
</properties>

<artifactId>feign-reactor-core</artifactId>
<packaging>jar</packaging>
<name>Feign Reactive Core</name>
Expand Down Expand Up @@ -63,6 +68,23 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-reflect</artifactId>
</dependency>

<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-coroutines-reactor</artifactId>
<version>${kotlinx.coroutines.version}</version>
</dependency>

<!-- Tests -->
<dependency>
<groupId>io.projectreactor</groupId>
Expand Down Expand Up @@ -185,6 +207,37 @@
</execution>
</executions>
</plugin>
<plugin>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
<sourceDir>${project.basedir}/src/main/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>${project.basedir}/src/test/kotlin</sourceDir>
<sourceDir>${project.basedir}/src/test/java</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
14 changes: 14 additions & 0 deletions feign-reactor-core/src/main/java/reactivefeign/MethodKt.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
@file:JvmName("MethodKt")

package reactivefeign

import java.lang.reflect.Method
import java.lang.reflect.Type
import kotlin.reflect.jvm.javaType
import kotlin.reflect.jvm.kotlinFunction

internal fun Method.isSuspend(): Boolean =
kotlinFunction?.isSuspend ?: false

internal val Method.kotlinMethodReturnType: Type?
get() = kotlinFunction?.returnType?.javaType
18 changes: 18 additions & 0 deletions feign-reactor-core/src/main/java/reactivefeign/MonoKt.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
@file:JvmName("MonoKt")

package reactivefeign

import kotlinx.coroutines.reactor.awaitSingle
import reactivefeign.client.ReactiveHttpResponse
import reactor.core.publisher.Mono

internal suspend fun Mono<*>.awaitReactiveHttpResponse(): Any {
val result = awaitSingle()
if (result is ReactiveHttpResponse<*>) {
val body = result.body()
require(body is Mono<*>) { "Only Mono type is allowed for suspend method" }
return body.awaitSingle()
}

return result
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@ public List<MethodMetadata> parseAndValidateMetadata(final Class<?> targetType)

for (final MethodMetadata metadata : methodsMetadata) {
final Type type = metadata.returnType();
if (!isReactorType(type)) {

boolean isSuspend = MethodKt.isSuspend(metadata.method());
if (isSuspend) {
modifySuspendMethodMetadata(metadata);
}

if (!isReactorType(type) && !isSuspend) {
throw new IllegalArgumentException(String.format(
"Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux",
metadata.configKey(), targetType.getSimpleName()));
Expand All @@ -64,6 +70,23 @@ public List<MethodMetadata> parseAndValidateMetadata(final Class<?> targetType)
return methodsMetadata;
}

private static void modifySuspendMethodMetadata(MethodMetadata metadata) {
Type kotlinMethodReturnType = MethodKt.getKotlinMethodReturnType(metadata.method());
if (kotlinMethodReturnType == null) {
throw new IllegalArgumentException(String.format(
"Method %s can't have continuation argument, only kotlin method is allowed",
metadata.configKey()));
}
metadata.returnType(kotlinMethodReturnType);

int continuationIndex = metadata.method().getParameterCount() - 1;
metadata.ignoreParamater(continuationIndex);

if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) {
metadata.bodyIndex(null);
}
}

private static final Set<Class> REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class));

private boolean isReactorType(final Type type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,9 @@ public static PublisherHttpClient retry(
MethodMetadata methodMetadata,
Retry retry) {
Type returnPublisherType = returnPublisherType(methodMetadata);
if(returnPublisherType == Mono.class){
if (MethodKt.isSuspend(methodMetadata.method())) {
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
} else if (returnPublisherType == Mono.class) {
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
} else if(returnPublisherType == Flux.class) {
return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retry);
Expand All @@ -297,7 +299,9 @@ public static PublisherHttpClient retry(
}

protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, MethodMetadata methodMetadata){
if(isResponsePublisher(methodMetadata.returnType())){
if (MethodKt.isSuspend(methodMetadata.method())) {
return new ResponsePublisherHttpClient(reactiveHttpClient);
} else if (isResponsePublisher(methodMetadata.returnType())) {
return new ResponsePublisherHttpClient(reactiveHttpClient);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import feign.InvocationHandlerFactory;
import feign.InvocationHandlerFactory.MethodHandler;
import feign.Target;
import kotlin.coroutines.Continuation;
import reactor.core.publisher.Mono;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;

import static feign.Util.checkNotNull;
Expand Down Expand Up @@ -61,6 +64,13 @@ private void defineObjectMethodsHandlers() {

@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
if (MethodKt.isSuspend(method)) {
Object[] newArgs = Arrays.copyOfRange(args, 0, args.length - 1);
Mono<?> result = (Mono<?>) dispatch.get(method).invoke(newArgs);
Continuation<Object> continuation = (Continuation<Object>) args[args.length - 1];
return MonoKt.awaitReactiveHttpResponse(result, continuation);
}

return dispatch.get(method).invoke(args);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import feign.MethodMetadata;
import feign.Target;
import reactivefeign.MethodKt;
import reactivefeign.publisher.PublisherClientFactory;
import reactivefeign.publisher.ResponsePublisherHttpClient;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -35,7 +36,9 @@ public MethodHandler create(MethodMetadata metadata) {
MethodHandler methodHandler = new PublisherClientMethodHandler(
target, metadata, publisherClientFactory.create(metadata));

if(isResponsePublisher(metadata.returnType())){
if (MethodKt.isSuspend(metadata.method())) {
return new MonoMethodHandler(methodHandler);
} else if (isResponsePublisher(metadata.returnType())) {
return new MonoMethodHandler(methodHandler);
}

Expand All @@ -53,7 +56,9 @@ public MethodHandler create(MethodMetadata metadata) {
public MethodHandler createDefault(Method method) {
MethodHandler defaultMethodHandler = new DefaultMethodHandler(method);

if(method.getReturnType() == Mono.class){
if (MethodKt.isSuspend(method)) {
return new MonoMethodHandler(defaultMethodHandler);
} else if (method.getReturnType() == Mono.class) {
return new MonoMethodHandler(defaultMethodHandler);
} else if(method.getReturnType() == Flux.class) {
return new FluxMethodHandler(defaultMethodHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public static Type returnActualType(MethodMetadata methodMetadata) {
}

public static Type returnActualType(Type returnType) {
if (!(returnType instanceof ParameterizedType)) {
return returnType;
}

Class<?> publisher = (Class)((ParameterizedType) returnType).getRawType();
Type typeInPublisher = resolveLastTypeParameter(returnType, publisher);
if(isResponsePublisher(publisher, typeInPublisher)){
Expand Down
66 changes: 66 additions & 0 deletions feign-reactor-core/src/test/java/reactivefeign/SuspendTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package reactivefeign

import com.fasterxml.jackson.core.JsonProcessingException
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.AfterClass
import org.junit.BeforeClass
import org.junit.Test
import reactivefeign.resttemplate.client.RestTemplateFakeReactiveFeign
import reactivefeign.testcase.SuspendIceCreamServiceApi
import reactivefeign.testcase.domain.OrderGenerator
import reactor.core.publisher.Mono
import reactor.netty.DisposableServer
import reactor.netty.http.HttpProtocol
import reactor.netty.http.server.HttpServer
import reactor.netty.http.server.HttpServerRequest
import reactor.netty.http.server.HttpServerResponse
import reactor.netty.http.server.HttpServerRoutes
import java.time.Duration

class SuspendTest {
companion object {
private lateinit var server: DisposableServer
private const val DELAY_IN_MILLIS: Long = 500L
private val cannedValue = OrderGenerator().generate(1)

@BeforeClass
@JvmStatic
@Throws(JsonProcessingException::class)
fun startServer() {
val data = TestUtils.MAPPER.writeValueAsString(cannedValue).toByteArray()
server = HttpServer.create()
.protocol(HttpProtocol.HTTP11, HttpProtocol.H2C)
.route { routes: HttpServerRoutes ->
routes[
"/icecream/orders/1", { req: HttpServerRequest?, res: HttpServerResponse ->
res.header("Content-Type", "application/json")
Mono.delay(Duration.ofMillis(DELAY_IN_MILLIS))
.thenEmpty(res.sendByteArray(Mono.just(data)))
}
]
}
.bindNow()
}

@JvmStatic
@AfterClass
fun stopServer() {
server.disposeNow()
}
}

@Test
fun shouldRun(): Unit = runBlocking {
val client = RestTemplateFakeReactiveFeign
.builder<SuspendIceCreamServiceApi>()
.target(
SuspendIceCreamServiceApi::class.java,
"http://localhost:" + server.port()
)

val firstOrder = client.findOrder(orderId = 1)

assertThat(firstOrder).usingRecursiveComparison().isEqualTo(cannedValue)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.ResourceAccessException;
import org.springframework.web.client.RestTemplate;
import reactivefeign.MethodKt;
import reactivefeign.client.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -51,7 +52,11 @@ public class RestTemplateFakeReactiveHttpClient implements ReactiveHttpClient {
this.restTemplate = restTemplate;
this.acceptGzip = acceptGzip;

returnPublisherType = returnPublisherType(methodMetadata);
if (MethodKt.isSuspend(methodMetadata.method())) {
returnPublisherType = Mono.class;
} else {
returnPublisherType = returnPublisherType(methodMetadata);
}
returnActualType = forType(returnActualType(methodMetadata));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package reactivefeign.testcase

import feign.Param
import feign.RequestLine
import reactivefeign.testcase.domain.IceCreamOrder

interface SuspendIceCreamServiceApi {
@RequestLine("GET /icecream/orders/{orderId}")
suspend fun findOrder(@Param("orderId") orderId: Int): IceCreamOrder
}