Skip to content

Conversation

@johnament
Copy link
Contributor

This is still a WIP, so not ready to be merged.

I'm looking to add support for Project Reactor as an rx invoker for CXF. It effectively mirrors what the other two are doing, but using reactor's APIs.

I'm still working on tests (e.g. I don't know if this works). the biggest I'm having right now is getting the systests to import in intellij. I've done it previously, but for some reason after switching to 3.2 it's no good. I've even gone in, blown away all .iml and the .idea folder, no luck. Anyone else seen something like this?

screen shot 2017-10-29 at 2 25 05 pm

@deki
Copy link
Contributor

deki commented Oct 29, 2017

Usually I have this issue if I miss some Maven profile selections. Adjusting the checkboxes normally fixes this.

@johnament
Copy link
Contributor Author

Hi @deki thanks for the tip. What profiles would you recommend? note that I only have the jaxrs systests module open in my IDE, but can go back to the broader one.

One other thing, i'm noticing that the settings jar from intellij doesn't put imports in the right order.

@deki
Copy link
Contributor

deki commented Oct 30, 2017

I usually check the everything profile. But in your case it looks more like the dependencies are not properly resolve. Hit the refresh button in the Maven projects tab (first one on the left). If this doesn't work, run File -> Invalidate Caches/ Restart.

}

@Override
public Flux get() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flux<Response> please :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be fixed.

}

