Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow.share operator #1261

Closed
elizarov opened this issue Jun 7, 2019 · 62 comments
Closed

Flow.share operator #1261

elizarov opened this issue Jun 7, 2019 · 62 comments

Comments

@elizarov
Copy link
Contributor

elizarov commented Jun 7, 2019

The share() operator operates on Flow<T> and returns Flow<T>. It shall have the following semantics. The resulting flow is cold, but when one collector shart collecting from it, the it starts to collect from the upstream flow, activating the emitter upstream. The trick of the share operator is that when additional collectors appear in the downstream, they all "share" the same upstream emitter.

For example, consider the flow:

val flow = flow { 
    var i = 0
    while(true) { 
        delay(1000) 
        println("Emit $i")
        emit(i++)
    }
}

If you launch two collectors:

launch { flow.collect { println("A: got $it") } }
launch { flow.collect { println("B: got $it") } }

Then you shall see "Emit 0 / A: got 0 / Emit 0 / B: got 0 / Emit 1 / A: got 1 / Emit 1 / B: got 1 / ...".

However, if you change the flow to val flow = flow { /* same */ }.share(), then you shall see "Emit 0 / A: got 0 / B: got 0 / Emit 1 / A: got 1 / B: got 1 / ...", that is one emission gets delivered to both collectors.

Now if you need to artificially "start" the shared flow simply to keep it active, then you can always launch a dummy collector: launch { flow.collect {} } that works as a "reference" which is active until you cancel the resulting job.

TBD: Share operator might need some configuration with the respect to how much "history" to keep in memory for "late collectors". So far it seems that one non-negative integer is enough (with zero -- new collector don't get any history, with one -- only the most recent value, with more -- the specified number of recent values). What is unclear is what shall be the default value (if any).

UPDATE: It will have to be, actually, a shareIn(scope) operator. Otherwise, the scope it works in will be bound the the first collectors and when this callector is cancelled it will not be able to maintain emissions to other collectors.

@matejdro
Copy link

matejdro commented Jun 7, 2019

This looks good as #1086 implementation. One thing missing is the sharing timeout, which use case is described in #1086 (comment)

@streetsofboston
Copy link

One question about 'starting' a shared/published flow through launch { flow.collect {} }.

In #1086 , starting a shared Flow could only happen on a Flow that was already shared, because connect() could only be invoked on a ConnectableFlow. Any subsequent subscribers/cosumers/collectors to this shared flow would not restart the producer of the flow.

Say I want to create an extension function out of launch { flow.collect {} } called connect:

fun <T> Flow<T>.connect(scope: CoroutineScope) = scope.launch { collect {} }

I can't be sure that the receiver of the connect function is a shared/published Flow.
The argument for exposing a shared/published flow as a separate type, as a ConnectableFlow, is that I can then write this extension function as follows:

fun <T> ConnectableFlow<T>.connect(scope: CoroutineScope) = scope.launch { collect {} }

and non-shared/non-published Flows won't be able to get 'connect'ed.

@zach-klippenstein
Copy link
Contributor

What happens to the cached "history" when moving from one to zero collectors (cancelling the upstream collection), then back to one? Does the history get preserved across upstream collections or lost?

@streetsofboston
Copy link

streetsofboston commented Jun 7, 2019

@zach-klippenstein That is why Rx does not use share() for a shared cache, but cache(). The number of subscribers governs when the flow/stream starts, but it doesn't govern when the flow/stream stops.

With Rx' ConnectableObservable's refCount(), when there is at least 1 subscriber, the Flowable starts. When there are no more subscribers the Flowable stops.
With Rx' ConnectableObservable's autoConnect(), when there is at least 1 subscriber, the Flowable starts but even when there are no more subscribers, the Flowable continues. the 'history' won't get lost.

@zach-klippenstein
Copy link
Contributor

What is unclear is what shall be the default value (if any).

To me, the name "share" implies only multicasting, so zero would be a reasonable default. The caching/replaying behavior is an additional feature that is often used along with multicasting, but not necessarily implied by the name.

@zach-klippenstein
Copy link
Contributor

@streetsofboston That's how Rx works, yes, but I was asking for clarification about this operator because I didn't see it specified in the issue description.

I believe share(0) as is proposed here would be equivalent to RxJava's share() (i.e. publish().refCount()), and share(n) would be equivalent to RxJava's replay(n).refCount().

The behavior of saving the cache across "connections" would be similar to Jake Wharton's RxReplayingShare.

@matejdro
Copy link

matejdro commented Jun 7, 2019

What happens to the cached "history" when moving from one to zero collectors (cancelling the upstream collection), then back to one? Does the history get preserved across upstream collections or lost?

I would argue that this should be an option, since it could be useful to have it preserve data over cancelling (for example to use this flow as a memory cache if data takes a while to load after subscribing) or it would not be useful (for example when data from the flow changes frequently, rendering old values obsolete), depending on the context

What is unclear is what shall be the default value (if any).

I think that there shouldn't be any default value, but startWith operator could be added for this like RX.

@streetsofboston
Copy link

streetsofboston commented Jun 7, 2019

@zach-klippenstein That's how I modeled my prototype implementation of the ConnectableFlow as well, for the share() and cache() functions: https://gist.github.com/streetsofboston/39c3f8c882c9891b35e0ebc3cd812381

Roman wants to not expose yet another type (ConnectableFlow) from the library and only add one more function called share() for the most-used use-cases so that the Flow api doesn't become bloated. I agree with this sentiment. Keep it as small as possible.

And maybe the argument in favor of exposing a new ConnectableFlow type is strong enough (see my previous comment about a connect() extension function) so that it will be added to the Flow api ....

In the end it's up to the maintainers of kotlinx.coroutines to weigh these factors and make a decision :-)

However, I don't think overloading the proposed share() operator with caching functionality (share when size == 0, cache when size > 0) is a good idea. Better to spell out the functions' responsibilities and have a share() function without a size parameter that shares and a cache() function with size parameter that caches.

Maybe a separate, but related, issue should be opened for a Flow.cache operator.

@streetsofboston
Copy link

@matejdro With the ConnectableFlow from my gist, either would be possible:

Cache survives re-activations of the flow:
flow.replay(size).autoConnect()

Cache does not survive re-activations of the flow:
flow.replay(size).refCount()

@elizarov
Copy link
Contributor Author

elizarov commented Jun 7, 2019

This looks good as #1086 implementation. One thing missing is the sharing timeout, which use case is described in #1086 (comment)

Thanks for timeout reminder. I'll see how it can be incorporate here is some nicer way than just adding an additional "timeout" parameter to this operator.

@elizarov
Copy link
Contributor Author

elizarov commented Jun 7, 2019

Say I want to create an extension function out of launch { flow.collect {} } called connect:

We actually plan this kind of operator, tentatively to be called launchIn(scope) for all kinds of flows. The only missing piece in this operator's design is user-friendly error-handling.

@elizarov
Copy link
Contributor Author

elizarov commented Jun 7, 2019

However, I don't think overloading the proposed share() operator with caching functionality (share when size == 0, cache when size > 0) is a good idea. Better to spell out the functions responsibilities and have a share() function without a size parameter that shares and a cache() function with size parameter that caches.

I don't see how to decouple share and cache functionality. It seems that you cannot cache without sharing, so we can just as well have a single operator with optional integer parameter that defaults to 0. No need to overload, because share() is conceptually same as cache(0).

@fvasco
Copy link
Contributor

fvasco commented Jun 7, 2019

@elizarov

you can always launch a dummy collector: launch { flow.collect {} }

But the scope remains active until job is working

@streetsofboston
Copy link

@fvasco
That's true. It can be cancelled, though.
And it will resume when, in case of share(), all subscribers/collectors stop collecting.

@streetsofboston
Copy link

I don't see how to decouple share and cache functionality
My prototype implementation decouples the two. It requires two private implementations of ConnectableFlow, though...

@elizarov
Copy link
Contributor Author

elizarov commented Jun 8, 2019

My prototype implementation decouples the two. It requires two private implementations of ConnectableFlow, though...

I actually envision a single implementation where "replay size" (of zero or more) is just a configuration parameter.

@LouisCAD
Copy link
Contributor

LouisCAD commented Jun 8, 2019

That proposal lookg good. It leaves me wondering about Throwables handling though:

  1. How should exceptions/throwables be handled when there is replay? Are they replayed too? Or is throwing illegal if shared?
  2. How to recover from an exception/throwable in a share session? Thinking about end-user point of view (e.g. of a mobile/desktop app) can help find possibilities.

@elizarov
Copy link
Contributor Author

elizarov commented Jun 8, 2019

If you need to handle errors for all collectors in a shared flow (that is, handle emitter's errors), then you have to put error-handling operators before you call share(). Btw, take a look at the error handling proposal: #1263

@fvasco
Copy link
Contributor

fvasco commented Jun 8, 2019

My consideration is that the launch trick has non-trivial consequences.
This can be related to #1065

@streetsofboston
Copy link

I actually envision a single implementation where "replay size" (of zero or more) is just a configuration parameter.

You can design it to just have one "share" method with a replay-size parameter, but my point was that decoupling the share and cache use cases is possible. I'd argue for decoupling, but you can argue for one "share" method as well.

@zach-klippenstein
Copy link
Contributor

Cache survives re-activations of the flow:
flow.replay(size).autoConnect()

@streetsofboston This is slightly different. AutoConnect preserves the upstream connection as well. RxReplayingShare will still disconnect upstream when the ref count hits zero (like the refCount operator), but it will preserve the cache and continue to replay the cache to new subscribers. The latter is useful if the upstream subscription represents some expensive resource that should be shutdown if nobody is actively listening, but you still want to have some initial value to emit immediately if the upstream takes a while to "warm up", like @matejdro suggested.

In general I am wary of autoConnect in Rx because it's so easy to misuse and accidentally leak the upsteam connection. Structured concurrency makes that a lot safer, since the connection requires the collection to happen in a particular scope.

I think that there shouldn't be any default value, but startWith operator could be added for this like RX.

@matejdro The function of startWith, at least as Rx defines it, is unrelated to multicasting and caching, but I see what you meant. I read that open question as referring to the "default value" of the cache size parameter - zero, one, or something else - not the default value of the cache itself, which, as you said, would just be empty until the upstream emits something.

@zach-klippenstein
Copy link
Contributor

zach-klippenstein commented Jun 8, 2019

In @streetsofboston's proposal, share and cache are "sibling" operators with disjoint functionality, but both are composed of the same two steps: of multicasting and automatic connection. I also prefer that naming because caching implies sharing, but sharing does not imply caching.

You could also make share and cache coordinate through operator fusion (like the recent change to buffer), but it's unclear what cache would do if not adjacent to share, since it doesn't make sense to cache for a single collector:

flow.share() // returns a multicasted flow
  .cache(1) // configures the multicasted flow to cache 1 emission

I don't mind using a single operator for both, as long as the operator name communicates that it does both multicasting and caching (e.g. shareReplaying).

@LouisCAD
Copy link
Contributor

LouisCAD commented Jun 8, 2019

Having a cache() operator for Flow would be misleading as you might use it incorrectly on any Flow. The fact that a flow is shared and has cache is an implementation detail, and we should avoid operators that make assumptions on the source Flow. Since cache only makes sense when the Flow is shared, and needs to be applied only on the sharing flow, not by the consumers, it should be a configuration, or an overload of the share operator.

@voddan
Copy link

voddan commented Jun 11, 2019

How will the updated signature shareIn(scope) work with several dynamically connecting and disconnecting collectors? E.i. there are use cases for the share operator that require the flow to keep track of the scopes of its consumers. Here is a quote from a Slack message with such a use case:

What I'm looking is to create long time operation that persists through multiple scopes. For example:

  1. Screen A (with Scope A) starts long operation
  2. User presses a button to switch to Screen B
  3. Screen B (with Scope B) also starts this long operation, but since this operation is already running, it would just somehow "add its scope"
  4. Screen A is closed, so Scope A that started the operation is cancelled. But since Scope B is still using this operation, it would not stop until Scope B is also cancelled.

@elizarov
Copy link
Contributor Author

shareIn(GlobalScope) would do the trick.

@voddan
Copy link

voddan commented Jun 11, 2019

shareIn(GlobalScope) would do the trick.

Would it close the flow if all of the consumers are cancelled? In terms of the above example that would mean canceling the long-running operation in the flow if both consumers, Screen A and Screen B, are closed.

@LouisCAD
Copy link
Contributor

@voddan Yes, that's the whole point of having a share operator for Flow instead of a ConflatedBroadcastChannel that you have to close manually.

RBusarow pushed a commit to RBusarow/kotlinx.coroutines that referenced this issue Dec 16, 2019
@radityagumay
Copy link

@digitalbuddha did the trick here https://github.com/dropbox/Store/blob/master/multicast/src/main/kotlin/com/dropbox/flow/multicast/Multicaster.kt

@matejdro
Copy link

matejdro commented Jan 3, 2020

Here is my take on the share operator: https://gist.github.com/matejdro/a9c838bf0066595fb52b4b8816f49252

It supports conflating and timeouts described in #1086 (comment). From what I see, implementation is a bit simpler than above links (it just uses an actor that loops through events), but it is not very fast - I would not use it to emit thousands events per second. However, in my projects I almost never need that big of a throughput, so it fits my use cases.

@radityagumay
Copy link

hi, @matejdro I have seen your gist, and it looks good.

What do you think of using an extension function in this loc? https://gist.github.com/matejdro/a9c838bf0066595fb52b4b8816f49252#file-multicastflowtest-kt-L139

@matejdro
Copy link

matejdro commented Jan 6, 2020

Yes, that would probably be a good idea.

@OrhanTozan
Copy link

I've also came across to use this for a usecase. Right know to do it is imperative premature collecting the flow.

@matejdro
Copy link

matejdro commented Apr 28, 2020

I just noticed this one:

It will have to be, actually, a shareIn(scope) operator. Otherwise, the scope it works in will be bound the the first collectors and when this callector is cancelled it will not be able to maintain emissions to other collectors.

I think this unnecessarily complicates the use of this feature. Flow is already scoped by the downstream (it starts collecting when first collector starts and stops when all collectors get cancelled). Introducing another scope would significantly complicate implementations where such share operator is exposed by the stateless objects.

@zach-klippenstein
Copy link
Contributor

It definitely makes the stream more complicated to reason about, but I don't think it does so unnecessarily. Multicasting inherently involves a scope that is separate from that of any of the downstream collectors, because no downstream collector is guaranteed to be active for the lifetime of the upstream collection. That scope could be implicit, as in the case with a reference counting approach, but sometimes you need it to be explicit: if the owner of the multicasting operation has its own scope (e.g. is a service-type object that gets shutdown when a user logs out of an app), it may want to explicitly manage the lifetime of that upstream collection and cancel it when the service object itself is shutdown.

@matejdro
Copy link

I see the purpose here. So we would need support for both explicit and implicit scopes?

@streetsofboston
Copy link

streetsofboston commented Apr 28, 2020

@zach-klippenstein Issue #1086 discusses such a class for explicitly managing the scope: ConnectableFlow.

Instead of the fun connect(CoroutineScope): Connection shown there, though, it just could have fun launchIn(CoroutineScope): Job, like a regular Flow, to manage its lifecycle. For the share() operator, it is then a question of managing that 'shared' CoroutineScope based on reference-counting the number of subscribers.

@elizarov
Copy link
Contributor Author

elizarov commented Apr 30, 2020

Scoping by downstream is not enough. We need the real scope to provide the context for pulling upstream data. We cannot just take the context of the first downstream subscriber. It can get destroyed, while other subscribers should not be suffering. In a case where you don't really care, you can always use shareIn(GlobalScope).

@elizarov
Copy link
Contributor Author

elizarov commented May 8, 2020

📣 Asking here a question to the community interested in this issue. There are tons of use-cases listed here and in #1086, but they all revolve around working with never-ending streams of incoming events or state updates. However, a Flow (just like other reactive streams) does not have to be infinite. It can complete. It can produce a number of events and stop. Disregard the question of failures for a while. Let's discuss just a regular completion. Assume that we don't retry either and are not just polling a non-streaming data source. So we have upstreamFlow.shareIn(scope) and the upstream flow completes.

Now, in Rx there is a replay operator that records all the upstream emissions and replays them to each downstream, completing the replay when the upstream had completed. I understand how replay works, but why would anyone ever need it? What are the actual use-cases for replay operator in real-life application? More generally, what are use-cases for having a stream that completes (so it is not an infinite stream of updates) and which you want to share among multiple downstream consumers at the same time?

@matthiaswelz
Copy link

What are the actual use-cases for replay operator in real-life application? More generally, what are use-cases for having a stream that completes (so it is not an infinite stream of updates) and which you want to share among multiple downstream consumers at the same time?

For me, having a share/replay operator would allow to implement some sort of if-/else logic with flows, i.e. to distribute elements into multiple flows based on criteria.

For example, I need the first 100 item with foo == "Bar' in flow a and all other items in flow b.

With share/replay:

var flow = //...
flow = flow.shareReplay() //or just share()
val a = flow.filter { it.foo == "Bar" }.take(100)
val b = flow.filter { it.foo != "Bar" }

In this case, I don't want to execute the input Flow multiple times (which could be a file consisting of a couple of GB of data on a network drive etc.).
So shareReplay should be able to transparently handle the "take" (i.e. don't block the other flow) and avoid evaluating the input flow multiple times.

@matthiaswelz
Copy link

For reference: This is the share implementation I ended up using which seems to fulfill my requirements (in my case parsing a binary file with a dynamic schema and converting to CSV - for this, I need to "split" the flow to "guess" the column headers based on the first X entries, but then need all entries again to actually to the parsing and writing):

(I wasn't able to get rid of the consumerCount parameter, because in my case I collect the first flow before starting the second one)

private val logger = KotlinLogging.logger {}

fun <T> Flow<T>.share(consumerCount: Int, bufferSize: Int = BUFFERED, scope: CoroutineContext? = null): Flow<T> {
    if (this is SharedFlow)
        return this

    logger.debug { "Sharing flow" }

    var broadcastChannel: BroadcastChannel<T>? = null
    var channels: List<ReceiveChannel<T>>? = null
    val lock = ReentrantLock()

    val pendingConsumers = AtomicInteger(consumerCount)
    val index = AtomicInteger(0)

    return SharedFlow(flow {
        lock.withLock {
            if (broadcastChannel == null) {
                logger.debug { "First consumer found. Creating channel and flows" }

                broadcastChannel = buffer(bufferSize).broadcastIn(CoroutineScope(scope ?: coroutineContext))
                channels =  0.until(consumerCount).map { broadcastChannel!!.openSubscription() }.toList()
            }
        }

        val curIndex = index.getAndIncrement()
        val receiveChannel = channels!![curIndex]
        val flow = receiveChannel.consumeAsFlow()
        logger.debug { "Flow $curIndex is now being consumed" }

        try {
            logger.debug { "Starting emitting (pendingConsumers is ${pendingConsumers.get()})" }
            emitAll(flow)
        } finally {
            receiveChannel.cancel()

            if (pendingConsumers.decrementAndGet() == 0)
                broadcastChannel!!.cancel()

            logger.debug { "Finished emitting (pendingConsumers is ${pendingConsumers.get()})" }
        }
    })
}

private class SharedFlow<T>(fl: Flow<T>) : Flow<T> by fl

fun <T, R> Flow<T>.flatMapItems(callback: (T) -> Iterable<R>): Flow<R> = flatMapConcat { callback(it).asFlow() }

inline fun <T> Flow<T>.parallelFilter(concurrencyLevel: Int = DEFAULT_CONCURRENCY, crossinline predicate: suspend (T) -> Boolean): Flow<T> {
    return flatMapMerge(concurrencyLevel) { value ->
        flow { if (predicate(value)) emit(value) }
    }
}

inline fun <T, R> Flow<T>.parallelMap(concurrencyLevel: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (T) -> R): Flow<R> {
    return flatMapMerge(concurrencyLevel) { value ->
        flow { emit(transform(value)) }
    }
}

@zach-klippenstein
Copy link
Contributor

I work on a fairly large mobile app, and we use replay(1) (ie stateIn) all the time but I don't think I've ever seen an unbounded replay in our codebase.

@elizarov
Copy link
Contributor Author

elizarov commented May 8, 2020

@matthiaswelz Thanks for your use-case. Note, that in your use-case you are not actually "widely sharing" the flow. You don't actually publish the shared flow for later consumption by an unknown number of future collectors. On the contrary, here you know, in advance, that there are going to be two downstream flows that are going to process your upstream flow. In this case, it looks like Rx-like replay is a bit of overkill, since it actually caches all the data forever so you'll be running out of memory if the data stream is very long, you are not getting advantage of the streaming nature of data producer here to minimize memory consumption.

I think that your use-case of "splitting the flow in a few other flows" should be covered by a separate operator which we can tentatively call replicate. See this comment with a strawman example: #1086 (comment)

@elizarov
Copy link
Contributor Author

elizarov commented May 8, 2020

I work on a fairly large mobile app, and we use replay(1) (ie stateIn) all the time but I don't think I've ever seen an unbounded replay in our codebase.

@zach-klippenstein But do you ever call replay(1) on an upstream flow that is finite and completes at some moment of time?

@elizarov
Copy link
Contributor Author

There is a design for SharedFlow that provides most of the underlying framework to implement the actual sharing operators. See #2034

@elizarov
Copy link
Contributor Author

This issue is superseded by the worked-out design of sharing operators. See #2047

@SzyQ
Copy link

SzyQ commented Aug 6, 2020

While it's not release, a good enough worakround is that simple extension:

fun <T> Flow<T>.cache(scope: CoroutineScope, default: T): Flow<T> {
    val stateFlow = MutableStateFlow(default)
    scope.launch {
        collect {
            stateFlow.value = it
        }
    }
    return stateFlow
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests