This is an extension for VertX that provides support for Reactive Extensions (RX) using the RxJava library
This allows VertX developers to use the RxJava type-safe composable API to build VertX verticles
- The module wraps the VertX core objects to add Observable support so it is tightly bound to the VertX release.
- This module also contains the Netflix RxJava library.
- The module can be installed as a non-running module or built as a standalone jar and included in other modules.
The module name is com.meez.mod-rxjava
.
The module is for use with VertX 1.3.1.final only.
For VertX 2.0.0 use the mod-rxvertx module instead.
Currently Observable wrappers are provided for
- EventBus
- FileSystem
- HttpServer
- HttpClient
- NetServer
- NetClient
- Timer
There are also base Observable adapters that map Handler and AsyncResultHandler to Observable that can be used to call other Handler based APIs.
In future, additional wrappers will be provided for
- SockJSServer
To access the Rx methods you just wrap the existing Vertx instance with an instance of meez.rxvertx.java.RxVertx
. The methods of RxVertx
will return the appropriately wrapper subsystem.
e.g. to send a message to the eventbus
RxVertx rxVertx=new RxVertx(vertx);
Observable<String> req=rxVertx.eventBus().sendRx("foo");
req
.subscribe(new Action1<String>(){
public void call(String resp) {
System.out.println("got response");
}
})
All standard API methods of the form
void method(args...,Handler<T> handler)
are available in the form
Observable<T> method(args...)
The support class RxSupport
provides several helper methods for some standard tasks
There are several Func1 codec methods to go from Buffer
to JsonObject
and Buffer
to a Java Pojo (using ObjectMapper
)
There are two primary wrappers
Convert a ReadStream
into an Observable<Buffer>
Stream the output of an Observable
to a WriteStream
.
please note that this method does not handle writeQueueFull
so cannot be used as a pump
The real power of RxJava comes from composing asynchronous flows as part of a workflow. mod-rxjava
provides several pipeline helpers to enable building handler pipelines.
eg. building a Json HttpServer
RxVertx rx=new RxVertx(vertx);
// Create a new HttpServerPipeline that takes Json requests
server=rx.createHttpServer().requestHandler(new HttpServerPipeline<JsonObject>() {
// Request pipeline for JsonObject request
public Observable<JsonObject> process(Observable<HttpServerRequest> request) {
return request
// Fetch the request body into a Json Object
.flatMap(RxHttpSupport.decodeBody())
// Simple pong responder
.map(new Func1<JsonObject,JsonObject>() {
public JsonObject call(JsonObject in) {
// Handle the request
// Return the response as a JsonObject (pipeline will encode as Json)
return new JsonObject()...;
}
});
}
}).listen(8080);