Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

subscribeOn + groupBy #869

Merged
merged 12 commits into from
Feb 14, 2014

Conversation

benjchristensen
Copy link
Member

Some changes on top of #864 as part of work on #844.

Primarily I did two things here:

  • get unsubscribe working again via subscribeOn for synchronous Observables
  • leverage new subscribeOn behavior to make the groupBy unit tests work

@benjchristensen
Copy link
Member Author

@akarnokd Thanks for the excellent work in #864.

Even though this doesn't solve it for all cases, it at least gives us the ability to solve the time gap problem in specific uses.

Any reason not to proceed with merging this into the codebase?

*
* See https://github.com/Netflix/RxJava/issues/844 for more information.
*/
return dontLoseEvents
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still needs decision as currently GroupedObservable will be buffered in unlimited mode with the subscribeOn(Scheduler) overload. I favor explicitly requested buffering instead of type-based buffering.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree, it just means groupBy is dangerous for anyone to use subscribeOn with.

- I want to have it for internal usage but am not ready to publicly expose it.
- the previously performed observeOn changes appear to have resolved the non-determinism
- timeout test could be interrupted when unsubscribed
- groupBy.subscribeOn needs blocking buffer
@benjchristensen
Copy link
Member Author

@zsxwing Can you take a look at this unit test? I'm not sure if it's a legit problem or not. When timeout happens it calls unsubscribe which can result in the work being interrupted. This in turn causes it to not pass this test, but the test may just need to account for work possibly being interrupted when a timeout occurs. I'd appreciate your review.

rx.operators.OperatorTimeoutWithSelectorTest > testTimeoutSelectorWithTimeoutAndOnNextRaceCondition FAILED
    org.mockito.exceptions.verification.VerificationInOrderFailure at OperatorTimeoutWithSelectorTest.java:421

See changes I did here to not block, but it sometimes still fails: benjchristensen@54b19be#diff-381ec8d12950bcd8ddf7e4751875d74fR349

@zsxwing
Copy link
Member

zsxwing commented Feb 14, 2014

@benjchristensen Where is the InterruptedException from? When I was writing this test, I assume that there is no InterruptedException. So I don't handle the InterruptedException.

@cloudbees-pull-request-builder

RxJava-pull-requests #803 ABORTED

@benjchristensen
Copy link
Member Author

It's from the line: benjchristensen@43437fe#diff-a726afb863d72e4bd1938fcc01f443b3R101

The scheduler is unsubscribed which will cancel a future (when bread based) that interrupts the thread.

@benjchristensen
Copy link
Member Author

I don't know what is causing the tests to abort. Other than this timeout one being discussed all others pass on my machine.

@zsxwing
Copy link
Member

zsxwing commented Feb 14, 2014

The following codes which try to dispatch the unsubscribe to the scheduler have been removed.
benjchristensen@a394a7d#diff-a726afb863d72e4bd1938fcc01f443b3L106
I'm curious why.

…test to be consistent with the new subscribeOn
@zsxwing
Copy link
Member

zsxwing commented Feb 14, 2014

I sent a PR to fix this test: benjchristensen#7

@benjchristensen
Copy link
Member Author

The following codes which try to dispatch the unsubscribe

Because no one can give me a good reason for why the unsubscribe should be scheduled on the scheduler rather than execute directly. The unsubscribe is coming from the opposite direction and scheduling doesn't work going backwards.

For example, someone could write code like this (not necessarily for good reason):

  1. Observable.create(..) => emits on Thread[RxNewThreadScheduler-1,5,main], receives unsubscribe from Thread[RxNewThreadScheduler-2,5,main]
  2. map(...) => Thread[RxNewThreadScheduler-1,5,main]
  3. subscribeOn(newThread-A)
  4. observeOn(newThread-B)
  5. take => unsubscribes on Thread[RxNewThreadScheduler-2,5,main]
  6. flatMap => Thread[RxNewThreadScheduler-2,5,main]
  7. subscribe => receives on Thread[RxNewThreadScheduler-2,5,main]

