Mule ESB-specific bindings for RxJava.
The following demonstrates an asynchronous HTTP-to-Redis bridge that accepts only one request per remote IP:
rxMule
    .observeEndpointAsync(new URI("http://localhost:8080/publish"))
    .distinct(
        muleEvent -> {
            final String remoteAddressAndPort =
                muleEvent
                    .getMessage()
                    .getInboundProperty(
                        "MULE_REMOTE_CLIENT_ADDRESS");
            return substringBefore(remoteAddressAndPort, ":");
        })
    .subscribe(
        asAction((MessageConsumer)
            muleEvent -> {
                redisModule.publish(
                    "http-requests",
                    false,
                    muleEvent.getMessageAsString(),
                    muleEvent);
                LOGGER.info("Published: {}", muleEvent);
            }));
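The "one request per remote IP" behaviour comes from `distinct` with a key selector, which lets through only the first item seen for each key. The following is a minimal JDK-only sketch of that filtering step, not RxMule code: the class `RemoteIpDedup`, its methods, and the sample addresses are hypothetical, and `hostOf` is a stand-in for the `substringBefore(value, ":")` call used above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of RxMule or RxJava.
public class RemoteIpDedup {

    // Returns the part of "address:port" before the first ':'.
    // If there is no ':', the input is returned unchanged.
    static String hostOf(String remoteAddressAndPort) {
        if (remoteAddressAndPort == null) {
            return null;
        }
        int idx = remoteAddressAndPort.indexOf(':');
        return idx < 0 ? remoteAddressAndPort : remoteAddressAndPort.substring(0, idx);
    }

    // Keeps only the first entry seen for each host, preserving arrival order.
    static List<String> firstPerHost(List<String> remoteAddresses) {
        Set<String> seenHosts = new HashSet<>();
        List<String> accepted = new ArrayList<>();
        for (String address : remoteAddresses) {
            // Set.add returns false when the host was already recorded.
            if (seenHosts.add(hostOf(address))) {
                accepted.add(address);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Sample values are made up for the example.
        List<String> requests =
            Arrays.asList("/10.0.0.1:5000", "/10.0.0.1:5001", "/10.0.0.2:6000");
        System.out.println(firstPerHost(requests));
    }
}
```

Note that cutting at the first `:` assumes an IPv4-style `address:port` value; an IPv6 address would be truncated by this approach. The set of seen keys also grows for as long as the stream runs.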
This module adds a number of classes to RxJava that make it possible to observe:
- Mule inbound endpoints from traditional transports, including global endpoints and endpoints defined by URIs,
- raw message sources, like the new HTTP Listener Connector,
- Anypoint Connectors message sources.
RxMule also provides Func1 and Action1 wrappers to help process Mule events with outbound Mule endpoints or Anypoint Connectors methods.
On top of that, RxMule provides helpers for using message transformers and creating new messages from scratch.
In short, RxMule allows creating Observable<MuleEvent> instances from different sources.
A MuleEvent is what Mule creates and processes.
It wraps a MuleMessage, which contains the actual data and metadata being processed. Keep in mind that both objects are mutable, though only by the thread that owns the event.
You can read more about the structure of a MuleMessage here.
Take a look at the integration tests to get a better idea of what you can do with RxMule.
Releases are available on Central. Snapshot builds are available in the Sonatype OSS Snapshots repository:
<repository>
    <id>ossrh</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots>
        <enabled>true</enabled>
    </snapshots>
</repository>
If you need to build the latest snapshot yourself, run:
mvn clean install
Note that some integration tests require a local instance of Redis running on port 6379.
If Redis is not available there, these tests will self-disable.
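To check beforehand whether those tests are likely to run, you can probe the port yourself. The sketch below shows one possible way to do that with a plain TCP connection attempt; it is not the mechanism the project's tests use, and the class `RedisProbe`, the method `isListening`, and the one-second timeout are assumptions made for illustration. It only confirms that something accepts connections on the port, not that it is Redis.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper; not part of RxMule.
public class RedisProbe {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // 6379 is the port the integration tests expect Redis on.
        boolean available = isListening("localhost", 6379, 1000);
        System.out.println(available
            ? "Something is listening on port 6379"
            : "Nothing is listening on port 6379");
    }
}
```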
You can quickly start a local Redis for tests with:
docker run -d -p 6379:6379 --name redis redis
and connect to it with:
docker run -it --link redis:redis --rm redis sh -c 'exec redis-cli -h "$REDIS_PORT_6379_TCP_ADDR" -p "$REDIS_PORT_6379_TCP_PORT"'
Copyright © 2015 David Dossot - MIT License