Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Still problems with unsubscribed observables calling subscriber #1590

Closed
mttkay opened this issue Aug 15, 2014 · 9 comments
Closed

Still problems with unsubscribed observables calling subscriber #1590

mttkay opened this issue Aug 15, 2014 · 9 comments

Comments

@mttkay
Copy link
Contributor

mttkay commented Aug 15, 2014

So, this is a follow-up to #1407, which I thought had been fixed at first, since the crashes were gone for one particular case, but now I see crashes in our app in different places that are exactly like this. This is with RxJava 0.20-RC3.

Not wanting to jump to conclusions, but as far as I understand the fix that went in we now make sure that as soon as the "downstream" subscriber (not sure if I'm getting the terminology right here) unsubscribes from the sequence, then we unsubscribe immediately. Is that correct?

I found another case where it's still not working, and it might be related to the use of the cache operator. Again, as in the other ticket subscription, this is a fragment subscribing to an observable in onViewCreated, then unsubscribing in onDestroyView, but still receiving calls to onNext, crashing it, since it will be detached from the window at that point in time.

The only difference I could find to the case where it is now working is the use of the cache operator. Looking at it, it specifically says in the docs that the subscription returned from it will not unsubscribe from the source observable. Might that be the problem, i.e. will the fix that went in for #1407 not have any effect on this, as it attempted to fix it based on subscriber/subscription interaction?

@benjchristensen
Copy link
Member

Correct that when cache is used, the source becomes decoupled and "hot" since it is now multicasting. This means the upstream from cache will continue until termination. However, any children (observers/subscribers) subscribed to the cache will disconnect upon unsubscribe so it doesn't receive anything when the cache is populated.

as soon as the "downstream" subscriber (not sure if I'm getting the terminology right here) unsubscribes from the sequence, then we unsubscribe immediately. Is that correct?

What are you referring to when you say "we unsubscribe immediately"?

When going over async boundaries, such as observeOn there can be race conditions that allow onNext/onError events to emit to the Subscriber after it has unsubscribed. It is "best effort" across threads/networks to shut down and some items may still emit before it shuts down.

If there is an operator/subscriber sensitive to possibly receiving onNext events after an unsubscribe, it should filter/ignore them by checking isUnsubscribed(), similar to take which filters after unsubscribe to meet its contract in case the upstream has a race condition: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorTake.java#L63

What about this use case is so sensitive that it crashes? Can you provide more information on the use case so I can provide more concrete guidance?

@mttkay
Copy link
Contributor Author

mttkay commented Aug 23, 2014

When going over async boundaries, such as observeOn there can be race conditions that allow onNext/onError events to emit to the Subscriber after it has unsubscribed. It is "best effort" across threads/networks to shut down and some items may still emit before it shuts down.

Thanks for clarifying. That's indeed a big issue on Android then. The problem is that unsubscribing from an observable as part of a (synchronous) life-cycle callback in Android must ensure that no more messages will arrive in the subscriber, since Android will do its clean up synchronously / immediately:

// An Activity is a screen context object in the presentation layer of an Android app
class MyActivity extends Activity {

  // this callback signals that the current screen context, and all its views, will be released.
  // this is then where an RxAndroid app would release its UI subscribers
  void onDestroy() {
     subscription.unsubscribe(); // this is asynchronous
     super.onDestroy(); // this is synchronous
  }

}

In the above code snippet, line 2 will always finish before line 1, since unsubscribe merely posts a request for unsubscription on the message looper, it does not actually unsubscribe right away. That is a problem, since Android will perform its cleanup before any subscribers get released, and this most likely results in crashes, as these subscribers might attempt to access now defunct views.

We can work around this with `OperatorConditionalBinding' which will ignore "out of band" messages that arrive after Android has released all views. I'm just not a fan of a defensive programming style, so I'm wondering if there's something that we can do to release subscribers immediately?

@mttkay
Copy link
Contributor Author

mttkay commented Aug 23, 2014

Out of interest, why does SafeSubscriber not check isUnsubscribed before forwarding notifications to its delegate?

Do you think it makes sense to add a new subscriber wrapper or base class which only forwards notifications if it's not unsubscribed from?

@dpsm
Copy link
Contributor

dpsm commented Aug 23, 2014

@mttkay what if InnerHandlerThreadScheduler.schedule(..) checked the calling Looper against the one of it's internal Handler and invoke the action synchronously? That should turn the unsubscribe() sync when called from the Android UI thread!

Let me know what you think, and I can put a quick patch together for that :)

@mttkay
Copy link
Contributor Author

mttkay commented Aug 24, 2014

@dpsm I remember we tried that before, and it caused problems with recursive scheduling, resulting in stack overflows. The scheduler implementation has changed a few times though, so this might not be valid anymore.

In fact this was attempted multiple times in different PRs. Some of the discussion you can find here:
#1102

@mttkay
Copy link
Contributor Author

mttkay commented Aug 24, 2014

That said, there might be a case to be made for optimizing unsubscribe specifically. The previous PRs were all meant as general optimizations for the case where you observeOn the main UI thread, but notifications already arrive on the UI thread, so an additional round trip through the event looper was deemed unnecessary. We decided to not do it, since it increased complexity and lessened predictability of behavior, and they were all solvable in app code anyway.

This problem goes well beyond a mere performance issue though, as it affects stability.

@dpsm
Copy link
Contributor

dpsm commented Aug 27, 2014

@mttkay regarding recursive scheduling resulting in stack overflows, the provided Action should be responsible to break the recursion and avoid the stack overflow just like any recursive call should it not? I am not sure I understand the issue so please apologize if I am not following you :)

Another alternative would be something like this:

    private ThreadLocal<Queue<Action0>> pendingActions = new ThreadLocal<Queue<Action0>>() {
            @Override
            protected Queue<Action0> initialValue() {
                return new LinkedList<Action0>();
            }
        };

        private ThreadLocal<Boolean> isCalling = new ThreadLocal<Boolean>() {
            @Override
            protected Boolean initialValue() {
                return false;
            }
        };

        @Override
        public Subscription schedule(final Action0 action) {
            final Looper handlerLooper = handler.getLooper();
            if (Looper.myLooper() == handlerLooper) {
                if (isCalling.get()) {
                    Queue<Action0> actions = pendingActions.get();
                    actions.add(action);
                } else {
                    isCalling.set(true);
                    action.call();

                    Queue<Action0> actions = pendingActions.get();
                    while(!actions.isEmpty()) {
                        actions.poll().call();
                    }
                    isCalling.set(false);
                }
                return Subscriptions.empty();
            } else {
                return schedule(action, 0, TimeUnit.MILLISECONDS);
            }
        }

thoughts?

@dpsm
Copy link
Contributor

dpsm commented Sep 10, 2014

@mttkay thoughts on my comments above? Now that the Android project is somewhat independent, I'd be happy to help you and others setting up some roadmap of things we want to work on!?

@mttkay
Copy link
Contributor Author

mttkay commented Sep 12, 2014

Hey @dpsm sorry for not staying on top of this, a bit swamped at the moment. We're still setting up RxAndroid and I'd like to get the build and project setup hurdles out of our feet first.

Meanwhile I'd like to start by moving all RxAndroid related issues over to that project. Could you repost your proposal here? ReactiveX/RxAndroid#3

Then we can close this out and move the discussion over. Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants