Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concat Backpressure #1534

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 113 additions & 37 deletions rxjava-core/src/main/java/rx/internal/operators/OperatorConcat.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.SerializedSubscriber;
Expand All @@ -30,30 +33,55 @@
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/concat.png" alt="">
*
* @param <T> the source and result value type
* @param <T>
* the source and result value type
*/
public final class OperatorConcat<T> implements Operator<T, Observable<? extends T>> {
@Override
public Subscriber<? super Observable<? extends T>> call(final Subscriber<? super T> child) {
final SerializedSubscriber<T> s = new SerializedSubscriber<T>(child);
final SerialSubscription current = new SerialSubscription();
child.add(current);
return new ConcatSubscriber<T>(s, current);
ConcatSubscriber<T> cs = new ConcatSubscriber<T>(s, current);
ConcatProducer<T> cp = new ConcatProducer<T>(cs);
child.setProducer(cp);
return cs;
}


static final class ConcatProducer<T> implements Producer {
final ConcatSubscriber<T> cs;

ConcatProducer(ConcatSubscriber<T> cs) {
this.cs = cs;
}

@Override
public void request(long n) {
cs.requestFromChild(n);
}

}

static final class ConcatSubscriber<T> extends Subscriber<Observable<? extends T>> {
final NotificationLite<Observable<? extends T>> nl = NotificationLite.instance();
private final Subscriber<T> s;
private final Subscriber<T> child;
private final SerialSubscription current;
final ConcurrentLinkedQueue<Object> queue;

volatile ConcatInnerSubscriber<T> currentSubscriber;

volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");

static final AtomicIntegerFieldUpdater<ConcatSubscriber> WIP_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip");

// accessed by REQUESTED_UPDATER
private volatile long requested;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ConcatSubscriber> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested");

public ConcatSubscriber(Subscriber<T> s, SerialSubscription current) {
super(s);
this.s = s;
this.child = s;
this.current = current;
this.queue = new ConcurrentLinkedQueue<Object>();
add(Subscriptions.create(new Action0() {
Expand All @@ -71,20 +99,42 @@ public void onStart() {
request(2);
}

private void requestFromChild(long n) {
// we track 'requested' so we know whether we should subscribe the next or not
if (REQUESTED_UPDATER.getAndAdd(this, n) == 0) {
if (currentSubscriber == null && wip > 0) {
// this means we may be moving from one subscriber to another after having stopped processing
// so need to kick off the subscribe via this request notification
subscribeNext();
// return here as we don't want to do the requestMore logic below (which would double request)
return;
}
}

if (currentSubscriber != null) {
// otherwise we are just passing it through to the currentSubscriber
currentSubscriber.requestMore(n);
}
}

private void decrementRequested() {
REQUESTED_UPDATER.decrementAndGet(this);
}

@Override
public void onNext(Observable<? extends T> t) {
queue.add(nl.next(t));
if (WIP_UPDATER.getAndIncrement(this) == 0) {
subscribeNext();
}
}

@Override
public void onError(Throwable e) {
s.onError(e);
child.onError(e);
unsubscribe();
}

@Override
public void onCompleted() {
queue.add(nl.completed());
Expand All @@ -95,39 +145,65 @@ public void onCompleted() {

void completeInner() {
request(1);
currentSubscriber = null;
if (WIP_UPDATER.decrementAndGet(this) > 0) {
subscribeNext();
}
}

void subscribeNext() {
Object o = queue.poll();
if (nl.isCompleted(o)) {
s.onCompleted();
} else if (o != null) {
Observable<? extends T> obs = nl.getValue(o);
Subscriber<T> sourceSub = new Subscriber<T>() {

@Override
public void onNext(T t) {
// TODO need to support backpressure here https://github.com/Netflix/RxJava/issues/1480
s.onNext(t);
}

@Override
public void onError(Throwable e) {
ConcatSubscriber.this.onError(e);
}

@Override
public void onCompleted() {
completeInner();
}

};
current.set(sourceSub);
obs.unsafeSubscribe(sourceSub);
if (requested > 0) {
Object o = queue.poll();
if (nl.isCompleted(o)) {
child.onCompleted();
} else if (o != null) {
Observable<? extends T> obs = nl.getValue(o);
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, requested);
current.set(currentSubscriber);
obs.unsafeSubscribe(currentSubscriber);
}
} else {
// requested == 0, so we'll peek to see if we are completed, otherwise wait until another request
Object o = queue.peek();
if (nl.isCompleted(o)) {
child.onCompleted();
}
}
}
}

static class ConcatInnerSubscriber<T> extends Subscriber<T> {

private final Subscriber<T> child;
private final ConcatSubscriber<T> parent;

public ConcatInnerSubscriber(ConcatSubscriber<T> parent, Subscriber<T> child, long initialRequest) {
this.parent = parent;
this.child = child;
request(initialRequest);
}

void requestMore(long n) {
request(n);
}

@Override
public void onNext(T t) {
parent.decrementRequested();
child.onNext(t);
}

@Override
public void onError(Throwable e) {
// terminal error through parent so everything gets cleaned up, including this inner
parent.onError(e);
}

@Override
public void onCompleted() {
// terminal completion to parent so it continues to the next
parent.completeInner();
}

};
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.internal.util.RxRingBuffer;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subscriptions.BooleanSubscription;

Expand Down Expand Up @@ -660,4 +663,37 @@ public void testConcatOuterBackpressure() {
.take(1)
.toBlocking().single());
}

@Test
public void testInnerBackpressureWithAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(0, RxRingBuffer.SIZE * 2)
.concatWith(Observable.range(0, RxRingBuffer.SIZE * 2))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals(RxRingBuffer.SIZE * 4, ts.getOnNextEvents().size());
}

/*
* Testing without counts aligned with buffer sizes because concat must prevent the subscription
* to the next Observable if request == 0 which can happen at the end of a subscription
* if the request size == emitted size. It needs to delay subscription until the next request when aligned,
* when not aligned, it just subscribesNext with the outstanding request amount.
*/
@Test
public void testInnerBackpressureWithoutAlignedBoundaries() {
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Observable.range(0, (RxRingBuffer.SIZE * 2) + 10)
.concatWith(Observable.range(0, (RxRingBuffer.SIZE * 2) + 10))
.observeOn(Schedulers.computation()) // observeOn has a backpressured RxRingBuffer
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertNoErrors();
assertEquals((RxRingBuffer.SIZE * 4) + 20, ts.getOnNextEvents().size());
}

}