Skip to content

Commit

Permalink
Context passing between coroutines and Reactor Mono/Flux
Browse files Browse the repository at this point in the history
Fixes #284
  • Loading branch information
SokolovaMaria committed Jul 17, 2019
1 parent f22604b commit ff06d83
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 7 deletions.
Expand Up @@ -36,3 +36,20 @@ public final class kotlinx/coroutines/reactive/flow/PublisherAsFlowKt {
public static final fun from (Lorg/reactivestreams/Publisher;I)Lkotlinx/coroutines/flow/Flow;
}

public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coroutines/AbstractCoroutine, kotlinx/coroutines/channels/ProducerScope, kotlinx/coroutines/selects/SelectClause2, org/reactivestreams/Subscription {
public fun <init> (Lkotlin/coroutines/CoroutineContext;Lorg/reactivestreams/Subscriber;)V
public fun cancel ()V
public fun close (Ljava/lang/Throwable;)Z
public fun getChannel ()Lkotlinx/coroutines/channels/SendChannel;
public fun getOnSend ()Lkotlinx/coroutines/selects/SelectClause2;
public fun invokeOnClose (Lkotlin/jvm/functions/Function1;)Ljava/lang/Void;
public synthetic fun invokeOnClose (Lkotlin/jvm/functions/Function1;)V
public fun isClosedForSend ()Z
public fun isFull ()Z
public fun offer (Ljava/lang/Object;)Z
public synthetic fun onCompleted (Ljava/lang/Object;)V
public fun registerSelectClause2 (Lkotlinx/coroutines/selects/SelectInstance;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public fun request (J)V
public fun send (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Up @@ -19,6 +19,19 @@ public final class kotlinx/coroutines/reactor/MonoKt {
public static synthetic fun mono$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono;
}

public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
public fun <init> (Lreactor/util/context/Context;)V
public final fun getContext ()Lreactor/util/context/Context;
}

public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/reactor/ReactorContextKt {
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlin/coroutines/CoroutineContext;
}

public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
public fun <init> (Lreactor/core/scheduler/Scheduler;)V
public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
3 changes: 2 additions & 1 deletion reactive/kotlinx-coroutines-reactive/src/Publish.kt
Expand Up @@ -76,7 +76,8 @@ private const val CLOSED = -1L // closed, but have not signalled onCompleted/
private const val SIGNALLED = -2L // already signalled subscriber onCompleted/onError

@Suppress("CONFLICTING_JVM_DECLARATIONS", "RETURN_TYPE_MISMATCH_ON_INHERITANCE")
private class PublisherCoroutine<in T>(
@InternalCoroutinesApi
public class PublisherCoroutine<in T>(
parentContext: CoroutineContext,
private val subscriber: Subscriber<T>
) : AbstractCoroutine<Unit>(parentContext, true), ProducerScope<T>, Subscription, SelectClause2<T, SendChannel<T>> {
Expand Down
8 changes: 8 additions & 0 deletions reactive/kotlinx-coroutines-reactor/build.gradle
Expand Up @@ -12,4 +12,12 @@ tasks.withType(dokka.getClass()) {
url = new URL("https://projectreactor.io/docs/core/$reactor_vesion/api/")
packageListUrl = projectDir.toPath().resolve("package.list").toUri().toURL()
}
}

compileTestKotlin {
kotlinOptions.jvmTarget = "1.8"
}

compileKotlin {
kotlinOptions.jvmTarget = "1.8"
}
25 changes: 22 additions & 3 deletions reactive/kotlinx-coroutines-reactor/src/Flux.kt
@@ -1,3 +1,4 @@

/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
Expand All @@ -9,6 +10,8 @@ package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.Publisher
import reactor.core.CoreSubscriber
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.LowPriorityInOverloadResolution
Expand Down Expand Up @@ -41,8 +44,8 @@ public fun <T> flux(
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> {
require(context[Job] === null) { "Flux context cannot contain job in it." +
"Its lifecycle should be managed via Disposable handle. Had $context" }
return Flux.from(publishInternal(GlobalScope, context, block))
"Its lifecycle should be managed via Disposable handle. Had $context" }
return Flux.from(reactorPublish(GlobalScope, context, block))
}

@Deprecated(
Expand All @@ -55,4 +58,20 @@ public fun <T> CoroutineScope.flux(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flux<T> =
Flux.from(publishInternal(this, context, block))
Flux.from(reactorPublish(this, context, block))

private fun <T> reactorPublish(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Expand Up @@ -53,7 +53,8 @@ private fun <T> monoInternal(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val newContext = scope.newCoroutineContext(context)
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Expand All @@ -78,12 +79,11 @@ private class MonoCoroutine<in T>(
handleCoroutineException(context, cause)
}
}

override fun dispose() {
disposed = true
cancel()
}

override fun isDisposed(): Boolean = disposed
}

50 changes: 50 additions & 0 deletions reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
@@ -0,0 +1,50 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.ExperimentalCoroutinesApi
import reactor.util.context.Context
import kotlin.coroutines.*

/**
* Marks coroutine context element that contains Reactor's [Context] elements in [context] for seamless integration
* between [CoroutineContext] and Reactor's [Context].
*
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
*
* Reactor builders: [mono], [flux] can extract the reactor context from their coroutine context and
* pass it on. Modifications of reactor context can be retrieved by `coroutineContext[ReactorContext]`.
*
* Example usage:
*
* Passing reactor context from coroutine builder to reactor entity:
*
* ```
* launch(Context.of("key", "value").asCoroutineContext()) {
* mono {
* assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value")
* }.subscribe()
* }
* ```
*
* Accessing modified reactor context enriched from downstream via coroutine context:
*
* ```
* launch {
* mono {
* assertEquals(coroutineContext[ReactorContext]!!.context.get("key"), "value")
* }.subscriberContext(Context.of("key", "value"))
* .subscribe()
* }
* ```
*/
@ExperimentalCoroutinesApi
public class ReactorContext(val context: Context) : AbstractCoroutineContextElement(ReactorContext) {
companion object Key : CoroutineContext.Key<ReactorContext>
}


/**
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
* and later retrieved via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun Context.asCoroutineContext(): CoroutineContext = ReactorContext(this)
32 changes: 32 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt
@@ -0,0 +1,32 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import reactor.util.context.Context
import kotlin.test.assertEquals

class ReactorContextTest {
@Test
fun testMonoHookedContext() = runBlocking {
val mono = mono(Context.of(1, "1", 7, "7").asCoroutineContext()) {
val ctx = coroutineContext[ReactorContext]?.context
buildString {
(1..7).forEach { append(ctx?.getOrDefault(it, "noValue")) }
}
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.subscriberContext { ctx -> ctx.put(6, "6") }
assertEquals(mono.awaitFirst(), "1234567")
}

@Test
fun testFluxContext() = runBlocking<Unit> {
val flux = flux(Context.of(1, "1", 7, "7").asCoroutineContext()) {
val ctx = coroutineContext[ReactorContext]!!.context
(1..7).forEach { send(ctx.getOrDefault(it, "noValue")) }
} .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5"))
.subscriberContext { ctx -> ctx.put(6, "6") }
var i = 0
flux.subscribe { str -> i++; assertEquals(str, i.toString()) }
}
}

0 comments on commit ff06d83

Please sign in to comment.