Skip to content

Commit

Permalink
Introduce merge operator
Browse files Browse the repository at this point in the history
Fixes #1491
  • Loading branch information
qwwdfsad committed Dec 11, 2019
1 parent 6cb317b commit 1b378ba
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun merge ([Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun observeOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down
20 changes: 20 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/internal/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,23 @@ internal class ChannelFlowMerge<T>(
override fun additionalToStringProps(): String =
"concurrency=$concurrency, "
}

internal class ChannelLimitedFlowMerge<T>(
private val flows: Iterable<Flow<T>>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED
) : ChannelFlow<T>(context, capacity) {
override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
ChannelLimitedFlowMerge(flows, context, capacity)

override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
return scope.flowProduce(context, capacity, block = collectToFun)
}

override suspend fun collectTo(scope: ProducerScope<T>) {
val collector = SendingCollector(scope)
flows.forEach { flow ->
scope.launch { flow.collect(collector) }
}
}
}
36 changes: 36 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,42 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}

/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> Iterable<Flow<T>>.merge(): Flow<T> {
/*
* This is a fuseable implementation of the following operator:
* channelFlow {
* forEach { flow ->
* launch {
* flow.collect { send(it) }
* }
* }
* }
*/
return ChannelLimitedFlowMerge(this)
}

/**
* Merges the given flows into a single flow without preserving an order of elements.
* All flows are merged concurrently, without limit on the number of simultaneously collected flows.
*
* ### Operator fusion
*
* Applications of [flowOn], [buffer], [produceIn], and [broadcastIn] _after_ this operator are fused with
* its concurrent merging so that only one properly configured channel is used for execution of merging logic.
*/
@ExperimentalCoroutinesApi
public fun <T> merge(vararg flows: Flow<T>): Flow<T> = flows.asIterable().merge()

/**
* Flattens the given flow of flows into a single flow with a [concurrency] limit on the number of
* concurrently collected flows.
Expand Down
68 changes: 68 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/MergeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*
import kotlinx.coroutines.flow.merge as originalMerge

abstract class MergeTest : TestBase() {

abstract fun <T> Iterable<Flow<T>>.merge(): Flow<T>

@Test
fun testMerge() = runTest {
val n = 100
val sum = (1..n).map { flowOf(it) }
.merge()
.sum()

assertEquals(n * (n + 1) / 2, sum)
}

@Test
fun testSingle() = runTest {
val flow = listOf(flowOf(), flowOf(42), flowOf()).merge()
val value = flow.single()
assertEquals(42, value)
}

@Test
fun testNulls() = runTest {
val list = listOf(flowOf(1), flowOf(null), flowOf(2)).merge().toList()
assertEquals(listOf(1, null, 2), list)
}

@Test
fun testContext() = runTest {
val flow = flow {
emit(NamedDispatchers.name())
}.flowOn(NamedDispatchers("source"))

val result = listOf(flow).merge().flowOn(NamedDispatchers("irrelevant")).toList()
assertEquals(listOf("source"), result)
}

@Test
fun testIsolatedContext() = runTest {
val flow = flow {
emit(NamedDispatchers.name())
}

val result = listOf(flow.flowOn(NamedDispatchers("1")), flow.flowOn(NamedDispatchers("2")))
.merge()
.flowOn(NamedDispatchers("irrelevant"))
.toList()
assertEquals(listOf("1", "2"), result)
}
}

class IterableMergeTest : MergeTest() {
override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge()
}

class VarargMergeTest : MergeTest() {
override fun <T> Iterable<Flow<T>>.merge(): Flow<T> = originalMerge(*toList().toTypedArray())
}

0 comments on commit 1b378ba

Please sign in to comment.