From a16cef068eccd2624bea0d8c3efaf61f33cd9e43 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 18 Apr 2024 12:57:26 +0200 Subject: [PATCH 1/2] Add optional name parameter to .limitedParallelism Fixes #4023 --- .../api/kotlinx-coroutines-core.api | 6 ++++-- .../api/kotlinx-coroutines-core.klib.api | 3 ++- .../common/src/CoroutineDispatcher.kt | 14 +++++++++++--- .../common/src/EventLoop.common.kt | 2 +- .../common/src/MainCoroutineDispatcher.kt | 2 +- kotlinx-coroutines-core/common/src/Unconfined.kt | 2 +- .../common/src/internal/LimitedDispatcher.kt | 9 +++++---- .../jsAndWasmShared/src/internal/JSDispatcher.kt | 2 +- .../jvm/src/internal/MainDispatchers.kt | 2 +- .../jvm/src/scheduling/Dispatcher.kt | 12 ++++++------ .../jvm/test/DispatchersToStringTest.kt | 5 +++++ kotlinx-coroutines-core/native/src/Dispatchers.kt | 4 ++-- 12 files changed, 40 insertions(+), 23 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 3e91bad57c..1e786a2116 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z - public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher; + public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher; public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher; public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V @@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher { public fun ()V public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher; - public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher; public fun toString ()Ljava/lang/String; protected final fun toStringInternalImpl ()Ljava/lang/String; } diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 0690da3126..b61fac350b 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -59,6 +59,7 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0] open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0] open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0] + open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0] open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0] } abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0] @@ -66,7 +67,7 @@ abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/C abstract fun (): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.|(){}[0] constructor () // kotlinx.coroutines/MainCoroutineDispatcher.|(){}[0] final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0] - open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0] + open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0] open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0] } abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0] diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index 7bcb003eba..396ecb8f33 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher : * is established between them: * * ``` - * val confined = Dispatchers.Default.limitedParallelism(1) + * val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher") * var counter = 0 * * // Invoked from arbitrary coroutines @@ -135,15 +135,23 @@ public abstract class CoroutineDispatcher : * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement. * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded. * + * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created * @throws IllegalArgumentException if the given [parallelism] is non-positive * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views */ @ExperimentalCoroutinesApi - public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher { parallelism.checkParallelism() - return LimitedDispatcher(this, parallelism) + return LimitedDispatcher(this, parallelism, name) } + // Was experimental since 1.6.0, deprecated since 1.8.x + @Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("limitedParallelism(parallelism, null)") + ) + public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null) + /** * Requests execution of a runnable [block]. * The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool, diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 914cc8ea69..959aa7b7ec 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -111,7 +111,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } - final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() return this } diff --git a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt index 8644474879..726cd6102e 100644 --- a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt @@ -49,7 +49,7 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() { */ override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress" - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it return this diff --git a/kotlinx-coroutines-core/common/src/Unconfined.kt b/kotlinx-coroutines-core/common/src/Unconfined.kt index 46ea4ea191..ffc89e622f 100644 --- a/kotlinx-coroutines-core/common/src/Unconfined.kt +++ b/kotlinx-coroutines-core/common/src/Unconfined.kt @@ -9,7 +9,7 @@ import kotlin.jvm.* internal object Unconfined : CoroutineDispatcher() { @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined") } diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 9faa71a6d9..7a63802159 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -21,7 +21,8 @@ import kotlin.coroutines.* */ internal class LimitedDispatcher( private val dispatcher: CoroutineDispatcher, - private val parallelism: Int + private val parallelism: Int, + private val name: String? ) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) { // Atomic is necessary here for the sake of K/N memory ordering, @@ -34,10 +35,10 @@ internal class LimitedDispatcher( private val workerAllocationLock = SynchronizedObject() @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() if (parallelism >= this.parallelism) return this - return super.limitedParallelism(parallelism) + return super.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -95,7 +96,7 @@ internal class LimitedDispatcher( } } - override fun toString() = "$dispatcher.limitedParallelism($parallelism)" + override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)" /** * A worker that polls the queue and runs tasks until there are no more of them. diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt index b002ea722a..1529cbf53c 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt @@ -30,7 +30,7 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay abstract fun scheduleQueueProcessing() - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() return this } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 3eaf723e9a..0cace58cf9 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher( override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher = missing() override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 335af8810f..401443ba56 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -12,10 +12,10 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher( ) { @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() if (parallelism >= CORE_POOL_SIZE) return this - return super.limitedParallelism(parallelism) + return super.limitedParallelism(parallelism, name) } // Shuts down the dispatcher, used only by Dispatchers.shutdown() @@ -44,10 +44,10 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { } @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() if (parallelism >= MAX_POOL_SIZE) return this - return super.limitedParallelism(parallelism) + return super.limitedParallelism(parallelism, name) } // This name only leaks to user code as part of .limitedParallelism machinery @@ -72,9 +72,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command) @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { // See documentation to Dispatchers.IO for the rationale - return UnlimitedIoScheduler.limitedParallelism(parallelism) + return UnlimitedIoScheduler.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index b455e3c3af..9a83e6f974 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -18,5 +18,10 @@ class DispatchersToStringTest { assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString()) // Not overridden at all, limited parallelism returns `this` assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + + assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) + assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) + // Not overridden at all, limited parallelism returns `this` + assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "ignored").toString()) } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index 9965186303..119b4fd323 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() { private val io = unlimitedPool.limitedParallelism(64) // Default JVM size @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { // See documentation to Dispatchers.IO for the rationale - return unlimitedPool.limitedParallelism(parallelism) + return unlimitedPool.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { From 7917919002ec02ce2af52cdd309e84427717a807 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 26 Apr 2024 13:00:12 +0200 Subject: [PATCH 2/2] Introduce internal named dispatcher so the name is orthogonal to the limitedParallelism implementation --- .../common/src/CoroutineDispatcher.kt | 9 ++++--- .../common/src/EventLoop.common.kt | 2 +- .../common/src/MainCoroutineDispatcher.kt | 2 +- .../common/src/internal/LimitedDispatcher.kt | 7 +++++- .../common/src/internal/NamedDispatcher.kt | 25 +++++++++++++++++++ .../src/internal/JSDispatcher.kt | 2 +- .../jvm/src/scheduling/Dispatcher.kt | 8 ++++-- .../jvm/test/DispatchersToStringTest.kt | 23 +++++++++++++++-- test-utils/common/src/TestBase.common.kt | 1 + 9 files changed, 67 insertions(+), 12 deletions(-) create mode 100644 kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index 396ecb8f33..8ab83ed3a8 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -83,11 +83,11 @@ public abstract class CoroutineDispatcher : * // Background dispatcher for the application * val dispatcher = newFixedThreadPoolContext(4, "App Background") * // At most 2 threads will be processing images as it is really slow and CPU-intensive - * val imageProcessingDispatcher = dispatcher.limitedParallelism(2) + * val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor") * // At most 3 threads will be processing JSON to avoid image processing starvation - * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3) + * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor") * // At most 1 thread will be doing IO - * val fileWriterDispatcher = dispatcher.limitedParallelism(1) + * val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer") * ``` * Note how in this example the application has an executor with 4 threads, but the total sum of all limits * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism, @@ -135,7 +135,8 @@ public abstract class CoroutineDispatcher : * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement. * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded. * - * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created + * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created. + * Implementations are free to ignore this parameter. * @throws IllegalArgumentException if the given [parallelism] is non-positive * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views */ diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 959aa7b7ec..6bfcbf0cbf 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -113,7 +113,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - return this + return namedOrThis(name) // Single-threaded, short-circuit } open fun shutdown() {} diff --git a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt index 726cd6102e..5a7eb9a7c4 100644 --- a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt @@ -52,7 +52,7 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() { override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it - return this + return namedOrThis(name) } /** diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 7a63802159..fb615649c9 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -37,7 +37,7 @@ internal class LimitedDispatcher( @ExperimentalCoroutinesApi override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - if (parallelism >= this.parallelism) return this + if (parallelism >= this.parallelism) return namedOrThis(name) return super.limitedParallelism(parallelism, name) } @@ -129,3 +129,8 @@ internal class LimitedDispatcher( } internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" } + +internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher { + if (name != null) return NamedDispatcher(this, name) + return this +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt new file mode 100644 index 0000000000..72dbd65380 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt @@ -0,0 +1,25 @@ +package kotlinx.coroutines.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.DefaultDelay +import kotlin.coroutines.* + +/** + * Wrapping dispatcher that has a nice user-supplied `toString()` representation + */ +internal class NamedDispatcher( + private val dispatcher: CoroutineDispatcher, + private val name: String +) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) { + + override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context) + + override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block) + + @InternalCoroutinesApi + override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block) + + override fun toString(): String { + return name + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt index 1529cbf53c..eee75ffc50 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt @@ -32,7 +32,7 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - return this + return namedOrThis(name) } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 401443ba56..1dab5268c7 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -14,7 +14,9 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher( @ExperimentalCoroutinesApi override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - if (parallelism >= CORE_POOL_SIZE) return this + if (parallelism >= CORE_POOL_SIZE) { + return namedOrThis(name) + } return super.limitedParallelism(parallelism, name) } @@ -46,7 +48,9 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { @ExperimentalCoroutinesApi override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - if (parallelism >= MAX_POOL_SIZE) return this + if (parallelism >= MAX_POOL_SIZE) { + return namedOrThis(name) + } return super.limitedParallelism(parallelism, name) } diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index 9a83e6f974..128da00f30 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -1,3 +1,5 @@ +@file:OptIn(ExperimentalStdlibApi::class) + package kotlinx.coroutines import kotlin.test.* @@ -21,7 +23,24 @@ class DispatchersToStringTest { assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) - // Not overridden at all, limited parallelism returns `this` - assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "ignored").toString()) + assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString()) + assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + + val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited") + assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString()) + assertEquals("2", limitedNamed.limitedParallelism(2, "2").toString()) + // We asked for too many threads with no name, this was returned + assertEquals("limited", limitedNamed.limitedParallelism(12).toString()) + assertEquals("12", limitedNamed.limitedParallelism(12, "12").toString()) + + runBlocking { + val d = coroutineContext[CoroutineDispatcher]!! + assertContains(d.toString(), "BlockingEventLoop") + val limited = d.limitedParallelism(2) + assertContains(limited.toString(), "BlockingEventLoop") + assertFalse(limited.toString().contains("limitedParallelism")) + val named = d.limitedParallelism(2, "Named") + assertEquals("Named", named.toString()) + } } } \ No newline at end of file diff --git a/test-utils/common/src/TestBase.common.kt b/test-utils/common/src/TestBase.common.kt index 5c0cba4eec..e12d897adc 100644 --- a/test-utils/common/src/TestBase.common.kt +++ b/test-utils/common/src/TestBase.common.kt @@ -252,6 +252,7 @@ public class TestRuntimeException(message: String? = null, private val data: Any public class RecoverableTestException(message: String? = null) : RuntimeException(message) public class RecoverableTestCancellationException(message: String? = null) : CancellationException(message) +// Erases identity and equality checks for tests public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext { val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher return object : CoroutineDispatcher() {