Add flatMapWithConcurrencyLimit, flatMapConcat, bufferingThrottle, holdWhen #324

Merged
merged 29 commits into from May 22, 2014

4 participants

@raimohanska
Contributor

Related to #318, I implemented flatMapWithConcurrencyLimit, a flatMap variant that limits the number of open spawned streams. It buffers incoming events so that no input is lost.
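
To make the buffering behaviour concrete, here is a minimal standalone sketch of the idea in plain JavaScript. It is not the Bacon.js implementation: `createConcurrencyLimiter`, `spawn`, and `done` are hypothetical names chosen for illustration, and each "spawned stream" is modelled as a job that signals its end by calling `done`.

```javascript
// Hypothetical helper, not Bacon.js code: keep at most `limit` jobs open and
// queue the remaining inputs in arrival order so that none are lost.
function createConcurrencyLimiter(limit, spawn) {
  const queue = []; // inputs waiting for a free slot
  let open = 0;     // number of jobs currently running

  function drain() {
    while (open < limit && queue.length > 0) {
      const value = queue.shift();
      open += 1;
      let finished = false;
      spawn(value, function done() {
        if (finished) return; // ignore repeated completion signals
        finished = true;
        open -= 1;
        drain(); // a slot was freed, so start the next queued input
      });
    }
  }

  return {
    push(value) { queue.push(value); drain(); },
    openCount() { return open; },
    queuedCount() { return queue.length; },
  };
}

// Usage: jobs finish only when their `done` callback is called by hand.
const started = [];
const pending = [];
const limiter = createConcurrencyLimiter(2, (value, done) => {
  started.push(value);
  pending.push(done);
});

[1, 2, 3, 4].forEach((v) => limiter.push(v));
// At this point only 1 and 2 have started; 3 and 4 are buffered.
pending[0](); // job 1 ends, which frees a slot and starts 3
```

With a limit of 1 the same sketch processes inputs strictly one after another.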

Then I applied the new combinator to create flatMapConcat, which is just a shorthand for the previous one with limit==1. The default flatMap implementation can be thought of as "flatMapMerge" because it merges the output of spawned streams.

Finally, I implemented rateLimit as a one-liner on top of flatMapConcat. This is the combinator that was requested in #317 and was once implemented in #82 as "limitThroughPut".

And then I added one more thing: holdWhen, which is the same thing as bufferWhile in #300 and is also based on flatMapConcat. I chose this name so as not to confuse this method with the "buffer" family of methods or with the takeWhile/skipWhile methods.
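
As a rough illustration of the holdWhen idea, the following standalone sketch buffers values while a valve is on and releases them in order when it turns off. It is not the Bacon.js code: `createHoldGate`, `setValve`, and `push` are hypothetical names, and the assumption that the valve starts in the "off" state is mine.

```javascript
// Hypothetical sketch, not Bacon.js code: a gate that holds values while the
// valve is on and releases the backlog in order once the valve turns off.
function createHoldGate(emit) {
  const held = [];
  let holding = false; // assumption: the valve starts "off"

  return {
    setValve(on) {
      holding = Boolean(on);
      // Flush with a loop so a large backlog does not deepen the call stack.
      while (!holding && held.length > 0) emit(held.shift());
    },
    push(value) {
      if (holding) held.push(value);
      else emit(value);
    },
  };
}

const out = [];
const gate = createHoldGate((v) => out.push(v));
gate.push("a");       // valve is off: passes straight through
gate.setValve(true);
gate.push("b");       // held
gate.push("c");       // held
gate.setValve(false); // releases "b", then "c"
gate.push("d");       // passes straight through again
```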

DON'T MERGE YET!

TODO: still naming..

@tkurki tkurki commented on an outdated diff Jan 19, 2014
spec/BaconSpec.coffee
@@ -853,6 +886,32 @@ describe "EventStream.throttle(delay)", ->
it "toString", ->
expect(Bacon.never().throttle(1).toString()).to.equal("Bacon.never().throttle(1)")
+describe "EventStream.rateLimit(delay)", ->
@tkurki
tkurki Jan 19, 2014

delay => minimumInterval ?

@tkurki tkurki commented on the diff Jan 19, 2014
src/Bacon.coffee
@@ -518,18 +550,10 @@ flatMap_ = (root, f, firstOnly) ->
Bacon.more
else
return Bacon.noMore if composite.unsubscribed
- child = makeObservable(f event.value())
- composite.add (unsubAll, unsubMe) -> child.subscribe (event) ->
- if event.isEnd()
- checkEnd(unsubMe)
- Bacon.noMore
- else
- if event instanceof Initial
- # To support Property as the spawned stream
- event = event.toNext()
- reply = sink event
- unsubAll() if reply == Bacon.noMore
- reply
+ if limit and composite.count() > limit
@tkurki
tkurki Jan 19, 2014

How about a sensible upper bound for queue size? It is really easy to create a leak if you keep pushing more than the limit specifies.

@raimohanska
raimohanska Jan 19, 2014 Contributor

Probably a good idea, but "sensible" is a flexible term. So, we should be able to specify that in the API. How would the API look now?
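
One possible shape for such a bound, sketched in plain JavaScript purely for discussion: a FIFO buffer with a caller-supplied capacity. Nothing here is an API proposed in this thread; `createBoundedBuffer`, `maxSize`, and the "drop the oldest entry" overflow policy are all assumptions made for illustration.

```javascript
// Hypothetical sketch: a FIFO buffer that never holds more than `maxSize`
// items. The overflow policy (drop the oldest entry) is an assumption; an
// API could equally drop the newest entry or signal an error.
function createBoundedBuffer(maxSize) {
  const items = [];
  let dropped = 0;

  return {
    push(value) {
      items.push(value);
      if (items.length > maxSize) {
        items.shift(); // discard the oldest entry to stay within the bound
        dropped += 1;
      }
    },
    shift() { return items.shift(); },
    size() { return items.length; },
    droppedCount() { return dropped; },
  };
}

const buffer = createBoundedBuffer(3);
[1, 2, 3, 4, 5].forEach((v) => buffer.push(v));
// The buffer now holds 3, 4, 5 and has dropped two entries.
```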

@tkurki
tkurki commented Jan 19, 2014

rate is concise, but the meaning is really "the speed at which something happens over a particular period of time", which is the inverse of the minimum interval: e.g. if the minimum interval is 200 ms, the max rate is 5 per second. The same goes for throughput.

limitForInterval or limitForMinimumInterval? At first I thought of bufferForMinimumInterval, but that doesn't fit the logic of the other bufferWith* methods, even if it describes what is happening.

@tkurki
tkurki commented Jan 19, 2014

How would minIntervalWithBuffer(200,1000) sound? Pretty verbose.

@raimohanska
Contributor

Bug found: 116cf76

@raimohanska
Contributor

I might go for limitThroughput(minIntervalMs). We may add an extra parameter for setting maximum buffer size. The same extra param should be added to flatMapConcat somehow, too.

@raimohanska
Contributor

In case you want to try out this code, you can download it here

@raimohanska
Contributor

There's a stack overflow issue yet to be solved. Fiddle here: http://jsfiddle.net/865yn/1/

@raimohanska
Contributor

The stack overflow occurs here because flatMapWithConcurrencyLimit flushes its buffer using a recursive algorithm. This can be fixed.
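
The general difference between a recursive and an iterative flush can be sketched as follows. This is a standalone illustration, not the code from this branch; `flushRecursive` and `flushIterative` are hypothetical names.

```javascript
// Recursive flush: one stack frame per buffered item, so a large buffer can
// exhaust the call stack on engines without tail-call elimination.
function flushRecursive(buffer, emit, index = 0) {
  if (index >= buffer.length) {
    buffer.length = 0;
    return;
  }
  emit(buffer[index]);
  flushRecursive(buffer, emit, index + 1);
}

// Iterative flush: constant stack depth regardless of buffer size.
function flushIterative(buffer, emit) {
  for (let i = 0; i < buffer.length; i++) emit(buffer[i]);
  buffer.length = 0;
}

const big = Array.from({ length: 200000 }, (_, i) => i);
let flushed = 0;
flushIterative(big, () => { flushed += 1; });

// The recursive variant is likely to fail on a buffer of this size; whether
// it does depends on the engine and its stack limit.
let recursiveFailed = false;
try {
  flushRecursive(Array.from({ length: 200000 }, (_, i) => i), () => {});
} catch (err) {
  recursiveFailed = err instanceof RangeError;
}
```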

@raimohanska
Contributor

Well, now the test passes, but if the chunk size is increased further, there will be another stack overflow.

I don't think the problem has much to do with this particular PR; it is in fact a more general issue with "big" accumulation, where JavaScript recursion fails.

It could also be said that buffering thousands of events and then flushing them at once is usually not good business anyway.

I vote we ship this stuff in now and consider the "big accumulation" issue separately. I added a test in 1cbd648 for finding the maximum supported "fold length" which is slightly above 5000 iterations on node.

@raimohanska
Contributor

Oops, it seems I fixed it.

Added support for {eager: true} in scan/fold. This is used now in holdWhen to prevent stack overflow.

@codeflows

flatMapWithConcurrencyLimit is exactly what we need. Using it from the branch currently, any idea if it will be in master soon?

@raimohanska
Contributor

Yeah, this PR had been hanging around for a while. I just rebased it against master to make it mergeable again.

@raimohanska
Contributor

Need to update the readme (readme-src.coffee, then grunt readme) to include all the changes, then we can merge. Who's up for the task?

Oh, and naming rateLimit. Should we call it limitThroughput or limitMinimumInterval now?

@phadej
Member
phadej commented May 10, 2014

I can update the readme. limitMinimumInterval is IMHO better, as it's less prone to typos. Also, throughput would probably take a frequency (or a number of events, say per second).

@phadej
Member
phadej commented May 10, 2014

Another comment: should we invert holdWhen to be positive, i.e. pass the events through when the valve is positive, so that the predicate works the same way as with 'filter'?

@phadej
Member
phadej commented May 10, 2014

Here are the docs; feel free to cherry-pick and edit: phadej@111a5ff

@raimohanska
Contributor

Thanks @phadej, I included your docs!

What about bufferingThrottle for a name? That would associate it with throttle, whose close cousin it really is, the difference being that throttle discards the overflow while bufferingThrottle buffers it.

I appreciate @phadej's suggestion of inverting holdWhen and am waiting for a name suggestion :)
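
The discard-versus-buffer distinction can be illustrated with two simplified timing models. These are not the Bacon.js implementations: `discardingLimiter` and `bufferingLimiter` are hypothetical names, times are plain millisecond numbers, and the discarding model is a deliberate simplification that keeps the first event of each burst, which may differ from which event the real throttle keeps.

```javascript
// Simplified timing models, not Bacon.js code. Times are in milliseconds.

// Discarding limiter: an event is emitted only if at least `minIntervalMs`
// has passed since the previously emitted event; otherwise it is dropped.
function discardingLimiter(arrivals, minIntervalMs) {
  const emitted = [];
  let last = -Infinity;
  for (const t of arrivals) {
    if (t - last >= minIntervalMs) {
      emitted.push(t);
      last = t;
    }
  }
  return emitted;
}

// Buffering limiter: every event is emitted, delayed as needed so that
// consecutive outputs are at least `minIntervalMs` apart.
function bufferingLimiter(arrivals, minIntervalMs) {
  const emitted = [];
  let last = -Infinity;
  for (const t of arrivals) {
    const release = Math.max(t, last + minIntervalMs);
    emitted.push(release);
    last = release;
  }
  return emitted;
}

const arrivals = [0, 10, 20, 500];
const discarded = discardingLimiter(arrivals, 200); // emits at 0 and 500
const buffered = bufferingLimiter(arrivals, 200);   // emits at 0, 200, 400, 600
```

In the buffering model no event is lost, but a sustained burst makes the output lag further and further behind the input.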

@phadej
Member
phadej commented May 14, 2014

@raimohanska: one way is to scan thru plumbing vocabulary: http://en.wikipedia.org/wiki/Control_valves for a new name...

which btw makes me wonder how these are then related:

var a = stream.filter(f);
var b = stream.controlValve(stream.map(f));
@raimohanska
Contributor

Same kinda relationship as with throttle and bufferingThrottle. But bufferingFilter ain't so good, because it's not really filtering anything.

@phadej
Member
phadej commented May 14, 2014

Ah, wrote the last post too fast:

Let us have stream.controlValve(@ : EventStream[A], ControlValue | Observable[ControlValue]) : EventStream[A], where ControlValue = Pass | Hold | Drop. Then we can generalize both holdWhen and filter (and actually also throttle and bufferingThrottle, using delay):

stream.filter = (p) ->
  @controlValve(@map((x) -> if p(x) then Pass else Drop))

stream.holdWhen = (valve) ->
  @controlValve(valve.map((x) -> if x then Pass else Hold))
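
One way to read this proposal, sketched as standalone JavaScript with a synchronous push model instead of real streams. Everything here is an interpretation for illustration: `createControlValve`, `control`, `push`, and `filterWith` are hypothetical names, and keeping held values when the mode changes to Drop is an assumption the proposal does not settle.

```javascript
// Hypothetical interpretation of the Pass | Hold | Drop idea, not Bacon.js code.
const Pass = "Pass";
const Hold = "Hold";
const Drop = "Drop";

function createControlValve(emit) {
  const held = [];
  let mode = Pass;

  return {
    // Change the control value; switching to Pass releases the backlog.
    control(next) {
      mode = next;
      if (mode === Pass) {
        for (const v of held.splice(0)) emit(v);
      }
    },
    push(value) {
      if (mode === Pass) emit(value);
      else if (mode === Hold) held.push(value);
      // Drop: the value is discarded.
    },
  };
}

// filter expressed through the valve: each value sets its own control value.
function filterWith(values, predicate) {
  const out = [];
  const valve = createControlValve((v) => out.push(v));
  for (const x of values) {
    valve.control(predicate(x) ? Pass : Drop);
    valve.push(x);
  }
  return out;
}

const evens = filterWith([1, 2, 3, 4], (x) => x % 2 === 0);

// holdWhen-like use: an independent valve switches between Hold and Pass.
const released = [];
const valve = createControlValve((v) => released.push(v));
valve.control(Hold);
valve.push("a");
valve.push("b");
valve.control(Pass); // releases "a", then "b"
valve.push("c");
```
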
@raimohanska
Contributor

@phadej you're right that controlValve could be used as a base for other combinators, but it might be a bit heavyweight as a base for implementing simple things like filter. If you feel like experimenting, please do, but I'm now focused on getting these new methods into master first.

@raimohanska raimohanska merged commit ea61aa4 into master May 22, 2014

@raimohanska
Contributor

wat wat, it's in master!

@raimohanska raimohanska changed the title from Add flatMapWithConcurrencyLimit, flatMapConcat, rateLimit, holdWhen to Add flatMapWithConcurrencyLimit, flatMapConcat, bufferingThrottle, holdWhen May 22, 2014
@raimohanska
Contributor

So I went with bufferingThrottle in the end and released 0.7.13.
