Skip to content

Commit

Permalink
Deprecate flowWith operator
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad authored and elizarov committed Jun 3, 2019
1 parent daf8502 commit f44942a
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 84 deletions.
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/Flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import kotlinx.coroutines.*
* The flow has a context preservation property: it encapsulates its own execution context and never propagates or leaks it downstream, thus making
* reasoning about the execution context of particular transformations or terminal operations trivial.
*
* There are two ways to change the context of a flow: [flowOn][Flow.flowOn] and [flowWith][Flow.flowWith].
* The former changes the upstream context ("everything above the flowOn operator") while the latter
* changes the context of the flow within [flowWith] body. For additional information refer to these operators' documentation.
* There is the only way to change the context of a flow: [flowOn][Flow.flowOn] operator,
* that changes the upstream context ("everything above the flowOn operator"). For additional information refer to its documentation.
*
* This reasoning can be demonstrated in practice:
* ```
Expand Down
21 changes: 3 additions & 18 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Sh
* .doOnEach { value -> println("Processing $value in computation")
* .subscribe()
* ```
* has the following Flow equivalents:
* has the following Flow equivalent:
* ```
* withContext(Dispatchers.Default) {
* flow
Expand All @@ -82,25 +82,10 @@ public fun <T> Flow<T>.publishOn(context: CoroutineContext): Flow<T> = error("Sh
* }
* }
* ```
* or
*
* ```
* withContext(Dispatchers.Default) {
* flow
* .flowWith(Dispatchers.IO) { map { value -> println("Doing map in IO"); value } }
* .collect { value ->
* println("Processing $value in computation")
* }
* }
* ```
*
* The difference is that [flowWith] encapsulates ("preserves") the context within its lambda
* while [flowOn] changes the context of all preceding operators.
* Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow.
*
* Opposed to subscribeOn, it it **possible** to use multiple `flowOn` operators in the one flow
* @suppress
*/
@Deprecated(message = "Use flowWith or flowOn instead", level = DeprecationLevel.ERROR)
@Deprecated(message = "Use flowOn instead", level = DeprecationLevel.ERROR)
public fun <T> Flow<T>.subscribeOn(context: CoroutineContext): Flow<T> = error("Should not be called")

