Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream-based event bus (A.K.A. Hub) #19478

Closed
rkuhn opened this issue Jan 15, 2016 · 8 comments
Closed

add stream-based event bus (A.K.A. Hub) #19478

rkuhn opened this issue Jan 15, 2016 · 8 comments

Comments

@rkuhn
Copy link
Contributor

rkuhn commented Jan 15, 2016

The input side should be a fair merge of configurable maximum width, where new sources are submitted e.g. via a Source.queue. The output side should be a dynamic broadcast stage of configurable maximum width that has two inputs—one for elements and one for Sinks—and no outputs. A buffer of configurable depth with OverflowStrategy.fail is prepended to each Sink. The bus itself may incorporate a rate limiter to protect against coordinated burst attacks from multiple sources—otherwise these could overload and disconnect all Sinks.

@rkuhn rkuhn added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:stream labels Jan 15, 2016
@rkuhn rkuhn added this to the stream-backlog milestone Jan 15, 2016
@drewhk
Copy link
Member

drewhk commented Jan 15, 2016

alternative API:

val hub = Hub[T](parameters)
hub.source.to(Sink.println) // attach a receiver
Source.whatever.runWith(hub.sink) // attach a publisher

hub.close() // Termination must be explicit

Every materialization of the Source/Sink of the hub counts as an additional registration. This has the benefit of being lazy (only materialized streams claim resources) and being fusable (since you can reuse the adapters).

I think it would be nice if there would be a possibility to do additional logic inside the hub, so one possibility is to pass it as a Flow:

val hub = Hub[A, B](parameters, logic: Flow[A, Command[B]])

Another possibility is to delegate this to an actor, although the Flow version actually can allow for this via other means.

@drewhk
Copy link
Member

drewhk commented Jan 15, 2016

Btw, in the flow version the rate limiting can be simply done by adding it to the logic flow.

@rkuhn
Copy link
Contributor Author

rkuhn commented Jan 15, 2016

Yes, exactly. And your API proposals make a lot of sense, thanks!

@drewhk
Copy link
Member

drewhk commented Jan 15, 2016

Well, to be fair, @patriknw created similar interface to the PubSub in elsewhere (and I have independently created something like this in a sideproject)

@patriknw
Copy link
Member

:-)

@rkuhn rkuhn removed the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Mar 9, 2016
@rkuhn rkuhn added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Mar 22, 2016
@rkuhn
Copy link
Contributor Author

rkuhn commented Mar 22, 2016

This might want to get done soonish, possibly with help/input from @jroper.

@ktoso ktoso changed the title add stream-based event bus add stream-based event bus (A.K.A. Hub) Apr 18, 2016
@ktoso ktoso added the medium label Jun 2, 2016
@unoexperto
Copy link

Hey guys, I needed something like this recently and implemented it via hack. Perhaps it'll be useful for the discussion. http://stackoverflow.com/q/38233742/226895

@ktoso
Copy link
Member

ktoso commented Sep 6, 2017

I believe we have it nowadays - it's both the MergeHub and BroadcastHub combined :)

@ktoso ktoso closed this as completed Sep 6, 2017
@ktoso ktoso removed the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Sep 6, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants