Reactor Context not propagated from Mono to other Reactive implementation when adapting using ReactiveAdapterRegistry #32841

Closed
anaconda875 opened this issue May 18, 2024 · 1 comment
Labels: in: core (Issues in core modules: aop, beans, core, context, expression); status: invalid (An issue that we don't feel is valid)

anaconda875 commented May 18, 2024

Affects: spring-core-6.1.1
Given this code:

    Mono.just("a")
        .flatMap(a -> {
          return Mono.deferContextual(c -> c.get("KEY"))
              .flatMap(val -> {
                System.out.println(val);
                return ReactiveWrapperConverters.toWrapper(Uni.createFrom().item("abcd"), Mono.class);
              });
        }).contextWrite(Context.of("KEY", Mono.just("VALUE")))

        .subscribe(System.out::println, t -> ((Throwable) t).printStackTrace());

Executing it prints to the console:

VALUE
abcd

But with this code:

    Mono.just("a")
        .flatMap(a -> {
          return Mono.deferContextual(c -> c.get("KEY"))
              .flatMap(val -> {
                System.out.println(val);
                return ReactiveWrapperConverters.toWrapper(ReactiveWrapperConverters.toWrapper(Mono.deferContextual(c -> c.get("KEY")), Uni.class).flatMap(v -> {
                  System.out.println(v);
                  return Uni.createFrom().item("abcd");
                }), Mono.class);
              });
        }).contextWrite(Context.of("KEY", Mono.just("VALUE")))

        .subscribe(System.out::println, t -> ((Throwable) t).printStackTrace());

it prints:

val
java.util.NoSuchElementException: Context is empty
	at reactor.util.context.Context0.get(Context0.java:43)
	at com.example.demo.DemoApplication.lambda$main$1(DemoApplication.java:58)
	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at org.reactivestreams.FlowAdapters$FlowPublisherFromReactive.subscribe(FlowAdapters.java:366)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.forward(UniCreateFromPublisher.java:41)
	at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher.subscribe(UniCreateFromPublisher.java:26)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
	at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
	at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.request(UniToMultiPublisher.java:73)
	at org.reactivestreams.FlowAdapters$ReactiveToFlowSubscription.request(FlowAdapters.java:182)
	at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108)
	at reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:291)
	at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onSubscribe(FlowAdapters.java:206)
	at io.smallrye.mutiny.converters.uni.UniToMultiPublisher.subscribe(UniToMultiPublisher.java:25)
	at org.reactivestreams.FlowAdapters$ReactivePublisherFromFlow.subscribe(FlowAdapters.java:348)
	at reactor.core.publisher.MonoFromPublisher.subscribe(MonoFromPublisher.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
	at reactor.core.publisher.LambdaMonoSubscriber.onSubscribe(LambdaMonoSubscriber.java:121)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:117)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4512)
	at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
	at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4496)
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4578)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4478)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4414)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4386)
	at com.example.demo.DemoApplication.main(DemoApplication.java:65)

The second Mono.deferContextual(c -> c.get("KEY")), the one passed to ReactiveWrapperConverters.toWrapper, fails because the Context it sees is empty.

This is what ReactiveWrapperConverters.toWrapper (from spring-data-commons) does:

	@Override
	@SuppressWarnings({ "ConstantConditions", "unchecked" })
	public <T> Converter<Object, T> getConverter(Class<T> targetType) {
		return source -> {

			Publisher<?> publisher = source instanceof Publisher ? (Publisher<?>) source
					: RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(Publisher.class, source).toPublisher(source);

			ReactiveAdapter adapter = RegistryHolder.REACTIVE_ADAPTER_REGISTRY.getAdapter(targetType);

			return (T) adapter.fromPublisher(publisher);
		};
	}

spring-projects-issues added the "status: waiting-for-triage" label May 18, 2024
snicoll changed the title from "Bug[?] Reactor Context not propagated from Mono to other Reactive implementation when adapting using ReactiveAdapterRegistry" to "Reactor Context not propagated from Mono to other Reactive implementation when adapting using ReactiveAdapterRegistry" May 18, 2024
jhoeller added the "in: core" label May 21, 2024
simonbasle self-assigned this May 21, 2024
simonbasle added the "status: invalid" label and removed the "status: waiting-for-triage" label May 21, 2024

simonbasle (Contributor) commented:

This is expected: the Reactor Context cannot be propagated through other flavours of Publisher.

In fact, "propagating" is not the best word for how the Context is accessed. In a reactive stream chain, each operator's subscriber holds a reference to the next subscriber in line (to pass data down the chain). In a Reactor-to-Reactor chain, this means an operator can also call CoreSubscriber#currentContext() on that next subscriber. If a Subscriber in the middle of the chain is not a Reactor CoreSubscriber, operators upstream of it lose access to any Context defined further down the line.
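
The lookup described above can be illustrated with a small model that uses only the JDK. This is a sketch, not Reactor code: the `Subscriber` and `ContextAwareSubscriber` interfaces and every method name below are hypothetical stand-ins (for `org.reactivestreams.Subscriber` and Reactor's `CoreSubscriber` respectively), and the context is simplified to a `Map<String, String>`.

```java
import java.util.Map;

public class ContextLookupSketch {

    /** Hypothetical stand-in for a plain Reactive Streams subscriber. */
    public interface Subscriber<T> {
        void onNext(T item);
    }

    /** Hypothetical stand-in for Reactor's CoreSubscriber, which can expose a context. */
    public interface ContextAwareSubscriber<T> extends Subscriber<T> {
        Map<String, String> currentContext();
    }

    /** An operator reads context by asking its downstream subscriber, if that subscriber can answer. */
    public static Map<String, String> contextOf(Subscriber<?> downstream) {
        if (downstream instanceof ContextAwareSubscriber) {
            return ((ContextAwareSubscriber<?>) downstream).currentContext();
        }
        return Map.of(); // a foreign subscriber has no context to offer
    }

    /** Models the end of the chain, where the context was written. */
    public static ContextAwareSubscriber<String> terminal(Map<String, String> context) {
        return new ContextAwareSubscriber<String>() {
            @Override public void onNext(String item) { /* the item is consumed here */ }
            @Override public Map<String, String> currentContext() { return context; }
        };
    }

    /** Models a Reactor operator: forwards items and delegates context lookups downstream. */
    public static ContextAwareSubscriber<String> reactorOperator(Subscriber<String> downstream) {
        return new ContextAwareSubscriber<String>() {
            @Override public void onNext(String item) { downstream.onNext(item); }
            @Override public Map<String, String> currentContext() { return contextOf(downstream); }
        };
    }

    /** Models a subscriber from another library: forwards items but knows nothing about context. */
    public static Subscriber<String> foreignOperator(Subscriber<String> downstream) {
        return downstream::onNext;
    }

    /** Models a contextual read made upstream of the given subscriber. */
    public static String lookup(Subscriber<String> downstream, String key) {
        return contextOf(downstream).getOrDefault(key, "<context is empty>");
    }

    public static void main(String[] args) {
        Subscriber<String> end = terminal(Map.of("KEY", "VALUE"));

        // Every link is context-aware, so the lookup reaches the end of the chain.
        Subscriber<String> allReactor = reactorOperator(reactorOperator(end));
        System.out.println(lookup(allReactor, "KEY")); // prints VALUE

        // A foreign subscriber in the middle cuts the lookup short.
        Subscriber<String> bridged = reactorOperator(foreignOperator(reactorOperator(end)));
        System.out.println(lookup(bridged, "KEY")); // prints <context is empty>
    }
}
```

In this model the `foreignOperator` link plays the role of the Mutiny subscriber created by the adapter: items still flow through it, but the context lookup stops there, which matches the "Context is empty" failure in the report.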

simonbasle closed this as not planned May 21, 2024