From 89390e3db2e391f40860ba82630532af584eed72 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 2 Oct 2015 09:06:46 +0200 Subject: [PATCH] DoOnEach: report both original exception and callback exception. This came up in a [Stackoverflow](http://stackoverflow.com/questions/32889008/do-operators-instead-of-a-whole-subscriber) answer. If the doOnError's callback or the doOnEach's onError method throws, any non-fatal exception replaced the original error which got lost. This PR will wrap them both into a CompositeException. 2.x note: since Java 8 supports `addSuppressed` all callbacks in this situation either attach to the original exception or the original exception is attached to the callback's exception. --- src/main/java/rx/Observable.java | 7 +++ .../internal/operators/OperatorDoOnEach.java | 7 ++- .../operators/OperatorDoOnEachTest.java | 48 +++++++++++++------ 3 files changed, 45 insertions(+), 17 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 0b50ef9268..d9121c8619 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -4362,6 +4362,10 @@ public final void onNext(T v) { /** * Modifies the source Observable so that it notifies an Observer for each item it emits. *

+ * In case the onError of the supplied observer throws, the downstream will receive a composite exception containing + * the original exception and the exception thrown by onError. If the onNext or the onCompleted methods + * of the supplied observer throws, the downstream will be terminated and wil receive this thrown exception. + *

* *

*
Scheduler:
@@ -4380,6 +4384,9 @@ public final Observable doOnEach(Observer observer) { /** * Modifies the source Observable so that it invokes an action if it calls {@code onError}. *

+ * In case the onError action throws, the downstream will receive a composite exception containing + * the original exception and the exception thrown by onError. + *

* *

*
Scheduler:
diff --git a/src/main/java/rx/internal/operators/OperatorDoOnEach.java b/src/main/java/rx/internal/operators/OperatorDoOnEach.java index 4b3e8d54cf..1e3a680dac 100644 --- a/src/main/java/rx/internal/operators/OperatorDoOnEach.java +++ b/src/main/java/rx/internal/operators/OperatorDoOnEach.java @@ -15,9 +15,11 @@ */ package rx.internal.operators; +import java.util.Arrays; + import rx.*; import rx.Observable.Operator; -import rx.exceptions.Exceptions; +import rx.exceptions.*; /** * Converts the elements of an observable sequence to the specified type. @@ -62,7 +64,8 @@ public void onError(Throwable e) { try { doOnEachObserver.onError(e); } catch (Throwable e2) { - Exceptions.throwOrReport(e2, observer); + Exceptions.throwIfFatal(e2); + observer.onError(new CompositeException(Arrays.asList(e, e2))); return; } observer.onError(e); diff --git a/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java b/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java index 2ad9a36828..3c4cf9f9bb 100644 --- a/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java +++ b/src/test/java/rx/internal/operators/OperatorDoOnEachTest.java @@ -17,25 +17,19 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import rx.Observable; -import rx.Observer; -import rx.Subscriber; -import rx.exceptions.OnErrorNotImplementedException; -import rx.functions.Action1; -import rx.functions.Func1; +import static org.mockito.Mockito.*; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.*; +import org.mockito.*; + +import rx.*; +import rx.exceptions.*; +import rx.functions.*; +import rx.observers.TestSubscriber; + public class OperatorDoOnEachTest { @Mock @@ -201,4 +195,28 @@ public void call(Object o) { System.out.println("Received exception: " + e); } } + + @Test + public void testOnErrorThrows() { + TestSubscriber ts = TestSubscriber.create(); + + Observable.error(new TestException()) + .doOnError(new Action1() { + @Override + public void call(Throwable e) { + throw new TestException(); + } + }).subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(CompositeException.class); + + CompositeException ex = (CompositeException)ts.getOnErrorEvents().get(0); + + List exceptions = ex.getExceptions(); + assertEquals(2, exceptions.size()); + assertTrue(exceptions.get(0) instanceof TestException); + assertTrue(exceptions.get(1) instanceof TestException); + } } \ No newline at end of file