diff --git a/SUMMARY.md b/SUMMARY.md
index 815754a9..f01a3361 100644
--- a/SUMMARY.md
+++ b/SUMMARY.md
@@ -21,6 +21,7 @@
 * [Major Releases](release-notes/rn-extensions/rn-cdi/rn-cdi-major-releases.md)
 * [JGroups](release-notes/rn-extensions/rn-jgroups/README.md)
 * [Major Releases](release-notes/rn-extensions/rn-jgroups/rn-jgroups-major-releases.md)
+ * [Minor Releases](release-notes/rn-extensions/rn-jgroups/rn-jgroups-minor-releases.md)
 * [Kafka](release-notes/rn-extensions/rn-kafka/README.md)
 * [Major Releases](release-notes/rn-extensions/rn-kafka/rn-kafka-major-releases.md)
 * [Minor Releases](release-notes/rn-extensions/rn-kafka/rn-kafka-minor-releases.md)
@@ -32,6 +33,7 @@
 * [Minor Releases](release-notes/rn-extensions/rn-mongo/rn-mongo-minor-releases.md)
 * [Multi-Tenancy](release-notes/rn-extensions/rn-multi-tenancy/README.md)
 * [Major Releases](release-notes/rn-extensions/rn-multi-tenancy/rn-multi-tenancy-major-releases.md)
+ * [Minor Releases](release-notes/rn-extensions/rn-multi-tenancy/rn-multi-tenancy-minor-releases.md)
 * [Reactor](release-notes/rn-extensions/rn-reactor/README.md)
 * [Major Releases](release-notes/rn-extensions/rn-reactor/rn-reactor-major-releases.md)
 * [Minor Releases](release-notes/rn-extensions/rn-reactor/rn-reactor-minor-releases.md)
diff --git a/axon-framework/events/event-processors/README.md b/axon-framework/events/event-processors/README.md
index a08a3100..367d709e 100644
--- a/axon-framework/events/event-processors/README.md
+++ b/axon-framework/events/event-processors/README.md
@@ -277,6 +277,15 @@ To that end, the supported dead-letter queue is a so-called `SequencedDeadLetter
 Integral to its design is to allow for queueing failed events and events that belong to a faulty sequence.
 It does so by maintaining a sequence identifier for each event, determined by the [sequencing policy](/axon-framework/events/event-processors/streaming.md#sequential-processing).
 
+> **Is there support for Sagas?**
+>
+> Currently, there is *no* support for using a dead-letter queue for [sagas](/axon-framework/sagas/README.md).
+> We've made this decision because the sequenced dead-lettering approach we use for regular event handling cannot be applied to sagas.
+>
+> A saga's associations can vary widely between events.
+> Due to this, the sequence of events belonging to a saga may change over time.
+> Hence, there's no way of knowing whether the next event in the stream does or does not belong to a saga.
+
 Note that you *cannot* share a dead-letter queue between different processing groups.
 Hence, each processing group you want to enable this behavior for should receive a unique dead-letter queue instance.
@@ -305,21 +314,20 @@ A `JpaSequencedDeadLetterQueue` configuration example:
 
 {% tabs %}
 {% tab title="Axon Configuration API" %}
 ```java
-public class DeadLetterQueueExampleConfig {
-
-    public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) {
-        return configurer -> configurer.eventProcessing(
-                eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue(
-                        processingGroup,
-                        configuration -> JpaSequencedDeadLetterQueue.builder()
-                                                                    .processingGroup(processingGroup)
-                                                                    .maxSequences(256)
-                                                                    .maxSequenceSize(256)
-                                                                    .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class))
-                                                                    .transactionManager(configuration.getComponent(TransactionManager.class))
-                                                                    .serializer(configuration.serializer())
-                                                                    .build()
-        )
+public class AxonConfig {
+    // ...
+ public void configureDeadLetterQueue(EventProcessingConfigurer processingConfigurer) { + // Replace "my-processing-group" for the processing group you want to configure the DLQ on. + processingConfigurer.registerDeadLetterQueue( + "my-processing-group", + config -> JpaSequencedDeadLetterQueue.builder() + .processingGroup("my-processing-group") + .maxSequences(256) + .maxSequenceSize(256) + .entityManagerProvider(config.getComponent(EntityManagerProvider.class)) + .transactionManager(config.getComponent(TransactionManager.class)) + .serializer(config.serializer()) + .build() ); } } @@ -328,22 +336,21 @@ public class DeadLetterQueueExampleConfig { {% tab title="Spring Boot AutoConfiguration" %} ```java @Configuration -public class DeadLetterQueueExampleConfig { - - @Autowired - public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) { - return configurer -> configurer.eventProcessing( - eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue( - processingGroup, - configuration -> JpaSequencedDeadLetterQueue.builder() - .processingGroup(processingGroup) - .maxSequences(256) - .maxSequenceSize(256) - .entityManagerProvider(configuration.getComponent(EntityManagerProvider.class)) - .transactionManager(configuration.getComponent(TransactionManager.class)) - .serializer(configuration.serializer()) - .build() - ) +public class AxonConfig { + // ... + @Bean + public ConfigurerModule deadLetterQueueConfigurerModule() { + // Replace "my-processing-group" for the processing group you want to configure the DLQ on. + return configurer -> configurer.eventProcessing().registerDeadLetterQueue( + "my-processing-group", + config -> JpaSequencedDeadLetterQueue.builder() + .processingGroup("my-processing-group") + .maxSequences(256) + .maxSequenceSize(256) + .entityManagerProvider(config.getComponent(EntityManagerProvider.class)) + .transactionManager(config.getComponent(TransactionManager.class)) + .serializer(config.serializer()) + .build() ); } } @@ -505,10 +512,11 @@ See the following example for configuring our custom policy: {% tabs %} {% tab title="Axon Configuration API" %} ```java -public class EnqueuePolicyConfigurer { - - public void configureEnqueuePolicy(EventProcessingConfigurer configurer, String processingGroup) { - configurer.registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy()); +public class AxonConfig { + // ... + public void configureEnqueuePolicy(EventProcessingConfigurer configurer) { + // Replace "my-processing-group" for the processing group you want to configure the policy on. + configurer.registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy()); } } ``` @@ -516,12 +524,13 @@ public class EnqueuePolicyConfigurer { {% tab title="Spring Boot AutoConfiguration" %} ```java @Configuration -public class EnqueuePolicyConfigurer { +public class AxonConfig { @Bean - public ConfigurerModule configureEnqueuePolicy(String processingGroup) { + public ConfigurerModule enqueuePolicyConfigurerModule() { + // Replace "my-processing-group" for the processing group you want to configure the policy on. 
return configurer -> configurer.eventProcessing() - .registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy()); + .registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy()); } } ``` diff --git a/axon-framework/events/event-processors/streaming.md b/axon-framework/events/event-processors/streaming.md index f00e4863..b35a1641 100644 --- a/axon-framework/events/event-processors/streaming.md +++ b/axon-framework/events/event-processors/streaming.md @@ -201,7 +201,7 @@ public class AxonConfig { ### Configuring a Pooled Streaming Processor -Firstly, to specify that every new processors should default to a `PooledStreamingEventProcessor`, you can invoke the `usingPooledStreamingProcessors` method: +Firstly, to specify that every new processors should default to a `PooledStreamingEventProcessor`, you can invoke the `usingPooledStreamingEventProcessors` method: {% tabs %} {% tab title="Axon Configuration API" %} @@ -209,7 +209,7 @@ Firstly, to specify that every new processors should default to a `PooledStreami public class AxonConfig { // ... public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.usingPooledStreamingProcessors(); + processingConfigurer.usingPooledStreamingEventProcessors(); } } ``` @@ -222,7 +222,7 @@ public class AxonConfig { // ... @Autowired public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) { - processingConfigurer.usingPooledStreamingProcessors(); + processingConfigurer.usingPooledStreamingEventProcessors(); } } ``` @@ -377,13 +377,13 @@ To be able to reopen the stream at a later point, we should keep the progress so The progress is kept by updating and saving the `TrackingToken` after handling batches of events. Keeping the progress requires CRUD operation, for which the Streaming Processor uses the [`TokenStore`](#token-store). -For a Streaming Processor to process any events, it needs "a claim" on a `TrackingToken`. +For a Streaming Processor to process any events, it needs ["a claim"](#token-claims) on a `TrackingToken`. The processor will update this claim every time it has finished handling a batch of events. This so-called "claim extension" is, just as updating and saving of tokens, delegated to the Token Store. Hence, the Streaming Processors achieves collaboration among instances/threads through token claims. In the absence of a claim, a processor will actively try to retrieve one. -If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim. +If a token claim is not extended for a configurable amount of time, other processor threads can ["steal"](#token-stealing) the claim. Token stealing can, for example, happen if event processing is slow or encountered some exceptions. ### Initial Tracking Token @@ -419,7 +419,7 @@ In those cases, we recommend to validate if any of the above situations occurred There are a couple of things we can configure when it comes to tokens. We can separate these options in "initial token" and "token claim" configuration, as described in the following sections: -**Initial Token** +#### Initial Token The [initial token](#initial-tracking-token) for a `StreamingEventProcessor` is configurable for every processor instance. When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`. 
@@ -501,7 +501,7 @@ public class AxonConfig {
 {% endtab %}
 {% endtabs %}
 
-**Token Claims**
+#### Token Claims
 
 As described [here](#tracking-tokens), a streaming processor should claim a token before it is allowed to perform any processing work.
 There are several scenarios where a processor may keep the claim for too long.
@@ -587,6 +587,75 @@ public class AxonConfig {
 {% endtab %}
 {% endtabs %}
 
+#### Token Stealing
+
+As described at the [start](#tracking-tokens), streaming processor threads can "steal" tokens from one another.
+A token is "stolen" when a thread loses a [token claim](#token-claims).
+Situations like this internally result in an `UnableToClaimTokenException`, which both streaming event processor implementations catch and translate into warn- or info-level log statements.
+
+Whereas the framework uses token claims to ensure that a single thread processes a given sequence of events, it supports token stealing to guarantee event processing is not blocked forever.
+In short, the framework uses token stealing to unblock your streaming processor threads when processing takes too long.
+Examples may include literal slow processing, blocking exceptional scenarios, and deadlocks.
+
+However, token stealing may come as a surprise for some applications, making it an unwanted side effect.
+As such, it is good to be aware of why tokens get stolen (as described above), but also when this happens and what the consequences are.
+
+##### When is a Token stolen?
+
+In practical terms, a token is stolen whenever the _claim timeout_ is exceeded.
+
+This timeout is met whenever the age of the token's timestamp (e.g., the `timestamp` column of your `token_entry` table) exceeds the `claimTimeout` of the `TokenStore`.
+By default, the `claimTimeout` value equals 10 seconds.
+To adjust it, you must configure a `TokenStore` instance through its builder, as shown in the [Token Store](#token-store) section and sketched briefly below.
+
+The token's timestamp is equally crucial in deciding when the timeout is met.
+The streaming processor thread holding the claim is in charge of updating the token timestamp.
+This timestamp is updated whenever the thread finishes a batch of events or whenever the processor extends the claim.
+The moment at which a claim is extended differs between the Tracking and the Pooled Streaming Processor.
+You should check out the [token claim](#token-claims) section if you want to know how to configure these values.
+
+To further clarify: a streaming processor's thread needs to be able to update the token claim, and by extension the timestamp, to ensure the token won't get stolen.
+Hence, a stalling processor thread will, one way or another, eventually lose the claim.
+
+Examples of when a thread may get its token stolen are:
+- Overall slow event handling
+- An overly large event batch size
+- Blocking operations inside event handlers
+- Blocking exceptions inside event handlers
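+
+If ten seconds turns out to be too tight for your event handlers, so that claims get stolen during normal operation, you can raise the claim timeout when building the token store.
+The following is a minimal, Spring Boot-style sketch, assuming a JPA-based token store and an (arbitrary) timeout of 30 seconds; adjust the store type and duration to your own setup:
+
+```java
+@Configuration
+public class AxonConfig {
+    // ...
+    @Bean
+    public TokenStore tokenStore(EntityManagerProvider entityManagerProvider, Serializer serializer) {
+        return JpaTokenStore.builder()
+                            .entityManagerProvider(entityManagerProvider)
+                            .serializer(serializer)
+                            // Claims that are not updated or extended within 30 seconds may be stolen by other threads.
+                            .claimTimeout(Duration.ofSeconds(30))
+                            .build();
+    }
+}
+```
+
+Keep in mind that a larger claim timeout also means it takes longer before the work of a genuinely stuck processor thread is picked up by another thread.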
+
+##### What are the consequences of Token stealing?
+
+The consequence of token stealing is that an event may be handled twice (or more).
+
+When a thread steals a token, the original thread was _already_ processing events from the token's position.
+To protect against handling events twice, Axon Framework combines committing the event handling task with updating the token.
+As the token claim is required to update the token, the original thread's update will fail.
+Following this, a rollback occurs on the [Unit of Work](/axon-framework/messaging-concepts/unit-of-work.md), resolving most issues arising from token stealing.
+
+The ability to roll back event handling tasks sheds light on the consequences of token stealing.
+Most event processors project events into a projection stored within a database.
+If you store the projection in the same database as the tokens, the rollback will ensure the change is not persisted.
+Thus, the consequence of token stealing is limited to wasting processor cycles.
+This scenario is why we recommend storing tokens and projections in the same database.
+
+If a rollback is out of the question for an event handling task, we strongly recommend making the task idempotent.
+You may have this scenario when, for example, the projection and tokens do not reside in the same database,
+or when the event handler dispatches an operation (e.g., through the `CommandGateway`).
+By making the invoked operation idempotent, you ensure that whenever the thread stealing a token handles an event twice (or more), the outcome will be identical.
+
+Without idempotency, the consequences of token stealing can take many forms:
+- Your projection (stored in a different database than your tokens!) may incorrectly project the state.
+- An event handler putting messages on a queue will put the same message on the queue again.
+- A Saga Event Handler invoking a third-party service will invoke that service again.
+- An event handler sending an email will send that email again.
+
+In short, any operation introducing a side effect that isn't handled in an idempotent fashion will occur again when a token is stolen.
+
+Concluding, we can separate the consequences of token stealing into roughly three scenarios:
+1. We can roll back the operation. In this case, the only consequence is wasted processor cycles.
+2. The operation is idempotent. In this case, the only consequence is wasted processor cycles.
+3. The task can neither be rolled back nor performed in an idempotent fashion. In this case, compensating actions may be the way out.
+
 ### Token Store
 
 The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingTokens`.
@@ -1037,7 +1106,7 @@ public class AxonConfig {
         EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                 (config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder)
-                                            .workerExecutorService(workerExecutorBuilder)
+                                            .workerExecutor(workerExecutorBuilder)
                                             .initialSegmentCount(32);
 
         processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
@@ -1062,7 +1131,7 @@ public class AxonConfig {
         EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
                 (config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder)
-                                            .workerExecutorService(workerExecutorBuilder)
+                                            .workerExecutor(workerExecutorBuilder)
                                             .initialSegmentCount(32);
 
         processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
diff --git a/axon-framework/tuning/event-snapshots.md b/axon-framework/tuning/event-snapshots.md
index 97753971..8d20ca5a 100644
--- a/axon-framework/tuning/event-snapshots.md
+++ b/axon-framework/tuning/event-snapshots.md
@@ -109,7 +109,6 @@ You could take the stance of dropping all the snapshots which are stored (for a
 It is also possible to filter out snapshot events when reading your Aggregate from the event store.
 To that end, a `SnapshotFilter` can be defined per Aggregate type or for the entire `EventStore`.
-
 The `SnapshotFilter` is a functional interface, providing two main operations: `allow(DomainEventData