Skip to content

Commit 20660c4

Browse files
committed
Finished 'src/async/all_together.md'.
1 parent 82fefd5 commit 20660c4

File tree

3 files changed

+132
-52
lines changed

3 files changed

+132
-52
lines changed

projects/hello-async/src/main.rs

Lines changed: 10 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,18 @@
1-
use std::{pin::pin, time::Duration};
2-
use trpl::{ReceiverStream, Stream, StreamExt};
1+
use std::{thread, time::Duration};
32

4-
fn get_messages() -> impl Stream<Item = String> {
5-
let (tx, rx) = trpl::channel();
6-
7-
trpl::spawn_task(async move {
8-
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
9-
10-
for (index, message) in messages.into_iter().enumerate() {
11-
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
12-
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
13-
14-
if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
15-
eprintln!("Cannot send message '{message}': {send_error}");
16-
break;
17-
}
18-
}
19-
});
20-
21-
ReceiverStream::new(rx)
22-
}
23-
24-
fn get_intervals() -> impl Stream<Item = u32> {
25-
let (tx, rx) = trpl::channel();
26-
27-
trpl::spawn_task(async move {
28-
let mut count = 0;
29-
loop {
30-
trpl::sleep(Duration::from_millis(1)).await;
31-
count += 1;
3+
fn main() {
4+
let (tx, mut rx) = trpl::channel();
325

33-
if let Err(send_error) = tx.send(count) {
34-
eprintln!("Could not send interval {count}: {send_error}");
35-
break;
36-
};
6+
thread::spawn(move || {
7+
for i in 1..11 {
8+
tx.send(i).unwrap();
9+
thread::sleep(Duration::from_secs(1));
3710
}
3811
});
3912

40-
ReceiverStream::new(rx)
41-
}
42-
43-
fn main() {
4413
trpl::run(async {
45-
let messages = get_messages().timeout(Duration::from_millis(200));
46-
let intervals = get_intervals()
47-
.map(|count| format!("Interval: {count}"))
48-
.throttle(Duration::from_millis(100))
49-
.timeout(Duration::from_secs(10));
50-
let merged = messages.merge(intervals).take(40);
51-
let mut stream = pin!(merged);
52-
53-
while let Some(result) = stream.next().await {
54-
match result {
55-
Ok(message) => println!("{message}"),
56-
Err(reason) => eprintln!("Problem: {reason:?}"),
57-
}
14+
while let Some(message) = rx.recv().await {
15+
println!("{message}");
5816
}
59-
})
17+
});
6018
}

src/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@
122122
- [使用任意数量的未来值](async/multiple_futures.md)
123123
- [流:序列中的未来值](async/streams.md)
124124
- [近观异步相关的特质](async/async_traits.md)
125+
- [放在一起:未来值、任务与线程](async/all_together.md)
125126

126127

127128
- [Rust 的面向对象编程特性](Ch17_Object_Oriented_Programming_Features_of_Rust.md)

src/async/all_together.md

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
# 放在一起:未来值、任务与线程
2+
3+
正如我们在 [第 16 章](../Ch16_Fearless_Concurrency.md) 中所看到的,线程提供了一种并发的方法。我们在本章中看到了另一种方法:使用未来值与流的异步。如果咱们想要知道,何时该选择另一种方法,答案是:视情况而定!在很多情况下,我们需要选择的不是线程 ** 异步,而是线程 ** 异步。
4+
5+
6+
数十年来,许多操作系统都提供了基于线程的并发模型,而许多编程语言也因此支持这些模型。不过,这些模型也并非没有代价。在许多操作系统上,每个线程都会占用相当多的内存,而且启动和关闭线程都会产生一些开销。也只有在操作系统和硬件支持的情况下,线程才可用。与主流台式机和便携电脑不同,一些嵌入式系统根本没有操作系统,因此他们也没有线程。
7+
8+
9+
异步模型提供了一套不同的权衡机制,而成为一种终极补充。在异步模型中,并发操作不需要其各自的线程。相反,他们可运行于任务之上,就像我们在流小节中,使用 `trpl::spawn_task` 启动某个同步函数的工作一样。任务类似于线程,但他不是由操作系统管理,而是由库级别的代码(即运行时)管理。
10+
11+
12+
在上一小节中,我们看到了可通过使用一个异步通道,及生成一个我们可从同步代码中调用的异步任务,而构建出一个流。我们也可使用线程,完成这完全一样的事情。在下面清单 17-40 中,我们使用标准库中的 `trpl::spawn_task``trpl::sleep` 两个 APIs,替换了 `get_intervals` 中的异步通道与异步任务。
13+
14+
15+
文件名:`src/main.rs`
16+
17+
```rust
18+
fn get_intervals() -> impl Stream<Item = u32> {
19+
let (tx, rx) = trpl::channel();
20+
21+
// This is *not* `trpl::spawn` but `std::thread::spawn`!
22+
thread::spawn(move || {
23+
let mut count = 0;
24+
loop {
25+
// Likewise, this is *not* `trpl::sleep` but `std::thread::sleep`!
26+
thread::sleep(Duration::from_millis(1));
27+
count += 1;
28+
29+
if let Err(send_error) = tx.send(count) {
30+
eprintln!("Could not send interval {count}: {send_error}");
31+
break;
32+
};
33+
}
34+
});
35+
36+
ReceiverStream::new(rx)
37+
}
38+
```
39+
40+
*清单 17-41:在 `get_intervals` 中使用 `std::thread` 而非异步的 `trpl` APIs*
41+
42+
43+
若咱们运行这段代码,输出结果会与清单 17-40 相同。请注意,从调用代码的角度来看,这里的变化微乎其微。更重要的是,尽管我们的一个函数在运行时上生成了个异步任务,而另一函数生成了个操作系统的线程,但得到的两个流,并没有受到这些差异的影响。
44+
45+
46+
尽管这两种方法有相似之处,但他们的行为却大相径庭,尽管我们可能很难在这个非常简单的例子中测量出来。我们可以在任何现代个人电脑上,生成数百万个异步任务。但若我们试图用线程来做这件事,内存真的会用完!
47+
48+
49+
然而,这些 API 如此相似是有原因的。线程充当了一些同步操作集的边界;线程 *之间* 可以并发。任务则充当了一些异步操作集的边界;任务 *之间* 和任务 *内部* 都可以并发,因为任务可以在其主体中的未来值之间切换。最后,未来值是 Rust 最细粒度的并发单元,每个未来值都可以代表一棵由其他未来值组成的树。运行时 -- 具体来说是运行时的执行器 -- 管理着任务,而任务管理着未来值。在这方面,任务类似于由运行时管理着的轻量级线程,同时由于是由运行时而不是操作系统管理,因此任务还是具有更多功能的轻量级线程。
50+
51+
52+
这并不意味着异步任务总是要比线程更好(反之亦然)。在某些方面,相比于使用 `async` 的并发,使用线程的并发是一种更简单的编程模型。这可以是优点,也可以是缺点。线程在某种程度上是 “触发并遗忘” 的;他们没有与未来值相对应的原生对等体,因此除非被操作系统本身打断,他们运行即可完成。也就是说,线程并不像未来值那样,支持 *任务内的并发*。Rust 中的线程也没有取消机制 -- 我们在本章中没有明确涉及这一主题,但每当我们结束某个未来值时,其状态就会被正确清理,这一事实暗示了任务的取消机制。
53+
54+
55+
这些限制也使得线程比期货更难于组装。例如,使用线程构建 `timeout``throttle` 方法等辅助工具,就比我们在本章前面所构建的要困难得多。正如我们所看到的,未来值是一种更丰富的数据结构,这意味着他们可以更自然地组合在一起。
56+
57+
58+
因此,任务给到我们对未来值的 *额外* 控制,允许我们选择在何处以及如何对他们分组。事实证明,线程和任务往往能配合得很好,因为任务可以(至少在某些运行时下)在线程间迁移。事实上,我们一直在使用的运行时,包括 `spawn_blocking``spawn_task` 两个函数,默认情况下都是多线程的!许多运行时都使用了一种名为 *工作偷取,work stealing* 的方法,根据线程当前的使用情况,在线程间透明地迁移任务,以提高系统的整体性能。这种方法实际上需要线程 ** 任务,因此也需要未来值。
59+
60+
61+
在考虑何时使用哪种方法时,请考虑以下经验法则:
62+
63+
64+
- 如果工作的 *并行性很强*,比如处理每个部分都可以单独处理的大量数据时,线程是更好的选择;
65+
- 如果工作的 *并发性很高*,例如处理来自不同来源,可能以不同时间间隔或不同速度发送的消息时,那么异步是更好的选择。
66+
67+
68+
如果咱们同时需要并行性和并发性,咱们就不必在线程和异步之间做出选择。咱们可自由地将他们结合在一起使用,让他们各自发挥其最擅长的部分。例如,下面清单 17-42 展示了,实际 Rust 代码中这种混合使用的一个相当常见的示例。
69+
70+
71+
文件名:`src/main.rs`
72+
73+
74+
75+
```rust
76+
use std::{thread, time::Duration};
77+
78+
fn main() {
79+
let (tx, mut rx) = trpl::channel();
80+
81+
thread::spawn(move || {
82+
for i in 1..11 {
83+
tx.send(i).unwrap();
84+
thread::sleep(Duration::from_secs(1));
85+
}
86+
});
87+
88+
trpl::run(async {
89+
while let Some(message) = rx.recv().await {
90+
println!("{message}");
91+
}
92+
});
93+
}
94+
```
95+
96+
97+
*清单 17-42:在一个线程中以阻塞代码发送消息,并在一个异步代码块中等待消息*
98+
99+
100+
我们以创建一个异步通道开始,然后生成一个取得通道发送侧所有权的线程。在该线程中,我们发送数字 1 到 10,每个数字之间休眠一秒钟。最后,就像本章所做的那样,我们运行了一个以传递给 `trpl::run` 的异步代码块创建出的未来值。在这个未来值中,我们等待这些消息,就像在我们曾看到的其他消息传递示例中一样。
101+
102+
103+
回到本章开头的场景,设想使用一个专门线程运行一组视频编码任务(因为视频编码是计算密集的),而以一个异步通道,通知用户界面这些操作已完成。在真实世界用例中,这类组合的例子数不胜数。
104+
105+
106+
107+
## 本章小结
108+
109+
110+
这并不是咱们在本书中最后一次看到并发。[第 21 章](../Ch20_Final_Project_Building_a_Multithreaded_Web_Server.md) 中的项目,将在比这里讨论的简单示例更现实的情况下,应用这些概念,并更直接地比较使用线程和任务解决问题的方法。
111+
112+
113+
无论咱们选择这些方法的哪种,Rust 都能为咱们提供编写安全、快速、并发代码所需的工具,无论是用于高吞吐量 web 服务器,还是某种嵌入式操作系统。
114+
115+
116+
接下来,我们将讨论在咱们的 Rust 程序变大时,问题建模和构建解决方案的一些惯用方法。此外,我们还将讨论 Rust 的惯用语,与咱们在面向对象编程中熟悉的惯用语之间的关系。
117+
118+
119+
(End)
120+
121+

0 commit comments

Comments
 (0)