Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Context passing between coroutines and Reactor Mono/Flux #1138

Merged
merged 2 commits into from Jul 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -27,6 +27,23 @@ public final class kotlinx/coroutines/reactive/PublishKt {
public static final fun publishInternal (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lorg/reactivestreams/Publisher;
}

public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
public fun cancel ()V
public fun close (Ljava/lang/Throwable;)Z
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
public fun isClosedForSend ()Z
public fun isFull ()Z
public fun offer (Ljava/lang/Object;)Z
public synthetic fun onCompleted (Ljava/lang/Object;)V
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public fun request (J)V
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/reactive/flow/FlowAsPublisherKt {
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
}
Expand Down
Expand Up @@ -19,6 +19,19 @@ public final class kotlinx/coroutines/reactor/MonoKt {
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
}

public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
public fun <init> (Lreactor/util/context/Context;)V
public final fun getContext ()Lreactor/util/context/Context;
}

public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/reactor/ReactorContextKt {
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
}

public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
public fun <init> (Lreactor/core/scheduler/Scheduler;)V
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
3 changes: 2 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Publish.kt
Expand Up @@ -76,7 +76,8 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError

@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
private class PublisherCoroutine<in T>(
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
Expand Down
8 changes: 8 additions & 0 deletions reactive/kotlinx-coroutines-reactor/build.gradle
Expand Up @@ -12,4 +12,12 @@ tasks.withType(dokka.getClass()) {
url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/")
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}

compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}

compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
25 changes: 22 additions & 3 deletions reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -1,3 +1,4 @@

/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
Expand All @@ -9,6 +10,8 @@ package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import reactor.core.CoreSubscriber
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.LowPriorityInOverloadResolution
Expand Down Expand Up @@ -41,8 +44,8 @@ public fun <T> flux(
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> {
require(context[Job] === null) { "Flux context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return Flux.from(publishInternal(GlobalScope, context, block))
"Its lifecycle should be managed via Disposable handle. Had $context" }
return Flux.from(reactorPublish(GlobalScope, context, block))
}

@Deprecated(
Expand All @@ -55,4 +58,20 @@ public fun <T> CoroutineScope.flux(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> =
Flux.from(publishInternal(this, context, block))
Flux.from(reactorPublish(this, context, block))

private fun <T> reactorPublish(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Expand Up @@ -53,7 +53,8 @@ private fun <T> monoInternal(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val newContext = scope.newCoroutineContext(context)
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Expand All @@ -78,12 +79,11 @@ private class MonoCoroutine<in T>(
handleCoroutineException(context, cause)
}
}

override fun dispose() {
disposed = true
cancel()
}

override fun isDisposed(): Boolean = disposed
}

44 changes: 44 additions & 0 deletions reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -0,0 +1,44 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.ExperimentalCoroutinesApi
import reactor.util.context.Context
import kotlin.coroutines.*

/**
* Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
*
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
*
* Reactor builders [mono] and [flux] use this context element to enhance the resulting `subscriberContext`.
*
* ### Usages
* Passing reactor context from coroutine builder to reactor entity:
* ```
* launch(Context.of("key", "value").asCoroutineContext()) {
* mono {
* println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
* }.subscribe()
* }
* ```
*
* Accessing modified reactor context enriched from the downstream:
* ```
* launch {
* mono {
* println(coroutineContext[ReactorContext]) // Prints { "key": "value" }
* }.subscriberContext(Context.of("key", "value"))
* .subscribe()
* }
* ```
*/
@ExperimentalCoroutinesApi
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
SokolovaMaria marked this conversation as resolved.
Show resolved Hide resolved
companion object Key : CoroutineContext.Key<ReactorContext>
}

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
32 changes: 32 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -0,0 +1,32 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import reactor.util.context.Context
import kotlin.test.assertEquals

class ReactorContextTest {
@Test
fun testMonoHookedContext() = runBlocking {
val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
val ctx = coroutineContext[ReactorContext]?.context
buildString {
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
}
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.subscriberContext { ctx -> ctx.put(6, "6") }
assertEquals(mono.awaitFirst(), "1234567")
}

@Test
fun testFluxContext() = runBlocking<Unit> {
val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
val ctx = coroutineContext[ReactorContext]!!.context
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.subscriberContext { ctx -> ctx.put(6, "6") }
var i = 0
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
}
}