# 前置依赖

In [102]:
%useLatestDescriptors
%use coroutines

# 协程基础

## 你的第一个协程

**协程** 是可挂起计算的一个实例。概念上，它类似于一个线程，在于它接受一个代码块来运行，并与代码的其余部分并发工作。 然而，协程不绑定到任何特定线程。它可以在一个线程中挂起执行，并在另一个线程中恢复。

协程可以被认为是轻量级线程，但它们之间存在许多重要差异，使得其在实际使用中与线程大相径庭。

运行以下代码来创建你的第一个可运行协程：

In [103]:
runBlocking { // this: CoroutineScope
    launch { // 启动一个新的协程并继续
        delay(1000L) // 非阻塞式延迟 1 秒（默认时间单位为毫秒）
        println("World!") // 延迟后打印
    }
    println("Hello") // 主协程继续执行，而前一个协程处于延迟状态
}

Hello
World!


让我们剖析这段代码的作用。

`launch` 是一个_协程构建器_。它启动一个新的协程，与代码的其余部分并发执行，代码的其余部分会继续独立工作。这就是为什么 `Hello` 先被打印出来。

`delay` 是一个特殊的_挂起函数_。它使协程_挂起_特定时间。挂起协程并不会_阻塞_底层线程，而是允许其他协程运行并使用底层线程来执行它们的代码。

`runBlocking` 也是一个协程构建器，它连接了常规 `fun main()` 的非协程世界与 `runBlocking { ... }` 花括号内部包含协程的代码。IDE 中 `runBlocking` 左花括号后的 `this: CoroutineScope` 提示就突显了这一点。

如果你移除或忘记这段代码中的 `runBlocking`，你会在 `launch` 调用处得到一个错误，因为 `launch` 只在 `CoroutineScope` 上声明：


> Unresolved reference: launch

`runBlocking` 的名称意味着运行它的线程（在本例中 — 主线程）在调用期间会被_阻塞_，直到 `runBlocking { ... }` 内的所有协程完成执行。你经常会在应用程序的最顶层看到 `runBlocking` 的这种用法，但在实际代码中却很少见，因为线程是昂贵的资源，阻塞它们效率低下，并且通常不被期望。

## 结构化并发

协程遵循结构化并发的原则，这意味着新的协程只能在限定协程生命周期的特定 `CoroutineScope` 中启动。上面的例子表明 `runBlocking` 建立了相应的协程作用域，这就是为什么前面的例子会等待 `World!` 在一秒延迟后打印出来，然后才退出。

在实际应用程序中，你会启动大量的协程。结构化并发确保它们不会丢失且不会泄露。外部作用域只有在其所有子协程完成之后才能完成。结构化并发还确保代码中的任何错误都能正确报告，永不丢失。

## 提取函数重构

让我们将 `launch { ... }` 内部的代码块提取到一个单独的函数中。当你对这段代码执行“提取函数”重构时，你会得到一个带有 `suspend` 修饰符的新函数。这是你的第一个_挂起函数_。挂起函数可以在协程内部像常规函数一样使用，但它们的额外特性是，它们反过来可以使用其他挂起函数（例如本例中的 `delay`）来_挂起_协程的执行。

In [104]:
import kotlinx.coroutines.*

runBlocking { // this: CoroutineScope
    launch { doWorld() }
    println("Hello")
}

// 这是你的第一个挂起函数
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}

Hello
World!


## 作用域构建器

除了由不同构建器提供的协程作用域之外，还可以使用 `coroutineScope` 构建器声明自己的作用域。它会创建一个协程作用域，并且只有在所有已启动的子协程完成之后才会完成。

`runBlocking` 和 `coroutineScope` 构建器可能看起来相似，因为它们都等待其主体及所有子协程完成。 主要区别在于，`runBlocking` 方法会_阻塞_当前线程进行等待，而 `coroutineScope` 只是挂起，释放底层线程供其他用途使用。由于这个区别，`runBlocking` 是一个常规函数，而 `coroutineScope` 是一个挂起函数。

你可以在任何挂起函数中使用 `coroutineScope`。 例如，你可以将 `Hello` 和 `World` 的并发打印移至 `suspend fun doWorld()` 函数中：

In [105]:
import kotlinx.coroutines.*

runBlocking {
    doWorld()
}

suspend fun doWorld() = coroutineScope {  // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}

Hello
World!


## 作用域构建器与并发

`coroutineScope` 构建器可以在任何挂起函数内部使用，以执行多个并发操作。 让我们在 `doWorld` 挂起函数中启动两个并发协程：

In [106]:
import kotlinx.coroutines.*

// 顺序执行 doWorld，然后是 "Done"
runBlocking {
    doWorld()
    println("Done")
}

// 并发执行这两个部分
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("World 2")
    }
    launch {
        delay(1000L)
        println("World 1")
    }
    println("Hello")
}

Hello
World 1
World 2
Done


`launch { ... }` 代码块内部的两个代码片段都是_并发_执行的，`World 1` 在启动一秒后先打印，`World 2` 在启动两秒后接着打印。 `doWorld` 中的 `coroutineScope` 只有在两者都完成之后才会完成，所以 `doWorld` 会在此之后才返回并允许 `Done` 字符串被打印

## 显式的 Job

`launch` 协程构建器会返回一个 `Job` 对象，它是一个已启动协程的句柄，可以用来显式地等待其完成。 例如，你可以等待子协程完成，然后打印 "Done" 字符串：

In [107]:
runBlocking {
    val job = launch { // 启动一个新的协程并保留对其 Job 的引用
        delay(1000L)
        println("World!")
    }
    println("Hello")
    job.join() // 等待子协程完成
    println("Done")
}

Hello
World!
Done


## 协程是轻量级的

协程比 `JVM` 线程的资源密集度更低。使用线程时会耗尽 `JVM` 可用内存的代码，可以使用协程来表达，而不会触及资源限制。例如，以下代码启动 50,000 个不同的协程，每个协程等待 5 秒，然后打印一个点（'.'），同时消耗很少的内存：

In [108]:
runBlocking {
    repeat(50_000) { // 启动大量协程
        launch {
            delay(5000L)
            print(".")
        }
    }
}

........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

如果你使用线程编写相同的程序（移除 `runBlocking`，将 `launch` 替换为 `thread`，并将 `delay` 替换为 `Thread.sleep`），它将消耗大量内存。根据你的操作系统、`JDK` 版本及其设置，它要么抛出内存不足错误，要么缓慢启动线程，以至于永远不会有太多并发运行的线程。

# 取消与超时

## 取消协程执行

在长时间运行的应用程序中，你可能需要对后台协程进行细粒度控制。 例如，用户可能关闭了启动协程的页面，现在其结果不再需要，并且其操作可以被取消。 `launch` 函数返回一个 `Job`，可用于取消正在运行的协程：

In [109]:
runBlocking {
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    job.join() // waits for job's completion
    println("main: Now I can quit.")
}

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.


一旦 `main` 调用 `job.cancel`，我们就不会看到来自另一个协程的任何输出，因为它已被取消。 还有一个 `Job` 扩展函数 `cancelAndJoin`， 它结合了 `cancel` 和 `join` 调用。

## 取消是协作性的

协程取消是_协作性的_。协程代码必须相互配合才能被取消。 `kotlinx.coroutines` 中的所有挂起函数都是_可取消的_。它们会检测协程的取消，并在取消时抛出 `CancellationException`。然而，如果一个协程正在进行计算并且不检测取消，那么它就无法被取消，如下例所示：

In [110]:
runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.


运行它会发现，即使在取消之后，它仍然会继续打印 "I'm sleeping"，直到该 `Job` 在五次迭代后自行完成。

通过捕获 `CancellationException` 但不重新抛出它，可以观察到同样的问题：

