-
Notifications
You must be signed in to change notification settings - Fork 140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
=str Make use of ConcurrentHashMap instead of List to reduce contention. #408
Conversation
Is it possible to provide jmh benchmarks to show the difference? |
@mdedetrich I think this is a bug fix not a performance optimization, I can switch to |
Ah I see, I will get @jrudolph to review this since he made the original issue. |
After read the paper of TrieMap, I think I need to change it back to ConcurrentHashMap.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
stream/src/main/scala/org/apache/pekko/stream/stage/GraphStage.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit confused as to the use of AtomicReference[mutable.Map]
instead of just plain mutable.Map
.
Signed-off-by: He-Pin <hepin1989@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
Yes, this was originally a performance improvement. Regardless of that fact, let's not just merge changes in performance critical code without running benchmarks or discussing arguments why they can be omitted. It has been a long time since I worked on this / proposed the change (almost 3 years... oh my). I guess in this case, it's fine, it seems after all they ran some benchmarks in Akka to validate it does not introduce obvious performance regressions.
The comment at its declaration explains it. There's a race condition between scheduling an async callback (i.e. an incoming external event into the stream) and shutting down a stream. The atomic reference makes sure, that we will report feedback to the async callback reliably even if it raced against shutdown (this is basically what this whole feature is about). |
Was: akka/akka#31262
refs: akka/akka#29557
Use an ConcurrentMap as set to trace the contention in onFeedbackDispatched