diff --git a/axon-framework/events/event-processors/streaming.md b/axon-framework/events/event-processors/streaming.md index 813ced29..b35a1641 100644 --- a/axon-framework/events/event-processors/streaming.md +++ b/axon-framework/events/event-processors/streaming.md @@ -377,13 +377,13 @@ To be able to reopen the stream at a later point, we should keep the progress so The progress is kept by updating and saving the `TrackingToken` after handling batches of events. Keeping the progress requires CRUD operation, for which the Streaming Processor uses the [`TokenStore`](#token-store). -For a Streaming Processor to process any events, it needs "a claim" on a `TrackingToken`. +For a Streaming Processor to process any events, it needs ["a claim"](#token-claims) on a `TrackingToken`. The processor will update this claim every time it has finished handling a batch of events. This so-called "claim extension" is, just as updating and saving of tokens, delegated to the Token Store. Hence, the Streaming Processors achieves collaboration among instances/threads through token claims. In the absence of a claim, a processor will actively try to retrieve one. -If a token claim is not extended for a configurable time window, other processor threads are able to "steal" the claim. +If a token claim is not extended for a configurable amount of time, other processor threads can ["steal"](#token-stealing) the claim. Token stealing can, for example, happen if event processing is slow or encountered some exceptions. ### Initial Tracking Token @@ -419,7 +419,7 @@ In those cases, we recommend to validate if any of the above situations occurred There are a couple of things we can configure when it comes to tokens. We can separate these options in "initial token" and "token claim" configuration, as described in the following sections: -**Initial Token** +#### Initial Token The [initial token](#initial-tracking-token) for a `StreamingEventProcessor` is configurable for every processor instance. When configuring the initial token builder function, the received input parameter is the `StreamableMessageSource`. @@ -501,7 +501,7 @@ public class AxonConfig { {% endtab %} {% endtabs %} -**Token Claims** +#### Token Claims As described [here](#tracking-tokens), a streaming processor should claim a token before it is allowed to perform any processing work. There are several scenarios where a processor may keep the claim for too long. @@ -587,6 +587,75 @@ public class AxonConfig { {% endtab %} {% endtabs %} +#### Token Stealing + +As described at the [start](#tracking-tokens), streaming processor threads can "steal" tokens from one another. +A token is "stolen" when a thread loses a [token claim](#token-claims). +Situations like this internally result in an `UnableToClaimTokenException,` caught by both streaming event processor implementations and translated into warn- or info-level log statements. + +Where the framework uses token claims to ensure that a single thread is processing a sequence of events, it supports token stealing to guarantee event processing is not blocked forever. +In short, the framework uses token stealing to unblock your streaming processor threads when processing takes too long. +Examples may include literal slow processing, blocking exceptional scenarios, and deadlocks. + +However, token stealing may occur as a surprise for some applications, making it an unwanted side effect. +As such, it is good to be aware of why tokens get stolen (as described above), but also when this happens and what the consequences are. + +##### When is a Token stolen? + +In practical terms, a token is stolen whenever the _claim timeout_ is exceeded. + +This timeout is met whenever the token's timestamp (e.g., the `timestamp` column of your `token_entry` table) exceeds the `claimTimeout` of the `TokenStore`. +By default, the `claimTimeout` value equals 10 seconds. +To adjust it, you must configure a `TokenStore` instance through its builder, as shown in the [Token Store](#token-store) section. + +The token's timestamp is equally crucial in deciding when the timeout is met. +The streaming processor thread holding the claim is in charge of updating the token timestamp. +This timestamp is updated whenever the thread finishes a batch of events or whenever the processor extends the claim. +When to extend a claim differs between the Tracking and Pooled Streaming processor. +You should check out the [token claim](#token-claims) section if you want to know how to configure these values. + +To further clarify, a streaming processor's thread needs to be able to update the token claim and, by extension, the timestamp to ensure it won't get stolen. +Hence, a staling processor thread will, one way or another, eventually lose the claim. + +Examples of when a thread may get its token stolen are: +- Overall slow event handling +- Too large event batch size +- Blocking operations inside event handlers +- Blocking exceptions inside event handlers + +##### What are the consequences of Token stealing? + +The consequence of token stealing is that an event may be handled twice (or more). + +When a thread steals a token, the original thread was _already_ processing events from the token's position. +To protect against doubling event handling, Axon Framework will combine committing the event handling task with updating the token. +As the token claim is required to update the token, the original thread will fail the update. +Following this, a rollback occurs on the [Unit of Work](/axon-framework/messaging-concepts/unit-of-work.md), resolving most issues arising from token stealing. + +The ability to rollback event handling tasks sheds light on the consequences of token stealing. +Most event processors project events into a projection stored within a database. +Furthermore, if you store the projection in the same database as the token, the rollback will ensure the change is not persisted. +Thus, the consequence of token stealing is limited to wasting processor cycles. +This scenario is why we recommend storing tokens and projections in the same database. + +If a rollback is out of the question for an event handling task, we strongly recommend making the task idempotent. +You may have this scenario when, for example, the projection and tokens do not reside in the same database. + or when the event handler dispatches an operation (e.g., through the `CommandGateway`). +In making the invoked operation idempotent, you ensure that whenever the thread stealing a token handles an event twice (or more), the outcome will be identical. + +Without idempotency, the consequences of token stealing can be manyfold: +- Your projection (stored in a different database than your tokens!) may incorrectly project the state. +- An event handler putting messages on a queue will put a message on the queue again. +- A Saga Event Handler invoking a third-party service will invoke that service again. +- An event handler sending an email will send that email again. + +In short, any operation introducing a side effect that isn't handled in an idempotent fashion will occur again when a token is stolen. + +Concluding, we can separate the consequence of token stealing into roughly three scenarios: +1. We can rollback the operation. In this case, the only consequence is wasted processor cycles. +2. The operation is idempotent. In this case, the only consequence is wasted processor cycles. +3. When the task cannot be rolled back nor performed in an idempotent fashion, compensating actions may be the way out. + ### Token Store The `TokenStore` provides the CRUD operations for the `StreamingEventProcessor` to interact with `TrackingTokens`.