Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1852df8
Updated not working code samples
Sep 26, 2022
2c5f920
Add AF 4.6.1 release notes
smcvb Oct 6, 2022
c9f22cb
Add Multi-Tenancy extesions 4.6.1 release notes
smcvb Oct 13, 2022
bb6b7a2
Add Tracing extesions 4.6.1 release notes
smcvb Oct 13, 2022
447d479
Add JGroups and MultiTenancy minor release link to summary
smcvb Oct 13, 2022
c9e273a
add AxonServer SE 4.6.5
MGathier Oct 26, 2022
66c0508
Merge remote-tracking branch 'origin/4.6' into 4.6
MGathier Oct 26, 2022
94b6cc0
add AxonServer EE 4.6.5
MGathier Oct 26, 2022
02d3631
Method name fix: usingPooledStreamingProcessors --> usingPooledStream…
OLibutzki Oct 28, 2022
4d9ebe3
Renamed deprecated method: workerExecutorService ->
OLibutzki Oct 28, 2022
c3c74df
Append RevisionSnapshotFilter description to FilteringSnapshotEvents
smcvb Nov 1, 2022
5a74542
Add a notice stating why Sagas do not support the DLQ
smcvb Nov 1, 2022
04a1621
Introduce a dedicated token stealing section
smcvb Nov 2, 2022
df960a1
Add missing link to UnitOfWork
smcvb Nov 2, 2022
d4c6d92
Replace command sample for third-party service
smcvb Nov 2, 2022
4f4d063
Update axon-framework/tuning/event-snapshots.md
smcvb Nov 4, 2022
1aa6d32
Merge pull request #293 from AxonIQ/enhancement/revision-snapshot-filter
smcvb Nov 4, 2022
a23f080
Update axon-framework/events/event-processors/streaming.md
smcvb Nov 4, 2022
434f6a6
Update axon-framework/events/event-processors/streaming.md
smcvb Nov 4, 2022
77cd95c
Update axon-framework/events/event-processors/streaming.md
smcvb Nov 4, 2022
afc1f3c
Update axon-framework/events/event-processors/streaming.md
smcvb Nov 4, 2022
e93e776
Update axon-framework/events/event-processors/streaming.md
smcvb Nov 4, 2022
c3f78b1
Update axon-framework/events/event-processors/streaming.md
smcvb Nov 4, 2022
a34f4d6
Replace ascertain for guarantee
smcvb Nov 4, 2022
fe6f730
Add 4.6.2 release notes
smcvb Nov 4, 2022
331225d
Merge pull request #294 from AxonIQ/enhancement/saga-dlq-notice
smcvb Nov 8, 2022
1e9bc8d
Merge pull request #295 from AxonIQ/enhancement/288
smcvb Nov 8, 2022
81e0c63
Merge remote-tracking branch 'origin/4.6' into dead-letter-example-code
smcvb Nov 8, 2022
b4d7c16
Fine tune code samples for DLQ configuration
smcvb Nov 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* [Major Releases](release-notes/rn-extensions/rn-cdi/rn-cdi-major-releases.md)
* [JGroups](release-notes/rn-extensions/rn-jgroups/README.md)
* [Major Releases](release-notes/rn-extensions/rn-jgroups/rn-jgroups-major-releases.md)
* [Minor Releases](release-notes/rn-extensions/rn-jgroups/rn-jgroups-minor-releases.md)
* [Kafka](release-notes/rn-extensions/rn-kafka/README.md)
* [Major Releases](release-notes/rn-extensions/rn-kafka/rn-kafka-major-releases.md)
* [Minor Releases](release-notes/rn-extensions/rn-kafka/rn-kafka-minor-releases.md)
Expand All @@ -32,6 +33,7 @@
* [Minor Releases](release-notes/rn-extensions/rn-mongo/rn-mongo-minor-releases.md)
* [Multi-Tenancy](release-notes/rn-extensions/rn-multi-tenancy/README.md)
* [Major Releases](release-notes/rn-extensions/rn-multi-tenancy/rn-multi-tenancy-major-releases.md)
* [Minor Releases](release-notes/rn-extensions/rn-multi-tenancy/rn-multi-tenancy-minor-releases.md)
* [Reactor](release-notes/rn-extensions/rn-reactor/README.md)
* [Major Releases](release-notes/rn-extensions/rn-reactor/rn-reactor-major-releases.md)
* [Minor Releases](release-notes/rn-extensions/rn-reactor/rn-reactor-minor-releases.md)
Expand Down
85 changes: 47 additions & 38 deletions axon-framework/events/event-processors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ To that end, the supported dead-letter queue is a so-called `SequencedDeadLetter
Integral to its design is to allow for queueing failed events and events that belong to a faulty sequence.
It does so by maintaining a sequence identifier for each event, determined by the [sequencing policy](/axon-framework/events/event-processors/streaming.md#sequential-processing).

> **Is there support for Sagas?**
>
> Currently, there is *no* support for using a dead-letter queue for [sagas](/axon-framework/sagas/README.md).
> We've taken this decision as we cannot support a sequenced dead lettering approach as we do for regular event handling.
>
> Furthermore, we cannot do this, as a saga's associations can vary widely between events.
> Due to this, the sequence of events may change, breaking this level of support.
> Hence, there's no way of knowing whether a next event in the stream does or does not belong to a saga.

Note that you *cannot* share a dead-letter queue between different processing groups.
Hence, each processing group you want to enable this behavior for should receive a unique dead-letter queue instance.

Expand Down Expand Up @@ -305,21 +314,20 @@ A `JpaSequencedDeadLetterQueue` configuration example:
{% tabs %}
{% tab title="Axon Configuration API" %}
```java
public class DeadLetterQueueExampleConfig {

public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) {
return configurer -> configurer.eventProcessing(
eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue(
processingGroup,
configuration -> JpaSequencedDeadLetterQueue.builder()
.processingGroup(processingGroup)
.maxSequences(256)
.maxSequenceSize(256)
.entityManagerProvider(configuration.getComponent(EntityManagerProvider.class))
.transactionManager(configuration.getComponent(TransactionManager.class))
.serializer(configuration.serializer())
.build()
)
public class AxonConfig {
// ...
public void configureDeadLetterQueue(EventProcessingConfigurer processingConfigurer) {
// Replace "my-processing-group" for the processing group you want to configure the DLQ on.
processingConfigurer.registerDeadLetterQueue(
"my-processing-group",
config -> JpaSequencedDeadLetterQueue.builder()
.processingGroup("my-processing-group")
.maxSequences(256)
.maxSequenceSize(256)
.entityManagerProvider(config.getComponent(EntityManagerProvider.class))
.transactionManager(config.getComponent(TransactionManager.class))
.serializer(config.serializer())
.build()
);
}
}
Expand All @@ -328,22 +336,21 @@ public class DeadLetterQueueExampleConfig {
{% tab title="Spring Boot AutoConfiguration" %}
```java
@Configuration
public class DeadLetterQueueExampleConfig {

@Autowired
public ConfigurerModule configureDeadLetterQueueFor(String processingGroup) {
return configurer -> configurer.eventProcessing(
eventProcessingConfigurer -> eventProcessingConfigurer.registerDeadLetterQueue(
processingGroup,
configuration -> JpaSequencedDeadLetterQueue.builder()
.processingGroup(processingGroup)
.maxSequences(256)
.maxSequenceSize(256)
.entityManagerProvider(configuration.getComponent(EntityManagerProvider.class))
.transactionManager(configuration.getComponent(TransactionManager.class))
.serializer(configuration.serializer())
.build()
)
public class AxonConfig {
// ...
@Bean
public ConfigurerModule deadLetterQueueConfigurerModule() {
// Replace "my-processing-group" for the processing group you want to configure the DLQ on.
return configurer -> configurer.eventProcessing().registerDeadLetterQueue(
"my-processing-group",
config -> JpaSequencedDeadLetterQueue.builder()
.processingGroup("my-processing-group")
.maxSequences(256)
.maxSequenceSize(256)
.entityManagerProvider(config.getComponent(EntityManagerProvider.class))
.transactionManager(config.getComponent(TransactionManager.class))
.serializer(config.serializer())
.build()
);
}
}
Expand Down Expand Up @@ -505,23 +512,25 @@ See the following example for configuring our custom policy:
{% tabs %}
{% tab title="Axon Configuration API" %}
```java
public class EnqueuePolicyConfigurer {

public void configureEnqueuePolicy(EventProcessingConfigurer configurer, String processingGroup) {
configurer.registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy());
public class AxonConfig {
// ...
public void configureEnqueuePolicy(EventProcessingConfigurer configurer) {
// Replace "my-processing-group" for the processing group you want to configure the policy on.
configurer.registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy());
}
}
```
{% endtab %}
{% tab title="Spring Boot AutoConfiguration" %}
```java
@Configuration
public class EnqueuePolicyConfigurer {
public class AxonConfig {

@Bean
public ConfigurerModule configureEnqueuePolicy(String processingGroup) {
public ConfigurerModule enqueuePolicyConfigurerModule() {
// Replace "my-processing-group" for the processing group you want to configure the policy on.
return configurer -> configurer.eventProcessing()
.registerDeadLetterPolicy(processingGroup, config -> new MyEnqueuePolicy());
.registerDeadLetterPolicy("my-processing-group", config -> new MyEnqueuePolicy());
}
}
```
Expand Down
87 changes: 78 additions & 9 deletions axon-framework/events/event-processors/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ public class AxonConfig {

### Configuring a Pooled Streaming Processor

Firstly, to specify that every new processors should default to a `PooledStreamingEventProcessor`, you can invoke the `usingPooledStreamingProcessors` method:
Firstly, to specify that every new processors should default to a `PooledStreamingEventProcessor`, you can invoke the `usingPooledStreamingEventProcessors` method:

{% tabs %}
{% tab title="Axon Configuration API" %}
```java
public class AxonConfig {
// ...
public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
processingConfigurer.usingPooledStreamingProcessors();
processingConfigurer.usingPooledStreamingEventProcessors();
}
}
```
Expand All @@ -222,7 +222,7 @@ public class AxonConfig {
// ...
@Autowired
public void configureProcessorDefault(EventProcessingConfigurer processingConfigurer) {
processingConfigurer.usingPooledStreamingProcessors();
processingConfigurer.usingPooledStreamingEventProcessors();
}
}
```
Expand Down Expand Up @@ -377,13 +377,13 @@ To be able to reopen the stream at a later point, we should keep the progress so
The progress is kept by updating and saving the `TrackingToken` after handling batches of events.
Keeping the progress requires CRUD operation, for which the Streaming Processor uses the [`TokenStore`](#token-store).

For a Streaming Processor to process any events, it needs "a claim" on a `TrackingToken`.
For a Streaming Processor to process any events, it needs ["a claim"](#token-claims) on a `TrackingToken`.
The processor will update this claim every time it has finished handling a batch of events.
This so-called "claim extension" is, just as updating and saving of tokens, delegated to the Token Store.
Hence, the Streaming Processors achieves collaboration among instances/threads through token claims.

In the absence of a claim, a processor will actively try to retrieve one.
If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim.
If a token claim is not extended for a configurable amount of time, other processor threads can ["steal"](#token-stealing) the claim.
Token stealing can, for example, happen if event processing is slow or encountered some exceptions.

### Initial Tracking Token
Expand Down Expand Up @@ -419,7 +419,7 @@ In those cases, we recommend to validate if any of the above situations occurred
There are a couple of things we can configure when it comes to tokens.
We can separate these options in "initial token" and "token claim" configuration, as described in the following sections:

**Initial Token**
#### Initial Token

The [initial token](#initial-tracking-token) for a `StreamingEventProcessor` is configurable for every processor instance.
When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`.
Expand Down Expand Up @@ -501,7 +501,7 @@ public class AxonConfig {
{% endtab %}
{% endtabs %}

**Token Claims**
#### Token Claims

As described [here](#tracking-tokens), a streaming processor should claim a token before it is allowed to perform any processing work.
There are several scenarios where a processor may keep the claim for too long.
Expand Down Expand Up @@ -587,6 +587,75 @@ public class AxonConfig {
{% endtab %}
{% endtabs %}

#### Token Stealing

As described at the [start](#tracking-tokens), streaming processor threads can "steal" tokens from one another.
A token is "stolen" when a thread loses a [token claim](#token-claims).
Situations like this internally result in an `UnableToClaimTokenException,` caught by both streaming event processor implementations and translated into warn- or info-level log statements.

Where the framework uses token claims to ensure that a single thread is processing a sequence of events, it supports token stealing to guarantee event processing is not blocked forever.
In short, the framework uses token stealing to unblock your streaming processor threads when processing takes too long.
Examples may include literal slow processing, blocking exceptional scenarios, and deadlocks.

However, token stealing may occur as a surprise for some applications, making it an unwanted side effect.
As such, it is good to be aware of why tokens get stolen (as described above), but also when this happens and what the consequences are.

##### When is a Token stolen?

In practical terms, a token is stolen whenever the _claim timeout_ is exceeded.

This timeout is met whenever the token's timestamp (e.g., the `timestamp` column of your `token_entry` table) exceeds the `claimTimeout` of the `TokenStore`.
By default, the `claimTimeout` value equals 10 seconds.
To adjust it, you must configure a `TokenStore` instance through its builder, as shown in the [Token Store](#token-store) section.

The token's timestamp is equally crucial in deciding when the timeout is met.
The streaming processor thread holding the claim is in charge of updating the token timestamp.
This timestamp is updated whenever the thread finishes a batch of events or whenever the processor extends the claim.
When to extend a claim differs between the Tracking and Pooled Streaming processor.
You should check out the [token claim](#token-claims) section if you want to know how to configure these values.

To further clarify, a streaming processor's thread needs to be able to update the token claim and, by extension, the timestamp to ensure it won't get stolen.
Hence, a staling processor thread will, one way or another, eventually lose the claim.

Examples of when a thread may get its token stolen are:
- Overall slow event handling
- Too large event batch size
- Blocking operations inside event handlers
- Blocking exceptions inside event handlers

##### What are the consequences of Token stealing?

The consequence of token stealing is that an event may be handled twice (or more).

When a thread steals a token, the original thread was _already_ processing events from the token's position.
To protect against doubling event handling, Axon Framework will combine committing the event handling task with updating the token.
As the token claim is required to update the token, the original thread will fail the update.
Following this, a rollback occurs on the [Unit of Work](/axon-framework/messaging-concepts/unit-of-work.md), resolving most issues arising from token stealing.

The ability to rollback event handling tasks sheds light on the consequences of token stealing.
Most event processors project events into a projection stored within a database.
Furthermore, if you store the projection in the same database as the token, the rollback will ensure the change is not persisted.
Thus, the consequence of token stealing is limited to wasting processor cycles.
This scenario is why we recommend storing tokens and projections in the same database.

If a rollback is out of the question for an event handling task, we strongly recommend making the task idempotent.
You may have this scenario when, for example, the projection and tokens do not reside in the same database.
or when the event handler dispatches an operation (e.g., through the `CommandGateway`).
In making the invoked operation idempotent, you ensure that whenever the thread stealing a token handles an event twice (or more), the outcome will be identical.

Without idempotency, the consequences of token stealing can be manyfold:
- Your projection (stored in a different database than your tokens!) may incorrectly project the state.
- An event handler putting messages on a queue will put a message on the queue again.
- A Saga Event Handler invoking a third-party service will invoke that service again.
- An event handler sending an email will send that email again.

In short, any operation introducing a side effect that isn't handled in an idempotent fashion will occur again when a token is stolen.

Concluding, we can separate the consequence of token stealing into roughly three scenarios:
1. We can rollback the operation. In this case, the only consequence is wasted processor cycles.
2. The operation is idempotent. In this case, the only consequence is wasted processor cycles.
3. When the task cannot be rolled back nor performed in an idempotent fashion, compensating actions may be the way out.

### Token Store

The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingTokens`.
Expand Down Expand Up @@ -1037,7 +1106,7 @@ public class AxonConfig {

EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
(config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder)
.workerExecutorService(workerExecutorBuilder)
.workerExecutor(workerExecutorBuilder)
.initialSegmentCount(32);

processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
Expand All @@ -1062,7 +1131,7 @@ public class AxonConfig {

EventProcessingConfigurer.PooledStreamingProcessorConfiguration psepConfig =
(config, builder) -> builder.coordinatorExecutor(coordinatorExecutorBuilder)
.workerExecutorService(workerExecutorBuilder)
.workerExecutor(workerExecutorBuilder)
.initialSegmentCount(32);

processingConfigurer.registerPooledStreamingEventProcessorConfiguration("my-processor", psepConfig);
Expand Down
18 changes: 17 additions & 1 deletion axon-framework/tuning/event-snapshots.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ You could take the stance of dropping all the snapshots which are stored (for a
It is also possible to filter out snapshot events when reading your Aggregate from the event store.
To that end, a `SnapshotFilter` can be defined per Aggregate type or for the entire `EventStore`.


The `SnapshotFilter` is a functional interface, providing two main operations: `allow(DomainEventData<?)` and `combine(SnapshotFilter)`.
The former provides the `DomainEventData` which reflects the snapshot events.
The latter allows combining several `SnapshotFilter`s together.
Expand Down Expand Up @@ -147,7 +146,24 @@ public class GiftCard {...}
{% endtab %}
{% endtabs %}

The above snippet would be feasible to follow _if_ fine-grained control is required when filtering snapshots from the store.
For example, when your snapshots are not based on the Aggregate class (which is the default).
When this is not required, you can base yourself on the default `SnapshotFilter` - the `RevisionSnapshotFilter`.

To configure this `SnapshotFilter`, all you have to do is use the `@Revision` annotation on your Aggregate class.
In doing so, the `RevisionSnapshotFilter` is set, filtering non-matching snapshots from the `Repository`'s loading process, based on the value maintained within the `@Revision` annotation.

Through this, with every new production deployment of your application that adjusts the Aggregate state, you would only have to adjust the revision value in the annotation.
Check out the following example for how to set this up:

```java
// "1" is an example revision value.
// You're free to choose whatever value that fits your application's versioning scheme.
@Revision("1")
public class GiftCard {
// Omitted aggregate internals for simplicity.
}
```

### Initializing an Aggregate based on a Snapshot Event

Expand Down
Loading