Skip to content

Commit

Permalink
Get rid of ThreadPoolDispatcher and PoolThread classes
Browse files Browse the repository at this point in the history
    * Reuse the same class for both asCoroutineDispatcher and newFixedThreadPoolContext
    * Replace 3-classes hierarchy by a single impl class
    * Copy the auto-closing logic to test source
  • Loading branch information
qwwdfsad committed May 24, 2021
1 parent 73942ee commit 96e603a
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 52 deletions.
16 changes: 3 additions & 13 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,9 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}

private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
initFutureCancellation()
}
}

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = false

internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
}
private var removesFutureOnCancellation: Boolean = removeFutureOnCancel(executor)

override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
Expand Down Expand Up @@ -149,7 +139,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}

override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}

Expand Down
41 changes: 6 additions & 35 deletions kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
@ObsoleteCoroutinesApi
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
return ThreadPoolDispatcher(nThreads, name)
}

internal class PoolThread(
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init { isDaemon = true }
}

/**
* Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
* created with [newSingleThreadContext] and [newFixedThreadPoolContext].
*/
internal class ThreadPoolDispatcher internal constructor(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcherBase() {
private val threadNo = AtomicInteger()

override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}

init {
initFutureCancellation()
val threadNo = AtomicInteger()
val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
t.isDaemon = true
t
}

/**
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
*/
public override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
return executor.asCoroutineDispatcher()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly

import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name)

public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher =
ClosedAfterGuideTestDispatcher(nThreads, name)

internal class PoolThread(
@JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init {
isDaemon = true
}
}

private class ClosedAfterGuideTestDispatcher(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcher() {
private val threadNo = AtomicInteger()

override val executor: Executor =
Executors.newScheduledThreadPool(nThreads, object : ThreadFactory {
override fun newThread(target: java.lang.Runnable): Thread {
return PoolThread(
this@ClosedAfterGuideTestDispatcher,
target,
if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()
)
}
})

override fun dispatch(context: CoroutineContext, block: Runnable) {
executor.execute(wrapTask(block))
}

override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
}
6 changes: 2 additions & 4 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.knit
Expand All @@ -11,8 +11,6 @@ import kotlinx.knit.test.*
import java.util.concurrent.*
import kotlin.test.*

fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block)

// helper function to dump exception to stdout for ease of debugging failed tests
private inline fun <T> outputException(name: String, block: () -> T): T =
try { block() }
Expand Down Expand Up @@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) {
}
throw t
}
}
}

0 comments on commit 96e603a

Please sign in to comment.