Skip to content

Commit

Permalink
add backpressure support for defaultIfEmpty()
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Jun 23, 2015
1 parent c868ba9 commit b62698c
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 65 deletions.
3 changes: 2 additions & 1 deletion src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3858,7 +3858,8 @@ public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler sched
* @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
*/
public final Observable<T> defaultIfEmpty(T defaultValue) {
return lift(new OperatorDefaultIfEmpty<T>(defaultValue));
//if empty switch to an observable that emits defaultValue and supports backpressure
return switchIfEmpty(Observable.from((Arrays.asList(defaultValue))));
}

/**
Expand Down
64 changes: 0 additions & 64 deletions src/main/java/rx/internal/operators/OperatorDefaultIfEmpty.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.TestException;
import rx.observers.TestSubscriber;

public class OperatorDefaultIfEmptyTest {

Expand Down Expand Up @@ -85,4 +86,28 @@ public void onCompleted() {
verify(o, never()).onNext(any(Integer.class));
verify(o, never()).onCompleted();
}

@Test
public void testBackpressureEmpty() {
TestSubscriber<Integer> ts = TestSubscriber.create(0);
Observable.<Integer>empty().defaultIfEmpty(1).subscribe(ts);
ts.assertNoValues();
ts.assertNoTerminalEvent();
ts.requestMore(1);
ts.assertValue(1);
ts.assertCompleted();
}

@Test
public void testBackpressureNonEmpty() {
TestSubscriber<Integer> ts = TestSubscriber.create(0);
Observable.just(1,2,3).defaultIfEmpty(1).subscribe(ts);
ts.assertNoValues();
ts.assertNoTerminalEvent();
ts.requestMore(2);
ts.assertValues(1, 2);
ts.requestMore(1);
ts.assertValues(1, 2, 3);
ts.assertCompleted();
}
}

0 comments on commit b62698c

Please sign in to comment.