From a16cef068eccd2624bea0d8c3efaf61f33cd9e43 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 18 Apr 2024 12:57:26 +0200 Subject: [PATCH] Add optional name parameter to .limitedParallelism Fixes #4023 --- .../api/kotlinx-coroutines-core.api | 6 ++++-- .../api/kotlinx-coroutines-core.klib.api | 3 ++- .../common/src/CoroutineDispatcher.kt | 14 +++++++++++--- .../common/src/EventLoop.common.kt | 2 +- .../common/src/MainCoroutineDispatcher.kt | 2 +- kotlinx-coroutines-core/common/src/Unconfined.kt | 2 +- .../common/src/internal/LimitedDispatcher.kt | 9 +++++---- .../jsAndWasmShared/src/internal/JSDispatcher.kt | 2 +- .../jvm/src/internal/MainDispatchers.kt | 2 +- .../jvm/src/scheduling/Dispatcher.kt | 12 ++++++------ .../jvm/test/DispatchersToStringTest.kt | 5 +++++ kotlinx-coroutines-core/native/src/Dispatchers.kt | 4 ++-- 12 files changed, 40 insertions(+), 23 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 3e91bad57c..1e786a2116 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z - public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher; + public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher; public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher; public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V @@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher { public fun ()V public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher; - public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher; public fun toString ()Ljava/lang/String; protected final fun toStringInternalImpl ()Ljava/lang/String; } diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 0690da3126..b61fac350b 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -59,6 +59,7 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0] open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0] open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0] + open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0] open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0] } abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0] @@ -66,7 +67,7 @@ abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/C abstract fun (): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.|(){}[0] constructor () // kotlinx.coroutines/MainCoroutineDispatcher.|(){}[0] final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0] - open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0] + open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0] open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0] } abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0] diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index 7bcb003eba..396ecb8f33 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher : * is established between them: * * ``` - * val confined = Dispatchers.Default.limitedParallelism(1) + * val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher") * var counter = 0 * * // Invoked from arbitrary coroutines @@ -135,15 +135,23 @@ public abstract class CoroutineDispatcher : * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement. * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded. * + * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created * @throws IllegalArgumentException if the given [parallelism] is non-positive * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views */ @ExperimentalCoroutinesApi - public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher { parallelism.checkParallelism() - return LimitedDispatcher(this, parallelism) + return LimitedDispatcher(this, parallelism, name) } + // Was experimental since 1.6.0, deprecated since 1.8.x + @Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("limitedParallelism(parallelism, null)") + ) + public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null) + /** * Requests execution of a runnable [block]. * The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool, diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 914cc8ea69..959aa7b7ec 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -111,7 +111,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } - final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() return this } diff --git a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt index 8644474879..726cd6102e 100644 --- a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt @@ -49,7 +49,7 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() { */ override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress" - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it return this diff --git a/kotlinx-coroutines-core/common/src/Unconfined.kt b/kotlinx-coroutines-core/common/src/Unconfined.kt index 46ea4ea191..ffc89e622f 100644 --- a/kotlinx-coroutines-core/common/src/Unconfined.kt +++ b/kotlinx-coroutines-core/common/src/Unconfined.kt @@ -9,7 +9,7 @@ import kotlin.jvm.* internal object Unconfined : CoroutineDispatcher() { @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined") } diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 9faa71a6d9..7a63802159 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -21,7 +21,8 @@ import kotlin.coroutines.* */ internal class LimitedDispatcher( private val dispatcher: CoroutineDispatcher, - private val parallelism: Int + private val parallelism: Int, + private val name: String? ) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) { // Atomic is necessary here for the sake of K/N memory ordering, @@ -34,10 +35,10 @@ internal class LimitedDispatcher( private val workerAllocationLock = SynchronizedObject() @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() if (parallelism >= this.parallelism) return this - return super.limitedParallelism(parallelism) + return super.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -95,7 +96,7 @@ internal class LimitedDispatcher( } } - override fun toString() = "$dispatcher.limitedParallelism($parallelism)" + override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)" /** * A worker that polls the queue and runs tasks until there are no more of them. diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt index b002ea722a..1529cbf53c 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt @@ -30,7 +30,7 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay abstract fun scheduleQueueProcessing() - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() return this } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 3eaf723e9a..0cace58cf9 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher( override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher = missing() override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 335af8810f..401443ba56 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -12,10 +12,10 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher( ) { @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() if (parallelism >= CORE_POOL_SIZE) return this - return super.limitedParallelism(parallelism) + return super.limitedParallelism(parallelism, name) } // Shuts down the dispatcher, used only by Dispatchers.shutdown() @@ -44,10 +44,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { } @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() if (parallelism >= MAX_POOL_SIZE) return this - return super.limitedParallelism(parallelism) + return super.limitedParallelism(parallelism, name) } // This name only leaks to user code as part of .limitedParallelism machinery @@ -72,9 +72,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command) @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { // See documentation to Dispatchers.IO for the rationale - return UnlimitedIoScheduler.limitedParallelism(parallelism) + return UnlimitedIoScheduler.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index b455e3c3af..9a83e6f974 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -18,5 +18,10 @@ class DispatchersToStringTest { assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString()) // Not overridden at all, limited parallelism returns `this` assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + + assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) + assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) + // Not overridden at all, limited parallelism returns `this` + assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "ignored").toString()) } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index 9965186303..119b4fd323 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() { private val io = unlimitedPool.limitedParallelism(64) // Default JVM size @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { // See documentation to Dispatchers.IO for the rationale - return unlimitedPool.limitedParallelism(parallelism) + return unlimitedPool.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) {