diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 3e91bad57c..1e786a2116 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z - public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher; + public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher; public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher; public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V @@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher { public fun ()V public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher; - public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; + public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher; public fun toString ()Ljava/lang/String; protected final fun toStringInternalImpl ()Ljava/lang/String; } diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api index 0690da3126..b61fac350b 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api @@ -59,6 +59,7 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0] open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0] open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0] + open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0] open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0] } abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0] @@ -66,7 +67,7 @@ abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/C abstract fun (): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.|(){}[0] constructor () // kotlinx.coroutines/MainCoroutineDispatcher.|(){}[0] final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0] - open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0] + open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0] open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0] } abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0] diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index 7bcb003eba..8ab83ed3a8 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -83,11 +83,11 @@ public abstract class CoroutineDispatcher : * // Background dispatcher for the application * val dispatcher = newFixedThreadPoolContext(4, "App Background") * // At most 2 threads will be processing images as it is really slow and CPU-intensive - * val imageProcessingDispatcher = dispatcher.limitedParallelism(2) + * val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor") * // At most 3 threads will be processing JSON to avoid image processing starvation - * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3) + * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor") * // At most 1 thread will be doing IO - * val fileWriterDispatcher = dispatcher.limitedParallelism(1) + * val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer") * ``` * Note how in this example the application has an executor with 4 threads, but the total sum of all limits * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism, @@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher : * is established between them: * * ``` - * val confined = Dispatchers.Default.limitedParallelism(1) + * val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher") * var counter = 0 * * // Invoked from arbitrary coroutines @@ -135,15 +135,24 @@ public abstract class CoroutineDispatcher : * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement. * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded. * + * @param name optional name for the resulting dispatcher string representation if a new dispatcher was created. + * Implementations are free to ignore this parameter. * @throws IllegalArgumentException if the given [parallelism] is non-positive * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views */ @ExperimentalCoroutinesApi - public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher { parallelism.checkParallelism() - return LimitedDispatcher(this, parallelism) + return LimitedDispatcher(this, parallelism, name) } + // Was experimental since 1.6.0, deprecated since 1.8.x + @Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead", + level = DeprecationLevel.HIDDEN, + replaceWith = ReplaceWith("limitedParallelism(parallelism, null)") + ) + public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null) + /** * Requests execution of a runnable [block]. * The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool, diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 914cc8ea69..6bfcbf0cbf 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -111,9 +111,9 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } - final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - return this + return namedOrThis(name) // Single-threaded, short-circuit } open fun shutdown() {} diff --git a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt index 8644474879..5a7eb9a7c4 100644 --- a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt @@ -49,10 +49,10 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() { */ override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress" - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it - return this + return namedOrThis(name) } /** diff --git a/kotlinx-coroutines-core/common/src/Unconfined.kt b/kotlinx-coroutines-core/common/src/Unconfined.kt index 46ea4ea191..ffc89e622f 100644 --- a/kotlinx-coroutines-core/common/src/Unconfined.kt +++ b/kotlinx-coroutines-core/common/src/Unconfined.kt @@ -9,7 +9,7 @@ import kotlin.jvm.* internal object Unconfined : CoroutineDispatcher() { @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined") } diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 9faa71a6d9..fb615649c9 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -21,7 +21,8 @@ import kotlin.coroutines.* */ internal class LimitedDispatcher( private val dispatcher: CoroutineDispatcher, - private val parallelism: Int + private val parallelism: Int, + private val name: String? ) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) { // Atomic is necessary here for the sake of K/N memory ordering, @@ -34,10 +35,10 @@ internal class LimitedDispatcher( private val workerAllocationLock = SynchronizedObject() @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - if (parallelism >= this.parallelism) return this - return super.limitedParallelism(parallelism) + if (parallelism >= this.parallelism) return namedOrThis(name) + return super.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -95,7 +96,7 @@ internal class LimitedDispatcher( } } - override fun toString() = "$dispatcher.limitedParallelism($parallelism)" + override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)" /** * A worker that polls the queue and runs tasks until there are no more of them. @@ -128,3 +129,8 @@ internal class LimitedDispatcher( } internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" } + +internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher { + if (name != null) return NamedDispatcher(this, name) + return this +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt new file mode 100644 index 0000000000..72dbd65380 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt @@ -0,0 +1,25 @@ +package kotlinx.coroutines.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.DefaultDelay +import kotlin.coroutines.* + +/** + * Wrapping dispatcher that has a nice user-supplied `toString()` representation + */ +internal class NamedDispatcher( + private val dispatcher: CoroutineDispatcher, + private val name: String +) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) { + + override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context) + + override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block) + + @InternalCoroutinesApi + override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block) + + override fun toString(): String { + return name + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt index b002ea722a..eee75ffc50 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt @@ -30,9 +30,9 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay abstract fun scheduleQueueProcessing() - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - return this + return namedOrThis(name) } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 3eaf723e9a..0cace58cf9 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher( override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher = missing() override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 335af8810f..1dab5268c7 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -12,10 +12,12 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher( ) { @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - if (parallelism >= CORE_POOL_SIZE) return this - return super.limitedParallelism(parallelism) + if (parallelism >= CORE_POOL_SIZE) { + return namedOrThis(name) + } + return super.limitedParallelism(parallelism, name) } // Shuts down the dispatcher, used only by Dispatchers.shutdown() @@ -44,10 +46,12 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { } @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { parallelism.checkParallelism() - if (parallelism >= MAX_POOL_SIZE) return this - return super.limitedParallelism(parallelism) + if (parallelism >= MAX_POOL_SIZE) { + return namedOrThis(name) + } + return super.limitedParallelism(parallelism, name) } // This name only leaks to user code as part of .limitedParallelism machinery @@ -72,9 +76,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command) @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { // See documentation to Dispatchers.IO for the rationale - return UnlimitedIoScheduler.limitedParallelism(parallelism) + return UnlimitedIoScheduler.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index b455e3c3af..128da00f30 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -1,3 +1,5 @@ +@file:OptIn(ExperimentalStdlibApi::class) + package kotlinx.coroutines import kotlin.test.* @@ -18,5 +20,27 @@ class DispatchersToStringTest { assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString()) // Not overridden at all, limited parallelism returns `this` assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + + assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString()) + assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString()) + assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString()) + assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + + val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited") + assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString()) + assertEquals("2", limitedNamed.limitedParallelism(2, "2").toString()) + // We asked for too many threads with no name, this was returned + assertEquals("limited", limitedNamed.limitedParallelism(12).toString()) + assertEquals("12", limitedNamed.limitedParallelism(12, "12").toString()) + + runBlocking { + val d = coroutineContext[CoroutineDispatcher]!! + assertContains(d.toString(), "BlockingEventLoop") + val limited = d.limitedParallelism(2) + assertContains(limited.toString(), "BlockingEventLoop") + assertFalse(limited.toString().contains("limitedParallelism")) + val named = d.limitedParallelism(2, "Named") + assertEquals("Named", named.toString()) + } } } \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/Dispatchers.kt b/kotlinx-coroutines-core/native/src/Dispatchers.kt index 9965186303..119b4fd323 100644 --- a/kotlinx-coroutines-core/native/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/native/src/Dispatchers.kt @@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() { private val io = unlimitedPool.limitedParallelism(64) // Default JVM size @ExperimentalCoroutinesApi - override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher { // See documentation to Dispatchers.IO for the rationale - return unlimitedPool.limitedParallelism(parallelism) + return unlimitedPool.limitedParallelism(parallelism, name) } override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/test-utils/common/src/TestBase.common.kt b/test-utils/common/src/TestBase.common.kt index 5c0cba4eec..e12d897adc 100644 --- a/test-utils/common/src/TestBase.common.kt +++ b/test-utils/common/src/TestBase.common.kt @@ -252,6 +252,7 @@ public class TestRuntimeException(message: String? = null, private val data: Any public class RecoverableTestException(message: String? = null) : RuntimeException(message) public class RecoverableTestCancellationException(message: String? = null) : CancellationException(message) +// Erases identity and equality checks for tests public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext { val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher return object : CoroutineDispatcher() {