Skip to content

Conversation

@raimohanska
Copy link
Contributor

Related to #318 I implemented a flatMapWithConcurrencyLimit method that's a flatMap variant that limits the number of open spawned streams. It buffers incoming events to keep from losing any input.

Then I applied the new combinator to create flatMapConcat that's just a shorthand for the previous with limit==1. The default flatMap implementation can be thought of "flatMapMerge" because it merges the output of spawned streams.

Finally, I implemented rateLimit as a one-liner on top of flatMapConcat. This is the combinator that was requested in #317 and once implemented in #82 as "limitThroughPut".

And then I added one more thing holdWhen that's the same thing as bufferWhile in #300, also based on flatMapConcat. I chose this name not to confuse this method with the "buffer" family of methods or the takeWhile/skipWhile methods.

DON'T MERGE YET!

TODO: still naming..

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delay => minimumInterval ?

@tkurki
Copy link

tkurki commented Jan 19, 2014

rate is concise but the meaning is really "the speed at which something happens over a particular period of time", which is the inverse of minimum interval, eg. if the minimum interval is 200 ms the max rate is 5 per second. Same goes for throughput.

limitForInterval or limitForMinimumInterval? First I thought of bufferForMinimumInterval, but doesn't work with the other bufferWith* method logic, even if it describes what is happening.

@tkurki
Copy link

tkurki commented Jan 19, 2014

How would minIntervalWithBuffer(200,1000) sound? Pretty verbose.

@raimohanska
Copy link
Contributor Author

Bug found: 116cf76

@raimohanska
Copy link
Contributor Author

I might go for limitThroughput(minIntervalMs). We may add an extra parameter for setting maximum buffer size. The same extra param should be added to flatMapConcat somehow, too.

@raimohanska
Copy link
Contributor Author

In case you want to try out this code, you can dl it here

@raimohanska
Copy link
Contributor Author

There's a stack overflow issue yet to be solved. Fiddle here: http://jsfiddle.net/865yn/1/

@raimohanska
Copy link
Contributor Author

The stack overflow occurs here because flatMapWithConcurrencyLimit flushes its buffer using a recursive algorithm. This can be fixed.

@raimohanska
Copy link
Contributor Author

Well, now the test passes, but if chunk size is increased still, there will be another stack overflow.

I don't think the problem has much to do with this particular PR, but is in fact a more general issue when dealing with "big" accumulation, where javascript recursion fails.

It could also be said that buffering thousands of events and then flushing them at once is usually not good business anyway.

I vote we ship this stuff in now and consider the "big accumulation" issue separately. I added a test in 1cbd648 for finding the maximum supported "fold length" which is slightly above 5000 iterations on node.

@raimohanska
Copy link
Contributor Author

Oops, it seems I fixed it.

Added support for {eager: true} in scan/fold. This is used now in holdWhen to prevent stack overflow.

@codeflows
Copy link

flatMapWithConcurrencyLimit is exactly what we need. Using it from the branch currently, any idea if it will be in master soon?

@raimohanska
Copy link
Contributor Author

Yeah, this PR had been hanging around for a while. I just rebased it against master to make it mergeable again.

@raimohanska
Copy link
Contributor Author

Need to update readme (readme-src.coffee, then grunt readme) to include all the changed, then we can merge. Who's up for the task?

Oh, and naming rateLimit. Should we called limitThroughput or limitMinimumInterval now?

@phadej
Copy link
Member

phadej commented May 10, 2014

I can update the readme. limitMinimumInterval is IMHO better, as not so sensible to typos. Also throughput would probably take a frequency (or amount of events, say per second).

@phadej
Copy link
Member

phadej commented May 10, 2014

Another comment: should we invert the holdWhen to be positive, e.g. pass the events thru if valve is positive. i.e. the predicate to work similarly as with 'filter'. ?

@phadej
Copy link
Member

phadej commented May 10, 2014

Here's a docs: feel free to cherry-pick and edit: phadej@111a5ff

@raimohanska
Copy link
Contributor Author

Thanks @phadej, I included your docs!

What about bufferingThrottle for a name? That would associate it with throttle, whose close cousin it really is, the different being that throttle discards the overflow while bufferingThrottle buffers it.

I appreciate @phadej's suggestion of inverting holdWhen and am waiting for a name suggestion :)

@phadej
Copy link
Member

phadej commented May 14, 2014

@raimohanska: one way is to scan thru plumbing vocabulary: http://en.wikipedia.org/wiki/Control_valves for a new name...

which btw makes me think about, how these are then related:

var a = stream.filter(f);
var b = stream.controlValve(stream.map(f));

@raimohanska
Copy link
Contributor Author

Same kinda relationship as with throttle and bufferingThrottle. But bufferingFilter ain't so good, because it's not really filtering anything.

@phadej
Copy link
Member

phadej commented May 14, 2014

Ah, wrote the last post too fast:

Let us have stream.controlValve(@ : EventStream[A], ControlValue | Observable[ControlValue]) : EventStream[A], where ControlValue = Pass | Hold | Drop, then we can generalize both
holdWhen and filter (actually also throttle and bufferingThrottle, using delay) :

stream.filter = (p) ->
  @controlValue(@map((x) -> if p(x) then Pass else Drop))

stream.holdWhen = (valve) ->
  @controlValue(valve.map((x) -> if x then Pass else Hold))

@raimohanska
Copy link
Contributor Author

@phadej you're right in that controlValve could be used to build other combinators upon, but it might be a bit heavyweight base for implement simple things like filter. If you feel like experimenting, please do, but I'm now focused on gettings these new methods into master first.

@raimohanska raimohanska merged commit ea61aa4 into master May 22, 2014
@raimohanska
Copy link
Contributor Author

wat wat, it's in master!

@raimohanska raimohanska changed the title Add flatMapWithConcurrencyLimit, flatMapConcat, rateLimit, holdWhen Add flatMapWithConcurrencyLimit, flatMapConcat, bufferingThrottle, holdWhen May 22, 2014
@raimohanska
Copy link
Contributor Author

So I went with bufferingThrottle in the end and released 0.7.13.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants