This repository has been archived by the owner. It is now read-only.

Promise list to observable #109

Closed
slorber opened this Issue Feb 10, 2014 · 10 comments

slorber commented Feb 10, 2014

It would be cool to be able to easily transform a list of promises into an observable.

I didn't find anything in RxJS that does this, so I built my own, but I'm not sure about the implementation.
It seems to work nicely so far with the following code:

    /**
     * Creates an RxJS observable from a list of promises.
     * @param promiseList the list of promises to convert to an RxJS Observable
     * @param subject the Rx Subject to emit through (defaults to a new ReplaySubject)
     * @param onError an optional callback for handling errors; return true to fail the stream
     * @return {Rx.Observable}
     */
    promiseListToObservable: function(promiseList, subject, onError) {
        if ( promiseList.length === 0 ) {
            return Rx.Observable.empty();
        }
        // Default to ReplaySubject
        subject = subject || new Rx.ReplaySubject();
        // Default to non-blocking error logging
        onError = onError || function(error) {
            console.debug("Promise error caught in promiseListToObservable: ", error);
            // Returning true would stop the stream; false skips the failed promise.
            return false;
        };
        var settledCount = 0;
        // Complete the stream once every promise has either resolved or been skipped.
        function completeIfDone() {
            settledCount++;
            if ( settledCount === promiseList.length ) {
                subject.onCompleted();
            }
        }
        promiseList.forEach(function(promise) {
            promise.then(
                function (promiseValue) {
                    subject.onNext(promiseValue);
                    completeIfDone();
                },
                function (error) {
                    if ( onError(error) ) {
                        subject.onError(error);
                    }
                    else {
                        completeIfDone();
                    }
                }
            );
        });
        return subject.asObservable();
    }

Contributor

theghostbel commented Feb 10, 2014

Can you show usage example?

slorber commented Feb 10, 2014

Sure, it is useful in the RDF / Linked Data world.
I've implemented the following in this project:
https://github.com/stample/rdflib-pg-extension

We have a list of URLs, we request each of them, and we want a stream of the responses.
An unreachable URL should not cause the stream to fail; it should just be skipped.
You can see a stream of requests being fired here:
http://jsfiddle.net/D5Gqs/4/
Each request returns a Q promise, and I want a stream on top of that :)
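
The behaviour described here (values delivered as they arrive, failures skipped, completion once everything has settled) can be sketched without Rx at all. This is only an illustration under stated assumptions: `streamSettled` and `delay` are hypothetical names, the observer is a plain object with `onNext` and `onCompleted`, and the promises are native ones standing in for the Q promises returned by real requests.

```javascript
// Hypothetical helper, not part of RxJS: pushes each fulfilled value to
// observer.onNext as soon as it arrives, skips rejections, and calls
// observer.onCompleted once every promise has settled.
function streamSettled(promises, observer) {
  var pending = promises.length;
  if (pending === 0) {
    observer.onCompleted();
    return;
  }
  function settle() {
    pending -= 1;
    if (pending === 0) {
      observer.onCompleted();
    }
  }
  promises.forEach(function (promise) {
    promise.then(
      function (value) {
        observer.onNext(value);
        settle();
      },
      function () {
        // A failed request is skipped rather than failing the whole stream.
        settle();
      }
    );
  });
}

// Stand-in for a real request: resolves with `value` after `ms` milliseconds.
function delay(ms, value) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value); }, ms);
  });
}

streamSettled(
  [delay(30, 'slow'), Promise.reject(new Error('unreachable')), delay(5, 'fast')],
  {
    onNext: function (value) { console.log('next:', value); },
    onCompleted: function () { console.log('completed'); }
  }
);
```

Values are delivered in completion order here, not in the order of the input array.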

Member

mattpodwysocki commented Feb 15, 2014

@slorber Looks good. Could you turn it into a PR?

Member

mattpodwysocki commented Mar 13, 2014

@slorber Any update on turning this into something we can ship with our promises support?

slorber commented Mar 14, 2014

@mattpodwysocki sorry, I forgot.

I don't know what else I can do, but I can open a PR with the current code.
Maybe we could also add a threshold for the maximum number of errors before the stream fails, or something like that.
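
One way the threshold idea might look, as a rough sketch: a small factory that builds an `onError` callback in the shape `promiseListToObservable` expects (return `true` to fail the stream, `false` to skip the error). `failAfter` is a hypothetical name, and treating the threshold as "fail on the Nth error" is an assumption.

```javascript
// Hypothetical helper: returns an onError callback that tolerates errors
// until the maxErrors-th one, at which point it asks the stream to fail.
function failAfter(maxErrors) {
  var errorCount = 0;
  return function (error) {
    errorCount += 1;
    // true means "stop the stream", matching the onError contract above.
    return errorCount >= maxErrors;
  };
}

// With a threshold of 3, the first two errors are skipped and the third stops the stream.
var onError = failAfter(3);
console.log(onError(new Error('first')));
console.log(onError(new Error('second')));
console.log(onError(new Error('third')));
```

The returned callback would be passed as the third argument to `promiseListToObservable`.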

Member

mattpodwysocki commented Mar 16, 2014

@slorber why would we not want to do a simple Rx.Observable.fromArray and then flatMap with Rx.Observable.fromPromise?

    Rx.Observable.fromPromiseList = function (promises) {
      return Rx.Observable.fromArray(promises)
        .flatMap(Rx.Observable.fromPromise);
    };

slorber commented Mar 17, 2014

This seems like a good idea at first, @mattpodwysocki, but won't that stop the stream on the first promise error?

By the way, doesn't it also mean that the promise results will be emitted in the same order as the promise array? I'm not sure about this. In my case I just want the results in any order, and I don't want to wait for the first promise's result before getting the second.

I also need to keep receiving items even when some promises in the array have failed.

bman654 commented Mar 25, 2014

@slorber flatMap uses merge under the hood, so it does not preserve order.

To handle your error concerns:

    Rx.Observable.fromPromiseList = function (promises, errorHandler) {
      return Rx.Observable.fromArray(promises)
        .flatMap(function (p) {
          var o = Rx.Observable.fromPromise(p);
          if (errorHandler) { o = o.catch(errorHandler); }
          return o;
        });
    };

    // usage with errorHandler which fails after 5 promises fail
    var count = 0;
    var promiseObservable = Rx.Observable.fromPromiseList(promises, function (err) {
        return ++count < 5 ? Rx.Observable.empty() : Rx.Observable.throw(err);
    });

Member

mattpodwysocki commented Sep 30, 2014

@slorber you might want to use concatMap at that point to preserve the order.
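
To make the ordering difference concrete, here is a rough plain-promise sketch of the concat-style behaviour: values come out in input order even when a later promise settles first. It does not use RxJS; `streamInOrder` and `delay` are hypothetical names, and ending the stream on the first rejection is an assumption chosen to keep the sketch short.

```javascript
// Hypothetical helper, not an RxJS API: emits fulfilled values in the order
// of the input array, regardless of which promise settles first.
function streamInOrder(promises, observer) {
  var chain = Promise.resolve();
  promises.forEach(function (promise) {
    // Mark rejections as handled now; the chain deals with them when it gets there.
    promise.then(null, function () {});
    chain = chain.then(function () {
      return promise.then(function (value) {
        observer.onNext(value);
      });
    });
  });
  // The first rejection skips the remaining steps and is reported once.
  chain.then(
    function () { observer.onCompleted(); },
    function (error) { observer.onError(error); }
  );
}

// Stand-in for a real request: resolves with `value` after `ms` milliseconds.
function delay(ms, value) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(value); }, ms);
  });
}

// 'second' resolves first but is only emitted after 'first'.
streamInOrder([delay(30, 'first'), delay(5, 'second')], {
  onNext: function (value) { console.log('next:', value); },
  onCompleted: function () { console.log('completed'); },
  onError: function (error) { console.log('error:', error.message); }
});
```

The trade-off is the one raised earlier in the thread: a slow first promise holds back every result behind it.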

slorber commented Sep 30, 2014

ok thanks nice to know :)
