Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SharedFlow collect overload and override that return Nothing #2803

Merged
merged 8 commits into from
Oct 20, 2021
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -1061,6 +1062,7 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
}

public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getReplayCache ()Ljava/util/List;
}

Expand Down
15 changes: 14 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ public interface SharedFlow<out T> : Flow<T> {
* A snapshot of the replay cache.
*/
public val replayCache: List<T>

/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
* This method should never be used directly. To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
* should be used.
*
* **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
* on a shared flow never completes normally.
*
* @see [Flow.collect]
*/
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>): Nothing
}

/**
Expand Down Expand Up @@ -335,7 +348,7 @@ private class SharedFlowImpl<T>(
}

@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ private class StateFlowImpl<T>(
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}

override suspend fun collect(collector: FlowCollector<T>) {
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
Expand Down
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
override suspend fun emit(value: T) = action(value)
})

/**
* Terminal flow operator that collects the given [SharedFlow] with the provided [action].
* If any exception occurs during `collect` or in the provided flow, this exception is rethrown from this method.
*
* This is a counterpart of a regular [Flow.collect] extension, only different in the return type
* so that any code below `collect` produces a compilation warning.
*/
public suspend inline fun <T> SharedFlow<T>.collect(crossinline action: suspend (value: T) -> Unit): Nothing {
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
}

/**
* Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
Expand Down