
We need broadcast channels, stat. #204

Closed
goodboy opened this issue Apr 13, 2021 · 1 comment
Labels: api, discussion, experiment (Exploratory design and testing), help wanted (Extra attention is needed)

Comments

goodboy (Owner) commented Apr 13, 2021

This ties in with #40 and #53, both of which heavily rely on the discussion in python-trio/trio#987.

The main gotcha is that trio's mem chans don't have broadcasting built-in (frankly making them mostly an SC anti-pattern if you have followed the .clone() semantics discussion). The mistaken assumption that they do broadcast was what originally fueled #203.

The obvious main use case is broadcasting stream data on the receive side of a stream such that multiple rx-side tasks each get their own copy of every new message. NB: we already have a prototype for the producer side à la tractor.msg.pub, though I'm not sure it's the best solution either.

Depending on how we move forward with bidirectional streaming in #53, it might be worth offering a .clone() or similar method which provides a broadcast reference token to consumer tasks.

Definitely needs a deeper dive.
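
To make the rx-side semantics concrete, here's a naive sketch using plain trio primitives (this is not anything in tractor, and the names like fan_out and the zero-length buffers are illustrative choices only): a relay task copies every inbound message into a per-consumer memory channel, so each consumer task sees every message instead of competing for them.

```python
# Naive rx-side fan-out sketch: one relay task, one private channel per consumer.
import trio


async def fan_out(upstream: trio.MemoryReceiveChannel, consumers: list[trio.MemorySendChannel]):
    # Relay each upstream message to every consumer's private channel,
    # then close those channels once the upstream is exhausted.
    async with upstream:
        async for msg in upstream:
            for tx in consumers:
                await tx.send(msg)
        for tx in consumers:
            await tx.aclose()


async def consumer(name: str, rx: trio.MemoryReceiveChannel):
    async with rx:
        async for msg in rx:
            print(f'{name} got {msg}')


async def main():
    up_tx, up_rx = trio.open_memory_channel(0)
    pairs = [trio.open_memory_channel(0) for _ in range(3)]

    async with trio.open_nursery() as n:
        n.start_soon(fan_out, up_rx, [tx for tx, _ in pairs])
        for i, (_, rx) in enumerate(pairs):
            n.start_soon(consumer, f'task{i}', rx)

        async with up_tx:
            for i in range(5):
                await up_tx.send(i)


trio.run(main)
```

Note this punts on all the hard questions (backpressure policy, slow consumers, teardown) which is exactly why a real design pass is needed.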

For reference, here's the current short list of broadcast chan impls:

Any more implementations from the lurker pack are greatly welcome 😉

goodboy added the help wanted, discussion, experiment, and api labels on Apr 13, 2021
goodboy mentioned this issue on May 2, 2021
goodboy (Owner, Author) commented Aug 6, 2021

Interesting stuff from the Rust approach njs linked: https://tokio-rs.github.io/tokio/doc/tokio/sync/broadcast/index.html#lagging

This broadcast channel implementation handles this case by setting a hard upper bound on the number of values the channel may retain at any given time. This upper bound is passed to the channel function as an argument.

If a value is sent when the channel is at capacity, the oldest value currently held by the channel is released. This frees up space for the new value. Any receiver that has not yet seen the released value will return RecvError::Lagged the next time recv is called.

I kind of like this since it's very reminiscent of real-time DSP style overruns and would allow for detecting too-slow consumers via errors (which might in turn enable restart logic?).
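
For the record, here's a tiny synchronous sketch of that lagging policy (the BroadcastBuffer and Lagged names are made up for illustration, this isn't tokio or tractor code): a bounded ring buffer evicts the oldest value on overflow, and a receiver that fell behind gets an error telling it how many messages it missed.

```python
from collections import deque


class Lagged(Exception):
    """Receiver fell behind by `.args[0]` messages."""


class BroadcastBuffer:
    def __init__(self, maxlen: int):
        self._buf = deque(maxlen=maxlen)  # oldest values drop off automatically
        self._seq = 0                     # total number of values ever sent

    def send(self, value):
        self._buf.append(value)
        self._seq += 1

    def recv(self, receiver_seq: int):
        '''Return (value, next_seq) for a receiver positioned at `receiver_seq`.'''
        oldest_seq = self._seq - len(self._buf)
        if receiver_seq < oldest_seq:
            # The value this receiver wanted was already evicted: report the lag.
            raise Lagged(oldest_seq - receiver_seq)
        if receiver_seq == self._seq:
            raise IndexError('no new values yet')
        return self._buf[receiver_seq - oldest_seq], receiver_seq + 1


buf = BroadcastBuffer(maxlen=2)
rx_seq = 0
for i in range(4):
    buf.send(i)

try:
    buf.recv(rx_seq)
except Lagged as err:
    print(f'lagged by {err.args[0]} messages')  # lagged by 2 messages
    rx_seq += err.args[0]                       # skip ahead to the oldest retained value

value, rx_seq = buf.recv(rx_seq)
print(value)  # 2, the oldest value still held by the buffer
```

The appeal is that a slow consumer surfaces as an explicit error at its own recv() call instead of silently stalling the producer.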

goodboy added a commit that referenced this issue Aug 15, 2021
Add `ReceiveMsgStream.subscribe()` which allows allocating a broadcast
receiver around the stream for use by multiple actor-local consumer
tasks. Entering this context manager idempotently mutates the stream's
receive machinery which for now can not be undone. Move `.clone()` to
the receive stream type.

Resolves #204
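
A hedged usage sketch of the .subscribe() behaviour described in that commit message: multiple actor-local tasks each enter the context manager to get their own broadcast receiver over one ReceiveMsgStream. Everything other than the .subscribe() name (the stream argument, the async-for iteration, the nursery wiring) is assumed scaffolding for illustration, not verified tractor API.

```python
import trio


async def consume(name: str, stream) -> None:
    # Assumed iteration pattern: each task's broadcast receiver yields its
    # own copy of every message from the underlying stream.
    async with stream.subscribe() as bcast_rx:
        async for msg in bcast_rx:
            print(f'{name} got {msg}')


async def spawn_consumers(stream, count: int = 3) -> None:
    # Fan out one msg stream to `count` actor-local consumer tasks.
    async with trio.open_nursery() as n:
        for i in range(count):
            n.start_soon(consume, f'task{i}', stream)
```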
goodboy closed this as completed in 2d1c241 on Sep 3, 2021