Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplementation of Concat, improved handling of Observable<Observable<T... #205

Merged
merged 4 commits into from
Apr 18, 2013

Conversation

abliss
Copy link
Contributor

@abliss abliss commented Mar 22, 2013

...>>.

The old version required all of the Observables to be generated and buffered
before the concat could begin. If the outer Observable was asynchronous, items
could be dropped (test added). The new version passes the test, and does the
best job I could (after examining several possible strategies) of achieving
clear and consistent semantics in accordance with the principle of least
surprise.

(My attempt to fix issue #202)

…e<T>>.

The old version required all of the Observable<T>s to be generated and buffered
before the concat could begin.  If the outer Observable was asynchronous, items
could be dropped (test added).  The new version passes the test, and does the
best job I could (after examining several possible strategies) of achieving
clear and consistent semantics in accordance with the principle of least
surprise.
@cloudbees-pull-request-builder

RxJava-pull-requests #45 ABORTED

@billyy
Copy link
Contributor

billyy commented Mar 25, 2013

There is another issue with this implementation. The countdown latch will always block the calling thread, which is not desirable. Anything that returns an observable should never be blocking. Rx is meant to support composition of operations and allows each operation to run concurrently.

@@ -2,5 +2,5 @@ rootProject.name='rxjava'
include 'rxjava-core', \
'language-adaptors:rxjava-groovy', \
'language-adaptors:rxjava-jruby', \
'language-adaptors:rxjava-clojure', \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this file get changed for the pull request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry, I think I pushed from the wrong branch. I will fix this (and
the busted unit-test).
On Mar 26, 2013 4:43 PM, "Ben Christensen" notifications@github.com wrote:

In settings.gradle:

@@ -2,5 +2,5 @@ rootProject.name='rxjava'
include 'rxjava-core',
'language-adaptors:rxjava-groovy',
'language-adaptors:rxjava-jruby',
-'language-adaptors:rxjava-clojure', \

Why did this file get changed for the pull request?


Reply to this email directly or view it on GitHubhttps://github.com//pull/205/files#r3539571
.

@benjchristensen
Copy link
Member

While reviewing this I started playing with the unit tests and added 2 new ones. I also made them use inOrder.verify so they ensure correct ordering. I've pasted it below so you can merge into your branch instead of me complicating the merge with another branch.

The tests pass pass except testConcatUnsubscribe which deadlocks. It does this because a List won't return the subscription asynchronously so it can't unsubscribe meanwhile a countDownLatch is waiting for the change to trigger and the two latches end up waiting on each other.

I haven't thought through enough yet to determine if there is a way to solve that or if the unit test is just testing the wrong thing and thus a bad test.

The comment above from @billy about CountDownLatch originated from a discussion between him and I - but while reviewing this it is probably the right approach.

I was playing with other approaches that are purely non-blocking while retaining the correct concat sequential behavior but they all seem to require either:

  • another wrapping thread
  • stealing work from one of the child Observable threads and making it do work that is queued on the others ... and that seems like a bad idea even though it can be hacked to "work" but it does so in a way that is very unexpected and thus not something I want to pursue
  • queueing all onNext values which could be a bad memory issue ... and also means we eagerly subscribe which is not what we want

My biggest issue right now is that concat(o1, o2) is a very common usage but that results in List which will be blocking and unsubscribe doesn't work.

I'll think through this more but I'd appreciate your thoughts on how to handle testConcatUnsubscribe.

    public static class UnitTest {

        @Test
        public void testConcat() {
            @SuppressWarnings("unchecked")
            Observer<String> observer = mock(Observer.class);

            final String[] o = { "1", "3", "5", "7" };
            final String[] e = { "2", "4", "6" };

            final Observable<String> odds = Observable.toObservable(o);
            final Observable<String> even = Observable.toObservable(e);

            @SuppressWarnings("unchecked")
            Observable<String> concat = Observable.create(concat(odds, even));
            concat.subscribe(observer);

            verify(observer, times(7)).onNext(anyString());
        }

        @Test
        public void testConcatWithList() {
            @SuppressWarnings("unchecked")
            Observer<String> observer = mock(Observer.class);

            final String[] o = { "1", "3", "5", "7" };
            final String[] e = { "2", "4", "6" };

            final Observable<String> odds = Observable.toObservable(o);
            final Observable<String> even = Observable.toObservable(e);
            final List<Observable<String>> list = new ArrayList<Observable<String>>();
            list.add(odds);
            list.add(even);
            Observable<String> concat = Observable.create(concat(list));
            concat.subscribe(observer);

            verify(observer, times(7)).onNext(anyString());
        }

        @Test
        public void testConcatUnsubscribe() {
            final CountDownLatch callOnce = new CountDownLatch(1);
            final CountDownLatch okToContinue = new CountDownLatch(1);
            final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
            final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");

            @SuppressWarnings("unchecked")
            Observer<String> aObserver = mock(Observer.class);
            @SuppressWarnings("unchecked")
            Observable<String> concat = Observable.create(concat(w1, w2));
            System.out.println("before subscribe");
            Subscription s1 = concat.subscribe(aObserver);
            System.out.println("after subscribe");
            try {
                //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
                System.out.println("before callOnce.await");
                callOnce.await();
                System.out.println("after callOnce.await");
                s1.unsubscribe();
                System.out.println("after s1.unsubscribe");
                //Unblock the observable to continue.
                okToContinue.countDown();
                System.out.println("after countDown");
                w1.t.join();
                w2.t.join();
            } catch (Exception e) {
                e.printStackTrace();
                fail(e.getMessage());
            }

            InOrder inOrder = inOrder(aObserver);
            inOrder.verify(aObserver, times(1)).onNext("one");
            inOrder.verify(aObserver, times(1)).onNext("two");
            inOrder.verify(aObserver, times(1)).onNext("three");
            inOrder.verify(aObserver, times(1)).onNext("four");
            inOrder.verify(aObserver, never()).onNext("five");
            inOrder.verify(aObserver, never()).onNext("six");
            inOrder.verify(aObserver, times(1)).onCompleted();
        }

        @Test
        public void testMergeObservableOfObservables() {
            @SuppressWarnings("unchecked")
            Observer<String> observer = mock(Observer.class);

            final String[] o = { "1", "3", "5", "7" };
            final String[] e = { "2", "4", "6" };

            final Observable<String> odds = Observable.toObservable(o);
            final Observable<String> even = Observable.toObservable(e);

            Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {

                @Override
                public Subscription call(Observer<Observable<String>> observer) {
                    // simulate what would happen in an observable
                    observer.onNext(odds);
                    observer.onNext(even);
                    observer.onCompleted();

                    return new Subscription() {

                        @Override
                        public void unsubscribe() {
                            // unregister ... will never be called here since we are executing synchronously
                        }

                    };
                }

            });
            Observable<String> concat = Observable.create(concat(observableOfObservables));
            concat.subscribe(observer);
            verify(observer, times(7)).onNext(anyString());
        }

        /**
         * Simple concat of 2 asynchronous observables ensuring it emits in correct order.
         */
        @SuppressWarnings("unchecked")
        @Test
        public void testSimpleAsyncConcat() {
            Observer<String> observer = mock(Observer.class);

            TestObservable<String> o1 = new TestObservable<String>("one", "two", "three");
            TestObservable<String> o2 = new TestObservable<String>("four", "five", "six");

            Observable.concat(o1, o2).subscribe(observer);

            try {
                // wait for async observables to complete
                o1.t.join();
                o2.t.join();
            } catch (Exception e) {
                throw new RuntimeException("failed waiting on threads");
            }

            InOrder inOrder = inOrder(observer);
            inOrder.verify(observer, times(1)).onNext("one");
            inOrder.verify(observer, times(1)).onNext("two");
            inOrder.verify(observer, times(1)).onNext("three");
            inOrder.verify(observer, times(1)).onNext("four");
            inOrder.verify(observer, times(1)).onNext("five");
            inOrder.verify(observer, times(1)).onNext("six");
        }

        /**
         * Test an async Observable that emits more async Observables
         */
        @SuppressWarnings("unchecked")
        @Test
        public void testNestedAsyncConcat() throws Exception {
            Observer<String> observer = mock(Observer.class);

            final TestObservable<String> o1 = new TestObservable<String>("one", "two", "three");
            final TestObservable<String> o2 = new TestObservable<String>("four", "five", "six");
            final TestObservable<String> o3 = new TestObservable<String>("seven", "eight", "nine");
            final CountDownLatch allowThird = new CountDownLatch(1);

            final AtomicReference<Thread> parent = new AtomicReference<Thread>();
            Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {

                @Override
                public Subscription call(final Observer<Observable<String>> observer) {
                    final BooleanSubscription s = new BooleanSubscription();
                    parent.set(new Thread(new Runnable() {

                        @Override
                        public void run() {
                            try {
                                // emit first
                                if (!s.isUnsubscribed()) {
                                    System.out.println("Emit o1");
                                    observer.onNext(o1);
                                }
                                // emit second
                                if (!s.isUnsubscribed()) {
                                    System.out.println("Emit o2");
                                    observer.onNext(o2);
                                }

                                // wait until sometime later and emit third
                                try {
                                    allowThird.await();
                                } catch (InterruptedException e) {
                                    observer.onError(e);
                                }
                                if (!s.isUnsubscribed()) {
                                    System.out.println("Emit o3");
                                    observer.onNext(o3);
                                }

                            } catch (Exception e) {
                                observer.onError(e);
                            } finally {
                                System.out.println("Done parent Observable");
                                observer.onCompleted();
                            }
                        }
                    }));
                    parent.get().start();
                    return s;
                }
            });

            Observable.create(concat(observableOfObservables)).subscribe(observer);

            // wait for parent to start
            while (parent.get() == null) {
                Thread.sleep(1);
            }

            try {
                // wait for first 2 async observables to complete
                while (o1.t == null) {
                    Thread.sleep(1);
                }
                System.out.println("Thread1 started ... waiting for it to complete ...");
                o1.t.join();
                while (o2.t == null) {
                    Thread.sleep(1);
                }
                System.out.println("Thread2 started ... waiting for it to complete ...");
                o2.t.join();
            } catch (Exception e) {
                throw new RuntimeException("failed waiting on threads", e);
            }

            InOrder inOrder = inOrder(observer);
            inOrder.verify(observer, times(1)).onNext("one");
            inOrder.verify(observer, times(1)).onNext("two");
            inOrder.verify(observer, times(1)).onNext("three");
            inOrder.verify(observer, times(1)).onNext("four");
            inOrder.verify(observer, times(1)).onNext("five");
            inOrder.verify(observer, times(1)).onNext("six");
            // we shouldn't have the following 3 yet
            inOrder.verify(observer, never()).onNext("seven");
            inOrder.verify(observer, never()).onNext("eight");
            inOrder.verify(observer, never()).onNext("nine");
            // we should not be completed yet
            verify(observer, never()).onCompleted();
            verify(observer, never()).onError(any(Exception.class));

            // now allow the third
            allowThird.countDown();

            try {
                while (o3.t == null) {
                    Thread.sleep(1);
                }
                // wait for 3rd to complete
                o3.t.join();
            } catch (Exception e) {
                throw new RuntimeException("failed waiting on threads", e);
            }

            inOrder.verify(observer, times(1)).onNext("seven");
            inOrder.verify(observer, times(1)).onNext("eight");
            inOrder.verify(observer, times(1)).onNext("nine");

            inOrder.verify(observer, times(1)).onCompleted();
            verify(observer, never()).onError(any(Exception.class));
        }

        @SuppressWarnings("unchecked")
        @Test
        public void testBlockedObservableOfObservables() {
            Observer<String> observer = mock(Observer.class);

            final String[] o = { "1", "3", "5", "7" };
            final String[] e = { "2", "4", "6" };
            final Observable<String> odds = Observable.toObservable(o);
            final Observable<String> even = Observable.toObservable(e);
            final CountDownLatch callOnce = new CountDownLatch(1);
            final CountDownLatch okToContinue = new CountDownLatch(1);
            TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even);
            Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);
            Observable<String> concat = Observable.create(concatF);
            concat.subscribe(observer);
            try {
                //Block main thread to allow observables to serve up o1.
                callOnce.await();
            } catch (Exception ex) {
                ex.printStackTrace();
                fail(ex.getMessage());
            }
            // The concated observable should have served up all of the odds.
            verify(observer, times(1)).onNext("1");
            verify(observer, times(1)).onNext("3");
            verify(observer, times(1)).onNext("5");
            verify(observer, times(1)).onNext("7");

            try {
                // unblock observables so it can serve up o2 and complete
                okToContinue.countDown();
                observableOfObservables.t.join();
            } catch (Exception ex) {
                ex.printStackTrace();
                fail(ex.getMessage());
            }
            // The concatenated observable should now have served up all the evens.
            verify(observer, times(1)).onNext("2");
            verify(observer, times(1)).onNext("4");
            verify(observer, times(1)).onNext("6");
        }

        private static class TestObservable<T> extends Observable<T> {

            private final Subscription s = new Subscription() {

                @Override
                public void unsubscribe() {
                    subscribed = false;
                }

            };
            private final List<T> values;
            private Thread t = null;
            private int count = 0;
            private boolean subscribed = true;
            private final CountDownLatch once;
            private final CountDownLatch okToContinue;

            public TestObservable(T... values) {
                this(null, null, values);
            }

            public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) {
                this.values = Arrays.asList(values);
                this.once = once;
                this.okToContinue = okToContinue;
            }

            @Override
            public Subscription subscribe(final Observer<T> observer) {
                t = new Thread(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            while (count < values.size() && subscribed) {
                                observer.onNext(values.get(count));
                                count++;
                                //Unblock the main thread to call unsubscribe.
                                if (null != once)
                                    once.countDown();
                                //Block until the main thread has called unsubscribe.
                                if (null != once)
                                    okToContinue.await();
                            }
                            if (subscribed)
                                observer.onCompleted();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            fail(e.getMessage());
                        }
                    }

                });
                t.start();
                return s;
            }

        }

    }

Also restore the errant change to settings.gradle.
@abliss
Copy link
Contributor Author

abliss commented Mar 29, 2013

Thanks for the comments and the extra tests. I have merged them and
uploaded a new version.

Regarding testConcatUnsubscribe: I'm still not entirely sure of my footing,
but I believe the test is in error. It is demonstrating exactly the issue
which I mentioned in the new javadoc for OperationConcat. Specifically, it
sets up a situation wherein the inner observable's thread is blocked on the
outer observable's thread: the inner thread cannot proceed until the test
calls okToContinue.countDown(), which happens-after
concat.subscribe(aObserver), which blocks until the list has delivered both
w1 and w2.

Am I right that in general, an observer should block onNext() until it has
finished processing the item? This pushes the flow-control upstream, rather
than excessive buffering or throttling (which can be done explicitly in the
operation-chain if desired). If that's right, then I think concat must
block the outer onNext until it has finished servicing the inner sequence.

I do find it a little strange that Observable.from(Iterable) is a "shotgun
observable", i.e. the subscribe() blocks until all downstream operations
have completed on all items, which means it is impossible to ever
unsubscribe(). But given that this is correct behavior, I think that it
makes sense that concat(o1, o2) also cannot be unsubscribed-from. I added a
cautionary comment to the javadoc.

I have updated the test case to call concat.subscribe(aObserver) in a
background thread, which fixes the deadlock, and to expect all items to be
delivered despite an unsubscribe, which makes the test pass.

(I had also tried those three non-blocking approaches before uploading this
version. Thanks for checking my work. :)
On Mar 27, 2013 6:56 AM, "Ben Christensen" notifications@github.com wrote:

While reviewing this I started playing with the unit tests and added 2 new
ones. I also made them use inOrder.verify so they ensure correct ordering.
I've pasted it below so you can merge into your branch instead of me
complicating the merge with another branch.

The tests pass pass except testConcatUnsubscribe which deadlocks. It does
this because a List won't return the subscription asynchronously so it
can't unsubscribe meanwhile a countDownLatch is waiting for the change to
trigger and the two latches end up waiting on each other.

I haven't thought through enough yet to determine if there is a way to
solve that or if the unit test is just testing the wrong thing and thus a
bad test.

The comment above from @billy https://github.com/billy about
CountDownLatch originated from a discussion between him and I - but while
reviewing this it is probably the right approach.

I was playing with other approaches that are purely non-blocking while
retaining the correct concat sequential behavior but they all seem to
require either:

  • another wrapping thread
  • stealing work from one of the child Observable threads and making it
    do work that is queued on the others ... and that seems like a bad idea
    even though it can be hacked to "work" but it does so in a way that is very
    unexpected and thus not something I want to pursue
  • queueing all onNext values which could be a bad memory issue ... and
    also means we eagerly subscribe which is not what we want

My biggest issue right now is that concat(o1, o2) is a very common usage
but that results in List which will be blocking and unsubscribe doesn't
work.

I'll think through this more but I'd appreciate your thoughts on how to
handle testConcatUnsubscribe.

public static class UnitTest {

    @Test
    public void testConcat() {
        @SuppressWarnings("unchecked")
        Observer<String> observer = mock(Observer.class);

        final String[] o = { "1", "3", "5", "7" };
        final String[] e = { "2", "4", "6" };

        final Observable<String> odds = Observable.toObservable(o);
        final Observable<String> even = Observable.toObservable(e);

        @SuppressWarnings("unchecked")
        Observable<String> concat = Observable.create(concat(odds, even));
        concat.subscribe(observer);

        verify(observer, times(7)).onNext(anyString());
    }

    @Test
    public void testConcatWithList() {
        @SuppressWarnings("unchecked")
        Observer<String> observer = mock(Observer.class);

        final String[] o = { "1", "3", "5", "7" };
        final String[] e = { "2", "4", "6" };

        final Observable<String> odds = Observable.toObservable(o);
        final Observable<String> even = Observable.toObservable(e);
        final List<Observable<String>> list = new ArrayList<Observable<String>>();
        list.add(odds);
        list.add(even);
        Observable<String> concat = Observable.create(concat(list));
        concat.subscribe(observer);

        verify(observer, times(7)).onNext(anyString());
    }

    @Test
    public void testConcatUnsubscribe() {
        final CountDownLatch callOnce = new CountDownLatch(1);
        final CountDownLatch okToContinue = new CountDownLatch(1);
        final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
        final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");

        @SuppressWarnings("unchecked")
        Observer<String> aObserver = mock(Observer.class);
        @SuppressWarnings("unchecked")
        Observable<String> concat = Observable.create(concat(w1, w2));
        System.out.println("before subscribe");
        Subscription s1 = concat.subscribe(aObserver);
        System.out.println("after subscribe");
        try {
            //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
            System.out.println("before callOnce.await");
            callOnce.await();
            System.out.println("after callOnce.await");
            s1.unsubscribe();
            System.out.println("after s1.unsubscribe");
            //Unblock the observable to continue.
            okToContinue.countDown();
            System.out.println("after countDown");
            w1.t.join();
            w2.t.join();
        } catch (Exception e) {
            e.printStackTrace();
            fail(e.getMessage());
        }

        InOrder inOrder = inOrder(aObserver);
        inOrder.verify(aObserver, times(1)).onNext("one");
        inOrder.verify(aObserver, times(1)).onNext("two");
        inOrder.verify(aObserver, times(1)).onNext("three");
        inOrder.verify(aObserver, times(1)).onNext("four");
        inOrder.verify(aObserver, never()).onNext("five");
        inOrder.verify(aObserver, never()).onNext("six");
        inOrder.verify(aObserver, times(1)).onCompleted();
    }

    @Test
    public void testMergeObservableOfObservables() {
        @SuppressWarnings("unchecked")
        Observer<String> observer = mock(Observer.class);

        final String[] o = { "1", "3", "5", "7" };
        final String[] e = { "2", "4", "6" };

        final Observable<String> odds = Observable.toObservable(o);
        final Observable<String> even = Observable.toObservable(e);

        Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {

            @Override
            public Subscription call(Observer<Observable<String>> observer) {
                // simulate what would happen in an observable
                observer.onNext(odds);
                observer.onNext(even);
                observer.onCompleted();

                return new Subscription() {

                    @Override
                    public void unsubscribe() {
                        // unregister ... will never be called here since we are executing synchronously
                    }

                };
            }

        });
        Observable<String> concat = Observable.create(concat(observableOfObservables));
        concat.subscribe(observer);
        verify(observer, times(7)).onNext(anyString());
    }

    /**         * Simple concat of 2 asynchronous observables ensuring it emits in correct order.         */
    @SuppressWarnings("unchecked")
    @Test
    public void testSimpleAsyncConcat() {
        Observer<String> observer = mock(Observer.class);

        TestObservable<String> o1 = new TestObservable<String>("one", "two", "three");
        TestObservable<String> o2 = new TestObservable<String>("four", "five", "six");

        Observable.concat(o1, o2).subscribe(observer);

        try {
            // wait for async observables to complete
            o1.t.join();
            o2.t.join();
        } catch (Exception e) {
            throw new RuntimeException("failed waiting on threads");
        }

        InOrder inOrder = inOrder(observer);
        inOrder.verify(observer, times(1)).onNext("one");
        inOrder.verify(observer, times(1)).onNext("two");
        inOrder.verify(observer, times(1)).onNext("three");
        inOrder.verify(observer, times(1)).onNext("four");
        inOrder.verify(observer, times(1)).onNext("five");
        inOrder.verify(observer, times(1)).onNext("six");
    }

    /**         * Test an async Observable that emits more async Observables         */
    @SuppressWarnings("unchecked")
    @Test
    public void testNestedAsyncConcat() throws Exception {
        Observer<String> observer = mock(Observer.class);

        final TestObservable<String> o1 = new TestObservable<String>("one", "two", "three");
        final TestObservable<String> o2 = new TestObservable<String>("four", "five", "six");
        final TestObservable<String> o3 = new TestObservable<String>("seven", "eight", "nine");
        final CountDownLatch allowThird = new CountDownLatch(1);

        final AtomicReference<Thread> parent = new AtomicReference<Thread>();
        Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {

            @Override
            public Subscription call(final Observer<Observable<String>> observer) {
                final BooleanSubscription s = new BooleanSubscription();
                parent.set(new Thread(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            // emit first
                            if (!s.isUnsubscribed()) {
                                System.out.println("Emit o1");
                                observer.onNext(o1);
                            }
                            // emit second
                            if (!s.isUnsubscribed()) {
                                System.out.println("Emit o2");
                                observer.onNext(o2);
                            }

                            // wait until sometime later and emit third
                            try {
                                allowThird.await();
                            } catch (InterruptedException e) {
                                observer.onError(e);
                            }
                            if (!s.isUnsubscribed()) {
                                System.out.println("Emit o3");
                                observer.onNext(o3);
                            }

                        } catch (Exception e) {
                            observer.onError(e);
                        } finally {
                            System.out.println("Done parent Observable");
                            observer.onCompleted();
                        }
                    }
                }));
                parent.get().start();
                return s;
            }
        });

        Observable.create(concat(observableOfObservables)).subscribe(observer);

        // wait for parent to start
        while (parent.get() == null) {
            Thread.sleep(1);
        }

        try {
            // wait for first 2 async observables to complete
            while (o1.t == null) {
                Thread.sleep(1);
            }
            System.out.println("Thread1 started ... waiting for it to complete ...");
            o1.t.join();
            while (o2.t == null) {
                Thread.sleep(1);
            }
            System.out.println("Thread2 started ... waiting for it to complete ...");
            o2.t.join();
        } catch (Exception e) {
            throw new RuntimeException("failed waiting on threads", e);
        }

        InOrder inOrder = inOrder(observer);
        inOrder.verify(observer, times(1)).onNext("one");
        inOrder.verify(observer, times(1)).onNext("two");
        inOrder.verify(observer, times(1)).onNext("three");
        inOrder.verify(observer, times(1)).onNext("four");
        inOrder.verify(observer, times(1)).onNext("five");
        inOrder.verify(observer, times(1)).onNext("six");
        // we shouldn't have the following 3 yet
        inOrder.verify(observer, never()).onNext("seven");
        inOrder.verify(observer, never()).onNext("eight");
        inOrder.verify(observer, never()).onNext("nine");
        // we should not be completed yet
        verify(observer, never()).onCompleted();
        verify(observer, never()).onError(any(Exception.class));

        // now allow the third
        allowThird.countDown();

        try {
            while (o3.t == null) {
                Thread.sleep(1);
            }
            // wait for 3rd to complete
            o3.t.join();
        } catch (Exception e) {
            throw new RuntimeException("failed waiting on threads", e);
        }

        inOrder.verify(observer, times(1)).onNext("seven");
        inOrder.verify(observer, times(1)).onNext("eight");
        inOrder.verify(observer, times(1)).onNext("nine");

        inOrder.verify(observer, times(1)).onCompleted();
        verify(observer, never()).onError(any(Exception.class));
    }

    @SuppressWarnings("unchecked")
    @Test
    public void testBlockedObservableOfObservables() {
        Observer<String> observer = mock(Observer.class);

        final String[] o = { "1", "3", "5", "7" };
        final String[] e = { "2", "4", "6" };
        final Observable<String> odds = Observable.toObservable(o);
        final Observable<String> even = Observable.toObservable(e);
        final CountDownLatch callOnce = new CountDownLatch(1);
        final CountDownLatch okToContinue = new CountDownLatch(1);
        TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even);
        Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);
        Observable<String> concat = Observable.create(concatF);
        concat.subscribe(observer);
        try {
            //Block main thread to allow observables to serve up o1.
            callOnce.await();
        } catch (Exception ex) {
            ex.printStackTrace();
            fail(ex.getMessage());
        }
        // The concated observable should have served up all of the odds.
        verify(observer, times(1)).onNext("1");
        verify(observer, times(1)).onNext("3");
        verify(observer, times(1)).onNext("5");
        verify(observer, times(1)).onNext("7");

        try {
            // unblock observables so it can serve up o2 and complete
            okToContinue.countDown();
            observableOfObservables.t.join();
        } catch (Exception ex) {
            ex.printStackTrace();
            fail(ex.getMessage());
        }
        // The concatenated observable should now have served up all the evens.
        verify(observer, times(1)).onNext("2");
        verify(observer, times(1)).onNext("4");
        verify(observer, times(1)).onNext("6");
    }

    private static class TestObservable<T> extends Observable<T> {

        private final Subscription s = new Subscription() {

            @Override
            public void unsubscribe() {
                subscribed = false;
            }

        };
        private final List<T> values;
        private Thread t = null;
        private int count = 0;
        private boolean subscribed = true;
        private final CountDownLatch once;
        private final CountDownLatch okToContinue;

        public TestObservable(T... values) {
            this(null, null, values);
        }

        public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) {
            this.values = Arrays.asList(values);
            this.once = once;
            this.okToContinue = okToContinue;
        }

        @Override
        public Subscription subscribe(final Observer<T> observer) {
            t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        while (count < values.size() && subscribed) {
                            observer.onNext(values.get(count));
                            count++;
                            //Unblock the main thread to call unsubscribe.
                            if (null != once)
                                once.countDown();
                            //Block until the main thread has called unsubscribe.
                            if (null != once)
                                okToContinue.await();
                        }
                        if (subscribed)
                            observer.onCompleted();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                    }
                }

            });
            t.start();
            return s;
        }

    }

}


Reply to this email directly or view it on GitHubhttps://github.com//pull/205#issuecomment-15496596
.

@cloudbees-pull-request-builder

RxJava-pull-requests #59 SUCCESS
This pull request looks good

@cloudbees-pull-request-builder

RxJava-pull-requests #60 SUCCESS
This pull request looks good

@billyy
Copy link
Contributor

billyy commented Apr 2, 2013

Thanks for update. Let me take a look and I will do some more
research/thought on the testConcatSubscribe test case.

On Fri, Mar 29, 2013 at 2:22 AM, CloudBees pull request builder plugin <
notifications@github.com> wrote:

RxJava-pull-requests #60https://netflixoss.ci.cloudbees.com/job/RxJava-pull-requests/60/SUCCESS

This pull request looks good


Reply to this email directly or view it on GitHubhttps://github.com//pull/205#issuecomment-15633763
.

@cloudbees-pull-request-builder

RxJava-pull-requests #72 SUCCESS
This pull request looks good

@billyy
Copy link
Contributor

billyy commented Apr 3, 2013

My original test case was intended to test the unsubscribe inside one of
child observable. The countdownlatch was used so that the code will have
a chance to unsubscribe in a predicable manner (unsubscribe after "four").
In a normal case, there should be no blocking between inner and outer
observable. I rewrote the case for better clarity and having the outer
Observable is in a separate thread (so the test will pass now). You
brought up a good point about "when" you can unsubscribe. Should it only
allow unsubscribe to happen during the outer onNext()? I need to do more
research on it. Here is the test I plan to add.

    @Test

public void testConcatUnsubscribeObservableOfObservable() {

        final CountDownLatch callOnce = new CountDownLatch(1);

        final CountDownLatch okToContinue = new CountDownLatch(1);

        final TestObservable<String> w1 = new TestObservable<String>(

"one", "two", "three");

        final TestObservable<String> w2 =

newTestObservable(callOnce, okToContinue,
"four", "five", "six");

        @SuppressWarnings("unchecked")

        Observer<String> aObserver = mock(Observer.class);

        @SuppressWarnings("unchecked")

TestObservable<Observable> observableOfObservables =
newTestObservable<Observable>(w1, w2);

        Func1<Observer<String>, Subscription> concatF =

concat(observableOfObservables);

        Observable<String> concat = Observable.create(concatF);



        Subscription s1 = concat.subscribe(aObserver);



        try {

            //Block main thread to allow observable "w1" to complete

and observable "w2" to call onNext exactly once.

        callOnce.await();

        //"four" has been processed by onNext()

            s1.unsubscribe();

            //"five" and "six" will NOT be processed by onNext()

            //Unblock the observable to continue.

            okToContinue.countDown();

            w1.t.join();

            w2.t.join();

        } catch (Exception e) {

            e.printStackTrace();

            fail(e.getMessage());

        }


        InOrder inOrder = inOrder(aObserver);

        inOrder.verify(aObserver, times(1)).onNext("one");

        inOrder.verify(aObserver, times(1)).onNext("two");

        inOrder.verify(aObserver, times(1)).onNext("three");

        inOrder.verify(aObserver, times(1)).onNext("four");

        inOrder.verify(aObserver, never()).onNext("five");

        inOrder.verify(aObserver, never()).onNext("six");

        verify(aObserver, never()).onCompleted();

        verify(aObserver, never()).onError(any(Exception.class));

}

On Tue, Apr 2, 2013 at 8:55 PM, CloudBees pull request builder plugin <
notifications@github.com> wrote:

RxJava-pull-requests #72https://netflixoss.ci.cloudbees.com/job/RxJava-pull-requests/72/SUCCESS

This pull request looks good


Reply to this email directly or view it on GitHubhttps://github.com//pull/205#issuecomment-15816463
.

@billyy
Copy link
Contributor

billyy commented Apr 3, 2013

The RX .NET version of Concat uses the ImmediateScheduler to execute the
operation, which will execute the operation on the same calling thread. So
if the observables are running on the same thread, it will be
synchronous/blocked. I have pulled in your changes from your fork and
added the additional unit test to test unsubscribe() with observables
running in different thread. I have also updated the javadoc about the
blocking with observable on the same thread. From my side, it looks good.

On Wed, Apr 3, 2013 at 10:20 AM, Billy Yuen billyy@gmail.com wrote:

My original test case was intended to test the unsubscribe inside one of
child observable. The countdownlatch was used so that the code will have
a chance to unsubscribe in a predicable manner (unsubscribe after "four").
In a normal case, there should be no blocking between inner and outer
observable. I rewrote the case for better clarity and having the outer
Observable is in a separate thread (so the test will pass now). You
brought up a good point about "when" you can unsubscribe. Should it only
allow unsubscribe to happen during the outer onNext()? I need to do more
research on it. Here is the test I plan to add.

    @Test

public void testConcatUnsubscribeObservableOfObservable() {

        final CountDownLatch callOnce = new CountDownLatch(1);

        final CountDownLatch okToContinue = new CountDownLatch(1);

        final TestObservable<String> w1 = new TestObservable<String>(

"one", "two", "three");

        final TestObservable<String> w2 = newTestObservable<String>(callOnce, okToContinue,

"four", "five", "six");

        @SuppressWarnings("unchecked")

        Observer<String> aObserver = mock(Observer.class);

        @SuppressWarnings("unchecked")

TestObservable<Observable> observableOfObservables = newTestObservable<Observable>(w1, w2);

        Func1<Observer<String>, Subscription> concatF =

concat(observableOfObservables);

        Observable<String> concat = Observable.create(concatF);



        Subscription s1 = concat.subscribe(aObserver);



        try {

            //Block main thread to allow observable "w1" to complete

and observable "w2" to call onNext exactly once.

        callOnce.await();

        //"four" has been processed by onNext()

            s1.unsubscribe();

            //"five" and "six" will NOT be processed by onNext()

            //Unblock the observable to continue.

            okToContinue.countDown();

            w1.t.join();

            w2.t.join();

        } catch (Exception e) {

            e.printStackTrace();

            fail(e.getMessage());

        }


        InOrder inOrder = inOrder(aObserver);

        inOrder.verify(aObserver, times(1)).onNext("one");

        inOrder.verify(aObserver, times(1)).onNext("two");

        inOrder.verify(aObserver, times(1)).onNext("three");

        inOrder.verify(aObserver, times(1)).onNext("four");

        inOrder.verify(aObserver, never()).onNext("five");

        inOrder.verify(aObserver, never()).onNext("six");

        verify(aObserver, never()).onCompleted();

        verify(aObserver, never()).onError(any(Exception.class));

}

