Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable.interval(1, 1, TimeUnit.SECONDS, mainThread()) continues execution if unsubscribed from onNext #214

Closed
konmik opened this issue Aug 8, 2015 · 7 comments

Comments

@konmik
Copy link

konmik commented Aug 8, 2015

Subj.

Reproduction:

    final AtomicReference<Subscription> subscriptionAtomicReference = new AtomicReference<>();
    subscriptionAtomicReference.set(Observable.interval(1, 1, TimeUnit.SECONDS, mainThread())
        .doOnNext(new Action1<Long>() {
            @Override
            public void call(Long value) {
                Log.v("INTERVAL", "" + value);
                subscriptionAtomicReference.get().unsubscribe();
            }
        })
        .subscribe());

Output:

INTERVAL﹕ 0
INTERVAL﹕ 1
INTERVAL﹕ 2
INTERVAL﹕ 3
etc.

Workaround:

    final AtomicReference<Subscription> subscriptionAtomicReference = new AtomicReference<>();
    subscriptionAtomicReference.set(Observable.interval(1, 1, TimeUnit.SECONDS)
        .observeOn(mainThread())
        .doOnNext(new Action1<Long>() {
            @Override
            public void call(Long value) {
                Log.v("INTERVAL", "" + value);
                subscriptionAtomicReference.get().unsubscribe();
            }
        })
        .subscribe());

Output:

INTERVAL﹕ 0

@konmik konmik changed the title Observable.interval(0, 1, TimeUnit.SECONDS, mainThread()) continues execution if unsubscribed from onNext Observable.interval(1, 1, TimeUnit.SECONDS, mainThread()) continues execution if unsubscribed from onNext Aug 8, 2015
@dlew
Copy link
Collaborator

dlew commented Aug 9, 2015

Interesting. It looks like we aren't checking for unsubscription before scheduling new work, which (at least according to akanorkd) seems to be a good idea.

I'll work on a PR; I've already got a fix (it's simple) but I'm trying to figure out the best way to test it.

@JakeWharton
Copy link
Member

Fixed on master and now available in 1.0.1-SNAPSHOT

@akarnokd
Copy link
Member

akarnokd commented Aug 9, 2015

I've checked the patch and it doesn't completely solve the problem. If there is an usubscription race between the new isUnsubscribed() check and the actual scheduling of the task, you have the same problem.

Maybe I haven't properly expressed this in the relevant blog post, but you need to schedule first then register the remove action with the ScheduledAction. This way, even if the task gets scheduled despite the worker is unsubscribed, it will be cancelled right away. In the current form, if the unsubscription triggers the remove, the task is not scheduled yet and nothing happens, then the task gets scheduled.

The correct way to do this:

public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            if (compositeSubscription.isUnsubscribed()) {
                return Subscriptions.unsubscribed();
            }

            action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);

            final ScheduledAction scheduledAction = new ScheduledAction(action);
            scheduledAction.addParent(compositeSubscription);
            compositeSubscription.add(scheduledAction);

            handler.postDelayed(scheduledAction, unit.toMillis(delayTime));

            scheduledAction.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    handler.removeCallbacks(scheduledAction);
                }
            }));

            return scheduledAction;
        }

Unrelated but:

subscriptionAtomicReference.get().unsubscribe();

The scheduled task may get executed before the subscriptionAtomicReference gets set in the original thread causing an NPE at this line.

@dlew
Copy link
Collaborator

dlew commented Aug 9, 2015

The truth is that I haven't gotten to that post yet. (It's a lot of information to digest so I've been taking them slowly.)

I highly appreciate the help. I'll submit another PR with the correct ordering. Do you know if there's a way to test this race condition?

@dlew
Copy link
Collaborator

dlew commented Aug 9, 2015

@akarnokd Actually, another question (while we're on correctness). The linked post wraps the ScheduledAction with another Runnable that checks for isUnsubscribed() - would that be correct here? I.e.:

public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
    if (compositeSubscription.isUnsubscribed()) {
        return Subscriptions.unsubscribed();
    }

    action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);

    final ScheduledAction scheduledAction = new ScheduledAction(action);
    scheduledAction.addParent(compositeSubscription);
    compositeSubscription.add(scheduledAction);

    final Runnable r = new Runnable() {
        @Override
        public void run() {
            if (!scheduledAction.isUnsubscribed()) {
                scheduledAction.run();
            }
        }
    };

    handler.postDelayed(r, unit.toMillis(delayTime));

    scheduledAction.add(Subscriptions.create(new Action0() {
        @Override
        public void call() {
            handler.removeCallbacks(r);
        }
    }));

    return scheduledAction;
}

@akarnokd
Copy link
Member

akarnokd commented Aug 9, 2015

Add a scheduler hook that unsubscribes the worker and schedule a task with 100ms delay. The old code will execute this task and my new code won't.

@akarnokd
Copy link
Member

akarnokd commented Aug 9, 2015

It depends on how eager you want to be by not executing a task that got cancelled just before it started to run. I assume the call to removeCallbacks is O(n) because r could be anywhere even in a priority queue. If r is next, there is only a tiny overlap possibility of unsubscription and running so isUnsubscribed() check might get past it. If r is far behind and delayed, I'd think the remove succeeds way before r gets to run.

In RxJava, the threadpools use CAS to cancel a future atomically. If that succeeds, the task won't run. If it fails, another CAS enters an interrupting state and then interrupts the thread. Since you don't have the ability check for isUnsubscribed() in the actual task anyway, it doesn't really matter if you check isUnsubscribed() and is more likely it evaluates to false 99.9% of the time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants