
flatMap and flow-based programming in Highland #371

Closed
mlconnor opened this issue Sep 8, 2015 · 17 comments

@mlconnor

mlconnor commented Sep 8, 2015

In this contrived example, I loop the output stream of the input back into itself creating a recursion-like behavior. It produces 1, 2, 4, 8, 16, 32, 64, 128, 256, 512.

_ = require('highland')
input = _([1])
input
  .doto((v)-> console.log(v))
  .map((v)-> v * 2)
  .filter((v)-> v <= 512)
  .pipe(input)

This doesn't seem to work if I use a flatMap. It will only print out the first value, 1. My goal here is that I would like to return multiple values for each input value.

_ = require('highland')
input = _([1])
input
  .doto((v)-> console.log(v))
  .flatMap((v)-> return _([v * 2]))
  .filter((v)-> v <= 512)
  .pipe(input)

Is this a bug or is there something I need to do differently when using a flatMap?

@vqvu
Collaborator

vqvu commented Sep 9, 2015

You can't loop streams like that. In general, you can only directly write to streams that were created with no source (i.e., _()). This includes piping. I'm a little surprised your first example worked at all.

I'm not sure exactly what you want to do, but here's a way to do recursion on the stream. It acts a lot like rewrite rules in a context-free grammar. For each item, you decide whether or not to replace it with one or more values until you reach the terminal value.

var _ = require('highland');

function rec(x, count) {
    if (x % 2 == 1 || count > 8) {
        return _([x]); // terminal case
    } else {
        // rewrite rule.
        return _([x - 1, x * 2, x - 1]).flatMap(function (y) {
            return rec(y, count + 1);
        });
    }
}

rec(2, 1).each(_.log);

This outputs

1
3
7
15
31
63
127
255
512
255
127
63
31
15
7
3
1

I might be of more help if you give a little more detail about what you're trying to do.

@mlconnor
Author

mlconnor commented Sep 9, 2015

Victor, thanks for your response. That makes sense. It seemed intuitively dirty. A little more on what I'm trying to do.

There is a programming paradigm called Flow-Based Programming where you build functionality with graphs. This is perfect for Big Data scenarios for stream processing, ETL, and cloud automation. NoFlo is a Node.js implementation that inspired me. NiFi is a similar Apache Incubator project. As an example, NoFlo implemented Jekyll, a static site generator, using this approach. Image below.

Jekyll in NoFlo

It's interesting to see how they implement a recursive directory reader. The list of files that come out of the directory reader simply get filtered for directories and piped back into the directory reader. In my quest to implement this simple recursive reader, I found it challenging to pipe data back into itself.

The big question I have is whether or not Highland is a good fit. I feel that it is, because it handles backpressure well.

The code below actually works but being a Type-A developer, I get obsessed with doing things right since I plan to build upon this concept.

H = require('highland')
fs = require('fs')
fsPath = require('path')

###
  directory >---m----------> dirFilesStream >-------------f----> out
                |                                         |
                |                                         |
                +---< returnPipe <--< directoryFilter <---+

  legend: (m)erge  (f)ork

 + directory         has the initial file
 + dirFilesStream    does a directory listing
 + out               prints out the full path of the file
 + directoryFilter   runs stat and filters on directories
 + returnPipe        the only way i can

###

directory = H(['~/someDirectory'])

# this is the pipe that will carry data back into the system
returnPipe = H.pipeline(
  H.doto((v)-> console.log(v)) # doesn't work if i comment out this line
)

# grabs a list of files in the directory
dirFilesStream = H.merge([directory, returnPipe])
  .flatten()
  .flatMap((parentPath)->
    return H.wrapCallback(fs.readdir)(parentPath)
      .flatten()
      .map((path)->
        return fsPath.join(parentPath,path)
      )
  )

# this is the output for the system.  it will just print out the list of files for now.
out = dirFilesStream
  .fork()
  .each((v)->console.log(v))

# check stats and filter only directories
# and then pipe them back to the return pipe
directoryFilter = dirFilesStream
  .fork()
  .flatMap((path)->
    return H.wrapCallback(fs.stat)(path)
      .doto((fileStat)->
        fileStat.path = path
      )
  )
  .filter((v)-> return v.isDirectory())
  .map((v)-> return v.path)
  .pipe(returnPipe)

@vqvu
Collaborator

vqvu commented Sep 10, 2015

I got this to work without pipeline or pipe. I think it's closer to what you would want.

var _ = require('highland');
var fs = require('fs');
var fsPath = require('path');

var directory = _(['/home/vqvu/Desktop']);
var mergePoint = _();

var dirFilesStream = mergePoint
    .merge()
    .flatMap(function (parentPath) {
        return _.wrapCallback(fs.readdir)(parentPath)
            .sequence()
            .map(function (path) {
                return fsPath.join(parentPath, path);
            });
    });

var out = dirFilesStream;

// Create the return pipe without using pipe! See below for why.
var returnPipe = dirFilesStream
    .observe()
    .flatFilter(function (path) {
        return _.wrapCallback(fs.stat)(path)
            .map(function (v) {
                return v.isDirectory();
            });
    });

// Connect up the merge point now that we have all of our streams.
mergePoint.write(directory);
mergePoint.write(returnPipe);
mergePoint.end();

// Release backpressure.
out.each(_.log);

Note that I'm not using pipe, and I'm using observe to construct returnPipe instead of fork.

I'm not using pipe because it won't pass along errors (mimicking the behavior of node streams' pipe). Usually, you'd use through to get a pipe that passes along errors. Technical details about why we can't use it in this case are at the bottom.

You have to use observe instead of fork. Here's the problem if you use fork: for the stream to start flowing,

  1. mergePoint needs to be resumed by its downstream consumer, dirFilesStream.
  2. dirFilesStream needs to be resumed by out and returnPipe.
  3. out is resumed by calling each on it, but returnPipe needs to be resumed by mergePoint.

Hence, a deadlock. Backpressure is working against you here.

The reason your code worked only when you called pipeline with an argument is a bug in pipeline that effectively disables backpressure. Calling pipeline with no arguments is equivalent to calling _(). Fixing the bug causes the deadlock again. You likely saw a few files displayed before the first directory; that happens because you're using pipe, and pipe is a little looser with backpressure. The stream is allowed to get started, but it soon runs into the same issue again.

It may seem like using observe is not ideal, but in fact, the only data that gets queued in returnPipe is the set of unvisited sibling directories of the current path. This is the same information that you need to keep around to do a depth-first traversal of a tree imperatively using a while loop.

The only non-ideal behavior relates to merge. The current implementation of merge tries to be fair to all of its sources, so it'll consume from all sources in a round-robin way when possible. If directory contained more than one element, you'd eventually end up trying to traverse all of those directories at once, which uses more memory than necessary.

What you really need is a custom merge that prioritizes returnPipe over directory.

I think this is a cool and novel use case (novel to me anyway) for Highland, so I'm interested in hearing what general functionality we can add, beyond those mentioned in this post, to make it easier.

Why we can't use through

through assumes you have a through stream. The current implementation allows you to pass a non-Highland through stream, and roughly does this.pipe(throughStream).pipe(_()) along with error handling. As a side effect, it renders you incapable of consuming from throughStream yourself, if it happens to be a Highland stream (since it calls pipe on throughStream, which consumes it). Highland streams cannot be consumed multiple times without fork or observe.

Most of the time this is OK, since the method returns a different stream that you can consume. In this case, however, you want to pipe to returnPipe, which must be consumed by the merge point. Hence the problem.

Edit: fork not necessary at all, actually.
Edit2: Use flatFilter.

@mlconnor
Author

Wow, that was an impressive response, I had to read through it a few times. Thank you so much for taking the time Victor. I think Highland needs a litany of examples like this to help people better understand how it works. There are several important concepts that I learned just by reading through your code. I'm hoping that once I have a small arsenal of these concepts that I can build out the flow models.

@vqvu
Collaborator

vqvu commented Sep 11, 2015

Oh, no problem. It was a really interesting exercise for me. I've never considered routing a stream back on itself like this before. So I learned something too. I hope you get to where you want with the flow models. If you find some general feature that would help reduce boilerplate/improve functionality, let us know. I think this is quite a powerful concept, so having some direct support in the library for the paradigm would be good.

I totally agree with you about the examples. We've always talked about having a Highland "cookbook", but I've personally never had the time or motivation to sit down and actually do it. Answering targeted questions like these is much more fun. Though, I bet if someone were to go through all of the old issues and extract the code samples from them, we'd be most of the way there.

@apaleslimghost
Collaborator

We've always talked about having a Highland "cookbook", but I've personally never had the time or motivation to sit down and actually do it

Ditto. One thing that constantly comes up in issues is people not getting the usefulness of flatMap. But then to explain that we run the risk of turning the cookbook into a monad tutorial...

@gunar

gunar commented Nov 25, 2015

Hey, guys. Is there a simpler way to solve this?

The script I'm working on has to request data from an API in pages, hence the need for recursion, i.e., repeated requests until completion. Simplicity (or readability) is precisely what I was longing for with Highland, but doing this with promises and recursion seems much simpler.

Thank you very much for your time.

@vqvu
Collaborator

vqvu commented Nov 25, 2015

It depends on what exactly you're trying to do. If all you're doing is requesting for data and feeding the result to some processing code, then promises may indeed be simpler.

function query(url, page) {
    // Assuming doRequest returns a promise.
    return doRequest(url, page)
        .then(result => {
            var res = process(result);
            if (result.nextPage) {
                res = res.then(ignore => query(url, result.nextPage));
            }
            return res;
        });
}

// Use like this.
query(url, 1)
    .then(arg => console.log("I'm done!"));

If you want a stream of pages that you'll then perform stream operations on, you can do it...

iteratively:

function query(url) {
    var page = 1;
    return _((push, next) => {
        if (page != null) {
            // Assuming doRequest is a node-style callback function.
            doRequest(url, page, (err, res) => {
                push(err, res);
                if (!err) {
                    page = res.nextPage;
                } else {
                    page = null;
                }
                next();
            });
        } else {
            push(null, _.nil);
        }
    });
}

// Use like this
query(url)
    .map(...)
    .otherTransform(...);

or recursively:

function query(url, page) {
    return _((push, next) => {
        // Assuming doRequest is a node-style callback function.
        doRequest(url, page, (err, res) => {
            push(err, res);
            if (!err && res.nextPage) {
                // next(stream) basically delegates to the specified stream.
                // It emits what the specified stream emits.
                next(query(url, res.nextPage));
            } else {
                push(null, _.nil);
            }
        });
    });
}

// Use like this
query(url, 1)
    .map(...)
    .otherTransform(...);

There's no one-liner for doing it though.

@gunar

gunar commented Nov 26, 2015

@vqvu thank you very much. I thought "Flow-style" was the only way to use recursion in streams and I got scared LOL.

This issue got me interested and I've made a programming exercise out of it: Comparing multiple ways to solve the recursive directory listing problem. You can check it in this gist. There are three solutions using streams, and of course there are still infinite more. It'd be AWESOME to get feedback, folks :)

Avoiding stream forks makes for less code, though I'm not sure it's more readable.

Thank you.

vqvu changed the title from "flatMap issue" to "flatMap and flow-based programming in Highland" on Jan 9, 2016
@zoellner

@vqvu thank you for the detailed explanation and example. I have a question regarding the end of those streams. When I change the last line to the following, it doesn't print the 'done'.
So I am wondering if you could explain when each of those streams ends in the example above, and what eventually causes the process to exit?
Or if I were to wrap the above code into an async function, where would I call the callback?

out.each(_.log).done(function() {console.log('done');});

I am working on code based on the snippet above that has a more complex dirFilesStream (with batching etc.) and am running into the problem that my process does not terminate even after all the streams stop producing data.

@vqvu
Collaborator

vqvu commented Mar 14, 2016

I know what the problem is, but I don't know how to fix it.

Here's the problem:

  1. out can't end until the mergePoint ends.
  2. mergePoint can't end until returnPipe and directory both end.
  3. directory can easily end, but returnPipe can't end until dirFilesStream ends.
  4. Since dirFilesStream === out, we end up with a circular dependency where returnPipe and out are both waiting for each other to end.

The problem is essentially that the recursion can't detect the base case (i.e., when there are no more directories). Because of asynchrony, it's difficult to tell the difference between "no more data" and "no more data right now, but more data is being produced".

I don't know enough about flow-based programming to be able to tell you if this is an inherent issue with recursive networks or if it is simply a problem with this particular construction.

You may be better off implementing the directory walk recursively as a single transform rather than via the flow-based approach.

@zoellner

Thank you, I suspected it was something like that. Unfortunately my problem is not a directory walk. It is closer to the web request example above, but it needs batching and rate limiting, so the returnPipe approach would have been more elegant.

@vqvu
Collaborator

vqvu commented Mar 14, 2016

I thought about it some more, and there's a way to do recursion while still maintaining the returnPipe approach.

It doesn't look like a flow network but accomplishes the same thing.

var _ = require('highland');
var fs = require('fs');
var fsPath = require('path');

// Applies a transform to the stream if it is not empty.
// The reverse of `otherwise`.
function ifNotEmpty(transform) {
    return function (stream) {
        return _(function (push, next) {
            stream.pull(function (err, x) {
                if (err) {
                    push(err);
                    next();
                } else if (x === _.nil) {
                    push(null, x);
                } else {
                    var nextStream = _([x])
                        .concat(stream)
                        .through(transform);
                    next(nextStream);
                }
            });
        });
    }
}

// Recursion transform.
// transform - The main transform.
// computeReturnPipe - A transform that computes what data to feed back
//     into the recursive system. It is provided with the result of the
//     main transform.
function recurse(transform, computeReturnPipe) {
    return function (stream) {
        return stream.through(recurse2)
            .flatten();
    };

    function recurse2(stream) {
        var output = transform(stream);
        var moreData = output.observe()
            .through(computeReturnPipe)
            .through(ifNotEmpty(recurse2));

        return _([output, moreData]);
    }
}

function readDir(stream) {
    return stream
        .flatMap(function (parentPath) {
            return _.wrapCallback(fs.readdir)(parentPath)
                .sequence()
                .map(function (path) {
                    return fsPath.join(parentPath, path);
                });
        });
}

function filterDir(stream) {
    return stream
        .flatFilter(function (path) {
            return _.wrapCallback(fs.stat)(path)
                .map(function (v) {
                    return v.isDirectory();
                });
        });
}

_(['/home/vqvu/Desktop'])
    .through(recurse(readDir, filterDir))
    .each(_.log)
    .done(function () {
        console.log('done');
    });

Edit: process -> transform.

@vqvu
Collaborator

vqvu commented Mar 14, 2016

The above assumes that the base-case is an empty stream, so it is perhaps not quite as general as the flow-based approach. Specifically, it doesn't work if the computeReturnPipe transform creates data from nothing (i.e., if it is possible for it to return a non-empty stream when passed an empty stream).

This assumption is what allows the stream to detect when there is no more data.

@zoellner

Thank you! I will have a closer look at this, will take me a few days to implement with my specific problem.

@zoellner

This works great! Definitely cookbook worthy :-)

One minor detail (just to not confuse anyone looking at the code above): In the comment under "Recursion transform" you call the first argument transform but in the code it is called process

vqvu added the cookbook label on Mar 15, 2016
@vqvu
Collaborator

vqvu commented Mar 15, 2016

In the comment under "Recursion transform" you call the first argument transform but in the code it is called process

Oops! Fixed.