On Tue, Apr 2, 2013 at 8:55 PM, CloudBees pull request builder plugin <
notifications@github.com> wrote:

RxJava-pull-requests #72https://netflixoss.ci.cloudbees.com/job/RxJava-pull-requests/72/SUCCESS

This pull request looks good


Reply to this email directly or view it on GitHubhttps://github.com//pull/205#issuecomment-15816463
.

@benjchristensen
Copy link
Member

I do find it a little strange that Observable.from(Iterable) is a "shotgun
observable", i.e. the subscribe() blocks until all downstream operations
have completed on all items, which means it is impossible to ever
unsubscribe(). But given that this is correct behavior, I think that it
makes sense that concat(o1, o2) also cannot be unsubscribed-from. I added a
cautionary comment to the javadoc.

Rx does not (or rarely does) add concurrency (see guideline 6.12) thus Observable.from is wrapping a synchronous Iterable therefore the Observable will be synchronous.

A Scheduler could be used to make the subscription to an Iterable happen on another Thread, or a custom Observable could be created that handles an Iterable asynchronously.

All Rx operators must be capable of handling both synchronous and asynchronous sequences, and if it's synchronous that means unsubscribe will not work (which means it's up to the implementor of an Observable to understand this and decided if it's okay to firehose the results).

For the case of concat my concern is that if all the sequences being combined are async then we need to retain the async behavior and not block and prevent unsubscription.

For example:

  • async or synchronous ObservableA with 10 items
  • async or synchronous ObservableB with 20 items
  • async ObservableC with infinite items (hot observable that never completed)

I should be able to do this:

concat(a, b, c).take(50)

This should get the 10 items from A, 20 items from B and first 20 items from C and then unsubscribe and continue.

Does this work with the current implementation and correctly unsubscribe from ObservableC?

@billyy
Copy link
Contributor

billyy commented Apr 3, 2013

Good point. The best way to tell is to build an unit test for this case.
I can expand my new test to include this case and see what happen.

On Wed, Apr 3, 2013 at 2:29 PM, Ben Christensen notifications@github.comwrote:

I do find it a little strange that Observable.from(Iterable) is a "shotgun
observable", i.e. the subscribe() blocks until all downstream operations
have completed on all items, which means it is impossible to ever
unsubscribe(). But given that this is correct behavior, I think that it
makes sense that concat(o1, o2) also cannot be unsubscribed-from. I added a
cautionary comment to the javadoc.

Rx does not (or rarely does) add concurrency (see guideline 6.12) thus
Observable.from is wrapping a synchronous Iterable therefore the
Observable will be synchronous.

A Scheduler could be used to make the subscription to an Iterable happen
on another Thread, or a custom Observable could be created that handles
an Iterable asynchronously.

All Rx operators must be capable of handling both synchronous and
asynchronous sequences, and if it's synchronous that means unsubscribe will
not work (which means it's up to the implementor of an Observable to
understand this and decided if it's okay to firehose the results).

For the case of concat my concern is that if all the sequences being
combined are async then we need to retain the async behavior and not block
and prevent unsubscription.

For example:

  • async or synchronous ObservableA with 10 items
  • async or synchronous ObservableB with 20 items
  • async ObservableC with infinite items (hot observable that never
    completed)

I should be able to do this:

concat(a, b, c).take(50)

This should get the 10 items from A, 20 items from B and first 20 items
from C and then unsubscribe and continue.

Does this work with the current implementation and correctly unsubscribe
from ObservableC?


Reply to this email directly or view it on GitHubhttps://github.com//pull/205#issuecomment-15866540
.

@benjchristensen benjchristensen merged commit b15f257 into ReactiveX:master Apr 18, 2013
jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…te limiter. (ReactiveX#205)

* ReactiveX#35 Add Reactor support for circuit breaker, bulkhead and rate limiter.

* Fix Codacity warnings

* Commit build.gradle file for resilence4j-reactor

* Static import assertThat
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants