diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index e9d9f19ee3..7bcb003eba 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -65,37 +65,78 @@ public abstract class CoroutineDispatcher : /** * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism]. - * The resulting view uses the original dispatcher for execution, but with the guarantee that + * The resulting view uses the original dispatcher for execution but with the guarantee that * no more than [parallelism] coroutines are executed at the same time. * * This method does not impose restrictions on the number of views or the total sum of parallelism values, * each view controls its own parallelism independently with the guarantee that the effective parallelism * of all views cannot exceed the actual parallelism of the original dispatcher. * - * ### Limitations - * - * The default implementation of `limitedParallelism` does not support direct dispatchers, - * such as executing the given runnable in place during [dispatch] calls. - * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct. - * For direct dispatchers, it is recommended to override this method - * and provide a domain-specific implementation or to throw an [UnsupportedOperationException]. + * The resulting dispatcher does not guarantee that the coroutines will always be dispatched on the same + * subset of threads, it only guarantees that at most [parallelism] coroutines are executed at the same time, + * and reuses threads from the original dispatchers. + * It does not constitute a resource -- it is a _view_ of the underlying dispatcher that can be thrown away + * and is not required to be closed. * * ### Example of usage * ``` - * private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background") + * // Background dispatcher for the application + * val dispatcher = newFixedThreadPoolContext(4, "App Background") * // At most 2 threads will be processing images as it is really slow and CPU-intensive - * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2) + * val imageProcessingDispatcher = dispatcher.limitedParallelism(2) * // At most 3 threads will be processing JSON to avoid image processing starvation - * private val jsonProcessingDispatcher = backgroundDispatcher.limitedParallelism(3) + * val jsonProcessingDispatcher = dispatcher.limitedParallelism(3) * // At most 1 thread will be doing IO - * private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1) + * val fileWriterDispatcher = dispatcher.limitedParallelism(1) * ``` * Note how in this example the application has an executor with 4 threads, but the total sum of all limits - * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism. + * is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism, + * and at most 4 threads can exist in the system. * * Note that this example was structured in such a way that it illustrates the parallelism guarantees. - * In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a - * `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them. + * In practice, it is usually better to use `Dispatchers.IO` or [Dispatchers.Default] instead of creating a + * `backgroundDispatcher`. + * + * ### `limitedParallelism(1)` pattern + * + * One of the common patterns is confining the execution of specific tasks to a sequential execution in background + * with `limitedParallelism(1)` invocation. + * For that purpose, the implementation guarantees that tasks are executed sequentially and that a happens-before relation + * is established between them: + * + * ``` + * val confined = Dispatchers.Default.limitedParallelism(1) + * var counter = 0 + * + * // Invoked from arbitrary coroutines + * launch(confined) { + * // This increment is sequential and race-free + * ++counter + * } + * ``` + * Note that there is no guarantee that the underlying system thread will always be the same. + * + * ### Dispatchers.IO + * + * `Dispatcher.IO` is considered _elastic_ for the purposes of limited parallelism -- the sum of + * views is not restricted by the capacity of `Dispatchers.IO`. + * It means that it is safe to replace `newFixedThreadPoolContext(nThreads)` with + * `Dispatchers.IO.limitedParallelism(nThreads)` w.r.t. available number of threads. + * See `Dispatchers.IO` documentation for more details. + * + * ### Restrictions and implementation details + * + * The default implementation of `limitedParallelism` does not support direct dispatchers, + * such as executing the given runnable in place during [dispatch] calls. + * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct. + * For direct dispatchers, it is recommended to override this method + * and provide a domain-specific implementation or to throw an [UnsupportedOperationException]. + * + * Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement. + * For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded. + * + * @throws IllegalArgumentException if the given [parallelism] is non-positive + * @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views */ @ExperimentalCoroutinesApi public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt index 0c8e0778b1..9faa71a6d9 100644 --- a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -95,6 +95,8 @@ internal class LimitedDispatcher( } } + override fun toString() = "$dispatcher.limitedParallelism($parallelism)" + /** * A worker that polls the queue and runs tasks until there are no more of them. * @@ -125,5 +127,4 @@ internal class LimitedDispatcher( } } -// Save a few bytecode ops internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" } diff --git a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt b/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt index aadca2fc9e..d18efdc35f 100644 --- a/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/concurrent/src/Dispatchers.kt @@ -26,6 +26,15 @@ package kotlinx.coroutines * the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads, * but during its steady state there is only a small number of threads shared * among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher` + * + * It is recommended to replace manually created thread-backed executors with `Dispatchers.IO.limitedParallelism` instead: + * ``` + * // Requires manual closing, allocates resources for all threads + * val databasePoolDispatcher = newFixedThreadPoolContext(128) + * + * // Provides the same number of threads as a resource but shares and caches them internally + * val databasePoolDispatcher = Dispatchers.IO.limitedParallelism(128) + * ``` */ @Suppress("EXTENSION_SHADOWED_BY_MEMBER") public expect val Dispatchers.IO: CoroutineDispatcher diff --git a/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt b/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt index a05f0a60e8..b85c878cba 100644 --- a/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt +++ b/kotlinx-coroutines-core/concurrent/src/MultithreadedDispatchers.common.kt @@ -19,7 +19,8 @@ import kotlin.jvm.* * associated native resources (threads or native workers). It should not be allocated in place, * should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint. * If you do not need a separate thread pool, but only have to limit effective parallelism of the dispatcher, - * it is recommended to use [CoroutineDispatcher.limitedParallelism] instead. + * it is recommended to use [`Dispatchers.IO.limitedParallelism(1)`][CoroutineDispatcher.limitedParallelism] + * or [`Dispatchers.Default.limitedParallelism(1)`][CoroutineDispatcher.limitedParallelism] instead. * * If you need a completely separate thread pool with scheduling policy that is based on the standard * JDK executors, use the following expression: @@ -48,7 +49,8 @@ public fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher = * associated native resources (threads or native workers). It should not be allocated in place, * should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint. * If you do not need a separate thread pool, but only have to limit effective parallelism of the dispatcher, - * it is recommended to use [CoroutineDispatcher.limitedParallelism] instead. + * it is recommended to use [`Dispatchers.IO.limitedParallelism(nThreads)`][CoroutineDispatcher.limitedParallelism] + * or [`Dispatchers.Default.limitedParallelism(nThreads)`][CoroutineDispatcher.limitedParallelism] instead. * * If you need a completely separate thread pool with scheduling policy that is based on the standard * JDK executors, use the following expression: diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index b467cd180a..3ce7e0d333 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -184,6 +184,11 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { (this as Object).notifyAll() } + // User only for testing and nothing else internal val isThreadPresent get() = _thread != null + + override fun toString(): String { + return "DefaultExecutor" + } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index f3d66cdef9..335af8810f 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -49,6 +49,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() { if (parallelism >= MAX_POOL_SIZE) return this return super.limitedParallelism(parallelism) } + + // This name only leaks to user code as part of .limitedParallelism machinery + override fun toString(): String { + return "Dispatchers.IO" + } } // Dispatchers.IO diff --git a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt index 9bd7e8c1e6..b455e3c3af 100644 --- a/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt +++ b/kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt @@ -11,4 +11,12 @@ class DispatchersToStringTest { assertEquals("Dispatchers.Main[missing]", Dispatchers.Main.toString()) assertEquals("Dispatchers.Main[missing]", Dispatchers.Main.immediate.toString()) } + + @Test + fun testLimitedParallelism() { + assertEquals("Dispatchers.IO.limitedParallelism(1)", Dispatchers.IO.limitedParallelism(1).toString()) + assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString()) + // Not overridden at all, limited parallelism returns `this` + assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString()) + } } \ No newline at end of file