Flow context preserving property, thread safety and context changing #1210

Closed
qwwdfsad opened this issue May 20, 2019 · 0 comments

qwwdfsad (Member) commented May 20, 2019

Context preservation

flow has a context preservation property that is meant to make programming with flows more predictable, less error-prone, and simpler.

For example, the following code throws IllegalStateException (ISE) the first time it is collected:

flow {
    withContext(MyCustomContext) { emit(42) }
}
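A minimal runnable sketch of the failure described above. MyCustomContext is not defined in this issue, so Dispatchers.Default stands in for it here; the helper name emitFromOtherContextFails is made up for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns true if collecting the flow fails with IllegalStateException.
fun emitFromOtherContextFails(): Boolean = runBlocking {
    val broken = flow {
        // Assumption: Dispatchers.Default plays the role of MyCustomContext.
        withContext(Dispatchers.Default) { emit(42) }
    }
    try {
        broken.collect { }
        false
    } catch (e: IllegalStateException) {
        // The context preservation check rejected the emission.
        true
    }
}
```

Note that merely building the flow does not fail; the check runs when emit is reached during collection.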

This is the desired property, but it is not enough to provide the best possible experience for developers.

Problem #⁠1

For example (and I've seen this in the wild), one may want to write the following merge implementation:

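// Deliberately incorrect: emit is called from two coroutines at once.
fun <T> Flow<T>.merge(other: Flow<T>): Flow<T> = flow {
    coroutineScope {
        launch {
            // Emits from the launched child coroutine.
            this@merge.collect { value -> emit(value) }
        }
        // Emits from the collecting coroutine, concurrently with the child.
        other.collect { value -> emit(value) }
    }
}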

It passes the context preservation check and works under a single-threaded dispatcher (and on Android most tests are single-threaded), but it fails with the least expected error (probably KotlinNullPointerException) when the two collections run concurrently.
Of course, it is possible to make flow thread-safe, but that significantly reduces performance even when no actual parallelism is present. The other option is a probabilistic data-race check, but by its nature it is not sufficient: a race can only be detected when it actually occurs, so such a check always fires too late.

Problem #⁠2

Originally reported by @gildor

Consider the following system-wide function for working with some resource:
fun foo(...parameters, block: suspend (resource) -> Unit), which uses withContext internally.
It is natural to want to write the following snippet:

flow {
    foo(...) { resource ->
        emit(transform(resource))
    }
}

This doesn't work because it violates context preservation: emit is called from the context that foo switched to. flowOn does not help either, because foo encapsulates its context.

Solution

Strengthen the context preservation property. Prohibit not only changes of context elements, but also emission from a different coroutine (effectively prohibiting the situation from Problem #1).
Formulated more strictly, the rule is: the 'collect' and 'emit' contexts must transitively have the same non-scoped Job in their hierarchy.
This sounds complicated, but it won't leak to users 99% of the time.
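As a sketch of what the strengthened rule means in practice: in kotlinx.coroutines releases that include this check, emitting from a launched child coroutine is rejected with IllegalStateException, even on a single thread. The names naiveMerge and naiveMergeIsRejected are made up for this example.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Deliberately incorrect: emits from a launched child coroutine.
fun <T> Flow<T>.naiveMerge(other: Flow<T>): Flow<T> = flow {
    coroutineScope {
        launch { this@naiveMerge.collect { value -> emit(value) } }
        other.collect { value -> emit(value) }
    }
}

// Returns true if collection fails with IllegalStateException.
fun naiveMergeIsRejected(): Boolean = runBlocking {
    try {
        flowOf(1, 2).naiveMerge(flowOf(3, 4)).collect { }
        false
    } catch (e: IllegalStateException) {
        // The launched coroutine's Job differs from the collector's Job.
        true
    }
}
```

coroutineScope itself is fine, because it creates a scoped Job; it is the launch inside it that introduces a new non-scoped Job and trips the check.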

But to allow writing such a simple merge operator and to resolve Problem #2, an additional builder is required: channelFlow (this builder will be mentioned in the text of the exception thrown when context preservation is violated).
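A hedged sketch of the merge operator from Problem #1 written with channelFlow, as it exists in released kotlinx.coroutines. The name mergeWith is chosen only to avoid clashing with any library operator.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Collects both flows concurrently and forwards every value into one channel.
fun <T> Flow<T>.mergeWith(other: Flow<T>): Flow<T> = channelFlow {
    // send is thread-safe, so calling it from a child coroutine is allowed.
    launch { this@mergeWith.collect { value -> send(value) } }
    other.collect { value -> send(value) }
}

// Interleaving is not specified, so the result is sorted before use.
fun mergedSorted(): List<Int> = runBlocking {
    flowOf(1, 2).mergeWith(flowOf(3, 4)).toList().sorted()
}
```

The resulting flow completes once the block and all coroutines launched in it have finished.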

The full signature: fun <T> channelFlow(capacity: Int = DEFAULT, block: suspend ProducerScope<T>.() -> Unit). To avoid having two "identical" builders, flowViaChannel will probably be deprecated with a replacement.

The only change required to write a concurrent or context-agnostic builder is replacing emit with send.
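A sketch of Problem #2 solved this way. The function withResource is a hypothetical stand-in for foo (its parameters are elided in this issue), and the String resource is likewise invented for the example.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for foo: hides its context switch from the caller.
suspend fun withResource(name: String, block: suspend (String) -> Unit) =
    withContext(Dispatchers.Default) { block("resource:$name") }

// emit would violate context preservation here; send does not.
fun resourceLengths(): Flow<Int> = channelFlow {
    withResource("demo") { resource ->
        send(resource.length) // stands in for transform(resource)
    }
}

fun collectResourceLengths(): List<Int> = runBlocking {
    resourceLengths().toList()
}
```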

It was decided to provide a channel-based builder instead of thread-safe one for the sake of consistent mental model:

  • FlowCollector is not thread-safe and can never be used concurrently
  • Channels are thread-safe and can be used in a fan-in fashion
  • The same builder can be used both for concurrent primitives and for integration with callback-based APIs
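To illustrate the last point, here is a sketch of a callback integration using callbackFlow as released in kotlinx.coroutines, where the explicit suspension call is named awaitClose. The Ticker class is a hypothetical callback API invented for this example, as are asFlow and collectTwoTicks.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical callback-based API, invented for this example.
class Ticker {
    private val listeners = mutableListOf<(Int) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: (Int) -> Unit) { listeners += listener }
    fun unregister(listener: (Int) -> Unit) { listeners -= listener }
    fun fire(value: Int) { listeners.toList().forEach { it(value) } }
}

fun Ticker.asFlow(): Flow<Int> = callbackFlow {
    // trySend does not suspend, so it is safe to call from a plain callback.
    val listener: (Int) -> Unit = { value -> trySend(value) }
    register(listener)
    // Suspend until the collector is done, then detach the listener.
    awaitClose { unregister(listener) }
}

fun collectTwoTicks(): List<Int> = runBlocking {
    val ticker = Ticker()
    val collected = async { ticker.asFlow().take(2).toList() }
    while (ticker.listenerCount == 0) yield() // wait until the flow has registered
    ticker.fire(1)
    ticker.fire(2)
    collected.await()
}
```

Without the awaitClose call the block would return immediately and the flow would complete before any callback fired, which is the mistake the explicit call is meant to prevent.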

The only serious downside is Java interoperability: block is suspending and thus cannot be easily invoked from Java.

@qwwdfsad qwwdfsad added this to the 1.3.0-alpha milestone May 20, 2019

@qwwdfsad qwwdfsad self-assigned this May 20, 2019

qwwdfsad added a commit that referenced this issue May 21, 2019

New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method

Rationale:

    * Can be used in different context without breaking context preservation
    * Can be used to build concurrent operators such as merge
    * Can be used to integrate with callbacks
    * Is less error-prone than flowViaChannel because requires explicit await() call

Partially fixes #1210

qwwdfsad added a commit that referenced this issue May 21, 2019

Strengthen flow context preservation invariant
    * Add additional check in SafeCollector with an error message pointing to channelFlow
    * Improve performance of the CoroutineId check in SafeCollector

Fixes #1210

qwwdfsad added a commit that referenced this issue May 27, 2019

New flow builder: channelFlow (and its alias callbackFlow) and supplementary ProducerScope.await method

Rationale:

    * Can be used in different context without breaking context preservation
    * Can be used to build concurrent operators such as merge
    * Can be used to integrate with callbacks
    * Is less error-prone than flowViaChannel because requires explicit await() call

Partially fixes #1210

elizarov added a commit that referenced this issue May 29, 2019

SafeCollector rework (#1196)
SafeCollector performance improvements:
* Cache result of the context check thus speeding up safe collector on happy path by a factor of three
* Separate fast and slow paths in JobSupport to drastically change inlining decisions of the JVM that are crucial for leaf coroutines with flows

Strengthen flow context preservation invariant

* Add additional check in SafeCollector with an error message pointing to channelFlow
* Improve performance of the CoroutineId check in SafeCollector
* Change wording in documentation

Fixes #1210

@qwwdfsad qwwdfsad closed this in b08d61c Jun 6, 2019