In [111]:
runBlocking {
    val job = launch(Dispatchers.Default) {
        repeat(5) { i ->
            try {
                // print a message twice a second
                println("job: I'm sleeping $i ...")
                delay(500)
            } catch (e: Exception) {
                // log the exception
                println(e)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#70204":StandaloneCoroutine{Cancelling}@3db5defe
job: I'm sleeping 3 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#70204":StandaloneCoroutine{Cancelling}@3db5defe
job: I'm sleeping 4 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#70204":StandaloneCoroutine{Cancelling}@3db5defe
main: Now I can quit.


虽然捕获 `Exception` 是一种反模式，但此问题可能会以更微妙的方式出现，例如在使用 `runCatching` 函数时，该函数不会重新抛出 `CancellationException`。

## 使计算代码可取消

有两种方法可以使计算代码可取消。 第一种是定期调用检查取消的挂起函数。 `yield` 和 `ensureActive` 函数是实现此目的的绝佳选择。 另一种方法是使用 `isActive` 显式检查取消状态。 让我们尝试后一种方法。

将上一个示例中的 `while (i < 5)` 替换为 `while (isActive)` 并重新运行。

In [112]:
runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // prints a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.


如你所见，现在这个循环被取消了。`isActive` 是一个扩展属性，通过 `CoroutineScope` 对象在协程内部可用。

## 使用 `finally` 关闭资源

可取消的挂起函数在取消时会抛出 `CancellationException`，可以按常规方式处理。 例如， `try {...} finally {...}` 表达式和 `Kotlin` 的 `use` 函数在协程取消时正常执行其终结操作：

In [113]:
runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("job: I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.


`join` 和 `cancelAndJoin` 都会等待所有终结操作完成

## 运行不可取消的代码块

在上一示例的 `finally` 代码块中，任何尝试使用挂起函数的操作都会导致 `CancellationException`，因为运行此代码的协程已被取消。通常，这不是问题，因为所有行为良好的关闭操作（关闭文件、取消作业或关闭任何类型的通信通道）通常都是非阻塞的，并且不涉及任何挂起函数。但是，在极少数情况下，当你需要在已取消的协程中挂起时，你可以使用 `withContext` 函数和 `NonCancellable` 上下文，将相应的代码包装在 `withContext(NonCancellable) {...}` 中，如下例所示：

In [114]:
runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
}

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.


## 超时

取消协程执行最明显的实际原因是其执行时间超过了某个超时。 虽然你可以手动跟踪对应 `Job` 的引用并启动一个单独的协程来在延迟后取消被跟踪的协程，但有一个现成的 `withTimeout` 函数可以完成此操作。 请看以下示例：

In [115]:
runBlocking {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...


kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

`withTimeout` 抛出的 `TimeoutCancellationException` 是 `CancellationException` 的子类。 我们之前没有在控制台上看到它的堆栈跟踪打印出来。那是因为 在已取消的协程内部，`CancellationException` 被认为是协程完成的正常原因。 但是，在此示例中，我们直接在 `main` 函数内部使用了 `withTimeout`。

由于取消只是一种异常，所有资源都以常规方式关闭。 如果你需要在任何类型的超时时执行一些额外的操作，可以将带有超时的代码包装在 `try {...} catch (e: TimeoutCancellationException) {...}` 代码块中，或者使用 `withTimeoutOrNull` 函数，它与 `withTimeout` 类似，但在超时时返回 `null` 而不是抛出异常：

In [99]:
runBlocking {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }
    println("Result is $result")
}

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null


运行此代码时不再出现异常：

## 异步超时和资源

`withTimeout` 中的超时事件相对于其代码块中运行的代码是异步的，并且可能随时发生，甚至在超时代码块内部返回之前。如果你在代码块内部打开或获取了需要在代码块外部关闭或释放的资源，请记住这一点。

例如，这里我们用 `Resource` 类模拟一个可关闭资源，它通过递增 `acquired` 计数器来简单地跟踪其被创建的次数，并在其 `close` 函数中递减计数器。 现在让我们创建许多协程，每个协程在 `withTimeout` 块的末尾创建一个 `Resource`，并在块外部释放资源。我们添加一个小的延迟，以便超时更可能在 `withTimeout` 块已经完成时发生，这将导致资源泄漏。

In [100]:
var acquired = 0

class Resource {
    init {
        acquired++
    } // Acquire the resource

    fun close() {
        acquired--
    } // Release the resource
}

runBlocking {
    repeat(10_000) { // Launch 10K coroutines
        launch {
            val resource = withTimeout(60) { // Timeout of 60 ms
                delay(50) // Delay for 50 ms
                Resource() // Acquire a resource and return it from withTimeout block
            }
            resource.close() // Release the resource
        }
    }
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired

475


如果你运行上述代码，你会发现它不总是打印零，尽管这可能取决于你机器的时序。你可能需要调整此示例中的超时以实际看到非零值。

> 请注意，这里从 10K 个协程中递增和递减 `acquired` 计数器是完全线程安全的， 因为它总是发生在 `runBlocking` 使用的同一个线程中。 关于这方面的更多内容将在协程上下文一章中解释。

要解决此问题，你可以将对资源的引用存储在一个变量中，而不是从 `withTimeout` 代码块返回它。

In [101]:
var acquired = 0

class Resource {
    init {
        acquired++
    } // Acquire the resource

    fun close() {
        acquired--
    } // Release the resource
}

runBlocking {
    repeat(10_000) { // Launch 10K coroutines
        launch {
            var resource: Resource? = null // Not acquired yet
            try {
                withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    resource = Resource() // Store a resource to the variable if acquired
                }
                // We can do something else with the resource here
            } finally {
                resource?.close() // Release the resource if it was acquired
            }
        }
    }
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired

0


此示例始终打印零。资源不会泄漏。

# 组合挂起函数

## 默认是顺序执行的

我们使用正常的顺序调用，因为协程中的代码，就像常规代码一样，默认是_顺序执行_的。以下示例通过测量执行这两个挂起函数所需的总时间来演示这一点：

In [54]:
import kotlin.system.measureTimeMillis

suspend fun doSomethingUsefulOne(): Int {
    delay(500L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(500L) // pretend we are doing something useful here, too
    return 29
}

In [55]:
runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        println("The answer is ${one + two}")
    }
    println("Completed in $time ms")
}

The answer is 42
Completed in 1005 ms


## 使用 async 实现并发
如果 `doSomethingUsefulOne` 和 `doSomethingUsefulTwo` 的调用之间没有依赖关系，并且我们希望通过_并发_执行两者来更快地得到结果，该怎么办？这就是 `async` 发挥作用的地方。

从概念上讲，`async` 就像 `launch` 一样。它启动一个独立的协程，这是一个与所有其他协程并发工作的轻量级线程。不同之处在于，`launch` 返回一个 `Job` 并且不携带任何结果值，而 `async` 返回一个 `Deferred` — 一个轻量级的非阻塞未来（`future`），它代表了一个稍后提供结果的承诺。你可以在一个 `deferred` 值上使用 `.await()` 来获取其最终结果，但 `Deferred` 也是一个 `Job`，因此如果需要，你可以取消它。




In [56]:
runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

The answer is 42
Completed in 506 ms


## 惰性启动的 async

可选地，通过将其 `start` 参数设置为 `CoroutineStart.LAZY`，`async` 可以实现惰性启动。在此模式下，它仅在 `await` 需要其结果时，或者当其 `Job` 的 `start` 函数被调用时，才启动协程。运行以下示例：

In [57]:
runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // some computation
        one.start() // start the first one
        two.start() // start the second one
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}

The answer is 42
Completed in 505 ms


## 使用 async 实现结构化并发

如果 `concurrentSum` 函数的代码内部出现问题并抛出异常，所有在其作用域内启动的协程都将被取消。
取消总是通过协程层次结构传播：


In [58]:
runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch(e: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE) // Emulates very long computation
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> {
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException


# 协程上下文与调度器

协程总是在某个上下文中执行，该上下文由 `Kotlin` 标准库中定义的 `CoroutineContext` 类型的值表示。

协程上下文是各种元素的集合。主要元素是协程的 `Job`（我们之前已经见过）及其调度器，本节将介绍它。

## 调度器与线程

协程上下文包含一个 协程调度器（参见 `CoroutineDispatcher`），它决定了相应协程用于执行的线程或线程集。协程调度器可以将协程执行限制在特定线程、将其分派到线程池，或者让其以非受限方式运行。

所有协程构建器（如 `launch` 和 `async`）都接受一个可选的 `CoroutineContext` 参数，该参数可用于显式指定新协程的调度器及其他上下文元素。

尝试以下示例：

In [59]:
runBlocking<Unit> {
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

Unconfined            : I'm working in thread Execution of code 'runBlocking<Unit> {...' @coroutine#108
Default               : I'm working in thread DefaultDispatcher-worker-1 @coroutine#109
newSingleThreadContext: I'm working in thread MyOwnThread @coroutine#110
main runBlocking      : I'm working in thread Execution of code 'runBlocking<Unit> {...' @coroutine#107


当 `launch { ... }` 不带参数使用时，它会从其启动的 `CoroutineScope` 继承上下文（以及调度器）。在这种情况下，它继承了运行在 `main` 线程中的主 `runBlocking` 协程的上下文。

`Dispatchers.Unconfined` 是一种特殊的调度器，它似乎也在 `main` 线程中运行，但实际上，它是一种不同的机制，稍后会进行解释。

当作用域中未显式指定其他调度器时，会使用默认调度器。它由 `Dispatchers.Default` 表示，并使用一个共享的后台线程池。

`newSingleThreadContext` 会为协程创建一个线程来运行。一个专用线程是非常昂贵的资源。在实际应用程序中，当不再需要时，必须使用 `close` 函数释放它，或者将其存储在顶层变量中并在整个应用程序中重复使用。



## 非受限调度器与受限调度器

`Dispatchers.Unconfined` 协程调度器会在调用者线程中启动协程，但仅限于第一个挂起点之前。挂起之后，它会在完全由所调用的挂起函数确定的线程中恢复协程。非受限调度器适用于那些既不消耗 `CPU` 时间，也不更新任何限制在特定线程上的共享数据（如 UI）的协程。

另一方面，调度器默认从外部 `CoroutineScope` 继承。特别是，`runBlocking` 协程的默认调度器受限于调用者线程，因此继承它会使执行受限于该线程，并具有可预测的 `FIFO` 调度。

In [60]:
runBlocking<Unit> {
    launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(500)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
    launch { // context of the parent, main runBlocking coroutine
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(1000)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
}

Unconfined      : I'm working in thread Execution of code 'runBlocking<Unit> {...' @coroutine#112
main runBlocking: I'm working in thread Execution of code 'runBlocking<Unit> {...' @coroutine#113
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor @coroutine#112
main runBlocking: After delay in thread Execution of code 'runBlocking<Unit> {...' @coroutine#113


## 线程间跳转
使用 `-Dkotlinx.coroutines.debug JVM` 选项运行以下代码（参见调试）：

In [61]:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

newSingleThreadContext("Ctx1").use { ctx1 ->
    newSingleThreadContext("Ctx2").use { ctx2 ->
        runBlocking(ctx1) {
            log("Started in ctx1")
            withContext(ctx2) {
                log("Working in ctx2")
            }
            log("Back to ctx1")
        }
    }
}

[Ctx1 @coroutine#114] Started in ctx1
[Ctx2 @coroutine#114] Working in ctx2
[Ctx1 @coroutine#114] Back to ctx1


上面的示例展示了协程使用中的新技巧。

第一种技巧展示了如何使用带指定上下文的 `runBlocking`。第二种技巧涉及调用 `withContext`，它可能会挂起当前协程并切换到新上下文——前提是新上下文与现有上下文不同。具体来说，如果你指定了不同的 `CoroutineDispatcher`，则需要额外的分派：该代码块会在新调度器上调度，一旦完成，执行将返回到原始调度器。

> 上面的示例使用了 `Kotlin` 标准库中的 `use` 函数，以便在不再需要时正确释放由 `newSingleThreadContext` 创建的线程资源。

## 上下文中的 Job

协程的 `Job` 是其上下文的一部分，可以使用 `coroutineContext[Job]` 表达式从中检索：

In [62]:
runBlocking<Unit> {
    println("My job is ${coroutineContext[Job]}")
}

My job is "coroutine#115":BlockingCoroutine{Active}@2f90bf43


> 请注意，`CoroutineScope` 中的 `isActive` 只是 `coroutineContext[Job]?.isActive == true` 的一个便捷快捷方式。

## 协程的子协程

当一个协程在另一个协程的 `CoroutineScope` 中启动时，它会通过 `CoroutineScope.coroutineContext` 继承其上下文，并且新协程的 `Job` 会成为父协程 `Job` 的_子级_。当父协程被取消时，它的所有子协程也会被递归取消。

然而，这种父子关系可以通过以下两种方式之一显式覆盖：

当启动协程时显式指定不同的作用域（例如 `GlobalScope.launch`）时，它不会从父作用域继承 `Job`。
当一个不同的 `Job` 对象作为新协程的上下文传递时（如下例所示），它会覆盖父作用域的 `Job`。
在这两种情况下，启动的协程都不会绑定到其启动的作用域，并独立运行。

In [63]:
runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        // it spawns two other jobs
        launch(Job()) {
            println("job1: I run in my own Job and execute independently!")
            delay(1000)
            println("job1: I am not affected by cancellation of the request")
        }
        // and the other inherits the parent context
        launch {
            delay(100)
            println("job2: I am a child of the request coroutine")
            delay(1000)
            println("job2: I will not execute this line if my parent request is cancelled")
        }
    }
    delay(500)
    request.cancel() // cancel processing of the request
    println("main: Who has survived request cancellation?")
    delay(1000) // delay the main thread for a second to see what happens
}

job1: I run in my own Job and execute independently!
job2: I am a child of the request coroutine
main: Who has survived request cancellation?
job1: I am not affected by cancellation of the request


## 父级职责

父协程总是等待其所有子协程的完成。父级不必显式跟踪它启动的所有子协程，也无需在最后使用 `Job.join` 等待它们：

In [64]:
runBlocking<Unit> {
    // launch a coroutine to process some kind of incoming request
    val request = launch {
        repeat(3) { i -> // launch a few children jobs
            launch  {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
                println("Coroutine $i is done")
            }
        }
        println("request: I'm done and I don't explicitly join my children that are still active")
    }
    request.join() // wait for completion of the request, including all its children
    println("Now processing of the request is complete")
}

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete


## 组合上下文元素

有时我们需要为一个协程上下文定义多个元素。我们可以使用 `+` 运算符来实现。例如，我们可以同时启动一个协程，并显式指定调度器和名称：

In [65]:
runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
        println("I'm working in thread ${Thread.currentThread().name}")
    }
}

I'm working in thread DefaultDispatcher-worker-1 @test#126


## 线程局部数据

In [66]:
val threadLocal = ThreadLocal<String?>() // declare thread-local variable

runBlocking<Unit> {
    threadLocal.set("父协程")
    println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "子协程")) {
        println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
        yield()
        println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    }
    job.join()
    println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}

Pre-main, current thread: Thread[#30,Execution of code 'val threadLocal = Th...' @coroutine#127,5,main], thread local value: '父协程'
Launch start, current thread: Thread[#80,DefaultDispatcher-worker-1 @coroutine#128,5,main], thread local value: '子协程'
After yield, current thread: Thread[#80,DefaultDispatcher-worker-1 @coroutine#128,5,main], thread local value: '子协程'
Post-main, current thread: Thread[#30,Execution of code 'val threadLocal = Th...' @coroutine#127,5,main], thread local value: '父协程'


# 流

## 流构建器

各种集合和序列都可以使用 `.asFlow()` 扩展函数转换为流：

In [67]:
runBlocking<Unit> {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) }
}

1
2
3


## 中间流操作符

流可以使用操作符进行转换，就像你转换集合和序列一样。中间操作符应用于上游流并返回下游流。这些操作符是冷的，就像流一样。调用这样的操作符本身不是一个挂起函数。它工作快速，返回一个新的转换流的定义。

基本操作符有像 `map` 和 `filter` 这样熟悉的名字。这些操作符与序列的一个重要区别是，这些操作符内部的代码块可以调用挂起函数。

例如，传入请求的流可以使用 `map` 操作符映射到其结果，即使执行请求是一个由挂起函数实现的长期运行操作：

In [68]:
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

response 1
response 2
response 3


## Transform 操作符

在流转换操作符中，最通用的一种是 `transform`。它既可以用于模拟 `map` 和 `filter` 等简单转换，也可以实现更复杂的转换。使用 `transform` 操作符，我们可以任意次数地 发出 任意值。

例如，使用 `transform`，我们可以在执行长时间运行的异步请求之前发出一个字符串，然后接着发出一个响应：

In [69]:
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3


## 尺寸限制操作符

take 等尺寸限制的中间操作符在达到相应的限制时会取消流的执行。协程中的取消总是通过抛出异常来执行，这样所有资源管理函数（例如 `try { ... } finally { ... }` 块）在取消时都能正常运行：

In [70]:
fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

1
2
Finally in numbers


## 终止流操作符

流上的终止操作符是 挂起函数，它们启动流的收集。 `collect` 操作符是最基本的一个，但还有其他终止操作符，可以使其更容易：

转换为各种集合，如 `toList` 和 `toSet`。
获取第一个值 (`first`) 并确保流发出单个值 (`single`) 的操作符。
使用 `reduce` 和 `fold` 将流归约为一个值。
例如：

In [71]:
runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)
}

55


## 流是顺序的 ([Flows are sequential])

流的每次独立收集都是顺序执行的，除非使用作用于多个流的特殊操作符。收集直接在调用终止操作符的协程中工作。默认情况下不会启动新的协程。每个发出的值都会被所有中间操作符从上游到下游处理，然后传递给终止操作符。

请看以下示例，它过滤偶数整数并将它们映射为字符串：

In [72]:
runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5


## flowOn 操作符

异常指的是 `flowOn` 函数，该函数应用于改变流发出的上下文。改变流上下文的正确方法如下面示例所示，它还打印相应线程的名称以展示其工作原理：

In [73]:
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}

[DefaultDispatcher-worker-1 @coroutine#136] Emitting 1
[Execution of code 'fun log(msg: String)...' @coroutine#135] Collected 1
[DefaultDispatcher-worker-1 @coroutine#136] Emitting 2
[Execution of code 'fun log(msg: String)...' @coroutine#135] Collected 2
[DefaultDispatcher-worker-1 @coroutine#136] Emitting 3
[Execution of code 'fun log(msg: String)...' @coroutine#135] Collected 3


## 缓冲

在不同的协程中运行流的不同部分有助于缩短收集流的总时间，特别是在涉及长期运行的异步操作时。例如，考虑一种情况，`simple` 流的发出很慢，产生一个元素需要 100 毫秒；并且收集器也很慢，处理一个元素需要 300 毫秒。让我们看看收集这样一个包含三个数字的流需要多长时间：

In [74]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            println(value)
        }
    }
    println("Collected in $time ms")
}

1
2
3
Collected in 1225 ms


我们可以在流上使用 `buffer` 操作符，让 `simple` 流的发出代码与收集代码并发运行，而不是顺序运行它们：

In [75]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}

1
2
3
Collected in 1022 ms


## 合并 ([Conflation])

当流表示操作的部分结果或操作状态更新时，可能不需要处理每个值，而只需要处理最新值。在这种情况下，可以使用 `conflate` 操作符来跳过中间值，当收集器处理它们的速度太慢时。基于之前的示例：

In [76]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                println(value)
            }
    }
    println("Collected in $time ms")
}

1
3
Collected in 714 ms


我们看到，当第一个数字仍在处理时，第二个和第三个数字已经生成，因此第二个数字被 合并 了，只有最新（第三个）的数字被传递给收集器：

## 处理最新值

合并是加速处理的一种方式，当发出者和收集器都很慢时。它通过丢弃发出的值来实现。另一种方法是取消一个慢速收集器，并在每次发出新值时重新启动它。有一系列 `xxxLatest` 操作符，它们执行 `xxx` 操作符相同的基本逻辑，但**_在新值出现时取消其块中的代码_**。让我们尝试将 `conflate` 更改为 `collectLatest` 在之前的示例中：

In [77]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value")
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value")
            }
    }
    println("Collected in $time ms")
}

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 614 ms


## 组合多个流

## Zip

In [78]:
runBlocking<Unit> {
    val nums = (1..3).asFlow() // numbers 1..3
    val strs = flowOf("one", "two", "three") // strings
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
        .collect { println(it) } // collect and print
}

1 -> one
2 -> two
3 -> three


## Combine

当流表示变量或操作的最新值时（另请参阅有关 合并 的相关部分），可能需要执行一个依赖于相应流最新值的计算，并且每当任何上游流发出值时重新计算它。相应的操作符系列称为 `combine`。

例如，如果前面示例中的数字每 `300` 毫秒更新一次，但字符串每 `400` 毫秒更新一次，那么使用 `zip` 操作符将它们打包仍将产生相同的结果，尽管结果每 `400` 毫秒打印一次：

In [79]:
runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

1 -> one at 406 ms from start
2 -> two at 813 ms from start
3 -> three at 1218 ms from start


然而，当这里使用 `combine` 操作符而不是 `zip` 时：

In [80]:
runBlocking<Unit> {
    val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time
    nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

1 -> one at 406 ms from start
2 -> one at 609 ms from start
2 -> two at 811 ms from start
3 -> two at 914 ms from start
3 -> three at 1212 ms from start


## 展平流

流表示异步接收到的值序列，因此很容易出现每个值都触发对另一个值序列请求的情况。例如，我们可以有以下函数，它返回一个包含两个字符串的流，间隔 500 毫秒：

In [81]:
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

现在，如果我们有一个包含三个整数的流，并对它们中的每一个调用 `requestFlow`，像这样：

In [82]:
(1..3).asFlow().map { requestFlow(it) }

Line_84_jupyter$special$$inlined$map$1@4457a1f5

那么我们最终会得到一个流的流（`Flow<Flow<String>>`），需要将其 展平 成一个单独的流以供进一步处理。集合和序列有 `flatten` 和 `flatMap` 操作符来完成此操作。然而，由于流的异步特性，它们需要不同的展平 模式，因此存在一系列流的展平操作符。

## flatMapConcat

流的流的连接由 `flatMapConcat` 和 `flattenConcat` 操作符提供。它们是相应序列操作符最直接的类比。它们等待内部流完成后才开始收集下一个流，如下面示例所示：

In [83]:
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

1: First at 107 ms from start
1: Second at 607 ms from start
2: First at 712 ms from start
2: Second at 1217 ms from start
3: First at 1323 ms from start
3: Second at 1828 ms from start


## flatMapMerge

另一种展平操作是并发收集所有传入的流，并将它们的值合并到一个流中，以便值尽快发出。它由 `flatMapMerge` 和 `flattenMerge` 操作符实现。它们都接受一个可选的 `concurrency` 参数，该参数限制了同时收集的并发流的数量（默认情况下它等于 `DEFAULT_CONCURRENCY`）。

In [84]:
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

1: First at 106 ms from start
2: First at 206 ms from start
3: First at 309 ms from start
1: Second at 607 ms from start
2: Second at 711 ms from start
3: Second at 814 ms from start


> NOTE

> 请注意，flatMapMerge 按顺序调用其代码块（在此示例中为 { `requestFlow(it)` }），但并发收集结果流，这相当于首先执行顺序的 `map { requestFlow(it) }`，然后对结果调用 `flattenMerge`。

## flatMapLatest

与 `collectLatest` 操作符类似，`collectLatest` 操作符在“处理最新值”部分中有所描述，存在相应的“最新”展平模式，其中一旦发出新流，前一个流的收集就会被取消。它由 `flatMapLatest` 操作符实现。

In [85]:
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500) // wait 500 ms
    emit("$i: Second")
}

runBlocking<Unit> {
    val startTime = System.currentTimeMillis() // remember the start time
    (1..3).asFlow().onEach { delay(100) } // a number every 100 ms
        .flatMapLatest { requestFlow(it) }
        .collect { value -> // collect and print
            println("$value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

1: First at 106 ms from start
2: First at 208 ms from start
3: First at 313 ms from start
3: Second at 818 ms from start


## 流异常

## 收集器的 try 和 catch

In [86]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i) // emit next value
    }
}

runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "异常值： $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: 异常值： 2


## 一切都被捕获

前面的示例实际上捕获了发出者或任何中间或终止操作符中发生的任何异常。例如，让我们更改代码，以便发出的值被映射为字符串，但相应的代码产生异常：（此异常仍然被捕获，收集被停止）

In [88]:
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }.map { value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2


## 异常透明性

但是发出者的代码如何封装其异常处理行为呢？

流必须对异常是 透明的，在 `flow { ... }` 构建器内部的 `try/catch` 块中 发出 值是违反异常透明性的。这保证了抛出异常的收集器总能像前面的示例一样使用 `try/catch` 捕获它。

发出者可以使用 `catch` 操作符，该操作符保留了这种异常透明性并允许封装其异常处理。`catch` 操作符的主体可以分析异常并根据捕获到的异常以不同的方式对其作出反应：

异常可以使用 `throw` 重新抛出。
异常可以通过 `catch` 主体中的 `emit` 转换为值的发出。
异常可以被忽略、记录或由其他代码处理。
例如，让我们在捕获到异常时发出文本：

In [89]:
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
        .map { value ->
            check(value <= 1) { "Crashed on $value" }
            "string $value"
        }

runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2


## 透明捕获

`catch` 中间操作符，遵循异常透明性，只捕获上游异常（即来自 `catch` 上方所有操作符的异常，而不是其下方）。如果 `collect { ... }` 中的块（位于 `catch` 下方）抛出异常，那么它会逃逸：

In [90]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

Emitting 1
1
Emitting 2


java.lang.IllegalStateException: Collected 2

## 声明式捕获

我们可以结合 `catch` 操作符的声明性与处理所有异常的愿望，方法是将 `collect` 操作符的主体移到 onEach 中，并将其放在 `catch` 操作符之前。此流的收集必须通过调用不带参数的 `collect()` 来触发：

In [39]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

runBlocking<Unit> {
    simple()
        .onEach { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
        .catch { e -> println("Caught $e") }
        .collect()
}

Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2


## 流完成

当流收集完成（正常或异常）时，可能需要执行一个操作。正如您可能已经注意到的，这可以通过两种方式完成：命令式或声明式。

## 命令式 finally 块

除了 `try/catch`，收集器还可以使用 `finally` 块在 `collect` 完成时执行一个操作。

In [40]:
fun simple(): Flow<Int> = (1..3).asFlow()

runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}

1
2
3
Done


## 声明式处理

对于声明式方法，流有一个 `onCompletion` 中间操作符，当流完全收集完毕时被调用。

前面的示例可以使用 `onCompletion` 操作符重写，并产生相同的输出：

In [41]:
fun simple(): Flow<Int> = (1..3).asFlow()

runBlocking<Unit> {
    simple()
        .onCompletion { println("Done") }
        .collect { value -> println(value) }
}

1
2
3
Done


`onCompletion` 的关键优势是 `lambda` 的可空 `Throwable` 参数，它可以用于确定流收集是正常完成还是异常完成。在以下示例中，`simple` 流在发出数字 `1` 后抛出异常：

In [49]:
fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}

1
Flow completed exceptionally
Caught exception


`onCompletion` 操作符与 `catch` 不同，它不处理异常。正如我们从上面的示例代码中看到的，异常仍然流向下游。它将被传递给进一步的 `onCompletion` 操作符，并且可以用 `catch` 操作符处理。

## 成功完成

与 `catch` 操作符的另一个区别是，`onCompletion` 看到所有异常，并且仅在上游流成功完成时（没有取消或失败）接收 `null` 异常。

In [50]:
fun simple(): Flow<Int> = (1..3).asFlow()

runBlocking<Unit> {
    simple()
        .onCompletion { cause -> println("Flow completed with $cause") }
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

1
Flow completed with java.lang.IllegalStateException: Collected 2


java.lang.IllegalStateException: Collected 2

## 启动流

很容易使用流来表示来自某个源的异步事件。在这种情况下，我们需要一个类似于 `addEventListener` 函数的类比，它注册一段代码以响应传入事件并继续后续工作。`onEach` 操作符可以扮演这个角色。然而，`onEach` 是一个中间操作符。我们还需要一个终止操作符来收集流。否则，仅仅调用 `onEach` 没有效果。

如果我们在 `onEach` 之后使用 `collect` 终止操作符，那么它后面的代码将等到流被收集为止：

In [51]:
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}

Event: 1
Event: 2
Event: 3
Done


`launchIn` 终止操作符在这里派上用场。通过将 `collect` 替换为 `launchIn`，我们可以在单独的协程中启动流的收集，这样后续代码的执行立即继续：

In [52]:
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}

Done
Event: 1
Event: 2
Event: 3


`launchIn` 的必需参数必须指定一个 `CoroutineScope`，用于启动收集流的协程。在上面的示例中，这个作用域来自 `runBlocking` 协程构建器，因此当流运行时，这个 `runBlocking` 作用域会等待其子协程完成，并阻止 `main` 函数返回并终止此示例。

在实际应用中，作用域将来自具有有限生命周期的实体。一旦此实体的生命周期终止，相应的作用域就会被取消，从而取消相应流的收集。这样，`onEach { ... }.launchIn(scope)` 组合就像 `addEventListener` 一样工作。然而，不需要相应的 `removeEventListener` 函数，因为取消和结构化并发可以达到这个目的。

请注意，`launchIn` 也返回一个 `Job`，它可以用于仅 取消 相应的流收集协程，而不取消整个作用域，或者 `join` 它。

## 流取消检查

为了方便，`flow` 构建器对每个发出的值执行额外的 `ensureActive` 取消检查。这意味着从 `flow { ... }` 发出值的繁忙循环是可取消的：

In [53]:
fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        println("Emitting $i")
        emit(i)
    }
}

runBlocking<Unit> {
    foo().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4


kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

然而，出于性能原因，大多数其他流操作符不会自行执行额外的取消检查。例如，如果你使用 `IntRange.asFlow` 扩展来编写相同的繁忙循环并且不挂起任何地方，那么就没有取消检查：

In [54]:
runBlocking<Unit> {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

1
2
3
4
5


kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

## 使繁忙的流可取消

在您有带有协程的繁忙循环的情况下，您必须显式检查取消。您可以添加 `.onEach { currentCoroutineContext().ensureActive() }`，但有一个开箱即用的 `cancellable` 操作符可供使用：

In [55]:
runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        println(value)
    }
}

1
2
3


kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled

## 流 (Flow) 与响应式流 (Reactive Streams)

对于熟悉 响应式流 (`Reactive Streams`) 或 `RxJava` 和 `Project Reactor` 等响应式框架的人来说，流 (`Flow`) 的设计可能看起来非常熟悉。

确实，它的设计受到了响应式流及其各种实现的启发。但 `Flow` 的主要目标是拥有尽可能简单的设计，易于与 `Kotlin` 和挂起兼容，并尊重结构化并发。如果没有响应式领域的先驱者及其巨大的工作，实现这一目标将是不可能的。你可以在 Reactive Streams and Kotlin Flows 文章中阅读完整的故事。

虽然概念上有所不同，但 `Flow` 是 一个响应式流，并且可以将其转换为响应式（符合规范和 TCK）的 Publisher，反之亦然。`kotlinx.coroutines` 开箱即用地提供了此类转换器，可以在相应的响应式模块中找到（`kotlinx-coroutines-reactive` 用于 `Reactive Streams`，`kotlinx-coroutines-reactor` 用于 `Project Reactor`，`kotlinx-coroutines-rx2/kotlinx-coroutines-rx3` 用于 `RxJava2/RxJava3`）。集成模块包括与 `Flow` 之间的转换、与 `Reactor` 的 `Context` 集成以及与各种响应式实体协作的挂起友好方式。

# 通道

Deferred 值提供了一种在协程之间传输单个值的便捷方式。通道 (Channels) 提供了一种传输值流的方式。

## 通道基础

`[通道 (Channel)]` 在概念上与 `BlockingQueue` 非常相似。一个主要区别是，它没有阻塞的 `put` 操作，而是有一个挂起的 `send` 操作；它没有阻塞的 `take` 操作，而是有一个挂起的 `receive` 操作。

In [57]:
import kotlinx.coroutines.channels.Channel

runBlocking {
    val channel = Channel<Int>()
    launch {
        // this might be heavy CPU-consuming computation or async logic,
        // we'll just send five squares
        for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
}

1
4
9
16
25
Done!


## 关闭和迭代通道

与队列不同，通道可以被关闭，以表明不再有元素到来。在接收端，使用常规的 `for` 循环从通道接收元素非常方便。

概念上，`close` 就像是向通道发送一个特殊的关闭令牌。一旦接收到这个关闭令牌，迭代就会停止，因此可以保证在关闭之前所有先前发送的元素都会被接收：

In [58]:
runBlocking {
    val channel = Channel<Int>()
    launch {
        for (x in 1..5) channel.send(x * x)
        channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
}

1
4
9
16
25
Done!


## 构建通道生产者

协程生成元素序列的模式非常常见。这是并发代码中常见的_生产者-消费者_模式的一部分。你可以将这样的生产者抽象为一个以通道作为参数的函数，但这与函数必须返回结果的常理相悖。

有一个名为 `produce` 的便捷协程构建器，它使得在生产者端正确实现变得容易，还有一个扩展函数 `consumeEach`，它在消费者端替代了 `for` 循环：

In [59]:
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

1
4
9
16
25
Done!


## 使用管道生成素数

以下示例打印前十个素数，并在主线程的上下文中运行整个管道。由于所有协程都在主 `runBlocking` 协程的作用域内启动，我们无需保留所有已启动协程的显式列表。我们在打印前十个素数后，使用 `cancelChildren` 扩展函数来取消所有子协程。

In [60]:
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

runBlocking {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

2
3
5
7
11
13
17
19
23
29


请注意，你也可以使用标准库中的 `iterator` 协程构建器来构建相同的管道。将 `produce` 替换为 `iterator`，`send` 替换为 `yield`，`receive` 替换为 `next`，`ReceiveChannel` 替换为 `Iterator`，并摆脱协程作用域。你也不再需要 `runBlocking`。然而，如上所示使用通道的管道的优点是，如果你在 `Dispatchers.Default` 上下文中运行它，它实际上可以使用多个 CPU 核心。

无论如何，这是一种极其不切实际的寻找素数的方法。在实践中，管道确实涉及其他一些挂起调用（例如对远程服务的异步调用），并且这些管道无法使用 `sequence/iterator` 构建，因为它们不允许任意挂起，这与完全异步的 `produce` 不同。

## 缓冲通道

到目前为止所示的通道都没有缓冲区。无缓冲通道在发送者和接收者相遇（即会合 (rendezvous)）时传输元素。如果 `send` 首先被调用，那么它会挂起直到 `receive` 被调用；如果 `receive` 首先被调用，它会挂起直到 `send` 被调用。

`Channel()` 工厂函数和 `produce` 构建器都接受一个可选的 `capacity` 参数来指定_缓冲区大小_。缓冲区允许发送者在挂起之前发送多个元素，类似于具有指定容量的 `BlockingQueue`，后者在缓冲区满时会阻塞。

请看以下代码的行为：

In [63]:
runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(500)
    sender.cancel() // cancel sender coroutine
}

Sending 0
Sending 1
Sending 2
Sending 3
Sending 4


前四个元素被添加到缓冲区，发送者在尝试发送第五个时挂起。

## 通道是公平的
通道的 `send` 和 `receive` 操作对于从多个协程调用它们的顺序是_公平的_。它们以先进先出 (`first-in` `first-out`) 顺序服务，例如，第一个调用 `receive` 的协程会获取元素。在以下示例中，两个协程“ping”和“pong”正在从共享的“table”通道接收“ball”对象。

In [64]:
data class Ball(var hits: Int)

runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)


“ping”协程首先启动，所以它是第一个接收到球的。尽管“ping”协程在将球送回“table”后立即再次开始接收球，但球却被“pong”协程接收了，因为它已经在等待它：

## 计时器通道

计时器通道 (`Ticker channel`) 是一种特殊的会合 (`rendezvous`) 通道，它在自上次从该通道消费以来经过给定延迟后，每次都会生产 `Unit`。虽然它单独使用可能看起来无用，但它是创建复杂基于时间的 `produce` 管道以及进行窗口化和其他时间相关处理的运算符的有用构建块。 计时器通道可以在 `select` 中使用，以执行“时钟滴答”操作。

要创建此类通道，请使用工厂方法 `ticker`。要表明不再需要更多元素，请在其上使用 `ReceiveChannel.cancel` 方法。

现在让我们看看它在实践中是如何工作的：

In [65]:
import kotlinx.coroutines.channels.ticker

runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement =
        withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay
    println("Next element is not ready in 100 ms: $nextElement")

    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 200 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 300ms")
    delay(300)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(120) { tickerChannel.receive() }
    println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

Initial element is available immediately: kotlin.Unit
Next element is not ready in 100 ms: null
Next element is ready in 200 ms: kotlin.Unit
Consumer pauses for 300ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit


请注意，`ticker` 知道可能的消费者暂停，并且默认情况下，如果发生暂停，会调整下一个生产元素的延迟，试图保持生产元素的固定速率。

可选地，可以指定一个等于 `TickerMode.FIXED_DELAY` 的 `mode` 参数，以保持元素之间的固定延迟。

# 协程异常处理

本节涵盖异常处理以及异常时的取消。 我们已经知道，已取消的协程会在挂起点抛出 [`CancellationException`]，并且会被协程机制忽略。这里我们看看如果在取消期间抛出异常，或者同一协程的多个子协程抛出异常时会发生什么。

异常传播
协程构建器有两种类型：自动传播异常 ([`launch`]) 或将它们暴露给用户 ([`async`] 和 [`produce`])。 当这些构建器用于创建不是其他协程的_子_协程的_根_协程时，前者会将异常视为未捕获异常，类似于 Java 的 `Thread.uncaughtExceptionHandler`，而后者则依赖用户消费最终的异常，例如通过 [`await`][Deferred.await] 或 [`receive`][ReceiveChannel.receive]（[`produce`] 和 [`receive`][ReceiveChannel.receive] 在 Channels 部分中介绍）。

这可以通过一个使用 [`GlobalScope`] 创建根协程的简单示例来演示：

> NOTE
> [`GlobalScope`] 是一个慎用的 API，可能会以复杂的方式产生意想不到的负面结果。为整个应用程序创建根协程是 `GlobalScope` 少数几个合理的使用场景之一，因此你必须通过 `@OptIn(DelicateCoroutinesApi::class)` 明确选择使用 `GlobalScope`。

In [66]:
@OptIn(DelicateCoroutinesApi::class)
runBlocking {
    val job = GlobalScope.launch { // root coroutine with launch
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async { // root coroutine with async
        println("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}

Throwing exception from launch
Joined failed job
Throwing exception from async
Caught ArithmeticException


## CoroutineExceptionHandler

可以自定义将未捕获异常打印到控制台的默认行为。_根_协程上的 [`CoroutineExceptionHandler`] 上下文元素可用作此根协程及其所有子协程的通用 `catch` 块，用于处理自定义异常。 它类似于 `Thread.uncaughtExceptionHandler`。 你无法在 `CoroutineExceptionHandler` 中从异常中恢复。当处理程序被调用时，协程已经因相应的异常而完成。通常，该处理程序用于记录异常、显示某种错误消息、终止和/或重启应用程序。

`CoroutineExceptionHandler` 仅对未捕获异常（即未以任何其他方式处理的异常）调用。 特别地，所有_子_协程（在另一个 [`Job`] 的上下文中创建的协程）都会将其异常处理委托给其父协程，后者又委托给其父协程，以此类推直到根协程，因此安装在它们上下文中的 `CoroutineExceptionHandler` 永远不会被使用。 此外，[`async`] 构建器总是捕获所有异常并在生成的 [`Deferred`] 对象中表示它们，因此其 `CoroutineExceptionHandler` 也无效。

In [67]:
@OptIn(DelicateCoroutinesApi::class)
runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) { // root coroutine, running in GlobalScope
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) { // also root, but async instead of launch
        throw ArithmeticException() // Nothing will be printed, relying on user to call deferred.await()
    }
    joinAll(job, deferred)
}

CoroutineExceptionHandler got java.lang.AssertionError


## 取消与异常

取消与异常密切相关。协程内部使用 `CancellationException` 进行取消，这些异常被所有处理程序忽略，因此它们应该只用作额外调试信息的来源，可以通过 `catch` 块获取。 当协程使用 [`Job.cancel`] 取消时，它会终止，但不会取消其父级。

In [68]:
runBlocking {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("Child is cancelled")
            }
        }
        yield()
        println("Cancelling child")
        child.cancel()
        child.join()
        yield()
        println("Parent is not cancelled")
    }
    job.join()
}

Cancelling child
Child is cancelled
Parent is not cancelled


如果协程遇到 `CancellationException` 以外的异常，它会用该异常取消其父级。此行为无法被覆盖，并用于为结构化并发提供稳定的协程层次结构。 [`CoroutineExceptionHandler`] 实现不用于子协程。

> NOTE
> 在这些示例中，[`CoroutineExceptionHandler`] 总是安装到在 [`GlobalScope`] 中创建的协程上。将异常处理程序安装到在主 [`runBlocking`] 范围内启动的协程是没有意义的，因为即使安装了处理程序，当其子协程因异常而完成时，主协程也总是会被取消。

只有当所有子协程终止时，父协程才会处理原始异常，这由以下示例演示。

In [69]:
@OptIn(DelicateCoroutinesApi::class)
runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
}

Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
CoroutineExceptionHandler got java.lang.ArithmeticException


## 异常聚合

当一个协程的多个子协程因异常而失败时，一般规则是“第一个异常优先”，因此第一个异常得到处理。第一个异常之后发生的所有额外异常都作为被抑制的异常附加到第一个异常上。

In [72]:
import java.io.IOException

@OptIn(DelicateCoroutinesApi::class)
runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // it gets cancelled when another sibling fails with IOException
            } finally {
                throw ArithmeticException() // the second exception
            }
        }
        launch {
            delay(100)
            throw IOException() // the first exception
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}

CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]


取消异常是透明的，默认会被解包：

In [78]:
@OptIn(DelicateCoroutinesApi::class)
runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        val innerJob = launch { // all this stack of coroutines will get cancelled
            launch {
                launch {
                    throw IOException() // the original exception
                }
                launch {
                    delay(100)
                    throw ArithmeticException()
                }
            }
        }
        try {
            innerJob.join()
        } catch (e: CancellationException) {
            println("Rethrowing CancellationException with original cause")
            throw e // cancellation exception is rethrown, yet the original IOException gets to the handler
        }
    }
    job.join()
}

Rethrowing CancellationException with original cause
CoroutineExceptionHandler got java.lang.ArithmeticException


## Supervisor

如前所述，取消是一种双向关系，在整个协程层次结构中传播。让我们来看看需要单向取消的情况。

这种要求的一个很好的例子是其作用域中定义了作业的 UI 组件。如果 UI 的任何子任务失败，并不总是需要取消（实际上是终止）整个 UI 组件，但如果 UI 组件被销毁（且其作业被取消），则需要取消所有子作业，因为它们的结果不再需要。

另一个例子是服务器进程，它产生多个子作业并需要_监管_它们的执行，跟踪它们的失败并只重启失败的作业。

## 监管作业

[`SupervisorJob`][SupervisorJob()] 可用于这些目的。它类似于普通的 [`Job`][Job()]，唯一的例外是取消仅向下传播。这可以通过以下示例轻松演示：

In [82]:
runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                println("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        println("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}

The first child is failing
The first child is cancelled: true, but the second one is still active
Cancelling the supervisor
The second child is cancelled because the supervisor was cancelled


## Supervisor Scope

除了 [`coroutineScope`]] 之外，我们还可以使用 [`supervisorScope`] 实现_作用域_并发。它只在一个方向上传播取消，并且只有当自身失败时才取消所有子协程。它也像 [`coroutineScope`] 一样，在完成之前等待所有子协程。

In [86]:
runBlocking {
    try {
        supervisorScope {
            val child = launch {
                try {
                    println("The child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    println("The child is cancelled")
                }
            }
            // Give our child a chance to execute and print using yield
            yield()
            println("Throwing an exception from the scope")
            throw AssertionError()
        }
    } catch (e: AssertionError) {
        println("Caught an assertion error")
    }
}

The child is sleeping
Throwing an exception from the scope
The child is cancelled
Caught an assertion error


## 监管协程中的异常

常规作业和监管作业之间的另一个关键区别是异常处理。 每个子协程都应该通过异常处理机制自己处理其异常。 这种差异源于子协程的失败不会传播给父协程。 这意味着直接在 [`supervisorScope`] 内启动的协程_确实_使用安装在其作用域中的 [`CoroutineExceptionHandler`]，与根协程的方式相同（详见 `CoroutineExceptionHandler` 部分）。

In [88]:
runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    supervisorScope {
        val child = launch(handler) {
            println("The child throws an exception")
            throw AssertionError()
        }
        println("The scope is completing")
    }
    println("The scope is completed")
}

The scope is completing
The child throws an exception
CoroutineExceptionHandler got java.lang.AssertionError
The scope is completed


# 共享可变状态与并发

协程可以使用像 `Dispatchers.Default` 这样的多线程调度器并行执行。这会带来所有常见的并行问题，主要问题是对共享可变状态的访问同步。协程领域中解决此问题的一些方案与多线程世界中的方案类似，但也有一些是独有的。

## 问题

让我们启动一百个协程，每个协程都执行相同的操作一千次。我们还将测量它们的完成时间以供后续比较：

In [89]:
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // 要启动的协程数量
    val k = 1000 // 每个协程重复操作的次数
    val time = measureTimeMillis {
        coroutineScope { // 协程作用域
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

我们首先用多线程 `Dispatchers.Default` 来递增一个共享可变变量，这是一个非常简单的操作。

In [93]:
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")
}

var counter = 0

runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

Completed 100000 actions in 10 ms
Counter = 33248


## volatile 无济于事

有一个常见的误解是，将变量声明为 `volatile` 可以解决并发问题。让我们试一试：

In [94]:
@Volatile // 在 Kotlin 中，`volatile` 是一个注解
var counter = 0

runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

Completed 100000 actions in 14 ms
Counter = 27538


这段代码运行得更慢，但我们仍然不能总是在最后得到 "Counter = 100000"，因为 `volatile` 变量保证了对相应变量的线性化（这是“原子性”的技术术语）读写，但不提供更大操作（本例中的递增操作）的原子性。

## 线程安全的数据结构

适用于线程和协程的普遍解决方案是使用线程安全（也称为同步、线性化或原子）的数据结构，它为在共享状态上执行的相应操作提供了所有必要的同步。对于一个简单的计数器，我们可以使用 `AtomicInteger` 类，它具有原子性的 `incrementAndGet` 操作：

In [96]:
import java.util.concurrent.atomic.AtomicInteger

val counter = AtomicInteger()

runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}

Completed 100000 actions in 11 ms
Counter = 100000


这是针对这个特定问题最快的解决方案。它适用于普通计数器、集合、队列以及其他标准数据结构及其基本操作。然而，它不容易扩展到复杂的状态或没有现成线程安全实现的复杂操作。

## 线程封闭：细粒度

**线程封闭** 是一种解决共享可变状态问题的途径，即对特定共享状态的所有访问都局限于单个线程。它通常用于 UI 应用程序，其中所有 UI 状态都局限于单个事件分发/应用程序线程。使用单线程上下文，可以很容易地将其应用于协程。

In [98]:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 将每次递增限制在单线程上下文中
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

Completed 100000 actions in 343 ms
Counter = 100000


这段代码运行非常慢，因为它采用了_细粒度_的线程封闭。每次单独的递增都会使用 `withContext(counterContext)` 块从多线程的 `Dispatchers.Default` 上下文切换到单线程上下文。

## 线程封闭：粗粒度
实践中，线程封闭是以大块进行的，例如，大段的状态更新业务逻辑被限制在单个线程中。以下示例就是这样做的，从一开始就在单线程上下文中运行每个协程。

In [99]:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

