Skip to content

Commit

Permalink
Revert "Document and tweak the contract of Executor.asCoroutineDispat…
Browse files Browse the repository at this point in the history
…cher and ExecutorService.asCoroutineDispatcher (#2727)"

This reverts commit 5121005
  • Loading branch information
anastasiia.spaseeva committed Jun 5, 2021
1 parent 421aebf commit f1dadb9
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 154 deletions.
80 changes: 29 additions & 51 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
Expand Down Expand Up @@ -40,22 +39,6 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor service is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -69,23 +52,6 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -109,15 +75,18 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}

internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

/*
* Attempts to reflectively (to be Java 6 compatible) invoke
* ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
* internal scheduler queue on cancellation.
*/
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
removeFutureOnCancel(executor)
initFutureCancellation()
}
}

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = false

internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand All @@ -130,12 +99,17 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
}
}

/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
} else {
null
}
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
Expand All @@ -146,16 +120,20 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, context, timeMillis)
} else {
null
}
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}

private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
Expand All @@ -171,7 +149,7 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor)
}

override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}

Expand Down
41 changes: 35 additions & 6 deletions kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,40 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
@ObsoleteCoroutinesApi
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
val threadNo = AtomicInteger()
val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
t.isDaemon = true
t
return ThreadPoolDispatcher(nThreads, name)
}

internal class PoolThread(
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init { isDaemon = true }
}

/**
* Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
* created with [newSingleThreadContext] and [newFixedThreadPoolContext].
*/
internal class ThreadPoolDispatcher internal constructor(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcherBase() {
private val threadNo = AtomicInteger()

override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}

init {
initFutureCancellation()
}
return executor.asCoroutineDispatcher()

/**
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
*/
public override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
}

This file was deleted.

This file was deleted.

6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.knit
Expand All @@ -11,6 +11,8 @@ import kotlinx.knit.test.*
import java.util.concurrent.*
import kotlin.test.*

fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block)

// helper function to dump exception to stdout for ease of debugging failed tests
private inline fun <T> outputException(name: String, block: () -> T): T =
try { block() }
Expand Down Expand Up @@ -174,4 +176,4 @@ private inline fun List<String>.verify(verification: () -> Unit) {
}
throw t
}
}
}

0 comments on commit f1dadb9

Please sign in to comment.