Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

onErrorFlatMap + OnErrorThrowable #892

Merged
merged 4 commits into from
Feb 18, 2014

Conversation

benjchristensen
Copy link
Member

I am working on solving a production error handling use case that needs the ability to handle errors on an Observable acting like a message-bus. This requires it to ignore errors if they occur.

With @headinthebox the idea of onErrorFlatMap evolved, but we now have a decision to make about the implementation.

It's pretty easy to solve the first half and allow onErrorFlatMap to return Observables that are injected into the output Observable but don't onComplete and thus allow the stream to continue.

However, we're looking at whether we can also capture the T value that caused the exception to be thrown so we can achieve use cases like this:

    @Test
    public void spliceAndContinueEmitting() {
        TestSubscriber<String> ts = new TestSubscriber<String>();
        Observable.from(1, 2, 3, 4, 5, 6).map(new Func1<Integer, String>() {

            @Override
            public String call(Integer v) {
                if (v < 2 || v > 5) {
                    return "Value=" + v;
                }
                throw new RuntimeException("error in map function: " + v);
            }

        }).onErrorFlatMap(new Func1<OnErrorThrowable, Observable<String>>() {

            @Override
            public Observable<String> call(OnErrorThrowable t) {
                return Observable.from("Error=" + t.getValue());
            }

        }).subscribe(ts);

        ts.assertTerminalEvent();
        System.out.println(ts.getOnErrorEvents());
        assertEquals(0, ts.getOnErrorEvents().size());
        System.out.println(ts.getOnNextEvents());
        ts.assertReceivedOnNext(Arrays.asList("Value=1", "Error=2", "Error=3", "Error=4", "Error=5", "Value=6"));
    }

This outputs:

[Value=1, Error=2, Error=3, Error=4, Error=5, Value=6]

To accomplish this however we must capture the value and wrap the Throwable in all operators that execute user-provided functions. This results in code like this:

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    onError(new OnErrorThrowable(e, t));
                }
            }

instead of this:

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    onError(e);
                }
            }

The drawback to this is that onError will now receive an OnErrorThrowable if the error comes from a user-provided function. The benefit is that the value associated with the failure is now accessible for debugging, reporting, feedback loops etc.

Operators affected by this are cast, doOnEach, filter, groupBy, map, scan, zip (and surely others) as these all take user-provided functions.

Are there reasons we should not wrap these errors inside an OnErrorThrowable?

@cloudbees-pull-request-builder

RxJava-pull-requests #823 FAILURE
Looks like there's a problem with this pull request

@benjchristensen
Copy link
Member Author

RxJava-pull-requests #823 FAILURE

These errors are known. Waiting on decision before proceeding.

@daveray
Copy link
Contributor

daveray commented Feb 18, 2014

Seems like a fairly major breaking change, at least for code that handles errors. I could see the extra info being useful, though, at least for "bug" exceptions you're not expecting. Intentional exceptions should hopefully already be conveying this info.

Anyway, seems reasonable as long as this behavior's well documented in every affected operator. Blocking operations that turn onError back into throw should also document that calling code may need to do something special to handle exceptions they may be expecting. What's another layer of exception unwrapping? :)

@jstnbckr
Copy link

This change will help address the exact use case mentioned above, when an unexpected/untrapped exception occurs in an observable sequence. Rather than shutdown the sequence, the change will allow the observable to remain 'open'. This is especially useful for 'system-oriented' use cases where the observable should not close even if an unexpected/untrapped error occurs. For example, an observable sequence processing user requests for dispatching, or an observable sequence accepting job requests for scheduling. Just because a request within the observable sequence may fail, for some unexpected reason, the 'system-oriented' sequence should remain available for future requests. Also, for debugging and troubleshooting, having the element/item that caused the exception is critical.

I found a way to pass the value down the chain without wrapping the Throwable. This way it only shows up if using `onErrorFlatMap` or looking at the final cause on any given Throwable.

A final cause will be added so Throwables end up like this:

java.lang.RuntimeException: Forced Failure
	at rx.operators.OperatorMapTest$5.call(OperatorMapTest.java:164)
	at rx.operators.OperatorMapTest$5.call(OperatorMapTest.java:1)
	at rx.operators.OperatorMap$1.onNext(OperatorMap.java:54)
	at rx.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:43)
	at rx.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:1)
	at rx.Observable$2.call(Observable.java:269)
	at rx.Observable$2.call(Observable.java:1)
	at rx.Observable$2.call(Observable.java:269)
	at rx.Observable$2.call(Observable.java:1)
	at rx.Observable.subscribe(Observable.java:7022)
	at rx.Observable.subscribe(Observable.java:6945)
	at rx.operators.OperatorMapTest.testMapWithError(OperatorMapTest.java:177)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: fail
	at rx.exceptions.OnErrorThrowable.decorate(OnErrorThrowable.java:55)
	at rx.operators.OperatorMap$1.onNext(OperatorMap.java:56)
	... 33 more

Note the final cause: Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: fail

Then when onErrorFlatMap is used, it retrieves that final cause out to create an OnErrorThrowable so people don't have to go fetch it from the cause chain.
@benjchristensen
Copy link
Member Author

I have committed a change that allows this to work without wrapping. Instead if adds a final cause to the Throwable being emitted via onError and then if onErrorFlatMap is used it can derive the value from the causal chain.

A stack trace would end up with something like this at the end of it:

Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: fail

Any problems with this approach? Is this preferred to wrapping?

@cloudbees-pull-request-builder

RxJava-pull-requests #831 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member Author

No complains so moving forward with this. Using the causal chain instead of wrapping should be low impact, not intrude on normal uses of Throwables (such as instanceof checks) while still allowing this new behavior and improved stack traces for tracking down what caused the exceptions.

benjchristensen added a commit that referenced this pull request Feb 18, 2014
@benjchristensen benjchristensen merged commit e676ddd into ReactiveX:master Feb 18, 2014
@benjchristensen benjchristensen deleted the onErrorFlatMap branch February 18, 2014 15:58
@cloudbees-pull-request-builder

RxJava-pull-requests #832 SUCCESS
This pull request looks good

jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants