Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager ConcatMap #3017

Closed
benjchristensen opened this issue Jun 10, 2015 · 19 comments
Closed

Eager ConcatMap #3017

benjchristensen opened this issue Jun 10, 2015 · 19 comments

Comments

@benjchristensen
Copy link
Member

Is there already a name for something in the ecosystem that is like an eager concatMap, or ordered flatMap?

I often see the pattern of people wanting to eagerly kick off several Observables, but merge them together in order.

It is absolutely not a good idea for infinite lists, but similar to how concatMap and flatMap can both be dangerous if used wrong, an eagerConcatMap that eagerly subscribes and caches the inner Observables could be useful.

Of course this can be done without adding anything to RxJava, it's just tedious.

Bad idea? Does it already have a name that I'm unaware of?

@akarnokd
Copy link
Member

I can't help you with the name and it is non-trivial to change merge() to support it (#2928): fast-path would require unbounded buffers for the later sources; the scalar queue wouldn't really work if interleaved with regular sources and potentially one would need to wrap each value into an indexed container anyway.

In theory, after getting rid of the optimizations, the plain merge's round-robin collector can be modified to stick to the first buffer until it is terminated before moving onto the next.

(In my plans for the semi-auto-parallelization, I thought about the join option to be either ordered or unordered merge.)

@benjchristensen
Copy link
Member Author

non-trivial to change merge() to support it

Merge does not need to be changed. I can implement this today by composition. For example, I iterate over an array of Observables, .cache().subscribe() them, then concat those together.

@abersnaze
Copy link
Contributor

One use case for a eager concat was a bug that I ran into a while ago with using concat and publish with a closure.

source.publish((pub) -> Observable.eagerConcat(pub.map(...), pub.map(...)));

@stealthcode
Copy link

This would also be useful where the onSubscribe makes an async call. We will need a way to order the responses in the order that the producer generated them from a request. This would of course require buffering.

@davidmoten
Copy link
Collaborator

I like the idea. Out of interest I think concat(Observable<T)) is a little bit eager internally at the moment inasmuch as it requests the next observable before the first has completed. It doesn't subscribe to it before the current observable has completed though which is more what you are referring to.

@ypresto
Copy link
Contributor

ypresto commented Jul 1, 2015

I was misunderstanding concat operator as it subscribes each observables eagerly.

image

http://reactivex.io/documentation/operators/concat.html

In this figure concat is very, very looks eager :P

@benjchristensen
Copy link
Member Author

It doesn't subscribe to it before the current observable has completed though which is more what you are referring to.

That's the point. A request does not represent subscription work. It just means downstream is capable of receiving. Let's not confuse those points. An Iterable should be used if even request can't be handled.

@benjchristensen
Copy link
Member Author

In this figure concat is very, very looks eager :P

Yes, that diagram looks wrong. It shows both as subscribing immediately, which it definitely does not do.

/cc @DavidMGross

@DavidMGross
Copy link
Collaborator

Ah; I thought I'd fixed that. I'll go back and make sure the right diagrams
get copied over. I can't fix the rxmarbles version, though; that's out of
my bailiwick (@andrestaltz)

On Wed, Jul 1, 2015 at 9:24 AM, Ben Christensen notifications@github.com
wrote:

In this figure concat is very, very looks eager :P

Yes, that diagram looks wrong. It shows both as subscribing immediately,
which it definitely does not do.

/cc @DavidMGross https://github.com/DavidMGross


Reply to this email directly or view it on GitHub
#3017 (comment).

David M. Gross
PLP Consulting

@DavidMGross
Copy link
Collaborator

Oh; I see what happened. That particular illustration (
http://reactivex.io/documentation/operators/images/concat.hot.png) is meant
to illustrate what happens when both of the observables passed to concat
are "hot" observables. In such a case, as I understood it, an onError
notification from the second Observable may "jump the queue" and interrupt
the emissions from the resulting Observable even if the first Observable's
emissions haven't finished yet. Is that not correct?

The other illustrations on the page (e.g. under the RxJava and RxGroovy
accordion-folds) show the more ordinary behavior with cold Observables and
without onError notifications, and these show the subscription to the
second Observable being delayed until the first Observable completes.

On Wed, Jul 1, 2015 at 2:42 PM, PLP Consulting (David Gross) <
davgross@netflix.com> wrote:

Ah; I thought I'd fixed that. I'll go back and make sure the right
diagrams get copied over. I can't fix the rxmarbles version, though;
that's out of my bailiwick (@andrestaltz)

On Wed, Jul 1, 2015 at 9:24 AM, Ben Christensen notifications@github.com
wrote:

In this figure concat is very, very looks eager :P

Yes, that diagram looks wrong. It shows both as subscribing immediately,
which it definitely does not do.

/cc @DavidMGross https://github.com/DavidMGross


Reply to this email directly or view it on GitHub
#3017 (comment).

David M. Gross
PLP Consulting

David M. Gross
PLP Consulting

@ypresto
Copy link
Contributor

ypresto commented Jul 22, 2015

It is very useful to implement observable for data-store backed value.
Implementing with concat or startWith will miss update notification from backed store before first fetch request is finished (i.e. before subscribing to second observable).

Refer: pwittchen/prefser#47

(I know BehaviorSubject is enough to implement observable for just a variable. :)

@ypresto
Copy link
Contributor

ypresto commented Aug 5, 2015

Oops... there is pitfall when implementing get-and-observe with eager concatMap.

If Observable.just(observableForGet, observableForObserve).eagerConcatMap(o -> o) will subscribe observableForGet then observableForObserve, it will miss updates between subscription of first and second observable.

  1. eagerConcatMap calls observableForGet.subscribe().
  2. observableForGet calls onNext() from another thread.
  3. Someone updates backed store from yet another thread.
  4. observableForObserve should call onNext() but it is not subscribed yet!
  5. eagerConcatMap calls observableForObserve.subscribe().

It is easier to use merge for this purpose like below:

Observable.merge(observableForObserve, observableForGet)

This will subscribe observableForObserve at first, so no update will be missed.

@akarnokd
Copy link
Member

akarnokd commented Aug 5, 2015

Such operator with backpressure has limited use because even if it subscribes to all sources at once, they can produce up to RxRingBuffer.SIZE values upfront until each of them gets drained to completion.

@ypresto If observableForObserve is hot, merge may not see all of its values either.

@ypresto
Copy link
Contributor

ypresto commented Aug 5, 2015

@akarnokd

Thanks, reproduced with snippet like below:

Observable.interval(100, TimeUnit.MILLISECONDS)
    .flatMap(l -> observableForGet)
    .mergeWith(observableForGet);

It calls onNext() only once, not per 100msec...

And I found it may deliver outdated data if observableForObserve called onNext() after observableForGet stored fetch result in local var.

Then how do I implement get-and-observe with RxJava?
I use shared stream of cache()d observable in another project:

private Observable<List<Item>> mSharedRequestQueueObservable = updateNotificationObservable
                .onBackpressureLatest()
                .map(aVoid -> getObservable.cache()) // cache to share result with all subscribers
                .share();

public Observable<List<Item>> getAndObserve() {
        return Observable.merge(mSharedRequestQueueObservable, getObservable.nest())
                .flatMap(observable -> observable) // not using any schedulers to execute synchronously
                .subscribeOn(Schedulers.io());
}

(Let me know if mailing list is better place to discuss :)

@ypresto
Copy link
Contributor

ypresto commented Aug 6, 2015

Sorry, I was using wrong snippet of Observable.interval(...) with publish() (not share()) after interval() call.
It just works well with hot observable (share()).

Observable.interval(100, TimeUnit.MILLISECONDS).share()
    .flatMap(l -> observableForGet)
    .mergeWith(observableForGet);

@stealthcode
Copy link

It looks like this thread may have gone off the rails. It seems like there is interest in having an eager concat operator. I think I have implemented one in the internal processing of #3203. It's a composition of BufferUntilSubscriber subjects and Observable.concat().

@akarnokd
Copy link
Member

akarnokd commented Sep 3, 2015

I think the following example does exactly this:

Observable<Long> source1 = Observable.intervalRange(0, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source2 = Observable.intervalRange(20, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source3 = Observable.intervalRange(40, 20, 100, 100, TimeUnit.MILLISECONDS);

Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);

sources.map(v -> {
    ConnectableObservable<Long> c = v.replay();
    c.subscribe(); // to cache all
    c.connect(); // starting now
    return c;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println);

Edit: The following alternative doesn't retain everything:

Observable<Long> source1 = Observable.intervalRange(0, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source2 = Observable.intervalRange(20, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source3 = Observable.intervalRange(40, 20, 100, 100, TimeUnit.MILLISECONDS);

Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);

sources.map(v -> {
    UnicastSubject<Long> subject = UnicastSubject.create();
    v.subscribe(subject);
    return subject;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println);

@akarnokd
Copy link
Member

See #3357.

@akarnokd
Copy link
Member

akarnokd commented Nov 9, 2015

The new operator has been delivered with 1.0.15.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

7 participants