
Lock-free, MPSC-queue based, fast-path serializing Observer. #1235

Merged

Conversation

akarnokd
Member

I've rewritten the SerializedObserver to improve performance. It is now lock-free, uses a multiple-producer-single-consumer (MPSC) queue based on Netty's implementation, and employs fast-path logic for single-threaded write-through.

Benchmarked by measuring how fast 500k integers can get through it when running 1-8 producers at the same time. In the single-threaded case, master gives about 18 MOps/s while this implementation gives ~36 MOps/s (it would be ~16 MOps/s on the slow path). For more than 2 producers, master gives ~5.5 MOps/s and this gives ~11.5 MOps/s. For 2 producers (i.e., 1 producer and 1 consumer), master gives ~4.5 MOps/s and this gives ~8.5 MOps/s.

The two new classes, PaddedAtomicInteger and MpscPaddedQueue, will come in handy with other lock-free structures such as Schedulers. We may consider adding back rx.util or some other sub-package to store these helper classes: they don't need to be part of the public API but can be left public to enable cross-package access internally. (A minimal sketch of the padding idea follows.)
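
For illustration, here is a hypothetical sketch of the cache-line padding idea, assuming a 64-byte line; the class name and fields are illustrative, not the PR's actual code:

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of a padded atomic counter: the trailing long
 * fields push any neighboring hot field onto a different cache line,
 * avoiding false sharing with the inherited 'value' field.
 */
public class PaddedAtomicIntegerSketch extends AtomicInteger {
    private static final long serialVersionUID = 1L;
    // Padding only; never read. Six longs (48 bytes) plus the object
    // header and 'value' roughly fill a 64-byte cache line.
    public volatile long p1, p2, p3, p4, p5, p6 = 7L;

    public PaddedAtomicIntegerSketch(int initialValue) {
        super(initialValue);
    }
}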

Things I learned during the implementation:

  • It is worth padding the wip counter to fill a cache line so that, most likely, the constant cache thrashing won't affect the parent class's other fields.
  • Using FieldUpdaters saves space, but sun.misc.Unsafe can add 8-10% more throughput. To avoid platform issues, I stayed with the FieldUpdaters.
  • getAndIncrement and decrementAndGet are intrinsified in Java 8 and compile to a single x86 instruction, which generally outperforms any CAS loop. The same is true for getAndSet.
  • Padding out the tail in the MpscPaddedQueue likewise helps separate the producers thrashing on the tail from the consumer reading the head. Without it, throughput decreases by about ~1.5 MOps/s.
  • Adding the fast-path logic doubles the single-threaded throughput because it avoids an unnecessary enqueue and dequeue and all the associated volatile writes. However, if taking the fast path fails, everyone else incurs extra cost on the slow path because of the +1 failed CAS at the start. To handle this, I've introduced a flag that enables and disables the fast path. To disable it, the active fast path checks whether it was able to change wip back to zero; if not, there was concurrent access, so it continues on the emission-loop path and disables the fast path from then on. It is basically a detector for concurrent use. Since such concurrent use may be transient, the loop counts how many elements it had to emit, and if it was only 1 or 2, it re-enables the fast path. This limit is the result of trying several values with the benchmark above. (A sketch of this scheme follows the list.)
  • The fast-path logic has its weak spot in the 2-producer case compared to a plain MPSC queue running in SPSC mode; the latter gives about ~11 MOps/s, which beats this implementation's ~8.5 MOps/s. In contrast, single-threaded use of the plain MPSC queue reaches only ~16 MOps/s. Both implementations perform the same for more than 2 producers. Therefore, I decided it is worth having an implementation that is weaker for 2 producers but otherwise matches or outperforms the alternatives. Note that if one knows the number of producers up front, one can create a more specialized implementation, but this is not the case with RxJava operators. This may affect merge and related operators which serialize multiple sources. Note, however, that if the source isn't as fast as in the benchmark, this implementation still provides lower latency than the alternatives, because the fast path would most likely be open when the source emissions are interleaved.
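
As promised above, here is a rough sketch of the fast-path/slow-path scheme; a ConcurrentLinkedQueue stands in for MpscPaddedQueue, and all names and the re-enable threshold are illustrative, not the PR's actual code:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of the fast-path / slow-path serialization
 * scheme: wip counts pending items, and the producer that moves wip
 * from 0 becomes the single drainer.
 */
public class FastPathSerializerSketch<T> {
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
    private volatile boolean fastPathEnabled = true;

    public void onNext(T value) {
        if (fastPathEnabled && wip.compareAndSet(0, 1)) {
            // Fast path: no other producer active, deliver directly
            // and skip the enqueue/dequeue and its volatile writes.
            emit(value);
            if (wip.decrementAndGet() == 0) {
                return;
            }
            // Failed to bring wip back to zero: a concurrent producer
            // arrived meanwhile. Disable the fast path and fall into
            // the drain loop (this is the "concurrent use detector").
            fastPathEnabled = false;
        } else {
            // Slow path: enqueue first, then increment wip; whoever
            // moves wip from 0 becomes the single drainer.
            queue.offer(value);
            if (wip.getAndIncrement() != 0) {
                return; // another thread is already draining
            }
        }
        // Drain loop; getAndIncrement/decrementAndGet compile to
        // single x86 instructions on Java 8.
        int emitted = 0;
        do {
            T v = queue.poll();
            if (v != null) {
                emit(v);
                emitted++;
            }
        } while (wip.decrementAndGet() != 0);
        // Contention looked transient: re-open the fast path.
        if (emitted <= 2) {
            fastPathEnabled = true;
        }
    }

    private void emit(T value) {
        // deliver to the downstream Observer (omitted in this sketch)
    }
}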

@cloudbees-pull-request-builder

RxJava-pull-requests #1136 SUCCESS
This pull request looks good

@akarnokd
Member Author

One additional note: it is possible the JVM will eliminate the serialized part in master's SerializedObserver altogether, giving a throughput of 40-100 MOps/s. I usually get this effect if I run the test with 2M values; the 500k above was chosen to avoid this optimization. Unfortunately, one can't win against or replicate this behavior from within Java.

* if the {@link Observer} is null.
*/
@SuppressWarnings("unchecked")
public boolean accept2(Observer<? super T> o, Object n) {
Member

Could the return value just be added to the accept method without a performance hit instead of having the awkwardly named, almost exactly the same accept2 method?

Member Author

Most likely yes.
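
For illustration, a self-contained sketch of what merging accept2 into accept might look like; the sentinel types below are stand-ins, not the actual NotificationLite internals:

import rx.Observer;

/**
 * Sketch of a merged accept(): deliver the "lite" notification and
 * return whether it was terminal, removing the need for accept2().
 */
final class NotificationLiteSketch<T> {
    private static final Object ON_COMPLETED = new Object();

    private static final class OnError {
        final Throwable e;
        OnError(Throwable e) { this.e = e; }
    }

    Object next(T t) { return t; }
    Object completed() { return ON_COMPLETED; }
    Object error(Throwable e) { return new OnError(e); }

    /**
     * @return true if the notification was terminal
     */
    @SuppressWarnings("unchecked")
    boolean accept(Observer<? super T> o, Object n) {
        if (n == ON_COMPLETED) {
            o.onCompleted();
            return true;
        }
        if (n instanceof OnError) {
            o.onError(((OnError) n).e);
            return true;
        }
        o.onNext((T) n);
        return false;
    }
}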

@benjchristensen
Member

Your research on this is awesome. I look forward to playing with this queue implementation and seeing its performance compared to alternatives.

However, this slows down the normal non-contended use cases:

With MPSC

r.u.PerfTransforms.flatMapTransformsUsingFrom        1  thrpt         5  4093463.403   158457.270    ops/s
r.u.PerfTransforms.flatMapTransformsUsingFrom     1024  thrpt         5    11767.693      252.340    ops/s

Master branch

r.u.PerfTransforms.flatMapTransformsUsingFrom        1  thrpt         5  4729230.173    99504.604    ops/s
r.u.PerfTransforms.flatMapTransformsUsingFrom     1024  thrpt         5    12648.153      105.077    ops/s

Test using:

../gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 5 -prof GC .*PerfTransforms.flatMapTransformsUsingFrom*'

I don't think SerializedObserver is the right candidate for non-blocking/lock-free due to the reasons we discussed a couple months ago in #999 and #962, particularly this conclusion:

The "queue and lock" model performs well in the non-contended case and under contention, despite not being a very elegant solution and requiring the use of mutex locks for the state changes (but it does not hold the locks during notification).

This applies in this case because we are NOT exchanging events between threads; we are actively trying not to do that.

A high-performance queue (particularly of the lock-free kind) will make great sense, however, in places such as observeOn and/or Scheduler implementations where there is a producer thread and a consumer thread, as discussed in #1190.

I suggest we get this queue and counter into rx.internal.util and start performance-testing them as replacements everywhere we're using a queue. For now I would leave SerializedObserver alone, as we spent a lot of time on it before with at least 3 different implementations and came to the conclusion that the JVM can do a better job for that use case than we can.
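
For context, a simplified sketch of the "queue and lock" model quoted above: state transitions happen under a mutex, but the lock is not held while notifying downstream. Terminal-event handling is omitted; the real SerializedObserver on master handles that too:

import java.util.ArrayList;
import java.util.List;
import rx.Observer;

/**
 * Simplified "queue and lock" serializer: buffer under the lock if
 * someone else is emitting, otherwise claim the emitter role and
 * notify outside the lock.
 */
public class QueueAndLockSerializerSketch<T> {
    private final Observer<? super T> actual;
    private boolean emitting;  // guarded by 'this'
    private List<T> queue;     // guarded by 'this'

    public QueueAndLockSerializerSketch(Observer<? super T> actual) {
        this.actual = actual;
    }

    public void onNext(T t) {
        synchronized (this) {
            if (emitting) {
                // Someone else is emitting: buffer and let them drain.
                if (queue == null) {
                    queue = new ArrayList<T>();
                }
                queue.add(t);
                return;
            }
            emitting = true;
        }
        // Notify outside the lock. In the uncontended case the JVM
        // can make these monitor operations very cheap, which is what
        // the benchmarks above run into.
        actual.onNext(t);
        for (;;) {
            List<T> pending;
            synchronized (this) {
                pending = queue;
                queue = null;
                if (pending == null) {
                    emitting = false;
                    return;
                }
            }
            for (T v : pending) {
                actual.onNext(v);
            }
        }
    }
}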

@benjchristensen
Member

Here is my branch where I merged this PR and moved the code into rx.internal.util along with a README file: https://github.com/benjchristensen/RxJava/tree/serializer-mpsc

https://github.com/benjchristensen/RxJava/tree/serializer-mpsc/rxjava-core/src/main/java/rx/internal/util

@benjchristensen
Member

By the way, I'll spend more time testing the contended cases. Don't close this PR; I'm not done reviewing it all.

@akarnokd
Member Author

The JVM can remove a synchronized block if it detects single-threaded use; I can't compete with that by using atomics. This is a tradeoff scenario: get very fast synchronous behavior, or get double the throughput in the contended case. If we value the fast synchronous behavior more, then there is no need to go atomic; we just have to make sure we don't use wait/notify.

@benjchristensen
Member

I know; we determined that in #1190. If we're going to change the decision on which tradeoff to make, then we need to go back and revisit why we made the decision we did.

The expected common case is the one without contention, because of how flatMap is used everywhere.

I think we need a broader set of use cases to determine which direction the tradeoff should be made.

@benjchristensen
Member

This was automatically closed as I merged the queue implementation. We still need to finish discussing the SerializedObserver part.

@akarnokd
Member Author

I have my doubts now. Some simple benchmarks show improvements; other JMH benchmarks don't. Without industrial use cases, I think synchronized constructs are enough, because of either low contention or single-threaded use. In the latter case, JVM optimizations will always win. For example, I tried double-checked locking with Composite (sketched below) and barely got 10 MOps/s, whereas the current version gives 1.3 GOps/s if run within a 100k loop long enough.
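
A generic sketch of the double-checked locking pattern mentioned here (not the actual Composite experiment): a volatile read on the fast path, with the lock taken only while the value is still unset:

import java.util.concurrent.Callable;

/**
 * Double-checked lazy initialization: check the volatile field once
 * without the lock, then again under it before constructing.
 */
public class DoubleCheckedHolderSketch<T> {
    private volatile T value;

    public T get(Callable<T> factory) throws Exception {
        T v = value;              // first check, lock-free
        if (v == null) {
            synchronized (this) {
                v = value;        // second check, under the lock
                if (v == null) {
                    v = factory.call();
                    value = v;    // volatile write publishes safely
                }
            }
        }
        return v;
    }
}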

@daschl
Contributor

daschl commented May 30, 2014

I think if in doubt, let's just not outsmart the JVM :)

@benjchristensen
Member

Here is an ad-hoc conversation on Twitter with some experts on this: https://twitter.com/benjchristensen/status/472362740510494721

I'd like to improve our perf testing to have cases for uncontended, normally/occasionally contended, and highly contended use, and test them in a few places. I'll do the upgrade of JMH today so we have the latest code. Once I have those in place, I'll put it out there for review and guidance.

@headinthebox
Contributor

For what it is worth, I find the state-machine approach leads to hard-to-read code, so if there is no obvious win, I'd go for simple.

@akarnokd akarnokd deleted the SerializedObserverPerf521 branch January 20, 2015 15:38