Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObserveOn Fixes #602

Merged
merged 1 commit into from
Dec 13, 2013
Merged

Conversation

benjchristensen
Copy link
Member

  • refactor to remove imperative assignment of innerScheduler for clearer concurrency and data access
  • fix subscription leak (Composite+MultipleAssignment instead of just Composite)
  • remove confusing conditional logic for inner/outer scheduler

@cloudbees-pull-request-builder

RxJava-pull-requests #538 ABORTED

@headinthebox
Copy link
Contributor

@akarnokd good question, but in your code, you would call onError on the same observer. Which is strange, and then the call to onError can fail itself.

What you propose is when the onNext of an observer in a subscribe fails, you would call the onError of that same observer.

@akarnokd
Copy link
Member

@headinthebox True. Maybe the not.accept needs to be altered or left out entirely and not switch-case'd here with the proper handling of onError throws case.

@benjchristensen
Copy link
Member Author

Generally onNext errors are left to the outer SafeObserver wrapper to handle and only user-provided functions are directly handled. There may be an edge-case to make this work though since we're hopping threads. I'll need to play with some unit tests.

See here for where the on* methods are handled: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/Observable.java#L235

Here is the SafeObserver handling the onNext failure: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/SafeObserver.java#L125

And then if onError fails it handles it as well: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/SafeObserver.java#L89

This is not logic we want to replicate everywhere, nor should we need to do this at every layer of wrapping, which is why we only add it to externally provided Observer instances.

@benjchristensen
Copy link
Member Author

This unit test suggests that the error handling is working okay due to SafeObserver wrapping the user provided Observer:

    /**
     * Test that an error from a user provided Observer.onNext is handled and emitted to the onError
     * even when done across thread boundaries with observeOn
     */
    @Test
    public void testOnNextErrorAcrossThread() throws InterruptedException {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<Throwable> caughtError = new AtomicReference<Throwable>();
        Observable<Long> o = Observable.interval(50, TimeUnit.MILLISECONDS);
        Observer<Long> observer = new Observer<Long>() {

            @Override
            public void onCompleted() {
                System.out.println("completed");
                latch.countDown();
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("error: " + e);
                caughtError.set(e);
                latch.countDown();
            }

            @Override
            public void onNext(Long args) {
                throw new RuntimeException("forced failure");
            }
        };
        o.observeOn(Schedulers.newThread()).subscribe(observer);

        latch.await(2000, TimeUnit.MILLISECONDS);
        assertNotNull(caughtError.get());
    }

Is there some use case I'm missing?

@cloudbees-pull-request-builder

RxJava-pull-requests #545 ABORTED

@headinthebox
Copy link
Contributor

A bit unexpected. In RxJava

val xs = Observable.from(1 to 200, NewThreadScheduler()).subscribe(
x => { throw new Exception("boom") },
e => { println(e.getMessage); throw e; }
)

prints "boom"

In .NET it throws the exception on the worker thread

http://social.msdn.microsoft.com/Forums/en-US/ee5dba5d-eea9-4d85-8f58-c2e1c71ef33a/about-the-contracts-in-rx-exceptions-and-blocking?forum=rx

http://blogs.msdn.com/b/rxteam/archive/2012/06/20/reactive-extensions-v2-0-release-candidate-available-now.aspx

@benjchristensen
Copy link
Member Author

Which is unexpected, .Net or Java behavior?

@headinthebox
Copy link
Contributor

The Java behavior. You cannot totally guard against exceptions thrown in user-supplied subscribe. What happens when an exception is thrown in onCompleted or in onError. On in when the onError that get invoked because the onNext threw throws.

(Similarly when you inside Observable.create(...) you have to catch exceptions inside closures)

@benjchristensen
Copy link
Member Author

It correctly will throw on the thread if the onError handler also fails.

If they have a valid onError handler why would we not want to call it? That's completely unexpected for an error to be thrown when they have provided a valid onError handler, thus we always try and pass to onError and only if that also fails then throw on whatever random thread it's on.

See here: https://github.com/Netflix/RxJava/blob/ec88d58a5fd4159c7b5392f47b40db2e7a2c57b8/rxjava-core/src/main/java/rx/operators/SafeObserver.java#L88

@headinthebox
Copy link
Contributor

IMHO it give a false sense of security and it is unpredictable.

When you see xs.subscribe(...) it may (a) call onError, (b) swallow the exception, (c) throw on a random thread, or (d) (a) and (b), or (e) (a) and (c).

Which is not any better than not doing anything special, since then you have the same set of outcomes.

@benjchristensen
Copy link
Member Author

Yes it may result in that if the user implemented both a bad onNext and onError but that is a valid case for unexpected behavior. We found in production that it was a real surprise and problem when onNext errors weren't propagated to onError because we teach everyone that when using Rx all errors will be propagated to onError.

Also, when people do things right and push side-effects into the subscribe onNext, it's easy to get failures there but very rarely is their onError handler complex enough to also fail.

In short, catching an onNext failure and propagating to onError is the 'least surprising' way of propagating errors and handles the most common cases. If someone's inError handler is also broken we have no choice but to throw but then do so with a message that very clearly states how bad that is and what happened.

I'm going to proceed with this merge and we can discuss 'SafeObserver' behavior elsewhere as it is a different topic than this.

@benjchristensen
Copy link
Member Author

No I'm not going to merge ... Unit tests aren't passing on this but they are on the master branch :-)

@headinthebox
Copy link
Contributor

Go ahead.

@benjchristensen
Copy link
Member Author

This change has caused non-determinism with the following unit tests:

rx.operators.OperationParallelTest > testParallel
rx.operators.OperationParallelMergeTest > testNumberOfThreads
rx.operators.OperationMergeDelayErrorTest > testCompositeErrorDelayed2
rx.operators.OperationParallelMergeTest > testNumberOfThreadsOnScheduledMerge
rx.operators.OperationGroupByTest > testError

- fix subscription leak (Composite+MultipleAssignment instead of just Composite)
- add unit tests
@benjchristensen
Copy link
Member Author

I have revised this pull request. The previous update was accidentally subscribing to the source Observable on the new thread instead of only observing on the new thread.

I reverted the code, added the unit tests and then refactored to just fix the Subscription leak. Otherwise the structure is similar as before.

@cloudbees-pull-request-builder

RxJava-pull-requests #552 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request Dec 13, 2013
@benjchristensen benjchristensen merged commit cf494e8 into ReactiveX:master Dec 13, 2013
@benjchristensen benjchristensen deleted the observeOn branch December 13, 2013 07:32
rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants