Skip to content

Commit

Permalink
CAMEL-7833 Added convenience method ReactiveCamel.to(...).
Browse files Browse the repository at this point in the history
Changed test to highlight the idea of creating routes with RX.
  • Loading branch information
Jyrki Ruuskanen authored and WillemJiang committed Apr 3, 2015
1 parent c67392f commit 3c3b0ed
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ public <T> void sendTo(Observable<T> observable, Endpoint endpoint) {
}
}

/**
* Convenience method for creating CamelOperator instances
*/
public CamelOperator to(String uri) throws Exception {
return new CamelOperator(camelContext, uri);
}

/**
* Convenience method for creating CamelOperator instances
*/
public CamelOperator to(Endpoint endpoint) throws Exception {
return new CamelOperator(endpoint);
}

public CamelContext getCamelContext() {
return camelContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.observables.ConnectableObservable;

/**
*/
Expand All @@ -39,12 +41,16 @@ public void testCamelOperator() throws Exception {
mockEndpoint2.expectedMessageCount(1);
mockEndpoint3.expectedMessageCount(1);

Observable<Message> result = reactiveCamel.toObservable("direct:start")
.lift(new CamelOperator(camelContext, "mock:results1"))
ConnectableObservable<Message> route = reactiveCamel.toObservable("direct:start")
.lift(new CamelOperator(mockEndpoint1))
.lift(new CamelOperator(camelContext, "log:foo"))
.debounce(1, TimeUnit.SECONDS)
.lift(new CamelOperator(mockEndpoint2));
reactiveCamel.sendTo(result, "mock:results3");
.lift(reactiveCamel.to(mockEndpoint2))
.lift(reactiveCamel.to("mock:results3"))
.publish();

// Start the route
Subscription routeSubscription = route.connect();

// Send two test messages
producerTemplate.sendBody("direct:start", "<test/>");
Expand All @@ -53,5 +59,8 @@ public void testCamelOperator() throws Exception {
mockEndpoint1.assertIsSatisfied();
mockEndpoint2.assertIsSatisfied();
mockEndpoint3.assertIsSatisfied();

// Stop the route
routeSubscription.unsubscribe();
}
}

0 comments on commit 3c3b0ed

Please sign in to comment.