Skip to content
This repository has been archived by the owner on Jan 27, 2023. It is now read-only.

Event sourcing and rx-jersey #4

Closed
asereda opened this issue Apr 27, 2017 · 5 comments
Closed

Event sourcing and rx-jersey #4

asereda opened this issue Apr 27, 2017 · 5 comments

Comments

@asereda
Copy link

asereda commented Apr 27, 2017

Hello,

I would like to ask your opinion (and ideas) about the following usecase. Not sure if rx-jersey library is supposed to directly address it, but would be nice to know view of the author (and reader).

Both RX (observable pattern) and event sourcing are becoming popular solutions for today's implementation of services. We would like to leverage rxjava as internal event bus where messages are being published and observed. In this scenario, Jersey becomes just an endpoint which transforms HTTP payload into stream of events (and vice versa).

classical jersey approach

public class MyService {
    @POST   
    @Path("process")
    Single<ResponseEvent> process(RequestEvent event) {
        return Single.just(new ResponseEvent());
    } 
    
    @POST
    @Path("ack")
    Completable ack(AckEvent event) {
        return Completable.complete();
    } 
}

Using RX subject (eventbus)

/**
 * Implementation which uses RX java to process (and publish) events
 */
public class MyService {
    private final Subject<Event> events;
        
    public MyService(Subject<Event> events) {
        this.events = events;
    }
    
    public void start() {
       events.ofType(RequestEvent.class)
           .subscribe(x -> events.onNext(new ResponseEvent()));
           
       events.ofType(AckEvent.class)
           .subscribe(x -> events.onNext(new EmptyEvent()));
    }
}

Our issue becomes correctly mapping request and response events which may span multiple threads (ie having request scope for each event). There are a couple of solutions but none of them is ideal, unfortunately:

  1. Having explicit correlationId on each event. The drawback is that it has to be manually copied for each event (eg. new ResponseEvent(request.correlationId())). Which is not nice in business logic.
  2. Having separate type Message<T> with payload (which internally stores correlationId and request context). The problem here is that users subscribe to Message and not business level Event (Subject<Message> instead of Subject<Event>)
  3. Having continuation-local (for request) variables in RX. Which simulate request scope by using ThreadLocals and custom Schedulers (to copy threadlocal information from one thread to another). see RXJava discussion and schedulers blog post. The issue here is that as soon as events are accessed on the same thread (eg. combined with groupBy) thread-local information will be lost.

Please let me know if there are better alternatives.

Thanks in advance.

@alex-shpak
Copy link
Owner

Hi!

Unfortunately I didn't have chance to try event sourcing in wild, I just know the concept.
But from my point of view it's better to separate JAX-RS implementation (which supposed to be replaceable) from event sourcing service and implement thread local logic inside. Similar to example number 3,

Also in case number 1 maybe setting correlation id might be solved with dependency injection which will set ID automatically, or some interceptor. If this is the only issue - maybe it's easiest solution. For example it can be interceptor or custom body writer.

If it's any helpful :)

@asereda
Copy link
Author

asereda commented May 2, 2017

I was trying to use interfaces as configuration to hook jersey and rx java. Endpoints are defined as follows (via java interface) :

public interface Endpoints {
    @POST
    @Path("/foo")
    String foo(String message);

    @POST
    @Path("/bar")
     String foo(MyModel model);
}

Some service would subscribe to Observable<Event> (http process) and publish to Observed<Event> (http reply)

Since jersey doesn't like interfaces as resources, I registered custom model processor to inject special inflector (see handleBy method).

Does it look like the right approach ?

@alex-shpak
Copy link
Owner

I think you might want look at using some message broker for delivering messages? Because rx-jersey is working with single responses even if returned value is Observable or Flowable. I plan to add Server Sent Events at some point but I don't know when. So event "streaming" might be possible only in direction from client to backend (which looks ok from your example)

But I think I'm still missing what you want to achieve ¯\_(ツ)_/¯
Maybe you can give me live example of it.

@asereda
Copy link
Author

asereda commented May 19, 2017

I'm not sure I have a live example of it.

But let's say from Jersey I'd like to obtain something like Observable<ContainerRequest> then I'll be able to process each request internally like a pipeline.

Maybe combine Jersey "stream" with Kakfa (Observable<KafkaEvent>).

Does it make sense ?

@alex-shpak
Copy link
Owner

Okay, I think I understood,
In this case jersey might be overkill, in order to get requests as stream you can use RxNetty, which is nicely integrated with RxJava.

In case of using RxJersey I suppose you can make RxInterceptor and feed your stream from it (as you suggested in the beginning via Subject). But library can't provide you with created stream, because by nature it works with Singles

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants