Experimental RingBuffer using WriterReaderPhaser#2243
Conversation
Non-blocking emissions with phase controlled unsubscribe/release.
|
cc @akarnokd |
|
This failure again: |
|
An update related to this is available at #1908 (comment) |
|
The implementation is incorrect. Once flipped, the onXXX should be routed to the nop_queue, but essentially if the criticalValueAtEnter is less than zero, the unsubscription has happened and nothing should be enqueued. The release() is not idempotent because it can flip a second time back to the emitting-enabled mode. I believe by joining the ringbuffer and the phaser, you can get away with a single ingress atomic counter. It will provide he phase bit and the writerIndex at the same time. In addition, I figured out why the multi-consumer queue was needed: it was because an async clear() acts as a second reader using the same poll() and thus would corrupt an SPSC. Maybe an SPSC queue's poll can be modified so a filling the queue with nulls and moving the reader's index up to the writer's index is workable. |
|
Why do they need to be routed anywhere? All future emissions will now return immediately since released == true. Also, a single counter had the same performance problem. That was the WIP implementation. |
|
Released field is redundant because a negative critical value can indicate a terminal state; one field less. The WIP produced some nice results, ecxept on a few cases where it drastically underperformed. Could you point me to the sources of this variant to see if more nanoseconds can be saved? |
Okay, but how is the implementation incorrect, even if it may have other optimizations?
The test is in https://github.com/benjchristensen/RxJava/blob/ring-buffer-wip/src/main/java/rx/internal/util/RxRingBuffer.java |
As per discussion at ReactiveX#2243 (comment) Performance numbers are the same with this optimization: with WriterReaderPhaser using phase state ``` Benchmark (size) Mode Samples Score Score error Units r.o.OperatorMergePerf.merge1SyncStreamOfN 1 thrpt 5 4930586.615 1177558.483 ops/s r.o.OperatorMergePerf.merge1SyncStreamOfN 1000 thrpt 5 26117.610 412.515 ops/s r.o.OperatorMergePerf.merge1SyncStreamOfN 1000000 thrpt 5 25.466 0.775 ops/s r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1 thrpt 5 102486.748 1377.611 ops/s r.o.OperatorMergePerf.mergeNAsyncStreamsOfN 1000 thrpt 5 4.087 0.233 ops/s r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1 thrpt 5 4457316.389 363539.115 ops/s r.o.OperatorMergePerf.mergeNSyncStreamsOf1 100 thrpt 5 473724.591 21719.743 ops/s r.o.OperatorMergePerf.mergeNSyncStreamsOf1 1000 thrpt 5 40023.766 3562.388 ops/s r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1 thrpt 5 5220487.374 405664.161 ops/s r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 23.860 1.395 ops/s r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1 thrpt 5 78372.613 2185.710 ops/s r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN 1000 thrpt 5 2189.509 30.923 ops/s r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1 thrpt 5 5168348.297 107791.880 ops/s r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000 thrpt 5 38955.533 4054.434 ops/s r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1 1000000 thrpt 5 35.013 2.609 ops/s ```
|
I just pushed the change you recommended and eliminated the use of |
|
Method release is not idempotent and may flip the phaser back to even. I think the critical section in poll might be the cause of the performance drop. I see that without it, an ongoing poll might pick up a value from a new use of the queue. I'll think about this a bit more. An improvement could be if you eliminated the size() call before each onNext; a failed offer is enough indication of a missing backpressure, no need to count the queue all the time. |
|
I did some experimenting on this branch, here are the results: The The The In conclusion, I'd chose SpscArrayQueue + two lazy writer-reader-phaser implementation to replace 1.x; it gets close to the baseline and is correct to avoid false sharing of queues between subsequent uses. See implementation here. |
|
Looks interesting. I'll play with the implementation you provided. Do you want to provide a cleaned up pull request of your proposed winner? |
|
Sure. |
As per discussion in #2189 this is an attempt at implementing
RxRingBufferusingWriterReaderPhaser.The performance numbers are worse than #2189, inline with the RW Lock and WIP implementations.
See the last column: