OnSubscribeCombineLatest#MultiSourceProducer request method get different result? #3813

Closed
fanturbo opened this issue Mar 31, 2016 · 7 comments

@fanturbo

In the request method there is the line 'o.unsafeSubscribe(s);', and the Javadoc for unsafeSubscribe() says: 'Subscribes to an Observable and invokes {@link OnSubscribe} function without any contract protection, error handling, unsubscribe, or execution hooks.'
This is my code:

Observable<Long> observable1 = Observable.interval(0, 1000, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long aLong) {
                return aLong * 5;
            }
        }).take(5);
Observable<Long> observable2 = Observable.interval(500, 1500, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, Long>() {
            @Override
            public Long call(Long aLong) {
                return aLong * 10;
            }
        }).take(4);
Observable.combineLatest(observable2, observable1, new Func2<Long, Long, Long>() {
    @Override
    public Long call(Long aLong, Long aLong2) {
        Log.i("ppppp", "combineLatest aLong = " + aLong + "   aLong2 =" + aLong2);
        return aLong + aLong2;
    }
}).subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        System.out.println("Sequence complete.");
    }
    @Override
    public void onError(Throwable e) {
        System.err.println("Error: " + e.getMessage());
    }
    @Override
    public void onNext(Long aLong) {
        System.out.println("combineLatest Next: " + aLong);
    }
});

When I run this code, I get one of two different results.
(1)
Next: 0
Next: 5
Next: 10
Next: 20
Next: 25
Next: 35
Next: 40
Next: 50
(2)
Next: 0
Next: 5
Next: 15
Next: 20
Next: 25
Next: 35
Next: 40
Next: 50

@akarnokd
Member

Both of your intervals fire at t = 2000 ms, and there is no guarantee which of the two values reaches the sum in combineLatest first.
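
The tie can be checked by hand with a small plain-Java replay of the two timelines. This is only a sketch and does not use RxJava: the class name CombineLatestTie and its helper methods are made up for illustration, and the model assumes exact timestamps, which real schedulers do not guarantee. It resolves the t = 2000 ms tie both ways and collects the sums.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not RxJava code: replays the emission times of the two intervals.
class CombineLatestTie {

    // Emission times (ms) of interval(initialDelay, period) followed by take(count).
    static List<Long> times(long initialDelay, long period, int count) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            out.add(initialDelay + i * period);
        }
        return out;
    }

    // Merges both timelines in time order. When both sources fire at the same
    // instant, observable1WinsTie decides which value is delivered first.
    static List<Long> simulate(boolean observable1WinsTie) {
        List<Long> t1 = times(0, 1000, 5);   // observable1 emits i * 5
        List<Long> t2 = times(500, 1500, 4); // observable2 emits i * 10
        List<Long> sums = new ArrayList<>();
        Long latest1 = null;
        Long latest2 = null;
        int i1 = 0;
        int i2 = 0;
        while (i1 < t1.size() || i2 < t2.size()) {
            boolean take1;
            if (i2 >= t2.size()) {
                take1 = true;
            } else if (i1 >= t1.size()) {
                take1 = false;
            } else if (t1.get(i1).equals(t2.get(i2))) {
                take1 = observable1WinsTie; // the race at t = 2000 ms
            } else {
                take1 = t1.get(i1) < t2.get(i2);
            }
            if (take1) {
                latest1 = i1 * 5L;
                i1++;
            } else {
                latest2 = i2 * 10L;
                i2++;
            }
            // A sum is produced only once both sources have a latest value.
            if (latest1 != null && latest2 != null) {
                sums.add(latest1 + latest2);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));  // [0, 5, 10, 20, 25, 35, 40, 50]
        System.out.println(simulate(false)); // [0, 5, 15, 20, 25, 35, 40, 50]
    }
}
```

The two lists match results (1) and (2) above: the only difference is whether the third sum is 10 + 0 or 5 + 10.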

@fanturbo
Author

fanturbo commented Apr 1, 2016

@akarnokd Thanks. I want to ask another question. The documentation for the combineLatest operator says: 'Combines a collection of source Observables by emitting an item that aggregates the latest values of each of the source Observables each time an item is received from any of the source Observables, where this aggregation is defined by a specified function.'
When observable1 emits its first item and observable2 has not emitted anything yet, why doesn't combineLatest emit a result? I expected my code to print this:
Next: 0
Next: 0
Next: 5
Next: 15
Next: 20
Next: 25
Next: 35
Next: 40
Next: 50

@akarnokd
Member

akarnokd commented Apr 1, 2016

By definition, combineLatest emits only when all sources have emitted at least one item. Otherwise you wouldn't have a full row of values to work with.

@fanturbo
Author

fanturbo commented Apr 1, 2016

In other words, combineLatest starts emitting once all sources have emitted at least one item, and after that it emits a combined result each time any of the source Observables emits an item?

@akarnokd
Member

akarnokd commented Apr 1, 2016

The combiner function is called for the first time once you have a full row.

PublishSubject<String> a = PublishSubject.create();
PublishSubject<String> b = PublishSubject.create();
PublishSubject<String> c = PublishSubject.create();

Observable.combineLatest(a, b, c, (u, v, w) -> u + v + w).subscribe(System.out::println);

a.onNext("1");
b.onNext("1");
a.onNext("2");
b.onNext("2");
b.onNext("3");
System.out.println("Full row:");
c.onNext("1");

It will print

Full row:
231
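
Why only one line appears can be seen in a small plain-Java model of the full-row rule. This is a sketch, not the RxJava implementation: LatestRow and replay are hypothetical names, sources are addressed by index (a = 0, b = 1, c = 2), and values are assumed to be non-null. The final call is an extra step that is not part of the sample above, added to show that once the row is full every new value produces a result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "full row" rule; not RxJava code.
class LatestRow {
    private final String[] latest; // latest value per source, null until that source emits
    private int filled;            // how many sources have emitted at least once

    LatestRow(int sourceCount) {
        latest = new String[sourceCount];
    }

    // Stores the value for one source. Returns the concatenated row,
    // or null while at least one source has not emitted yet.
    String onNext(int source, String value) {
        if (latest[source] == null) {
            filled++;
        }
        latest[source] = value;
        if (filled < latest.length) {
            return null;
        }
        return String.join("", latest);
    }

    // Replays the calls from the sample and collects what would be printed.
    static List<String> replay() {
        LatestRow row = new LatestRow(3);
        int[] sources = {0, 1, 0, 1, 1, 2, 0};
        String[] values = {"1", "1", "2", "2", "3", "1", "3"};
        List<String> printed = new ArrayList<>();
        for (int i = 0; i < sources.length; i++) {
            String combined = row.onNext(sources[i], values[i]);
            if (combined != null) {
                printed.add(combined);
            }
        }
        return printed;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // [231, 331]
    }
}
```

The first five calls only overwrite the stored values for a and b, so nothing is combined until c delivers its first value.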

@fanturbo
Author

fanturbo commented Apr 1, 2016

@akarnokd Thanks for your reply. Your sample is awesome. (P.S. Did you find my English very poor?)

@akarnokd
Member

akarnokd commented Apr 2, 2016

It was readable.

@akarnokd akarnokd closed this as completed Apr 2, 2016