diff --git a/build.gradle b/build.gradle index 87aac2a1e0..791b2bd298 100644 --- a/build.gradle +++ b/build.gradle @@ -60,6 +60,7 @@ buildscript { repositories { jcenter() maven { url "https://kotlin.bintray.com/kotlinx" } + maven { url "https://repo.spring.io/snapshot/" } maven { url "https://kotlin.bintray.com/kotlin-dev" credentials { @@ -153,6 +154,7 @@ allprojects { jcenter() maven { url "https://kotlin.bintray.com/kotlin-dev" + url "https://repo.spring.io/snapshot/" credentials { username = project.hasProperty('bintrayUser') ? project.property('bintrayUser') : System.getenv('BINTRAY_USER') ?: "" password = project.hasProperty('bintrayApiKey') ? project.property('bintrayApiKey') : System.getenv('BINTRAY_API_KEY') ?: "" diff --git a/kotlinx-coroutines-debug/build.gradle b/kotlinx-coroutines-debug/build.gradle index 7fc2e22369..65203a2028 100644 --- a/kotlinx-coroutines-debug/build.gradle +++ b/kotlinx-coroutines-debug/build.gradle @@ -22,6 +22,8 @@ dependencies { compileOnly "junit:junit:$junit_version" shadowDeps "net.bytebuddy:byte-buddy:$byte_buddy_version" shadowDeps "net.bytebuddy:byte-buddy-agent:$byte_buddy_version" + compile 'io.projectreactor.tools:blockhound:1.0.1.BUILD-SNAPSHOT' + compile "org.jetbrains.kotlin:kotlin-reflect:$kotlin_version" } jar { diff --git a/kotlinx-coroutines-debug/src/internal/BlockHoundIntegration.kt b/kotlinx-coroutines-debug/src/internal/BlockHoundIntegration.kt new file mode 100644 index 0000000000..fae35cd4a2 --- /dev/null +++ b/kotlinx-coroutines-debug/src/internal/BlockHoundIntegration.kt @@ -0,0 +1,43 @@ +package kotlinx.coroutines.debug.internal + +import reactor.blockhound.BlockHound +import kotlin.reflect.KClass +import kotlin.reflect.full.* + +internal object BlockHoundIntegration { + + init { + val cls = Class.forName("kotlinx.coroutines.scheduling.CoroutineScheduler\$Worker").kotlin + initializerHelper(cls) + } + + private fun initializerHelper(cls: KClass) { + val field = cls.declaredMemberProperties.find { it.name == "state" }!! + BlockHound.builder() + .addDynamicThreadPredicate(cls::isInstance) + .nonBlockingThreadPredicate { p -> + p.or { thread -> + val castThread = cls.safeCast(thread) + if (!enabled || castThread == null) { + false + } else { + val state = field(castThread) as Enum<*> + state.name == "CPU_ACQUIRED" + } + } + } + .install() + } + + @Volatile + private var enabled = false + + fun install() { + enabled = true + } + + fun uninstall() { + enabled = false + } + +} diff --git a/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt b/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt index 090d3e5d89..27bdc83a2a 100644 --- a/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt +++ b/kotlinx-coroutines-debug/src/internal/DebugProbesImpl.kt @@ -66,6 +66,8 @@ internal object DebugProbesImpl { .name(cl.name) .make() .load(cl.classLoader, ClassReloadingStrategy.fromInstalledAgent()) + + BlockHoundIntegration.install() } public fun uninstall(): Unit = coroutineStateLock.write { @@ -82,6 +84,8 @@ internal object DebugProbesImpl { .name(cl.name) .make() .load(cl.classLoader, ClassReloadingStrategy.fromInstalledAgent()) + + BlockHoundIntegration.uninstall() } public fun hierarchyToString(job: Job): String = coroutineStateLock.write { diff --git a/kotlinx-coroutines-debug/test/BlockHoundTest.kt b/kotlinx-coroutines-debug/test/BlockHoundTest.kt new file mode 100644 index 0000000000..107bccea93 --- /dev/null +++ b/kotlinx-coroutines-debug/test/BlockHoundTest.kt @@ -0,0 +1,79 @@ +package kotlinx.coroutines.debug +import kotlinx.coroutines.* +import kotlinx.coroutines.debug.internal.BlockHoundIntegration +import org.junit.* +import reactor.blockhound.BlockingOperationError + +class BlockHoundTest : TestBase() { + + @Before + fun init() { + BlockHoundIntegration.install() + } + + @After + fun deinit() { + BlockHoundIntegration.uninstall() + } + + @Test(expected = BlockingOperationError::class) + fun shouldDetectBlockingInDefault() = runTest { + withContext(Dispatchers.Default) { + Thread.sleep(1) + } + } + + @Test + fun shouldNotDetectBlockingInIO() = runTest { + withContext(Dispatchers.IO) { + Thread.sleep(1) + } + } + + @Test + fun shouldNotDetectNonblocking() = runTest { + withContext(Dispatchers.Default) { + val a = 1 + val b = 2 + assert(a + b == 3) + } + } + + @Test + fun testReusingThreads() = runTest { + val n = 100 + repeat(n) { + async(Dispatchers.IO) { + Thread.sleep(1) + } + } + repeat(n) { + async(Dispatchers.Default) { + } + } + repeat(n) { + async(Dispatchers.IO) { + Thread.sleep(1) + } + } + } + + @Test(expected = BlockingOperationError::class) + fun testReusingThreadsFailure() = runTest { + val n = 100 + repeat(n) { + async(Dispatchers.IO) { + Thread.sleep(1) + } + } + async(Dispatchers.Default) { + Thread.sleep(1) + } + repeat(n) { + async(Dispatchers.IO) { + Thread.sleep(1) + } + } + } + +}