Skip to content

Latest commit

 

History

History
1395 lines (999 loc) · 83.6 KB

File metadata and controls

1395 lines (999 loc) · 83.6 KB

十、并行执行

"What do we want? Now! When do we want it? Fewer race conditions!" – Anna Melzer

在本章中,我将从我将要介绍的概念和我将向您展示的代码片段的复杂性两方面对游戏进行一点介绍。如果你觉得不能胜任这项任务,或者当你通读时意识到它变得太难了,可以跳过它。当你准备好的时候,你可以随时回来。

我们的计划是绕开熟悉的单线程执行范式,深入研究所谓的并发执行。我只能触及这个复杂主题的表面,所以我不希望你在读完后成为并发性大师,但我会一如既往地尝试给你足够的信息,这样你就可以继续走这条路

我们将学习应用于编程领域的所有重要概念,我将尝试向您展示以不同风格编写的示例,让您对这些主题的基础有一个坚实的理解。要深入研究这个富有挑战性且有趣的编程分支,您必须参考 Python 文档(中的并发执行部分 https://docs.python.org/3.7/library/concurrency.html ),或许可以通过学习相关书籍来补充您的知识。

我们将特别探讨以下方面:

  • 线程和进程背后的理论
  • 编写多线程代码
  • 编写多处理代码
  • 使用执行器生成线程和进程
  • asyncio编程的一个简单示例

让我们先把理论弄清楚。

并发与并行

并发和并行常常被误认为是同一件事,但它们之间有区别。并发是同时运行多个事物的能力,不一定是并行运行。并行性是指同时做许多事情的能力。

想象你带着你的另一半去剧院。有两条线:即贵宾票和普通票。只有一名工作人员在检查车票,因此,为了避免堵塞两个队列中的任何一个,他们先从 VIP 线路检查一张车票,然后从普通线路检查一张车票。随着时间的推移,两个队列都会被处理。这是并发的一个例子。

现在假设另一个工作人员加入,那么现在每个队列有一个工作人员。这样,两个队列都将由各自的工作人员处理。这是并行性的一个例子。

现代笔记本电脑处理器具有多个内核(通常为两到四个)。是属于处理器的独立处理单元。拥有多个内核意味着所讨论的 CPU 具有实际并行执行任务的物理能力。在每个核心中,通常有一个不断交替的工作流,即并发执行。

请记住,我在这里故意保持一般性的讨论。根据您使用的系统,执行方式会有所不同,因此我将重点介绍所有系统或至少大多数系统通用的概念。

线程和进程–概述

线程可以定义为调度程序可以运行的一系列指令,调度程序是操作系统的一部分,它决定哪些工作块将接收到要执行的必要资源。通常,线程存在于进程中。进程可以定义为正在执行的计算机程序的实例。

在前面的章节中,我们使用类似于$ python my_script.py的命令运行我们自己的模块和脚本。当运行这样的命令时,将创建一个 Python 进程。在它里面,产生了一个主执行线程。脚本中的指令将在该线程中运行。

但这只是一种工作方式,Python 实际上可以在同一进程中使用多个线程,甚至可以生成多个进程。毫不奇怪,计算机科学的这些分支被称为多线程多处理

为了理解差异,让我们花一点时间深入研究线程和进程。

线的快速解剖

一般来说,有两种不同类型的螺纹:

  • 用户级线程:我们可以创建和管理以执行任务的线程
  • 内核级线程:以内核模式运行并代表操作系统运行的低级线程

鉴于 Python 在用户级别工作,我们现在不打算深入研究内核线程。相反,我们将在本章的示例中探讨几个用户级线程的示例。

线程可以处于以下任何状态:

  • 新线程:尚未启动且未分配任何资源的线程。
  • 可运行:线程正在等待运行。它拥有运行所需的所有资源,一旦调度程序给它开绿灯,它就会运行。
  • 正在运行:其指令流正在执行的线程。从这个状态,它可以返回到非运行状态,或者死亡。
  • 未运行:已暂停的线程。这可能是由于另一个线程优先于它,或者仅仅是因为该线程正在等待长时间运行的 IO 操作完成。
  • 已死亡:线程已死亡,因为它已到达其执行流的自然终点,或者它已被杀死。

状态之间的转换由我们的操作或调度程序触发。不过,有一件事需要记住;最好不要干扰线的死亡。

杀线

终止线程被认为不是好的实践。Python 不提供通过调用方法或函数杀死线程的功能,这应该是一个提示,说明杀死线程不是您想要做的事情。

一个原因是,一个线程可能有从线程本身中派生的子线程,当其父线程死亡时,子线程将成为孤立线程。另一个原因可能是,如果您要杀死的线程持有需要正确关闭的资源,您可能会阻止这种情况发生,这可能会导致问题。

稍后,我们将看到如何解决这些问题的示例。

上下文转换

我们已经说过,调度程序可以决定线程何时可以运行,或者何时暂停,等等。每当一个正在运行的线程需要挂起以便另一个线程可以运行时,调度程序都会以一种方式保存正在运行的线程的状态,以便以后可以在暂停的位置恢复执行。

这种行为称为上下文切换。人们也总是这样做。我们正在做一些文书工作,我们听到*bing!*在我们的电话上。我们停止文书工作,检查电话。当我们处理完可能是第无数张有趣的猫的照片后,我们回到我们的文件工作。然而,我们并不是从一开始就开始文书工作;我们只是简单地继续我们中断的地方。

上下文切换是现代计算机的一项神奇功能,但如果生成太多线程,它可能会变得很麻烦。然后,调度程序将尝试给每个线程一个运行一段时间的机会,并且将花费大量时间来保存和恢复分别暂停和重新启动的线程的状态。

为了避免这个问题,限制在任何给定时间点可以运行的线程数量(同样的考虑也适用于进程)是很常见的。这是通过使用称为池的结构来实现的,池的大小可以由程序员决定。简而言之,我们创建一个池,然后将任务分配给它的线程。当池中的所有线程都忙时,程序将无法生成新线程,直到其中一个线程终止(并返回池)。池在节约资源方面也很好,因为它们为线程生态系统提供了回收功能。

在编写多线程代码时,了解软件将要运行的机器的相关信息非常有用。这些信息,再加上一些评测(我们将在第 11 章调试和故障排除g 中了解),应该能够使我们正确校准池的大小。

全局解释器锁

2015 年 7 月,我参加了在毕尔巴鄂举行的 EuroPython 会议,在会上我谈到了测试驱动开发。不幸的是,摄像机操作员丢失了它的前半部分,但从那以后,我又做了几次这样的演讲,所以你可以在网上找到它的完整版本。在会议上,我很高兴与 Guido van Rossum 会面并与他交谈,我还出席了他的主题演讲。

他谈到的话题之一是臭名昭著的全局解释器锁吉尔)。GIL 是一个互斥锁,用于保护对 Python 对象的访问,防止多个线程同时执行 Python 字节码。这意味着,即使可以用 Python 编写多线程代码,在任何时间点(当然,每个进程)也只有一个线程在运行。

In computer programming, a mutual exclusion object (mutex) is a program object that allows multiple program threads to share the same resource, such as file access, but not simultaneously.

