Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stateless conflated Broadcast channel #1756

Closed
orwir opened this issue Jan 11, 2020 · 4 comments
Closed

Stateless conflated Broadcast channel #1756

orwir opened this issue Jan 11, 2020 · 4 comments
Assignees

Comments

@orwir
Copy link

orwir commented Jan 11, 2020

I'm trying to create events handling based on coroutines (ConflatedBroadcastChannel specifically) but faced an issue that currently it is not possible to have stateless broadcast channel. Events like button clicks and some notifications relevant only in a small time frame (just when they appeared) and preserving state in the "event bus" leads to unexpected behavior. So question is can we have "stateless flag" in ConflatedBroadcastChannel or probably new class to deal with it?

I created small class and it works fine, but might be better to have it natively.

@ExperimentalCoroutinesApi
class StatelessBroadcastChannel<T> constructor(
    private val broadcast: BroadcastChannel<T> = ConflatedBroadcastChannel()
) : BroadcastChannel<T> by broadcast {

    override fun openSubscription(): ReceiveChannel<T> = broadcast
        .openSubscription()
        .apply { poll() }

}
@elizarov
Copy link
Contributor

Have you tried using ArrayBroadcastChannel? It keeps no "state". See https://atoulme.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-array-broadcast-channel/index.html
Use can create it with BroadcastChannel(capacity).

@orwir
Copy link
Author

orwir commented Jan 16, 2020

Yep, it doesn't preserve a state but it suspends send function while buffer is full. So I get a queue of stale events if my emitter too fast or consumer too slow.

Closest examples to a thing I'm looking for are PublishProcessor and LiveEvent.

Edit
It should be something like ArrayBroadcastChannel(1) with onBackpressureLatest()

@elizarov
Copy link
Contributor

@orwir You can get it now with BroadcastChannel(1).asFlow().conflate().

However, I'm a bit lost in your specific use-case. You originally said that you are looking for a mechanism to handle "events like button clicks and some notifications relevant only in a small time frame" and it looks like onBackpressureLatest() (aka conflate in Kotlin Flow) is not really appropriate here, since any delay or hiccup in the subscribing code would immediately result in the events being lost. What am I missing here?

@orwir
Copy link
Author

orwir commented Jan 17, 2020

@elizarov It' seems I just misunderstood how I can work with channels. BroadcastChannel(1).asFlow().conflate() is exactly I'm looking for. Thanks for pointing out. You can close the issue.
Seems I need to read once again documentation about channel/flows.

And as for conception here. Let's say I need to create to event buses. One if for important event which shouldn't be lost at all and here I'll use BroadcastChannel(1).asFlow() and the second one with low importance (lost one or two events wouldn't be critical). Question was regarding second bus. It will show Snackbars (android) and flooding with tons of them (hypothetical case) will be definitely not good. So .coflate() here would leave only latest one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants