Skip to content

Commit

Permalink
Merge pull request #1534 from benjchristensen/concat-backpressure
Browse files Browse the repository at this point in the history
Concat Backpressure
  • Loading branch information
benjchristensen committed Jul 30, 2014
2 parents 14a41f8 + 04349fa commit cf0c25f
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 37 deletions.
150 changes: 113 additions & 37 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.SerializedSubscriber;
Expand All @@ -30,30 +33,55 @@
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/concat.png" alt="">
*
* @param <T> the source and result value type
* @param <T>
* the source and result value type
*/
public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
@Override
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
final SerialSubscription current = new SerialSubscription();
child.add(current);
return new ConcatSubscriber<T>(s, current);
ConcatSubscriber<T> cs = new ConcatSubscriber<T>(s, current);
ConcatProducer<T> cp = new ConcatProducer<T>(cs);
child.setProducer(cp);
return cs;
}


static final class ConcatProducer<T> implements Producer {
final ConcatSubscriber<T> cs;

ConcatProducer(ConcatSubscriber<T> cs) {
this.cs = cs;
}

@Override
public void request(long n) {
cs.requestFromChild(n);
}

}

static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T>> {
final NotificationLite<Observable<? extends T>> nl = NotificationLite.instance();
private final Subscriber<T> s;
private final Subscriber<T> child;
private final SerialSubscription current;
final ConcurrentLinkedQueue<Object> queue;

volatile ConcatInnerSubscriber<T> currentSubscriber;

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");

static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");

// accessed by REQUESTED_UPDATER
private volatile long requested;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");

public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
super(s);
this.s = s;
this.child = s;
this.current = current;
this.queue = new ConcurrentLinkedQueue<Object>();
add(Subscriptions.create(new Action0() {
Expand All @@ -71,20 +99,42 @@ public void onStart() {
request(2);
}

private void requestFromChild(long n) {
// we track 'requested' so we know whether we should subscribe the next or not
if (REQUESTED_UPDATER.getAndAdd(this, n) == 0) {
if (currentSubscriber == null && wip > 0) {
// this means we may be moving from one subscriber to another after having stopped processing
// so need to kick off the subscribe via this request notification
subscribeNext();
// return here as we don't want to do the requestMore logic below (which would double request)
return;
}
}

if (currentSubscriber != null) {
// otherwise we are just passing it through to the currentSubscriber
currentSubscriber.requestMore(n);
}
}

private void decrementRequested() {
REQUESTED_UPDATER.decrementAndGet(this);
}

@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}

@Override
public void onError(Throwable e) {
s.onError(e);
child.onError(e);
unsubscribe();
}

@Override
public void onCompleted() {
queue.add(nl.completed());
Expand All @@ -95,39 +145,65 @@ public void onCompleted() {

void completeInner() {
request(1);
currentSubscriber = null;
if (WIP_UPDATER.decrementAndGet(this) > 0) {
subscribeNext();
}
}

void subscribeNext() {
Object o = queue.poll();
if (nl.isCompleted(o)) {
s.onCompleted();
} else if (o != null) {
Observable<? extends T> obs = nl.getValue(o);
Subscriber<T> sourceSub = new Subscriber<T>() {

@Override
public void onNext(T t) {
// TODO need to support backpressure here https://github.com/Netflix/RxJava/issues/1480
s.onNext(t);
}

@Override
public void onError(Throwable e) {
ConcatSubscriber.this.onError(e);
}

@Override
public void onCompleted() {
completeInner();
}

};
current.set(sourceSub);
obs.unsafeSubscribe(sourceSub);
if (requested > 0) {
Object o = queue.poll();
if (nl.isCompleted(o)) {
child.onCompleted();
} else if (o != null) {
Observable<? extends T> obs = nl.getValue(o);
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, requested);
current.set(currentSubscriber);
obs.unsafeSubscribe(currentSubscriber);
}
} else {
// requested == 0, so we'll peek to see if we are completed, otherwise wait until another request
Object o = queue.peek();
if (nl.isCompleted(o)) {
child.onCompleted();
}
}
}
}

static class ConcatInnerSubscriber<T> extends Subscriber<T> {

private final Subscriber<T> child;
private final ConcatSubscriber<T> parent;

public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, long initialRequest) {
this.parent = parent;
this.child = child;
request(initialRequest);
}

void requestMore(long n) {
request(n);
}

@Override
public void onNext(T t) {
parent.decrementRequested();
child.onNext(t);
}

@Override
public void onError(Throwable e) {
// terminal error through parent so everything gets cleaned up, including this inner
parent.onError(e);
}

@Override
public void onCompleted() {
// terminal completion to parent so it continues to the next
parent.completeInner();
}

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subscriptions.BooleanSubscription;

Expand Down Expand Up @@ -660,4 +663,37 @@ public void testConcatOuterBackpressure() {
.take(1)
.toBlocking().single());
}

@Test
public void testInnerBackpressureWithAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(0, RxRingBuffer.SIZE * 2)
.concatWith(Observable.range(0, RxRingBuffer.SIZE * 2))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(RxRingBuffer.SIZE * 4, ts.getOnNextEvents().size());
}

/*
* Testing without counts aligned with buffer sizes because concat must prevent the subscription
* to the next Observable if request == 0 which can happen at the end of a subscription
* if the request size == emitted size. It needs to delay subscription until the next request when aligned,
* when not aligned, it just subscribesNext with the outstanding request amount.
*/
@Test
public void testInnerBackpressureWithoutAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(0, (RxRingBuffer.SIZE * 2) + 10)
.concatWith(Observable.range(0, (RxRingBuffer.SIZE * 2) + 10))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals((RxRingBuffer.SIZE * 4) + 20, ts.getOnNextEvents().size());
}

}

0 comments on commit cf0c25f

Please sign in to comment.