Note how the unsubscribe is received on a different thread than is used to emit data. We can't schedule back on to that other thread, because scheduling goes down not up. Thus, I see no point in scheduling an unsubscribe on the thread going down when the event is being passed up, and there could be any number of subscribeOn or observeOn in a sequence.

Considering this, why should we schedule the unsubscribe on the thread meant for emitting onNext, onError and onCompleted?


public static void main(String args[]) {
        TestSubscriber<String> ts = new TestSubscriber<String>(new Observer<String>() {

            @Override
            public void onCompleted() {
                System.out.println("OnCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(String t) {
                System.out.println("Received: " + t + " on thread: " + Thread.currentThread());
            }

        });
        Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(Subscriber<? super Integer> s) {
                System.out.println("OnSubscribed on thread: " + Thread.currentThread());
                s.add(Subscriptions.create(new Action0() {

                    @Override
                    public void call() {
                        System.out.println("Unsubscribe received on thread: " + Thread.currentThread());
                    }

                }));
                for (int i = 0; !s.isUnsubscribed(); i++) {
                    s.onNext(i);
                }
            }

        }).map(new Func1<Integer, String>() {

            @Override
            public String call(Integer i) {
                System.out.println("Map function on thread: " + Thread.currentThread());
                return "value-" + i;
            }

        }).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).take(10).flatMap(new Func1<String, Observable<String>>() {

            @Override
            public Observable<String> call(String s) {
                System.out.println("FlatMap function on thread: " + Thread.currentThread());
                return Observable.from("network-result+" + s);
            }

        }).subscribe(ts);

        ts.awaitTerminalEvent();
    }

Output is:

