diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 365df9bd55..2defe1e005 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -884,6 +884,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static synthetic fun toSet$default (Lkotlinx/coroutines/flow/Flow;Ljava/util/Set;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; public static final fun transform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun withLatestFrom (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt index e9d99d321a..15d995c04f 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Zip.kt @@ -10,6 +10,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.channels.Channel.Factory.CONFLATED import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* @@ -174,6 +175,53 @@ internal fun Flow.combineLatest(vararg others: Flow, arrayFactory: } } +/** + * Returns a [Flow] whose values are generated with [transform] function every time this flow emits by combining + * the value with the latest value emitted by `other`. Emissions are only triggered by this flow, not `other. + * It does not call [transform], and the returned flow will not emit, until `other` emits – if it does not emit, the + * returned flow will complete without emitting. It will only complete when this flow completes, even if `other` + * completes first. If this flow completes first, `other` will be cancelled. + * + * It can be demonstrated with the following example: + * ``` + * val flow = flowOf(1, 2, 3).delayEach(10) + * val flow2 = flowOf("a", "b").delayEach(15) + * flow.withLatestFrom(flow2) { i, s -> i.toString() + s }.collect { + * println(it) // Will print "1a 2a 3b" + * } + * ``` + */ +@ExperimentalCoroutinesApi +public fun Flow.withLatestFrom(other: Flow, transform: suspend (T1, T2) -> R): Flow = flow { + coroutineScope { + val firstChannel = asFairChannel(this@withLatestFrom) + firstChannel.consume { + var firstIsClosed = false + + // This operator conflates values from the other Flow anyway, so the channel doesn't need any backpressure. + val secondChannel = asFairChannel(other, capacity = CONFLATED) + secondChannel.consume { + // Nothing can be emitted until the other Flow emits its first value, so don't enter the loop until + // that's happened. + var secondValue: Any = secondChannel.receiveOrNull() ?: return@coroutineScope + var secondIsClosed = false + + while (!firstIsClosed) { + select { + onReceive(firstIsClosed, firstChannel, { firstIsClosed = true }) { value -> + emit(transform(NULL.unbox(value), NULL.unbox(secondValue))) + } + + onReceive(secondIsClosed, secondChannel, { secondIsClosed = true }) { value -> + secondValue = value + } + } + } + } + } + } +} + private inline fun SelectBuilder.onReceive( isClosed: Boolean, channel: ReceiveChannel, @@ -188,12 +236,13 @@ private inline fun SelectBuilder.onReceive( } // Channel has any type due to onReceiveOrNull. This will be fixed after receiveOrClosed -private fun CoroutineScope.asFairChannel(flow: Flow<*>): ReceiveChannel = produce { - val channel = channel as ChannelCoroutine - flow.collect { value -> - channel.sendFair(value ?: NULL) +private fun CoroutineScope.asFairChannel(flow: Flow<*>, capacity: Int = 0): ReceiveChannel = + produce(capacity = capacity) { + val channel = channel as ChannelCoroutine + flow.collect { value -> + channel.sendFair(value ?: NULL) + } } -} /** diff --git a/kotlinx-coroutines-core/common/test/flow/operators/WithLatestFromTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/WithLatestFromTest.kt new file mode 100644 index 0000000000..76ba8ab72f --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/WithLatestFromTest.kt @@ -0,0 +1,272 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +/* + * Replace: { i, j -> i + j } -> ::sum as soon as KT-30991 is fixed + */ +class WithLatestFromTest : TestBase() { + + @Test + fun testWithLatestFrom() = runTest { + val flow = flowOf("a", "b", "c") + val flow2 = flowOf(1, 2, 3) + val list = flow.withLatestFrom(flow2) { i, j -> i + j }.toList() + assertEquals(listOf("a1", "b2", "c3"), list) + } + + @Test + fun testNulls() = runTest { + val flow = flowOf("a", null, null) + val flow2 = flowOf(1, 2, 3) + val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList() + assertEquals(listOf("a1", "null2", "null3"), list) + } + + @Test + fun testNullsOther() = runTest { + val flow = flowOf("a", "b", "c") + val flow2 = flowOf(null, 2, null) + val list = flow.withLatestFrom(flow2, { i, j -> i + j }).toList() + assertEquals(listOf("anull", "b2", "cnull"), list) + } + + @Test + fun testEmptyFlows() = runTest { + val flow = emptyFlow().withLatestFrom(emptyFlow(), { i, j -> i + j }) + assertNull(flow.singleOrNull()) + } + + @Test + fun testFirstIsEmpty() = runTest { + val f1 = emptyFlow() + val f2 = flowOf(1) + assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList()) + } + + @Test + fun testSecondIsEmpty() = runTest { + val f1 = flowOf("a") + val f2 = emptyFlow() + assertEquals(emptyList(), f1.withLatestFrom(f2) { i, j -> i + j }.toList()) + } + + @Test + fun testPreservingOrder() = runTest { + val f1 = flow { + expect(1) + emit("a") + expect(3) + emit("b") + emit("c") + expect(5) + } + + val f2 = flow { + expect(2) + emit(1) + yield() + yield() + expect(4) + emit(2) + expect(6) + yield() + expectUnreached() + } + + val result = f1.withLatestFrom(f2) { i, j -> i + j }.toList() + assertEquals(listOf("a1", "b1", "c1"), result) + finish(7) + } + + @Test + fun testPreservingOrderReversed() = runTest { + val f1 = flow { + expect(1) + emit("a") + expect(3) + emit("b") + emit("c") + expect(4) + } + + val f2 = flow { + yield() // One more yield because now this flow starts first + expect(2) + emit(1) + yield() + yield() + expect(5) + emit(2) + expect(6) + yield() + expect(7) + emit(3) + } + + val result = f2.withLatestFrom(f1) { i, j -> j + i }.toList() + assertEquals(listOf("a1", "c2", "c3"), result) + finish(8) + } + + @Test + fun testContextIsIsolated() = runTest { + val f1 = flow { + emit("a") + assertEquals("first", NamedDispatchers.name()) + expect(1) + }.flowOn(NamedDispatchers("first")).onEach { + assertEquals("nested", NamedDispatchers.name()) + expect(2) + }.flowOn(NamedDispatchers("nested")) + + val f2 = flow { + emit(1) + assertEquals("second", NamedDispatchers.name()) + expect(3) + }.flowOn(NamedDispatchers("second")) + .onEach { + assertEquals("onEach", NamedDispatchers.name()) + expect(4) + }.flowOn(NamedDispatchers("onEach")) + + val value = withContext(NamedDispatchers("main")) { + f1.withLatestFrom(f2) { i, j -> + assertEquals("main", NamedDispatchers.name()) + expect(5) + i + j + }.single() + } + + assertEquals("a1", value) + finish(6) + } + + @Test + fun testErrorInDownstreamCancelsUpstream() = runTest { + val f1 = flow { + emit("a") + hang { + expect(2) + } + }.flowOn(NamedDispatchers("first")) + + val f2 = flow { + emit(1) + hang { + expect(3) + } + }.flowOn(NamedDispatchers("second")) + + val flow = f1.withLatestFrom(f2) { i, j -> + assertEquals("combine", NamedDispatchers.name()) + expect(1) + i + j + }.flowOn(NamedDispatchers("combine")).onEach { + throw TestException() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testErrorCancelsSibling() = runTest { + val f1 = flow { + emit("a") + hang { + expect(1) + } + }.flowOn(NamedDispatchers("first")) + + val f2 = flow { + emit(1) + throw TestException() + }.flowOn(NamedDispatchers("second")) + + val flow = f1.withLatestFrom(f2) { _, _ -> 1 } + assertFailsWith(flow) + finish(2) + } + + @Test + fun testErrorCancelsSiblingReversed() = runTest { + val f1 = flow { + emit("a") + throw TestException() + } + + val f2 = flow { + emit(1) + hang { + expect(1) + } + } + + val flow = f1.withLatestFrom(f2) { _, _ -> 1 } + assertFailsWith(flow) + finish(2) + } + + @Test + fun testCancellationExceptionUpstream() = runTest { + val f1 = flow { + expect(1) + emit(1) + throw CancellationException("") + } + val f2 = flow { + emit(1) + hang { expect(3) } + } + + val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) } + assertFailsWith(flow) + finish(4) + } + + @Test + fun testCancellationExceptionUpstreamReversed() = runTest { + val f1 = flow { + expect(1) + emit(1) + hang { expect(3) } + } + val f2 = flow { + emit(1) + throw CancellationException("") + } + + val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { expect(2) } + assertFailsWith(flow) + finish(4) + } + + @Test + fun testCancellationExceptionDownstream() = runTest { + val f1 = flow { + emit(1) + expect(3) + hang { expect(6) } + } + val f2 = flow { + emit(1) + expect(2) + hang { expect(5) } + } + + val flow = f1.withLatestFrom(f2, { _, _ -> 1 }).onEach { + expect(1) + yield() + expect(4) + throw CancellationException("") + } + assertFailsWith(flow) + finish(7) + } +}