Skip to content

Commit

Permalink
Limited parallelism improvements (#4098)
Browse files Browse the repository at this point in the history
* Provide toString implementation for limitedParallelism
* Improve documentation of limitedParallelism

Co-authored-by: Dmitry Khalanskiy <52952525+dkhalanskyjb@users.noreply.github.com>
  • Loading branch information
2 people authored and AleksDanil committed May 10, 2024
1 parent 77b39fd commit 8e1fb8e
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 18 deletions.
71 changes: 56 additions & 15 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,37 +65,78 @@ public abstract class CoroutineDispatcher :

/**
* Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
* The resulting view uses the original dispatcher for execution, but with the guarantee that
* The resulting view uses the original dispatcher for execution but with the guarantee that
* no more than [parallelism] coroutines are executed at the same time.
*
* This method does not impose restrictions on the number of views or the total sum of parallelism values,
* each view controls its own parallelism independently with the guarantee that the effective parallelism
* of all views cannot exceed the actual parallelism of the original dispatcher.
*
* ### Limitations
*
* The default implementation of `limitedParallelism` does not support direct dispatchers,
* such as executing the given runnable in place during [dispatch] calls.
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
* For direct dispatchers, it is recommended to override this method
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
* The resulting dispatcher does not guarantee that the coroutines will always be dispatched on the same
* subset of threads, it only guarantees that at most [parallelism] coroutines are executed at the same time,
* and reuses threads from the original dispatchers.
* It does not constitute a resource -- it is a _view_ of the underlying dispatcher that can be thrown away
* and is not required to be closed.
*
* ### Example of usage
* ```
* private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
* // Background dispatcher for the application
* val dispatcher = newFixedThreadPoolContext(4, "App Background")
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2)
* // At most 3 threads will be processing JSON to avoid image processing starvation
* private val jsonProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3)
* // At most 1 thread will be doing IO
* private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
* val fileWriterDispatcher = dispatcher.limitedParallelism(1)
* ```
* Note how in this example the application has an executor with 4 threads, but the total sum of all limits
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
* and at most 4 threads can exist in the system.
*
* Note that this example was structured in such a way that it illustrates the parallelism guarantees.
* In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a
* `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them.
* In practice, it is usually better to use `Dispatchers.IO` or [Dispatchers.Default] instead of creating a
* `backgroundDispatcher`.
*
* ### `limitedParallelism(1)` pattern
*
* One of the common patterns is confining the execution of specific tasks to a sequential execution in background
* with `limitedParallelism(1)` invocation.
* For that purpose, the implementation guarantees that tasks are executed sequentially and that a happens-before relation
* is established between them:
*
* ```
* val confined = Dispatchers.Default.limitedParallelism(1)
* var counter = 0
*
* // Invoked from arbitrary coroutines
* launch(confined) {
* // This increment is sequential and race-free
* ++counter
* }
* ```
* Note that there is no guarantee that the underlying system thread will always be the same.
*
* ### Dispatchers.IO
*
* `Dispatcher.IO` is considered _elastic_ for the purposes of limited parallelism -- the sum of
* views is not restricted by the capacity of `Dispatchers.IO`.
* It means that it is safe to replace `newFixedThreadPoolContext(nThreads)` with
* `Dispatchers.IO.limitedParallelism(nThreads)` w.r.t. available number of threads.
* See `Dispatchers.IO` documentation for more details.
*
* ### Restrictions and implementation details
*
* The default implementation of `limitedParallelism` does not support direct dispatchers,
* such as executing the given runnable in place during [dispatch] calls.
* Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct.
* For direct dispatchers, it is recommended to override this method
* and provide a domain-specific implementation or to throw an [UnsupportedOperationException].
*
* Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
* For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
*
* @throws IllegalArgumentException if the given [parallelism] is non-positive
* @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
*/
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ internal class LimitedDispatcher(
}
}

override fun toString() = "$dispatcher.limitedParallelism($parallelism)"

/**
* A worker that polls the queue and runs tasks until there are no more of them.
*
Expand Down Expand Up @@ -125,5 +127,4 @@ internal class LimitedDispatcher(
}
}

// Save a few bytecode ops
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/Dispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ package kotlinx.coroutines
* the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads,
* but during its steady state there is only a small number of threads shared
* among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`
*
* It is recommended to replace manually created thread-backed executors with `Dispatchers.IO.limitedParallelism` instead:
* ```
* // Requires manual closing, allocates resources for all threads
* val databasePoolDispatcher = newFixedThreadPoolContext(128)
*
* // Provides the same number of threads as a resource but shares and caches them internally
* val databasePoolDispatcher = Dispatchers.IO.limitedParallelism(128)
* ```
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public expect val Dispatchers.IO: CoroutineDispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import kotlin.jvm.*
* associated native resources (threads or native workers). It should not be allocated in place,
* should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint.
* If you do not need a separate thread pool, but only have to limit effective parallelism of the dispatcher,
* it is recommended to use [CoroutineDispatcher.limitedParallelism] instead.
* it is recommended to use [`Dispatchers.IO.limitedParallelism(1)`][CoroutineDispatcher.limitedParallelism]
* or [`Dispatchers.Default.limitedParallelism(1)`][CoroutineDispatcher.limitedParallelism] instead.
*
* If you need a completely separate thread pool with scheduling policy that is based on the standard
* JDK executors, use the following expression:
Expand Down Expand Up @@ -48,7 +49,8 @@ public fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher =
* associated native resources (threads or native workers). It should not be allocated in place,
* should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint.
* If you do not need a separate thread pool, but only have to limit effective parallelism of the dispatcher,
* it is recommended to use [CoroutineDispatcher.limitedParallelism] instead.
* it is recommended to use [`Dispatchers.IO.limitedParallelism(nThreads)`][CoroutineDispatcher.limitedParallelism]
* or [`Dispatchers.Default.limitedParallelism(nThreads)`][CoroutineDispatcher.limitedParallelism] instead.
*
* If you need a completely separate thread pool with scheduling policy that is based on the standard
* JDK executors, use the following expression:
Expand Down
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,11 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
(this as Object).notifyAll()
}

// User only for testing and nothing else
internal val isThreadPresent
get() = _thread != null

override fun toString(): String {
return "DefaultExecutor"
}
}
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
if (parallelism >= MAX_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
}

// This name only leaks to user code as part of .limitedParallelism machinery
override fun toString(): String {
return "Dispatchers.IO"
}
}

// Dispatchers.IO
Expand Down
8 changes: 8 additions & 0 deletions kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@ class DispatchersToStringTest {
assertEquals("Dispatchers.Main[missing]", Dispatchers.Main.toString())
assertEquals("Dispatchers.Main[missing]", Dispatchers.Main.immediate.toString())
}

@Test
fun testLimitedParallelism() {
assertEquals("Dispatchers.IO.limitedParallelism(1)", Dispatchers.IO.limitedParallelism(1).toString())
assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString())
// Not overridden at all, limited parallelism returns `this`
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())
}
}

0 comments on commit 8e1fb8e

Please sign in to comment.