OnSubscribed on thread: Thread[RxNewThreadScheduler-1,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Received: network-result+value-0 on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Received: network-result+value-1 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-2 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-3 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-4 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-5 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-6 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-7 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-8 on thread: Thread[RxNewThreadScheduler-2,5,main]
FlatMap function on thread: Thread[RxNewThreadScheduler-2,5,main]
Map function on thread: Thread[RxNewThreadScheduler-1,5,main]
Received: network-result+value-9 on thread: Thread[RxNewThreadScheduler-2,5,main]
OnCompleted
Unsubscribe received on thread: Thread[RxNewThreadScheduler-2,5,main]

@benjchristensen
Copy link
Member Author

I actually wonder if the issue is more related to observeOn.

@akarnokd
Copy link
Member

I thought about this. This might be an artifact of the .NET's threads, pools, etc. The closest example I could come up was the Swing scheduler where subscription should happen on the EDT and unsubscription as well, but Swing's addlistener/removelistener methods are generally thread-safe.

@benjchristensen
Copy link
Member Author

Here is an example without subscribeOn or observeOn and I'm curious how it should behave.

    public static void main(String args[]) {

        TestSubscriber<Integer> ts = new TestSubscriber<Integer>(new Observer<Integer>() {

            @Override
            public void onCompleted() {
                System.out.println("OnCompleted");
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }

            @Override
            public void onNext(Integer t) {
                System.out.println("Received: " + t + " on thread: " + Thread.currentThread());
            }

        });
        Subscription s = Observable.create(new OnSubscribe<Integer>() {

            @Override
            public void call(final Subscriber<? super Integer> s) {
                Schedulers.newThread().schedule(new Action1<Inner>() {

                    @Override
                    public void call(Inner t1) {
                        System.out.println("OnSubscribed on thread: " + Thread.currentThread());
                        s.add(Subscriptions.create(new Action0() {

                            @Override
                            public void call() {
                                System.out.println("Unsubscribe received on thread: " + Thread.currentThread());
                            }

                        }));
                        for (int i = 0; !s.isUnsubscribed(); i++) {
                            s.onNext(i);
                        }
                    }

                });
            }

        }).subscribe(ts);

        s.unsubscribe();

        ts.awaitTerminalEvent();

    }

It receives the unsubscribe from the main thread:

OnSubscribed on thread: Thread[RxNewThreadScheduler-1,5,main]
Unsubscribe received on thread: Thread[main,5,main]
Received: 0 on thread: Thread[RxNewThreadScheduler-1,5,main]

@benjchristensen
Copy link
Member Author

If someone is using the Rx provided Subscribers then everything is thread-safe. If they are implementing their own and modifying a mutable variable when they receive the unsubscribe, then it could be an issue.

Add timeout to CoundDownLatch, ignore InterruptException and fix the tes...
@cloudbees-pull-request-builder

RxJava-pull-requests #805 SUCCESS
This pull request looks good

@headinthebox
Copy link
Contributor

It does come from .NET and binding events

You must do button += clickHandler and button -= clickHandler on the UI thread, or things will blow up. I guess that might be true in Swing/JavaFx as well.

The only reason .NET Rx has subscribeOn is to ensure that subscribe and unsubscribe happen on the specified scheduler.

benjchristensen added a commit that referenced this pull request Feb 14, 2014
@benjchristensen benjchristensen merged commit b9fe278 into ReactiveX:master Feb 14, 2014
@benjchristensen benjchristensen deleted the subscribeOn+groupBy branch February 14, 2014 20:01
@zsxwing
Copy link
Member

zsxwing commented Feb 15, 2014

From the codes of javax.swing.event.EventListenerList which is used by Swing components here: http://www.grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/7u40-b43/javax/swing/event/EventListenerList.java , only add and remove are protected by synchronized. I feel there may be some concurrent issues if calling add and remove out of EDT.

@zsxwing
Copy link
Member

zsxwing commented Feb 16, 2014

According to the doc here http://docs.oracle.com/javase/tutorial/uiswing/concurrency/dispatch.html

Swing event handling code runs on a special thread known as the event dispatch thread. Most code that invokes Swing methods also runs on this thread. This is necessary because most Swing object methods are not "thread safe": invoking them from multiple threads risks thread interference or memory consistency errors. Some Swing component methods are labelled "thread safe" in the API specification; these can be safely invoked from any thread. All other Swing component methods must be invoked from the event dispatch thread. Programs that ignore this rule may function correctly most of the time, but are subject to unpredictable errors that are difficult to reproduce.

and the Swing's Threading Policy here http://docs.oracle.com/javase/7/docs/api/javax/swing/package-summary.html

In general Swing is not thread safe. All Swing components and related classes, unless otherwise documented, must be accessed on the event dispatching thread.

I think addXXXlistener/removeXXXlistener is not thread-safe out of the EDT.

@headinthebox
Copy link
Contributor

I think addXXXlistener/removeXXXlistener is not thread-safe out of the EDT.

Same in .NET.

benjchristensen added a commit to benjchristensen/RxJava that referenced this pull request Feb 17, 2014
Working with @headinthebox based on discussions at ReactiveX#869 and ReactiveX#880 (comment) we determined that there are times when `unsubscribeOn` behavior is needed.

The `subscribeOn` operator can not mix `subscribe` and `unsubscribe` scheduling behavior without breaking the `lift`/`Subscriber` behavior that allows unsubscribing synchronous sources. The newly added `unsubscribeOn` operator will not work with synchronous unsubscribes, but it will work for the targeted use cases such as UI event handlers.
benjchristensen added a commit to benjchristensen/RxJava that referenced this pull request Feb 17, 2014
Working with @headinthebox based on discussions at ReactiveX#869 and ReactiveX#880 (comment) we determined that there are times when `unsubscribeOn` behavior is needed.

The `subscribeOn` operator can not mix `subscribe` and `unsubscribe` scheduling behavior without breaking the `lift`/`Subscriber` behavior that allows unsubscribing synchronous sources. The newly added `unsubscribeOn` operator will not work with synchronous unsubscribes, but it will work for the targeted use cases such as UI event handlers.
@benjchristensen
Copy link
Member Author

See #890 for work done with @headinthebox that resulted in simplification of subscribeOn and a new operator unsubscribeOn.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants