77 changes: 73 additions & 4 deletions axon-framework/events/event-processors/streaming.md
@@ -377,13 +377,13 @@ To be able to reopen the stream at a later point, we should keep the progress so
The progress is kept by updating and saving the `TrackingToken` after handling batches of events.
Keeping the progress requires CRUD operations, for which the Streaming Processor uses the [`TokenStore`](#token-store).

For a Streaming Processor to process any events, it needs "a claim" on a `TrackingToken`.
For a Streaming Processor to process any events, it needs ["a claim"](#token-claims) on a `TrackingToken`.
The processor will update this claim every time it has finished handling a batch of events.
This so-called "claim extension" is, just as updating and saving of tokens, delegated to the Token Store.
Hence, Streaming Processors achieve collaboration among instances/threads through token claims.

In the absence of a claim, a processor will actively try to retrieve one.
If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim.
If a token claim is not extended for a configurable amount of time, other processor threads can ["steal"](#token-stealing) the claim.
Token stealing can, for example, happen if event processing is slow or has encountered exceptions.

### Initial Tracking Token
@@ -419,7 +419,7 @@ In those cases, we recommend to validate if any of the above situations occurred
There are a couple of things we can configure when it comes to tokens.
We can separate these options into "initial token" and "token claim" configuration, as described in the following sections:

**Initial Token**
#### Initial Token

The [initial token](#initial-tracking-token) for a `StreamingEventProcessor` is configurable for every processor instance.
When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`.
@@ -501,7 +501,7 @@ public class AxonConfig {
{% endtab %}
{% endtabs %}

**Token Claims**
#### Token Claims

As described [here](#tracking-tokens), a streaming processor should claim a token before it is allowed to perform any processing work.
There are several scenarios where a processor may keep the claim for too long.
@@ -587,6 +587,75 @@ public class AxonConfig {
{% endtab %}
{% endtabs %}

#### Token Stealing

As described at the [start](#tracking-tokens), streaming processor threads can "steal" tokens from one another.
A token is "stolen" when a thread loses a [token claim](#token-claims).
Situations like this internally result in an `UnableToClaimTokenException`, which both streaming event processor implementations catch and translate into warn- or info-level log statements.

While the framework uses token claims to ensure that only a single thread processes a given sequence of events, it supports token stealing to guarantee that event processing is not blocked forever.
In short, the framework uses token stealing to unblock your streaming processor threads when processing takes too long.
Examples include genuinely slow processing, blocking exceptional scenarios, and deadlocks.

However, token stealing may come as a surprise for some applications, making it an unwanted side effect.
As such, it is good to be aware of why tokens get stolen (as described above), when this happens, and what the consequences are.

##### When is a Token stolen?

In practical terms, a token is stolen whenever the _claim timeout_ is exceeded.

This timeout is exceeded whenever the time elapsed since the token's timestamp (e.g., the `timestamp` column of your `token_entry` table) is greater than the `claimTimeout` of the `TokenStore`.
By default, the `claimTimeout` value equals 10 seconds.
To adjust it, you must configure a `TokenStore` instance through its builder, as shown in the [Token Store](#token-store) section.

The token's timestamp is equally crucial in deciding when the timeout is met.
The streaming processor thread holding the claim is in charge of updating the token timestamp.
This timestamp is updated whenever the thread finishes a batch of events or whenever the processor extends the claim.
When a claim is extended differs between the Tracking and Pooled Streaming processors.
You should check out the [token claim](#token-claims) section if you want to know how to configure these values.

To further clarify, a streaming processor's thread needs to be able to update the token claim and, by extension, the timestamp to ensure the token won't get stolen.
Hence, a stalling processor thread will, one way or another, eventually lose the claim.
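
The interplay between the timestamp and the claim timeout can be illustrated with a small, framework-independent model.
The sketch below is not Axon Framework's implementation; the `TokenClaimSketch` class and its methods are hypothetical names chosen purely for illustration.
It assumes a claim can be taken when the token is unclaimed, already owned by the requester, or when its timestamp is older than the claim timeout.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical, simplified model of a token claim; not Axon Framework's implementation. */
class TokenClaimSketch {

    private String owner;      // thread or node currently holding the claim, or null
    private Instant timestamp; // last time the claim was taken or extended

    /** Tries to claim the token for the given requester; returns true on success. */
    synchronized boolean claim(String requester, Instant now, Duration claimTimeout) {
        boolean unclaimed = owner == null;
        boolean alreadyOurs = requester.equals(owner);
        // The claim is up for grabs once the timestamp is older than the claim timeout.
        boolean expired = timestamp != null && timestamp.plus(claimTimeout).isBefore(now);
        if (unclaimed || alreadyOurs || expired) {
            owner = requester;
            timestamp = now;
            return true;
        }
        return false;
    }

    /** Extends the claim by refreshing the timestamp; fails if the claim was lost. */
    synchronized boolean extend(String requester, Instant now) {
        if (!requester.equals(owner)) {
            return false; // another thread stole the token in the meantime
        }
        timestamp = now;
        return true;
    }
}
```

In this model, a thread that keeps calling `extend` within the timeout retains the token, while a thread that stalls for longer than the timeout loses it to the next `claim` call.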

Examples of when a thread may get its token stolen are:
- Overall slow event handling
- Too large event batch size
- Blocking operations inside event handlers
- Blocking exceptions inside event handlers

##### What are the consequences of Token stealing?

The consequence of token stealing is that an event may be handled twice (or more).

When a thread steals a token, the original thread was _already_ processing events from the token's position.
To protect against duplicate event handling, Axon Framework combines committing the event handling task with updating the token.
As the token claim is required to update the token, the original thread will fail the update.
Following this, a rollback occurs on the [Unit of Work](/axon-framework/messaging-concepts/unit-of-work.md), resolving most issues arising from token stealing.

The ability to roll back event handling tasks sheds light on the consequences of token stealing.
Most event processors project events into a projection stored within a database.
If you store the projection in the same database as the token, the rollback will ensure the change is not persisted.
The consequence of token stealing is then limited to wasted processor cycles.
This is why we recommend storing tokens and projections in the same database.
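
To make the effect of a shared database concrete, the following sketch models a single store that holds both the token and the projection.
It is a hypothetical, in-memory stand-in rather than Axon Framework code; `SharedStoreSketch` and its methods are invented for this example.
The assumption is that the projection changes and the token update are applied together, and that both are discarded when the committing thread no longer owns the token.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for one database holding both the token and the projection. */
class SharedStoreSketch {

    private final Map<String, Integer> projection = new HashMap<>();
    private String tokenOwner;
    private long tokenPosition;

    /** Takes (or steals) the token claim for the given owner. */
    synchronized void claim(String owner) {
        tokenOwner = owner;
    }

    /**
     * Applies the staged projection changes and the token update as one unit.
     * If the caller no longer owns the token, nothing is applied (the "rollback").
     */
    synchronized boolean commit(String owner, Map<String, Integer> stagedChanges, long newPosition) {
        if (!owner.equals(tokenOwner)) {
            return false; // the claim was stolen: discard the staged changes
        }
        stagedChanges.forEach((key, delta) -> projection.merge(key, delta, Integer::sum));
        tokenPosition = newPosition;
        return true;
    }

    synchronized int read(String key) {
        return projection.getOrDefault(key, 0);
    }

    synchronized long position() {
        return tokenPosition;
    }
}
```

If thread A stages a change, loses the token to thread B, and then tries to commit, the commit fails and the projection stays untouched; only B's commit is persisted.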

If a rollback is out of the question for an event handling task, we strongly recommend making the task idempotent.
You may have this scenario when, for example, the projection and tokens do not reside in the same database, or when the event handler dispatches an operation (e.g., through the `CommandGateway`).
By making the invoked operation idempotent, you ensure that whenever the thread stealing a token handles an event a second time (or more), the outcome will be identical.

Without idempotency, the consequences of token stealing can be manifold:
- Your projection (stored in a different database than your tokens!) may incorrectly project the state.
- An event handler putting messages on a queue will put a message on the queue again.
- A Saga Event Handler invoking a third-party service will invoke that service again.
- An event handler sending an email will send that email again.

In short, any operation introducing a side effect that isn't handled in an idempotent fashion will occur again when a token is stolen.
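
One common way to make such a side effect idempotent is to deduplicate on an event identifier before acting.
The sketch below illustrates that idea with a hypothetical `IdempotentEmailHandlerSketch`; it does not use Axon Framework APIs, and the in-memory set and list are assumptions standing in for durable storage and a real mail gateway.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical handler that deduplicates on an event identifier before causing a side effect. */
class IdempotentEmailHandlerSketch {

    // In a real application this set would have to be stored durably.
    private final Set<String> handledEventIds = new HashSet<>();
    // Stands in for a real mail gateway.
    private final List<String> sentEmails = new ArrayList<>();

    /** Sends the email only the first time an event identifier is seen. */
    synchronized void on(String eventId, String recipient) {
        if (!handledEventIds.add(eventId)) {
            return; // already handled, e.g. by the thread that lost its claim
        }
        sentEmails.add(recipient);
    }

    synchronized List<String> sentEmails() {
        return List.copyOf(sentEmails);
    }
}
```

With this approach, handling the same event again after a token is stolen leaves the list of sent emails unchanged.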

In conclusion, we can separate the consequences of token stealing into roughly three scenarios:
1. We can roll back the operation. In this case, the only consequence is wasted processor cycles.
2. The operation is idempotent. In this case, the only consequence is wasted processor cycles.
3. The task can neither be rolled back nor performed in an idempotent fashion. In this case, compensating actions may be the way out.

### Token Store

The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingTokens`.