Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document and tweak the contract of Executor.asCoroutineDispatcher and ExecutorService.asCoroutineDispatcher #2727

Merged
merged 2 commits into from
May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
80 changes: 51 additions & 29 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
Expand Down Expand Up @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor service is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}

private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
initFutureCancellation()
}
}
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = false

internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
/*
* Attempts to reflectively (to be Java 6 compatible) invoke
* ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
* internal scheduler queue on cancellation.
*/
init {
removeFutureOnCancel(executor)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand All @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
}

/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
} else {
null
}
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
Expand All @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, context, timeMillis)
} else {
null
}
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}

private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
Expand All @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}

override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}

Expand Down
41 changes: 6 additions & 35 deletions kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
@ObsoleteCoroutinesApi
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
return ThreadPoolDispatcher(nThreads, name)
}

internal class PoolThread(
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init { isDaemon = true }
}

/**
* Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
* created with [newSingleThreadContext] and [newFixedThreadPoolContext].
*/
internal class ThreadPoolDispatcher internal constructor(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcherBase() {
private val threadNo = AtomicInteger()

override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}

init {
initFutureCancellation()
val threadNo = AtomicInteger()
val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
t.isDaemon = true
t
}

/**
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
*/
public override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
return executor.asCoroutineDispatcher()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.Test
import java.lang.Runnable
import java.util.concurrent.*
import kotlin.test.*

class ExecutorAsCoroutineDispatcherDelayTest : TestBase() {

private var callsToSchedule = 0

private inner class STPE : ScheduledThreadPoolExecutor(1) {
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
if (delay != 0L) ++callsToSchedule
return super.schedule(command, delay, unit)
}
}

private inner class SES : ScheduledExecutorService by STPE()

@Test
fun testScheduledThreadPool() = runTest {
val executor = STPE()
withContext(executor.asCoroutineDispatcher()) {
delay(100)
}
executor.shutdown()
assertEquals(1, callsToSchedule)
}

@Test
fun testScheduledExecutorService() = runTest {
val executor = SES()
withContext(executor.asCoroutineDispatcher()) {
delay(100)
}
executor.shutdown()
assertEquals(1, callsToSchedule)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly

import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name)

internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher =
ClosedAfterGuideTestDispatcher(nThreads, name)

internal class PoolThread(
@JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init {
isDaemon = true
}
}

private class ClosedAfterGuideTestDispatcher(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcher() {
private val threadNo = AtomicInteger()

override val executor: Executor =
Executors.newScheduledThreadPool(nThreads, object : ThreadFactory {
override fun newThread(target: java.lang.Runnable): Thread {
return PoolThread(
this@ClosedAfterGuideTestDispatcher,
target,
if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()
)
}
})

override fun dispatch(context: CoroutineContext, block: Runnable) {
executor.execute(wrapTask(block))
}

override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
}
6 changes: 2 additions & 4 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.knit
Expand All @@ -11,8 +11,6 @@ import kotlinx.knit.test.*
import java.util.concurrent.*
import kotlin.test.*

fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block)

// helper function to dump exception to stdout for ease of debugging failed tests
private inline fun <T> outputException(name: String, block: () -> T): T =
try { block() }
Expand Down Expand Up @@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) {
}
throw t
}
}
}