Skip to content

Commit

Permalink
Flow: decouple buffer size from various operators and fuse
Browse files Browse the repository at this point in the history
* Introduce buffer operator
* Remove buffer size from all the other operators
* Fuse all adjacent operators that create channels
* Introduce Channel.DEFAULT buffer size marker

Fixes #1233
  • Loading branch information
elizarov committed May 30, 2019
1 parent 15ee8a3 commit 8a69faa
Show file tree
Hide file tree
Showing 17 changed files with 881 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,8 @@ public final class kotlinx/coroutines/channels/BroadcastKt {

public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
public static final field CONFLATED I
public static final field DEFAULT I
public static final field DEFAULT_CAPACITY_PROPERTY_NAME Ljava/lang/String;
public static final field Factory Lkotlinx/coroutines/channels/Channel$Factory;
public static final field RENDEZVOUS I
public static final field UNLIMITED I
Expand All @@ -554,6 +556,8 @@ public final class kotlinx/coroutines/channels/Channel$DefaultImpls {

public final class kotlinx/coroutines/channels/Channel$Factory {
public static final field CONFLATED I
public static final field DEFAULT I
public static final field DEFAULT_CAPACITY_PROPERTY_NAME Ljava/lang/String;
public static final field RENDEZVOUS I
public static final field UNLIMITED I
}
Expand Down Expand Up @@ -785,6 +789,7 @@ public abstract interface class kotlinx/coroutines/flow/FlowCollector {
}

public final class kotlinx/coroutines/flow/FlowKt {
public static final field DEFAULT_CONCURRENCY_PROPERTY_NAME Ljava/lang/String;
public static final fun asFlow (Ljava/lang/Iterable;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Ljava/util/Iterator;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/jvm/functions/Function0;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -796,12 +801,12 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;ILkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun callbackFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun callbackFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun channelFlow$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun broadcastIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static synthetic fun broadcastIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/CoroutineStart;ILjava/lang/Object;)Lkotlinx/coroutines/channels/BroadcastChannel;
public static final fun buffer (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun buffer$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun callbackFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
Expand All @@ -821,29 +826,27 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun filterNot (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapMerge (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapMerge$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flattenConcat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;II)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IIILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flattenMerge (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flattenMerge$default (Lkotlinx/coroutines/flow/Flow;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf (Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOf ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowOn$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowViaChannel (ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowViaChannel$default (ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flowWith$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun flowWith (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun getDEFAULT_CONCURRENCY ()I
public static final fun map (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun mapNotNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun onErrorReturn (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun onErrorReturn$default (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun produceIn$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.DEFAULT
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED

/**
Expand Down Expand Up @@ -50,9 +51,11 @@ public interface BroadcastChannel<E> : SendChannel<E> {
* Creates a broadcast channel with the specified buffer capacity.
*
* The resulting channel type depends on the specified [capacity] parameter:
*
* * when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity.
* **Note:** this channel looses all items that are send to it until the first subscriber appears;
* * when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
* * when `capacity` is [DEFAULT] -- creates `ArrayBroadcastChannel` with default capacity.
* * otherwise -- throws [IllegalArgumentException].
*
* **Note: This is an experimental api.** It may be changed in the future updates.
Expand All @@ -63,5 +66,6 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
UNLIMITED -> throw IllegalArgumentException("Unsupported UNLIMITED capacity for BroadcastChannel")
CONFLATED -> ConflatedBroadcastChannel()
DEFAULT -> ArrayBroadcastChannel(CHANNEL_DEFAULT_CAPACITY)
else -> ArrayBroadcastChannel(capacity)
}
26 changes: 25 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.Channel.Factory.DEFAULT
import kotlinx.coroutines.internal.systemProp
import kotlinx.coroutines.selects.*
import kotlin.jvm.*

Expand Down Expand Up @@ -372,20 +374,42 @@ public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
* Requests conflated channel in `Channel(...)` factory function -- the `ConflatedChannel` gets created.
*/
public const val CONFLATED = -1

/**
* Requests default channel capacity in `Channel(...)` factory function.
* This capacity is equal to 16 by default and can be overridden by setting
* [DEFAULT_CAPACITY_PROPERTY_NAME] on JVM.
*/
public const val DEFAULT = -2

// only for internal use, cannot be used with Channel(...)
internal const val OPTIONAL_CHANNEL = -3

/**
* Name of the property that defines the default channel capacity when
* [DEFAULT] is used as parameter in `Channel(...)` factory function.
*/
public const val DEFAULT_CAPACITY_PROPERTY_NAME = "kotlinx.coroutines.channels.defaultCapacity"
}
}

internal val CHANNEL_DEFAULT_CAPACITY = systemProp(Channel.DEFAULT_CAPACITY_PROPERTY_NAME,
16, 1, UNLIMITED - 1
)

/**
* Creates a channel with the specified buffer capacity (or without a buffer by default).
* See [Channel] interface documentation for details.
*
* @throws IllegalArgumentException when [capacity] < -1
* @param capacity either a positive channel capacity or one of the constants defined in [Channel.Factory].
* @throws IllegalArgumentException when [capacity] < -2
*/
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
DEFAULT -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)

This comment has been minimized.

Copy link
@fvasco

fvasco May 30, 2019

Contributor

It looks strange to me that the default Channel is RENDEZVOUS, but the Channel.DEFAULT is ArrayChannel(16).

Can we put this constant elsewhere?
Ie: Flow.DEFAULT_BUFFER_CAPACITY

This comment has been minimized.

Copy link
@elizarov

elizarov May 30, 2019

Author Contributor

That's a valid observation. It does not really belong to Flow but we might find a better name for this constant (something other than DEFAULT). One suggestion is to name it DEFAULT_BUFFER to be more explicit about the fact that it is creating a buffered channel with a default size.

This comment has been minimized.

Copy link
@fvasco

fvasco May 31, 2019

Contributor

Hi @elizarov,
DEFAULT_BUFFER and BUFFER_DEFAULT are really similar, so it is easy confuse them.
If you say that the "default buffer" is 16, it is preatty easy use this constant in the code, ie newCapacity.coerceAtLeast(Channel.DEFAULT_BUFFER).

My suggestion is to consider to set Channel.DEFAULT_BUFFER = 16, if this is acceptable in the rest of code. RENDEZVOUS and UNLIMITED are valid values, this can be reasonable also for DEFAULT_BUFFER.
Otherwise please consider this name as a part of enumeration, ie: BUFFERED, so RENDEZVOUS, BUFFERED, UNLIMITED and CONFLATED.

This comment has been minimized.

Copy link
@elizarov

elizarov May 31, 2019

Author Contributor

For the purpose of bufffer() operation fusion is it important to distinguish a case of "use a buffer of 16" and a case of "use a buffer of some size, I don't care which". BUFFERED is an interesting suggestion, though.

else -> ArrayChannel(capacity)
}

Expand Down
Loading

0 comments on commit 8a69faa

Please sign in to comment.