From 55bead0cf340d37c899c931b0bb5866c58c70e2b Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 26 Jul 2019 18:10:10 +0300 Subject: [PATCH] Deprecate flowWith with ERROR --- .../common/src/flow/operators/Context.kt | 2 +- .../test/flow/operators/FlowContextTest.kt | 154 ------------ .../common/test/flow/operators/FlowOnTest.kt | 21 ++ .../test/flow/operators/FlowWithTest.kt | 231 ------------------ 4 files changed, 22 insertions(+), 386 deletions(-) delete mode 100644 kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt delete mode 100644 kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 8f3325c508..043c839fff 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -238,7 +238,7 @@ public fun Flow.flowOn(context: CoroutineContext): Flow { * 4) It can be confused with [flowOn] operator, though [flowWith] is much rarer. */ @FlowPreview -@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.WARNING) // Error in beta release, removal in 1.4 +@Deprecated(message = "flowWith is deprecated without replacement, please refer to its KDoc for an explanation", level = DeprecationLevel.ERROR) // Error in beta release, removal in 1.4 public fun Flow.flowWith( flowContext: CoroutineContext, bufferSize: Int = BUFFERED, diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt deleted file mode 100644 index cd8af1d044..0000000000 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowContextTest.kt +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlin.coroutines.* -import kotlin.test.* - -@Suppress("DEPRECATION") -class FlowContextTest : TestBase() { - - private val captured = ArrayList() - - @Test - fun testMixedContext() = runTest { - val flow = flow { - captured += NamedDispatchers.nameOr("main") - emit(314) - } - - val mapper: suspend (Int) -> Int = { - captured += NamedDispatchers.nameOr("main") - it - } - - val value = flow // upstream - .map(mapper) // upstream - .flowOn(NamedDispatchers("upstream")) - .map(mapper) // upstream 2 - .flowWith(NamedDispatchers("downstream")) { - map(mapper) // downstream - } - .flowOn(NamedDispatchers("upstream 2")) - .map(mapper) // main - .single() - - assertEquals(314, value) - assertEquals(listOf("upstream", "upstream", "upstream 2", "downstream", "main"), captured) - } - - @Test - fun testException() = runTest { - val flow = flow { - emit(314) - delay(Long.MAX_VALUE) - }.flowOn(NamedDispatchers("upstream")) - .map { - throw TestException() - } - - assertFailsWith { flow.single() } - assertFailsWith(flow) - ensureActive() - } - - @Test - fun testMixedContextsAndException() = runTest { - val baseFlow = flow { - emit(314) - hang { } - } - - var state = 0 - var needle = 1 - val mapper: suspend (Int) -> Int = { - if (++state == needle) throw TestException() - it - } - - val flow = baseFlow.map(mapper) // 1 - .flowOn(NamedDispatchers("ctx 1")) - .map(mapper) // 2 - .flowWith(NamedDispatchers("ctx 2")) { - map(mapper) // 3 - } - .map(mapper) // 4 - .flowOn(NamedDispatchers("ctx 3")) - .map(mapper) // 5 - - repeat(5) { // Will hang for 6 - state = 0 - needle = it + 1 - assertFailsWith { flow.single() } - - state = 0 - assertFailsWith(flow) - } - - ensureActive() - } - - @Test - fun testNestedContexts() = runTest { - val mapper: suspend (Int) -> Int = { captured += NamedDispatchers.nameOr("main"); it } - val value = flow { - captured += NamedDispatchers.nameOr("main") - emit(1) - }.flowWith(NamedDispatchers("outer")) { - map(mapper) - .flowOn(NamedDispatchers("nested first")) - .flowWith(NamedDispatchers("nested second")) { - map(mapper) - .flowOn(NamedDispatchers("inner first")) - .map(mapper) - } - .map(mapper) - }.map(mapper) - .single() - - val expected = listOf("main", "nested first", "inner first", "nested second", "outer", "main") - assertEquals(expected, captured) - assertEquals(1, value) - } - - - @Test - fun testFlowContextCancellation() = runTest { - val latch = Channel() - val flow = flow { - assertEquals("delayed", NamedDispatchers.name()) - expect(2) - emit(1) - }.flowWith(NamedDispatchers("outer")) { - map { expect(3); it + 1 }.flowOn(NamedDispatchers("inner")) - }.map { - expect(4) - assertEquals("delayed", NamedDispatchers.name()) - latch.send(Unit) - hang { expect(6) } - }.flowOn(NamedDispatchers("delayed")) - - - val job = launch(NamedDispatchers("launch")) { - expect(1) - flow.single() - } - - latch.receive() - expect(5) - job.cancelAndJoin() - finish(7) - ensureActive() - } - - @Test - fun testIllegalArgumentException() { - val flow = emptyFlow() - assertFailsWith { flow.flowOn(Job()) } - assertFailsWith { flow.flowWith(Job()) { this } } - } -} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt index 4adc35415e..5d5fa6c6a2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt @@ -261,6 +261,27 @@ class FlowOnTest : TestBase() { finish(3) } + @Test + fun testException() = runTest { + val flow = flow { + emit(314) + delay(Long.MAX_VALUE) + }.flowOn(NamedDispatchers("upstream")) + .map { + throw TestException() + } + + assertFailsWith { flow.single() } + assertFailsWith(flow) + ensureActive() + } + + @Test + fun testIllegalArgumentException() { + val flow = emptyFlow() + assertFailsWith { flow.flowOn(Job()) } + } + private inner class Source(private val value: Int) { public var contextName: String = "unknown" diff --git a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt deleted file mode 100644 index a785814206..0000000000 --- a/kotlinx-coroutines-core/common/test/flow/operators/FlowWithTest.kt +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow - -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import kotlin.test.* - -@Suppress("DEPRECATION") -class FlowWithTest : TestBase() { - - private fun mapper(name: String, index: Int): suspend (Int) -> Int = { - assertEquals(name, NamedDispatchers.nameOr("main")) - expect(index) - it - } - - @Test - fun testFlowWith() = runTest { - val flow = flow { - assertEquals("main", NamedDispatchers.nameOr("main")) - expect(1) - emit(314) - } - - val result = flow.flowWith(NamedDispatchers("ctx1")) { - map(mapper("ctx1", 2)) - }.flowWith(NamedDispatchers("ctx2")) { - map(mapper("ctx2", 3)) - }.map(mapper("main", 4)).single() - assertEquals(314, result) - finish(5) - } - - @Test - public fun testFlowWithThrowingSource() = runTest { - val flow = flow { - emit(NamedDispatchers.nameOr("main")) - throw TestException() - }.flowWith(NamedDispatchers("throwing")) { - map { - assertEquals("main", it) - it - } - } - - assertFailsWith { flow.single() } - assertFailsWith(flow) - ensureActive() - } - - @Test - public fun testFlowWithThrowingOperator() = runTest { - val flow = flow { - emit(NamedDispatchers.nameOr("main")) - hang {} - }.flowWith(NamedDispatchers("throwing")) { - map { - assertEquals("main", it) - throw TestException() - } - } - - assertFailsWith { flow.single() } - assertFailsWith(flow) - ensureActive() - } - - @Test - public fun testFlowWithThrowingDownstreamOperator() = runTest { - val flow = flow { - emit(42) - hang {} - }.flowWith(NamedDispatchers("throwing")) { - map { it } - }.map { throw TestException() } - - assertFailsWith { flow.single() } - assertFailsWith(flow) - ensureActive() - } - - @Test - fun testMultipleFlowWith() = runTest() { - flow { - expect(1) - emit(1) - }.map(mapper("main", 2)) - .flowWith(NamedDispatchers("downstream")) { - map(mapper("downstream", 3)) - } - .flowWith(NamedDispatchers("downstream 2")) { - map(mapper("downstream 2", 4)) - } - .flowWith(NamedDispatchers("downstream 3")) { - map(mapper("downstream 3", 5)) - } - .map(mapper("main", 6)) - .flowWith(NamedDispatchers("downstream 4")) { - map(mapper("downstream 4", 7)) - }.flowWith(NamedDispatchers("ignored")) { this } - .single() - - finish(8) - } - - @Test - fun testFlowWithCancellation() = runTest() { - val latch = Channel() - expect(1) - val job = launch(NamedDispatchers("launch")) { - flow { - expect(2) - latch.send(Unit) - expect(3) - hang { - assertEquals("launch", NamedDispatchers.nameOr("main")) - expect(5) - } - }.flowWith(NamedDispatchers("cancelled")) { - map { - expectUnreached() - it - } - }.single() - } - - latch.receive() - expect(4) - job.cancel() - job.join() - ensureActive() - finish(6) - } - - @Test - fun testFlowWithCancellationHappensBefore() = runTest { - launch { - try { - flow { - expect(1) - val flowJob = kotlin.coroutines.coroutineContext[Job]!! - launch { - expect(2) - flowJob.cancel() - } - hang { expect(3) } - }.flowWith(NamedDispatchers("downstream")) { - map { it } - }.single() - } catch (e: CancellationException) { - expect(4) - } - }.join() - finish(5) - } - - @Test - fun testMultipleFlowWithException() = runTest() { - var switch = 0 - val flow = flow { - emit(Unit) - if (switch == 0) throw TestException() - }.map { if (switch == 1) throw TestException() else Unit } - .flowWith(NamedDispatchers("downstream")) { - map { if (switch == 2) throw TestException() else Unit } - } - repeat(3) { - switch = it - assertFailsWith { flow.single() } - assertFailsWith(flow) - } - } - - @Test - fun testMultipleFlowWithJobsCancellation() = runTest() { - val latch = Channel() - val flow = flow { - expect(1) - emit(Unit) - latch.send(Unit) - hang { expect(4) } - }.flowWith(NamedDispatchers("downstream")) { - map { - expect(2) - Unit - } - } - - val job = launch { - flow.single() - } - - latch.receive() - expect(3) - job.cancelAndJoin() - ensureActive() - finish(5) - } - - @Test - fun testTimeoutException() = runTest { - val flow = flow { - emit(1) - yield() - withTimeout(-1) {} - emit(42) - }.flowWith(NamedDispatchers("foo")) { - onEach { expect(1) } - } - assertFailsWith(flow) - finish(2) - } - - @Test - fun testTimeoutExceptionDownstream() = runTest { - val flow = flow { - emit(1) - hang { expect(2) } - }.flowWith(NamedDispatchers("foo")) { - onEach { - expect(1) - withTimeout(-1) {} - } - } - assertFailsWith(flow) - finish(3) - } -}