private <R> Flux<R> flux(Future<R> future) {
Flux<R> flux = Flux.from(Mono.fromFuture(toCompletableFuture(future)));
Copy link
Member

@reta reta Nov 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part concerns. I would suggest to not use scheduler but pass executor directly to toCompletableFuture (so the supplyAsync call would be done outside of common ForkJoin pool). Otherwise we have too many pools and threads involved for no real reason.

Regarding the desired behavior, we would certainly want the subscription to be invoked on the separate thread (this is what subscribeOn does now). However, because we create Flux from CompletableFuture, intuitively the completion callback would be invoked from another thread, either the one from common ForkJoin pool or the provided one (unless then value is available immediately).

Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, the executor part is fixed. If there's an executor passed in we use that.

However, there's an interesting conundrum. I'm not sure how rx() is meant to work this way. For a Mono object its pretty clear, I had to tweak the test to use a version of the json processor that didn't treat the result as an array. Which makes sense, its meant to be a single object.

However, I need to figure out a way to check if the opening character when reading the result is a single or multiple result. Effectively I need the MessageBodyReader equivalent of StreamingResponseProvider. I'm kind of wondering how this works for the RxJava versions (but then again, there's no tests for this method, so I wonder if you guys ran into that issue already).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the client side one works against a specific URI and thus the expectations are known in advance, i.e, it is the collection or not which is returned

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I'm thinking as well, to make it less complicated. So what that means is if you work with it, by default you're getting Mono objects back (since it's typically one element). Any method that supports a GenericType I'm going to add a Flux version for that just takes the singular attribute, so that you end up with something like <R> Flux<R> getFlux(Class<R> responseType) which should give a pretty clean programming model.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify a bit, may be with the code examples in the JIRA issue ? I'm not sure what is a distinction between a single and multiple objects on the client side, example, on the server one can be streaming a sequence of Book objects, but on the client it can be a single Books object.

@cschneider
Copy link
Contributor

Do we need to specifically support reactor or is it enough to just depend on reactive streams?

@johnament
Copy link
Contributor Author

@cschneider there's a new rx() feature in the client adapter that is meant to support converting the response into the appropriate objects.

@cschneider
Copy link
Contributor

Got it now .. looks good this way.

<parent>
<groupId>org.apache.cxf</groupId>
<artifactId>cxf-parent</artifactId>
<version>3.2.1-SNAPSHOT</version>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3.2.2-SNAPSHOT :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, just saw it.


private <R> Flux<R> flux(Future<R> future) {
Flux<R> flux = Flux.from(Mono.fromFuture(toCompletableFuture(future, executorService)));
if (scheduler != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnament Thanks for passing the executorService to toCompletableFuture. The scheduler and this snippet:

if (scheduler != null) {
            flux = flux.subscribeOn(scheduler);
}

are not needed. The reason is that the completion callback for Mono.fromFuture will be called in one of the executorService threads or ForkJoin pool (you could easily trace it). Moreover, we could run into issues with this code because it uses two threads from the same pool for toCompletableFuture and subscription, exposing ourselves for likelihood of livelocks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the underlying call is happening via async webclient, we can't take full advantage of the reactive nature (but not sure the JSON format works well, i want to play around with SSE next after this is merged). Scheduler should not be used.

import reactor.core.publisher.Mono;

@SuppressWarnings("rawtypes")
public interface MonoRxInvoker extends RxInvoker<Mono> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use extends RxInvoker<Mono<?>>, you could get rid of @SuppressWarnings("rawtypes")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but then the overrides don't line up properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind, whatever was happening isn't any longer.

import reactor.core.publisher.Flux;

@SuppressWarnings("rawtypes")
public interface FluxRxInvoker extends RxInvoker<Flux> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as for MonoRxInvoker, if you use extends RxInvoker<Flux<?>> you could remove @SuppressWarnings("rawtypes")

try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
Copy link
Member

@reta reta Nov 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using CompletionException would be more appropriate I think. Otherwise we would end up with longer ex-> RuntimeException(ex) -> CompletionException chain of causes.

@reta
Copy link
Member

reta commented Nov 7, 2017

@johnament It looks pretty good in general, thanks for addressing the comments. Are there any outstanding issues you are struggling with / need help?

@johnament
Copy link
Contributor Author

@reta there's open questions I have:

  • Leave the tests here or move into systests? Personally I don't like not having tests with the main code, and it's a trivial impact (14s build + test time).
  • Creating examples? Maybe later.
  • Should I add CDI integration? E.g. an implementation of JAXRSServerFactoryCustomizationExtension that adds the ReactorInvoker to the runtime.

@reta
Copy link
Member

reta commented Nov 8, 2017

@johnament my few cents on the questions you have:

  • I would suggest to move it to systests, not necessarily due to execution time but because of the fact we spawn servers (external components). This is not the official rule I think but it serves as the litmus test where tests belong.
  • Yes, this would be very good to have, at least 1 example in the samples distribution, but I can help with that (as well as adding the documentation to the CXF Wiki)
  • CDI would be good to have but it could be done later I think (unless you really need it from day 1).

@sberyozkin What do you think?

@cschneider
Copy link
Contributor

I would keep tests with the module unless they are slow or very involved. We spawn CXF endpoints in a lot of other modules tests.
The full cxf build takes quite a while. So it is very useful to be able to build a single module and get some good feedback if it is correct.
One other thing about the tests. There are a lot of sleeps in the tests. This is not good. I propose to replace this with Awaitility based retries.
For example:
assertEquals("Hello, world!", holder.value);
could be:
Awaitility.await().until(()->holder.value, equalTo("Hello world!"));

@sberyozkin
Copy link
Contributor

Hi All, my understanding is that it is a CXF convention to have unit tests with the modules and the end to end tests involving starting endpoints and clients and sending the data over some transports - in systests, so IMHO it makes sense to keep that way. John, Andriy, re CDI, yes, nice idea, but may be indeed at the next stage (and would that make a ReactorInvoker a default one when CDI is loaded ? something we can discuss later)

@johnament
Copy link
Contributor Author

Ok, I just pushed up the move to systests. Assuming that all passes in jenkins I'll merge and circle back on the remaining parts.

@johnament johnament merged commit f634fb8 into apache:master Nov 9, 2017
rmannibucau pushed a commit to rmannibucau/cxf that referenced this pull request Sep 23, 2018
rmannibucau pushed a commit to rmannibucau/cxf that referenced this pull request Mar 18, 2019
rmannibucau pushed a commit to rmannibucau/cxf that referenced this pull request May 31, 2019
rnetuka pushed a commit to rnetuka/cxf that referenced this pull request Oct 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants