
.transform() async callback? #24

Closed
at0g opened this issue Mar 18, 2014 · 7 comments

at0g commented Mar 18, 2014

Is it possible to do an async operation in the transform method?

.transform(function(data, done){
    mongoose.model('user').findOne({ username: data.username }, function(err, result){
        if(err) throw err;
        return done({ userId: result._id });
    });
});
@doug-martin
Contributor

You could call .pause() on the stream, but this could lead to really long parse times, since each lookup requires a call to the database.

We do something similar, but we do it in the .on("record") event, batching records to reduce total IO, since a large number of requests could otherwise build up:

//up above
var users = [],
    FIND_EVERY = 1000;

csvStream.on("record", function(data){
    users.push(data);
    if(users.length === FIND_EVERY){
        csvStream.pause(); //no more record events will be produced until resume is called
        doSomethingWithUsers(users, function(err, res){
            if(err){
                console.log(err.stack);
            }else{
                users = []; //reset the batch so the next 1000 records trigger another flush
                csvStream.resume(); //ok, continue parsing
            }
        });
    }
});

Hope this helps.

@neverfox

neverfox commented May 6, 2014

I imagine you can use Promises to accomplish this without pausing.
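
Something along these lines, for instance (lookupUser is a hypothetical helper that returns a promise; note this buffers every pending lookup in memory, so it gives up backpressure):

var pending = [];

csvStream.on("record", function(data){
    pending.push(lookupUser(data)); //one in-flight promise per record
});

csvStream.on("end", function(){
    //wait for every lookup to settle once parsing has finished
    Promise.all(pending).then(function(users){
        console.log("looked up", users.length, "users");
    }, function(err){
        console.log(err.stack);
    });
});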

@bebraw

bebraw commented Jun 25, 2014

Thanks for highlighting the usage of pause. I have to load 400 MB of CSV data into a database, and this solution seems reliable.

Initially I used an async.queue for dealing with the incoming data, but it simply got swamped by the amount of data and didn't work. Lesson learned. :)

Wouldn't the ideal solution be to use pipe? I couldn't get this to work. As far as I understand, if it worked it would be compatible with async too and would deal with the exhaustion problem (backpressure) effectively. You would simply write a Writable stream that writes to the database and executes its callback when done (which resumes the stream).

It's possible I'm missing something obvious here, but that seems like the ideal solution to me, as you don't have to muck with pause and resume.

@doug-martin
Contributor

I'm not sure I understand. Can you please provide an example of what you are trying to do?

Thanks!

-Doug

@bebraw

bebraw commented Jun 26, 2014

@doug-martin Sure. I would expect the following to work more or less:

...

var stream = csv.fromPath(path, {
    headers: true
}).pipe(function(data, _, next) {
    // write to db now (async, resumes on next)
    db.getOrCreate(data.id, next);
});

Obviously this won't work in the current version. I understand I could add .pipe(csv.createWriteStream({headers: true})) in between, parse that in another step, and then write to the database, but that would be missing the point.

Allowing piping like this would make it possible to skip the pause/resume business.
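
For illustration, here is a sketch of the Writable-stream version of this idea, assuming the parse stream emits rows in object mode and reusing the hypothetical db.getOrCreate(id, cb) helper from above:

var stream = require("stream");

// An object-mode Writable that only calls done once the database write
// finishes, so pipe() applies backpressure instead of pause/resume.
var dbWriter = new stream.Writable({ objectMode: true });
dbWriter._write = function(data, encoding, done){
    db.getOrCreate(data.id, done); //parsing resumes only after done() fires
};

csv.fromPath(path, { headers: true }).pipe(dbWriter);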

@TimNZ

TimNZ commented Aug 16, 2014

I agree with bebraw. At a minimum, pass a next function to on('record', function(data, next){ }) for async support, and deprecate pause/resume, which is non-standard.

doug-martin mentioned this issue Aug 27, 2014
@doug-martin
Contributor

There is now async support for transform and validate. I decided not to go the on("record") route because I would have to override the EventEmitter, and you can implement your own stream to handle that case. Feedback is always welcome!
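
For example, revisiting the Mongoose lookup from the original question, the async forms look roughly like this (exact signatures may vary by version, so check the README; the validate step is illustrative):

csv.fromPath(path, { headers: true })
    .transform(function(data, next){
        //async transform: pass the transformed row to next when ready
        mongoose.model('user').findOne({ username: data.username }, function(err, result){
            if(err) return next(err);
            next(null, { userId: result._id });
        });
    })
    .validate(function(data, next){
        //async validate: drop rows that didn't resolve to a user
        next(null, !!data.userId);
    })
    .on("record", function(data){
        console.log(data);
    });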

-Doug
