Skip to content

Commit

Permalink
Fix newSingleThreadContext awaiting cancelled scheduled coroutines (#…
Browse files Browse the repository at this point in the history
…3769)

* Fix newSingleThreadContext awaiting cancelled scheduled coroutines

Before the change, the Worker is not notified about its work being
cancelled, due to no API being present for that.
We work around the issue by checking every 100 milliseconds whether
cancellation happened.

Also, alias 'newSingleThreadContext' to 'newFixedThreadPoolContext(1)` for the sake of consistent implementation

Fixes #3768
  • Loading branch information
dkhalanskyjb committed Jun 29, 2023
1 parent 897599f commit c7545b5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,40 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:JvmMultifileClass
@file:JvmName("ThreadPoolDispatcherKt")
package kotlinx.coroutines

import kotlin.jvm.*

/**
* Creates a coroutine execution context using a single thread with built-in [yield] support.
* **NOTE: The resulting [CloseableCoroutineDispatcher] owns native resources (its thread).
* Resources are reclaimed by [CloseableCoroutineDispatcher.close].**
*
* If the resulting dispatcher is [closed][CloseableCoroutineDispatcher.close] and
* attempt to submit a task is made, then:
* * On the JVM, the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can clean up its resources and promptly complete.
* * On Native, the attempt to submit a task throws an exception.
*
* This is a **delicate** API. The result of this method is a closeable resource with the
* associated native resources (threads or native workers). It should not be allocated in place,
* should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint.
* If you do not need a separate thread-pool, but only have to limit effective parallelism of the dispatcher,
* it is recommended to use [CoroutineDispatcher.limitedParallelism] instead.
*
* If you need a completely separate thread-pool with scheduling policy that is based on the standard
* JDK executors, use the following expression:
* `Executors.newSingleThreadExecutor().asCoroutineDispatcher()`.
* See `Executor.asCoroutineDispatcher` for details.
*
* @param name the base name of the created thread.
*/
@ExperimentalCoroutinesApi
public expect fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher
@DelicateCoroutinesApi
public fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher =
newFixedThreadPoolContext(1, name)

@ExperimentalCoroutinesApi
public expect fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher
29 changes: 2 additions & 27 deletions kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,13 @@
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:JvmMultifileClass
@file:JvmName("ThreadPoolDispatcherKt")
package kotlinx.coroutines

import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger

/**
* Creates a coroutine execution context using a single thread with built-in [yield] support.
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its thread).
* Resources are reclaimed by [ExecutorCoroutineDispatcher.close].**
*
* If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and
* attempt to submit a continuation task is made,
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*
* This is a **delicate** API. The result of this method is a closeable resource with the
* associated native resources (threads). It should not be allocated in place,
* should be closed at the end of its lifecycle, and has non-trivial memory and CPU footprint.
* If you do not need a separate thread-pool, but only have to limit effective parallelism of the dispatcher,
* it is recommended to use [CoroutineDispatcher.limitedParallelism] instead.
*
* If you need a completely separate thread-pool with scheduling policy that is based on the standard
* JDK executors, use the following expression:
* `Executors.newSingleThreadExecutor().asCoroutineDispatcher()`.
* See [Executor.asCoroutineDispatcher] for details.
*
* @param name the base name of the created thread.
*/
@DelicateCoroutinesApi
public actual fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
newFixedThreadPoolContext(1, name)

/**
* Creates a coroutine execution context with the fixed-size thread-pool and built-in [yield] support.
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
Expand Down
29 changes: 18 additions & 11 deletions kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

@ExperimentalCoroutinesApi
public actual fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher {
return WorkerDispatcher(name)
}
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
return MultiWorkerDispatcher(name, nThreads)
}

@OptIn(ExperimentalTime::class)
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
private val worker = Worker.start(name = name)

Expand Down Expand Up @@ -52,21 +50,30 @@ internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(),
override fun dispose() {
disposableHolder.value = null
}

fun isDisposed() = disposableHolder.value == null
}

fun Worker.runAfterDelay(block: DisposableBlock, targetMoment: TimeMark) {
if (block.isDisposed()) return
val durationUntilTarget = -targetMoment.elapsedNow()
val quantum = 100.milliseconds
if (durationUntilTarget > quantum) {
executeAfter(quantum.inWholeMicroseconds) { runAfterDelay(block, targetMoment) }
} else {
executeAfter(maxOf(0, durationUntilTarget.inWholeMicroseconds), block)
}
}

val disposableBlock = DisposableBlock(block)
worker.executeAfter(timeMillis.toMicrosSafe(), disposableBlock)
val targetMoment = TimeSource.Monotonic.markNow() + timeMillis.milliseconds
worker.runAfterDelay(disposableBlock, targetMoment)
return disposableBlock
}

override fun close() {
worker.requestTermination().result // Note: calling "result" blocks
}

private fun Long.toMicrosSafe(): Long {
val result = this * 1000
return if (result > this) result else Long.MAX_VALUE
}
}

private class MultiWorkerDispatcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlin.native.concurrent.*
import kotlin.test.*
import kotlin.time.Duration.Companion.seconds

private class BlockingBarrier(val n: Int) {
val counter = atomic(0)
Expand Down Expand Up @@ -63,4 +64,20 @@ class MultithreadedDispatchersTest {
dispatcher.close()
}
}

/**
* Test that [newSingleThreadContext] will not wait for the cancelled scheduled coroutines before closing.
*/
@Test
fun timeoutsNotPreventingClosing(): Unit = runBlocking {
val dispatcher = WorkerDispatcher("test")
withContext(dispatcher) {
withTimeout(5.seconds) {
}
}
withTimeout(1.seconds) {
dispatcher.close() // should not wait for the timeout
yield()
}
}
}

0 comments on commit c7545b5

Please sign in to comment.