Skip to content

Commit

Permalink
Flow: decouple buffer size from various operators and fuse
Browse files Browse the repository at this point in the history
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.BUFFERED buffer size marker to request
  buffered channel with a default (unspecified) size

Fixes #1233
  • Loading branch information
elizarov committed Jun 4, 2019
1 parent f44942a commit 73cc85c
Show file tree
Hide file tree
Showing 19 changed files with 924 additions and 276 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ public final class kotlinx/coroutines/channels/BroadcastKt {
}

public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
public static final field BUFFERED I
public static final field CONFLATED I
public static final field DEFAULT_BUFFER_PROPERTY_NAME Ljava/lang/String;
public static final field Factory Lkotlinx/coroutines/channels/Channel$Factory;
public static final field RENDEZVOUS I
public static final field UNLIMITED I
Expand All @@ -553,7 +555,9 @@ public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
}

public final class kotlinx/coroutines/channels/Channel$Factory {
public static final field BUFFERED I
public static final field CONFLATED I
public static final field DEFAULT_BUFFER_PROPERTY_NAME Ljava/lang/String;
public static final field RENDEZVOUS I
public static final field UNLIMITED I
}
Expand Down Expand Up @@ -785,6 +789,7 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector {
}

public final class kotlinx/coroutines/flow/FlowKt {
public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String;
public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -796,12 +801,12 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -821,29 +826,28 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowOn$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowViaChannel (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun getDEFAULT_CONCURRENCY ()I
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produceIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED

/**
Expand Down Expand Up @@ -50,9 +52,11 @@ public interface BroadcastChannel<E> : SendChannel<E> {
* Creates a broadcast channel with the specified buffer capacity.
*
* The resulting channel type depends on the specified [capacity] parameter:
*
* * when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity.
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
* * when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
* * otherwise -- throws [IllegalArgumentException].
*
* **Note: This is an experimental api.** It may be changed in the future updates.
Expand All @@ -63,5 +67,6 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
CONFLATED -> ConflatedBroadcastChannel()
BUFFERED -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
else -> ArrayBroadcastChannel(capacity)
}
28 changes: 27 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY
import kotlinx.coroutines.internal.systemProp
import kotlinx.coroutines.selects.*
import kotlin.jvm.*

Expand Down Expand Up @@ -372,20 +375,43 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
* Requests conflated channel in `Channel(...)` factory function -- the `ConflatedChannel` gets created.
*/
public const val CONFLATED = -1

/**
* Requests buffered channel with a default buffer capacity in `Channel(...)` factory function --
* the `ArrayChannel` gets created with a default capacity.
* This capacity is equal to 16 by default and can be overridden by setting
* [DEFAULT_BUFFER_PROPERTY_NAME] on JVM.
*/
public const val BUFFERED = -2

// only for internal use, cannot be used with Channel(...)
internal const val OPTIONAL_CHANNEL = -3

/**
* Name of the property that defines the default channel capacity when
* [BUFFERED] is used as parameter in `Channel(...)` factory function.
*/
public const val DEFAULT_BUFFER_PROPERTY_NAME = "kotlinx.coroutines.channels.defaultBuffer"

internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
16, 1, UNLIMITED - 1
)
}
}

/**
* Creates a channel with the specified buffer capacity (or without a buffer by default).
* See [Channel] interface documentation for details.
*
* @throws IllegalArgumentException when [capacity] < -1
* @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
* @throws IllegalArgumentException when [capacity] < -2
*/
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
else -> ArrayChannel(capacity)
}

Expand Down
Loading

0 comments on commit 73cc85c

Please sign in to comment.