New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(engine): thread-safe transient subscription state #13072
Conversation
This changes the existing transient state holder to be thread safe. Allows for future changes where we need concurrent read and write operations.
The previous class hierarchy was a bit confusing. With these changes here, the `DbMessageSubscriptionState` is responsible for all state changes. When necessary, it reads from and writes to the transient state directly. This also adjusts names and package paths to indicate that reading and writing transient state is not a mutable operation.
The previous class hierarchy was a bit confusing. With these changes here, the `DbProcessMessageSubscriptionState` is responsible for all state changes. When necessary, it reads from and writes to the transient state directly. This also adjusts names and package paths to indicate that reading and writing transient state is not a mutable operation.
9afa964
to
f8a7dc4
Compare
Thanks @oleschoenburg ❤️ I'll have a look at this tomorrow |
Sorry @oleschoenburg, I forgot about this one. It'll be my top priority tomorrow 🙇 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @oleschoenburg 👏
👍 Another amazing set of changes!
❌ I'm not confident about the thread safety of the TransientPendingSubscriptionState. I might misunderstand how the ConcurrentHashMap works, but please have another look.
...e/src/main/java/io/camunda/zeebe/engine/state/message/TransientPendingSubscriptionState.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 I also noticed this still after my review
@@ -151,13 +155,20 @@ public void updateToCorrelatingState(final MessageSubscriptionRecord record) { | |||
|
|||
updateCorrelatingFlag(subscription, true); | |||
|
|||
transientState.add(record); | |||
transientState.add( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 Although the implementation of add
and update
are equivalent, semantically, it might make more sense to use update
here.
} | ||
|
||
@Override | ||
public void updateToOpeningState(final ProcessMessageSubscriptionRecord record) { | ||
update(record, s -> s.setRecord(record).setOpening()); | ||
transientState.add(record); | ||
transientState.add( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 Although the implementation of add
and update
are equivalent, semantically, it might make more sense to use update
here.
} | ||
|
||
@Override | ||
public void updateToClosingState(final ProcessMessageSubscriptionRecord record) { | ||
update(record, s -> s.setRecord(record).setClosing()); | ||
transientState.add(record); | ||
transientState.add( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔧 Although the implementation of add
and update
are equivalent, semantically, it might make more sense to use update
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oleschoenburg You have convinced me here that this is indeed thread safe.
👍 LGTM (please consider my suggestion before merging)
Although the implementation is equivalent, using `update` instead of `add` is clearer.
bors merge |
Build succeeded: |
Closes #12798 by making
TransientSubscriptionCommandState
thread safe. This allows data sharing between scheduled tasks and processing.The real change is 839744f with two follow up commits that significantly simplify the class hierarchy but add lot of diff noise. The refactorings can be split off if necessary.