From b1bb5866e38b5426c7a44745b97f4269451cc785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20COL?= Date: Fri, 24 Feb 2017 14:21:01 +0100 Subject: [PATCH] Migration of EIP documentation --- camel-core/src/main/docs/eips/loop-eip.adoc | 167 +++++ .../src/main/docs/eips/recipientList-eip.adoc | 415 +++++++++++ .../src/main/docs/eips/resequence-eip.adoc | 260 +++++++ .../src/main/docs/eips/routingSlip-eip.adoc | 114 +++ camel-core/src/main/docs/eips/sample-eip.adoc | 96 +++ camel-core/src/main/docs/eips/script-eip.adoc | 84 +++ camel-core/src/main/docs/eips/sort-eip.adoc | 98 +++ camel-core/src/main/docs/eips/split-eip.adoc | 688 ++++++++++++++++++ .../src/main/docs/eips/throttle-eip.adoc | 95 +++ .../src/main/docs/eips/validate-eip.adoc | 77 ++ 10 files changed, 2094 insertions(+) create mode 100644 camel-core/src/main/docs/eips/loop-eip.adoc create mode 100644 camel-core/src/main/docs/eips/recipientList-eip.adoc create mode 100644 camel-core/src/main/docs/eips/resequence-eip.adoc create mode 100644 camel-core/src/main/docs/eips/routingSlip-eip.adoc create mode 100644 camel-core/src/main/docs/eips/sample-eip.adoc create mode 100644 camel-core/src/main/docs/eips/script-eip.adoc create mode 100644 camel-core/src/main/docs/eips/sort-eip.adoc create mode 100644 camel-core/src/main/docs/eips/split-eip.adoc create mode 100644 camel-core/src/main/docs/eips/throttle-eip.adoc create mode 100644 camel-core/src/main/docs/eips/validate-eip.adoc diff --git a/camel-core/src/main/docs/eips/loop-eip.adoc b/camel-core/src/main/docs/eips/loop-eip.adoc new file mode 100644 index 0000000000000..c1147ecf3342d --- /dev/null +++ b/camel-core/src/main/docs/eips/loop-eip.adoc @@ -0,0 +1,167 @@ +## Loop EIP +The Loop allows for processing a message a number of times, possibly in a different way for each iteration. Useful mostly during testing. + +[NOTE] +.Default mode +==== +Notice by default the loop uses the same exchange throughout the looping. So the result from the previous iteration will be used for the next (eg Pipes and Filters). From Camel 2.8 onwards you can enable copy mode instead. See the options table for more details. +==== + +### Options + +// eip options: START +The Loop EIP supports 2 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| copy | Boolean | If the copy attribute is true a copy of the input Exchange is used for each iteration. That means each iteration will start from a copy of the same message. By default loop will loop the same exchange all over so each iteration may have different message content. +| doWhile | Boolean | Enables the while loop that loops until the predicate evaluates to false or null. +|======================================================================= +{% endraw %} +// eip options: END + + +### Exchange properties +For each iteration two properties are set on the Exchange. Processors can rely on these properties to process the Message in different ways. +[width="100%",cols="3,6",options="header"] +|======================================================================= +| Property | Description +| CamelLoopSize | Total number of loops. This is not available if running the loop in while loop mode. +| CamelLoopIndex | Index of the current iteration (0 based) +|======================================================================= + +### Examples +The following example shows how to take a request from the *direct:x* endpoint, then send the message repetitively to *mock:result*. The number of times the message is sent is either passed as an argument to `loop()`, or determined at runtime by evaluating an expression. The expression *must* evaluate to an int, otherwise a `RuntimeCamelException` is thrown. + +#### Using the Fluent Builders +Pass loop count as an argument +[source,java] +--------------------- +from("direct:a").loop(8).to("mock:result"); +--------------------- + +Use expression to determine loop count +[source,java] +--------------------- +from("direct:b").loop(header("loop")).to("mock:result"); +--------------------- + +Use expression to determine loop count +[source,java] +--------------------- +from("direct:c").loop().xpath("/hello/@times").to("mock:result"); +--------------------- + +#### Using the Spring XML Extensions +Pass loop count as an argument +[source,xml] +--------------------- + + + + 8 + + + +--------------------- + +Use expression to determine loop count +[source,xml] +--------------------- + + + +
loop
+ +
+
+--------------------- + +For further examples of this pattern in use you could look at one of the junit test case. + +#### Using copy mode +*Available as of Camel 2.8* + +Now suppose we send a message to "direct:start" endpoint containing the letter A. + +The output of processing this route will be that, each "mock:loop" endpoint will receive "AB" as message. + +[source,java] +--------------------- +from("direct:start") + // instruct loop to use copy mode, which mean it will use a copy of the input exchange + // for each loop iteration, instead of keep using the same exchange all over + .loop(3).copy() + .transform(body().append("B")) + .to("mock:loop") + .end() + .to("mock:result"); +--------------------- + +However if we do *not* enable copy mode then "mock:loop" will receive "AB", "ABB", "ABBB", etc. messages. + +[source,java] +--------------------- +from("direct:start") + // by default loop will keep using the same exchange so on the 2nd and 3rd iteration its + // the same exchange that was previous used that are being looped all over + .loop(3) + .transform(body().append("B")) + .to("mock:loop") + .end() + .to("mock:result"); +--------------------- + +The equivalent example in XML DSL in copy mode is as follows: + +[source,xml] +--------------------- + + + + + 3 + + ${body}B + + + + + +--------------------- + +#### Using while mode +*Available as of Camel 2.17* + +The loop can act like a while loop that loops until the expression evaluates to false or null. + +For example the route below loops while the length of the message body is 5 or less characters. Notice that the DSL uses *loopDoWhile*. + +[source,java] +--------------------- +from("direct:start") + .loopDoWhile(simple("${body.length} <= 5")) + .to("mock:loop") + .transform(body().append("A")) + .end() + .to("mock:result"); +--------------------- + +And the same example in XML: +[source,xml] +--------------------- + + + + ${body.length} <= 5 + + + A${body} + + + + +--------------------- + +Notice in XML that the while loop is turned on using the *doWhile* attribute. + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/recipientList-eip.adoc b/camel-core/src/main/docs/eips/recipientList-eip.adoc new file mode 100644 index 0000000000000..ffc0e97f0f302 --- /dev/null +++ b/camel-core/src/main/docs/eips/recipientList-eip.adoc @@ -0,0 +1,415 @@ +## Recipient List EIP +The link:http://www.enterpriseintegrationpatterns.com/RecipientList.html[Recipient List] from the EIP patterns allows you to route messages to a number of dynamically specified recipients. + +image:http://www.enterpriseintegrationpatterns.com/img/RecipientList.gif[image] + +The recipients will receive a copy of the *same* Exchange, and Camel will execute them sequentially. + +### Options + +// eip options: START +The Recipient List EIP supports 15 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| delimiter | String | Delimiter used if the Expression returned multiple endpoints. Can be turned off using the value false. The default value is +| parallelProcessing | Boolean | If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages has been fully processed before it continues. Its only the sending and processing the replies from the recipients which happens concurrently. +| strategyRef | String | Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients into a single outgoing message from the RecipientList. By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy +| strategyMethodName | String | This option can be used to explicit declare the method name to use when using POJOs as the AggregationStrategy. +| strategyMethodAllowNull | Boolean | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich) when using POJOs as the AggregationStrategy +| executorServiceRef | String | Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option then parallel processing is automatic implied and you do not have to enable that option as well. +| stopOnException | Boolean | Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault message) or an exception was thrown and handled by the error handler (such as using onException). In all situations the recipient list will stop further processing. This is the same behavior as in pipeline which is used by the routing engine. The default behavior is to not stop but continue processing till the end +| ignoreInvalidEndpoints | Boolean | Ignore the invalidate endpoint exception when try to create a producer with that endpoint +| streaming | Boolean | If enabled then Camel will process replies out-of-order eg in the order they come back. If disabled Camel will process replies in the same order as defined by the recipient list. +| timeout | Long | Sets a total timeout specified in millis when using parallel processing. If the Recipient List hasn't been able to send and process all replies within the given timeframe then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. +| onPrepareRef | String | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send or any custom logic needed before the exchange is send. +| shareUnitOfWork | Boolean | Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work. +| cacheSize | Integer | Sets the maximum size used by the org.apache.camel.impl.ProducerCache which is used to cache and reuse producers when using this recipient list when uris are reused. +| parallelAggregate | Boolean | If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. +| stopOnAggregateException | Boolean | If enabled unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows to work around this behavior. The default value is false for the sake of backward compatibility. +|======================================================================= +{% endraw %} +// eip options: END + + +### Static Recipient List +The following example shows how to route a request from an input *queue:a* endpoint to a static list of destinations + +#### Using Annotations +You can use the RecipientList Annotation on a POJO to create a Dynamic Recipient List. For more details see the Bean Integration. + +#### Using the Fluent Builders + +[source,java] +--------------------- +RouteBuilder builder = new RouteBuilder() { + public void configure() { + errorHandler(deadLetterChannel("mock:error")); + + from("direct:a") + .multicast().to("direct:b", "direct:c", "direct:d"); + } +}; +--------------------- + +#### Using the Spring XML Extensions + +[source,xml] +--------------------- + + + + + + + + + + +--------------------- + +### Dynamic Recipient List +Usually one of the main reasons for using the Recipient List pattern is that the list of recipients is dynamic and calculated at runtime. The following example demonstrates how to create a dynamic recipient list using an Expression (which in this case extracts a named header value dynamically) to calculate the list of endpoints which are either of type Endpoint or are converted to a String and then resolved using the endpoint URIs. + +#### Using the Fluent Builders + +[source,java] +--------------------- +RouteBuilder builder = new RouteBuilder() { + public void configure() { + errorHandler(deadLetterChannel("mock:error")); + + from("direct:a") + .recipientList(header("foo")); + } +}; +--------------------- + +The above assumes that the header contains a list of endpoint URIs. The following takes a single string header and tokenizes it + +[source,java] +--------------------- +from("direct:a").recipientList( + header("recipientListHeader").tokenize(",")); +--------------------- + +##### Iteratable value +The dynamic list of recipients that are defined in the header must be iterable such as: + +* `java.util.Collection` +* `java.util.Iterator` +* arrays +* `org.w3c.dom.NodeList` +* a single String with values separated by comma +* any other type will be regarded as a single value + +#### Using the Spring XML Extensions +[source,xml] +--------------------- + + + + + $foo + + + +--------------------- + +For further examples of this pattern in action you could take a look at one of the junit test cases. + +##### Using delimiter in Spring XML +In Spring DSL you can set the delimiter attribute for setting a delimiter to be used if the header value is a single String with multiple separated endpoints. By default Camel uses comma as delimiter, but this option lets you specify a custom delimiter to use instead. + +[source,xml] +--------------------- + + + + +
myHeader
+
+
+--------------------- + +So if *myHeader* contains a `String` with the value `"activemq:queue:foo, activemq:topic:hello , log:bar"` then Camel will split the `String` using the delimiter given in the XML that was comma, resulting into 3 endpoints to send to. You can use spaces between the endpoints as Camel will trim the value when it lookup the endpoint to send to. + +[NOTE] +In Java DSL you use the `tokenizer` to achieve the same. The route above in Java DSL: + +[source,java] +--------------------- +from("direct:a").recipientList(header("myHeader").tokenize(",")); +--------------------- + +In *Camel 2.1* its a bit easier as you can pass in the delimiter as 2nd parameter: + +[source,java] +--------------------- +from("direct:a").recipientList(header("myHeader"), "#"); +--------------------- + +### Sending to multiple recipients in parallel +*Available as of Camel 2.2* + +The Recipient List now supports `parallelProcessing` that for example Splitter also supports. You can use it to use a thread pool to have concurrent tasks sending the Exchange to multiple recipients concurrently. + +[source,java] +--------------------- +from("direct:a").recipientList(header("myHeader")).parallelProcessing(); +--------------------- + +And in Spring XML it is an attribute on the recipient list tag. + +[source,xml] +--------------------- + + + +
myHeader
+
+
+--------------------- + +### Stop continuing in case one recipient failed +*Available as of Camel 2.2* + +The Recipient List now supports `stopOnException` that for example Splitter also supports. You can use it to stop sending to any further recipients in case any recipient failed. + +[source,java] +--------------------- +from("direct:a").recipientList(header("myHeader")).stopOnException(); +--------------------- + +And in Spring XML its an attribute on the recipient list tag. + +[source,xml] +--------------------- + + + +
myHeader
+
+
+--------------------- + +[NOTE] +You can combine parallelProcessing and stopOnException and have them both true. + +### Ignore invalid endpoints +*Available as of Camel 2.3* + +The Recipient List now supports `ignoreInvalidEndpoints` (like the Routing Slip). You can use it to skip endpoints which are invalid. + +[source,java] +--------------------- +from("direct:a").recipientList(header("myHeader")).ignoreInvalidEndpoints(); +--------------------- + +And in Spring XML it is an attribute on the recipient list tag. + +[source,xml] +--------------------- + + + +
myHeader
+
+
+--------------------- + +Then let us say the `myHeader` contains the following two endpoints `direct:foo,xxx:bar`. The first endpoint is valid and works. However the second one is invalid and will just be ignored. Camel logs at INFO level about it, so you can see why the endpoint was invalid. + + +### Using custom `AggregationStrategy` +*Available as of Camel 2.2* + +You can now use your own `AggregationStrategy` with the Recipient List. However this is rarely needed. +What it is good for is that in case you are using Request Reply messaging then the replies from the recipients can be aggregated. +By default Camel uses `UseLatestAggregationStrategy` which just keeps that last received reply. If you must remember all the bodies that all the recipients sent back, +then you can use your own custom aggregator that keeps those. It is the same principle as with the Aggregator EIP so check it out for details. + +[source,java] +--------------------- +from("direct:a") + .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy()) + .to("direct:b"); +--------------------- + +And in Spring XML it is again an attribute on the recipient list tag. + +[source,xml] +--------------------- + + + +
myHeader
+
+ +
+ + +--------------------- + +### Knowing which endpoint when using custom `AggregationStrategy` +Available as of Camel 2.12 + +When using a custom `AggregationStrategy` then the `aggregate` method is always invoked in sequential order (also if parallel processing is enabled) of the endpoints the Recipient List is using. +However from Camel 2.12 onwards this is easier to know as the `newExchange` Exchange now has a property stored (key is `Exchange.RECIPIENT_LIST_ENDPOINT` with the uri of the Endpoint. +So you know which endpoint you are aggregating from. The code block shows how to access this property in your Aggregator. + +[source,java] +--------------------- +@Override +public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + String uri = newExchange.getProperty(Exchange.RECIPIENT_LIST_ENDPOINT, String.class); + ... +} +--------------------- + +### Using custom thread pool +*Available as of Camel 2.2* + +A thread pool is only used for `parallelProcessing`. You supply your own custom thread pool via the `ExecutorServiceStrategy` (see Camel's Threading Model), +the same way you would do it for the `aggregationStrategy`. By default Camel uses a thread pool with 10 threads (subject to change in future versions). + +### Using method call as recipient list +You can use a Bean to provide the recipients, for example: + +[source,java] +--------------------- +from("activemq:queue:test").recipientList().method(MessageRouter.class, "routeTo"); +--------------------- + +And then `MessageRouter`: + +[source,java] +--------------------- +public class MessageRouter { + + public String routeTo() { + String queueName = "activemq:queue:test2"; + return queueName; + } +} +--------------------- + +When you use a Bean then do *not* use the `@RecipientList` annotation as this will in fact add yet another recipient list, so you end up having two. Do *not* do the following. + +[source,java] +--------------------- +public class MessageRouter { + + @RecipientList + public String routeTo() { + String queueName = "activemq:queue:test2"; + return queueName; + } +} +--------------------- + +You should only use the snippet above (using `@RecipientList`) if you just route to a Bean which you then want to act as a recipient list. + +So the original route can be changed to: + +[source,java] +--------------------- +from("activemq:queue:test").bean(MessageRouter.class, "routeTo"); +--------------------- + +Which then would invoke the routeTo method and detect that it is annotated with `@RecipientList` and then act accordingly as if it was a recipient list EIP. + +### Using timeout +*Available as of Camel 2.5* + +If you use `parallelProcessing` then you can configure a total `timeout` value in millis. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message consumer is slow. For example you can set a timeout value of 20 sec. + +[WARNING] +.Tasks may keep running +==== +If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. We may be able to improve this functionality in future Camel releases. +==== + +For example in the unit test below you can see that we multicast the message to 3 destinations. We have a timeout of 2 seconds, which means only the last two messages can be completed within the timeframe. This means we will only aggregate the last two which yields a result aggregation which outputs "BC". + +[source,java] +--------------------- +from("direct:start") + .multicast(new AggregationStrategy() { + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + if (oldExchange == null) { + return newExchange; + } + + String body = oldExchange.getIn().getBody(String.class); + oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class)); + return oldExchange; + } + }) + .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c") + // use end to indicate end of multicast route + .end() + .to("mock:result"); + +from("direct:a").delay(1000).to("mock:A").setBody(constant("A")); + +from("direct:b").to("mock:B").setBody(constant("B")); + +from("direct:c").to("mock:C").setBody(constant("C")); +--------------------- + +[NOTE] +.Timeout in other EIPs +==== +This timeout feature is also supported by Splitter and both multicast and recipientList. +==== + +By default if a timeout occurs the `AggregationStrategy` is not invoked. However you can implement a special version + +[source,java] +.TimeoutAwareAggregationStrategy +--------------------- +public interface TimeoutAwareAggregationStrategy extends AggregationStrategy { + + /** + * A timeout occurred + * + * @param oldExchange the oldest exchange (is null on first aggregation as we only have the new exchange) + * @param index the index + * @param total the total + * @param timeout the timeout value in millis + */ + void timeout(Exchange oldExchange, int index, int total, long timeout); +--------------------- + +This allows you to deal with the timeout in the `AggregationStrategy` if you really need to. + +[NOTE] +.Timeout is total +==== +The timeout is total, which means that after X time, Camel will aggregate the messages which have completed within the timeframe. +The remainders will be cancelled. Camel will also only invoke the `timeout` method in the `TimeoutAwareAggregationStrategy` once, for the first index which caused the timeout. +==== + +### Using onPrepare to execute custom logic when preparing messages +*Available as of Camel 2.8* + +See details at Multicast + +### Using ExchangePattern in recipients +*Available as of Camel 2.15* + +The recipient list will by default use the current Exchange Pattern. Though one can imagine use-cases where one wants to send a message to a recipient using a different exchange pattern. For example you may have a route that initiates as an InOnly route, but want to use InOut exchange pattern with a recipient list. To do this in earlier Camel releases, you would need to change the exchange pattern before the recipient list, or use onPrepare option to alter the pattern. From Camel 2.15 onwards, you can configure the exchange pattern directly in the recipient endpoints. + +For example in the route below we pick up new files (which will be started as InOnly) and then route to a recipient list. As we want to use InOut with the ActiveMQ (JMS) endpoint we can now specify this using the exchangePattern=InOut option. Then the response from the JMS request/reply will then be continued routed, and thus the response is what will be stored in as a file in the outbox directory. + +[source,java] +--------------------- +from("file:inbox") + // the exchange pattern is InOnly initially when using a file route + .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut") + .to("file:outbox"); +--------------------- + +[WARNING] +==== +The recipient list will not alter the original exchange pattern. So in the example above the exchange pattern will still be InOnly when the message is routed to the file:outbox endpoint. + +If you want to alter the exchange pattern permanently then use the .setExchangePattern option. See more details at Request Reply and Event Message. +==== + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/resequence-eip.adoc b/camel-core/src/main/docs/eips/resequence-eip.adoc new file mode 100644 index 0000000000000..9ac09952b5ed2 --- /dev/null +++ b/camel-core/src/main/docs/eips/resequence-eip.adoc @@ -0,0 +1,260 @@ +## Resequence EIP +The link:http://www.enterpriseintegrationpatterns.com/Resequencer.html[Resequencer] from the link:https://camel.apache.org/enterprise-integration-patterns.html[EIP patterns] allows you to reorganise messages based on some comparator. + +By default in Camel we use an Expression to create the comparator; so that you can compare by a message header or the body or a piece of a message etc. + +image:http://www.enterpriseintegrationpatterns.com/img/Resequencer.gif[image] + +### Options + +// eip options: START +The Resequence EIP supports 1 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| resequencerConfig | ResequencerConfig | To configure the resequencer in using either batch or stream configuration. Will by default use batch configuration. +|======================================================================= +{% endraw %} +// eip options: END + + +[NOTE] +.Change in Camel 2.7 +==== +The `` and `` tags in XML DSL in the Resequencer EIP must now be configured in the top, and not in the bottom. +So if you use those, then move them up just below the `` EIP starts in the XML. If you are using Camel older than 2.7, then those configs should be at the bottom. +==== + +Camel supports two resequencing algorithms: + +* *Batch resequencing* collects messages into a batch, sorts the messages and sends them to their output. +* *Stream resequencing* re-orders (continuous) message streams based on the detection of gaps between messages. + +By default the Resequencer does not support duplicate messages and will only keep the last message, in case a message arrives with the same message expression. However in the batch mode you can enable it to allow duplicates. + +### Batch Resequencing +The following example shows how to use the batch-processing resequencer so that messages are sorted in order of the *body()* expression. That is messages are collected into a batch (either by a maximum number of messages per batch or using a timeout) then they are sorted in order and then sent out to their output. + +#### Using the Fluent Builders +[source,java] +--------------------- +from("direct:start") + .resequence().body() + .to("mock:result"); +--------------------- + +This is equivalent to +[source,java] +--------------------- +from("direct:start") + .resequence(body()).batch() + .to("mock:result"); +--------------------- + +The batch-processing resequencer can be further configured via the `size()` and `timeout()` methods. +[source,java] +--------------------- +from("direct:start") + .resequence(body()).batch().size(300).timeout(4000L) + .to("mock:result") +--------------------- + +This sets the batch size to 300 and the batch timeout to 4000 ms (by default, the batch size is 100 and the timeout is 1000 ms). Alternatively, you can provide a configuration object. + +[source,java] +--------------------- +from("direct:start") + .resequence(body()).batch(new BatchResequencerConfig(300, 4000L)) + .to("mock:result") +--------------------- + +So the above example will reorder messages from endpoint *direct:a* in order of their bodies, to the endpoint *mock:result*. + +Typically you'd use a header rather than the body to order things; or maybe a part of the body. So you could replace this expression with + +[source,java] +--------------------- +resequencer(header("mySeqNo")) +--------------------- + +for example to reorder messages using a custom sequence number in the header `mySeqNo`. + +You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages. + +#### Using the Spring XML Extensions +[source,xml] +--------------------- + + + + + body + + + + + + +--------------------- + +### Allow Duplicates +*Available as of Camel 2.4* + +In the `batch` mode, you can now allow duplicates. In Java DSL there is a `allowDuplicates()` method and in Spring XML there is an `allowDuplicates=true` attribute on the `` you can use to enable it. + +### Reverse +*Available as of Camel 2.4* + +In the `batch` mode, you can now reverse the expression ordering. By default the order is based on 0..9,A..Z, which would let messages with low numbers be ordered first, and thus also also outgoing first. In some cases you want to reverse order, which is now possible. + +In Java DSL there is a `reverse()` method and in Spring XML there is an `reverse=true` attribute on the `` you can use to enable it. + +### Resequence JMS messages based on JMSPriority +*Available as of Camel 2.4* + +It's now much easier to use the Resequencer to resequence messages from JMS queues based on JMSPriority. For that to work you need to use the two new options `allowDuplicates` and `reverse`. + +[source,java] +--------------------- +from("jms:queue:foo") + // sort by JMSPriority by allowing duplicates (message can have same JMSPriority) + // and use reverse ordering so 9 is first output (most important), and 0 is last + // use batch mode and fire every 3th second + .resequence(header("JMSPriority")).batch().timeout(3000).allowDuplicates().reverse() + .to("mock:result"); +--------------------- + +Notice this is *only* possible in the `batch` mode of the Resequencer. + +### Ignore invalid exchanges +*Available as of Camel 2.9* + +The Resequencer EIP will from Camel 2.9 onwards throw a `CamelExchangeException` if the incoming Exchange is not valid for the resequencer - ie. the expression cannot be evaluated, such as a missing header. +You can use the option `ignoreInvalidExchanges` to ignore these exceptions which means the Resequencer will then skip the invalid Exchange. + +[source,java] +--------------------- +from("direct:start") + .resequence(header("seqno")).batch().timeout(1000) + // ignore invalid exchanges (they are discarded) + .ignoreInvalidExchanges() + .to("mock:result"); +--------------------- + +This option is available for both batch and stream resequencer. + +### Reject Old Exchanges +*Available as of Camel 2.11* + +This option can be used to prevent out of order messages from being sent regardless of the event that delivered messages downstream (capacity, timeout, etc). If enabled using `rejectOld()`, the Resequencer will throw a `MessageRejectedException` when an incoming Exchange is "older" (based on the Comparator) than the last delivered message. This provides an extra level of control with regards to delayed message ordering. + +[source,java] +--------------------- +from("direct:start") + .onException(MessageRejectedException.class).handled(true).to("mock:error").end() + .resequence(header("seqno")).stream().timeout(1000).rejectOld() + .to("mock:result"); +--------------------- + +This option is available for the stream resequencer only. + +### Stream Resequencing +The next example shows how to use the stream-processing resequencer. Messages are re-ordered based on their sequence numbers given by a seqnum header using gap detection and timeouts on the level of individual messages. + +#### Using the Fluent Builders + +[source,java] +--------------------- +from("direct:start").resequence(header("seqnum")).stream().to("mock:result"); +--------------------- + +The stream-processing resequencer can be further configured via the `capacity()` and `timeout()` methods. +[source,java] +--------------------- +from("direct:start") + .resequence(header("seqnum")).stream().capacity(5000).timeout(4000L) + .to("mock:result") +--------------------- + +This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration object. +[source,java] +--------------------- +from("direct:start") + .resequence(header("seqnum")).stream(new StreamResequencerConfig(5000, 4000L)) + .to("mock:result") +--------------------- + +The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. +Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor is known. For example a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the successor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs). + +If the maximum time difference between messages (with successor/predecessor relationship with respect to the sequence number) in a message stream is known, then the resequencer's timeout parameter should be set to this value. In this case it is guaranteed that all messages of a stream are delivered in correct order to the next processor. The lower the timeout value is compared to the out-of-sequence time difference the higher is the probability for out-of-sequence messages delivered by this resequencer. Large timeout values should be supported by sufficiently high capacity values. The capacity parameter is used to prevent the resequencer from running out of memory. + +By default, the stream resequencer expects long sequence numbers but other sequence numbers types can be supported as well by providing a custom expression. + +[source,java] +--------------------- +public class MyFileNameExpression implements Expression { + + public String getFileName(Exchange exchange) { + return exchange.getIn().getBody(String.class); + } + + public Object evaluate(Exchange exchange) { + // parser the file name with YYYYMMDD-DNNN pattern + String fileName = getFileName(exchange); + String[] files = fileName.split("-D"); + Long answer = Long.parseLong(files[0]) * 1000 + Long.parseLong(files[1]); + return answer; + } + + + public T evaluate(Exchange exchange, Class type) { + Object result = evaluate(exchange); + return exchange.getContext().getTypeConverter().convertTo(type, result); + } + +} +from("direct:start").resequence(new MyFileNameExpression()).stream().timeout(100).to("mock:result"); +--------------------- + +or custom comparator via the comparator() method + +[source,java] +--------------------- +ExpressionResultComparator comparator = new MyComparator(); +from("direct:start") + .resequence(header("seqnum")).stream().comparator(comparator) + .to("mock:result"); +--------------------- + +or via a StreamResequencerConfig object. + +[source,java] +--------------------- +ExpressionResultComparator comparator = new MyComparator(); +StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, comparator); + +from("direct:start") + .resequence(header("seqnum")).stream(config) + .to("mock:result"); +--------------------- + +Using the Spring XML Extensions + +[source,xml] +--------------------- + + + + + in.header.seqnum + + + + + +--------------------- + +### Further Examples +For further examples of this pattern in use you could look at the batch-processing resequencer junit test case and the stream-processing resequencer junit test case + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/routingSlip-eip.adoc b/camel-core/src/main/docs/eips/routingSlip-eip.adoc new file mode 100644 index 0000000000000..842c00e387331 --- /dev/null +++ b/camel-core/src/main/docs/eips/routingSlip-eip.adoc @@ -0,0 +1,114 @@ +## Routing Slip EIP +The Routing Slip from the link:https://camel.apache.org/enterprise-integration-patterns.html[EIP patterns] allows you to route a message consecutively through a series of processing steps where the sequence of steps is not known at design time and can vary for each message. + +image:http://www.enterpriseintegrationpatterns.com/img/RoutingTableSimple.gif[image] + + +### Options + +// eip options: START +The Routing Slip EIP supports 3 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| uriDelimiter | String | Sets the uri delimiter to use +| ignoreInvalidEndpoints | Boolean | Ignore the invalidate endpoint exception when try to create a producer with that endpoint +| cacheSize | Integer | Sets the maximum size used by the org.apache.camel.impl.ProducerCache which is used to cache and reuse producers when using this recipient list when uris are reused. +|======================================================================= +{% endraw %} +// eip options: END + +### Example +The following route will take any messages sent to the Apache ActiveMQ queue SomeQueue and pass them into the Routing Slip pattern. + +[source,java] +--------------------- +from("activemq:SomeQueue") + .routingSlip("aRoutingSlipHeader"); +--------------------- + +Messages will be checked for the existence of the `aRoutingSlipHeader` header. +The value of this header should be a comma-delimited list of endpoint URIs you wish the message to be routed to. +The Message will be routed in a pipeline fashion, i.e., one after the other. From *Camel 2.5* the Routing Slip will set a property, `Exchange.SLIP_ENDPOINT`, on the Exchange which contains the current endpoint as it advanced though the slip. This allows you to _know_ how far we have processed in the slip. + +The Routing Slip will compute the slip *beforehand* which means, the slip is only computed once. If you need to compute the slip _on-the-fly_ then use the Dynamic Router pattern instead. + +### Configuration Options +Here we set the header name and the URI delimiter to something different. + +#### Using the Fluent Builders +[source,java] +--------------------- +from("direct:c").routingSlip(header("aRoutingSlipHeader"), "#"); +--------------------- + +#### Using the Spring XML Extensions + +[source,xml] +--------------------- + + + + +
aRoutingSlipHeader
+
+
+
+--------------------- + +### Ignore Invalid Endpoints +*Available as of Camel 2.3* + +The Routing Slip now supports ignoreInvalidEndpoints which the Recipient List also supports. You can use it to skip endpoints which are invalid. +[source,java] +--------------------- +from("direct:a") + .routingSlip("myHeader") + .ignoreInvalidEndpoints(); +--------------------- + +And in Spring XML its an attribute on the recipient list tag: + +[source,xml] +--------------------- + + + +
myHeader
+
+
+--------------------- + +Then let's say the myHeader contains the following two endpoints direct:foo,xxx:bar. The first endpoint is valid and works. However the second endpoint is invalid and will just be ignored. Camel logs at INFO level, so you can see why the endpoint was invalid. + +### Expression Support +*Available as of Camel 2.4* + +The Routing Slip now supports to take the expression parameter as the Recipient List does. You can tell Camel the expression that you want to use to get the routing slip. + +[source,java] +--------------------- +from("direct:a") + .routingSlip(header("myHeader")) + .ignoreInvalidEndpoints(); +--------------------- + +And in Spring XML its an attribute on the recipient list tag. +[source,xml] +--------------------- + + + + +
myHeader
+
+
+--------------------- + +### Further Examples +For further examples of this pattern in use you could look at the routing slip test cases. + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/sample-eip.adoc b/camel-core/src/main/docs/eips/sample-eip.adoc new file mode 100644 index 0000000000000..1a551eb6ebc25 --- /dev/null +++ b/camel-core/src/main/docs/eips/sample-eip.adoc @@ -0,0 +1,96 @@ +## Sample EIP +*Available as of Camel 2.1* + +A sampling throttler allows you to extract a sample of the exchanges from the traffic through a route. + +It is configured with a sampling period during which only a single exchange is allowed to pass through. All other exchanges will be stopped. +Will by default use a sample period of 1 seconds. + +### Options +// eip options: START +The Sample EIP supports 3 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| samplePeriod | Long | Sets the sample period during which only a single Exchange will pass through. +| messageFrequency | Long | Sets the sample message count which only a single Exchange will pass through after this many received. +| units | TimeUnit | Sets the time units for the sample period defaulting to seconds. +|======================================================================= +{% endraw %} +// eip options: END + + +### Samples +You use this EIP with the `sample` DSL as show in these samples. + +#### Using the Fluent Builders +These samples also show how you can use the different syntax to configure the sampling period: + +[source,java] +--------------------- +from("direct:sample") + .sample() + .to("mock:result"); + +from("direct:sample-configured") + .sample(1, TimeUnit.SECONDS) + .to("mock:result"); + +from("direct:sample-configured-via-dsl") + .sample().samplePeriod(1).timeUnits(TimeUnit.SECONDS) + .to("mock:result"); + +from("direct:sample-messageFrequency") + .sample(10) + .to("mock:result"); + +from("direct:sample-messageFrequency-via-dsl") + .sample().sampleMessageFrequency(5) + .to("mock:result"); +--------------------- + +#### Using the Spring XML Extensions +And the same example in Spring XML is: + +[source,xml] +--------------------- + + + + + + + + + + + + + + + + + + +--------------------- + +And since it uses a default of 1 second you can omit this configuration in case you also want to use 1 second +[source,xml] +--------------------- + + + + + + + +--------------------- + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. + +### See Also + +* link:./throttle-eip.adoc[Throttler] +* link:/aggregator-eip.adoc[Aggregator] diff --git a/camel-core/src/main/docs/eips/script-eip.adoc b/camel-core/src/main/docs/eips/script-eip.adoc new file mode 100644 index 0000000000000..ab9ed9055c7ab --- /dev/null +++ b/camel-core/src/main/docs/eips/script-eip.adoc @@ -0,0 +1,84 @@ +## Script EIP +*Available as of Camel 2.16* + +Is used to execute a script which does not change the message (by default). +This is useful when you need to invoke some logic that are not in Java code such as JavaScript, +Groovy or any of the other Languages. The message body is not changed (by default) however the scripting +context has access to the current Exchange and can essentially change the message or headers directly. +But the return value from the script is discarded and not used. +If the return value should be used as a changed message body then use link:./message-translator.adoc[Message Translator] EIP instead. + +### Options + +// eip options: START +The Script EIP supports 0 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +|======================================================================= +{% endraw %} +// eip options: END + +### Using from Java DSL +The route below will read the file contents and validate them against a regular expression. + +[source,java] +--------------------- +from("file://inbox") + .script().groovy("// some groovy code goes here") + .to("bean:MyServiceBean.processLine"); +--------------------- + +### Using from Spring DSL +And from XML its easy as well + +[source,xml] +--------------------- + + + + + + + +--------------------- + +Mind that you can use CDATA in XML if the groovy scrip uses < > etc + +[source,xml] +--------------------- + + + + + + + +--------------------- + +### Using external script files +You can refer to external script files instead of inlining the script. For example to load a groovy script from the classpath you need to prefix the value with *resource:* as shown: + +[source,xml] +--------------------- + + + + + + + +--------------------- + +You can also refer to the script from the file system with file: instead of classpath: such as file:/var/myscript.groovy + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/sort-eip.adoc b/camel-core/src/main/docs/eips/sort-eip.adoc new file mode 100644 index 0000000000000..38b980bc8bffd --- /dev/null +++ b/camel-core/src/main/docs/eips/sort-eip.adoc @@ -0,0 +1,98 @@ +## Sort EIP +Sort can be used to sort a message. Imagine you consume text files and before processing each file you want to be sure the content is sorted. + +Sort will by default sort the body using a default comparator that handles numeric values or uses the string representation. You can provide your own comparator, and even an expression to return the value to be sorted. Sort requires the value returned from the expression evaluation is convertible to `java.util.List` as this is required by the JDK sort operation. + +### Options + +// eip options: START +The Sort EIP supports 1 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| comparatorRef | String | Sets a reference to lookup for the comparator to use for sorting +|======================================================================= +{% endraw %} +// eip options: END + + +### Using from Java DSL +In the route below it will read the file content and tokenize by line breaks so each line can be sorted. +[source,java] +--------------------- +from("file://inbox").sort(body().tokenize("\n")).to("bean:MyServiceBean.processLine"); +--------------------- + +You can pass in your own comparator as a 2nd argument: +[source,java] +--------------------- +from("file://inbox").sort(body().tokenize("\n"), new MyReverseComparator()).to("bean:MyServiceBean.processLine"); +--------------------- + +### Using from Spring DSL +In the route below it will read the file content and tokenize by line breaks so each line can be sorted. + +*Camel 2.7 or better* +[source,xml] +--------------------- + + + + body + + + +--------------------- + +*Camel 2.6 or older* +[source,xml] +--------------------- + + + + + body + + + + +--------------------- + +And to use our own comparator we can refer to it as a spring bean: + +*Camel 2.7 or better* +[source,xml] +--------------------- + + + + body + + + + + +--------------------- + +*Camel 2.6 or older* +[source,xml] +--------------------- + + + + + body + + + + + + +--------------------- + +Besides ``, you can supply an expression using any language you like, so long as it returns a list. + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/split-eip.adoc b/camel-core/src/main/docs/eips/split-eip.adoc new file mode 100644 index 0000000000000..1ea29c8a74378 --- /dev/null +++ b/camel-core/src/main/docs/eips/split-eip.adoc @@ -0,0 +1,688 @@ +## Split EIP +The link:http://www.enterpriseintegrationpatterns.com/patterns/messaging/Sequencer.html[Splitter] from the link:enterprise-integration-patterns.html[EIP patterns] allows you split a message into a number of pieces and process them individually. + +image:http://www.enterpriseintegrationpatterns.com/img/Sequencer.gif[image] + +You need to specify a Splitter as `split()`. In earlier versions of Camel, you need to use `splitter()`. + + +// eip options: START +The Split EIP supports 12 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| parallelProcessing | Boolean | If enabled then processing each splitted messages occurs concurrently. Note the caller thread will still wait until all messages has been fully processed before it continues. Its only processing the sub messages from the splitter which happens concurrently. +| strategyRef | String | Sets a reference to the AggregationStrategy to be used to assemble the replies from the splitted messages into a single outgoing message from the Splitter. By default Camel will use the original incoming message to the splitter (leave it unchanged). You can also use a POJO as the AggregationStrategy +| strategyMethodName | String | This option can be used to explicit declare the method name to use when using POJOs as the AggregationStrategy. +| strategyMethodAllowNull | Boolean | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values is used as the oldExchange (when no data to enrich) when using POJOs as the AggregationStrategy +| executorServiceRef | String | Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option then parallel processing is automatic implied and you do not have to enable that option as well. +| streaming | Boolean | When in streaming mode then the splitter splits the original message on-demand and each splitted message is processed one by one. This reduces memory usage as the splitter do not split all the messages first but then we do not know the total size and therefore the link org.apache.camel.ExchangeSPLIT_SIZE is empty. In non-streaming mode (default) the splitter will split each message first to know the total size and then process each message one by one. This requires to keep all the splitted messages in memory and therefore requires more memory. The total size is provided in the link org.apache.camel.ExchangeSPLIT_SIZE header. The streaming mode also affects the aggregation behavior. If enabled then Camel will process replies out-of-order eg in the order they come back. If disabled Camel will process replies in the same order as the messages was splitted. +| stopOnException | Boolean | Will now stop further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault message) or an exception was thrown and handled by the error handler (such as using onException). In all situations the splitter will stop further processing. This is the same behavior as in pipeline which is used by the routing engine. The default behavior is to not stop but continue processing till the end +| timeout | Long | Sets a total timeout specified in millis when using parallel processing. If the Splitter hasn't been able to split and process all the sub messages within the given timeframe then the timeout triggers and the Splitter breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. +| onPrepareRef | String | Uses the Processor when preparing the org.apache.camel.Exchange to be send. This can be used to deep-clone messages that should be send or any custom logic needed before the exchange is send. +| shareUnitOfWork | Boolean | Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Splitter will by default not share unit of work between the parent exchange and each splitted exchange. This means each splitted exchange has its own individual unit of work. +| parallelAggregate | Boolean | If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this would require the implementation of AggregationStrategy to be implemented as thread-safe. By default this is false meaning that Camel synchronizes the call to the aggregate method. Though in some use-cases this can be used to archive higher performance when the AggregationStrategy is implemented as thread-safe. +| stopOnAggregateException | Boolean | If enabled unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows to work around this behavior. The default value is false for the sake of backward compatibility. +|======================================================================= +{% endraw %} +// eip options: END + +### Exchange properties +The following properties are set on each Exchange that are split: + +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Property | Type | Description +| `CamelSplitIndex` | `int` | A split counter that increases for each Exchange being split. The counter starts from 0. +| `CamelSplitSize` | `int` | The total number of Exchanges that was splitted. This header is not applied for stream based splitting. From *Camel 2.9* onwards this header is also set in stream based splitting, but only on the completed Exchange. +| `CamelSplitComplete` | `boolean` | *Camel 2.4*: Whether or not this Exchange is the last. +|======================================================================= + + +### Examples +The following example shows how to take a request from the *direct:a* endpoint the split it into pieces using an Expression, then forward each piece to *direct:b* + +#### Using the Fluent Builders + +[source,java] +--------------------- +RouteBuilder builder = new RouteBuilder() { + public void configure() { + errorHandler(deadLetterChannel("mock:error")); + + from("direct:a") + .split(body(String.class).tokenize("\n")) + .to("direct:b"); + } +}; +--------------------- + +The splitter can use any Expression language so you could use any of the Languages Supported such as XPath, XQuery, SQL or one of the Scripting Languages to perform the split. e.g. + +[source,java] +--------------------- +from("activemq:my.queue").split(xpath("//foo/bar")).convertBodyTo(String.class).to("file://some/directory") +--------------------- + +#### Using the Spring XML Extensions + +[source,xml] +--------------------- + + + + + /invoice/lineItems + + + + +--------------------- + +For further examples of this pattern in use you could look at one of the junit test case. + +### Splitting a Collection, Iterator or Array +A common use case is to split a Collection, Iterator or Array from the message. In the sample below we simply use an Expression to identify the value to split. + +[source,java] +--------------------- +from("direct:splitUsingBody").split(body()).to("mock:result"); + +from("direct:splitUsingHeader").split(header("foo")).to("mock:result"); +--------------------- + +In Spring XML you can use the Simple language to identify the value to split. + +[source,java] +--------------------- + + ${body} + + + + + ${header.foo} + + +--------------------- + +### Using Tokenizer from Spring XML Extensions* +You can use the tokenizer expression in the Spring DSL to split bodies or headers using a token. This is a common use-case, so we provided a special *tokenizer* tag for this. +In the sample below we split the body using a @ as separator. You can of course use comma or space or even a regex pattern, also set regex=true. + +[source,xml] +--------------------- + + + + + + + + + +--------------------- + +### What the Splitter returns + +*Camel 2.2 or older:* +The Splitter will by default return the *last* splitted message. + +*Camel 2.3 and newer* +The Splitter will by default return the original input message. + +*For all versions* +You can override this by suppling your own strategy as an AggregationStrategy. There is a sample on this page (Split aggregate request/reply sample). Notice its the same strategy as the Aggregator supports. This Splitter can be viewed as having a build in light weight Aggregator. + +### Parallel execution of distinct 'parts' + +If you want to execute all parts in parallel you can use special notation of `split()` with two arguments, where the second one is a *boolean* flag if processing should be parallel. e.g. + +[source,java] +--------------------- +XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); +from("activemq:my.queue").split(xPathBuilder, true).to("activemq:my.parts"); +--------------------- + +The boolean option has been refactored into a builder method `parallelProcessing` so its easier to understand what the route does when we use a method instead of true|false. + +[source,java] +--------------------- +XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); +from("activemq:my.queue").split(xPathBuilder).parallelProcessing().to("activemq:my.parts"); +--------------------- + +### Stream based + +[NOTE] +.Splitting big XML payloads +==== +The XPath engine in Java and saxon will load the entire XML content into memory. And thus they are not well suited for very big XML payloads. +Instead you can use a custom Expression which will iterate the XML payload in a streamed fashion. From Camel 2.9 onwards you can use the Tokenizer language +which supports this when you supply the start and end tokens. From Camel 2.14, you can use the XMLTokenizer language which is specifically provided for tokenizing XML documents. +==== + +You can split streams by enabling the streaming mode using the streaming builder method. + +[source,java] +--------------------- +from("direct:streaming").split(body().tokenize(",")).streaming().to("activemq:my.parts"); +--------------------- + +You can also supply your custom splitter to use with streaming like this: + +[source,java] +--------------------- +import static org.apache.camel.builder.ExpressionBuilder.beanExpression; +from("direct:streaming") + .split(beanExpression(new MyCustomIteratorFactory(), "iterator")) + .streaming().to("activemq:my.parts") +--------------------- + +### Streaming big XML payloads using Tokenizer language + + +There are two tokenizers that can be used to tokenize an XML payload. The first tokenizer uses the same principle as in the text tokenizer to scan the XML payload and extract a sequence of tokens. + +*Available as of Camel 2.9* + +If you have a big XML payload, from a file source, and want to split it in streaming mode, then you can use the Tokenizer language with start/end tokens to do this with low memory footprint. + +[NOTE] +.StAX component +==== +The Camel StAX component can also be used to split big XML files in a streaming mode. See more details at StAX. +==== + +For example you may have a XML payload structured as follows +[source,xml] +-------------------------------------------------------- + + + + + + + +... + + + + +-------------------------------------------------------- + +Now to split this big file using XPath would cause the entire content to be loaded into memory. So instead we can use the Tokenizer language to do this as follows: +[source,java] +-------------------------------------------------------- +from("file:inbox") + .split().tokenizeXML("order").streaming() + .to("activemq:queue:order"); +-------------------------------------------------------- + +In XML DSL the route would be as follows: +[source,xml] +-------------------------------------------------------- + + + + + + + +-------------------------------------------------------- + +Notice the `tokenizeXML` method which will split the file using the tag name of the child node (more precisely speaking, the local name of the element without its namespace prefix if any), which mean it will grab the content between the and tags (incl. the tokens). So for example a splitted message would be as follows: +[source,xml] +-------------------------------------------------------- + + + +-------------------------------------------------------- + +If you want to inherit namespaces from a root/parent tag, then you can do this as well by providing the name of the root/parent tag: +[source,xml] +-------------------------------------------------------- + + + + + + + +-------------------------------------------------------- + +And in Java DSL its as follows: +[source,java] +-------------------------------------------------------- +from("file:inbox") + .split().tokenizeXML("order", "orders").streaming() + .to("activemq:queue:order"); +-------------------------------------------------------- + +Available as of Camel 2.13.1, you can set the above inheritNamsepaceTagName property to "*" to include the preceding context in each token (i.e., generating each token enclosed in its ancestor elements). It is noted that each token must share the same ancestor elements in this case. + +The above tokenizer works well on simple structures but has some inherent limitations in handling more complex XML structures. + +*Available as of Camel 2.14* + +The second tokenizer uses a StAX parser to overcome these limitations. This tokenizer recognizes XML namespaces and also handles simple and complex XML structures more naturally and efficiently. + +To split using this tokenizer at {urn:shop}order, we can write + +[source,java] +-------------------------------------------------------- +Namespaces ns = new Namespaces("ns1", "urn:shop"); +... +from("file:inbox") + .split().xtokenize("//ns1:order", 'i', ns).streaming() + .to("activemq:queue:order) +-------------------------------------------------------- + +Two arguments control the behavior of the tokenizer. The first argument specifies the element using a path notation. This path notation uses a subset of xpath with wildcard support. The second argument represents the extraction mode. The available extraction modes are: + +[width="100%",cols="3,6",options="header"] +|======================================================================= +| Mode | Description +| i | injecting the contextual namespace bindings into the extracted token (default) +| w | wrapping the extracted token in its ancestor context +| u | unwrapping the extracted token to its child content +| t | extracting the text content of the specified element +|======================================================================= + +Having an input XML +[source,xml] +-------------------------------------------------------- + + 1232014-02-25... +... +-------------------------------------------------------- + +Each mode will result in the following tokens, +[width="100%",cols="3,6",options="header"] +|======================================================================= +| Mode | Description +| i | 1232014-02-25... +| w | + + 1232014-02-25... + + +| u | 1232014-02-25... +| t | 1232014-02-25... +|======================================================================= + +In XML DSL, the equivalent route would be written as follows: + +[source,xml] +-------------------------------------------------------- + + + + + //ns1:order + + + + +-------------------------------------------------------- + + or setting the extraction mode explicitly as + +[source,java] +-------------------------------------------------------- +... +//ns1:order +... +-------------------------------------------------------- + +Note that this StAX based tokenizer's uses StAX Location API and requires a StAX Reader implementation (e.g., woodstox) that correctly returns the offset position pointing to the beginning of each event triggering segment (e.g., the offset position of '<' at each start and end element event). If you use a StAX Reader which does not implement that API correctly it results in invalid xml snippets after the split. For example the snippet could be wrong terminated: +[source,java] +-------------------------------------------------------- +...< .... ... +-------------------------------------------------------- + +### Splitting files by grouping N lines together +*Available as of Camel 2.10* + +The Tokenizer language has a new option group that allows you to group N parts together, for example to split big files into chunks of 1000 lines. +[source,java] +-------------------------------------------------------- +from("file:inbox") + .split().tokenize("\n", 1000).streaming() + .to("activemq:queue:order"); +-------------------------------------------------------- + +And in XML DSL + +[source,xml] +-------------------------------------------------------- + + + + + + + +-------------------------------------------------------- + +The group option is a number that must be a positive number that dictates how many groups to combine together. Each part will be combined using the token. + +So in the example above the message being sent to the activemq order queue, will contain 1000 lines, and each line separated by the token (which is a new line token). + +The output when using the group option is always a `java.lang.String` type. + +### Specifying a custom aggregation strategy +This is specified similar to the Aggregator. + +### Specifying a custom ThreadPoolExecutor +You can customize the underlying ThreadPoolExecutor used in the parallel splitter. In the Java DSL try something like this: +[source,java] +-------------------------------------------------------- +XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); + +ExecutorService pool = ... + +from("activemq:my.queue") + .split(xPathBuilder).executorService(pool) + .to("activemq:my.parts"); +-------------------------------------------------------- + +### Using a Pojo to do the splitting + +As the Splitter can use any Expression to do the actual splitting we leverage this fact and use a *method* expression to invoke a Bean to get the splitted parts. + +The Bean should return a value that is iterable such as: `java.util.Collection`, `java.util.Iterator` or an array. + +So the returned value, will then be used by Camel at runtime, to split the message. + +[TIP] +.Streaming mode and using pojo +==== +When you have enabled the streaming mode, then you should return a `Iterator` to ensure streamish fashion. For example if the message is a big file, then by using an iterator, that returns a piece of the file in chunks, in the next method of the `Iterator` ensures low memory footprint. This avoids the need for reading the entire content into memory. For an example see the source code for the TokenizePair implementation. +==== + +In the route we define the Expression as a method call to invoke our Bean that we have registered with the id mySplitterBean in the Registry. + + +[source,java] +-------------------------------------------------------- +from("direct:body") + // here we use a POJO bean mySplitterBean to do the split of the payload + .split().method("mySplitterBean", "splitBody") + .to("mock:result"); +from("direct:message") + // here we use a POJO bean mySplitterBean to do the split of the message + // with a certain header value + .split().method("mySplitterBean", "splitMessage") + .to("mock:result"); +-------------------------------------------------------- + +And the logic for our Bean is as simple as. Notice we use Camel Bean Binding to pass in the message body as a String object. + +[source,java] +-------------------------------------------------------- +public class MySplitterBean { + + /** + * The split body method returns something that is iteratable such as a java.util.List. + * + * @param body the payload of the incoming message + * @return a list containing each part splitted + */ + public List splitBody(String body) { + // since this is based on an unit test you can of cause + // use different logic for splitting as Camel have out + // of the box support for splitting a String based on comma + // but this is for show and tell, since this is java code + // you have the full power how you like to split your messages + List answer = new ArrayList(); + String[] parts = body.split(","); + for (String part : parts) { + answer.add(part); + } + return answer; + } + + /** + * The split message method returns something that is iteratable such as a java.util.List. + * + * @param header the header of the incoming message with the name user + * @param body the payload of the incoming message + * @return a list containing each part splitted + */ + public List splitMessage(@Header(value = "user") String header, @Body String body) { + // we can leverage the Parameter Binding Annotations + // http://camel.apache.org/parameter-binding-annotations.html + // to access the message header and body at same time, + // then create the message that we want, splitter will + // take care rest of them. + // *NOTE* this feature requires Camel version >= 1.6.1 + List answer = new ArrayList(); + String[] parts = header.split(","); + for (String part : parts) { + DefaultMessage message = new DefaultMessage(); + message.setHeader("user", part); + message.setBody(body); + answer.add(message); + } + return answer; + } +} +-------------------------------------------------------- + + +### Split aggregate request/reply sample +This sample shows how you can split an Exchange, process each splitted message, aggregate and return a combined response to the original caller using request/reply. +The route below illustrates this and how the split supports a *`aggregationStrategy`* to hold the in progress processed messages: + +[source,java] +-------------------------------------------------------- +// this routes starts from the direct:start endpoint +// the body is then splitted based on @ separator +// the splitter in Camel supports InOut as well and for that we need +// to be able to aggregate what response we need to send back, so we provide our +// own strategy with the class MyOrderStrategy. +from("direct:start") + .split(body().tokenize("@"), new MyOrderStrategy()) + // each splitted message is then send to this bean where we can process it + .to("bean:MyOrderService?method=handleOrder") + // this is important to end the splitter route as we do not want to do more routing + // on each splitted message + .end() + // after we have splitted and handled each message we want to send a single combined + // response back to the original caller, so we let this bean build it for us + // this bean will receive the result of the aggregate strategy: MyOrderStrategy + .to("bean:MyOrderService?method=buildCombinedResponse") +-------------------------------------------------------- + +And the OrderService bean is as follows: + +[source,java] +-------------------------------------------------------- +public static class MyOrderService { + + private static int counter; + + /** + * We just handle the order by returning a id line for the order + */ + public String handleOrder(String line) { + LOG.debug("HandleOrder: " + line); + return "(id=" + ++counter + ",item=" + line + ")"; + } + + /** + * We use the same bean for building the combined response to send + * back to the original caller + */ + public String buildCombinedResponse(String line) { + LOG.debug("BuildCombinedResponse: " + line); + return "Response[" + line + "]"; + } +} +-------------------------------------------------------- + +And our custom *`aggregationStrategy`* that is responsible for holding the in progress aggregated message that after the splitter is ended will be sent to the *`buildCombinedResponse`* method for final processing before the combined response can be returned to the waiting caller. + +[source,java] +-------------------------------------------------------- +/** + * This is our own order aggregation strategy where we can control + * how each splitted message should be combined. As we do not want to + * loos any message we copy from the new to the old to preserve the + * order lines as long we process them + */ +public static class MyOrderStrategy implements AggregationStrategy { + + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // put order together in old exchange by adding the order from new exchange + + if (oldExchange == null) { + // the first time we aggregate we only have the new exchange, + // so we just return it + return newExchange; + } + + String orders = oldExchange.getIn().getBody(String.class); + String newLine = newExchange.getIn().getBody(String.class); + + LOG.debug("Aggregate old orders: " + orders); + LOG.debug("Aggregate new order: " + newLine); + + // put orders together separating by semi colon + orders = orders + ";" + newLine; + // put combined order back on old to preserve it + oldExchange.getIn().setBody(orders); + + // return old as this is the one that has all the orders gathered until now + return oldExchange; + } +} +-------------------------------------------------------- + +So lets run the sample and see how it works. + +We send an Exchange to the *direct:start* endpoint containing a IN body with the String value: A@B@C. The flow is: +[source,xml] +-------------------------------------------------------- +HandleOrder: A +HandleOrder: B +Aggregate old orders: (id=1,item=A) +Aggregate new order: (id=2,item=B) +HandleOrder: C +Aggregate old orders: (id=1,item=A);(id=2,item=B) +Aggregate new order: (id=3,item=C) +BuildCombinedResponse: (id=1,item=A);(id=2,item=B);(id=3,item=C) +Response to caller: Response[(id=1,item=A);(id=2,item=B);(id=3,item=C)] +-------------------------------------------------------- + +### Stop processing in case of exception +*Available as of Camel 2.1* + +The Splitter will by default continue to process the entire Exchange even in case of one of the splitted message will thrown an exception during routing. + +For example if you have an Exchange with 1000 rows that you split and route each sub message. During processing of these sub messages an exception is thrown at the 17th. What Camel does by default is to process the remainder 983 messages. You have the chance to remedy or handle this in the `AggregationStrategy`. + +But sometimes you just want Camel to stop and let the exception be propagated back, and let the Camel error handler handle it. You can do this in Camel 2.1 by specifying that it should stop in case of an exception occurred. This is done by the `stopOnException` option as shown below: + +[source,java] +-------------------------------------------------------- +from("direct:start") + .split(body().tokenize(",")).stopOnException() + .process(new MyProcessor()) + .to("mock:split"); +-------------------------------------------------------- + +And using XML DSL you specify it as follows: +[source,xml] +-------------------------------------------------------- + + + + + + + + +-------------------------------------------------------- + +### Using onPrepare to execute custom logic when preparing messages +*Available as of Camel 2.8* +See details at Multicast + +### Sharing unit of work +*Available as of Camel 2.8* +The Splitter will by default not share unit of work between the parent exchange and each splitted exchange. This means each sub exchange has its own individual unit of work. +For example you may have an use case, where you want to split a big message. And you want to regard that process as an atomic isolated operation that either is a success or failure. In case of a failure you want that big message to be moved into a dead letter queue. To support this use case, you would have to share the unit of work on the Splitter. + +Here is an example in Java DSL +[source,java] +-------------------------------------------------------- +errorHandler(deadLetterChannel("mock:dead").useOriginalMessage() + .maximumRedeliveries(3).redeliveryDelay(0)); + +from("direct:start") + .to("mock:a") + // share unit of work in the splitter, which tells Camel to propagate failures from + // processing the splitted messages back to the result of the splitter, which allows + // it to act as a combined unit of work + .split(body().tokenize(",")).shareUnitOfWork() + .to("mock:b") + .to("direct:line") + .end() + .to("mock:result"); + +from("direct:line") + .to("log:line") + .process(new MyProcessor()) + .to("mock:line"); +-------------------------------------------------------- + +Now in this example what would happen is that in case there is a problem processing each sub message, the error handler will kick in (yes error handling still applies for the sub messages). *But* what doesn't happen is that if a sub message fails all redelivery attempts (its exhausted), then its *not* moved into that dead letter queue. The reason is that we have shared the unit of work, so the sub message will report the error on the shared unit of work. When the Splitter is done, it checks the state of the shared unit of work and checks if any errors occurred. And if an error occurred it will set the exception on the Exchange and mark it for rollback. The error handler will yet again kick in, as the Exchange has been marked as rollback and it had an exception as well. No redelivery attempts is performed (as it was marked for rollback) and the Exchange will be moved into the dead letter queue. + +Using this from XML DSL is just as easy as you just have to set the shareUnitOfWork attribute to true: + +[source,xml] +-------------------------------------------------------- + + + + + + + + + + + + + + + + + + + + + + + + + + + + +-------------------------------------------------------- + +[NOTE] +.Implementation of shared unit of work +==== +So in reality the unit of work is not shared as a single object instance. Instead `SubUnitOfWork` is attached to their parent, and issues callback to the parent about their status (commit or rollback). This may be refactored in Camel 3.0 where larger API changes can be done. +==== + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc b/camel-core/src/main/docs/eips/throttle-eip.adoc new file mode 100644 index 0000000000000..ce9b652ee8d70 --- /dev/null +++ b/camel-core/src/main/docs/eips/throttle-eip.adoc @@ -0,0 +1,95 @@ +## Throttle EIP +The Throttler Pattern allows you to ensure that a specific endpoint does not get overloaded, or that we don't exceed an agreed SLA with some external service. + +### Options + +// eip options: START +The Throttle EIP supports 5 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +| executorServiceRef | String | Sets the ExecutorService which could be used by throttle definition +| timePeriodMillis | Long | Sets the time period during which the maximum request count is valid for +| asyncDelayed | Boolean | Enables asynchronous delay which means the thread will not block while delaying. +| callerRunsWhenRejected | Boolean | Whether or not the caller should run the task when it was rejected by the thread pool. Is by default true +| rejectExecution | Boolean | Whether or not throttler throws the ThrottlerRejectedExecutionException when the exchange exceeds the request limit Is by default false +|======================================================================= +{% endraw %} +// eip options: END + +### Examples +#### Using the Fluent Builders + +[source,java] +--------------------- +from("seda:a").throttle(3).timePeriodMillis(10000).to("log:result", "mock:result"); +--------------------- + +So the above example will throttle messages all messages received on *seda:a* before being sent to *mock:result* ensuring that a maximum of 3 messages are sent in any 10 second window. +Note that since `timePeriodMillis` defaults to 1000 milliseconds, just setting the `maximumRequestsPerPeriod` has the effect of setting the maximum number of requests per second. So to throttle requests at 100 requests per second between two endpoints, it would look more like this... + +[source,java] +--------------------- +from("seda:a").throttle(100).to("seda:b"); +--------------------- + +For further examples of this pattern in use you could look at the junit test case. + +#### Using the Spring XML Extensions +##### Camel 2.7.x or older +[source,xml] +--------------------- + + + + + + +--------------------- + +##### Camel 2.8 onwards +In Camel 2.8 onwards you must set the maximum period as an Expression as shown below where we use a Constant expression: +[source,xml] +--------------------- + + + + + 3 + + + + +--------------------- + +### Dynamically changing maximum requests per period +*Available as of Camel 2.8* + +Since we use an Expression you can adjust this value at runtime, for example you can provide a header with the value. At runtime Camel evaluates the expression and converts the result to a `java.lang.Long` type. In the example below we use a header from the message to determine the maximum requests per period. If the header is absent, then the Throttler uses the old value. So that allows you to only provide a header if the value is to be changed: +[source,xml] +--------------------- + + + + +
throttleValue
+ + +
+
+--------------------- + +### Asynchronous delaying +*Available as of Camel 2.4* + +You can let the Throttler use non blocking asynchronous delaying, which means Camel will use a scheduler to schedule a task to be executed in the future. The task will then continue routing. This allows the caller thread to not block and be able to service other messages, etc. + +[source,java] +--------------------- +from("seda:a").throttle(100).asyncDelayed().to("seda:b"); +--------------------- + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out. diff --git a/camel-core/src/main/docs/eips/validate-eip.adoc b/camel-core/src/main/docs/eips/validate-eip.adoc new file mode 100644 index 0000000000000..c9ac0c7781a5c --- /dev/null +++ b/camel-core/src/main/docs/eips/validate-eip.adoc @@ -0,0 +1,77 @@ +## Validate EIP +*Available as of Camel 2.3* + +Validate uses an expression or predicates to validate the contents of a message. It is useful for ensuring that messages are valid before attempting to process them. + +You can use the validate DSL with all kind of Predicates and Expressions. Validate evaluates the Predicate/Expression and if it is false a `PredicateValidationException` is thrown. If it is true message processing continues. + +### Options + +// eip options: START +The Validate EIP supports 0 options which are listed below: + +{% raw %} +[width="100%",cols="3,1m,6",options="header"] +|======================================================================= +| Name | Java Type | Description +|======================================================================= +{% endraw %} +// eip options: END + +### Using from Java DSL +The route below will read the file contents and validate them against a regular expression. + +[source,java] +--------------------- +from("file://inbox") + .validate(body(String.class).regex("^\\w{10}\\,\\d{2}\\,\\w{24}$")) +.to("bean:MyServiceBean.processLine"); +--------------------- + +Validate is not limited to the message body. You can also validate the message header. + +[source,java] +--------------------- +from("file://inbox") + .validate(header("bar").isGreaterThan(100)) +.to("bean:MyServiceBean.processLine"); +--------------------- + +You can also use validate together with simple. + +[source,java] +--------------------- +from("file://inbox") + .validate(simple("${in.header.bar} == 100")) +.to("bean:MyServiceBean.processLine"); +--------------------- + +### Using from Spring DSL +To use validate in the Spring DSL, the easiest way is to use simple expressions. +[source,xml] +--------------------- + + + + ${body} regex ^\\w{10}\\,\\d{2}\\,\\w{24}$ + + + + + +--------------------- + +The XML DSL to validate the message header would looks like this: +[source,xml] +--------------------- + + + + ${in.header.bar} == 100 + + + + + +--------------------- + +### Using This Pattern +If you would like to use this EIP Pattern then please read the Getting Started, you may also find the Architecture useful particularly the description of Endpoint and URIs. Then you could try out some of the Examples first before trying this pattern out.