Catch Operation Finish #41

thejmazz opened this issue Jul 27, 2016 · 2 comments

@thejmazz thejmazz commented Jul 27, 2016

Consider this task:

const fs = require('fs')
const through = require('through2')

const switchFileType = (str, type) => {
  // probably nicer with a regex replace..
  const pieces = str.split('.')
  pieces.pop() // drop the old extension
  return pieces.join('.') + '.' + type
}

const throughUppercase = through(function (chunk, enc, cb) {
  cb(null, chunk.toString().toUpperCase())
})

const uppercaser = task({
  input: '*.lowercase',
  output: '*.uppercase',
  name: 'Uppercase Transform'
}, ({ input }) => fs.createReadStream(input)
  .pipe(throughUppercase)
  .pipe(fs.createWriteStream(switchFileType(input, 'uppercase'))))

The operation for this task will return a writable stream. Then dup.setWritable(operation) will apply it to the task duplexify stream. Similarly for readable.

Then in the catchTask action, the purpose is to resolve once the operation has completed. It's called "catch" because it stops the task lifecycle action pipeline until the operation has completed, so that the resolve-output and validate-output actions can then be run.

After output is successfully validated (file is not empty, etc.), I set an _output object in the duplex stream, and destroy the stream:

function finish(uid) {
  stream._output = getTask(uid).resolvedOutput
  // TODO not this
  // 'destroy' may as well be 'breaksCompatibleWithStreamsModules'
  stream.destroy()
}

On destroy() the duplex stream emits a close. Then the lifecycle of a task is something like:

  .on('destroy', function () {
    console.log(this._output) // object with absolute path of output
  })

and join will use that to run tasks after each other, collect all the outputs into the "output dump", etc.

However, this breaks if the operation itself emits a close event, as happens with child processes. My earlier solution (I only switched to the emit('destroy') hack recently, while hacking features into refactored code) was to add a function to the stream object which could access things through a closure and would run the output resolution and validation inside that function. On any "ending type" event - end, close, finish - you could call it. So the lifecycle for a task was:

  .on('close', function () {
    const output = this._output() // runs output resolution and validation now, but "outside" the task
  })

Which is better because it uses no made-up events, but a little annoying because if you just do

someTask()

it will not actually run the whole task lifecycle until this._output() is called. Perhaps a taskWrapper could auto-call this, and then itself emit a close.


someTask() // whole lifecycle contained inside here
  .on('close', // or some otherwise standard event, end, finish, etc
    function () {
      console.log(this._output) // prepared resolved+validated output
    })
This way task lifecycle is contained in one place without doing weird wrappers that would themselves need to emit close, and the resolved output is available immediately when a task is "done".

I saw mention of a flush function, but perhaps that is only for writable streams?

Also, operations which are actually just duplex streams, like a transform, can be set as the duplex inside task, and forking/parallel should work as if task is a regular duplex; so the "catch finish" need not apply, since no output files need to be resolved to absolute paths.

@thejmazz thejmazz commented Jul 28, 2016

Summary and notes from chat about this:

  • can use the prefinish event to cork and uncork a writable stream just before it emits finish: so for example, catch prefinish from a writable stream, resolve and validate output, then uncork so the wrapping duplex emits finish: prefinish example
    • works for writable streams, but there is nothing like this for readable streams
    • could wrap the readable in an outer stream, and the outer can emit an end after the child readable does and output resolution/validation is done
  • issues with task returning a stream object:
    • assumes the user will consume the stream and will handle errors on it
    • e.g. edge case: a stream won't ever emit end if its buffer is never read, so need to call .resume() to "make it alive" so that it can then emit end right after
  • use end not close: close is meant for errors!
  • ways to make sure a stream is drained:
    • resume() or pipe or .on('data')
  • to wrap a duplex fully with duplexify, setReadable and setWritable


  • put stream object inside of an object returned by task:
    • myTask().stream.pipe(hackInnerStreamIntoWritable) (not recommended)
    • this way it is less likely for the user to misuse myTask()
  • use end-of-stream to "catchFinish"
  • then task class can emit its own events, join will use those events to compose them

Try to avoid "foot shooting" - for example, if a streaming task is joined to something that is not writable:

join(streamingTask, somethingThatIsNotWritable)
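One hedge against that foot-shooting would be a small guard inside join(). `assertWritable` is a hypothetical helper, not part of the module:

```javascript
// fail loudly when a streaming task is joined to something not writable,
// instead of failing obscurely mid-pipeline later
function assertWritable (target) {
  if (!target || typeof target.write !== 'function' || typeof target.end !== 'function') {
    throw new TypeError('join: the stage after a streaming task must be a writable stream')
  }
  return target
}
```

join() could run this over each stage before wiring the pipeline, turning a confusing runtime failure into an immediate, descriptive error.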

@thejmazz thejmazz commented Aug 17, 2016

done with async-done
