-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Closed
Description
Observable.combineLatestDelayError sends error event after complete event happened and treated as unhandled.
Checked on rx.Observable
, io.reactivex.Observable
and Flowable
@Test
public void testCombine() {
rx.observers.TestSubscriber<Integer> testSubscriber = rx.observers.TestSubscriber.create();
rx.Observable<Long> emptyObservable = rx.Observable.empty();
rx.Observable<Object> errorObservable = rx.Observable.error(new Exception());
rx.Observable.combineLatestDelayError(
Arrays.asList(
emptyObservable
.doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
.doOnTerminate(() -> System.out.println("emptyObservable: doFinally")),
errorObservable
.doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
.doOnTerminate(() -> System.out.println("errorObservable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doOnTerminate(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
}
@Test
public void testCombine2() {
TestObserver<Integer> testObserver = TestObserver.create();
Observable<Long> emptyObservable = Observable.empty();
Observable<Object> errorObservable = Observable.error(new Exception());
Observable.combineLatestDelayError(
Arrays.asList(
emptyObservable
.doOnEach(integerNotification -> System.out.println("emptyObservable: " + integerNotification))
.doFinally(() -> System.out.println("emptyObservable: doFinally")),
errorObservable
.doOnEach(integerNotification -> System.out.println("errorObservable: " + integerNotification))
.doFinally(() -> System.out.println("errorObservable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testObserver);
testObserver.awaitTerminalEvent();
}
@Test
public void testCombine2Flowable() {
TestSubscriber<Integer> testObserver = TestSubscriber.create();
Flowable<Integer> emptyFlowable = Flowable.empty();
Flowable<Object> errorFlowable = Flowable.error(new Exception());
Flowable.combineLatestDelayError(
Arrays.asList(
emptyFlowable
.doOnEach(integerNotification -> System.out.println("emptyFlowable: " + integerNotification))
.doFinally(() -> System.out.println("emptyFlowable: doFinally")),
errorFlowable
.doOnEach(integerNotification -> System.out.println("errorFlowable: " + integerNotification))
.doFinally(() -> System.out.println("errorFlowable: doFinally"))),
objects -> 0
)
.doOnEach(integerNotification -> System.out.println("combineLatestDelayError: " + integerNotification))
.doFinally(() -> System.out.println("combineLatestDelayError: doFinally"))
.subscribe(testObserver);
testObserver.awaitTerminalEvent();
}
Output:
testCombine
emptyObservable: [rx.Notification@2b4a2ec7 OnCompleted]
emptyObservable: doFinally
combineLatestDelayError: [rx.Notification@2b4a2ec7 OnCompleted]
combineLatestDelayError: doFinally
testCombine2
emptyObservable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyObservable: doFinally
errorObservable: OnErrorNotification[java.lang.Exception]
errorObservable: doFinally
java.lang.Exception
at com.myproject.Test.testCombine2(Test.java:298)
// not really important Stacktrace
Exception in thread "main" java.lang.Exception
at com.myproject.Test.testCombine2(Test.java:298)
// repeat of not important Stacktrace
testCombine2Flowable
emptyFlowable: OnCompleteNotification
combineLatestDelayError: OnCompleteNotification
combineLatestDelayError: doFinally
emptyFlowable: doFinally
If error emitter goes first or add some timer instead of empty, then everything is ok.
Also noticed difference in events order between 1.x and 2.x. Is it correct?
TarasMazepa, adenisyuk, mbezzubenko and Soon-gz