Pausable Buffered EventStream (or Bus) #300

Closed
hayeah opened this Issue · 13 comments

4 participants

@hayeah

I want to use an EventStream as a message channel (like in CSP). What I'd like to accomplish is that if an event is sent to the stream, it won't get lost if there's no subscriber yet. The events would be buffered, and as soon as the receiver is ready, the stream can start the data flow.

In my particular case, I am creating a socket.io connection and using a bus to push messages into the socket. However, I'd like the bus to hold messages until socket.io has fired the "connect" event.

This is my attempt at implementing a buffered EventStream. It works, but I hope there's a more idiomatic way to do it in FRP.

class BaconBuffer
  constructor: (@upstream) ->
    @isPaused = true
    @buffer = []
    @cbs = []

    @sink = null
    @downstream = Bacon.fromBinder (sink) =>
      # seems to happen once for multiple onValue subscriptions
      # console.log "got sink"
      @sink = sink # uh... is this the same for all subscribers?
      return noop

    @downstream.resume = =>
      @resume()

    @downstream.pause = =>
      @pause()

    @upstream.subscribe (event) =>
      @yield(event)

  pause: ->
    @isPaused = true

  resume: ->
    @isPaused = false
    if @buffer.length > 0
      for e in @buffer
        @sink(e)

      @buffer = []

  yield: (e) ->
    if @isPaused
      # console.log "buffer", @buffer.length, e
      @buffer.push e
    else
      # when is @sink bound? Is it when subscribed to or immediately after fromBinder?
      @sink(e)

# wait buffer stream
buffer = (stream) ->
  (new BaconBuffer(stream)).downstream
@raimohanska
Owner

For me, this doesn't sound like a case for FRP, considering that there's only one sink for the events, i.e. the WebSocket itself. Why not just something like

open = false
buffer = []
send = (msg) ->
  if open
    socket.send(msg)
  else
    buffer.push(msg)
socket.on "connect", ->
  socket.send(msg) for msg in buffer
  open = true
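
The plain-callback approach above can be sketched in JavaScript roughly as follows. This is only an illustration under assumptions: `makeBufferedSender` and `makeFakeSocket` are hypothetical names, and the socket is assumed to expose `on(event, handler)` and `send(msg)` as in the snippet above. Unlike the snippet, the sketch also empties the buffer once it has been flushed.

```javascript
// Hypothetical helper: queue messages until the socket reports "connect".
function makeBufferedSender(socket) {
  let open = false;
  let buffer = [];

  socket.on("connect", () => {
    open = true;
    // Flush everything queued while disconnected, in arrival order.
    const pending = buffer;
    buffer = [];
    pending.forEach((msg) => socket.send(msg));
  });

  return function send(msg) {
    if (open) {
      socket.send(msg);
    } else {
      buffer.push(msg);
    }
  };
}

// Stand-in socket for demonstration only (this is not the socket.io API).
function makeFakeSocket() {
  const handlers = {};
  const sent = [];
  return {
    sent,
    on(event, handler) {
      (handlers[event] = handlers[event] || []).push(handler);
    },
    emit(event) {
      (handlers[event] || []).forEach((h) => h());
    },
    send(msg) {
      sent.push(msg);
    },
  };
}
```

Calling `send` before the fake socket emits "connect" only queues the message; the queue is delivered once, in order, when "connect" fires.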

But if you want to use FRP concepts, you might want to do like

msgsToSend = new Bacon.Bus()

connected = Bacon.fromEventTarget(socket, "connect") # not sure if works, tell me if not!

bufferedUntilConnected = msgsToSend
  .bufferWithCount(Number.MAX_VALUE) # buffer infinite number of messages into an array
  .sampledBy(connected.take(1)) # emit value when connected
  .flatMap(Bacon.fromArray) # split array of values into multiple events
  .concat(msgsToSend) # send the rest of the messages immediately

bufferedUntilConnected.onValue (msg) -> 
  socket.send(msg) # I'm not sure of the underlying API here, but you get the point :)
@raimohanska
Owner

Regarding the idea of pausable streams, I haven't given them much thought as I haven't needed such things. I'm still developing this stuff for my personal convenience :) Do you think a generic "pausable bus" would be useful?

@hayeah
@raimohanska
Owner

If you want a generic FRP way of buffering stuff when some condition applies, that could be added to the EventStream prototype like

Bacon.EventStream :: bufferWhile = (condition) ->
  src = this
  condition.skipDuplicates().flatMapLatest (shouldBuffer) ->
    if shouldBuffer
      src.bufferWithCount(Number.MAX_VALUE)
        .sampledBy(condition.changes()) # spit it out when the condition changes
        .flatMap(Bacon.fromArray) # split the array into discrete events
    else
      src
Bacon.EventStream :: occurred = ->
  this.map(true).toProperty(false)

So that you could then say like

disconnected = connected.occurred().not()
msgsToSend
  .bufferWhile(disconnected)
  .onValue((msg) -> socket.send(msg))

Pausing and resuming feels a bit too imperative for FRP. I'd rather build new combinators so that I can write my application code in a more declarative way.

Channels are cool too, but certainly a different abstraction, and I'm not sure if these two approaches should breed :)

@hayeah
@hura

Sorry for interrupting randomly. I have just started using bacon.js so this might be wrong:

But I think you should be able to use withStateMachine where you keep pushing arriving messages into the state as long as you're disconnected and flush them out on the "bus" when you're done.

@raimohanska
Owner

@hura I toyed with that idea too, but... The withStateMachine combinator works on a single stream and thus cannot "flush" when an external state changes. You need more combinators to get to the same result as I did with flatMapLatest et al. Feel free to show us your solution and prove me wrong :)

@hura

You're right. It wouldn't be fully reactive since it would only react upon a new item on the main data stream.

There is really a lack of functions which emit an item (or invoke a callback) upon receiving an item from ANY ONE OF many input streams (so a dual to zip/mergeAll, but with an OR instead of an AND).

Something like a multiplexer with state might be really powerful and open up many cool things, since it's very generic:

Bacon.withStateMachineMux(0, [stream1, stream2, stream3],
  function(currState, whichStream, stream) {
    if(whichStream == 0) {
      // etc
    }
})
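
To make the multiplexer idea concrete, here is a minimal plain-JavaScript sketch that does not use Bacon at all. Everything in it is an assumption for illustration: `withStateMachineMux`, `makeSource` and `bufferUntilConnected` are hypothetical names, sources are modelled as plain functions that accept a handler, and the convention that the step function returns `[nextState, outputs]` is a choice made for this sketch.

```javascript
// Hypothetical multiplexing state machine: one shared state, many sources.
// Each source is a plain function that accepts a handler.
function withStateMachineMux(initState, sources, f, sink) {
  let state = initState;
  sources.forEach((subscribe, index) => {
    subscribe((value) => {
      // f returns [nextState, outputs]; outputs is an array of values to emit.
      const [nextState, outputs] = f(state, index, value);
      state = nextState;
      outputs.forEach((out) => sink(out));
    });
  });
}

// Tiny emitter used as a source for the demonstration.
function makeSource() {
  const handlers = [];
  const subscribe = (h) => {
    handlers.push(h);
  };
  subscribe.push = (v) => handlers.forEach((h) => h(v));
  return subscribe;
}

// Buffer messages (source 0) until a connect signal (source 1) arrives.
function bufferUntilConnected(messages, connect, sink) {
  const init = { connected: false, queue: [] };
  withStateMachineMux(init, [messages, connect], (state, which, value) => {
    if (which === 1) {
      // Connect signal: flush the queue and switch to pass-through.
      return [{ connected: true, queue: [] }, state.queue];
    }
    if (state.connected) {
      return [state, [value]];
    }
    // Still disconnected: append to the queue and emit nothing.
    return [{ connected: false, queue: state.queue.concat([value]) }, []];
  }, sink);
}
```

Because the state machine reacts to either source, the flush happens as soon as the connect signal arrives rather than on the next message.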
@philipnilsson
Collaborator
@hayeah

@raimohanska reporting back on pausable/buffered bus.

I am creating a Property from a Bus. However, all events pushed to the bus before the property is subscribed to are lost, and the property doesn't get an initial value. This is a problem if I can't (or have a hard time) making sure that the property is subscribed to before a value is pushed to the bus. For example

bus = new Bacon.Bus()
prop = bus.toProperty()
# first value to be produced at an undetermined time
valueProducer (->
  bus.push("value")
)
# consumer starts consuming values at an undetermined time
valueConsumer (->
  prop.onValue -> ...
)

The above code introduces a race condition. If the first value is pushed before the consumer is ready, then the consumer never gets a value. This is different from the bufferWhile use case, although both are about the general idea of decoupling the timing of consumer and producer.

I've looked at the code for Bacon.Bus, it looks like "sink" is set when there's a first subscription. Until then, sink is undefined, and values pushed to bus are simply discarded.
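
The behaviour being asked for can be sketched in plain JavaScript as below. This is not Bacon's Bus and not a proposed API: `BufferedChannel` is a hypothetical name, and the policy of handing the whole backlog to whichever subscriber arrives first is just one possible choice.

```javascript
// Hypothetical sketch (not Bacon's Bus): holds pushed values while there
// are no subscribers, then delivers them in order once one arrives.
class BufferedChannel {
  constructor() {
    this.pending = [];
    this.subscribers = [];
  }

  push(value) {
    if (this.subscribers.length === 0) {
      this.pending.push(value); // nobody listening yet: keep the value
    } else {
      this.subscribers.forEach((fn) => fn(value));
    }
  }

  subscribe(fn) {
    this.subscribers.push(fn);
    // Hand any backlog to the subscriber that arrives while it exists;
    // subscribers that join later only see new values.
    const backlog = this.pending;
    this.pending = [];
    backlog.forEach((value) => fn(value));
    // Return an unsubscribe function.
    return () => {
      this.subscribers = this.subscribers.filter((s) => s !== fn);
    };
  }
}
```

With this policy the producer and consumer in the example above could start in either order: a value pushed before the first `subscribe` call is held and delivered when the consumer shows up.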

If it's helpful, the following is what I am using Bacon.js for:

I am using Bacon.js with Facebook's React framework. React is a "functional framework", in that it transforms data into a virtual DOM tree, and finally a scheduler does the necessary updates in batches for efficiency. As React.js relies on the idea of "data" rather than "changes/events", I've found FRP to be a particularly elegant way to represent changing values (like window size, connection status, isShowing/isHiding) as a piece of data.

@hura

Note that in your case you want to use subscribe instead of onValue, since it will also give you the initial value. However, this doesn't work in your case either, since you don't specify the initial value in the toProperty call.

I would've expected subscribe to give you the initial value (could be seen as a bug IMO, but at least it should be noted in the docs). The wording for toProperty is very specific and says "Without arguments, you'll get a Property without an initial value. The Property will get its first actual value from the stream".

So this prints only "Initial":

var bus1 = new Bacon.Bus();
var bus2 = new Bacon.Bus();
var prop1 = bus1.toProperty();
var prop2 = bus2.toProperty(2); // We give an initial value

bus1.push(1);
bus2.push(2);

prop1.subscribe(function(v){console.log("Prop1: " + v)});
prop2.subscribe(function(v){console.log(v.isInitial()?"Initial":"Just a val")});

which is technically correct but still rather surprising.

FWIW, RxJS solves these cases with:

@raimohanska
Owner
@hayeah