Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/Subscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,10 @@ protected Producer onSetProducer(Producer producer) {

public final void setProducer(Producer producer) {
producer = onSetProducer(producer);
int toRequest = requested;
int toRequest;
boolean setProducer = false;
synchronized (this) {
toRequest = requested;
p = producer;
if (op != null) {
// middle operator ... we pass thru unless a request has been made
Expand Down
127 changes: 47 additions & 80 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void onStart() {
// we decouple the Producer chain while keeping the Subscription chain together (perf benefit) via super(actual)
request(RxRingBuffer.SIZE);
}

/*
* This is expected to be executed sequentially as per the Rx contract or it will not work.
*/
Expand Down Expand Up @@ -309,12 +309,7 @@ public Boolean call(InnerSubscriber<T> s) {
if (s.q != null) {
long r = mergeProducer.requested;
int emitted = 0;
if (r < 0) {
emitted += drainAll(s);
} else if (r > 0) {
emitted += drainRequested(s, r);
}

emitted += s.drainQueue();
if (emitted > 0) {
/*
* `s.emitted` is not volatile (because of performance impact of making it so shown by JMH tests)
Expand All @@ -336,46 +331,6 @@ public Boolean call(InnerSubscriber<T> s) {
return Boolean.TRUE;
}

private int drainRequested(InnerSubscriber<T> s, long r) {
int emitted = 0;
// drain what was requested
long toEmit = r;
Object o;
for (int i = 0; i < toEmit; i++) {
o = s.q.poll();
if (o == null) {
// no more items
break;
} else if (s.q.isCompleted(o)) {
completeInner(s);
} else {
if (!s.q.accept(o, actual)) {
emitted++;
}
}
}

// decrement the number we emitted from outstanding requests
mergeProducer.REQUESTED.getAndAdd(mergeProducer, -emitted);
return emitted;
}

private int drainAll(InnerSubscriber<T> s) {
int emitted = 0;
// drain it all
Object o;
while ((o = s.q.poll()) != null) {
if (s.q.isCompleted(o)) {
completeInner(s);
} else {
if (!s.q.accept(o, actual)) {
emitted++;
}
}
}
return emitted;
}

};

@Override
Expand Down Expand Up @@ -451,7 +406,6 @@ private static final class InnerSubscriber<T> extends Subscriber<T> {
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<InnerSubscriber> ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerSubscriber.class, "once");
private final RxRingBuffer q = RxRingBuffer.getSpmcInstance();
private boolean mayNeedToDrain = false;
/* protected by emitLock */
int emitted = 0;
final int THRESHOLD = (int) (q.capacity() * 0.7);
Expand Down Expand Up @@ -526,12 +480,9 @@ private void emit(T t, boolean complete) {
if (parentSubscriber.getEmitLock()) {
enqueue = false;
try {
// when we have the lock, nothing else can cause producer.requested to decrement, but it can increment at any time
if (mayNeedToDrain) {
// drain the queue if there is anything in it before emitting the current value
emitted += drainQueue();
mayNeedToDrain = false;
}
// drain the queue if there is anything in it before emitting the current value
emitted += drainQueue();
// }
if (producer == null) {
// no backpressure requested
if (complete) {
Expand Down Expand Up @@ -600,7 +551,7 @@ private void emit(T t, boolean complete) {
* r.o.OperatorMergePerf.mergeNSyncStreamsOfN 1000 thrpt 5 78.795 1.766 ops/s
* } </pre>
*/
mayNeedToDrain = !parentSubscriber.drainQueuesIfNeeded();
parentSubscriber.drainQueuesIfNeeded();
}
}

Expand All @@ -611,41 +562,57 @@ private void enqueue(T t, boolean complete) {
} else {
q.onNext(t);
}
mayNeedToDrain = true;
} catch (MissingBackpressureException e) {
onError(e);
}
}

private int drainQueue() {
int emittedWhileDraining = 0;
if (q != null) {
if (producer == null) {
Object o;
while ((o = q.poll()) != null) {
if (!q.accept(o, parentSubscriber.actual)) {
// non-terminal event so let's increment count
emittedWhileDraining++;
}
private int drainRequested() {
int emitted = 0;
// drain what was requested
long toEmit = producer.requested;
Object o;
for (int i = 0; i < toEmit; i++) {
o = q.poll();
if (o == null) {
// no more items
break;
} else if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
} else {
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
}
}
}

// decrement the number we emitted from outstanding requests
producer.REQUESTED.getAndAdd(producer, -emitted);
return emitted;
}

private int drainAll() {
int emitted = 0;
// drain it all
Object o;
while ((o = q.poll()) != null) {
if (q.isCompleted(o)) {
parentSubscriber.completeInner(this);
} else {
long toEmit = producer.requested;
for (int i = 0; i < toEmit; i++) {
Object o = q.poll();
if (o == null) {
break;
} else {
if (!q.accept(o, parentSubscriber.actual)) {
// non-terminal event so let's increment count
emittedWhileDraining++;
}
}
if (!q.accept(o, parentSubscriber.actual)) {
emitted++;
}
// decrement the number we emitted from outstanding requests
producer.REQUESTED.getAndAdd(producer, -emittedWhileDraining);
}
}
return emittedWhileDraining;
return emitted;
}

private int drainQueue() {
if (producer != null) {
return drainRequested();
} else {
return drainAll();
}
}
}
}
54 changes: 52 additions & 2 deletions rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import rx.internal.util.unsafe.SpscArrayQueue;
import rx.internal.util.unsafe.UnsafeAccess;

/**
* This assumes Spsc or Spmc usage. This means only a single producer calling the on* methods. This is the Rx contract of an Observer.
* Concurrent invocations of on* methods will not be thread-safe.
*/
public class RxRingBuffer implements Subscription {

public static RxRingBuffer getSpscInstance() {
Expand Down Expand Up @@ -71,6 +75,29 @@ public static RxRingBuffer getSpmcInstance() {
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 23951121.098 1982380.330 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 1142.351 33.592 ops/s
*
* With SynchronizedQueue (synchronized LinkedList)
*
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 33231667.136 685757.510 ops/s
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 74623.614 5493.766 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 22907359.257 707026.632 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 22222.410 320.829 ops/s
*
* With ArrayBlockingQueue
*
* Benchmark Mode Samples Score Score error Units
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 2389804.664 68990.804 ops/s
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 27384.274 1411.789 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 26497037.559 91176.247 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 17985.144 237.771 ops/s
*
* With ArrayBlockingQueue and Object Pool
*
* Benchmark Mode Samples Score Score error Units
* r.i.RxRingBufferPerf.createUseAndDestroy1 thrpt 5 12465685.522 399070.770 ops/s
* r.i.RxRingBufferPerf.createUseAndDestroy1000 thrpt 5 27701.294 395.217 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1 thrpt 5 26399625.086 695639.436 ops/s
* r.i.RxRingBufferPerf.ringBufferAddRemove1000 thrpt 5 17985.427 253.190 ops/s
*
* With SpscArrayQueue (single consumer, so failing 1 unit test)
* - requires access to Unsafe
*
Expand Down Expand Up @@ -130,7 +157,15 @@ public static RxRingBuffer getSpmcInstance() {
private final int size;
private final ObjectPool<Queue<Object>> pool;

private volatile Object terminalState;
/**
* We store the terminal state separately so it doesn't count against the size.
* We don't just +1 the size since some of the queues require sizes that are a power of 2.
* This is a subjective thing ... wanting to keep the size (ie 1024) the actual number of onNext
* that can be sent rather than something like 1023 onNext + 1 terminal event. It also simplifies
* checking that we have received only 1 terminal event, as we don't need to peek at the last item
* or retain a boolean flag.
*/
public volatile Object terminalState;

public static final int SIZE = 1024;

Expand Down Expand Up @@ -233,7 +268,22 @@ public Object poll() {
}
Object o;
o = queue.poll();
if (o == null && terminalState != null) {
/*
* benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll()
* is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case,
* "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState.
*
* The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on*
* or needing to enqueue terminalState.
*
* This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires
* a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it
* is currently the way it is.
*
* This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list,
* such as ConcurrentLinkedQueue.
*/
if (o == null && terminalState != null && queue.size() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't queue.isEmpty() be better in that case, since it's often an optimized method separate from the implementation of size() (see ConcurrentLinkedQueue for instance), and be clearer since what you want to check is if the collection is empty?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I'll make that change. Thanks!

o = terminalState;
// once emitted we clear so a poll loop will finish
terminalState = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -973,4 +973,36 @@ public boolean hasNext() {
return observable;
}

@Test
public void mergeManyAsyncSingle() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable<Observable<Integer>> os = Observable.range(1, 10000).map(new Func1<Integer, Observable<Integer>>() {

@Override
public Observable<Integer> call(final Integer i) {
return Observable.create(new OnSubscribe<Integer>() {

@Override
public void call(Subscriber<? super Integer> s) {
if (i < 500) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
s.onNext(i);
s.onCompleted();
}

}).subscribeOn(Schedulers.computation()).cache();
}

});
Observable.merge(os).subscribe(ts);
ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(10000, ts.getOnNextEvents().size());
}

}