Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DataFlow implementation #1354

Draft
wants to merge 1 commit into
base: develop
from

Conversation

@elizarov
Copy link
Member

elizarov commented Jul 18, 2019

DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of DataFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally).

The name reflects the fact that DataFlow represent an cell in a "data flow programming" model (think of an electronic spreadsheet). Dependent values can be defined by using various operators on the flows, with combineLatest being especially useful to combine multiple data flows using arbitrary functions.

It is also a play of Android's LiveData, providing very similar API.

Fixes #1082

Open issues

  1. The name. Shall we stick with DataFlow or pick something else?

  2. I've made it fusible with flowOn and conflated operators (they shall have no effect on the DataFlow). As a result, the sequence of dataFlow.flowOn(ctx).buffer() becomes different from a sequence of dataFlow.buffer().flowOn(ctx). In the former case cxt that is "applied" to dataflow is ignored. In the later case ctx is applied to the buffering coroutine is created between the data flow and the downstream collector. Is it clear why there is difference?

DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of DataFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
@elizarov elizarov force-pushed the data-flow branch from 36a0ab3 to d66206e Jul 18, 2019
* A data flow can be [closed][close] and all its collectors will complete.
*
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data.
* The ability to read the current [value] is provided only for convenience of the code that updates the data.

This comment has been minimized.

Copy link
@fvasco

fvasco Jul 18, 2019

Contributor

It is not specified how to get the current value using the flow read-only interface, is counter.first() work?

May multiple counter.first() invocations return the same value?

This comment has been minimized.

Copy link
@elizarov

elizarov Jul 18, 2019

Author Member

Yes. counter.first() does compute and return the current value. I'll mention it in the docs.

public var value: T

/**
* Closes this data fl9w.

This comment has been minimized.

Copy link
@fvasco

fvasco Jul 18, 2019

Contributor

typo: flow

* A data flow can be [closed][close] and all its collectors will complete.
*
* A read-only interface to data flow is a [Flow]. It is supposed to be collected by the consumers of the data.
* The ability to read the current [value] is provided only for convenience of the code that updates the data.

This comment has been minimized.

Copy link
@fvasco

fvasco Jul 18, 2019

Contributor

Is CAS not supported by design? The CounterModel example is not thread-safe.

This comment has been minimized.

Copy link
@elizarov

elizarov Jul 18, 2019

Author Member

Yes. It is assume that the owner of LiveData that makes updates is single-threaded. Do you have compelling use-cases to support CAS there to allow coordinated updates from multiple threads?

This comment has been minimized.

Copy link
@fvasco

fvasco Jul 18, 2019

Contributor

I often use a Channel or a Mutex to resolve this kind of issues, so these use cases can be resolved outside the DataFlow class.

Flow is a safe abstraction, but the example counter.value++ is not. I am not proposing a some kind of CAS operation, however there is not possible to declare that counter.value++ simply works.

Similar to other @qwwdfsad's considerations, we can consider DataFlow<T> : Flow<T>, FlowCollector<T>, this adds a lot of cerimonies but it make evident any issue in counter.emit( counter.first() + 1).

/**
* Value of this data flow.
*
* Setting a value that is [equal][Any.equals] to the previous one does nothing.

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Jul 18, 2019

Member

Where this restriction came from? Are we sure this is desired behavior for most of the users?

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Jul 18, 2019

Member

In other Reactive frameworks, such entity does not only represent Flow, but also FlowCollector.
Example of usages:

Note that I (currently) do not advocate for doing the same thing, but it is something definitely worth consideration (maybe @gildor can give us a few pointers here?).

This comment has been minimized.

Copy link
@elizarov

elizarov Jul 18, 2019

Author Member

I've thought about making it implement FlowCollector (it is easy to add), but I could not find a complessing use-case to do it. In Kotlin word it will be less convenient, since FlowCollector.emit is a suspending function, do you do not actually need suspending update function. You can simply update DataFlow.value from any context, even non-suspending one.

P.S. If we create some kind of EventFlow as a complement (which would replace ArrayBroadcastChannel and would not conflate events), then it would be logical to call its updating function emit and naturally implement FlowCollector interface there.

This comment has been minimized.

Copy link
@fvasco

fvasco Jul 18, 2019

Contributor

Hi @elizarov

FlowCollector.emit is a suspending function, do you do not actually need suspending update function

Overriding a suspending function with a non-suspending one is a current Kotlin limitation.
DataFlow shares this issue with ConflatedChannel and UnlimitedChannel.

On the other hand in current design to update value (dataFlow.value=42) is cheap, instead to read a value using the public interface is really expensive (flow.first()).

This comment has been minimized.

Copy link
@gildor

gildor Jul 23, 2019

Contributor

@qwwdfsad Personally, I do not have good use cases for FlowCollector for DataFlow (same as rarely use io.reactivex.Observer that implemented by Subject in our codebase). so my opinion that some simple adapter function that allows forwarding events from Flow/Channel/Lambda to DataFlow would be enough

But maybe I just do not see some really common use cases for it, but because it's very easy to create extension function for any type to forward any values to DataFlow, I think it's not really needed in Kotlin

This comment has been minimized.

Copy link
@yigit

yigit Sep 1, 2019

+1 for allowing same value to be set twice. If someone wants to dedup it, they can as well use distinctUntilChanged.
One use case for dispatching the same value is using it for UI actions, like a refresh.

private val refreshTrigger = DataFlow<Unit>().also {
   it.value = Unit
}
val data = refreshTrigger.flatMapLatest {
    repositroy.loadData()
}
fun refresh() { 
    refreshTrigger.value = Unit
}

If there is an EventFlow, it could also fulfill this need.

public fun <T> DataFlow(value: T): DataFlow<T> = DataFlowImpl(value ?: NULL)

@SharedImmutable
private val NONE = Symbol("NONE")

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Jul 18, 2019

Member

Please move implementation to a different file (so users won't see it when exploring our public API)

* cost, where `N` is the number of collectors.
*/
@FlowPreview
public interface DataFlow<T> : Flow<T> {

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Jul 18, 2019

Member

DataFlow sounds more like a BroacastChannel with (possibly cached or replayable) data.
This primitive represents a single state as a flow of consecutive updates, I'd rather name it StateFlow

This comment has been minimized.

Copy link
@gildor

gildor Jul 23, 2019

Contributor

I don't think that StateFlow is better, for more use cases it's essentially observable data field than some abstract state. At least it how we usually use it

This comment has been minimized.

Copy link
@yigit

yigit Sep 1, 2019

+1 for DataFlow. State sounds more limiting.

This comment has been minimized.

Copy link
@elizarov

elizarov Sep 2, 2019

Author Member

@yigit @gildor @qwwdfsad Bikeshedding the name here: #1082 (comment)

* Getting a value from an [uninitialized][isInitialized] flow produces [IllegalStateException].
* Getting a value from or setting a value to a [closed][isClosed] flow produces [IllegalStateException].
*/
public var value: T

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Jul 18, 2019

Member

Regarding my previous comment about FlowCollector.

I am afraid that mutable value will mix imperative and declarative approaches.
If we designed something like Compose, then yes, updating a var is a way to go.
But we are already in the world of Flow that requires more ceremonies, but is pretty approachable and clear in return. I am afraid that the ability to mutate var instead of writing state.emit(update) will make end-users code less clear and understandable.

This comment has been minimized.

Copy link
@gildor

gildor Jul 23, 2019

Contributor

Should emit be suspendable or regular function/

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Jul 23, 2019

Member

It's an open question. Either we can play the same trick as with channels (suspendable send + non-suspendable offer and guarantee, that for non-closed conflated channel offer always succeeds) and provide two methods (naming TBD) or make emit non-suspendable

Copy link
Contributor

digitalbuddha left a comment

I'm delighted for this api. My request is a way to transform a flow into a data flow. My use case: I have a db that exposes flows for each query, the flow emits each time a table changes. I'd like to broadcast the flow and collect it multiple times (2 parts of screen). Ideally I'd like to do dbFlow.toDataFlow and retain that in a field, then I can have many parts of my screen collect the query changes without having to create more than one dbFlow.

@JakeWharton

This comment has been minimized.

Copy link
Contributor

JakeWharton commented Sep 2, 2019

That's #1261

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
7 participants
You can’t perform that action at this time.