New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Process events with identical TrackingToken
together in the PooledStreamingEventProcessor
#2275
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add test case to ensure last event isn't handled twice. Also, fix the WorkPackage, so that the test succeeds, but the multi-upcaster test fails. #bug/psep-upcasted-events
Both the Coordinator and WorkPackage should be aware subsequent events may have the same TrackingToken. This happens whenever a Multi-upcaster is used to split an event into several entries. As these events originally where one, they should be handled in one transaction too. The coordinator should collect these and provide them together in a list to the WorkPackage. The WorkPackage should during processing ensure it combines these again, as otherwise processing entries are handled separately. #bug/psep-upcasted-events
smcvb
added
Type: Bug
Use to signal issues that describe a bug within the system.
Priority 1: Must
Highest priority. A release cannot be made if this issue isn’t resolved.
Status: In Progress
Use to signal this issue is actively worked on.
labels
Jul 6, 2022
abuijze
requested changes
Jul 7, 2022
messaging/src/main/java/org/axonframework/eventhandling/pooled/WorkPackage.java
Outdated
Show resolved
Hide resolved
messaging/src/main/java/org/axonframework/eventhandling/pooled/WorkPackage.java
Outdated
Show resolved
Hide resolved
messaging/src/main/java/org/axonframework/eventhandling/pooled/WorkPackage.java
Outdated
Show resolved
Hide resolved
messaging/src/main/java/org/axonframework/eventhandling/pooled/WorkPackage.java
Outdated
Show resolved
Hide resolved
smcvb
commented
Jul 7, 2022
...src/test/java/org/axonframework/springboot/PooledStreamingEventProcessorIntegrationTest.java
Outdated
Show resolved
Hide resolved
- Remove debugging system.out line - Wrap assert in assertWithin as it failed a couple of times - Introduce a Default- and BatchProcessingEntry. This for one ensure we resolve a potential concurrency issue, where a Coordinator schedules events while the WorkPackage is consuming them. Secondly, it simplifies the processEvents() method and clarifies the change between default processing and batch processing for the upcasted events. #2275
Rename method for clarity and move assert operation into method #2275
Adjust validation for resiliency. There's a window of opportunity that the WorkPackage is fast and adds another trackerStatus in the mix. For verification, we can come by with a single entry, though. #2275
Adjust validation for resiliency. There's a window of opportunity that the WorkPackages are fast and that one of them already failed before this validation. This means there are 15 instead of 16 status' present. #2275
abuijze
reviewed
Jul 7, 2022
messaging/src/main/java/org/axonframework/eventhandling/pooled/WorkPackage.java
Outdated
Show resolved
Hide resolved
Move adding entries to the ProcessingEntry. That way we do not have to do an instanceof check. #2275
Use a ProcessingEntry instead of a DefaultProcessingEntry #2275
abuijze
approved these changes
Jul 7, 2022
Kudos, SonarCloud Quality Gate passed! |
smcvb
added
Status: Resolved
Use to signal that work on this issue is done.
and removed
Status: In Progress
Use to signal this issue is actively worked on.
labels
Jul 7, 2022
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
Priority 1: Must
Highest priority. A release cannot be made if this issue isn’t resolved.
Status: Resolved
Use to signal that work on this issue is done.
Type: Bug
Use to signal issues that describe a bug within the system.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This pull request can be regarded as a follow up for #2067, doing it right this time.
Pull request #2067 introduced an issue where one failing segment (read:
WorkPackage
) somewhere in the middle of the stream would cause the other segments to pick up the last event again.This was in essence the issue of the
Coordinator
, as it would reset the event stream to the failing segment once it got picked up again.Due to this, the stream was traversed again, causing the last handled event by all other segments to be scheduled again.
This scenario, followed by the
WorkPackage
accepting an event with an identical token (as introduced in PR #2067) caused duplicate handling of the last event.This pull request resolves this predicament by very consciously moving into a different processing branch for event scheduling if subsequent events are present with the same token.
The
Coordinator
ensure they're grouped into a collection and provided together.The
WorkPackage
will in turn group them to process them within the same batch.The changes are accompanied by unit tests and an integration test to validate the correct process.