add EventStream::queue method #318

@michaelficarra

I'd like to propose a queue method for EventStreams. It limits how many events from a stream are processed at the same time, up to a given size. When the operation consuming an event finishes (its stream ends), a slot is freed for the next buffered event. Here's the implementation I'm currently using:

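Bacon.EventStream::queue = (size, fn) ->
  buffer = []                # values waiting for a free slot
  outStream = new Bacon.Bus  # merged output of all per-value streams
  draining = 0               # number of values currently being processed
  drain = ->
    ++draining
    stream = new Bacon.Bus
    outStream.plug stream
    # when this value's stream ends, free the slot and start the next buffered value
    stream.onEnd ->
      --draining
      if buffer.length > 0
        drain()
    fn buffer.shift(), stream
  # source errors pass straight through to the output
  @onError (e) ->
    outStream.error e
  # buffer each value and start processing it if a slot is free
  @onValue (v) ->
    buffer.push v
    if draining < size
      drain()
    return
  outStream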

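As with flatMap, the given function produces a stream whose events are pushed to the stream returned by queue. Unlike flatMap, the function does not return that stream: it receives it as a second argument and must end it when it is done. Here's an example of my use of queue: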

# get the email of the author responsible for changing a given host to/from a given hostclass
negligentAuthors = updatedHosts.queue 4, ([host, [runningHostclass, configuredHostclass]], stream) ->
  stdout = ''
  gitProcess = # long git invocation using child_process.spawn, omitted
  gitProcess.stdout.on 'data', (data) -> stdout += data
  gitProcess.stderr.pipe process.stderr
  gitProcess.on 'close', (exitCode) ->
    if exitCode is 0
      if stdout
        stream.push [stdout[...-1], host, runningHostclass, configuredHostclass]
    else
      stream.error exitCode
    stream.end()
  return

Without queue, processing this stream starts a very large number of git processes at once, which is what happened when I was simply using flatMap. With queue, at most 4 git processes are ever running simultaneously. Would you consider this function for inclusion?
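
The slot bookkeeping behind queue can also be looked at on its own. The following is a minimal sketch in TypeScript that does not use Bacon at all: `boundedQueue`, `running`, `waiting` and the `done` callback are hypothetical names made up for this illustration, and the `done` callback stands in for ending the per-value stream.

```typescript
// Hypothetical helper, not part of Bacon.js: runs at most `size` workers at once.
type Done = () => void;

function boundedQueue<T>(size: number, worker: (item: T, done: Done) => void) {
  const buffer: T[] = []; // items waiting for a free slot
  let running = 0;        // items currently being processed

  const drain = (): void => {
    running++;
    const item = buffer.shift() as T;
    let finished = false;
    worker(item, () => {
      if (finished) return; // ignore repeated completion signals
      finished = true;
      running--;
      // a slot was freed: start the next waiting item, if any
      if (buffer.length > 0) drain();
    });
  };

  return {
    push(item: T): void {
      buffer.push(item);
      if (running < size) drain();
    },
    running: (): number => running,
    waiting: (): number => buffer.length,
  };
}

// Usage: the worker stores each `done` callback so completion can be triggered by hand.
const started: string[] = [];
const finishers: Done[] = [];
const q = boundedQueue<string>(2, (host, done) => {
  started.push(host);
  finishers.push(done);
});

["a", "b", "c", "d"].forEach((h) => q.push(h));
// Only "a" and "b" have started here; "c" and "d" are waiting in the buffer.
finishers[0](); // "a" finishes, which frees a slot and starts "c"
```

With a size of 2, the third item only starts once one of the first two has signalled completion, which is the same behavior that keeps the number of git processes bounded in the example above.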