这通常被视为该语言的一个不受欢迎的限制,许多开发人员以诅咒这个大坏蛋为荣。然而,正如雷蒙德·赫廷格(Raymond Hettinger)在其 2017 年 PyBay 会议上关于并发性的主旨演讲(中完美地解释的那样,真相存在于其他地方 https://bit.ly/2KcijOB )。大约 10 分钟后,Raymond 解释说,从 Python 中删除 GIL 实际上非常简单。大约需要一天的工作。然而,你为这个GIL所付出的代价是,你必须在代码中需要锁的地方自己应用锁。这会导致更昂贵的占用空间,因为大量的单个锁需要花费更多的时间来获取和发布,而且最重要的是,它会带来 bug 的风险,因为编写健壮的多线程代码并不容易,最终可能需要编写几十个或数百个锁。

为了理解锁是什么,以及为什么要使用它,我们首先需要讨论多线程编程的危险之一:竞争条件。

竞争条件和死锁

在编写多线程代码时,您需要意识到当您的代码不再线性执行时所带来的危险。我的意思是,多线程代码面临被调度程序在任何时间点暂停的风险,因为它决定给另一个指令流一些 CPU 时间。

这种行为会使您面临不同类型的风险,其中最著名的两种是竞争条件和死锁。让我们简单地谈谈。

比赛条件

竞态条件是一种系统行为,其中程序的输出取决于其他不可控事件的顺序或时间。当这些事件没有按照程序员预期的顺序展开时,竞争条件就变成了 bug。

用一个例子来解释这一点要容易得多。

假设您有两个线程在运行。两者都在执行相同的任务,包括从一个位置读取一个值,使用该值执行一个操作,将该值增加1单位,然后将其保存回去。假设操作是将该值发布到 API。

场景 A–比赛条件未发生

线程A读取值(1,将1发布到 API,然后将其递增到2,并将其保存回去。紧接着,调度程序暂停线程A,并运行线程B。线程B读取该值(现在为2,将2发布到 API 中,并将其递增到3并保存回去。

此时,在操作发生两次后,存储的值是正确的:1+2=3。此外,API 已使用12正确调用。

场景 B–竞争条件发生

线程A读取值(1,将其发布到 API,并将其增量为2,但在它可以将其保存回来之前,调度程序决定暂停线程A,以支持线程B

线程B读取该值(仍然是1),将其发布到 API,将其增量为2,并将其保存回原处。然后,调度程序再次切换到线程A。线程A只需保存递增后保存的值即可恢复其工作流,即2

在此场景之后,即使操作发生了场景A中的两次,保存的值仍然是2,并且 API 已经被1调用了两次。

在现实生活中,当多个线程和实际代码执行多个操作时,程序的整体行为会爆发出无数的可能性。稍后我们将看到一个例子,我们将使用锁修复它。

竞争条件的主要问题是它们使我们的代码不确定,这是不好的。在计算机科学的某些领域中,非决定论被用来实现某些事情,这很好,但一般来说,您希望能够预测代码的行为,而竞争条件使其无法做到这一点。

救援之锁

在处理比赛情况时,锁起到了救援作用。例如,为了修复前面的示例,您只需要在过程周围设置一个锁。锁就像守护者,只允许一个线程控制它(我们说获取锁),在该线程释放锁之前,其他线程无法获取它。他们将不得不坐着等待,直到锁再次可用。

场景 C–使用锁

线程A获取锁,读取值(1,发布到 API,增加到2,调度器将其挂起。线程B被赋予了一些 CPU 时间,因此它尝试获取锁。但是线程A尚未释放锁,因此线程B坐着等待。调度程序可能会注意到这一点,并迅速决定切换回线程A

线程A保存 2,并释放锁,使其可用于所有其他线程。

此时,是否通过线程A或线程B再次获取锁(因为调度程序可能已决定再次切换)并不重要。该过程将始终正确执行,因为锁确保当线程读取值时,它必须先完成该过程(ping API、increment 和 save),然后其他线程才能读取该值。

标准库中有许多不同的锁可用。我绝对鼓励您仔细阅读它们,以了解编写多线程代码时可能遇到的所有危险,以及如何解决这些问题。

现在我们来谈谈死锁。

僵局

死锁是一种状态,其中组中的每个成员都在等待其他成员采取行动,例如发送消息,或者更常见的是释放锁或资源。

一个简单的例子将帮助您了解情况。想象两个小孩在一起玩。找一个由两部分组成的玩具,给每个部分一个。当然,他们两个都不想给对方他们的部分,他们会希望对方释放他们拥有的部分。所以他们两个都不能玩玩具,因为他们每个人都拿着一半,并且会无限期地等待另一个孩子释放另一半。

Don't worry, no kids were harmed during the making of this example. It all happened in my mind.

另一个例子是让两个线程再次执行相同的过程。该过程需要获取两个资源AB,这两个资源都由单独的锁保护。线程1获取A,线程2获取B,然后无限期等待,直到另一个释放其拥有的资源。但这不会发生,因为他们都被指示等待并获取第二个资源以完成该过程。线程可能比孩子们更顽固。

你可以用几种方法解决这个问题。最简单的方法可能是简单地对资源获取应用一个命令,这意味着获得A的线程也将获得其余的所有线程:BC等等。

另一种方法是在整个资源获取过程中设置一个锁,这样即使它发生的顺序不正确,它仍然在锁的上下文中,这意味着一次只有一个线程可以实际收集所有资源。

现在让我们暂停一下关于线程的讨论,并探讨流程。

快速解剖一个过程

进程通常比线程更复杂。通常,它们包含一个主线程,但如果您选择,也可以是多线程的。它们能够产生多个子线程,每个子线程都包含自己的寄存器集和堆栈。每个进程提供计算机执行程序所需的所有资源。

与使用多线程类似,我们可以设计代码以利用多处理设计。多个进程可能在多个核上运行,因此通过多处理,您可以真正实现计算的并行化。不过,它们的内存占用比线程的内存占用略重,使用多个进程的另一个缺点是进程间通信IPC)往往比线程间通信更昂贵。

进程的属性

UNIX 进程由操作系统创建。它通常包含以下内容:

  • 进程 ID、进程组 ID、用户 ID 或组 ID
  • 环境和工作目录
  • 程序指令
  • 寄存器、堆栈和堆
  • 文件描述符
  • 信号动作
  • 共享库
  • 进程间通信工具(管道、消息队列、信号量或共享内存)

如果您对流程感兴趣,请打开一个 shell 并键入$ top。此命令显示并更新有关系统中运行的进程的排序信息。当我在我的机器上运行它时,第一行告诉我:

$ top
Processes: 477 total, 4 running, 473 sleeping, 2234 threads
...

这让你知道我们的计算机在没有意识到的情况下做了多少工作。

多线程还是多处理?

考虑到所有这些信息,决定哪种方法是最好的方法意味着了解需要执行的工作类型,并了解将致力于执行该工作的系统。

这两种方法都有优点,所以让我们试着澄清主要的区别。

以下是使用多线程的一些优点:

  • 线程都出生在同一进程中。他们共享资源,可以很容易地相互交流。过程之间的通信需要更复杂的结构和技术。
  • 生成线程的开销小于进程的开销。此外,它们的内存占用也更小。
  • 线程可以非常有效地阻止 IO 绑定的应用程序。例如,当一个线程在等待网络连接返回一些数据时被阻塞,工作可以轻松有效地切换到另一个线程。
  • 由于进程之间没有任何共享资源,我们需要使用 IPC 技术,它们需要的内存比线程之间的通信更多。

以下是使用多处理的一些优点:

  • 我们可以通过使用过程来避免 GIL 的局限性。
  • 失败的子进程不会杀死主应用程序。
  • 线程会遇到争用条件和死锁等问题;在使用过程时,必须处理它们的可能性大大降低。
  • 当线程的数量超过某个阈值时,线程的上下文切换可能会变得非常昂贵。
  • 进程可以更好地利用多核处理器。
  • 进程在处理 CPU 密集型任务方面优于多线程。

在本章中,我将针对多个示例向您展示这两种方法,希望您能够很好地理解各种不同的技术。那我们就开始讲代码吧!

Python 中的并发执行

让我们首先通过一些简单的示例来探索 Python 多线程和多处理的基础知识。

Keep in mind that several of the following examples will produce an output that depends on a particular run. When dealing with threads, things can get non-deterministic, as I mentioned earlier. So, if you experience different results, it is absolutely fine. You will probably notice that some of your results will vary from run to run too.

开始一个线程

首先,让我们开始一个线程:

# start.py
import threading

def sum_and_product(a, b):
    s, p = a + b, a * b
    print(f'{a}+{b}={s}, {a}*{b}={p}')

t = threading.Thread(
    target=sum_and_product, name='SumProd', args=(3, 7)
)
t.start()

导入threading后,我们定义了一个函数:sum_and_product。此函数计算两个数字的和和与积,并打印结果。有趣的是在函数后面。我们从threading.Thread实例化t。这是我们的线。我们传递了将作为线程体运行的函数的名称,给了它一个名称,并传递了参数37,这两个参数将分别作为ab输入函数。

创建线程后,我们使用同名方法开始它。

此时,Python 将开始在新线程中执行该函数,当该操作完成时,整个程序也将完成,并退出。让我们运行它:

$ python start.py
3+7=10, 3*7=21 

因此,启动线程非常简单。让我们看一个更有趣的示例,其中显示了更多信息:

# start_with_info.py
import threading
from time import sleep

def sum_and_product(a, b):
    sleep(.2)
    print_current()
    s, p = a + b, a * b
    print(f'{a}+{b}={s}, {a}*{b}={p}')

def status(t):
    if t.is_alive():
        print(f'Thread {t.name} is alive.')
    else:
        print(f'Thread {t.name} has terminated.')

def print_current():
    print('The current thread is {}.'.format(
        threading.current_thread()
    ))
    print('Threads: {}'.format(list(threading.enumerate())))

print_current()
t = threading.Thread(
    target=sum_and_product, name='SumPro', args=(3, 7)
)
t.start()
status(t)
t.join()
status(t)

在本例中,线程逻辑与上一个完全相同,因此您无需费劲,可以专注于我添加的(疯狂的!)日志信息量。我们使用两个功能来显示信息:statusprint_current。第一个线程在输入中获取一个线程,并通过调用其is_alive方法显示其名称以及它是否处于活动状态。第二个线程打印当前线程,然后枚举进程中的所有线程。此信息来自threading.current_threadthreading.enumerate

我之所以将.2秒的睡眠时间放在函数中是有原因的。当线程启动时,它的第一条指令是睡眠片刻。鬼鬼祟祟的调度程序将捕捉到这一点,并将执行切换回主线程。您可以通过以下事实来验证这一点:在输出中,您将从线程中看到status(t)的结果先于print_current的结果。这意味着该调用在线程睡眠时发生。

最后,请注意,最后我打电话给t.join()。它指示 Python 阻塞,直到线程完成。这是因为我希望最后一次调用status(t)告诉我们线程已经结束了。让我们看看输出(为了可读性稍微重新排列):

$ python start_with_info.py
The current thread is
 <_MainThread(MainThread, started 140735733822336)>.
Threads: [<_MainThread(MainThread, started 140735733822336)>]
Thread SumProd is alive.
The current thread is <Thread(SumProd, started 123145375604736)>.
Threads: [
 <_MainThread(MainThread, started 140735733822336)>,
 <Thread(SumProd, started 123145375604736)>
]
3+7=10, 3*7=21
Thread SumProd has terminated.

如您所见,首先当前线程是主线程。枚举仅显示一个线程。然后我们创建并开始SumProd。我们打印它的状态,我们知道它是活的。然后,这次从SumProd中,我们再次显示关于当前线程的信息。当然,现在当前线程是SumProd,我们可以看到枚举所有线程都会返回这两个线程。打印结果后,我们通过最后一次调用status来验证线程是否已按预期终止。如果得到不同的结果(当然,除了线程的 ID 之外),请尝试增加睡眠时间,看看是否有任何变化。

启动进程

现在让我们看一个等效的示例,但我们将使用进程,而不是使用线程:

# start_proc.py
import multiprocessing

...

p = multiprocessing.Process(
    target=sum_and_product, name='SumProdProc', args=(7, 9)
)
p.start()

代码与第一个示例完全相同,但是我们没有使用Thread,而是实例化了multiprocessing.Processsum_and_product功能与之前相同。输出也相同,只是数字不同。

停止线程和进程

如前所述,一般来说,停止线程是个坏主意,进程也是如此。确保你已经小心地处理和关闭了所有打开的东西可能是相当困难的。但是,在某些情况下,您可能希望能够停止线程,因此让我向您演示如何执行此操作:

# stop.py
import threading
from time import sleep

class Fibo(threading.Thread):
    def __init__(self, *a, **kwa):
        super().__init__(*a, **kwa)
        self._running = True

    def stop(self):
        self._running = False

    def run(self):
        a, b = 0, 1
        while self._running:
            print(a, end=' ')
            a, b = b, a + b
            sleep(0.07)
        print()

fibo = Fibo()
fibo.start()
sleep(1)
fibo.stop()
fibo.join()
print('All done.')

对于本例,我们使用斐波那契生成器。我们以前见过,所以我不解释了。需要关注的重要一点是_running属性。首先,请注意该类继承自Thread。通过覆盖__init__方法,我们可以将_running标志设置为True。当您以这种方式编写线程时,您只需重写类中的run方法,而不是给它一个目标函数。我们的run方法计算出一个新的斐波那契数,然后睡眠约0.07秒。

在最后一段代码中,我们创建并启动类的一个实例。然后我们睡眠一秒钟,这应该给线程时间来产生大约 14 个斐波那契数。当我们调用fibo.stop()时,实际上并没有停止线程。我们只需将标志设置为False,这就允许run中的代码达到其自然终点。这意味着线程将有机地消亡。在控制台上打印All done.之前,我们会调用join以确保线程已经完成。让我们检查一下输出:

$ python stop.py
0 1 1 2 3 5 8 13 21 34 55 89 144 233
All done.

检查打印了多少个数字:14,如预测的那样。

这基本上是一种解决方法,允许您停止线程。如果您根据多线程范例正确地设计代码,那么您就不必一直杀死线程,所以让这种需要成为您的警钟,让它可以设计得更好。

停止进程

当涉及到停止一个过程时,情况就不同了,而且没有麻烦。您可以使用terminatekill方法,但请确保您知道自己在做什么,因为前面关于悬而未决的开放资源的所有考虑仍然是正确的。

产生多个线程

为了好玩,现在让我们玩两个线程:

# starwars.py
import threading
from time import sleep
from random import random

def run(n):
    t = threading.current_thread()
    for count in range(n):
        print(f'Hello from {t.name}! ({count})')
        sleep(0.2 * random())

obi = threading.Thread(target=run, name='Obi-Wan', args=(4, ))
ani = threading.Thread(target=run, name='Anakin', args=(3, ))
obi.start()
ani.start()
obi.join()
ani.join()

run函数只需打印当前线程,然后进入一个n循环,在该循环中,它打印一条问候语,并在00.2秒之间随机休眠一段时间(random()返回一个介于01之间的浮点数)。

本例的目的是向您展示调度程序如何在线程之间进行跳转,从而有助于使它们稍微休眠。让我们看看输出:

$ python starwars.py
Hello from Obi-Wan! (0)
Hello from Anakin! (0)
Hello from Obi-Wan! (1)
Hello from Obi-Wan! (2)
Hello from Anakin! (1)
Hello from Obi-Wan! (3)
Hello from Anakin! (2)

如您所见,输出在这两者之间随机交替。每次发生这种情况时,您都知道调度程序已经执行了上下文切换。

处理种族问题

现在我们有了启动和运行线程的工具,让我们模拟一个竞争条件,如我们前面讨论的:

# race.py
import threading
from time import sleep
from random import random

counter = 0
randsleep = lambda: sleep(0.1 * random())

def incr(n):
    global counter
    for count in range(n):
        current = counter
        randsleep()
        counter = current + 1
        randsleep()

n = 5
t1 = threading.Thread(target=incr, args=(n, ))
t2 = threading.Thread(target=incr, args=(n, ))
t1.start()
t2.start()
t1.join()
t2.join()
print(f'Counter: {counter}')

在本例中,我们定义了incr函数,该函数在输入中获取一个数字n,并在n上循环。在每个循环中,它读取计数器的值,通过调用randsleep(我为提高可读性而编写的一个很小的 Lambda 函数),随机休眠一段时间(介于00.1秒之间),然后将counter的值增加1

我选择使用global是为了对counter进行读/写访问,但它可以是任何东西,所以您可以自己尝试一下。

整个脚本基本上启动两个线程,每个线程运行相同的函数,并得到n = 5。请注意,我们需要在最后连接两个线程,以确保在打印计数器的最终值(最后一行)时,两个线程都完成了其工作。

当我们打印最终值时,我们希望计数器是 10,对吗?两条线,每条五个环,总共十个。然而,如果我们运行这个脚本,我们几乎永远不会得到 10。我自己跑了很多次,似乎总是在 5 到 7 点之间。发生这种情况的原因是代码中存在竞争条件,我添加的那些随机睡眠会加剧这种情况。如果删除它们,仍然会存在争用条件,因为计数器是以非原子方式增加的(这意味着一个操作可以在多个步骤中分解,因此在两个步骤之间暂停)。然而,这种比赛状态出现的可能性很低,所以增加随机睡眠会有所帮助。

让我们分析一下代码。t1获取计数器的当前值,比如3t1然后睡一会儿。如果调度器在该时刻切换上下文,暂停t1并启动t2,则t2将读取相同的值3。无论之后发生什么,我们都知道两个线程都会将计数器更新为4,这将是不正确的,因为在两次读取之后,计数器应该上升到5。在更新之后添加第二个随机休眠调用,有助于调度器更频繁地切换,并更容易显示竞争条件。试着对其中一个进行评论,看看结果如何变化(它会戏剧性地变化)。

现在我们已经确定了问题,让我们使用锁来修复它。代码基本相同,因此我将仅向您展示哪些更改:

# race_with_lock.py
incr_lock = threading.Lock()

def incr(n):
    global counter
    for count in range(n):
        with incr_lock:
            current = counter
            randsleep()
            counter = current + 1
            randsleep()

这次我们从threading.Lock类创建了一个锁。我们可以手动调用它的acquirerelease方法,或者我们可以是 Pythonic 的,在上下文管理器中使用它,这看起来更好,为我们完成整个获取/发布业务。注意,我在代码中留下了随机睡眠。但是,每次运行它时,它都会返回10

区别在于:当第一个线程获得该锁时,调度程序切换上下文并不重要。第二个线程将尝试获取锁,Python 将以响亮的回答。因此,第二个线程将只是坐着等待,直到锁被释放。一旦调度程序切换回第一个线程,并且锁被释放,那么另一个线程将有机会(如果它首先到达那里,这不一定保证),获取锁并更新计数器。尝试将一些打印添加到该逻辑中,以查看线程是否完全交替。我猜他们不会,至少不是每次都会。记住threading.current_thread函数,以便能够看到哪个线程正在实际打印信息。

Python 在threading模块中提供了几种数据结构:Lock、RLock、Condition、Semaphore、Event、Timer 和 Barrier。我无法向您展示所有的用例,因为遗憾的是我没有空间解释所有用例,只能阅读threading模块(的文档 https://docs.python.org/3.7/library/threading.html 将是开始了解他们的好地方。

现在我们来看一个关于线程本地数据的示例。

线程的本地数据

threading模块提供了一种为线程实现本地数据的方法。本地数据是保存线程特定数据的对象。让我给你们看一个例子,并允许我也偷偷加入一个Barrier,这样我就可以告诉你们它是如何工作的:

# local.py
import threading
from random import randint

local = threading.local()

def run(local, barrier):
    local.my_value = randint(0, 10**2)
    t = threading.current_thread()
    print(f'Thread {t.name} has value {local.my_value}')
    barrier.wait()
    print(f'Thread {t.name} still has value {local.my_value}')

count = 3
barrier = threading.Barrier(count)
threads = [
    threading.Thread(
        target=run, name=f'T{name}', args=(local, barrier)
    ) for name in range(count)
]
for t in threads:
    t.start()

我们从定义local开始。这是保存线程特定数据的特殊对象。我们运行三个线程。他们每个人都会给local.my_value分配一个随机值,并打印出来。然后线程到达一个Barrier对象,该对象被编程为总共容纳三条线程。当障碍物被第三根线击中时,它们都可以通过。这基本上是一种很好的方法,可以确保N数量的线程达到某一点,并且它们都会等待,直到每个线程都到达。

现在,如果local是一个正常的虚拟对象,那么第二个线程将重写local.my_value的值,第三个线程也会这样做。这意味着我们将看到他们在第一组打印中打印不同的值,但在第二轮打印中显示相同的值(最后一个)。但这并没有发生,多亏了local。输出显示以下内容:

$ python local.py
Thread T0 has value 61
Thread T1 has value 52
Thread T2 has value 38
Thread T2 still has value 38
Thread T0 still has value 61
Thread T1 still has value 52

请注意,由于调度程序切换上下文,顺序错误,但值都是正确的。

线程与进程通信

到目前为止,我们已经看到了很多例子。因此,让我们探讨如何通过使用队列使线程和进程相互通信。让我们从线程开始。

线程通信

在本例中,我们将使用来自queue模块的正常Queue

# comm_queue.py
import threading
from queue import Queue

SENTINEL = object()

def producer(q, n):
    a, b = 0, 1
    while a <= n:
        q.put(a)
        a, b = b, a + b
    q.put(SENTINEL)

def consumer(q):
    while True:
        num = q.get()
        q.task_done()
        if num is SENTINEL:
            break
        print(f'Got number {num}')

q = Queue()
cns = threading.Thread(target=consumer, args=(q, ))
prd = threading.Thread(target=producer, args=(q, 35))
cns.start()
prd.start()
q.join()

逻辑是非常基本的。我们有一个producer函数,它生成斐波那契数并将它们放入队列。当下一个数字大于给定的n时,制作者退出while循环,并将最后一个东西放入队列:aSENTINELSENTINEL是用来表示某事的任何对象,在我们的例子中,它向消费者表示生产者已经完成了。

有趣的逻辑是在consumer函数中。它无限循环,从队列中读取值并打印出来。这里有几件事需要注意。首先,看看我们如何称呼q.task_done()?即确认队列中的元素已被处理。这样做的目的是允许代码中的最后一条指令q.join()在确认所有元素后解除阻塞,从而结束执行。

其次,请注意我们如何使用is操作符与项目进行比较,以找到哨兵。我们很快就会看到,当使用multiprocessing.Queue时,这将不再可能。在我们到达那里之前,你能猜出原因吗?

运行此示例会产生一系列行,例如Got number 0Got number 1等等,直到34,因为我们设置的限制是35,下一个斐波那契数将是55

发送事件

使线程通信的另一种方法是触发事件。让我快速向您展示一个例子:

# evt.py
import threading

def fire():
    print('Firing event...')
    event.set()

def listen():
    event.wait()
    print('Event has been fired')

event = threading.Event()
t1 = threading.Thread(target=fire)
t2 = threading.Thread(target=listen)
t2.start()
t1.start()

这里我们有两个线程运行firelisten,分别触发和监听事件。要触发事件,请对其调用set方法。首先启动的t2线程已经在侦听该事件,并且将一直坐在那里直到触发该事件。上一示例的输出如下所示:

$ python evt.py
Firing event...
Event has been fired

在某些情况下,事件是伟大的。考虑让等待连接对象的线程准备就绪,然后才能真正开始使用它。他们可能正在等待一个事件,一个线程可能正在检查该连接,并在事件准备就绪时触发该事件。事件很有趣,所以请确保您进行了实验并考虑了它们的用例。

与队列的进程间通信

现在让我们看看如何使用队列在进程之间进行通信。此示例与线程的示例非常相似:

# comm_queue_proc.py
import multiprocessing

SENTINEL = 'STOP'

def producer(q, n):
    a, b = 0, 1
    while a <= n:
        q.put(a)
        a, b = b, a + b
    q.put(SENTINEL)

def consumer(q):
    while True:
        num = q.get()
        if num == SENTINEL:
            break
        print(f'Got number {num}')

q = multiprocessing.Queue()
cns = multiprocessing.Process(target=consumer, args=(q, ))
prd = multiprocessing.Process(target=producer, args=(q, 35))
cns.start()
prd.start()

如您所见,在本例中,我们必须使用一个队列,该队列是multiprocessing.Queue的实例,它不公开task_done方法。但是,由于这个队列的设计方式,它会自动加入主线程,因此我们只需要启动这两个进程,所有进程都会工作。本例的输出与前面的相同。

当涉及到 IPC 时,要小心。对象进入队列时会被 pickle,因此 ID 会丢失,还有一些其他微妙的事情需要处理。这就是为什么在这个例子中,我不能再像在多线程版本中那样使用对象作为哨兵,并使用is进行比较。该 sentinel 对象将在队列中被 pickle(因为这次Queue来自multiprocessing,而不是像以前一样来自queue),并且在取消 pickle 后将采用新 ID,无法正确比较。本例中的字符串"STOP"起到了作用,您需要为 sentinel 找到一个合适的值,该值必须永远不会与同一队列中的任何项目发生冲突。我让您参考文档,并尽可能多地了解这个主题。

队列不是进程之间通信的唯一方式。您还可以使用管道(multiprocessing.Pipe,它提供从一个流程到另一个流程的连接(如管道),反之亦然。您可以在文档中找到大量示例;它们和我们在这里看到的没什么不同。

线程和进程池

如前所述,池是设计用来容纳N对象(线程、进程等)的结构。当使用量达到容量时,在当前正在工作的线程(或进程)中的一个线程(或进程)重新可用之前,不会将任何工作分配给该线程(或进程)。因此,池是限制可同时处于活动状态的线程(或进程)数量的一种很好的方法,可以防止系统因资源耗尽而陷入饥饿,或者防止计算时间受到过多上下文切换的影响。

在下面的示例中,我将进入concurrent.futures模块以使用ThreadPoolExecutorProcessPoolExecutor执行器。这两个类使用线程池(分别是进程池)异步执行调用。它们都接受一个参数max_workers,该参数设置了执行器可以同时使用多少线程(或进程)的上限。

让我们从多线程示例开始:

# pool.py
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint
import threading

def run(name):
    value = randint(0, 10**2)
    tname = threading.current_thread().name
    print(f'Hi, I am {name} ({tname}) and my value is {value}')
    return (name, value)

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(run, f'T{name}') for name in range(5)
    ]
    for future in as_completed(futures):
        name, value = future.result()
        print(f'Thread {name} returned {value}')

在导入必要的位之后,我们定义了run函数。它获取一个随机值,打印并返回它,以及调用它时使用的name参数。有趣的一点就在函数后面。

如您所见,我们正在使用上下文管理器调用ThreadPoolExecutor,并将max_workers=3传递给它,这意味着池大小为3。这意味着任何时候只有三个线程处于活动状态。

我们通过列表理解来定义未来对象的列表,在列表理解中我们调用submit我们的 executor 对象。我们指示执行器运行run函数,名称从T0T4future是一个对象,它封装了可调用函数的异步执行。

然后我们在future对象上循环,就像它们完成一样。为此,我们使用as_completed获取future实例的迭代器,该迭代器在实例完成(完成或取消)后立即返回它们。我们通过调用同名方法获取每个future的结果,然后简单地打印出来。考虑到run返回一个元组namevalue,我们希望结果是包含namevalue的两个元组。如果我们打印一个run的输出(请记住每个run可能略有不同),我们会得到:

$ python pool.py
Hi, I am T0 (ThreadPoolExecutor-0_0) and my value is 5
Hi, I am T1 (ThreadPoolExecutor-0_0) and my value is 23
Hi, I am T2 (ThreadPoolExecutor-0_1) and my value is 58
Thread T1 returned 23
Thread T0 returned 5
Hi, I am T3 (ThreadPoolExecutor-0_0) and my value is 93
Hi, I am T4 (ThreadPoolExecutor-0_1) and my value is 62
Thread T2 returned 58
Thread T3 returned 93
Thread T4 returned 62

在继续阅读之前,您能告诉我为什么输出是这样的吗?你能解释一下发生了什么事吗?花点时间想一想。

所以,接下来是三个线程开始运行,我们得到三条Hi, I am...消息打印出来。一旦这三个线程都运行了,池的容量就达到了,所以我们需要等待至少一个线程完成,然后才能发生其他任何事情。在示例运行中,T0T2已完成(通过打印它们返回的内容来表示),因此它们返回到池中,可以再次使用。他们的名字是T3T4,最后三个名字T1T3T4都完成了。您可以从输出中看到线程实际上是如何被重用的,以及前两个线程完成后如何被重新分配到T3T4

现在让我们看一个相同的例子,但采用多进程设计:

# pool_proc.py
from concurrent.futures import ProcessPoolExecutor, as_completed
from random import randint
from time import sleep

def run(name):
    sleep(.05)
    value = randint(0, 10**2)
    print(f'Hi, I am {name} and my value is {value}')
    return (name, value)

with ProcessPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(run, f'P{name}') for name in range(5)
    ]
    for future in as_completed(futures):
        name, value = future.result()
        print(f'Process {name} returned {value}')

差别确实很小。这次我们使用了ProcessPoolExecutor,而run的功能完全相同,只是增加了一点:我们在每个run开始时睡眠 50 毫秒。这是为了加剧行为,让输出清楚地显示池的大小,仍然是三个。如果我们运行该示例,我们会得到:

$ python pool_proc.py
Hi, I am P0 and my value is 19
Hi, I am P1 and my value is 97
Hi, I am P2 and my value is 74
Process P0 returned 19
Process P1 returned 97
Process P2 returned 74
Hi, I am P3 and my value is 80
Hi, I am P4 and my value is 68
Process P3 returned 80
Process P4 returned 68

此输出清楚地显示池大小为三。非常有趣的是,如果我们删除对sleep的调用,大多数情况下输出将有五个Hi, I am...打印,然后是五个Process Px returned...打印。我们怎么解释呢?很简单。当前三个过程完成并由as_completed返回时,所有三个过程都会被询问结果,返回的内容都会被打印出来。当这种情况发生时,执行者已经可以开始循环两个进程来运行最后两个任务,并且在允许进行for循环中的打印之前,它们碰巧打印了Hi, I am...消息。

这基本上意味着ProcessPoolExecutor非常快速且具有攻击性(就获得调度程序的注意而言),值得注意的是,这种行为不会发生在线程对应物上,如果您还记得的话,我们不需要使用任何人工睡眠。

但要记住的重要一点是,要能够理解,即使是像这样简单的例子,也可能会有点难以理解或解释。这对您来说是一个教训,这样当您为多线程或多进程设计编写代码时,您就可以将注意力提高到 110%。

现在让我们来看一个更有趣的例子。

使用进程向函数添加超时

大多数(如果不是全部的话)公开函数以发出 HTTP 请求的库都提供了在执行请求时指定超时的功能。这意味着,如果在X秒(X为超时)之后,请求尚未完成,则整个操作将中止,并从下一条指令开始恢复执行。但并非所有函数都公开此功能,因此,当函数不提供中断功能时,我们可以使用进程来模拟该行为。在本例中,我们将尝试将主机名转换为 IPv4 地址。来自socket模块的gethostbyname函数不允许我们在操作上设置超时,因此我们使用一个进程来人为地设置超时。下面的代码可能不那么简单,因此我鼓励您在阅读解释之前花一些时间仔细阅读:

# hostres/util.py
import socket
from multiprocessing import Process, Queue

def resolve(hostname, timeout=5):
    exitcode, ip = resolve_host(hostname, timeout)
    if exitcode == 0:
        return ip
    else:
        return hostname

def resolve_host(hostname, timeout):
    queue = Queue()
    proc = Process(target=gethostbyname, args=(hostname, queue))
    proc.start()
    proc.join(timeout=timeout)

    if queue.empty():
        proc.terminate()
        ip = None
    else:
        ip = queue.get()
    return proc.exitcode, ip

def gethostbyname(hostname, queue):
    ip = socket.gethostbyname(hostname)
    queue.put(ip)

让我们从resolve开始。它只需要一个hostname和一个timeout,然后用它们调用resolve_host。如果退出代码为0(这意味着进程正确终止),它将返回对应于该主机的 IPv4。否则,它返回hostname本身,作为一种回退机制。

接下来,我们来谈谈gethostbyname。需要一个hostname和一个queue,并调用socket.gethostbyname来解析hostname。当结果可用时,将其放入queue。现在,这就是问题所在。如果对socket.gethostbyname的调用花费的时间超过了我们想要分配的超时时间,我们需要终止它。

resolve_host函数正是这样做的。它接收hostnametimeout,首先,它只是创建一个queue。然后它产生一个新的进程,将gethostbyname作为target,并传递适当的参数。然后,该过程开始并加入,但带有一个timeout

现在,成功的场景是这样的:对socket.gethostbyname的调用很快成功,IP 在队列中,进程在超时时间之前终止,当我们到达if部分时,队列将不会为空。我们从它获取 IP,并返回它,以及进程退出代码。

在不成功的场景中,对socket.gethostbyname的调用花费的时间太长,进程在其timeout过期后被终止。由于呼叫失败,queue中没有插入 IP,因此将为空。在if逻辑中,我们因此将 IP 设置为None,并像以前一样返回。resolve函数将发现退出代码不是0(因为进程没有愉快地终止,而是被终止了),并将正确返回主机名而不是 IP,我们无论如何都无法获取。

在本书的源代码中,在本章的hostres文件夹中,我添加了一些测试,以确保这种行为实际上是正确的。您可以在文件夹的README.md文件中找到如何运行它们的说明。确保您也检查了测试代码,这应该是非常有趣的。

案例

在本章的最后一部分,我将向您展示三个案例示例,其中我们将看到如何通过使用不同的方法(单线程、多线程和多进程)来完成相同的事情。最后,我将用几句话介绍asyncio,这是一个模块,它介绍了用 Python 进行异步编程的另一种方法。

示例 1–并发合并排序

第一个示例将围绕 mergesort 算法展开。该排序算法基于分而治之(分而治之)设计范式。它的工作方式非常简单。您有一个要排序的数字列表。第一步是将列表分成两部分,对它们进行排序,然后将结果合并回一个已排序的列表。让我给你一个简单的例子,有六个数字。想象一下我们有一张清单,v=[8, 5, 3, 9, 0, 2]。第一步是将列表v分为两个子列表,其中包含三个数字:v1=[8, 5, 3]v2=[9, 0, 2]。然后我们通过递归调用 mergesort 对v1v2进行排序。结果将是v1=[3, 5, 8]v2=[0, 2, 9]。为了将 SoT T8T 和{Ty9T}组合成一个排序的 Ty-T10T,我们简单地考虑两个列表中的第一个项,并选择其中的最小项。第一次迭代将比较30。我们选择0,离开v2=[2, 9]。然后我们冲洗并重复:我们比较32,我们选择2,所以现在v2=[9]。然后比较39。这次我们选了3,留下v1=[5, 8]等等。接下来我们将选择559),然后选择889,最后选择9。这将为我们提供一个新的分类版本的vv=[0, 2, 3, 5, 8, 9]

我选择这个算法作为例子的原因有两个。首先,并行化很容易。将列表一分为二,让两个进程处理它们,然后收集结果。第二,可以修改算法,使其将初始列表拆分为任意N≥ 2,并将这些零件分配给N流程。重组就像只处理两部分一样简单。这个特性使它成为并发实现的一个很好的候选者。

单线程合并排序

让我们看看所有这些是如何转化为代码的,首先学习如何编写我们自己自制的mergesort

# ms/algo/mergesort.py
def sort(v):
    if len(v) <= 1:
        return v
    mid = len(v) // 2
    v1, v2 = sort(v[:mid]), sort(v[mid:])
    return merge(v1, v2)

def merge(v1, v2):
    v = []
    h = k = 0
    len_v1, len_v2 = len(v1), len(v2)
    while h < len_v1 or k < len_v2:
        if k == len_v2 or (h < len_v1 and v1[h] < v2[k]):
            v.append(v1[h])
            h += 1
        else:
            v.append(v2[k])
            k += 1
    return v

让我们从sort函数开始。首先,我们遇到递归的基础,它表示如果列表有01元素,我们不需要对其进行排序,我们可以简单地按原样返回它。如果不是这样,那么我们计算中点(mid,并递归调用v[:mid]v[mid:]上的排序。我希望您现在已经非常熟悉切片语法,但为了防止您需要复习,第一个是v中的所有元素,直到mid索引(排除),第二个是从mid到结尾的所有元素。排序结果分别分配给v1v2。最后,我们调用merge,通过v1v2

merge的逻辑使用两个指针hk,跟踪我们已经比较过的v1v2中的哪些元素。如果我们发现最小值在v1中,我们将其附加到v中,并增加h。另一方面,如果最小值在v2中,我们将其附加到v中,但这次增加k。该过程在一个while循环中运行,该循环的条件与内部if相结合,确保我们不会因为索引超出范围而出现错误。这是一个相当标准的算法,你可以在网络上的许多不同版本中找到它。

为了确保这段代码是可靠的,我已经在ch10/ms文件夹中编写了一个测试套件。我鼓励你去看看。

现在我们已经有了构建块,让我们看看如何修改它,使其能够处理任意数量的零件。

单线程多部分合并排序

该算法的多部分版本的代码非常简单。我们可以重用merge函数,但我们必须重写sort函数:

# ms/algo/multi_mergesort.py
from functools import reduce
from .mergesort import merge

def sort(v, parts=2):
    assert parts > 1, 'Parts need to be at least 2.'
    if len(v) <= 1:
        return v

    chunk_len = max(1, len(v) // parts)
    chunks = (
        sort(v[k: k + chunk_len], parts=parts)
        for k in range(0, len(v), chunk_len)
    )
    return multi_merge(*chunks)

def multi_merge(*v):
    return reduce(merge, v)

当我们编码自己的阶乘函数时,我们在第 4 章中看到了reduce函数,代码的构建块。它在multi_merge中的工作方式是合并v中的前两个列表。然后将结果与第三个结果合并,然后将结果与第四个结果合并,依此类推。

请看sort的新版本。它需要v列表,以及我们想要将其拆分成的部分的数量。我们要做的第一件事是检查我们是否传递了一个正确的parts编号,该编号至少必须是两个。然后,像以前一样,我们有了递归的基础。最后,我们进入函数的主要逻辑,它只是我们在前面示例中看到的多部分版本。我们使用max函数计算每个chunk的长度,以防列表中的元素少于部分。然后我们编写一个生成器表达式,递归调用每个chunk上的sort。最后,我们通过调用multi_merge来合并所有结果。

我知道,在解释这段代码时,我并没有像平时那样详尽,恐怕这是故意的。mergesort 之后的示例要复杂得多,因此我鼓励您真正尝试尽可能彻底地理解前两个片段。

现在,让我们把这个例子带到下一步:多线程。

多线程合并排序

在本例中,我们再次修改了sort函数,以便在初始划分为块之后,每个部分生成一个线程。每个线程使用算法的单线程版本对其部分进行排序,然后在最后使用多重合并技术计算最终结果。翻译成 Python:

# ms/algo/mergesort_thread.py
from functools import reduce
from math import ceil
from concurrent.futures import ThreadPoolExecutor, as_completed
from .mergesort import sort as _sort, merge

def sort(v, workers=2):
    if len(v) == 0:
        return v
    dim = ceil(len(v) / workers)
    chunks = (v[k: k + dim] for k in range(0, len(v), dim))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [
            executor.submit(_sort, chunk) for chunk in chunks
        ]
        return reduce(
            merge,
            (future.result() for future in as_completed(futures))
        )

我们从算法的单线程版本导入了所有必需的工具,包括执行器、ceiling函数以及sortmerge。请注意,我是如何在导入时将单线程sort的名称更改为_sort

在这个版本的sort中,我们首先检查v是否为空,如果不是,我们继续。我们使用ceil函数计算每个chunk的尺寸。它基本上就是我们在前面的代码片段中对max所做的,但我想向您展示另一种解决问题的方法。

当我们得到维度时,我们计算chunks并准备一个好的生成器表达式,将它们提供给执行器。剩下的很简单:我们定义了一个未来对象的列表,每个对象都是调用submit执行器的结果。每个未来的对象都在分配给它的chunk上运行单线程_sort算法。

最后,当它们由as_completed函数返回时,使用我们在前面的多部分示例中看到的相同技术合并结果。

多进程合并排序

要执行最后一步,我们只需要修改前面代码中的两行。如果你在介绍性的例子中注意了,你就会知道我指的是这两行中的哪一行。为了节省一些空间,我将给出代码的差异:

# ms/algo/mergesort_proc.py
...
from concurrent.futures import ProcessPoolExecutor, as_completed
...

def sort(v, workers=2):
    ...
    with ProcessPoolExecutor(max_workers=workers) as executor:
    ...

就这样!基本上,您所要做的就是使用ProcessPoolExecutor而不是ThreadPoolExecutor,并且您不是在生成线程,而是在生成进程。

你还记得我说过进程实际上可以在不同的内核上运行,而线程在同一个进程中运行,所以它们实际上不是并行运行的吗?这是一个很好的例子,向您展示了选择一种方法或另一种方法的结果。因为代码是 CPU 密集型的,并且没有 IO,所以拆分列表并让线程处理块并没有增加任何优势。另一方面,使用流程确实如此。我已经运行了一些性能测试(您自己运行ch10/ms/performance.py模块,您将看到您的机器如何运行),结果证明了我的期望:

$ python performance.py

Testing Sort
Size: 100000
Elapsed time: 0.492s
Size: 500000
Elapsed time: 2.739s

Testing Sort Thread
Size: 100000
Elapsed time: 0.482s
Size: 500000
Elapsed time: 2.818s

Testing Sort Proc
Size: 100000
Elapsed time: 0.313s
Size: 500000
Elapsed time: 1.586s

这两项测试分别在 100000 项和 500000 项的两个列表上运行。对于多线程和多处理版本,我使用了四个 worker。在查找模式时,使用不同的大小非常有用。如您所见,前两个版本(单线程和多线程)所用的时间基本相同,但多处理版本的时间减少了约 50%。这一比例略高于 50%,因为必须生成进程并处理它们是有代价的。不过,你肯定会明白,我的机器上有一个双核处理器。

这还告诉您,即使我在多处理版本中使用了四个 worker,我仍然只能按处理器拥有的内核数量成比例地进行并行化。因此,两个或两个以上的工人差别不大。

现在大家都准备好了,让我们继续下一个例子。

示例 2–批量数独求解器

在本例中,我们将探索数独解算器。我们不打算详细介绍它,因为重点不是理解如何解决数独问题,而是向您展示如何使用多重处理来解决一批数独难题。

本例中有趣的是,我们不再进行单线程版本和多线程版本之间的比较,而是跳过这一步,将单线程版本与两个不同的多进程版本进行比较。每个人将分配一个谜题,因此如果我们解决 1000 个谜题,我们将使用 1000 个工人(好的,我们将使用一个由N个工人组成的池,每个工人都不断循环使用)。另一个版本将把最初的一批谜题除以池的大小,并在一个过程中批量解决每个区块。这意味着,假设池大小为 4,将 1000 个拼图分成 250 个拼图的块,并将每个块分配给一个工作人员,总共四个。

The code I will present to you for the sudoku-solver (without the multiprocessing part), comes from a solution designed by Peter Norvig, which has been distributed under the MIT license. His solution is so efficient that, after trying to re-implement my own for a few days, and getting to the same result, I simply gave up and decided to go with his design. I did do a lot of refactoring though, because I wasn't happy with his choice of function and variable names, so I made those more book friendly, so to speak. You can find the original code, a link to the original page from which I got it, and the original MIT license, in the ch10/sudoku/norvig folder. If you follow the link, you'll find a very thorough explanation of the sudoku-solver by Norvig himself.

什么是数独?

第一件事。什么是数独游戏?数独是一种起源于日本的基于逻辑的数字布局拼图。其目的是用数字填充9x9网格,以便每行、每列和每框(3x3组成网格的子网格)包含19之间的所有数字。您从一个部分填充的网格开始,并使用逻辑考虑在一个数字之后添加数字。

从计算机科学的角度来看,数独可以被解释为一个符合精确封面范畴的问题。计算机编程艺术(以及许多其他精彩书籍)的作者唐纳德·克努特(Donald Knuth)设计了一种称为算法 X的算法,解决这类问题。算法 X 的一个漂亮而高效的实现称为舞蹈链接,它利用了循环双链表的力量,可以用来解决数独问题。这种方法的美妙之处在于,它所需要的只是数独结构和舞蹈链接算法之间的映射,而不必进行通常需要的任何逻辑推导,就可以以光速找到解决方案。

许多年前,当我的空闲时间大于零时,我用 C#编写了一个舞蹈链接数独解算器,我仍然将其存档在某个地方,这对于设计和编码来说是非常有趣的。我绝对鼓励你查阅文献并编写你自己的解算器,如果你能抽出时间,这是一个很好的练习。

在本例的解决方案中,我们将使用一种搜索算法,该算法与人工智能中称为约束传播的过程结合使用。这两种方法通常一起使用,使问题更容易解决。我们将在我们的示例中看到,它们足以让我们在几毫秒内解出一个困难的数独。

用 Python 实现数独解算器

现在,让我们来探索我的解算器重构实现。我将分步骤向您展示代码,因为它非常复杂(另外,在转到另一个模块之前,我不会在每个代码段的顶部重复源代码名):

# sudoku/algo/solver.py
import os
from itertools import zip_longest, chain
from time import time

def cross_product(v1, v2):
    return [w1 + w2 for w1 in v1 for w2 in v2]

def chunk(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return zip_longest(*args, fillvalue=fillvalue)

我们从一些导入开始,然后定义两个有用的函数:cross_productchunk。他们做的正是名字所暗示的。第一个返回两个 iterable 之间的叉积,而第二个返回来自iterable的块列表,每个块都有n元素,如果iterable的长度不是n的倍数,则最后一个块可能填充有给定的fillvalue。然后,我们继续定义几个结构,这些结构将由解算器使用:

digits = '123456789'
rows = 'ABCDEFGHI'
cols = digits
squares = cross_product(rows, cols)
all_units = (
    [cross_product(rows, c) for c in cols]
    + [cross_product(r, cols) for r in rows]
    + [cross_product(rs, cs)
        for rs in chunk(rows, 3) for cs in chunk(cols, 3)]
)
units = dict(
    (square, [unit for unit in all_units if square in unit])
    for square in squares
)
peers = dict(
    (square, set(chain(*units[square])) - set([square]))
    for square in squares
)

不必太详细,让我们将鼠标悬停在这些对象上。squares是网格中所有正方形的列表。正方形由字符串表示,如A3C7。行用字母编号,列用数字编号,因此A3将指示第一行和第三列中的正方形。

all_units是所有可能的行、列和块的列表。这些元素中的每一个都表示为属于行/列/块的正方形列表。units是一个更复杂的结构。这是一本有 81 个键的字典。每个键表示一个正方形,对应的值是一个列表,其中包含三个元素:行、列和块。当然,这些是正方形所属的行、列和块。

最后,peers是一个与units非常相似的字典,但每个键的值(仍然代表一个正方形)是一个包含该正方形所有对等点的集合。对等点定义为属于键中正方形所属的行、列和块的所有正方形。当试图解决难题时,这些结构将用于解决方案的计算。

在我们研究解析输入行的函数之前,让我给您一个输入拼图的示例:

1..3.......75...3..3.4.8.2...47....9.........689....4..5..178.4.....2.75.......1.

前九个字符代表第一行,然后第二行代表另九个字符,依此类推。空正方形由点表示:

def parse_puzzle(puzzle):
    assert set(puzzle) <= set('.0123456789')
    assert len(puzzle) == 81

    grid = dict((square, digits) for square in squares)
    for square, digit in zip(squares, puzzle):
        if digit in digits and not place(grid, square, digit):
            return False  # Incongruent puzzle
    return grid

def solve(puzzle):
    grid = parse_puzzle(puzzle)
    return search(grid)

这个简单的parse_puzzle函数用于解析输入拼图。我们在开始时做了一点理智检查,断言输入谜题必须缩小为一个集合,该集合是所有数字加上一个点的集合的子集。然后我们确定我们有81输入字符,最后我们定义grid,它最初只是一个包含81键的字典,每个键都是一个正方形,都有相同的值,这是一个包含所有可能数字的字符串。这是因为完全空网格中的正方形有可能变成 1 到 9 之间的任意数字。 循环绝对是最有趣的部分。我们解析输入拼图中的 81 个字符中的每一个,将它们与网格中相应的正方形耦合,并尝试*“放置”*它们。我把它用双引号引起来,因为我们马上就会看到,place函数的作用远不止简单地在给定的正方形中设置一个给定的数字。如果我们发现无法从输入拼图中放置数字,则表示输入无效,我们返回False。否则,我们就可以走了,我们还了grid

parse_puzzle用于solve函数,该函数只解析输入拼图,并在其上释放search。因此,以下内容是算法的核心:

def search(grid):
    if not grid:
        return False
    if all(len(grid[square]) == 1 for square in squares):
        return grid  # Solved
    values, square = min(
        (len(grid[square]), square) for square in squares
        if len(grid[square]) > 1
    )
    for digit in grid[square]:
        result = search(place(grid.copy(), square, digit))
        if result:
            return result

这个简单的函数首先检查网格是否实际为非空。然后尝试查看网格是否已求解。已求解的网格每平方将有一个值。如果不是这样,它将循环遍历每个正方形,并找到候选数量最少的正方形。如果一个正方形的字符串值只有一个数字,则表示该正方形中已放置了一个数字。但是如果值超过一位,那么这些都是可能的候选项,因此我们需要找到候选项数量最少的平方,并尝试它们。尝试使用23候选方比使用23589候选方要好得多。在第一种情况下,我们有 50%的机会得到正确的值,而在第二种情况下,我们只有 20%。因此,选择候选数量最少的正方形可以最大化我们在网格中放置好数字的机会。

一旦找到候选人,我们会按顺序进行尝试,如果其中任何一个成功,我们就解决了网格问题,然后返回。您可能已经注意到在搜索中使用了place函数。因此,让我们来探索它的代码:

def place(grid, square, digit):
    """Eliminate all the other values (except digit) from
    grid[square] and propagate.
    Return grid, or False if a contradiction is detected.
    """
    other_vals = grid[square].replace(digit, '')
    if all(eliminate(grid, square, val) for val in other_vals):
        return grid
    return False

此函数接受一个正在进行的工作网格,并尝试将给定的数字放置在给定的正方形中。正如我前面提到的,*“放置”*并不是那么简单。事实上,当我们放置一个数字时,我们必须在整个网格中传播该操作的结果。我们通过调用eliminate函数来实现这一点,该函数应用了数独游戏的两种策略:

  • 如果一个正方形只有一个可能的值,则从该正方形的对等点中删除该值
  • 如果一个单位只有一个值的位置,请将该值放置在该位置

让我简单地举一个这两点的例子。对于第一个,如果将数字 7 放在一个正方形中,则可以从属于该正方形所属的行、列和块的所有正方形的候选列表中删除 7。

对于第二点,假设您正在检查第四行,在所有属于它的正方形中,只有一个在候选行中有数字 7。这意味着数字 7 只能进入那个精确的正方形,所以你应该把它放在那里。

以下函数eliminate应用这两个规则。它的代码非常复杂,因此我没有逐行解释,而是添加了一些注释,留给您理解它的任务:

def eliminate(grid, square, digit):
    """Eliminate digit from grid[square]. Propagate when candidates
    are <= 2.
    Return grid, or False if a contradiction is detected.
    """
    if digit not in grid[square]:
        return grid  # already eliminated
    grid[square] = grid[square].replace(digit, '')

    ## (1) If a square is reduced to one value, eliminate value
    ## from peers.
    if len(grid[square]) == 0:
        return False  # nothing left to place here, wrong solution
    elif len(grid[square]) == 1:
        value = grid[square]
        if not all(
            eliminate(grid, peer, value) for peer in peers[square]
        ):
            return False

    ## (2) If a unit is reduced to only one place for a value,
    ## then put it there.
    for unit in units[square]:
        places = [sqr for sqr in unit if digit in grid[sqr]]
        if len(places) == 0:
            return False  # No place for this value
        elif len(places) == 1:
            # digit can only be in one place in unit,
            # assign it there
            if not place(grid, places[0], digit):
                return False
    return grid

模块中的其余函数对于本示例的其余部分并不重要,因此我将跳过它们。您可以自行运行此模块;它将首先对其数据结构执行一系列检查,然后解决我在sudoku/puzzles文件夹中放置的所有数独难题。但这不是我们感兴趣的,对吗?我们想看看如何使用多处理技术解决数独问题,让我们开始吧。

用多处理方法求解数独

在本模块中,我们将实现三个功能。第一个简单地解决了一批数独难题,没有涉及多处理。我们将使用结果进行基准测试。第二个和第三个将使用多处理,有批处理和没有批处理,因此我们可以理解其中的差异。让我们开始:

# sudoku/process_solver.py
import os
from functools import reduce
from operator import concat
from math import ceil
from time import time
from contextlib import contextmanager
from concurrent.futures import ProcessPoolExecutor, as_completed
from unittest import TestCase
from algo.solver import solve

@contextmanager
def timer():
    t = time()
    yield
    tot = time() - t
    print(f'Elapsed time: {tot:.3f}s')

在一长串导入之后,我们定义了一个上下文管理器,将其用作计时器设备。它引用当前时间(t,然后产生。屈服后,即执行托管上下文的主体。最后,在退出托管上下文时,我们计算tot,这是经过的总时间,并打印它。这是一个用装饰技术编写的简单而优雅的上下文管理器,非常有趣。现在让我们看看我前面提到的三个函数:

def batch_solve(puzzles):
    # Single thread batch solve.
    return [solve(puzzle) for puzzle in puzzles]

这是一个单线程的简单批处理解决方案,这将给我们一个时间进行比较。它只返回所有已求解网格的列表。没趣的现在,检查以下代码:

def parallel_single_solver(puzzles, workers=4):
    # Parallel solve - 1 process per each puzzle
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = (
            executor.submit(solve, puzzle) for puzzle in puzzles
        )
        return [
            future.result() for future in as_completed(futures)
        ]

这个好多了。它使用ProcessPoolExecutor来使用workers池,每个池用于解决大约四分之一的谜题。这是因为我们正在为每个拼图生成一个未来对象。该逻辑与我们在本章中已经看到的任何多处理示例极其相似。让我们看看第三个函数:

def parallel_batch_solver(puzzles, workers=4):
    # Parallel batch solve - Puzzles are chunked into `workers`
    # chunks. A process for each chunk.
    assert len(puzzles) >= workers
    dim = ceil(len(puzzles) / workers)
    chunks = (
        puzzles[k: k + dim] for k in range(0, len(puzzles), dim)
    )
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = (
            executor.submit(batch_solve, chunk) for chunk in chunks
        )
        results = (
            future.result() for future in as_completed(futures)
        )
        return reduce(concat, results)

最后一个函数略有不同。它没有为每个拼图生成一个future对象,而是将所有拼图列表拆分为workers块,然后为每个块创建一个future对象。这意味着如果workers是八个,我们将产生八个future对象。请注意,我们不是通过solveexecutor.submit,而是通过batch_solve,这就成功了。我之所以对最后两个函数进行如此不同的编码,是因为我很好奇,当我们从一个池中回收进程时,我们所产生的开销的严重性是不可忽略的。

现在我们已经定义了函数,让我们使用它们:

puzzles_file = os.path.join('puzzles', 'sudoku-topn234.txt')
with open(puzzles_file) as stream:
    puzzles = [puzzle.strip() for puzzle in stream]

# single thread solve
with timer():
    res_batch = batch_solve(puzzles)

# parallel solve, 1 process per puzzle
with timer():
    res_parallel_single = parallel_single_solver(puzzles)

# parallel batch solve, 1 batch per process
with timer():
    res_parallel_batch = parallel_batch_solver(puzzles)

# Quick way to verify that the results are the same, but
# possibly in a different order, as they depend on how the
# processes have been scheduled.
assert_items_equal = TestCase().assertCountEqual
assert_items_equal(res_batch, res_parallel_single)
assert_items_equal(res_batch, res_parallel_batch)
print('Done.')

在本次基准测试课程中,我们使用了一组 234 个非常难的数独难题。如您所见,我们只是在一个时间上下文中运行三个函数,batch_solveparallel_single_solverparallel_batch_solver。我们收集结果,并且,为了确保,我们验证所有的运行都产生了相同的结果。

当然,在第二次和第三次运行中,我们使用了多处理,因此我们不能保证结果中的顺序与单线程batch_solve的顺序相同。借助于 Python 标准库中命名最差的方法之一assertCountEqual,这个小问题得到了出色的解决。我们在TestCase类中找到它,我们可以实例化它来引用我们需要的方法。我们实际上并没有运行单元测试,但这是一个很酷的技巧,我想展示给大家看。让我们看看运行此模块的输出:

$ python process_solver.py
Elapsed time: 5.368s
Elapsed time: 2.856s
Elapsed time: 2.818s
Done. 

哇!这很有趣。首先,您可以再次看到我的机器有一个双核处理器,因为多处理运行所用的时间大约是单线程解算器所用时间的一半。然而,实际上更有趣的是,两个多处理函数所花费的时间基本上没有差异。多次运行有时以一种方式结束,有时以另一种方式结束。理解为什么需要深入理解参与游戏的所有组件,而不仅仅是过程,因此我们无法在此讨论。不过,可以很有把握地说,这两种方法在性能方面具有可比性。

在本书的源代码中,您可以在sudoku文件夹中找到测试,以及如何运行测试的说明。花点时间看看吧!

现在,让我们来看最后一个例子。

示例三–下载随机图片

这个例子编写起来很有趣。我们将从网站上下载随机图片。我将向您展示三个版本:串行版本、多处理版本,最后是使用asyncio编码的解决方案。在这些示例中,我们将使用一个名为的网站 http://lorempixel.com ,它为您提供了一个 API,您可以调用该 API 来获取随机图像。如果您发现网站已关闭或运行缓慢,您可以使用一个很好的替代方案:https://lorempizza.com/

对于一本意大利人写的书来说,这可能有点陈词滥调,但画面非常华丽。如果你想玩得开心,你可以在网上搜索另一种选择。无论你选择哪一个网站,请理智一点,尽量不要通过向它发出一百万次请求来打击它。这段代码的多处理和asyncio版本可能非常具有攻击性!

让我们从探索代码的单线程版本开始:

# aio/randompix_serial.py
import os
from secrets import token_hex
import requests

PICS_FOLDER = 'pics'
URL = 'http://lorempixel.com/640/480/'

def download(url):
    resp = requests.get(URL)
    return save_image(resp.content)

def save_image(content):
    filename = '{}.jpg'.format(token_hex(4))
    path = os.path.join(PICS_FOLDER, filename)
    with open(path, 'wb') as stream:
        stream.write(content)
    return filename

def batch_download(url, n):
    return [download(url) for _ in range(n)]

if __name__ == '__main__':
    saved = batch_download(URL, 10)
    print(saved)

到现在为止,这段代码对您来说应该很简单。我们定义了一个download函数,它向给定的URL发出请求,通过调用save_image保存结果,并将来自网站的响应正文提供给它。保存图像非常简单:我们创建一个带有token_hex的随机文件名,只是因为它很有趣,然后我们计算文件的完整路径,以二进制模式创建,并将响应的内容写入其中。我们返回filename以便能够在屏幕上打印它。最后,batch_download只需运行我们想要运行的n请求,并返回文件名。

您现在可以跳过if __name__ ...行,这将在第 12 章GUI 和脚本中解释,在这里并不重要。我们所做的就是用 URL 调用batch_download并让它下载10图像。如果你有一个编辑器,打开pics文件夹,你可以看到它在几秒钟内被填充(还要注意:脚本假设pics文件夹存在)。

让我们把事情变得有趣一点。让我们介绍多处理(代码非常相似,因此我不再重复):

# aio/randompix_proc.py
...
from concurrent.futures import ProcessPoolExecutor, as_completed
...

def batch_download(url, n, workers=4):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = (executor.submit(download, url) for _ in range(n))
        return [future.result() for future in as_completed(futures)]

...

现在您应该已经熟悉了这项技术。我们只需将作业提交给执行者,并在其可用时收集结果。因为这是 IO 绑定的代码,所以进程的工作速度非常快,并且在进程等待 API 响应时存在大量的上下文切换。如果您查看了pics文件夹,您会注意到它不再以线性方式填充,而是以批量方式填充。

现在让我们看看这个例子的asyncio版本。

使用 asyncio 下载随机图片

这段代码可能是整个章节中最具挑战性的一段,因此,如果此时此刻对您来说太多,请不要感到难过。我添加这个示例只是为了让人垂涎三尺,鼓励您深入了解 Python 异步编程的核心。另一件值得知道的事情是,可能还有其他几种方法来编写相同的逻辑,因此请记住,这只是可能的示例之一。

asyncio模块提供了使用协程编写单线程并发代码、通过套接字和其他资源复用 IO 访问、运行网络客户端和服务器以及其他相关原语的基础设施。它是在 3.4 版中添加到 Python 中的,有人声称它将成为未来编写 Python 代码的事实上的标准。我不知道这是否属实,但我知道这绝对值得一看:

# aio/randompix_corout.py
import os
from secrets import token_hex
import asyncio
import aiohttp

首先,我们不能再使用requests,因为它不适合asyncio。我们必须使用aiohttp,因此请确保您已经安装了它(在本书的要求中):

PICS_FOLDER = 'pics'
URL = 'http://lorempixel.com/640/480/'

async def download_image(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.read()

前面的代码看起来不太友好,但一旦您了解了它背后的概念,就不会那么糟糕了。我们定义了异步协同路由download_image,它以 URL 作为参数。

In case you don't know, a coroutine is a computer program component that generalizes subroutines for non-preemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations. A subroutine is a sequence of program instructions that performs a specific task, packaged as a unit.

download_image内部,我们使用ClientSession上下文管理器创建一个会话对象,然后使用另一个上下文管理器获得响应,这次是从session.get获得的。这些管理器被定义为异步的事实仅仅意味着它们能够在 enter 和 exit 方法中暂停执行。我们使用await关键字返回响应内容,这允许暂停。请注意,为每个请求创建会话并不是最优的,但我觉得出于本例的目的,我会尽量让代码简单明了,因此我将优化留给您,作为练习。

让我们继续下一个片段:

async def download(url, semaphore):
    async with semaphore:
        content = await download_image(url)
    filename = save_image(content)
    return filename

def save_image(content):
    filename = '{}.jpg'.format(token_hex(4))
    path = os.path.join(PICS_FOLDER, filename)
    with open(path, 'wb') as stream:
        stream.write(content)
    return filename

另一个协同程序download得到一个URL和一个semaphore。它所做的只是通过调用download_image,保存并返回filename来获取图像的内容。这里有趣的一点是semaphore的用法。我们将其用作异步上下文管理器,这样我们也可以挂起此协同路由,并允许切换到其他内容,但除了如何之外,理解为什么要使用semaphore非常重要。原因很简单,这semaphore相当于一个线程池。我们使用它最多允许N个协同路由同时处于活动状态。我们在下一个函数中实例化它,并传递 10 作为初始值。每次一个协程获取semaphore时,其内部计数器都会减少1,因此当 10 个协程获取它时,下一个协程将静坐等待,直到完成的协程释放信号量。这是一种很好的方法,可以限制我们从网站 API 获取图像的积极程度。

save_image函数不是一个协程,其逻辑已经在前面的示例中讨论过。现在让我们进入代码中执行的部分:

def batch_download(images, url):
    loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(10)
    cors = [download(url, semaphore) for _ in range(images)]
    res, _ = loop.run_until_complete(asyncio.wait(cors))
    loop.close()
    return [r.result() for r in res]

if __name__ == '__main__':
    saved = batch_download(20, URL)
    print(saved)

我们定义了batch_download函数,它接受一个数字images和获取它们的 URL。它做的第一件事是创建一个事件循环,这是运行任何异步代码所必需的。事件循环是asyncio提供的中央执行设备。它提供多种设施,包括:

  • 注册、执行和取消延迟呼叫(超时)
  • 为各种通信创建客户端和服务器传输
  • 启动与外部程序通信的子流程和相关传输
  • 将代价高昂的函数调用委托给线程池

创建事件循环后,我们实例化信号量,然后继续创建未来列表cors。通过调用loop.run_until_complete,我们确保事件循环将一直运行,直到整个任务完成。我们将调用asyncio.wait的结果反馈给它,等待未来完成。

完成后,我们关闭事件循环,并返回每个未来对象生成的结果列表(保存图像的文件名)。注意我们是如何捕获调用loop.run_until_complete的结果的。我们并不真正关心错误,所以我们将_分配给元组中的第二项。这是一个常见的 Python 习语,当我们想要表示我们对该对象不感兴趣时使用。

在模块结束时,我们调用batch_download,保存了 20 幅图像。它们成批出现,整个过程受到一个只有 10 个可用点的信号量的限制。

就这样!欲了解更多关于asyncio的信息,请参考文档页(https://docs.python.org/3.7/library/asyncio.html 用于标准库上的asyncio模块。这个例子编写起来很有趣,希望它能激励您努力学习,理解 Python 这一奇妙一面的复杂性。

总结

在本章中,我们学习了并发性和并行性。我们看到了线程和进程如何帮助实现其中一个和另一个。我们探讨了线程的性质以及线程给我们带来的问题:竞争条件和死锁。

我们学习了如何通过使用锁和仔细的资源管理来解决这些问题。我们还学习了如何使线程通信和共享数据,并讨论了调度程序,它是操作系统中决定在任何给定时间运行哪个线程的部分。然后我们转向流程,并探索了它们的一系列属性和特征。

在最初的理论部分之后,我们学习了如何用 Python 实现线程和进程。我们处理了多个线程和进程,修复了争用条件,并学习了停止线程而不会错误地打开任何资源的变通方法。我们还研究了 IPC,并使用队列在进程和线程之间交换消息。我们还使用了事件和障碍,它们是标准库提供的一些工具,用于控制非确定性环境中的执行流。

在所有这些介绍性示例之后,我们深入讨论了三个案例示例,它们展示了如何使用不同的方法解决相同的问题:单线程、多线程、多进程和asyncio

我们学习了 mergesort,以及通常情况下,分治算法是如何易于并行化的。

我们学习了数独,并探索了一个很好的解决方案,它使用一点人工智能来运行一个高效的算法,然后我们以不同的串行和并行模式运行。

最后,我们了解了如何使用串行、多进程和asyncio代码从网站下载随机图片。后者是到目前为止整本书中最难的一段代码,它在本章中的出现是一个提醒,或者是某种里程碑,将鼓励读者深入地学习 Python。

现在,我们将继续讨论更简单的、主要面向项目的章节,在这些章节中,我们将体验不同环境中的不同实际应用程序。*