Returning Promises from Highland methods #290

Closed
joepie91 opened this issue Apr 27, 2015 · 30 comments

Comments

@joepie91

My workflow looks somewhat like this:

_(itemStream)
    .map(function(item){ return doAsyncThing(item); })
    .each(function(item){ console.log(item); });

... where doAsyncThing returns a Bluebird promise. I had expected Highland to await resolution of the promise and pass the final resolution value to the .each call (much like Bluebird's own .map and .each), but instead my .each method is receiving the Promise objects themselves.

Is it possible to make Highland behave this way without having to manually call .then in the .each handler? Doing that would defeat the point of doing things this way.

@svozza
Collaborator

svozza commented Apr 27, 2015

Use flatMap, like so:

_(itemStream)
    .flatMap(function(item){ return _(doAsyncThing(item)); })
    .each(function(item){ console.log(item); });

@apaleslimghost
Collaborator

You can use flatMap and wrap the promise in a stream:

_(itemStream)
    .flatMap(function(item){ return _(doAsyncThing(item)); })
    .each(function(item){ console.log(item); });

@svozza
Collaborator

svozza commented Apr 27, 2015

Snap!

@apaleslimghost
Collaborator

👏

@svozza
Collaborator

svozza commented Apr 27, 2015

Ha. Your answer is actually better because it explains why it works.

@joepie91
Author

Thanks for the answer(s), that was impressive synchronicity :)

Anyhow, I do have two follow-up questions:

  1. You refer to flatMap, but what if I want to use some other method where it should wait for a resolved value? Are there flat* equivalents for all of these kinds of use cases?
  2. Is this the only possible syntax? As I'm using CoffeeScript, it's very awkward/messy to wrap 'inline' Promise chains in a constructor call. It'd look something like this:
_(Promise.try ->
    doThing()
.then (val) ->
    doOtherThing(val)
)

EDIT: Actually, a 3rd question: is there any particular reason Highland doesn't automatically wait for Promises from any kind of method, like Bluebird does?

@svozza
Collaborator

svozza commented Apr 27, 2015

For question two, I guess you could do something like:

_(itemStream)
    .map(function(item){ return doAsyncThing(item); })
    .map(_)
    .each(function(item){ console.log(item); });

@apaleslimghost
Collaborator

Question 3: because Highland isn't a Promise library. It can consume promises, like it can consume arrays, callbacks and event emitters, but its API is focused around transforming streamed values, not promises. It's like asking why Array.prototype.map doesn't wait for promises, or why Bluebird doesn't wait for Streams.
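
The Array.prototype.map comparison can be sketched in plain JavaScript. This is only an illustration of the analogy, not Highland code, and the doAsyncThing below is a hypothetical stand-in for the function in the original question:

```javascript
// Hypothetical async helper standing in for doAsyncThing from the question.
function doAsyncThing(item) {
  return Promise.resolve(item * 2);
}

// Array.prototype.map does not wait: each element is a Promise object.
const mapped = [1, 2, 3].map(doAsyncThing);
const allArePromises = mapped.every((p) => p instanceof Promise);
console.log(allArePromises); // true

// Waiting is a separate, explicit step.
const resolved = Promise.all(mapped);
resolved.then((values) => console.log(values)); // [ 2, 4, 6 ]
```

In the same way, Highland's map passes the promises through untouched, and wrapping them with _ inside flatMap is the explicit "wait" step.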

@joepie91
Author

@svozza I had to replace the second map with flatMap, but other than that it works. Thanks!

@quarterto While I understand that rationale, I'd argue that allowing for Promise waiting would be a good idea because:

  • Promises have now been standardized as a representation of a "future value" - this basically just makes it a value like any other. It would greatly benefit interoperability to support this, at least to Promises/A+ spec.
  • This is a fairly common usecase - there aren't really any other standard ways to represent a future value, and given the very I/O-focused nature of Node.js, "future values" are almost inevitable for any non-trivial implementation. While callbacks also exist, they do not represent eventual values (rather eventual code), and are not as easily composable.

I understand that waiting for Promises may not always be the desirable thing to do - perhaps this could be a configuration option of some sort?

@vqvu
Collaborator

vqvu commented Apr 27, 2015

Highland streams already represent a future array of values, so it doesn't really make sense to treat promises as something special.

You mentioned needing async versions of other operators besides map. We have flatMap and flatFilter; what else do you need?

@joepie91
Author

> Highland streams already represent a future array of values, so it doesn't really make sense to treat promises as something special.

It absolutely does, for interoperability. Many third-party libraries use (or can use) Promises to represent eventual values, and right now it seems unnecessarily hard/verbose to use those libraries with Highland. In an ecosystem that is heavily (and increasingly) modularized, this is a pretty big deal.

> You mentioned needing async versions of other operators besides map. We have flatMap and flatFilter; what else do you need?

Frankly, I have no idea - I've only been using Highland for a short while now, so I haven't used most of it yet. Hence my rather generic question :)

@vqvu
Collaborator

vqvu commented Apr 27, 2015

Honestly, I don't think wrapping all of your async calls in the stream constructor (_(doAsync(val))) is unnecessarily verbose (three characters is short for interop!). The idea is to turn promises into streams as soon as possible and use the Highland API. You shouldn't need to use the promises API at all.

Now, we do have streamifyAll. I can see us adding a custom handler to it like what bluebird has, so users can apply it to promise-based libraries. Not sure how easy or hard this would be though.

@joepie91
Author

> Honestly, I don't think wrapping all of your async calls in the stream constructor (_(doAsync(val))) is unnecessarily verbose (three characters is short for interop!). The idea is to turn promises into streams as soon as possible and use the Highland API. You shouldn't need to use the promises API at all.

It's not just about the three characters, though:

  • Things don't "just work" anymore. You need to remember to explicitly do something every time you use Promises. This incurs significant mental overhead, and seriously increases the likelihood of bugs. It also breaks the concept of Promises being "just like normal values, just asynchronously resolved".
  • In CoffeeScript and anything with similar function syntax (even ES6 arrow functions!), "wrapping an anonymous function" is actually a really hard thing to do cleanly. Defining a named function before using it, on the other hand, is often unnecessarily verbose in the types of usecases that Highland would be used for (and especially in .map/.filter calls).

> Now, we do have streamifyAll. I can see us adding a custom handler to it like what bluebird has, so users can apply it to promise-based libraries. Not sure how easy or hard this would be though.

While this could work to some degree, it suffers from the same problem as promisifyAll - it's rather frustrating to use with libraries that aren't simply "one exported object with top-level functions". Database libraries are a good example of this being a problem.

@joepie91
Author

For what it's worth, I've monkeypatched in this behaviour using a 'plugin' written with the exposed methods from #293, and it looks something like this (CoffeeScript):

module.exports = (_) ->
    _realSend = _.Stream::_send

    _.Stream::_send = (err, item) ->
        if item? and item != _.nil and item.then?
            _realSend.call(this, err, _(item))
        else
            _realSend.call(this, err, item)

I'd imagine that this kind of functionality is not always desired, so a real-world implementation would probably look a bit different.

@vqvu
Collaborator

vqvu commented Apr 28, 2015

This may be where we differ, but promises only "just work" in the context of the monad. The moment you stop using it with other promises, you lose the "just work". There's no expectation that it'll have the same behavior when used with other data structures. This was more or less @quarterto's point. For operations like map, a promise is just another opaque value. You can even return another highland stream, and it won't do anything special. That's the point. In fact, a map that doesn't do that would break the Functor contract, and we want to have fantasy land compatibility eventually.
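
As a rough illustration of the Functor point, here is a plain-JavaScript sketch that uses arrays in place of promises or streams. It assumes the composition law (mapping twice should equal mapping once with the composed function); unwrappingMap is a hypothetical "helpful" map invented for this example, not a Highland or standard API:

```javascript
const wrap = (x) => [x];      // returns a container, like a function returning a promise
const size = (v) => v.length; // inspects the container itself

// Array.prototype.map treats the returned array as an opaque value,
// so mapping in two steps equals mapping once with the composed function.
const stepwise = [1, 2].map(wrap).map(size);
const composed = [1, 2].map((x) => size(wrap(x)));
console.log(stepwise, composed); // [ 1, 1 ] [ 1, 1 ]

// Hypothetical map that automatically unwraps returned arrays,
// the way an auto-awaiting map would unwrap returned promises.
function unwrappingMap(xs, fn) {
  return xs.flatMap((x) => fn(x));
}

// The two-step version now sees bare numbers instead of arrays,
// so the two results no longer agree.
const stepwiseUnwrapped = unwrappingMap(unwrappingMap([1, 2], wrap), size);
const composedUnwrapped = unwrappingMap([1, 2], (x) => size(wrap(x)));
console.log(stepwiseUnwrapped, composedUnwrapped); // [ undefined, undefined ] [ 1, 1 ]
```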

Now, you can argue that highland should coerce to a stream whenever we expect one, like for zipAll or flatMap or sequence. And maybe that's a good idea, but my personal opinion is that we should keep our interface as strict as possible and not do automatic coercion everywhere.

> "wrapping an anonymous function" is actually a really hard thing to do cleanly.

I don't understand. Why would you need to wrap anonymous functions? If you can wrap an anonymous function, can't you wrap its return value instead?

> Database libraries are a good example of this being a problem.

Can't you promisify the db object?

@oravecz

oravecz commented Feb 11, 2016

I came into this convo after running into the same desire -- a way to "depromise" a stream. I can wrap the result in an _( promise ) clause so things go on. @vqvu asked what other functions may need a "flat" version. If flatFilter is provided, then its corollary might be useful: flatReject.

I too think there should be a way to resolve promises more gracefully in the stream despite the reasons against it. :) Perhaps a _( promises ).resolve() function?

@vqvu
Collaborator

vqvu commented Feb 11, 2016

There is a way to resolve promises gracefully in Highland. It just doesn't happen automatically. _(promises).flatMap(_) (or _(promises).map(_).merge() if you don't care about the order) is equivalent to _(promises).resolve(). You can use this pattern to resolve any stream of things that the _ constructor supports (e.g., iterators, node streams, arrays, or generators).

I can see adding flatReject to Highland for symmetry with the existing filter/reject pair (though @quarterto may disagree 😄). Though you can implement flatReject fairly easily like this, so I don't feel that strongly about it being missing.

// Same as s.flatReject(predicate)
s.flatFilter(_.seq(predicate, _.map(_.not)))

You can even extract the transform for reuse if you want.

const flatReject = predicate => _.flatFilter(_.seq(predicate, _.map(_.not)));

s.through(flatReject(predicate));
s2.through(flatReject(anotherPredicate));

@apaleslimghost
Collaborator

There's also _(Promise.all(promises)), which is roughly equivalent to _(promises).map(_).parallel(promises.length).collect().

> though @quarterto may disagree 😄

I don't know quite what you're talking about 😉

@oravecz

oravecz commented Feb 12, 2016

This is all very helpful as far as real-life examples and use cases. One thing I don't understand is:

This works as you specified:

    function triple( value ) {
        return Promise.resolve( value * 3 );
    }

    it( 'promise test', function ( done ) {
        h( [1, 2, 3, 4, 5, 6, 7, 8, 9] ).map( triple ).flatMap( h ).each( h.log ).done(done);
    });

However changing the initial map() to flatMap results in an Exception.

    it( 'promise test', function ( done ) {
        h( [1, 2, 3, 4, 5, 6, 7, 8, 9] ).flatMap( triple ).flatMap( h ).each( h.log ).done(done);
    });

Error: Expected Stream, got object
    at kinesis/node_modules/highland/lib/index.js:3049:26

@apaleslimghost
Collaborator

The function passed to flatMap should always return a stream or array, but triple returns a promise.

@oravecz

oravecz commented Feb 12, 2016

Thanks for that.

I think in some of the discussion above, we talked about how to resolve promises, but there is a flip side to them as well. How to handle the rejects. This is where some new functions would really shine.

function triple( value ) {
    var result = value * 3;
    if ( result % 5 === 0 ) return Promise.reject( result );
    return Promise.resolve( result );
}

it( 'promise test', function ( done ) {
    var a = _( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
    .map( triple )      // Promise: resolves to 3x, rejects if divisible by 5
    .flatMap( _ )       // resolve promises
    .each( _.log )
    .done(done)
    ;
});

3
6
9
12

The test errors because the Promise.reject() stops the processing (without any exception report). done() doesn't get called and the test fails because of it.

Is it possible to extend highland functionality (via plugin, or alternative syntax) to achieve the following? It would be very helpful to the workflow system I am attempting.

it( 'promise test', function ( done ) {
    var a = _( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
        .map( triple )      // Promise: resolves to 3x, rejects if divisible by 5
        ;
    var a$ = a.observe().resolve().each( _.log );
    var b = a.observe().reject().each( _.log );
    _([ a$, b ]).merge().done(done);
});

@vqvu
Collaborator

vqvu commented Feb 12, 2016

Rejects are handled correctly: as stream errors. It's just that you're not catching them, so each throws. The each operator always does this when it encounters an error. You should catch errors before calling each. This works:

var a = _( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
    .map( triple )      // Promise: resolves to 3x, rejects if divisible by 5
    .flatMap( _ )       // resolve promises
    .errors(err => _.log('error', err)) // catch error.
    .each( _.log )
    .done(() => console.log('done'))
    ;

3
6
9
12
error 15
18
21
24
27
done

> Is it possible to extend highland functionality

Yes. through is the primary extension point in 2.x. You pass it a transform function that takes in a Stream and returns another Stream. through simply calls the function, passing it this and returns the result. s.through(f).through(g) is chainable shorthand for g(f(s)).

A "plugin" operator is typically implemented as a function that returns a transform function, if your operator takes an argument. All of the top-level operators (like _.map) are implemented this way.

// This is functionally how _.map is implemented.
_.map = function (f) {
    return s => s.map(f);
}

// You can implement a plugin directly.
function resolve(s) {
    return s
        .flatMap(_)
        .errors(err => {}); // get rid of rejects.
}

function reject(s) {
    return s
        .flatMap(_)
        .filter(x => false) // Filter out resolves.
        .errors((err, push) => push(null, err)); // Push rejects as values.
}

// Or as a constructor
function observePromises(done) {
    return s => {
        var a$ = s.observe().through(resolve).each(_.log);
        var b = s.observe().through(reject).each(_.log);
        return _([a$, b]).merge().done(done);
    }
}

it('promise test', function (done) {
    _([1, 2, 3, 4, 5, 6, 7, 8, 9])
        .map(triple)
        .through(observePromises(done));
});

@oravecz

oravecz commented Feb 13, 2016

Again, thanks so much for putting this documentation somewhere. The existing documentation is a good reference guide, but at least for me, it doesn't inform me as to the real world use cases we have been discussing.

It is interesting that I can use Promise.reject() or throw new Error() and errors catches them all. It's unclear to me whether there is a way to "short-circuit" the stream flow and throw an error that interrupts the flow entirely?

I was able to get the following flow working as a single chain without fork() or observe() which simplifies the code somewhat.

// (Source) ---> Stage 1 ---> Stage 2 ---> Done
//          \--> error   \--> error

function double( value ) {
    var result = value * 2 + 2;
    if ( result % 5 === 0 ) return Promise.reject( result );
    return Promise.resolve( result );
}

function log( prefix ) {
    return function ( message ) {
        console.log( prefix + ':' + message );
    }
}

it.only( 'promise test', function ( done ) {
    var a = _( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
        .map( double )      // Promise: resolves to 2x+2, rejects if divisible by 5
        .flatMap( _ )       // resolve promises
        .errors( log('stage 1 error') )
        .map( double )      // Promise: resolves to 2x+2, rejects if divisible by 5
        .flatMap( _ )       // resolve promises
        .errors( log('stage 2 error') )
        .each( log('result') )
        .done(done)
        ;
} );

@vqvu
Collaborator

vqvu commented Feb 13, 2016

> It's unclear to me whether there is a way to "short-circuit" the stream flow and throw an error that interrupts the flow entirely?

I'm unsure what you mean by "short-circuit". Do you want to throw an error that immediately stops the stream and all processing? Is stopOnError what you want?

@oravecz

oravecz commented Feb 14, 2016

You are correct that I want to throw an error that immediately stops the stream and all processing. However, I also want to throw errors that are handled by the errors function. I don't suppose I can have my cake and eat it too.

Is there a way to batch errors? In my example above, if I wanted to log 2 errors at a time, can that be done without introducing observe()?

@vqvu
Collaborator

vqvu commented Feb 14, 2016

You mean like this?

s.errors((err, push) => {
    log('stage 1 error')(err); // log(prefix) returns the actual logging function.
    push(err); // Rethrow error.
})
    ... // Do other things.
    .stopOnError(...) // Later on, stop on error.
    .each( log('result') )
    .done(done)
    ;

If you have further questions about error handling, can you ask them in a new issue? This isn't about promises anymore 😃.

@oravecz

oravecz commented Feb 14, 2016

You are right, of course. I'll ask the batch question in another thread. Thanks.

@fabiocbinbutter

fabiocbinbutter commented Jun 17, 2016

Just my 2 cents, if I'm understanding this correctly. If you had var files=['a.txt','b.txt'], why would you expect _(files).map(readFileSync) to work, but then insist on _(files).flatMap(file => _(readFile(file))) rather than just _(files).map(readFile)? If promises can only return one value... it seems like asking people to write .flatMap(x => [f(x)]) instead of just .map(f).
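
The array side of that comparison can be sketched in plain JavaScript. This only illustrates the analogy and is not Highland code; f and files are placeholder values chosen for the example:

```javascript
// Placeholder single-value transform standing in for f in the comment.
const f = (name) => name.toUpperCase();
const files = ['a.txt', 'b.txt'];

// Wrapping each result in a one-element array and flattening it again...
const viaFlatMap = files.flatMap((x) => [f(x)]);
// ...gives the same result as mapping directly.
const viaMap = files.map(f);

console.log(viaFlatMap); // [ 'A.TXT', 'B.TXT' ]
console.log(viaMap);     // [ 'A.TXT', 'B.TXT' ]
```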

@vqvu
Collaborator

vqvu commented Jun 18, 2016

I'm assuming that readFile returns a promise in your example.

I've talked about the reason why we don't want map to treat promises specially in a previous comment. Now, it's true that Promise.then allows you to return a promise from the callback. However, the reason for that is because it doesn't make any sense to have a promise of a promise (thus no reason to have a separate flatMap), not because there is something special about the promise type in a larger context.
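
A minimal plain-JavaScript sketch of that then behaviour, using only standard Promise calls (the variable names are made up for the example):

```javascript
// Returning a plain value from the callback: then acts like map.
const plain = Promise.resolve(1).then((x) => x + 1);

// Returning a promise from the callback: then acts like flatMap and
// unwraps it, so the result is never a promise of a promise.
const nested = Promise.resolve(1).then((x) => Promise.resolve(x + 1));

const both = Promise.all([plain, nested]);
both.then(([a, b]) => console.log(a, b, typeof b)); // 2 2 number
```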

Highland streams are not promises and promises are not the values they contain. Why would Highland, a library that is not at all related to promises (besides being a way to manage asynchrony), need to treat promises in a special way? Furthermore, can you guarantee that there's no situation in which anybody would want a stream of promises? Because that would be the result of making map automatically wait on promises.

Your analogy to arrays in your last sentence doesn't make sense. Highland allows you to say .map(f) rather than .flatMap(x => _([x])) too. I kind of understand the point you're trying to make, but it's difficult to make the comparison, because there's no one type in the sync world that is analogous to promises**.

For an extremely contrived example, suppose I created a special wrapper type called SyncPromise that contains exactly one value. Such a type may have a map method that allows you to return a SyncPromise from the callback and it will automatically unwrap it. Would you expect array.map(somethingThatReturnsSyncPromise) to result in an array that contains SyncPromise or the values within those SyncPromise? If the latter, why?

Of course, there's no reason to ever create the SyncPromise wrapper, so perhaps the point is moot.
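
For what it's worth, the contrived example could look something like the sketch below. SyncPromise is the hypothetical type described above, written out only for illustration:

```javascript
// Hypothetical wrapper holding exactly one value.
class SyncPromise {
  constructor(value) {
    this.value = value;
  }

  // Like Promise.prototype.then: if the callback returns a SyncPromise,
  // it is unwrapped instead of being nested.
  map(fn) {
    const result = fn(this.value);
    return result instanceof SyncPromise ? result : new SyncPromise(result);
  }
}

const somethingThatReturnsSyncPromise = (x) => new SyncPromise(x * 3);

// SyncPromise's own map unwraps: this holds 6, not a nested SyncPromise.
const chained = new SyncPromise(2).map(somethingThatReturnsSyncPromise);
console.log(chained.value); // 6

// Array.prototype.map knows nothing about SyncPromise, so the array
// contains the wrappers themselves rather than the values inside them.
const viaArray = [1, 2].map(somethingThatReturnsSyncPromise);
console.log(viaArray.every((item) => item instanceof SyncPromise)); // true
```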

**In a way, all javascript values are the sync analogue to promises.

@svozza
Collaborator

svozza commented Jun 18, 2016

Another point is that if we do decide to be Fantasyland compliant, we won't be able to, because we'll be breaking the monad laws. As an aside, there are plenty of people who consider the way then conflates map and flatMap to be a major design flaw in Promises. In fact, that's the reason the Fantasyland spec exists in the first place.

6 participants