runBlocking {
    // 将所有操作限制在单线程上下文中
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

Completed 100000 actions in 2 ms
Counter = 100000


## 互斥

互斥解决方案是通过_临界区_来保护对共享状态的所有修改，确保这些修改永不并发执行。在阻塞世界中，你通常会为此使用 `synchronized` 或 `ReentrantLock`。协程的替代方案称为 `Mutex`。它具有 `lock` 和 `unlock` 函数来划定临界区。关键区别在于 `Mutex.lock()` 是一个挂起函数。它不会阻塞线程。

还有一个 `withLock` 扩展函数，它方便地表示了 `mutex.lock(); try { ... } finally { mutex.unlock() }` 模式：

In [100]:
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val mutex = Mutex()
var counter = 0

runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 用锁保护每次递增
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

Completed 100000 actions in 153 ms
Counter = 100000


这个例子中的加锁是细粒度的，因此会付出性能代价。然而，对于某些情况，这是一个不错的选择，在这些情况下，你必须定期修改某些共享状态，但没有一个自然线程来限定这个状态。

# Select 表达式（实验性）

`Select` 表达式使得同时等待多个挂起函数并选择第一个可用的函数成为可能。

## 从通道中选择

假设我们有两个字符串生产者：`fizz` 和 `buzz`。

`fizz` 每 500 毫秒生成一次 "Fizz" 字符串：

`buzz` 每 1000 毫秒生成一次 "Buzz!" 字符串：

In [101]:
fun CoroutineScope.fizz() = produce<String> {
    while (true) { // sends "Fizz" every 500 ms
        delay(500)
        send("Fizz")
    }
}

fun CoroutineScope.buzz() = produce<String> {
    while (true) { // sends "Buzz!" every 1000 ms
        delay(1000)
        send("Buzz!")
    }
}

使用 [`receive`][ReceiveChannel.receive] 挂起函数，我们可以要么从一个通道接收，要么从另一个通道接收。但是 [`select`] 表达式允许我们使用其 [`onReceive`][ReceiveChannel.onReceive] 子句同时从两者接收：

In [102]:
import kotlinx.coroutines.selects.select

suspend fun selectFizzBuzz(fizz: ReceiveChannel<String>, buzz: ReceiveChannel<String>) {
    select<Unit> { // <Unit> means that this select expression does not produce any result
        fizz.onReceive { value ->  // this is the first select clause
            println("fizz -> '$value'")
        }
        buzz.onReceive { value ->  // this is the second select clause
            println("buzz -> '$value'")
        }
    }
}

让我们运行它七次：

In [103]:
runBlocking<Unit> {
    val fizz = fizz()
    val buzz = buzz()
    repeat(7) {
        selectFizzBuzz(fizz, buzz)
    }
    coroutineContext.cancelChildren() // 取消 fizz 和 buzz 协程
}

fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'
buzz -> 'Buzz!'
fizz -> 'Fizz'
fizz -> 'Fizz'


## 在通道关闭时选择

当通道关闭时，`select` 中的 [`onReceive`][ReceiveChannel.onReceive] 子句会失败，导致相应的 `select` 抛出异常。我们可以使用 [`onReceiveCatching`][ReceiveChannel.onReceiveCatching] 子句在通道关闭时执行特定操作。以下示例还展示了 `select` 是一个表达式，它返回其所选子句的结果：

In [104]:
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String =
    select<String> {
        a.onReceiveCatching { it ->
            val value = it.getOrNull()
            if (value != null) {
                "a -> '$value'"
            } else {
                "Channel 'a' is closed"
            }
        }
        b.onReceiveCatching { it ->
            val value = it.getOrNull()
            if (value != null) {
                "b -> '$value'"
            } else {
                "Channel 'b' is closed"
            }
        }
    }

让我们用通道 a（生成四次 "Hello" 字符串）和通道 b（生成四次 "World"）来使用它：

In [108]:
runBlocking<Unit> {
    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }
    val b = produce<String> {
        repeat(4) { send("World $it") }
    }
    repeat(8) { // 打印前八个结果
        println(selectAorB(a, b))
    }
    coroutineContext.cancelChildren()
}

a -> 'Hello 0'
a -> 'Hello 1'
b -> 'World 0'
a -> 'Hello 2'
a -> 'Hello 3'
b -> 'World 1'
Channel 'a' is closed
Channel 'a' is closed


可以从中得出几点观察结果。

首先，`select` 对第一个子句有偏向性。当多个子句同时可选择时，它们中第一个被选中。在这里，两个通道都在持续生成字符串，因此 a 通道作为 `select` 中的第一个子句，会胜出。然而，因为我们使用的是无缓冲通道，`a` 会在其 [`send`][SendChannel.send] 调用时时不时地挂起，这也给了 `b` 发送的机会。

第二个观察结果是，当通道已关闭时，[`onReceiveCatching`][ReceiveChannel.onReceiveCatching] 会立即被选中。

## 选择发送

`Select` 表达式具有 [`onSend`][SendChannel.onSend] 子句，结合选择的偏向性，可以大有裨益。

让我们编写一个整数生产者的示例，当其主通道上的消费者无法跟上时，它会将值发送到 `side` 通道：

In [109]:
import kotlinx.coroutines.channels.SendChannel

fun CoroutineScope.produceNumbers(side: SendChannel<Int>) = produce<Int> {
    for (num in 1..10) { // produce 10 numbers from 1 to 10
        delay(100) // every 100 ms
        select<Unit> {
            onSend(num) {} // Send to the primary channel
            side.onSend(num) {} // or to the side channel
        }
    }
}

消费者会相当慢，处理每个数字需要 250 毫秒：

In [110]:
runBlocking<Unit> {
    val side = Channel<Int>() // 分配侧通道
    launch { // 这是侧通道的一个非常快的消费者
        side.consumeEach { println("Side channel has $it") }
    }
    produceNumbers(side).consumeEach {
        println("Consuming $it")
        delay(250) // 让我们妥善处理消费的数字，不要着急
    }
    println("Done consuming") // 完成消费
    coroutineContext.cancelChildren()
}

Consuming 1
Side channel has 2
Side channel has 3
Consuming 4
Side channel has 5
Side channel has 6
Consuming 7
Side channel has 8
Side channel has 9
Consuming 10
Done consuming


## 选择延迟值

延迟值可以使用 [`onAwait`][Deferred.onAwait] 子句进行选择。让我们从一个异步函数开始，它在随机延迟后返回一个延迟的字符串值：

In [111]:
fun CoroutineScope.asyncString(time: Int) = async {
    delay(time.toLong())
    "Waited for $time ms"
}

让我们启动十二个带有随机延迟的函数。

In [112]:
import kotlin.random.Random

fun CoroutineScope.asyncStringsList(): List<Deferred<String>> {
    val random = Random(3)
    return List(12) { asyncString(random.nextInt(1000)) }
}

现在主函数会等待其中第一个完成，并计算仍处于活动状态的延迟值数量。请注意，我们在这里利用了 `select` 表达式是一个 `Kotlin DSL` 的事实，因此我们可以使用任意代码为其提供子句。在这种情况下，我们遍历延迟值列表，为每个延迟值提供 `onAwait` 子句。

In [113]:
runBlocking<Unit> {
    val list = asyncStringsList()
    val result = select<String> {
        list.withIndex().forEach { (index, deferred) ->
            deferred.onAwait { answer ->
                "Deferred $index produced answer '$answer'" // 延迟值 $index 产生了结果 '$answer'
            }
        }
    }
    println(result)
    val countActive = list.count { it.isActive }
    println("$countActive coroutines are still active") // $countActive 协程仍处于活动状态
}

Deferred 6 produced answer 'Waited for 43 ms'
11 coroutines are still active


## 在延迟值通道上切换

让我们编写一个通道生产者函数，它消费一个延迟字符串值通道，等待每个接收到的延迟值，但只等到下一个延迟值出现或通道关闭。这个示例将 [`onReceiveCatching`][ReceiveChannel.onReceiveCatching] 和 [`onAwait`][Deferred.onAwait] 子句放在同一个 `select` 中：

In [114]:
fun CoroutineScope.switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> {
    var current = input.receive() // start with first received deferred value
    while (isActive) { // loop while not cancelled/closed
        val next = select<Deferred<String>?> { // return next deferred value from this select or null
            input.onReceiveCatching { update ->
                update.getOrNull()
            }
            current.onAwait { value ->
                send(value) // send value that current deferred has produced
                input.receiveCatching().getOrNull() // and use the next deferred from the input channel
            }
        }
        if (next == null) {
            println("Channel was closed") // 通道已关闭
            break // out of loop
        } else {
            current = next
        }
    }
}

为了测试它，我们将使用一个简单的异步函数，它在指定时间后解析为指定的字符串：

In [115]:
fun CoroutineScope.asyncString(str: String, time: Long) = async {
    delay(time)
    str
}

主函数只是启动一个协程来打印 `switchMapDeferreds` 的结果，并向其发送一些测试数据：

In [117]:
runBlocking<Unit> {
    val chan = Channel<Deferred<String>>() // 用于测试的通道
    launch { // 启动打印协程
        for (s in switchMapDeferreds(chan))
            println(s) // 打印每个接收到的字符串
    }
    chan.send(asyncString("BEGIN", 100))
    delay(200) // 足够时间让 "BEGIN" 产生
    chan.send(asyncString("Slow", 500))
    delay(100) // 没有足够时间产生 slow
    chan.send(asyncString("Replace", 100))
    delay(500) // 在最后一个之前给它时间
    chan.send(asyncString("END", 500))
    delay(1000) // 给它时间处理
    chan.close() // 关闭通道...
    delay(500) // 并等待一段时间让它完成
}

BEGIN
Replace
END
Channel was closed
