Skip to content

Commit

Permalink
Add support for ContextView in Reactor integration
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Apr 2, 2021
1 parent f1e457f commit cd0d1e0
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ public final class kotlinx/coroutines/reactor/MonoKt {
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
public fun <init> (Lreactor/util/context/Context;)V
public fun <init> (Lreactor/util/context/ContextView;)V
public final fun getContext ()Lreactor/util/context/Context;
}

public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/reactor/ReactorContextKt {
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext;
}

public final class kotlinx/coroutines/reactor/ReactorFlowKt {
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactor/src/Flux.kt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private fun <T> reactorPublish(
return@onSubscribe
}
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val reactorContext = context.extendReactorContext(currentContext)
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactor/src/Mono.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private fun <T> monoInternal(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
val reactorContext = context.extendReactorContext(sink.currentContext())
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
Expand Down
23 changes: 20 additions & 3 deletions reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.ExperimentalCoroutinesApi
import reactor.util.context.Context
import kotlin.coroutines.*
import kotlinx.coroutines.reactive.*
import reactor.util.context.*

/**
* Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
Expand Down Expand Up @@ -50,12 +50,29 @@ import kotlinx.coroutines.reactive.*
*/
@ExperimentalCoroutinesApi
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {

public constructor(contextView: ContextView): this(Context.of(contextView))

public companion object Key : CoroutineContext.Key<ReactorContext>
}

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
* Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
@Deprecated("Use the more general version for ContextView instead", level = DeprecationLevel.HIDDEN)
public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext()

/**
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
*/
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class ReactorContextTest : TestBase() {
}

@Test
fun testFluxAwaitContextPropagation() = runBlocking<Unit>(
fun testFluxAwaitContextPropagation() = runBlocking(
Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext()
) {
assertEquals(createFlux().awaitFirst(), "1")
Expand Down

0 comments on commit cd0d1e0

Please sign in to comment.