From 3a0d891f179fc29d369b869fc1ac565154e8fc23 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 10 Jul 2014 09:11:41 -0700 Subject: [PATCH] Concurrency Fixes for RxRingBuffer & Merge --- rxjava-core/src/main/java/rx/Subscriber.java | 3 +- .../rx/internal/operators/OperatorMerge.java | 127 +++++++----------- .../java/rx/internal/util/RxRingBuffer.java | 54 +++++++- .../internal/operators/OperatorMergeTest.java | 32 +++++ 4 files changed, 133 insertions(+), 83 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Subscriber.java b/rxjava-core/src/main/java/rx/Subscriber.java index cba011ed66..3ffdda84e7 100644 --- a/rxjava-core/src/main/java/rx/Subscriber.java +++ b/rxjava-core/src/main/java/rx/Subscriber.java @@ -108,9 +108,10 @@ protected Producer onSetProducer(Producer producer) { public final void setProducer(Producer producer) { producer = onSetProducer(producer); - int toRequest = requested; + int toRequest; boolean setProducer = false; synchronized (this) { + toRequest = requested; p = producer; if (op != null) { // middle operator ... we pass thru unless a request has been made diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java index f583886982..0f37d465db 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java @@ -91,7 +91,7 @@ public void onStart() { // we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual) request(RxRingBuffer.SIZE); } - + /* * This is expected to be executed sequentially as per the Rx contract or it will not work. */ @@ -309,12 +309,7 @@ public Boolean call(InnerSubscriber s) { if (s.q != null) { long r = mergeProducer.requested; int emitted = 0; - if (r < 0) { - emitted += drainAll(s); - } else if (r > 0) { - emitted += drainRequested(s, r); - } - + emitted += s.drainQueue(); if (emitted > 0) { /* * `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests) @@ -336,46 +331,6 @@ public Boolean call(InnerSubscriber s) { return Boolean.TRUE; } - private int drainRequested(InnerSubscriber s, long r) { - int emitted = 0; - // drain what was requested - long toEmit = r; - Object o; - for (int i = 0; i < toEmit; i++) { - o = s.q.poll(); - if (o == null) { - // no more items - break; - } else if (s.q.isCompleted(o)) { - completeInner(s); - } else { - if (!s.q.accept(o, actual)) { - emitted++; - } - } - } - - // decrement the number we emitted from outstanding requests - mergeProducer.REQUESTED.getAndAdd(mergeProducer, -emitted); - return emitted; - } - - private int drainAll(InnerSubscriber s) { - int emitted = 0; - // drain it all - Object o; - while ((o = s.q.poll()) != null) { - if (s.q.isCompleted(o)) { - completeInner(s); - } else { - if (!s.q.accept(o, actual)) { - emitted++; - } - } - } - return emitted; - } - }; @Override @@ -451,7 +406,6 @@ private static final class InnerSubscriber extends Subscriber { @SuppressWarnings("rawtypes") static final AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "once"); private final RxRingBuffer q = RxRingBuffer.getSpmcInstance(); - private boolean mayNeedToDrain = false; /* protected by emitLock */ int emitted = 0; final int THRESHOLD = (int) (q.capacity() * 0.7); @@ -526,12 +480,9 @@ private void emit(T t, boolean complete) { if (parentSubscriber.getEmitLock()) { enqueue = false; try { - // when we have the lock, nothing else can cause producer.requested to decrement, but it can increment at any time - if (mayNeedToDrain) { - // drain the queue if there is anything in it before emitting the current value - emitted += drainQueue(); - mayNeedToDrain = false; - } + // drain the queue if there is anything in it before emitting the current value + emitted += drainQueue(); + // } if (producer == null) { // no backpressure requested if (complete) { @@ -600,7 +551,7 @@ private void emit(T t, boolean complete) { * r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 78.795 1.766 ops/s * } */ - mayNeedToDrain = !parentSubscriber.drainQueuesIfNeeded(); + parentSubscriber.drainQueuesIfNeeded(); } } @@ -611,41 +562,57 @@ private void enqueue(T t, boolean complete) { } else { q.onNext(t); } - mayNeedToDrain = true; } catch (MissingBackpressureException e) { onError(e); } } - private int drainQueue() { - int emittedWhileDraining = 0; - if (q != null) { - if (producer == null) { - Object o; - while ((o = q.poll()) != null) { - if (!q.accept(o, parentSubscriber.actual)) { - // non-terminal event so let's increment count - emittedWhileDraining++; - } + private int drainRequested() { + int emitted = 0; + // drain what was requested + long toEmit = producer.requested; + Object o; + for (int i = 0; i < toEmit; i++) { + o = q.poll(); + if (o == null) { + // no more items + break; + } else if (q.isCompleted(o)) { + parentSubscriber.completeInner(this); + } else { + if (!q.accept(o, parentSubscriber.actual)) { + emitted++; } + } + } + + // decrement the number we emitted from outstanding requests + producer.REQUESTED.getAndAdd(producer, -emitted); + return emitted; + } + + private int drainAll() { + int emitted = 0; + // drain it all + Object o; + while ((o = q.poll()) != null) { + if (q.isCompleted(o)) { + parentSubscriber.completeInner(this); } else { - long toEmit = producer.requested; - for (int i = 0; i < toEmit; i++) { - Object o = q.poll(); - if (o == null) { - break; - } else { - if (!q.accept(o, parentSubscriber.actual)) { - // non-terminal event so let's increment count - emittedWhileDraining++; - } - } + if (!q.accept(o, parentSubscriber.actual)) { + emitted++; } - // decrement the number we emitted from outstanding requests - producer.REQUESTED.getAndAdd(producer, -emittedWhileDraining); } } - return emittedWhileDraining; + return emitted; + } + + private int drainQueue() { + if (producer != null) { + return drainRequested(); + } else { + return drainAll(); + } } } } diff --git a/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java b/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java index 2d9e77f21b..dabb857d7f 100644 --- a/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java @@ -25,6 +25,10 @@ import rx.internal.util.unsafe.SpscArrayQueue; import rx.internal.util.unsafe.UnsafeAccess; +/** + * This assumes Spsc or Spmc usage. This means only a single producer calling the on* methods. This is the Rx contract of an Observer. + * Concurrent invocations of on* methods will not be thread-safe. + */ public class RxRingBuffer implements Subscription { public static RxRingBuffer getSpscInstance() { @@ -71,6 +75,29 @@ public static RxRingBuffer getSpmcInstance() { * r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 23951121.098 1982380.330 ops/s * r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 1142.351 33.592 ops/s * + * With SynchronizedQueue (synchronized LinkedList) + * + * r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 33231667.136 685757.510 ops/s + * r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 74623.614 5493.766 ops/s + * r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 22907359.257 707026.632 ops/s + * r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 22222.410 320.829 ops/s + * + * With ArrayBlockingQueue + * + * Benchmark Mode Samples Score Score error Units + * r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 2389804.664 68990.804 ops/s + * r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 27384.274 1411.789 ops/s + * r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 26497037.559 91176.247 ops/s + * r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 17985.144 237.771 ops/s + * + * With ArrayBlockingQueue and Object Pool + * + * Benchmark Mode Samples Score Score error Units + * r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 12465685.522 399070.770 ops/s + * r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 27701.294 395.217 ops/s + * r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 26399625.086 695639.436 ops/s + * r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 17985.427 253.190 ops/s + * * With SpscArrayQueue (single consumer, so failing 1 unit test) * - requires access to Unsafe * @@ -130,7 +157,15 @@ public static RxRingBuffer getSpmcInstance() { private final int size; private final ObjectPool> pool; - private volatile Object terminalState; + /** + * We store the terminal state separately so it doesn't count against the size. + * We don't just +1 the size since some of the queues require sizes that are a power of 2. + * This is a subjective thing ... wanting to keep the size (ie 1024) the actual number of onNext + * that can be sent rather than something like 1023 onNext + 1 terminal event. It also simplifies + * checking that we have received only 1 terminal event, as we don't need to peek at the last item + * or retain a boolean flag. + */ + public volatile Object terminalState; public static final int SIZE = 1024; @@ -233,7 +268,22 @@ public Object poll() { } Object o; o = queue.poll(); - if (o == null && terminalState != null) { + /* + * benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll() + * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case, + * "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState. + * + * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on* + * or needing to enqueue terminalState. + * + * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires + * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it + * is currently the way it is. + * + * This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list, + * such as ConcurrentLinkedQueue. + */ + if (o == null && terminalState != null && queue.size() == 0) { o = terminalState; // once emitted we clear so a poll loop will finish terminalState = null; diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeTest.java index 0988b148fa..e1b18645f6 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -973,4 +973,36 @@ public boolean hasNext() { return observable; } + @Test + public void mergeManyAsyncSingle() { + TestSubscriber ts = new TestSubscriber(); + Observable> os = Observable.range(1, 10000).map(new Func1>() { + + @Override + public Observable call(final Integer i) { + return Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber s) { + if (i < 500) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + s.onNext(i); + s.onCompleted(); + } + + }).subscribeOn(Schedulers.computation()).cache(); + } + + }); + Observable.merge(os).subscribe(ts); + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(10000, ts.getOnNextEvents().size()); + } + }