This community extension focuses on bringing two great Open Source frameworks closer together, the camunda BPM platform and Apache Camel. Note, that a community extension is not part of the supported product of Camunda.
See example project 'camel use cases' for code for all of the use cases shown in the above model.
Environment requirements:
camunda-bpm-camel Version | Java Version | Camel Version |
---|---|---|
>= 0.8 | >= JDK 1.8 | >= 3.2 |
0.6 - 0.7 | >= JDK 1.8 | <= 3.1 |
0.4 - 0.5 | = JDK 1.7 | <= 2.x |
<= 0.3 | < JDK 1.7 | <= 2.x |
Please also check Camel requirements: http://camel.apache.org/what-are-the-dependencies.html (e.g. Camel 2.14 onwards requires JDK 1.7 or better).
Now follow the docs on the environment you want to run in. In Spring it will look like:
<dependency>
<groupId>org.camunda.bpm.extension.camel</groupId>
<artifactId>camunda-bpm-camel-spring</artifactId>
<version>0.8</version>
</dependency>
The Spring bean id camel
can be made available easily then:
<bean id="camel" class="org.camunda.bpm.camel.spring.impl.CamelServiceImpl">
<property name="processEngine" ref="processEngine"/>
<property name="camelContext" ref="camelContext"/>
</bean>
Use the following expression in a ServiceTask to send all the process instance variables as a map to Camel endpoint:
${camel.sendTo('<camel endpoint>')}
Hint: You will also get variables which were set but contain a null value.
Alternatively you can specify which process instance variables you want to send to Camel with:
${camel.sendTo('<camel endpoint>', '<comma-separated list of process variables>')}
Hint: missing or null value variables will cause to throw an IllegalArgumentException. You can append a question mark to each name of a variable which is allowed to be missing or null (e.g. 'mayBeNullVar?,mustNotBeNullVar').
Additionally you can specify a correlationKey to send to Camel. It can be used to correlate a response message. The route for the response must contain a parameter correlationKeyName with the name of the process variable which is used for correlation:
${camel.sendTo('<camel endpoint>', '<comma-separated list of process variables>', 'correlationKey')}
The properties CamundaBpmProcessInstanceId
, CamundaBpmBusinessKey
(if available) and CamundaBpmCorrelationKey
(if set) will be available to any downstream processors in the Camel route.
The following use cases are supported by the camunda BPM Camel component (see Camel Components).
A direct consumer to start process instances.
The following URI parameters are supported:
Parameter | Description |
---|---|
processDefinitionKey |
the process definition key of the process to start an instance of |
copyBodyAsVariable |
name of the process variable to which the body of the Camel should be copied. Default is camelBody |
copyHeaders |
whether the Camel message headers should be copied as process variables |
copyProperties |
whether the Camel exchange properties should be copied as process variables |
If the Camel message body is a map, then all the keys will be copied as process variables of the started instance.
If the property CamundaBpmBusinessKey
is available on the incoming message then it will be associated with the started process instance and can be later followed to look it up.
The properties CamundaBpmProcessInstanceId
, CamundaBpmProcessDefinitionId
and CamundaBpmBusinessKey
are available to the downstream processors in the Camel route as Camel exchange properties.
Example: camunda-bpm://start?processDefinitionKey=startProcessFromRoute©BodyAsVariable=var1
Starts a process instance of the process definition startProcessFromRoute
with the body of the message as a map with process variable var1
as a key.
A direct consumer to send a message to the process engine. This can either:
- trigger the start of a new process instance, see Start Message Event
- send a message to a waiting process instances. The process instance might either wait in a ReceiveTask or an Intermediate Message Event.
The following URI parameters are supported:
Parameter | Description |
---|---|
messageName |
the name of the message in the BPMN 2.0 XML (mandatory if you correlate to a Intermediate Message Event or a ReceiveTask with a message reference) |
activityId |
the id of the ReceiveTask in the BPMN 2.0 XML (mandatory if the process instance waits in a ReceiveTask without a message reference - considered as deprecated) |
correlationKeyName |
the name of a process variable to which the property CamundaBpmCorrelationKey will be correlated |
copyBodyAsVariable |
name of the process variable to which the body of the Camel should be copied. Default is camelBody . |
processDefinitionKey |
the process definition key of the process definition this operation is related to. In case of working without a message this can help to make correlation unique, it is always an optional parameter. |
Note that either one of the properties CamundaBpmProcessInstanceId
, CamundaBpmBusinessKey
or CamundaBpmCorrelationKey
need to be present in the message if it is correlated to a waiting process instance. Usage of CamundaBpmCorrelationKey
and / or CamundaBpmBusinessKey
is preferred.
With version 7.4.0 Camunda introduced external tasks. With version 7.5.0 further improvements were added.
Since version 0.5 of camunda-bpm-camel it is possible to consume external tasks by Camel endpoints. There are advantages by doing so:
- You don't have to place Camel instructions into your BPMN which is another level of technical decoupling.
- External tasks are not processed by the thread processing the workflow and therefore they do not block the origin thread which may be part of Camunda's worker thread-pool.
- If you have asynchronous communication external tasks in combination with Camel are a great deal to split the service task into three transactions: The first is about the workflow currently processing (context: Camunda); the second is for sending the request (context: Camel) and the third for processing the response (context: Camel).
- External tasks may be used as some sort of queue - especially if the service does some outbound network communication. In some installations it might be sufficient to use this aspect instead of using an ESB.
Example: camunda-bpm:poll-externalTasks?topic=topic1
The following URI parameters are supported:
Parameter | Description |
---|---|
topic |
(mandatory) The name of the topic as configured for the external task in BPMN. The endpoint will only consume tasks of this topic. |
maxTasksPerPoll |
(optional, default: 5) The endpoint is a polling consumer. This parameter defines the number of tasks fetched by each poll. Further configuration concerning scheduling of polling can be found at the description of Camel's scheduler component. |
workerId |
(optional, default: the endpoint URI *) The workerId used to poll Camunda for external tasks. Polled tasks are locked for that workerId. So if the task is not completed by the endpoint itself (see parameter async ), then the workerId used to complete the task must be the same used to poll the task. |
lockDuration |
(optional, default: 60s) Once a task is fetched it is locked for other consumers. This parameter defines how long it is locked if there is no further interaction. |
retries |
The number of times the external task will be tried to resolve before an incident is raised. |
retryTimeout |
(optional, default: 500ms) The timeout between subsequent retries. |
retryTimeouts |
(optional, no default) A comma separated list of timeouts used for retries. This is useful when calling an external services which might be down or simply busy. Using retryTimeouts=5s,30s,5m in combination with will retryTimeout=30m&retries=5 will use the retry timeout sequence 5, 30 seconds, 5, 30 and 30 minutes. |
variablesToFetch |
(optional, no default) A list of process instance variables which will be fetched for every external task consumed. |
deserializeVariables |
(optional, default: true for Camunda >= 7.6.0 otherwise false) Controls whether serialized variables should be deserialized on fetching them among external tasks. |
async |
(optional, default: false) Usually you want to complete the task once the exchange is processed. If you want to do asynchronous communication then you can use async=true to not complete the tasks consumed by the endpoint. Therefore it is up to your responsibility to complete the task once you processed the asynchronous response. See also camunda-bpm://async-externalTask . |
*) The endpoint URI is not the URI remarked in your configuration file. It is a URI generated by Camel but
it is very similar to your URI. An example: camunda-bpm:poll-externalTasks?topic=topic1&maxTasksPerPoll=5
-> camunda-bpm://poll-externalTasks?maxTasksPerPoll=5&topic=topic1
(parameters are sorted).
The exchange produced by the endpoint got several properties:
CamundaBpmProcessInstanceId
CamundaBpmProcessDefinitionId
CamundaBpmProcessDefinitionKey
CamundaBpmProcessInstancePrio
Additionally these in-headers are set:
CamundaBpmExternalTask
contains the entire LockedExternalTask objectCamundaBpmExternalAttemptsStarted
contains how many times the task was processed but failedCamundaBpmExternalRetriesLeft
contains how many times the task will be processed any more if this execution fails- the in-body is a map containing the process instance variables requested (Map< String, Object >)
If the reply-body contains a map (Map< String, Object >) this map is treated as a list of process instance variables and therefore used to update the process. If the reply-body is a string it is used as a BPMN error code to signal an error to the process. If processing the exchange fails (e.g. an exception is caught) then the exception's message is used to mark the task as failed (which might cause further retries or an incident if the number of retries elapsed). Additionally exceptions can be annotated by @org.camunda.bpm.camel.component.externaltasks.SetExternalTaskRetries
to control how the current exception effects the retry counter.
Hint: Processing of the route is not done in parallel even if the route uses a AsyncProcessor. This is Camel's default behavior of a polling Consumer! If you want to process the external tasks in parallel you have to define your own scheduler:
Example: camunda-bpm:poll-externalTasks?topic=topic1&scheduler=#topic1Scheduler
A scheduler is found by lookup in Camel's registry (what kind of registry is used depends on your environment, typical registries are CDI BeanManager and JNDI). This is an example of CDI producer building a scheduler which processes two external tasks in parallel:
@Named("topic1Scheduler")
public ScheduledPollConsumerScheduler produceTopic1Scheduler() {
final DefaultScheduledPollConsumerScheduler scheduledPollConsumerScheduler = new DefaultScheduledPollConsumerScheduler();
scheduledPollConsumerScheduler.setConcurrentTasks(2);
return scheduledPollConsumerScheduler;
}
By using camunda-bpm://poll-externalTasks
' parameter async
at value true
it is up to your responsibility to complete the task once your processing is done. A typical situation is when you want to process the response of asynchronous communication. In this situation you may use this processor endpoint (see Camel docs)
to complete the external task. The endpoint also supports throwing an incident or raising an BPM error as described for camunda-bpm://poll-externalTasks
.
For using it the incoming message of the Camel route must include a parameter CamundaBpmExternalTaskId
which contains the id of the external task.
Example: camunda-bpm:async-externalTask?topic=topic1
Typical usage:
<from uri="direct:bar" /><to uri="camunda-bpm:async-externalTask" />
<from uri="direct:bar" /><to uri="camunda-bpm:async-externalTask?onCompletion=true" /><to uri="some-other-endpoint" />
Parameter | Description |
---|---|
topic |
(option) The name of the topic. If given the external task is checked for belonging to that topic. |
workerId |
(optional) The id of the worker. If given the external task is checked for belonging to that worker id. |
retries |
The number of times the external task will be tried to resolve before an incident is raised if processing the task by the route fails. |
retryTimeout |
(optional, default: 500ms) The timeout between subsequent retries. |
retryTimeouts |
(optional, no default) A comma separated list of timeouts used for retries. This is useful when calling an external services which might be down or simply busy. Using retryTimeouts=5s,30s,5m in combination with will retryTimeout=30m&retries=5 will use the retry timeout sequence 5, 30 seconds, 5, 30 and 30 minutes. |
onCompletion |
(optional, default: false) Setting this parameter to true the external task is completed after the entire Camel route was processed. This allows you to define this endpoint direct behind the route's start endpoint. The main advantage of doing so is that it catches thrown exceptions and treats them as failure. If this endpoint is at the end of the Camel route any exception throw by preceding processors not being treated in that way. Using this parameter set to true forces exactly the same behavior as done by the endpoint camunda-bpm://poll-externalTasks and it's parameter async set to false . Additionally if exchanges out body is equal to the string CamundaBpmExternalTaskIgnore then the current action will be ignored and won't effect the external task in any way. |
These in-headers are set to be used subsequent endpoints:
CamundaBpmExternalAttemptsStarted
contains how many times the task was processed but failedCamundaBpmExternalRetriesLeft
contains how many times the task will be processed any more if this execution fails
The Exception org.camunda.bpm.camel.component.externaltasks.NoSuchExternalTaskException
(derived from RuntimeCamelException) might be thrown on processing an external task. The reason for this circumstance might be a situation in a BPM process in which the task implemented by the external task (e.g. ServiceTask) was cancelled. Tasks are cancelled due to interrupting events (e.g. timer event).
Check the existing integration tests for guidance on how to use the current supported features in your projects: Spring or CDI. To run the CDI integration tests do mvn -DskipITs=false
.
Further there exists the example project 'camel use cases'
<dependency>
<groupId>org.camunda.bpm.extension.camel</groupId>
<artifactId>camunda-bpm-camel-spring</artifactId>
<version>0.8</version>
</dependency>
In your Spring configuration you need to configure the CamelService
like this:
<bean id="camel" class="org.camunda.bpm.camel.spring.impl.CamelServiceImpl">
<property name="processEngine" ref="processEngine"/>
<property name="camelContext" ref="camelContext"/>
</bean>
The Spring bean id camel
will be then available to expressions used in ServiceTasks to send data to Camel.
<dependency>
<groupId>org.camunda.bpm.extension.camel</groupId>
<artifactId>camunda-bpm-camel-cdi</artifactId>
<version>0.8</version>
</dependency>
The CDI configuration needs a bit more work - especially for bootstrapping Camel. The easiest is to do this in a Singleton Startup EJB (see Example: CamelBootStrap.java):
@Singleton
@Startup
public class CamelBootStrap {
@Inject
private CdiCamelContext cdiCamelContext;
@Inject
private ProcessEngine processEngine;
@Inject
private MyCamelRouteBuilder routeBuilder; // your own route declaration
@PostConstruct
public void init() throws Exception {
CamundaBpmComponent component = new CamundaBpmComponent(processEngine);
component.setCamelContext(cdiCamelContext);
cdiCamelContext.addComponent("camunda-bpm", component);
cdiCamelContext.addRoutes(routeBuilder);
cdiCamelContext.start();
}
@PreDestroy
public void shutDown() throws Exception {
cdiCamelContext.stop();
}
}
Best read Apache Camel's CDI documentation and have a look at the CDI integration tests here for guidance.
<dependency>
<groupId>org.camunda.bpm.extension.camel</groupId>
<artifactId>camunda-bpm-camel-blueprint</artifactId>
<version>0.8</version>
</dependency>
The OSGi Framework is used to retrieve the ProcessEngine
and a DefaultCamelContext
therefore the bean definition of the CamelServiceImpl
is obsolete.
The camunda-bpm-osgi project is used with the blueprint-wrapper context.xml
. The BlueprintELResolver
was extended by the CamelBlueprintELResolver
. You need to replace the class of the ‘blueprintELResolver’ bean in the context.xml:
...
<bean id="blueprintELResolver" class=" org.camunda.bpm.camel.blueprint.CamelBlueprintELResolver" />
...
This project is part of the camunda BPM incubation space. Feedback, pull requests, ... you name it... are very welcome! Meet us on the Camunda BPM Forum.
This library started as a fork of Activiti's Apache Camel module and the following people have contributed to its further development in the context of camunda BPM: contributors.
This software is licensed under the terms you find in the file named LICENSE.txt
in the root directory.