a39733e Mar 17, 2018
3 contributors

Users who have contributed to this file

@davsclaus @nicolaferraro @oscerd

Saga EIP

Available as of Camel 2.21

The Saga EIP provides a way to define a series of related actions in a Camel route that should either all complete successfully or not be executed (or be compensated). Saga implementations are able to coordinate distributed services communicating over any transport towards a globally consistent outcome.

Although their main purpose is similar, Sagas are different from classical ACID distributed (XA) transactions because the status of the different participating services is guaranteed to be consistent only at the end of the Saga and not in any intermediate step (lack of isolation).

Conversely, Sagas are suitable for many use cases where usage of distributed transactions is discouraged. For example, services participating in a Saga are allowed to use any kind of datastore: classical databases or even NoSQL non-transactional datastores. Sagas are also suitable for being used in stateless cloud services as they do not require a transaction log to be stored alongside the service.

Unlike transactions, Sagas are not required to complete in a short amount of time, because they don’t use database-level locks. They can live for a longer timespan: from a few seconds to several days. The Saga EIP implementation based on the MicroProfile sandbox spec (see camel-lra) is indeed called LRA, which stands for "Long Running Action". It also supports coordination of external heterogeneous services, written in any language or technology and possibly running outside a JVM.

Sagas don’t use locks on data. Instead, they define the concept of a "Compensating Action": an action that should be executed when the standard flow encounters an error, with the purpose of restoring the state that was present before the flow execution. Compensating actions can be declared in Camel routes using the Java or XML DSL and will be invoked by Camel only when needed (if the saga is cancelled due to an error).
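
To make the idea concrete, here is a minimal plain-Java sketch of the compensation pattern, independent of Camel. The MiniSaga class and its step and compensate methods are hypothetical names made up for this illustration and are not part of the Camel API. Running compensations in reverse order is a choice of this sketch, and a real Saga service additionally deals with retries, persistence and remote participants.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only: NOT part of the Camel API.
class MiniSaga {
    // Compensations of the steps completed so far, most recent first.
    private final Deque<Runnable> compensations = new ArrayDeque<>();

    // Runs an action and remembers its compensation only if the action succeeds.
    void step(Runnable action, Runnable compensation) {
        action.run();
        compensations.push(compensation);
    }

    // Undoes all completed steps, most recent first.
    void compensate() {
        while (!compensations.isEmpty()) {
            compensations.pop().run();
        }
    }

    // Demo: the second step fails, so only the first one is compensated.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniSaga saga = new MiniSaga();
        try {
            saga.step(() -> log.add("order created"), () -> log.add("order cancelled"));
            saga.step(() -> { throw new IllegalStateException("not enough credit"); },
                      () -> log.add("credit refunded"));
        } catch (IllegalStateException e) {
            saga.compensate();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [order created, order cancelled]
    }
}
```

No lock is held between the two steps: consistency is reached only once either all steps have run or the completed ones have been undone.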

The Saga EIP supports 6 options which are listed below:

| Name | Description | Default | Type |
| --- | --- | --- | --- |
| propagation | Set the Saga propagation mode (REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER). | REQUIRED | SagaPropagation |
| completionMode | Determine how the saga should be considered complete. When set to AUTO, the saga is completed when the exchange that initiates the saga is processed successfully, or compensated when it completes exceptionally. When set to MANUAL, the user must complete or compensate the saga using the saga:complete or saga:compensate endpoints. | AUTO | SagaCompletionMode |
| timeoutInMilliseconds | Set the maximum amount of time for the Saga. After the timeout has expired, the saga will be compensated automatically (unless a different decision has been taken in the meantime). | | Long |
| compensation | The compensation endpoint URI that must be called to compensate all changes done in the route. The route corresponding to the compensation URI must perform compensation and complete without error. If errors occur during compensation, the saga service may call the compensation URI again to retry. | | SagaActionUriDefinition |
| completion | The completion endpoint URI that will be called when the Saga is completed successfully. The route corresponding to the completion URI must perform completion tasks and terminate without error. If errors occur during completion, the saga service may call the completion URI again to retry. | | SagaActionUriDefinition |
| option | Allows saving properties of the current exchange in order to reuse them in a compensation/completion callback route. Options are usually helpful, for example, to store and retrieve identifiers of objects that should be deleted in compensating actions. Option values will be transformed into input headers of the compensation/completion exchange. | | List |

Exchange properties

The following properties are set on each Exchange that participates in a Saga (normal actions, compensating actions and completions):

| Property | Type | Description |
| --- | --- | --- |
| Long-Running-Action | String | A globally unique identifier for the Saga that can be propagated to remote systems using transport-level headers (e.g. HTTP). |

Saga Service Configuration

The Saga EIP requires that a service implementing the interface org.apache.camel.saga.CamelSagaService is added to the Camel context.

Camel currently supports the following Saga Services:

  • InMemorySagaService: it is a basic implementation of the Saga EIP that does not support advanced features (no remote context propagation, no consistency guarantee in case of application failure).

  • LRASagaService: it is a fully-fledged implementation of the Saga EIP based on the MicroProfile sandbox LRA specification that supports remote context propagation and provides consistency guarantees in case of application failure.

Using the In-Memory Saga Service

The in-memory Saga service is not recommended for production environments as it does not support persistence of the Saga status (it is kept only in-memory), so it cannot guarantee consistency of Sagas in case of application failure (e.g. JVM crash).

Also, when using an in-memory Saga service, Saga contexts cannot be propagated to remote services using transport-level headers (this can be done with other implementations).

Users that want to use the in-memory saga service should add the following code to customize the Camel context.

context.addService(new org.apache.camel.impl.saga.InMemorySagaService());

The service belongs to the camel-core module.

Using the LRA Saga Service

The LRA Saga Service is an implementation based on the MicroProfile sandbox LRA specification. It leverages an external Saga coordinator to control the execution of the various steps of the Saga. The proposed reference implementation for the LRA specification is the Narayana LRA Coordinator. Users can follow the instructions on the Narayana website to start up a remote instance of the coordinator.

The URL of the LRA coordinator is a required parameter of the Camel LRA service. The Camel application and the LRA service communicate using the HTTP protocol.

In order to use the LRA Saga service, Maven users will need to add the following dependency to their pom.xml:

<dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-lra</artifactId>
 <!-- use the same version as your Camel core version -->
 <version>x.y.z</version>
</dependency>

A Camel REST context is also required to be present for the LRA implementation to work. You may add camel-undertow for example.

<dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-undertow</artifactId>
 <!-- use the same version as your Camel core version -->
 <version>x.y.z</version>
</dependency>

Note

The LRA implementation of the Saga EIP will add some web endpoints under the "/lra-participant" path. Those endpoints will be used by the LRA coordinator for calling back the application.

// Configure the LRA saga service
org.apache.camel.service.lra.LRASagaService sagaService = new org.apache.camel.service.lra.LRASagaService();
sagaService.setCoordinatorUrl("http://lra-service-host");
sagaService.setLocalParticipantUrl("http://my-host-as-seen-by-lra-service:8080/context-path");

// Add it to the Camel context
context.addService(sagaService);

Using the LRA Saga Service in Spring Boot

Spring Boot users can use a simplified configuration model for the LRA Saga Service. Maven users can include the camel-lra-starter module in their project:

<dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-lra-starter</artifactId>
 <!-- use the same version as your Camel core version -->
 <version>x.y.z</version>
</dependency>

<dependency>
 <groupId>org.apache.camel</groupId>
 <artifactId>camel-undertow-starter</artifactId>
 <!-- use the same version as your Camel core version -->
 <version>x.y.z</version>
</dependency>

Configuration can be done in the Spring Boot application.yaml file:

application.yaml
camel:
  service:
    lra:
      enabled: true
      coordinator-url: http://lra-service-host
      local-participant-url: http://my-host-as-seen-by-lra-service:8080/context-path

Once done, the Saga EIP can be directly used inside Camel routes and it will use the LRA Saga Service under the hood.

Examples

Suppose you want to place a new order and you have two distinct services in your system: one managing the orders and one managing the credit. Logically, you can place an order only if you have enough credit for it.

With the Saga EIP you can model the direct:buy route as a Saga composed of two distinct actions, one to create the order and one to take the credit. Either both actions must be executed, or neither of them: an order placed without credit can be considered an inconsistent outcome (as can a payment without an order).

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

That’s it. The buy action will not change for the rest of the examples. The following sections just show different options that can be used to model the "New Order" and "Reserve Credit" actions.

Note
We have used a direct endpoint to model the two actions since this example can be used with both implementations of the Saga service, but we could have used http or other kinds of endpoint with the LRA Saga service.

Both services called by the direct:buy route can participate in the Saga and declare their compensating actions.

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

Here the propagation mode is set to MANDATORY meaning that any exchange flowing in this route must be already part of a saga (and it is the case in this example, since the saga is created in the direct:buy route).

The direct:newOrder route declares a compensating action that is called direct:cancelOrder, responsible for undoing the order in case the saga is cancelled.

Each exchange always contains an Exchange.SAGA_LONG_RUNNING_ACTION header, which is used here as the id of the order. This is done in order to identify the order to delete in the corresponding compensating action, but it is not a requirement (options can be used as an alternative solution).

The compensating action of direct:newOrder is direct:cancelOrder and it’s shown below:

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

It is called automatically by the Saga EIP implementation when the order should be cancelled.

It should not terminate with an error. If an error is thrown in the direct:cancelOrder route, the EIP implementation should periodically retry the compensating action up to a certain limit. This means that any compensating action must be idempotent: it should take into account that it may be triggered multiple times and should not fail in any case.

If compensation cannot be done after all retries, a manual intervention process should be triggered by the Saga implementation.
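
The retry behaviour described above can be pictured with the following plain-Java sketch. It is an illustration under stated assumptions, not the way Camel or an LRA coordinator actually schedules retries: CompensationRetry and compensateWithRetry are hypothetical names, and the waiting time between attempts is left out.

```java
// Hypothetical sketch of a bounded retry loop; NOT Camel's actual implementation.
class CompensationRetry {
    // Calls the compensation until it succeeds or maxAttempts is reached.
    // Returns true on success, false if manual intervention is needed.
    static boolean compensateWithRetry(Runnable compensation, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                compensation.run();
                return true;
            } catch (RuntimeException e) {
                // A real coordinator would wait before the next attempt; omitted here.
            }
        }
        return false;
    }
}
```

Because the same compensation may run several times before it succeeds, each run has to tolerate the effects of earlier partial runs, which is why idempotency matters.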

Note

It may happen that due to a delay in the execution of the direct:newOrder route the Saga is cancelled by another party in the meantime (due to an error in a parallel route or a timeout at Saga level).

So, when the compensating action direct:cancelOrder is called, it may not find the Order record that should be cancelled. To guarantee full global consistency, it is important that any main action and its corresponding compensating action are commutative, i.e. if compensation occurs before the main action it should have the same effect.

Another possible approach, when using a commutative behavior is not possible, is to consistently fail in the compensating action until data produced by the main action is found (or the maximum number of retries is exhausted): this approach may work in many contexts, but it’s heuristic.
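
One way to obtain commutative behavior is to remember cancellations ("tombstones") so that a late main action becomes a no-op. The following plain-Java sketch shows the idea; CommutativeOrderStore is a hypothetical class for illustration, keeps everything in memory, and is not something provided by Camel.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a main action and a compensation that commute.
class CommutativeOrderStore {
    private final Set<String> active = ConcurrentHashMap.newKeySet();
    // Tombstones: ids of orders whose saga has already been cancelled.
    private final Set<String> cancelled = ConcurrentHashMap.newKeySet();

    // Main action: does nothing if the compensation already arrived.
    synchronized boolean newOrder(String id) {
        if (cancelled.contains(id)) {
            return false; // saga already cancelled, keep the order out
        }
        return active.add(id);
    }

    // Compensating action: records a tombstone even when no order exists yet.
    synchronized void cancelOrder(String id) {
        cancelled.add(id);
        active.remove(id);
    }

    synchronized boolean isActive(String id) {
        return active.contains(id);
    }
}
```

Whether newOrder runs before or after cancelOrder, the order ends up inactive, so the final state does not depend on the arrival order.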

The credit service may be implemented almost in the same way as the order service.

// action
from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

// compensation
from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

Here the compensating action for a credit reservation is a refund.

This completes the example. It can be run with both implementations of the Saga EIP, as it does not involve remote endpoints.

Further options will be shown next.

Handling Completion Events

It is often required to do some processing when the Saga is completed. Compensation endpoints are invoked when something goes wrong and the Saga is cancelled. Similarly, completion endpoints can be invoked to do further processing when the Saga is completed successfully.

For example, in the order service above, we may need to know when the order is completed (and the credit reserved) to actually start preparing the order. We will not want to start to prepare the order if the payment is not done (unlike most modern CPUs that give you access to reserved memory before ensuring that you have rights to read it).

This can be done easily with a modified version of the direct:newOrder endpoint:

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder") // completion endpoint
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

// direct:cancelOrder is the same as in the previous example

// called on successful completion
from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

When the Saga is completed, the order is sent to a JMS queue for preparation.

Like compensating actions, completion actions may also be called multiple times by the Saga coordinator (especially in case of errors, like network errors). In this example, the service listening to the prepareOrder JMS queue should be prepared to handle possible duplicates (see the Idempotent Consumer EIP for examples on how to handle duplicates).
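
As a rough illustration of duplicate handling on the receiving side, the sketch below drops order ids it has already seen. PrepareOrderConsumer and onMessage are hypothetical names, and the in-memory set is an assumption made for brevity: it does not survive a restart, so a durable store would be needed in practice.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer of the prepareOrder queue that ignores duplicates.
class PrepareOrderConsumer {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();
    private int prepared = 0;

    // Returns true if the order was processed, false if it was a duplicate.
    synchronized boolean onMessage(String orderId) {
        if (!seen.add(orderId)) {
            return false; // already handled, skip
        }
        prepared++; // stand-in for the real preparation work
        return true;
    }

    synchronized int preparedCount() {
        return prepared;
    }
}
```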

Using Custom Identifiers and Options

The examples shown so far use the Exchange.SAGA_LONG_RUNNING_ACTION header as the identifier for the resources (order and credit). This is not always a desired approach, as it may pollute the business logic and the data model.

An alternative approach is to use Saga options to "register" custom identifiers. For example, the credit service may be refactored as follows:

// action
from("direct:reserveCredit")
  .bean(idService, "generateCustomId") // generate a custom Id and set it in the body
  .to("direct:creditReservation");

// delegate action
from("direct:creditReservation")
  .saga()
  .propagation(SagaPropagation.SUPPORTS)
  .option("CreditId", body()) // mark the current body as needed in the compensating action
  .compensation("direct:creditRefund")
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved. Custom Id used is ${body}");

// called only if the saga is cancelled
from("direct:creditRefund")
  .transform(header("CreditId")) // retrieve the CreditId option from headers
  .bean(creditService, "refundCredit")
  .log("Credit for Custom Id ${body} refunded");

Note how the previous listing is not using the Exchange.SAGA_LONG_RUNNING_ACTION header at all.

Since the direct:creditReservation endpoint can now also be called from outside a Saga, the propagation mode can be set to SUPPORTS.

Multiple options can be declared in a Saga route.

Setting Timeouts

Sagas are long running actions, but this does not mean that they should not have a bounded timeframe to execute. Setting timeouts on Sagas is always a good practice as it guarantees that a Saga does not remain stuck forever in the case of machine failure.

Note
The Saga EIP implementation may have a default timeout set on all Sagas that don’t specify one explicitly.

When the timeout expires, the Saga EIP will decide to cancel the Saga (and compensate all participants), unless a different decision has been taken before.

Timeouts can be set on Saga participants as follows:

from("direct:newOrder")
  .saga()
  .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    // ...
    .log("Order ${body} created");

All participants (e.g. credit service, order service) can set their own timeout. When they are composed together, the minimum of those timeouts is taken as the timeout for the saga.

A timeout can also be specified at saga level as follows:

from("direct:buy")
  .saga()
  .timeout(5, TimeUnit.MINUTES) // timeout at saga level
    .to("direct:newOrder")
    .to("direct:reserveCredit");
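
The "minimum wins" rule can be written down in a few lines of plain Java. This is only a sketch of the arithmetic, with a hypothetical SagaTimeouts helper, and it assumes that the saga-level timeout takes part in the same rule as the participant timeouts. With the two listings above (1 minute on direct:newOrder, 5 minutes on direct:buy) the effective timeout would be 1 minute.

```java
import java.time.Duration;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper illustrating how the effective saga timeout is derived.
class SagaTimeouts {
    // The effective timeout is the smallest of all declared timeouts.
    static Duration effectiveTimeout(List<Duration> declared) {
        return declared.stream()
                .min(Comparator.naturalOrder())
                .orElseThrow(() -> new IllegalArgumentException("no timeout declared"));
    }
}
```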

Choosing Propagation

The examples above use the MANDATORY and SUPPORTS propagation modes, as well as REQUIRED, which is the default mode when nothing else is specified.

These propagation modes map 1:1 to the equivalent modes used in transactional contexts. Here’s a summary of their meaning:

| Propagation | Description |
| --- | --- |
| REQUIRED | Join the existing saga or create a new one if it does not exist. |
| REQUIRES_NEW | Always create a new saga. Suspend the old saga and resume it when the new one terminates. |
| MANDATORY | A saga must be already present. The existing saga is joined. |
| SUPPORTS | If a saga already exists, then join it. |
| NOT_SUPPORTED | If a saga already exists, it is suspended and resumed when the current block completes. |
| NEVER | The current block must never be invoked within a saga. |
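
The table above can also be read as a decision function of two inputs: the propagation mode and whether a saga already exists. The following plain-Java sketch encodes that reading. The PropagationRules class and its Mode and Outcome enums are hypothetical and exist only for this illustration (Camel's own enum is SagaPropagation); the cases the table leaves implicit, such as SUPPORTS without a saga, are assumed to run without a saga.

```java
// Hypothetical encoding of the propagation table; NOT part of the Camel API.
class PropagationRules {
    enum Mode { REQUIRED, REQUIRES_NEW, MANDATORY, SUPPORTS, NOT_SUPPORTED, NEVER }

    enum Outcome {
        JOIN_EXISTING, CREATE_NEW, SUSPEND_AND_CREATE_NEW,
        SUSPEND_EXISTING, RUN_WITHOUT_SAGA, FAIL
    }

    // Decides what happens when a block with the given mode is entered.
    static Outcome decide(Mode mode, boolean sagaExists) {
        switch (mode) {
            case REQUIRED:
                return sagaExists ? Outcome.JOIN_EXISTING : Outcome.CREATE_NEW;
            case REQUIRES_NEW:
                return sagaExists ? Outcome.SUSPEND_AND_CREATE_NEW : Outcome.CREATE_NEW;
            case MANDATORY:
                return sagaExists ? Outcome.JOIN_EXISTING : Outcome.FAIL;
            case SUPPORTS:
                return sagaExists ? Outcome.JOIN_EXISTING : Outcome.RUN_WITHOUT_SAGA;
            case NOT_SUPPORTED:
                return sagaExists ? Outcome.SUSPEND_EXISTING : Outcome.RUN_WITHOUT_SAGA;
            case NEVER:
                return sagaExists ? Outcome.FAIL : Outcome.RUN_WITHOUT_SAGA;
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }
}
```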

Using Manual Completion (Advanced)

When a Saga cannot be executed entirely in a synchronous way, for example because it requires communication with external services over asynchronous channels, the completion mode cannot be set to AUTO (the default), because the saga is not complete when the exchange that creates it is done.

This is often the case for Sagas that have long execution times (hours, days). In these cases, the MANUAL completion mode should be used.

from("direct:mysaga")
  .saga()
  .completionMode(SagaCompletionMode.MANUAL)
  .completion("direct:finalize")
  .timeout(2, TimeUnit.HOURS)
    .to("seda:newOrder")
    .to("seda:reserveCredit");

// Put here asynchronous processing for seda:newOrder and seda:reserveCredit
// They will send asynchronous callbacks to seda:operationCompleted

from("seda:operationCompleted") // an asynchronous callback
  .saga()
  .propagation(SagaPropagation.MANDATORY)
    .bean(controlService, "actionExecuted")
    .choice()
      .when(body().isEqualTo("ok"))
        .to("saga:complete") // complete the current saga manually (saga component)
    .end();

// You can put here the direct:finalize endpoint to execute final actions

Setting the completion mode to MANUAL means that the saga is not completed when the exchange is processed in the route direct:mysaga but it will last longer (max duration is set to 2 hours).

When both asynchronous actions are completed, the saga is completed. The call to complete is done using the Camel Saga Component’s saga:complete endpoint. There is a similar endpoint for manually compensating the Saga (saga:compensate).

At first glance, the saga markers do not seem to add much value to the flow: it also works if you remove all Saga EIP configuration. But Sagas add a lot of value, since they guarantee that even in the presence of unexpected issues (servers crashing, messages being lost) there will always be a consistent outcome: order placed and credit reserved, or neither of them changed. In particular, if the Saga is not completed within 2 hours, the compensation mechanism will take care of restoring a consistent state.
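
The controlService bean used in the seda:operationCompleted route is not shown in this document. As a purely hypothetical sketch, it could track which actions have reported back for each saga and answer "ok" only when both are done. The class below, its method signature and the "pending" return value are assumptions made for illustration; how Camel would bind the saga id and the action name to the method arguments is left out.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical controlService: reports "ok" once every expected action is done.
class ControlService {
    // Actions that must report back before the saga can be completed (assumed names).
    private static final Set<String> EXPECTED = Set.of("newOrder", "reserveCredit");

    private final Map<String, Set<String>> completed = new ConcurrentHashMap<>();

    // Records that an action finished for the given saga.
    // Duplicate callbacks for the same action are harmless.
    synchronized String actionExecuted(String sagaId, String actionName) {
        Set<String> done = completed.computeIfAbsent(sagaId, id -> ConcurrentHashMap.newKeySet());
        done.add(actionName);
        return done.containsAll(EXPECTED) ? "ok" : "pending";
    }
}
```

Tracking action names rather than a simple counter keeps the result correct when the same callback is delivered more than once.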

XML Configuration

Saga features are also available for users that want to use the XML configuration.

The following snippet shows an example:

<route>
  <from uri="direct:start"/>
  <saga>
    <compensation uri="direct:compensation" />
    <completion uri="direct:completion" />
    <option optionName="myOptionKey">
      <constant>myOptionValue</constant>
    </option>
    <option optionName="myOptionKey2">
      <constant>myOptionValue2</constant>
    </option>
  </saga>
  <to uri="direct:action1" />
  <to uri="direct:action2" />
</route>