/** @suppress **/
Expand Down
16 changes: 14 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,23 @@ public fun <T> Flow<T>.flowOn(flowContext: CoroutineContext, bufferSize: Int = 1
* For more explanation of context preservation please refer to [Flow] documentation.
*
* This operator uses channel of the specific [bufferSize] in order to switch between contexts,
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.
* but it is not guaranteed that channel will be created, implementation is free to optimize it away in case of fusing.*
*
* @throws [IllegalArgumentException] if provided context contains [Job] instance.
* This operator is deprecated without replacement because it was discovered that it doesn't play well with coroutines and flow semantics:
* 1) It doesn't prevent context elements from the downstream to leak into its body
* ```
* flowOf(1).flowWith(EmptyCoroutineContext) {
* onEach { println(kotlin.coroutines.coroutineContext[CoroutineName]) } // Will print 42
* }.flowOn(CoroutineName(42))
* ```
* 2) To avoid such leaks, new primitive should be introduced to `kotlinx.coroutines` -- the subtraction of contexts.
* And this will become a new concept to learn, maintain and explain.
* 3) It defers the execution of declarative [builder] until the moment of [collection][Flow.collect] similarly
* to `Observable.defer`. But it is unexpected because nothing in the name `flowWith` reflects this fact.
* 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer.
*/
@FlowPreview
@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.WARNING) // Error in beta release, removal in 1.4
public fun <T, R> Flow<T>.flowWith(
flowContext: CoroutineContext,
bufferSize: Int = 16,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal
import kotlin.test.*
import kotlinx.coroutines.flow.combineLatest as combineLatestOriginal

/*
* Replace: { i, j -> i + j } -> { i, j -> i + j } as soon as KT-30991 is fixed
Expand Down Expand Up @@ -133,12 +133,11 @@ abstract class CombineLatestTestBase : TestBase() {
emit(1)
assertEquals("second", NamedDispatchers.name())
expect(3)
}.flowOn(NamedDispatchers("second")).flowWith(NamedDispatchers("with")) {
onEach {
assertEquals("with", NamedDispatchers.name())
}.flowOn(NamedDispatchers("second"))
.onEach {
assertEquals("onEach", NamedDispatchers.name())
expect(4)
}
}
}.flowOn(NamedDispatchers("onEach"))

val value = withContext(NamedDispatchers("main")) {
f1.combineLatest(f2) { i, j ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,9 @@ class DebounceTest : TestBase() {
emit(1)
expect(2)
throw TestException()
}.flowWith(NamedDispatchers("unused")) {
debounce(Long.MAX_VALUE).map {
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
expectUnreached()
}
}

assertFailsWith<TestException>(flow)
finish(3)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.test.*

abstract class FlatMapBaseTest : TestBase() {
Expand Down Expand Up @@ -75,14 +73,12 @@ abstract class FlatMapBaseTest : TestBase() {
fun testIsolatedContext() = runTest {
val flow = flowOf(1)
.flowOn(NamedDispatchers("irrelevant"))
.flowWith(NamedDispatchers("inner")) {
flatMap {
.flatMap {
flow {
assertEquals("inner", NamedDispatchers.name())
emit(it)
}
}
}.flowOn(NamedDispatchers("irrelevant"))
}.flowOn(NamedDispatchers("inner"))
.flatMap {
flow {
assertEquals("outer", NamedDispatchers.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*

@Suppress("DEPRECATION")
class FlowContextTest : TestBase() {

private val captured = ArrayList<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*

@Suppress("DEPRECATION")
class FlowWithTest : TestBase() {

private fun mapper(name: String, index: Int): suspend (Int) -> Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,8 @@ class SampleTest : TestBase() {
emit(1)
expect(2)
throw TestException()
}.flowWith(NamedDispatchers("unused")) {
sample(Long.MAX_VALUE).map {
expectUnreached()
}
}.flowOn(NamedDispatchers("unused")).sample(Long.MAX_VALUE).map {
expectUnreached()
}

assertFailsWith<TestException>(flow)
Expand Down
44 changes: 3 additions & 41 deletions kotlinx-coroutines-core/common/test/flow/operators/ZipTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,52 +112,15 @@ class ZipTest : TestBase() {
}

@Test
fun testContextIsIsolated() = runTest {
fun testContextIsIsolatedReversed() = runTest {
val f1 = flow {
emit("a")
assertEquals("first", NamedDispatchers.name())
expect(1)
}.flowOn(NamedDispatchers("first")).onEach {
assertEquals("nested", NamedDispatchers.name())
assertEquals("with", NamedDispatchers.name())
expect(2)
}.flowOn(NamedDispatchers("nested"))

val f2 = flow {
emit(1)
assertEquals("second", NamedDispatchers.name())
expect(3)
}.flowOn(NamedDispatchers("second")).flowWith(NamedDispatchers("with")) {
onEach {
assertEquals("with", NamedDispatchers.name())
expect(4)
}
}

val value = withContext(NamedDispatchers("main")) {
f1.zip(f2) { i, j ->
assertEquals("main", NamedDispatchers.name())
expect(5)
i + j
}.single()
}

assertEquals("a1", value)
finish(6)
}

@Test
fun testContextIsIsolatedReversed() = runTest {
val f1 = flow {
emit("a")
assertEquals("first", NamedDispatchers.name())
expect(1)
}.flowOn(NamedDispatchers("first"))
.flowWith(NamedDispatchers("with")) {
onEach {
assertEquals("with", NamedDispatchers.name())
expect(2)
}
}
}.flowOn(NamedDispatchers("with"))

val f2 = flow {
emit(1)
Expand All @@ -180,7 +143,6 @@ class ZipTest : TestBase() {
finish(6)
}


@Test
fun testErrorInDownstreamCancelsUpstream() = runTest {
val f1 = flow {
Expand Down

0 comments on commit f44942a

Please sign in to comment.