Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
test
.gitignore
.travis.yml
Gruntfile.coffee
README.md
package.json
stream-worker.js

README.md

stream-worker build status

Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated. Inspired by async.queue, optimized for streams.

Supports promises and callbacks.

The Basics

npm install stream-worker

Requiring:

var streamWorker = require('stream-worker');

Promise style:

function doWork(data){
  /* ... do some work with data ... */
  return Promise.resolve();
}
streamWorker(stream, doWork, {promises : true, concurrency : 10})
.then(function() {
  /* ... the stream is exhausted and all workers are finished ... */
}, function(err) {
  /* ... there was an error processing the stream ... */
})

Callback style:

function doWork(data, done){
  /* ... do some work with data ... */
  return done(err);;
}
streamWorker(stream, doWork, {promises : false, concurrency :10},
  function(err) {
    /* ... the stream is exhauseted and all workers are finished ... */
  }
);

Signature

streamWorker(stream, work, options, done)

Where options is an object with 2 optional parameters:

Parameter Default Description
promises false true if you want to use the above promises style
concurrency 10 specifies how many concurrent workers you want doing work in the stream

And done is a callback function if you use the callback workflow.