Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support thread interrupting blocking functions (#1947)
This is implementation of issue #1947 Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
- Loading branch information
Trol
committed
Apr 23, 2020
1 parent
5eaf83c
commit f410298
Showing
3 changed files
with
322 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,145 @@ | ||
/* | ||
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines | ||
|
||
import kotlinx.atomicfu.AtomicRef | ||
import kotlinx.atomicfu.atomic | ||
import kotlinx.atomicfu.loop | ||
import kotlin.coroutines.CoroutineContext | ||
import kotlin.coroutines.EmptyCoroutineContext | ||
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn | ||
|
||
/** | ||
* Makes a blocking code block cancellable (become a cancellation point of the coroutine). | ||
* | ||
* The blocking code block will be interrupted and this function will throw [CancellationException] | ||
* if the coroutine is cancelled. | ||
* | ||
* Example: | ||
* ``` | ||
* GlobalScope.launch(Dispatchers.IO) { | ||
* async { | ||
* // This function will throw [CancellationException]. | ||
* runInterruptible { | ||
* doSomethingUseful() | ||
* | ||
* // This blocking procedure will be interrupted when this coroutine is canceled | ||
* // by Exception thrown by the below async block. | ||
* doSomethingElseUsefulInterruptible() | ||
* } | ||
* } | ||
* | ||
* async { | ||
* delay(500L) | ||
* throw Exception() | ||
* } | ||
* } | ||
* ``` | ||
* | ||
* There is also an optional context parameter to this function to enable single-call conversion of | ||
* interruptible Java methods into main-safe suspending functions like this: | ||
* ``` | ||
* // With one call here we are moving the call to Dispatchers.IO and supporting interruption. | ||
* suspend fun <T> BlockingQueue<T>.awaitTake(): T = | ||
* runInterruptible(Dispatchers.IO) { queue.take() } | ||
* ``` | ||
* | ||
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. | ||
* @param block regular blocking block that will be interrupted on coroutine cancellation. | ||
*/ | ||
public suspend fun <T> runInterruptible( | ||
context: CoroutineContext = EmptyCoroutineContext, | ||
block: () -> T | ||
): T { | ||
// fast path: empty context | ||
if (context === EmptyCoroutineContext) { return runInterruptibleInExpectedContext(block) } | ||
// slow path: | ||
return withContext(context) { runInterruptibleInExpectedContext(block) } | ||
} | ||
|
||
private suspend fun <T> runInterruptibleInExpectedContext(block: () -> T): T = | ||
suspendCoroutineUninterceptedOrReturn sc@{ uCont -> | ||
try { | ||
// fast path: no job | ||
val job = uCont.context[Job] ?: return@sc block() | ||
// slow path | ||
val threadState = ThreadState().apply { initInterrupt(job) } | ||
try { | ||
block() | ||
} finally { | ||
threadState.clearInterrupt() | ||
} | ||
} catch (e: InterruptedException) { | ||
throw CancellationException() | ||
} | ||
} | ||
|
||
private class ThreadState { | ||
|
||
fun initInterrupt(job: Job) { | ||
// starts with Init | ||
if (state.value !== Init) throw IllegalStateException("impossible state") | ||
// remembers this running thread | ||
state.value = Working(Thread.currentThread(), null) | ||
// watches the job for cancellation | ||
val cancelHandle = | ||
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = CancelHandler()) | ||
// remembers the cancel handle or drops it | ||
state.loop { s -> | ||
when { | ||
s is Working -> if (state.compareAndSet(s, Working(s.thread, cancelHandle))) return | ||
s === Interrupting || s === Interrupted -> return | ||
s === Init || s === Finish -> throw IllegalStateException("impossible state") | ||
else -> throw IllegalStateException("unknown state") | ||
} | ||
} | ||
} | ||
|
||
fun clearInterrupt() { | ||
state.loop { s -> | ||
when { | ||
s is Working -> if (state.compareAndSet(s, Finish)) { s.cancelHandle!!.dispose(); return } | ||
s === Interrupting -> Thread.yield() // eases the thread | ||
s === Interrupted -> { Thread.interrupted(); return } // no interrupt leak | ||
s === Init || s === Finish -> throw IllegalStateException("impossible state") | ||
else -> throw IllegalStateException("unknown state") | ||
} | ||
} | ||
} | ||
|
||
private inner class CancelHandler : CompletionHandler { | ||
override fun invoke(cause: Throwable?) { | ||
state.loop { s -> | ||
when { | ||
s is Working -> { | ||
if (state.compareAndSet(s, Interrupting)) { | ||
s.thread!!.interrupt() | ||
state.value = Interrupted | ||
return | ||
} | ||
} | ||
s === Finish -> return | ||
s === Interrupting || s === Interrupted -> return | ||
s === Init -> throw IllegalStateException("impossible state") | ||
else -> throw IllegalStateException("unknown state") | ||
} | ||
} | ||
} | ||
} | ||
|
||
private val state: AtomicRef<State> = atomic(Init) | ||
|
||
private interface State | ||
// initial state | ||
private object Init : State | ||
// cancellation watching is setup and/or the continuation is running | ||
private data class Working(val thread: Thread?, val cancelHandle: DisposableHandle?) : State | ||
// the continuation done running without interruption | ||
private object Finish : State | ||
// interrupting this thread | ||
private object Interrupting: State | ||
// done interrupting | ||
private object Interrupted: State | ||
} |
172 changes: 172 additions & 0 deletions
172
kotlinx-coroutines-core/jvm/test/InterruptibleCancellationPointTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,172 @@ | ||
/* | ||
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines | ||
|
||
import java.io.IOException | ||
import java.util.concurrent.Executors | ||
import java.util.concurrent.atomic.AtomicBoolean | ||
import java.util.concurrent.atomic.AtomicInteger | ||
import kotlin.test.* | ||
|
||
class InterruptibleCancellationPointTest: TestBase() { | ||
|
||
@Test | ||
fun testNormalRun() = runBlocking { | ||
var result = runInterruptible { | ||
var x = doSomethingUsefulBlocking(1, 1) | ||
var y = doSomethingUsefulBlocking(1, 2) | ||
x + y | ||
} | ||
assertEquals(3, result) | ||
} | ||
|
||
@Test | ||
fun testExceptionThrow() = runBlocking { | ||
val exception = Exception() | ||
try { | ||
runInterruptible { | ||
throw exception | ||
} | ||
} catch (e: Throwable) { | ||
assertEquals(exception, e) | ||
return@runBlocking | ||
} | ||
fail() | ||
} | ||
|
||
@Test | ||
fun testRunWithContext() = runBlocking { | ||
var runThread = | ||
runInterruptible (Dispatchers.IO) { | ||
Thread.currentThread() | ||
} | ||
assertNotEquals(runThread, Thread.currentThread()) | ||
} | ||
|
||
@Test | ||
fun testRunWithContextFastPath() = runBlocking { | ||
var runThread : Thread = | ||
runInterruptible { | ||
Thread.currentThread() | ||
} | ||
assertEquals(runThread, Thread.currentThread()) | ||
} | ||
|
||
@Test | ||
fun testInterrupt() { | ||
val count = AtomicInteger(0) | ||
try { | ||
expect(1) | ||
runBlocking { | ||
launch(Dispatchers.IO) { | ||
async { | ||
try { | ||
// `runInterruptible` makes a blocking block cancelable (become a cancellation point) | ||
// by interrupting it on cancellation and throws CancellationException | ||
runInterruptible { | ||
try { | ||
doSomethingUsefulBlocking(100, 1) | ||
doSomethingUsefulBlocking(Long.MAX_VALUE, 0) | ||
} catch (e: InterruptedException) { | ||
expect(3) | ||
throw e | ||
} | ||
} | ||
} catch (e: CancellationException) { | ||
expect(4) | ||
} | ||
} | ||
|
||
async { | ||
delay(500L) | ||
expect(2) | ||
throw IOException() | ||
} | ||
} | ||
} | ||
} catch (e: IOException) { | ||
expect(5) | ||
} | ||
finish(6) | ||
} | ||
|
||
@Test | ||
fun testNoInterruptLeak() = runBlocking { | ||
var interrupted = true | ||
|
||
var task = launch(Dispatchers.IO) { | ||
try { | ||
runInterruptible { | ||
doSomethingUsefulBlocking(Long.MAX_VALUE, 0) | ||
} | ||
} finally { | ||
interrupted = Thread.currentThread().isInterrupted | ||
} | ||
} | ||
|
||
delay(500) | ||
task.cancel() | ||
task.join() | ||
assertFalse(interrupted) | ||
} | ||
|
||
@Test | ||
fun testStress() { | ||
val REPEAT_TIMES = 2_000 | ||
|
||
Executors.newCachedThreadPool().asCoroutineDispatcher().use { dispatcher -> | ||
val interruptLeak = AtomicBoolean(false) | ||
val enterCount = AtomicInteger(0) | ||
val interruptedCount = AtomicInteger(0) | ||
val otherExceptionCount = AtomicInteger(0) | ||
|
||
runBlocking { | ||
repeat(REPEAT_TIMES) { repeat -> | ||
var job = launch(start = CoroutineStart.LAZY, context = dispatcher) { | ||
try { | ||
runInterruptible { | ||
enterCount.incrementAndGet() | ||
try { | ||
doSomethingUsefulBlocking(Long.MAX_VALUE, 0) | ||
} catch (e: InterruptedException) { | ||
interruptedCount.incrementAndGet() | ||
throw e | ||
} | ||
} | ||
} catch (e: CancellationException) { | ||
} catch (e: Throwable) { | ||
otherExceptionCount.incrementAndGet() | ||
} finally { | ||
interruptLeak.set(interruptLeak.get() || Thread.currentThread().isInterrupted) | ||
} | ||
} | ||
|
||
var cancelJob = launch(start = CoroutineStart.LAZY, context = dispatcher) { | ||
job.cancel() | ||
} | ||
|
||
launch (dispatcher) { | ||
delay((REPEAT_TIMES - repeat).toLong()) | ||
job.start() | ||
} | ||
|
||
launch (dispatcher) { | ||
delay(repeat.toLong()) | ||
cancelJob.start() | ||
} | ||
} | ||
} | ||
|
||
assertFalse(interruptLeak.get()) | ||
assertEquals(enterCount.get(), interruptedCount.get()) | ||
assertEquals(0, otherExceptionCount.get()) | ||
} | ||
} | ||
|
||
private fun doSomethingUsefulBlocking(timeUseMillis: Long, result: Int): Int { | ||
Thread.sleep(timeUseMillis) | ||
return result | ||
} | ||
} |