Introduce interop between CoroutineContext and Reactor Context #284

Closed
sdeleuze opened this issue Mar 13, 2018 · 15 comments

@sdeleuze
Contributor

Reactor Context seems similar to CoroutineContext, so I think it would be worthwhile to update kotlinx-coroutines-reactor for seamless integration between the two.

@elizarov Any thoughts?

@elizarov
Contributor

@sdeleuze It makes total sense. They seem to fit conceptually. We can have a Context.asCoroutineContext extension, so that you can mix it into the coroutine context; implement ReactorContext: CoroutineContext.Key, so that you can retrieve it back via coroutineContext[ReactorContext]; and have all the reactor builders automatically extract the Reactor context from their coroutine context and pass it on. I'm open to a PR.
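
As a rough illustration of the shape described above, here is a minimal Kotlin sketch. It assumes the names that kotlinx-coroutines-reactor ended up using for this interop (`ReactorContext` and `Context.asCoroutineContext()`); the function name `readKeyViaCoroutineContext` and the key/value pair are made up for the example.

```kotlin
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asCoroutineContext
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.util.context.Context

// Hypothetical helper: mixes a Reactor Context into the coroutine context,
// then reads it back through the ReactorContext key.
fun readKeyViaCoroutineContext(): String? = runBlocking {
    val reactorContext: Context = Context.of("key", "value")
    withContext(reactorContext.asCoroutineContext()) {
        // coroutineContext[ReactorContext] is null when no Reactor Context was mixed in
        coroutineContext[ReactorContext]
            ?.context
            ?.getOrEmpty<String>("key")
            ?.orElse(null)
    }
}

fun main() {
    println(readKeyViaCoroutineContext())
}
```

The element behaves like any other coroutine context element, so it is inherited by child coroutines started inside the `withContext` block.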

sdeleuze added a commit to sdeleuze/spring-framework that referenced this issue Feb 18, 2019
This commit is the first part of a more complete Coroutines
support coming in Spring Framework 5.2. It introduces suspendable
Kotlin extensions for Mono based methods in WebFlux classes like
WebClient, ServerRequest, ServerResponse as well as a Coroutines
router usable via `coRouter { }`.

Coroutines extensions use `await` prefix or `AndAwait` suffix,
and most are using names close to their Reactive counterparts,
except `exchange` in `WebClient.RequestHeadersSpec`
which translates to `awaitResponse`.

Upcoming expected changes are:
 - Leverage `Dispatchers.Unconfined` (Kotlin/kotlinx.coroutines#972)
 - Expose extensions for `Flux` based API (Kotlin/kotlinx.coroutines#254)
 - Introduce interop with `CoroutineContext` (Kotlin/kotlinx.coroutines#284)
 - Support Coroutines in ReactiveAdapterRegistry
 - Support Coroutines for `WebFlux` annotated controllers
 - Fix return type of Kotlin suspending functions (spring-projectsgh-21058)

See spring-projectsgh-19975
SokolovaMaria added a commit that referenced this issue Apr 16, 2019
@sdeleuze
Contributor Author

sdeleuze commented Jun 17, 2019

Hi, any update on the integration of the related commit?

@gmarchelos

@elizarov, @SokolovaMaria, @qwwdfsad Can you provide an update on this?
I see that this feature is implemented in a PR but is still not merged.

@qwwdfsad
Collaborator

The PR is under review; we hope to merge it into 1.3.0-RC.

@gmarchelos

@qwwdfsad, sounds good. Thanks for the update!

@sdeleuze
Contributor Author

sdeleuze commented Jul 21, 2019

Thanks for this first step, but I think this issue should be reopened for refinements before 1.3 final is released. As far as I understand, Coroutines 1.3.0-RC currently supports only one part of the Spring Framework use case: context interop when converting a suspending function to a Mono.
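
For reference, the direction that already works (suspending code wrapped in a Mono) can be sketched roughly as follows. This assumes the `mono` builder and `ReactorContext` from kotlinx-coroutines-reactor, and Reactor 3.4+ for `contextWrite` (older Reactor versions name it `subscriberContext`). The function name `monoSeesSubscriberContext` is hypothetical.

```kotlin
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Mono

// Hypothetical helper: the mono builder copies the subscriber's Reactor Context
// into the coroutine context, where the suspending block can look it up.
fun monoSeesSubscriberContext(): String? {
    val result: Mono<String> = mono {
        coroutineContext[ReactorContext]
            ?.context
            ?.getOrEmpty<String>("key")
            ?.orElse(null)
    }
    return result
        .contextWrite { ctx -> ctx.put("key", "value") } // assumption: Reactor 3.4+
        .block()
}

fun main() {
    println(monoSeesSubscriberContext())
}
```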

For Mono to suspending function, we use extensions like suspend fun <T> Publisher<T>.awaitFirstOrNull(): T? or suspend fun <T> Publisher<T>.awaitSingle(): T, which live at the kotlinx.coroutines.reactive level and are therefore unlikely to support context interop. I guess we need to discuss how this should be exposed at the API level. What about providing extensions like suspend fun <T> Mono<T>.awaitFirstOrNull(copyContext: Boolean = true): T? at the Mono level?

About Flux, I am not sure how to use kotlinx.coroutines.reactor.FluxKt#flux. What we need is interop from Flux to Flow and back; we currently use Publisher<T>.asFlow() and Flow<T>.asPublisher(). I suppose we would need something like Flux<T>.asFlow(copyContext: Boolean = true) and Flow<T>.asFlux(copyContext: Boolean = true).

In these examples I have set the default value of copyContext to true for the sake of consistency, since the kotlinx.coroutines.reactor.MonoKt#mono function does that by default. Maybe kotlinx.coroutines.reactor.MonoKt#mono should have this parameter too. I am open to setting this optional parameter to false by default if you think that is a better default (for performance reasons, for example). But it would be nice to be consistent and provide this optional parameter everywhere such a context copy is performed.

Any thoughts?

qwwdfsad reopened this Jul 22, 2019
@elizarov
Contributor

@sdeleuze Could you please show a slightly more complete motivating example, so that we have some sample code to verify our proposed solution against?

@sdeleuze
Contributor Author

@elizarov Sure, but you will have to wait until next week since I am on vacation without a laptop.

@qwwdfsad
Collaborator

qwwdfsad commented Jul 30, 2019

After a bit of discussion, we've decided to provide the following API:

  • All Mono and Publisher extensions such as await* capture the current coroutine context and propagate ReactorContext to the Mono. The indirection is done via the ServiceLoader mechanism; all you have to do is add kotlinx-coroutines-reactor to the classpath.
    For example, in the following snippet
withContext(Context.of("key",  "value").asCoroutineContext()) {
    someMono.await()
}

the context will be propagated to the mono as long as kotlinx-coroutines-reactor is present on the classpath.

  • Publisher.asFlow propagates the context to the underlying flux.
    For example, in the following snippet
val flow = flux.asFlow() 
withContext(Context.of("key",  "value").asCoroutineContext()) {
    flow.collect {
        // ...
    }
}

context is propagated to the original flux

  • A new builder, Flow.asFlux, that properly propagates the SubscriberContext to the flow (using flowOn).

For example, in the following snippet

flow {
    someMono.await()
}.asFlux().subscriberContext { ctx -> ctx.put("key", "value") }

the context will be propagated to the flow body and, transitively, to someMono.
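
The three behaviours listed above can be exercised together in a small sketch. This is only an illustration under stated assumptions: Reactor 3.4+ (for `deferContextual` and `contextWrite`, which replace the older `subscriberContext` calls), `awaitSingle` standing in for the `await()` shorthand used in the snippets, and hypothetical names (`someMono`, `someFlux`, and the three functions) that do not come from the library.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.asCoroutineContext
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.util.context.Context

// Hypothetical publishers that echo "key" from the subscriber's Reactor Context,
// or "missing" when the key is absent.
val someMono: Mono<String> =
    Mono.deferContextual { ctx -> Mono.just(ctx.getOrEmpty<String>("key").orElse("missing")) }
val someFlux: Flux<String> =
    Flux.deferContextual { ctx -> Flux.just(ctx.getOrEmpty<String>("key").orElse("missing")) }

// 1. await*: the ReactorContext in the coroutine context reaches the awaited Mono.
fun awaitInsideReactorContext(): String = runBlocking {
    withContext(Context.of("key", "value").asCoroutineContext()) {
        someMono.awaitSingle()
    }
}

// 2. Publisher.asFlow: the context of the collecting coroutine reaches the Flux.
fun collectInsideReactorContext(): List<String> = runBlocking {
    val flow = someFlux.asFlow()
    withContext(Context.of("key", "value").asCoroutineContext()) {
        flow.toList()
    }
}

// 3. Flow.asFlux: the subscriber's context reaches the flow body and,
//    transitively, the Mono awaited inside it.
fun flowSeesSubscriberContext(): String? =
    flow { emit(someMono.awaitSingle()) }
        .asFlux()
        .contextWrite { ctx -> ctx.put("key", "value") }
        .blockFirst()

fun main() {
    println(awaitInsideReactorContext())
    println(collectInsideReactorContext())
    println(flowSeesSubscriberContext())
}
```

Without a `ReactorContext` element in the coroutine context (cases 1 and 2) or a context written by the subscriber (case 3), the publishers above would fall back to "missing".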

We are planning to merge this change into the upcoming 1.3 release this week.

@sdeleuze
Contributor Author

That sounds perfect, please let me know if you need more input from me.

elizarov pushed a commit that referenced this issue Jul 30, 2019
* Propagation of the coroutine context of await calls into
  Mono/Flux builder
* Publisher.asFlow propagates coroutine context from `collect`
  call to the Publisher
* Flow.asFlux transform

Fixes #284
@eiswind

eiswind commented Aug 4, 2019

Will this solve the problem that I cannot access ReactiveSecurityContextHolder.getContext from a coroutine?

@Test
@DisplayName("it should propagate the security context to coroutine")
fun testContextCoroutine() {
    val ctx = mono {
        getContextCoroutine()
    }
        .subscriberContext { c -> c.putAll(ReactiveSecurityContextHolder.withAuthentication(UsernamePasswordAuthenticationToken("", ""))) }
        .block()
    assertNotNull(ctx)
}
suspend fun getContextCoroutine(): SecurityContext? {
    return ReactiveSecurityContextHolder.getContext().awaitFirstOrNull()
}

@elizarov
Contributor

elizarov commented Aug 5, 2019

Will this solve the problem that I cannot access ReactiveSecurityContextHolder.getContext from a coroutine?

@eiswind Yes

@eiswind

eiswind commented Aug 5, 2019

I tried the test above against the reactor-context branch and it still fails. I guess I have to do something to propagate the context?

The value is provided in the Reactor Context (in real life this is done by Spring Security), and I can see the value in the CoroutineContext. But it seems to get lost at the awaitFirstOrNull call.

[Screenshot from 2019-08-05 15-02-50]
[Screenshot from 2019-08-05 15-13-22]

After some investigation I found that the changes in Await.kt do not seem to be in this branch. So I guess I have to wait until everything is merged.

@elizarov
Contributor

elizarov commented Aug 5, 2019

@eiswind Works for me in reactor-context branch.

@eiswind

eiswind commented Aug 5, 2019

Thx for the reply.

.subscriberContext { c -> c.putAll(ReactiveSecurityContextHolder.withAuthentication(UsernamePasswordAuthenticationToken("", ""))) }

seems to be just the wrong way.

.subscriberContext { c -> c.put(SecurityContext::class.java, Mono.just(SecurityContextImpl(UsernamePasswordAuthenticationToken("", "")))) }

did the trick.
Again thanks for the support.
