Support BroadcastChannel(Channel.UNLIMITED) #736

Closed
clarfonthey opened this issue Oct 19, 2018 · 11 comments

Comments

@clarfonthey

This was mentioned in #254 but never had an issue created for it.

@elizarov
Contributor

Can you please clarify what your use case for that is?

@clarfonthey
Author

clarfonthey commented Oct 20, 2018

I don't really see why this needs a use case, but I don't want slow consumers to bottleneck the producer, and I don't want one consumer to be bottlenecked because its neighbours are slow. While a huge speed difference might arguably indicate a bug, I want my program to be resilient to it, and I test that my program can handle these cases by simulating increased latency on multiple ends.

Right now, in the tests that do this, I simply have to use a large capacity instead of UNLIMITED, which wastes memory up front instead of allocating it dynamically as needed.

Plus, it's very convenient to be able to use UNLIMITED rather than having to figure out an ideal number when I want the capacity to be larger than one but am not sure how much larger.
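To make the requested semantics concrete, here is a minimal single-threaded sketch in plain Kotlin (no coroutines) of a broadcast with unlimited capacity. `UnboundedBroadcast` and its methods are hypothetical names for illustration, not part of kotlinx.coroutines. Each subscriber owns a growable queue, so the producer is never held back by a slow consumer, a fast consumer is never held back by a slow neighbour, and memory is allocated only as a backlog actually builds up.

```kotlin
// Hypothetical sketch (not a kotlinx.coroutines API): single-threaded, unbounded broadcast.
class UnboundedBroadcast<T> {
    // Each subscriber owns a growable queue; ArrayDeque allocates as the backlog grows
    // rather than reserving a fixed capacity up front.
    class Subscription<T> internal constructor() {
        internal val queue = ArrayDeque<T>()

        // Next pending value, or null if this subscriber has caught up.
        fun poll(): T? = queue.removeFirstOrNull()

        val backlog: Int get() = queue.size
    }

    private val subscriptions = mutableListOf<Subscription<T>>()

    // A new subscriber only sees values sent after it subscribes.
    fun subscribe(): Subscription<T> = Subscription<T>().also { subscriptions += it }

    // Subscription does not override equals, so this removes by identity.
    fun unsubscribe(subscription: Subscription<T>) {
        subscriptions.remove(subscription)
    }

    // Never blocks and never drops: the value is appended to every subscriber's queue,
    // so a slow subscriber delays neither the producer nor its faster neighbours.
    fun send(value: T) {
        for (s in subscriptions) s.queue.addLast(value)
    }
}
```

The cost of this design is that nothing bounds a slow subscriber's backlog; a real implementation would also need synchronization and suspending receives.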

@elizarov
Contributor

elizarov commented Oct 20, 2018

But if slow consumers don't provide back-pressure to slow down producers, you will eventually run out of memory. So what would be the use case for that? Why would you prefer that to proper back-pressure support?
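The memory argument can be made concrete with back-of-the-envelope arithmetic. Assume, purely for illustration, a producer emitting r_p messages per second, a slowest consumer draining r_c < r_p, and an average message size s. With an unlimited buffer the backlog B and its memory footprint M grow linearly with time t, whereas a buffer of capacity C stops growing at C because the producer is suspended (back-pressure):

```latex
B_{\text{unlimited}}(t) = (r_p - r_c)\,t, \qquad
M(t) \approx s \cdot B_{\text{unlimited}}(t), \qquad
B_{\text{bounded}}(t) = \min\bigl((r_p - r_c)\,t,\; C\bigr)
```

For example, with hypothetical rates r_p = 1000/s and r_c = 900/s and s = 1 KiB, the unlimited backlog after one hour is 100 × 3600 = 360,000 messages, or about 352 MiB; with back-pressure the producer would instead be throttled to 900 messages per second once the buffer fills.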

@clarfonthey
Author

Personally, I'd rather have an OOM error show up than have my program just be slow without my knowing why. Also, in my case the producers send the data to two places (to disk, and down a channel to be processed) so that if execution stops suddenly it can repeat the work it has done.

In my workflow specifically, I end up making large web requests that can sometimes take a while to finish. Because of that, it's sometimes useful for me to start the pipeline after I've added the first few stages, continue developing it, and then start it over again without repeating any work. In these cases, a "just use as much memory as you need" option is very useful, because even if the program breaks before it finishes, it has done some real work by then. And chances are that by the time I've finished developing the entire thing, I've fixed the bugs that caused the congestion.

To clarify, I completely agree with you that in most cases, you'll want to properly limit the size of the queue. But I do think that it's very reasonable to provide an unlimited option for BroadcastChannel in addition to Channel.

@jcornaz
Contributor

jcornaz commented Oct 22, 2018

> I completely agree with you that in most cases, you'll want to properly limit the size of the queue. But I do think that it's very reasonable to provide an unlimited option for BroadcastChannel in addition to Channel.

I agree. This is the same for channels, and we do have unlimited channels. Most of the time I'll favor a rendezvous channel or a fixed buffer, but Channel.UNLIMITED is useful from time to time.

For instance, sometimes (often, in our code base) we know that the publisher won't have to publish a huge number of messages, so it is safe to use an unlimited buffer because an OOM is very unlikely or even impossible. In these cases, it is unfortunate that we have to either suspend the publisher (using a small fixed buffer) or waste memory before even knowing how much we need (using a big fixed buffer).

@clarfonthey
Author

@elizarov are you still looking for more clarification on this?

@ScottPierce
Contributor

@elizarov This should be something that's supported. Here is an example:

I use the following class to create a delegated property that, when it's set or read, delegates to an Android SharedPreferences or iOS UserPreferences object. I expose a channel on the delegate so that changes to the data can be watched:

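import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.ReceiveChannel

// Store (defined elsewhere) wraps the platform SharedPreferences / UserPreferences object.
abstract class AbstractDelegate<T>(protected val store: Store) : ReadWriteProperty<Any?, T> {
  // Array-backed broadcast channel with a fixed capacity of 10.
  private val broadcastChannel: BroadcastChannel<T> by lazy {
    BroadcastChannel<T>(10).apply { offer(performGet()) }
  }
  protected abstract fun performGet(): T
  protected abstract fun performSet(value: T)

  final override fun getValue(thisRef: Any?, property: KProperty<*>): T {
    return performGet()
  }

  final override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
    performSet(value)
    // Would always succeed for a ConflatedBroadcastChannel. With BroadcastChannel(10),
    // offer() returns false (dropping the value) once 10 values are still waiting
    // to be received by the slowest subscriber.
    broadcastChannel.offer(value)
  }

  // Each call opens an independent subscription.
  fun createChannel(): ReceiveChannel<T> = broadcastChannel.openSubscription()
}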

I have to make an odd trade-off between allocating too much memory in the ArrayBroadcastChannel and choosing how many items can queue before my subscribers start losing data. While unlikely, if data is ever written quickly multiple times, my subscribers will lose data. This is quite annoying, especially since the problem technically still exists even if I increase the capacity to something absurd like 100.

An unlimited, LinkedList-backed BroadcastChannel would work around this by offering a much nicer option.
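A linked-list-backed variant can share a single copy of each element among all subscribers. The following is a hypothetical, single-threaded illustration (`LinkedBroadcast` is not a kotlinx.coroutines type): the producer appends nodes at the tail, each subscriber keeps its own cursor, and nodes that every cursor has passed become unreachable and are reclaimed by the garbage collector. Memory therefore tracks the slowest subscriber's lag, and sending a value can never fail.

```kotlin
// Hypothetical sketch (not a kotlinx.coroutines API): unbounded broadcast over a shared linked list.
class LinkedBroadcast<T> {
    private class Node<T>(val value: T?) {
        var next: Node<T>? = null
    }

    // Sentinel; new subscribers start here and see only values sent after they subscribe.
    private var tail = Node<T>(null)

    inner class Subscription {
        // Last node this subscriber has consumed; everything after it is still pending.
        private var cursor: Node<T> = tail

        // Next pending value, or null if this subscriber has caught up.
        fun poll(): T? {
            val next = cursor.next ?: return null
            cursor = next
            return next.value
        }
    }

    fun subscribe(): Subscription = Subscription()

    // Appending never fails and never waits for subscribers.
    fun send(value: T) {
        val node = Node<T>(value)
        tail.next = node
        tail = node
    }
}
```

Compared with one queue per subscriber, this stores each element once; the trade-off is that a single stalled subscriber keeps the whole chain after its cursor alive.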

I'm also forced to make decisions like this every time my team uses a BroadcastChannel to send UI events from a View to a Presenter. It's far more convenient to use broadcastChannel.offer(event), but my team has effectively had to ban that as a bad practice and force ourselves to do something like this for every event:

GlobalScope.launch(Dispatchers.Main) {
  broadcastChannel.send(event)
}

This is annoying. Developers have to remember to use Dispatchers.Main or risk a race condition, and they can just as easily use broadcastChannel.offer by accident and risk losing events.
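For the single-consumer View-to-Presenter case specifically, a plain Channel (unlike BroadcastChannel) already accepts `Channel.UNLIMITED`. A minimal sketch, assuming kotlinx.coroutines 1.5 or later for `trySend`; `UiEvent` and `sendAndReceive` are hypothetical names. Note that a Channel delivers each element to exactly one receiver, so this does not replace a true broadcast.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical UI event type.
data class UiEvent(val id: Int)

fun sendAndReceive(count: Int): List<UiEvent> = runBlocking {
    // UNLIMITED capacity: trySend never fails for lack of space (only once the channel
    // is closed), so the View can send without launching a coroutine.
    val events = Channel<UiEvent>(Channel.UNLIMITED)
    repeat(count) { check(events.trySend(UiEvent(it)).isSuccess) }
    events.close()

    // Presenter side: receives every event, in order, until the channel is closed.
    val received = mutableListOf<UiEvent>()
    for (event in events) received += event
    received
}
```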

It would be far better to be able to just use BroadcastChannel() and have it default to the safest option, an unlimited queue. Our teams should be protected from common edge cases like this by default. This is not a problem that people coming from the Rx world are used to solving.

@elizarov
Contributor

elizarov commented Feb 6, 2019

Thanks for the description of the various use cases. All (well, almost all) of them make sense.

@ScottPierce One question is about watching preferences. Wouldn't ConflatedBroadcastChannel be more appropriate here? It looks like your code should only be interested in the most recently set value for a preference.

@qwwdfsad
Member

Won't be implemented. A corresponding DataFlow/StateFlow will be implemented instead.

@ScottPierce
Contributor

ScottPierce commented Dec 13, 2019

@qwwdfsad Is there a ticket we can follow for that? Also, can you help me understand if the DataFlow/StateFlow will solve my use case? A few weeks ago I tried to migrate from a BroadcastChannel to a Flow, but ended up deciding that a Channel was best.

> One question is about watching preferences. Wouldn't ConflatedBroadcastChannel be more appropriate here? It looks like your code should only be interested in the most recently set value for a preference.

@elizarov Sorry, I never saw your response. In a lot of cases ConflatedBroadcastChannel would likely suffice; however, when someone wants to receive every change to a value, it wouldn't. Given that I'm making a library, and my teams have encountered situations where they needed every change event, this is still something I have to support.

When using RxJava there were several cases where I explicitly used PublishSubject as opposed to BehaviorSubject, and I have no equivalent with coroutines.
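For readers mapping these Rx types onto later kotlinx.coroutines releases, which added SharedFlow and StateFlow after this discussion: a MutableSharedFlow with no replay behaves roughly like a PublishSubject, while MutableStateFlow resembles a BehaviorSubject but is conflated, which is the same limitation as ConflatedBroadcastChannel described above. The sketch below assumes kotlinx.coroutines 1.6 or later (for the lambda form of `collect`); it emits a burst of values into each and records what a single collector observes. `seenBy` and the demo functions are hypothetical helpers.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: subscribes one collector, runs a non-suspending burst of emissions,
// lets the collector catch up once, and returns everything it observed.
fun seenBy(flow: Flow<Int>, emitBurst: () -> Unit): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    // UNDISPATCHED makes the collector subscribe before the burst starts.
    val job = launch(start = CoroutineStart.UNDISPATCHED) { flow.collect { seen += it } }
    emitBurst()
    yield() // single-threaded event loop: the collector drains what is available, then suspends
    job.cancel()
    seen
}

// PublishSubject-like: no replay and an effectively unlimited buffer, so every value arrives.
fun publishLikeDemo(): List<Int> {
    val subject = MutableSharedFlow<Int>(extraBufferCapacity = Int.MAX_VALUE)
    return seenBy(subject) { (1..5).forEach { subject.tryEmit(it) } }
}

// BehaviorSubject-like: starts with a current value, but a burst of updates is conflated.
fun behaviorLikeDemo(): List<Int> {
    val subject = MutableStateFlow(0)
    return seenBy(subject) { (1..5).forEach { subject.value = it } }
}
```

In this setup the shared flow's collector receives all five values, while the state flow's collector sees the initial value and the latest one but typically skips the intermediate updates.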

@qwwdfsad
Member

qwwdfsad commented Dec 13, 2019

#1082 is the closest one. Real-world examples of both PublishSubject and BehaviorSubject (and their potential flow counterparts) will be extremely helpful for us there.

After a bit of investigation and prototyping, we've found that BroadcastChannel is not really a good abstraction: it is neither Flow nor Channel :)
A long-term plan is to deprecate BroadcastChannel and replace it with corresponding *Flow primitives that should cover most of the BroadcastChannel/PublishSubject/BehaviorSubject usages.

> can you help me understand if the DataFlow/StateFlow will solve my use case?

Sure. The broadcastChannel in your example will be replaced with StateFlow (for the conflated case) or DataFlow(capacity) (where capacity can potentially be unlimited), and instead of createChannel you just expose the flow and collect it.
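A minimal sketch of that suggestion applied to the AbstractDelegate example above, assuming kotlinx.coroutines 1.6 or later. No type named DataFlow exists in kotlinx.coroutines; the non-conflated, configurable-capacity role described here is filled in later releases by MutableSharedFlow, with StateFlow covering the conflated case. `Store`, `InMemoryStore`, `StringPref`, `Settings` and `collectUsernames` are hypothetical names used only for this demo.

```kotlin
import kotlin.properties.ReadWriteProperty
import kotlin.reflect.KProperty
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for the SharedPreferences / UserPreferences wrapper.
interface Store {
    fun get(key: String): String?
    fun put(key: String, value: String)
}

class InMemoryStore : Store {
    private val map = mutableMapOf<String, String>()
    override fun get(key: String): String? = map[key]
    override fun put(key: String, value: String) { map[key] = value }
}

abstract class AbstractDelegate<T>(protected val store: Store) : ReadWriteProperty<Any?, T> {
    // No replay (late collectors see only later changes) and an effectively unlimited
    // buffer, so tryEmit never fails for lack of space and no change is dropped.
    private val mutableChanges = MutableSharedFlow<T>(extraBufferCapacity = Int.MAX_VALUE)

    // Replaces createChannel(): callers collect this flow instead of opening a subscription.
    val changes: SharedFlow<T> = mutableChanges.asSharedFlow()

    protected abstract fun performGet(): T
    protected abstract fun performSet(value: T)

    final override fun getValue(thisRef: Any?, property: KProperty<*>): T = performGet()

    final override fun setValue(thisRef: Any?, property: KProperty<*>, value: T) {
        performSet(value)
        mutableChanges.tryEmit(value) // always true with this buffer configuration
    }
}

// Hypothetical concrete delegate for a String preference.
class StringPref(store: Store, private val key: String, private val default: String) :
    AbstractDelegate<String>(store) {
    override fun performGet(): String = store.get(key) ?: default
    override fun performSet(value: String) = store.put(key, value)
}

class Settings(store: Store) {
    val usernamePref = StringPref(store, "username", "")
    var username: String by usernamePref
}

// Sets the preference `count` times in a burst and returns what one collector received.
fun collectUsernames(count: Int): List<String> = runBlocking {
    val settings = Settings(InMemoryStore())
    val received = mutableListOf<String>()
    // UNDISPATCHED: the collector subscribes before the first write, so nothing is missed.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        settings.usernamePref.changes.collect { received += it }
    }
    repeat(count) { settings.username = "user$it" } // never suspends, never drops
    while (received.size < count) yield()
    job.cancel()
    received
}
```

Swapping the MutableSharedFlow for a MutableStateFlow initialized from performGet() would give the conflated, latest-value-only behaviour instead.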


6 participants