Observable.using silently ignores exception of dispose action #2464

Closed
ghost opened this issue Jan 17, 2015 · 25 comments
@ghost

ghost commented Jan 17, 2015

Code:

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;

class Main {

    public static void main(String[] args) {
        Observable.using(
                new Func0<String>() {
                    @Override
                    public String call() {
                        System.out.println("Calling resourceFactory");
                        return "resourceFactory";
                    }
                },
                new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String str) {
                        System.out.println("Calling observableFactory");
                        return Observable.just("observableFactory");
                    }
                },
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("Calling disposeAction");
                        throw new RuntimeException("disposeAction");
                    }
                }
        ).subscribe(
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("onNext: " + s);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable e) {
                        System.out.println("onError: " + e.getMessage());
                    }
                },
                new Action0() {
                    @Override
                    public void call() {
                        System.out.println("onComplete");
                    }
                }
        );
    }
}

Output:

Calling resourceFactory
Calling observableFactory
onNext: observableFactory
onComplete
Calling disposeAction

This program finishes without error, and the RuntimeException thrown in the dispose action is silently ignored.

@akarnokd
Member

Hi. At that point, the operator can't notify the subscriber, because the subscriber has already received a terminal event. The best thing we can do is to route the exception into the plugin system.

@ghost
Author

ghost commented Jan 17, 2015

Can it be made so that onCompleted will be called after the dispose action returns successfully? That way, if the dispose action throws an error, the subscriber can be notified through onError. This would make its behaviour consistent with the rest of the error notification workflow.

@zsxwing
Member

zsxwing commented Jan 18, 2015

@Laec, according to the Rx design guide, you should not throw an error in the dispose action:

6.18. Unsubscription should not throw
As the Rx’s composition makes that subscriptions are chained, so are unsubscriptions. Because of this, any operator can call an unsubscription at any time. Because of this, just throwing an exception will lead to the application crashing unexpectedly. As the observer instance is already unsubscribed, it cannot be used for receiving the exception either. Because of this, exceptions in unsubscriptions should be avoided.

@ghost
Author

ghost commented Jan 18, 2015

Hi @zsxwing, the issue is not whether one should or should not throw the exception; sometimes unexpected errors do occur. According to your quote, I would expect a crash, which gives good visibility. Currently, however, RxJava silently ignores the error and moves on, which is bad.

@zsxwing
Member

zsxwing commented Jan 19, 2015

Can it be made so that onCompleted will be called after the dispose action returns successfully?

I think the essential question is:

How should RxJava handle exceptions from unsubscribe?

In many places, such as OperatorTake, RxJava assumes that unsubscribe won't throw any exception. If unsubscribe does throw, the error may be sent to onError (where it will be swallowed if the subscriber has already received an error), reported to the RxJava plugin, or allowed to crash the current thread.

As @akarnokd said, the best solution for RxJava may be sending the exception to the RxJava plugin.
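A minimal, library-free sketch of that idea, not RxJava's actual implementation: the dispose action runs inside a try/catch, and anything it throws is handed to a global error hook instead of being dropped. The names `ErrorHookSketch`, `errorHook`, and `disposeQuietly` are hypothetical stand-ins for the plugin error handler.

```java
import java.util.function.Consumer;

// Library-free model; ErrorHookSketch, errorHook and disposeQuietly are hypothetical names.
final class ErrorHookSketch {
    // Stand-in for a global plugin error handler; the default prints the stack trace.
    static volatile Consumer<Throwable> errorHook = Throwable::printStackTrace;

    // Runs the dispose action and reports any failure to the hook instead of dropping it.
    static void disposeQuietly(Runnable disposeAction) {
        try {
            disposeAction.run();
        } catch (Throwable disposeFailure) {
            try {
                errorHook.accept(disposeFailure);
            } catch (Throwable hookFailure) {
                // The hook itself failed; print so that nothing is lost silently.
                hookFailure.printStackTrace();
            }
        }
    }
}
```

With a hook that crashes or logs loudly, a failing dispose action becomes visible even though the subscriber can no longer be notified.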

@ghost
Author

ghost commented Jan 19, 2015

I don't mind how it gets handled internally, as long as the default behaviour is to fail fast and crash. That gives people a chance to notice the issue and fix it.

@akarnokd
Member

How about:

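import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.AsyncSubject;

class Main {

    public static void main(String[] args) {
        // Typed as String so that mergeWith accepts it alongside the Observable<String>
        final AsyncSubject<String> onFree = AsyncSubject.create();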
        Observable.using(
                new Func0<String>() {
                    @Override
                    public String call() {
                        System.out.println("Calling resourceFactory");
                        return "resourceFactory";
                    }
                },
                new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String str) {
                        System.out.println("Calling observableFactory");
                        return Observable.just("observableFactory");
                    }
                },
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("Calling disposeAction");
                        try {
                            throw new RuntimeException("disposeAction");
                            // onFree.onCompleted();
                        } catch (Throwable ex) {
                            onFree.onError(ex);
                        }
                    }
                }
        ).mergeWith(onFree).subscribe(
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("onNext: " + s);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable e) {
                        System.out.println("onError: " + e.getMessage());
                    }
                },
                new Action0() {
                    @Override
                    public void call() {
                        System.out.println("onComplete");
                    }
                }
        );
    }
}

@ghost
Author

ghost commented Jan 19, 2015

I feel that's just a short-term workaround for the issue. We can't expect users of Observable.using to do this every time (and we certainly can't expect them to be aware of this issue). It's boilerplate and awkward, even more so when you need to chain multiple Observable.using calls together.

@akarnokd
Member

You can always roll your own operator so you can hide the boilerplate in a reusable fashion. Here is an example:

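import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action2;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subjects.AsyncSubject;
import rx.subscriptions.Subscriptions;

public final class MyUsing<T, Resource> implements OnSubscribe<T> {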
    private final Func0<Resource> resourceFactory;
    private final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory;
    private final Action2<? super Resource, Observer<T>> dispose;
    public MyUsing(Func0<Resource> resourceFactory,
            Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
            Action2<? super Resource, Observer<T>> dispose) {
        this.resourceFactory = resourceFactory;
        this.observableFactory = observableFactory;
        this.dispose = dispose;
    }
    @Override
    public void call(Subscriber<? super T> subscriber) {
        try {
            final Resource resource = resourceFactory.call();
            final AsyncSubject<T> disposeOutcome = AsyncSubject.create();
            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    dispose.call(resource, disposeOutcome);
                }
            }));
            @SuppressWarnings("unchecked")
            Observable<T> observable = (Observable<T>)observableFactory.call(resource);
            observable.mergeWith(disposeOutcome).subscribe(subscriber);
        } catch (Throwable e) {
            // eagerly call unsubscribe since this operator is specifically about resource management
            subscriber.unsubscribe();
            // then propagate error
            subscriber.onError(e);
        }
    }
    public Observable<T> toObservable() {
        return Observable.create(this);
    }
}

@ghost
Author

ghost commented Jan 19, 2015

Thanks for the suggestion, but do you agree that the underlying issue is in RxJava itself, namely that it silently swallows errors? If you do, then I think the fix should be made in RxJava itself instead.

@davidmoten
Collaborator

Note that, although it does not address the core issue of RxJava silently swallowing errors, the eager disposal overload for using proposed in #2759 would emit an onError event on disposal failure if disposeEagerly is set to true.

@akarnokd
Member

This issue is a bit aged; what is the verdict?

@davidmoten
Collaborator

It looks to me like we should do these things:

  • document in the using javadoc that disposal should not throw

  • call an RxJava plugin when disposal does throw (I'm not familiar with this side of things)

I can knock up a PR with these if someone has some RxJava plugin advice for me.

@zsxwing
Member

zsxwing commented Aug 13, 2015

Looks like it's a very simple fix to call the RxJava plugin: just replace this line

with

                try {
                    unsubscribe();
                } catch (Throwable e) {
                    _onError(e);
                }

@davidmoten
Collaborator

Thanks @zsxwing, that's nice and simple. @akarnokd happy to proceed with this?

@akarnokd
Member

That would call onError after an onCompleted (successful and unsuccessful) so even in the current form it violates the protocol.
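For context, the protocol in question allows a subscriber at most one terminal event (onError or onCompleted). The following library-free sketch models a guard that drops anything arriving after the first terminal event, which is why an exception raised by a dispose action that runs after completion cannot be delivered as onError. `TerminalGuardSketch` is a hypothetical name, not an RxJava class.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model of the "at most one terminal event" rule; not an RxJava class.
final class TerminalGuardSketch {
    private final List<String> delivered = new ArrayList<>();
    private boolean done;

    void onNext(String value) {
        if (!done) {
            delivered.add("onNext: " + value);
        }
    }

    void onCompleted() {
        if (done) {
            return;
        }
        done = true;
        delivered.add("onCompleted");
    }

    void onError(Throwable e) {
        if (done) {
            return; // too late: a terminal event was already delivered
        }
        done = true;
        delivered.add("onError: " + e.getMessage());
    }

    // Returns the events that actually reached the subscriber, in order.
    List<String> delivered() {
        return delivered;
    }
}
```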

@davidmoten
Collaborator

Yep, you're right. So I'll just need code like this in the catch on the dispose action in using?

    try {
        RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
    } catch (Throwable pluginException) {
        // renamed so it does not clash with the enclosing catch variable e
        pluginException.printStackTrace();
    }

@davidmoten
Collaborator

Stepping back a bit, we should probably put the catch in SafeSubscriber and, rather than calling _onError(e), call the plugin handler the way _onError does at the start of its method, i.e.

try {
    unsubscribe();
} catch (Throwable e) {
    try {
        RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
    } catch (Throwable pluginException) {
        handlePluginException(pluginException);
    }
}

@davidmoten
Collaborator

By the way, the change to SafeSubscriber would only work for the using scenario if unsubscription all the way up the chain is synchronous. Is this an assumption we make and should it be documented in the Observable contract?

@zsxwing
Member

zsxwing commented Aug 14, 2015

If unsubscription is asynchronous and SafeSubscriber won't swallow the exception, it should be caught by the user or handled by the thread's UncaughtExceptionHandler, right? That looks acceptable.

@davidmoten
Collaborator

Yeah, I agree @zsxwing. If some Operator is introduced into the chain that unsubscribes from upstream asynchronously, then it becomes that operator's responsibility to handle exceptions (for example, by calling the RxJavaPlugin error handler). I can knock up a PR for this, or would you like to do it @zsxwing, as it was your fix?

@zsxwing
Member

zsxwing commented Aug 14, 2015

@davidmoten feel free to send a PR :)

@davidmoten
Collaborator

I've submitted a fix for SafeSubscriber as described, but I realize it only fixes part of the problem: we are now covered for when unsubscribe is called after completion. If unsubscribe is called before termination, we still need using to report the error to the RxJavaPlugin error handler itself. I'll submit another PR for that one.

@davidmoten
Collaborator

Still thinking about this. I suspect post-termination unsubscribe failures are the ones that need RxJavaPlugin error handler notifications, and no special handling is required in using if the SafeSubscriber fix goes in. In the case of pre-termination unsubscribe, intermediate or endpoint Subscribers will in general either throw from an onNext, onError, or onCompleted, or catch the exception and push it downstream as an onError signal, so that ultimately this failure is not lost.

@benjchristensen added this to the 1.0.x milestone Aug 28, 2015
@akarnokd
Member

akarnokd commented Feb 9, 2016

The using operator now has an eager flag that will close the resource before emitting the terminal event, thus you may get the exception from closing the resource.
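As a rough illustration of why the ordering matters, here is a library-free model, not the operator's real code. `disposeEagerly` mirrors the flag name mentioned earlier in the thread; `EagerDisposeSketch`, `run`, and the "hook:" label are hypothetical. With eager disposal the dispose action runs before the terminal event, so a failure can still be delivered as onError. Without it, the terminal event has already gone out and the failure can only be reported out of band.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model of eager versus non-eager disposal ordering; all names are hypothetical.
final class EagerDisposeSketch {
    // Returns the sequence of signals a subscriber (or the error hook) would observe.
    static List<String> run(boolean disposeEagerly, Runnable disposeAction) {
        List<String> events = new ArrayList<>();
        events.add("onNext");
        if (disposeEagerly) {
            try {
                disposeAction.run();          // dispose first
            } catch (RuntimeException e) {
                events.add("onError: " + e.getMessage());
                return events;                // the failure replaces onCompleted
            }
            events.add("onCompleted");
        } else {
            events.add("onCompleted");        // terminal event first
            try {
                disposeAction.run();
            } catch (RuntimeException e) {
                // Too late for onError; model reporting it to an out-of-band hook.
                events.add("hook: " + e.getMessage());
            }
        }
        return events;
    }
}
```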
