Streaming Between Tasks #79

thejmazz opened this issue Aug 26, 2017 · 1 comment


In order to be a streaming workflow engine, we need to support the ability to pipe between tasks. This does not add much to the "traditional" pipeline which mostly reads/writes files between tasks, but it can open interesting use cases:

  • stream of query responses from bionode-ncbi can be piped into pipelines
  • ease of incorporating node transform streams into pipelines
  • separate tools (do not need a container with A and B to run A | B for example)
  • cool things with fork, something like: A | fork(B1, B2) | C (however this will be tricky to implement as we will need to create duplicate streams)
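
The duplicate-stream part of the fork idea can be sketched with core Node streams alone. This is only an illustration, not a proposed API: `fork`, `collect`, and `demo` are hypothetical names, and the sketch relies on the fact that a readable can be piped to several destinations, each of which receives every chunk.

```javascript
const { Readable, PassThrough } = require('stream')

// Hypothetical helper: duplicate one readable into `n` independent branches.
function fork (source, n) {
  const branches = Array.from({ length: n }, () => new PassThrough())
  // pipe() may be called several times on the same readable;
  // every destination then receives every chunk.
  branches.forEach((branch) => source.pipe(branch))
  return branches
}

// Collect everything a stream emits into a single string.
function collect (stream) {
  return new Promise((resolve, reject) => {
    const chunks = []
    stream.on('data', (chunk) => chunks.push(chunk.toString()))
    stream.on('end', () => resolve(chunks.join('')))
    stream.on('error', reject)
  })
}

// Stand-in for A | fork(B1, B2): both branches see the full output of A.
async function demo () {
  const source = Readable.from(['a', 'b', 'c'])
  const [b1, b2] = fork(source, 2)
  return Promise.all([collect(b1), collect(b2)])
}
```

Two things this sketch leaves open: `pipe()` does not forward errors from the source to the branches, and the slower branch paces the source through backpressure. Merging the branches back into `C` is not covered here.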

Dump from docs:

If either `input` or `output` is not provided, the task is assumed to be a streaming task, i.e. a duplex stream with writable and/or readable portions. Consider:

const through = require('through2') // a helper to create through streams

const throughCapitalize = through(function (chunk, enc, next) {
  // Takes a chunk, its encoding, and a callback to signal that the chunk is handled
  // Push the capitalized chunk to the readable portion of this through stream
  this.push(chunk.toString().toUpperCase())
  // Then call next so that the next chunk can be handled
  next()
})

const capitalize = task({
  name: 'Capitalize Through Stream'
},
// Here, input is a readable stream that came from the previous task
// Let's return a through stream that takes the input and capitalizes it
({ input }) => input.pipe(throughCapitalize))

You could connect `capitalize` to a readable (`readFile`) and a writable (`writeFile`) file stream with:

const fs = require('fs')

const readFile = task({
  input: '*.lowercase',
  name: 'Read from *.lowercase'
}, ({ input }) => {
  const rs = fs.createReadStream(input)
  // Add file information to stream object so we have it later
  rs.inFile = input
  return rs
})

const writeFile = task({
  output: '*.uppercase',
  name: 'Write to *.uppercase'
},
// swapExt is not a built-in String method; assume a helper that swaps the file extension
({ input }) => fs.createWriteStream(input.inFile.swapExt('uppercase')))

// Can now connect the three:
join(readFile, capitalize, writeFile)
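
For comparison, the same read, capitalize, write chain can be written with core Node streams only. This is a sketch of the analogy, not of how `join` works internally, which this issue does not show. `makeCapitalize` and `run` are hypothetical names, and in-memory streams stand in for the file streams so the snippet runs on its own.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream')

// Stand-in for throughCapitalize, built on the core Transform class.
const makeCapitalize = () => new Transform({
  transform (chunk, encoding, callback) {
    // Passing data to the callback pushes it to the readable side.
    callback(null, chunk.toString().toUpperCase())
  }
})

// Runs source -> capitalize -> sink and resolves with the text the sink received.
function run (lines) {
  return new Promise((resolve, reject) => {
    const received = []
    const sink = new Writable({
      write (chunk, encoding, callback) {
        received.push(chunk.toString())
        callback()
      }
    })
    // pipeline() wires the streams together and reports the first error, if any.
    pipeline(Readable.from(lines), makeCapitalize(), sink, (err) => {
      if (err) reject(err)
      else resolve(received.join(''))
    })
  })
}
```

Calling `run(['hello ', 'world'])` resolves with the capitalized text. Swapping the in-memory source and sink for `fs.createReadStream` and `fs.createWriteStream` gives the file-based version.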

Of course, this could be written as one single task. That is somewhat simpler, but the power of splitting up the read, transform, and write portions of a task will become apparent once we can provide multiple sets of parameters to the transform and observe the effect, without having to manually rewire input and output filenames. As a single task the above would become:

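const capitalize = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Capitalize *.lowercase -> *.uppercase'
}, ({ input }) =>
  // Read, capitalize, and write in one chain (swapExt assumed, as in writeFile above)
  fs.createReadStream(input)
    .pipe(throughCapitalize)
    .pipe(fs.createWriteStream(input.swapExt('uppercase')))
)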

Tied to this issue, we should also be able to pass variables, functions, etc. between tasks and still be able to call them in downstream tasks.
