Skip to content

Latest commit

 

History

History
1207 lines (873 loc) · 71.3 KB

File metadata and controls

1207 lines (873 loc) · 71.3 KB

七、生成器、迭代器和异步编程

生成器是使 Python 成为比传统语言更独特的语言的另一个特性。在本章中,我们将探讨它们的基本原理,为什么在语言中引入它们,以及它们解决的问题。我们还将介绍如何使用生成器以惯用的方式解决问题,以及如何使我们的生成器(或任何可移植的生成器)具有 Pythonic 功能。

我们将理解为什么该语言自动支持迭代(以迭代器模式的形式)。从这里开始,我们将进行另一个旅程,探索生成器是如何成为 Python 的一个基本特性,以支持其他功能的,例如协同路由和异步编程。

本章的目标如下:

  • 创建提高程序性能的生成器
  • 研究迭代器(尤其是迭代器模式)是如何深入嵌入 Python 的
  • 以惯用的方式解决涉及迭代的问题
  • 了解生成器如何作为协同程序和异步编程的基础
  • 探讨协同程序的句法支持——yield fromawaitasync``def

掌握生成器将使您在编写惯用 Python 方面走很长的路,因此它们在本书中的重要性。在本章中,我们不仅研究如何使用生成器,还探讨了它们的内部结构,以便深入了解它们是如何工作的。

技术要求

本章中的示例适用于任何平台上的任何版本的 Python 3.9。

本章使用的代码可在中找到 https://github.com/PacktPublishing/Clean-Code-in-Python-Second-Edition 。说明可在README文件中找到。

创建生成器

生成器是很久以前在 Python 中引入的(PEP-255),其思想是在 Python 中引入迭代,同时提高程序的性能(使用更少的内存)。

生成器的思想是创建一个可 iterable 的对象,在迭代过程中,它将一次生成一个包含的元素。生成器的主要用途是节省内存,而不是在内存中有一个非常大的元素列表,一次保存所有内容。我们有一个对象,它知道如何根据需要一次生成每个特定元素。

这个特性支持对内存中的重量级对象进行惰性计算,其方式与其他函数式编程语言(例如 Haskell)提供的方式类似。甚至可以处理无限序列,因为生成器的惰性特性支持这种选择。

发电机第一眼

让我们从一个例子开始。现在的问题是,我们想要处理一个大的记录列表,并获得一些度量和指标。给定一个包含购买信息的大型数据集,我们希望对其进行处理,以获得最低销售额、最高销售额和平均销售价格。

为了简化本例,我们将假设 CSV 只有两个字段,格式如下:

<purchase_date>, <price>
... 

我们将创建一个接收所有购买的对象,这将为我们提供必要的度量。我们可以通过简单地使用min()max()内置函数直接获得其中一些值,但这需要多次迭代所有购买,因此我们使用的是自定义对象,它将在一次迭代中获得这些值。

为我们获取数字的代码看起来相当简单。它只是一个对象,具有一种方法,可以一次性处理所有价格,并在每个步骤更新我们感兴趣的每个特定指标的值。首先,我们将在下面的清单中展示第一个实现,并且在本章的后面(一旦我们了解了更多关于迭代的内容),我们将重新讨论这个实现,并获得一个更好(更紧凑)的版本。目前,我们正在解决以下问题:

class PurchasesStats:
    def __init__(self, purchases):
        self.purchases = iter(purchases)
        self.min_price: float = None
        self.max_price: float = None
        self._total_purchases_price: float = 0.0
        self._total_purchases = 0
        self._initialize()
    def _initialize(self):
        try:
            first_value = next(self.purchases)
        except StopIteration:
            raise ValueError("no values provided")
        self.min_price = self.max_price = first_value
        self._update_avg(first_value)
    def process(self):
        for purchase_value in self.purchases:
            self._update_min(purchase_value)
            self._update_max(purchase_value)
            self._update_avg(purchase_value)
        return self
    def _update_min(self, new_value: float):
        if new_value < self.min_price:
            self.min_price = new_value
    def _update_max(self, new_value: float):
        if new_value > self.max_price:
            self.max_price = new_value
    @property
    def avg_price(self):
        return self._total_purchases_price / self._total_purchases
    def _update_avg(self, new_value: float):
        self._total_purchases_price += new_value
        self._total_purchases += 1
    def __str__(self):
        return (
            f"{self.__class__.__name__}({self.min_price}, "
            f"{self.max_price}, {self.avg_price})"
        ) 

此对象将接收purchases的所有总计,并处理所需的值。现在,我们需要一个函数,将这些数字加载到这个对象可以处理的东西中。以下是第一个版本:

def _load_purchases(filename):
    purchases = []
    with open(filename) as f:
        for line in f:
            *_, price_raw = line.partition(",")
            purchases.append(float(price_raw))
    return purchases 

这一准则有效;它将文件的所有编号加载到一个列表中,当传递给我们的自定义对象时,该列表将生成我们想要的编号。不过,它有一个性能问题。如果使用相当大的数据集运行,则需要一段时间才能完成,如果数据集足够大,无法放入主内存,则可能会失败。

如果我们看一看使用这些数据的代码,它正在处理购买,一次一个,所以我们可能想知道为什么我们的生产者一次将所有东西都放在内存中。它正在创建一个列表,其中列出了文件的所有内容,但我们知道我们可以做得更好。

解决方案是创建一个生成器。我们将一次生成一个结果,而不是将文件的全部内容加载到列表中。代码现在如下所示:

def load_purchases(filename):
    with open(filename) as f:
        for line in f:
            *_, price_raw = line.partition(",")
            yield float(price_raw) 

如果您这次测量这个过程,您会注意到内存的使用率已经显著下降。我们还可以看到代码看起来更简单,不需要定义列表(因此,不需要附加到列表中),并且return语句也消失了。

在这种情况下,load_purchases功能是一个生成器功能,或者只是一个生成器。

在 Python 中,任何函数中只要出现关键字yield,它就成为生成器,因此,调用它时,除了创建生成器实例外,不会发生其他任何事情:

>>> load_purchases("file")
<generator object load_purchases at 0x...> 

生成器对象是一个 iterable(稍后我们将更详细地重新讨论 iterables),这意味着它可以处理for循环。请注意,在新的实现之后,我们不必对消费代码进行任何更改,我们的统计处理器保持不变,for循环未修改。

使用 iterables 可以创建这些关于for循环的多态性的强大抽象。只要我们保持 iterable 接口,我们就可以透明地迭代该对象。

我们在本章中探讨的是另一个惯用代码案例,它与 Python 本身很好地融合在一起。在前面的章节中,我们已经了解了如何实现自己的上下文管理器来将对象连接到 with 语句中,或者如何创建自定义容器对象来利用in操作符,或者if语句的布尔值,等等。现在轮到for操作符了,为此,我们将创建迭代器。

在深入研究生成器的细节和细微差别之前,我们可以快速了解生成器如何与我们已经看到的概念相关:理解。理解形式的生成器称为生成器表达式,我们将在下一节简要讨论它。

生成器表达式

生成器节省了大量内存,由于它们是迭代器,因此可以方便地替代其他需要更多内存空间的 iterable 或容器,如列表、元组或集合。

与这些数据结构非常相似,它们也可以通过理解来定义,只是它们被称为生成器表达式(关于它们是否应该被称为生成器理解,有一个持续的争论。在本书中,我们将仅通过它们的规范名称来指代它们,但请随意使用您喜欢的名称)。

同样,我们将定义一个列表理解。如果我们用括号替换方括号,我们将得到一个由表达式生成的生成器。生成器表达式也可以直接传递给使用 iterables 的函数,例如sum()max()

>>> [x**2 for x in range(10)]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> (x**2 for x in range(10))
<generator object <genexpr> at 0x...>
>>> sum(x**2 for x in range(10))
285 

始终将生成器表达式(而不是列表理解)传递给期望可重用的函数,如min()max()sum()。这是更有效的,而且更具吸引力。

前面的建议意味着尽量避免将列表传递给已经使用生成器的函数。下一段代码中的示例是您希望避免的,并且支持上一清单中的方法:

>>> sum([x**2 for x in range(10)])  # here the list can be avoided 

当然,您可以将生成器表达式指定给变量,并在其他地方使用它(如理解)。请记住,在这种情况下有一个重要的区别,因为我们在这里讨论的是发电机。一个列表可以重复使用和多次迭代,但在对其进行迭代后,生成器将耗尽。因此,请确保表达式的结果只使用一次,否则将得到意外的结果。

请记住,生成器在迭代之后会耗尽,因为它们没有在内存中保存所有数据。

一种常见的方法是在代码中创建新的生成器表达式。这样,迭代后第一个将耗尽,但随后会创建一个新的。以这种方式链接生成器表达式非常有用,有助于节省内存,并使代码更具表现力,因为它在不同的步骤中解析不同的迭代。当您需要在一个 iterable 上应用多个过滤器时,这是一个有用的场景;可以通过使用多个生成器表达式作为链接过滤器来实现这一点。

现在我们的工具箱中有了一个新工具(迭代器),让我们看看如何使用它来编写更多的惯用代码。

惯用地迭代

在本节中,我们将首先探讨在 Python 中处理迭代时有用的一些习惯用法。这些代码配方将帮助我们更好地了解使用生成器可以做的事情的类型(特别是在我们已经看到生成器表达式之后),以及如何解决与之相关的典型问题。

一旦我们看到了一些习惯用法,我们将更深入地探讨 Python 中的迭代,分析使迭代成为可能的方法,以及 iterable 对象是如何工作的。

迭代的习惯用法

我们已经熟悉了内置的enumerate()函数,给定一个 iterable,它将返回另一个 iterable,其中元素是元组,其第一个元素是第二个元素的索引(对应于原始 iterable 中的元素):

>>> list(enumerate("abcdef"))
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e'), (5, 'f')] 

我们希望创建一个类似的对象,但以更低级的方式;一种可以简单地创建无限序列的方法。我们想要一个对象,它可以从一开始就产生一系列数字,没有任何限制。

一个像下面这样简单的对象就可以做到这一点。每次调用这个对象,我们都会得到无穷多个序列的下一个数字:

class NumberSequence:
    def __init__(self, start=0):
        self.current = start
    def next(self):
        current = self.current
        self.current += 1
        return current 

基于此接口,我们必须通过显式调用其next()方法来使用此对象:

>>> seq = NumberSequence()
>>> seq.next()
0
>>> seq.next()
1
>>> seq2 = NumberSequence(10)
>>> seq2.next()
10
>>> seq2.next()
11 

但是有了这段代码,我们无法像我们希望的那样重构enumerate()函数,因为它的接口不支持在常规 Pythonfor循环上迭代,这也意味着我们不能将其作为参数传递给期望迭代的函数。请注意以下代码是如何失败的:

>>> list(zip(NumberSequence(), "abcdef"))
Traceback (most recent call last):
  File "...", line 1, in <module>
TypeError: zip argument #1 must support iteration 

问题在于NumberSequence不支持迭代。为了解决这个问题,我们必须通过实现神奇的方法__iter__()使对象成为一个可移植的对象。我们还改变了之前的next()方法,使用__next__ magic method,使对象成为迭代器:

class SequenceOfNumbers:
    def __init__(self, start=0):
        self.current = start
    def __next__(self):
        current = self.current
        self.current += 1
        return current
    def __iter__(self):
        return self 

这不仅有一个优势,我们可以迭代元素,而且我们甚至不再需要.next()方法,因为有__next__()允许我们使用next()内置函数:

>>> list(zip(SequenceOfNumbers(), "abcdef"))
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e'), (5, 'f')]
>>> seq = SequenceOfNumbers(100)
>>> next(seq)
100
>>> next(seq)
101 

这利用了迭代协议。与我们在前面章节中探讨的上下文管理器协议类似,该协议由__enter____exit__方法组成,该协议依赖于__iter____next__方法。

在 Python 中使用这些协议有一个优势:了解 Python 的每个人都已经熟悉这个接口,因此有一种“标准契约”。这意味着,不必定义自己的方法并与团队(或任何潜在的代码读者)达成一致,这是您的代码使用的预期标准或协议(与第一个示例中的自定义next()方法相同);Python 已经提供了一个接口,并且已经有了一个协议。我们只需要适当地实施它。

next()函数

next()内置函数将将 iterable 推进到下一个元素并返回:

>>> word = iter("hello")
>>> next(word)
'h'
>>> next(word)
'e'  # ... 

如果迭代器没有更多要生成的元素,则会引发StopIteration异常:

>>> ...
>>> next(word)
'o'
>>> next(word)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
>>> 

此异常表示迭代已结束,没有更多的元素可使用。

如果我们希望处理这种情况,除了捕获StopIteration异常外,我们还可以在其第二个参数中为该函数提供一个默认值。如果提供了此选项,则返回值将代替抛出StopIteration

>>> next(word, "default value")
'default value' 

建议在大多数情况下使用默认值,以避免程序在运行时出现异常。如果我们绝对确定我们正在处理的迭代器不能为空,那么最好是隐式的(有意的),而不是依赖于内置函数的副作用(即正确地断言案例)。

next()函数在与生成器表达式结合使用时非常有用,因为我们需要查找满足特定条件的 iterable 的第一个元素。我们将在本章中看到这个习语的例子,但主要思想是使用这个函数,而不是创建一个列表理解,然后获取它的第一个元素。

使用发电机

只需使用一个生成器,就可以大大简化前面的代码。生成器对象是迭代器。通过这种方式,我们可以定义一个函数,根据需要生成值,而不是创建一个类:

def sequence(start=0):
    while True:
        yield start
        start += 1 

请记住,从我们的第一个定义开始,函数体中的yield关键字使其成为生成器。因为它是一个生成器,所以创建这样一个无限循环是非常好的,因为当调用这个生成器函数时,它将运行所有代码,直到到达下一个yield语句。它将产生其价值并在那里暂停:

>>> seq = sequence(10)
>>> next(seq)
10
>>> next(seq)
11
>>> list(zip(sequence(), "abcdef"))
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e'), (5, 'f')] 

正如我们在前一章(以函数为对象)中所探讨的那样,这种差异可以被认为是创建装饰器的不同方式的类比。在这里,我们也可以使用生成器函数,或 iterable 对象,如前一节所述。只要可能,建议构造生成器,因为它在语法上更简单,因此更容易理解。

Itertools

使用 iterable 对象的优点是代码与 Python 本身更好地融合,因为迭代是该语言的一个关键组件。此外,我们还可以充分利用itertools模块(ITER-01)。实际上,我们刚刚创建的sequence()生成器与itertools.count()非常相似。然而,我们还可以做更多的事情。

迭代器、生成器和 itertools 最棒的一点是它们是可以链接在一起的可组合对象。

例如,回到我们第一个处理purchases以获得一些指标的示例,如果我们想做同样的事情,但只针对超过某个阈值的值,该怎么办?解决此问题的天真方法是在迭代时放置条件:

# ...
    def process(self):
        for purchase in self.purchases:
            if purchase > 1000.0:
                ... 

这不仅是非 python 的,而且是僵硬的(僵硬是表示糟糕代码的特征)。它不能很好地处理更改。如果现在号码变了怎么办?我们通过参数传递它吗?如果我们需要不止一个呢?如果条件不同(例如小于),该怎么办?我们通过lambda了吗?

这些问题不应该由这个对象来回答,它的唯一责任是在以数字表示的购买流上计算一组定义良好的指标。当然,答案是否定的。进行这样的更改将是一个巨大的错误(同样,干净的代码是灵活的,我们不希望通过将此对象与外部因素耦合而使其僵化)。这些要求必须在其他地方解决。

最好保持此对象独立于其客户端。这个类的责任越小,它对更多的客户机就越有用,从而提高了它被重用的机会。

我们不会更改此代码,而是将其保持原样,并假设新数据根据类的每个客户的任何需求进行过滤。

例如,如果我们只想处理金额超过1000的第一批10购买,我们将执行以下操作:

>>> from itertools import islice
>>> purchases = islice(filter(lambda p: p > 1000.0, purchases), 10)
>>> stats = PurchasesStats(purchases).process()  # ... 

这种过滤方式没有内存惩罚,因为它们都是生成器,所以计算总是惰性的。这给了我们思考的能力,好像我们一次过滤了整个集合,然后将其传递给对象,但实际上没有将所有内容都放入内存中。

请记住本章开头提到的内存和 CPU 使用之间的权衡。虽然代码可能会占用更少的内存,但它可能会占用更多的 CPU 时间,但大多数情况下,这是可以接受的,因为我们必须在内存中处理大量对象,同时保持代码的可维护性。

通过迭代器简化代码

现在,我们将简要地讨论一些可以通过迭代器改进的情况,偶尔还会讨论itertools模块。在讨论每种情况及其建议的优化之后,我们将用一个推论来结束每一点。

重复迭代

现在,我们已经了解了更多关于迭代器的,并介绍了itertools模块,我们可以向您展示本章的第一个示例(用于计算某些购买的统计数据的示例)是如何被大大简化的:

def process_purchases(purchases):
    min_, max_, avg = itertools.tee(purchases, 3)
    return min(min_), max(max_), median(avg) 

在本例中,itertools.tee将原始 iterable 拆分为三个新的 iterable。我们将在需要的不同类型的迭代中使用这些方法,而无需在purchases上重复三个不同的循环。

读者可以简单地验证,如果我们传递一个 iterable 对象作为purchases参数,这个对象只被遍历一次(由于itertools.tee函数[TEE]),这是我们的主要需求。还可以验证此版本与我们的原始实现的等效性。在这种情况下,不需要手动提升ValueError,因为将空序列传递给min()功能将完成此操作。

如果您想在同一对象上多次运行循环,请停下来想想itertools.tee是否有帮助。

itertools模块包含许多有用的函数和漂亮的抽象,在 Python 中处理迭代时非常方便。它还包含关于如何以惯用方式解决典型迭代问题的好方法。作为一般建议,如果您正在考虑如何解决涉及迭代的特定问题,请查看本模块。即使答案不在字面上,这也是一个很好的灵感。

嵌套循环连接

在某些情况下,我们需要迭代多个维度,寻找一个值,嵌套循环是第一个想法。当找到值时,我们需要停止迭代,break关键字不能完全工作,因为我们必须从两个(或多个)for循环中退出,而不仅仅是一个循环。

解决这个问题的办法是什么?发出逃跑信号的旗子?不,提出一个例外?不,这将与标志相同,但更糟糕的是,因为我们知道异常不会用于控制流逻辑。将代码移动到较小的函数并返回它?很接近,但不完全一样。

答案是,只要有可能,将迭代展平为一个for循环。

这是我们希望避免的代码类型:

def search_nested_bad(array, desired_value):
    coords = None
    for i, row in enumerate(array):
        for j, cell in enumerate(row):
            if cell == desired_value:
                coords = (i, j)
                break
        if coords is not None:
            break
    if coords is None:
        raise ValueError(f"{desired_value} not found")
    logger.info("value %r found at [%i, %i]", desired_value, *coords)
    return coords 

这里是它的简化版本,它不依赖于标志来发出终止信号,并且具有更简单、更紧凑的迭代结构:

def _iterate_array2d(array2d):
    for i, row in enumerate(array2d):
        for j, cell in enumerate(row):
            yield (i, j), cell
def search_nested(array, desired_value):
    try:
        coord = next(
            coord
            for (coord, cell) in _iterate_array2d(array)
            if cell == desired_value
        )
    except StopIteration as e:
        raise ValueError(f"{desired_value} not found") from e
    logger.info("value %r found at [%i, %i]", desired_value, *coord)
    return coord 

值得一提的是,创建的辅助生成器如何作为所需迭代的抽象。在这种情况下,我们只需要在两个维度上迭代,但是如果我们需要更多的维度,不同的对象可以处理这个问题,而客户机不需要知道它。这是迭代器设计模式的本质,在 Python 中,迭代器设计模式是透明的,因为它自动支持迭代器对象,这是下一节讨论的主题。

尽量用所需的尽可能多的抽象来简化迭代,尽可能地扁平化循环。

希望这个例子能给你灵感,让你明白我们可以用发电机来做一些事情,而不仅仅是节省内存。我们可以利用迭代作为抽象。也就是说,我们不仅可以通过定义类或函数,还可以利用 Python 的语法来创建抽象。正如我们已经看到如何抽象出上下文管理器背后的一些逻辑一样(因此我们不知道在with语句下发生了什么),我们可以对迭代器进行同样的操作(因此我们可以忘记for循环的底层逻辑)。

这就是为什么我们将从下一节开始探索迭代器模式在 Python 中的工作方式。

Python 中的迭代器模式

在这里,我们将绕开生成器,更深入地理解 Python 中的迭代。生成器是 iterable 对象的一种特殊情况,但 Python 中的迭代超越了生成器,能够创建好的 iterable 对象将使我们有机会创建更高效、紧凑和可读的代码。

在前面的代码清单中,我们已经看到了iterable对象也是迭代器的例子,因为它们实现了__iter__()__next__()魔术方法。虽然一般来说这很好,但并不严格要求它们必须同时实现这两种方法,这里我们将展示iterable对象(实现__iter__的对象)和迭代器(实现__next__的对象)之间的细微差别。

我们还将探讨与迭代相关的其他主题,例如序列和容器对象。

迭代的接口

iterable是支持迭代的对象,在非常高的级别上,这意味着我们可以运行for。。in。。。在它上面循环,它将毫无问题地工作。然而,iterable并不意味着与迭代器相同。

一般来说,iterable就是我们可以迭代的东西,它使用迭代器来实现。这意味着在__iter__魔术方法中,我们希望返回一个迭代器,即一个实现了__next__()方法的对象。

迭代器是一个对象,它只知道如何生成一系列值,一次生成一个值,当它被已经探索过的内置next()函数调用时,迭代器没有被调用,它只是被冻结,无所事事地等待,直到再次调用它以生成下一个值。从这个意义上讲,生成器是迭代器。

| Python 概念 | 魔术 | 考虑 | | 可迭代的 | `__iter__` | 他们使用迭代器来构造迭代逻辑。这些对象可以在`for`中迭代。。。`in`:循环。 | | 迭代器 | `__next__` | 定义每次生成一个值的逻辑。`StopIteration`异常表示迭代结束。可通过内置的`next()`功能逐一获取数值。 |

表 7.1:Iterables 和迭代器

在下面的代码中,我们将看到一个迭代器对象的示例,该对象不可 iterable,它只支持调用其值,一次调用一个值。这里,名称sequence只是指一系列连续的数字,而不是 Python 中的序列概念,我们将在后面探讨:

class SequenceIterator:
    def __init__(self, start=0, step=1):
        self.current = start
        self.step = step
    def __next__(self):
        value = self.current
        self.current += self.step
        return value 

请注意,我们可以一次获取一个序列的值,但是我们不能迭代这个对象(这是幸运的,因为它会导致一个无休止的循环):

>>> si = SequenceIterator(1, 2)
>>> next(si)
1
>>> next(si)
3
>>> next(si)
5
>>> for _ in SequenceIterator(): pass
... 
Traceback (most recent call last):
  ...
TypeError: 'SequenceIterator' object is not iterable 

错误消息很清楚,因为对象没有实现__iter__()

仅出于解释目的,我们可以在另一个对象中分离迭代(同样,使对象同时实现__iter____next__就足够了,但分开这样做将有助于澄清我们在本解释中试图阐明的独特点)。

将对象按 iterables 排序

正如我们刚才看到的,如果一个对象实现了__iter__()魔术方法,这意味着它可以在for循环中使用。虽然这是一个很好的特性,但它不是我们可以实现的唯一可能的迭代形式。当我们编写一个for循环时,Python 将尝试查看我们正在使用的对象是否实现了__iter__,如果实现了,它将使用它来构造迭代,但如果没有实现,则会有回退选项。

如果对象恰好是一个序列(意味着它实现了__getitem__()__len__()魔术方法),那么它也可以被迭代。如果是这种情况,那么解释器将按顺序提供值,直到引发IndexError异常,这与前面提到的StopIteration类似,也会发出停止迭代的信号。

为了说明这种行为,我们将运行以下实验,展示在一系列数字上实现map()的序列对象:

# generators_iteration_2.py
class MappedRange:
    """Apply a transformation to a range of numbers."""
    def __init__(self, transformation, start, end):
        self._transformation = transformation
        self._wrapped = range(start, end)
    def __getitem__(self, index):
        value = self._wrapped.__getitem__(index)
        result = self._transformation(value)
        logger.info("Index %d: %s", index, result)
        return result
    def __len__(self):
        return len(self._wrapped) 

请记住,这个示例只是为了说明像这样的对象可以通过常规for循环进行迭代。__getitem__方法中放置了一条日志记录行,用于探索在迭代对象时传递了哪些值,我们可以从以下测试中看到:

>>> mr = MappedRange(abs, -10, 5)
>>> mr[0]
Index 0: 10
10
>>> mr[-1]
Index -1: 4
4
>>> list(mr)
Index 0: 10
Index 1: 9
Index 2: 8
Index 3: 7
Index 4: 6
Index 5: 5
Index 6: 4
Index 7: 3
Index 8: 2
Index 9: 1
Index 10: 0
Index 11: 1
Index 12: 2
Index 13: 3
Index 14: 4
[10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0, 1, 2, 3, 4] 

作为警告,需要强调的是,虽然了解这一点很有用,但它也是对象未实现__iter__时的一种回退机制,因此大多数情况下,我们希望通过考虑创建适当的序列来使用这些方法,而不仅仅是要迭代的对象。

当考虑设计一个迭代对象时,最好选择一个合适的可迭代对象(带__iter__),而不是一个可以同时迭代的序列。

Iterables 是 Python 的一个重要组成部分,这不仅是因为它们为我们提供了软件工程师所需的功能,而且还因为它们在 Python 的内部结构中起着基础性的作用。

我们在第 2 章**Python 代码中的异步代码简介中看到了如何读取异步代码。现在,我们已经研究了 Python 中的迭代器,我们可以看到这两个概念是如何关联的。特别是,下一节将探讨协同程序,我们将看到迭代器是如何处于它们的核心位置的。

协同程序

协同程序的思想是要有一个函数,它的执行可以在给定的时间点暂停,然后再恢复。通过具有这种功能,程序可能能够暂停部分代码,以便分派其他内容进行处理,然后返回到此原点继续。

我们已经知道,生成器对象是可重用的。它们实现了__iter__()__next__()。这是 Python 自动提供的,因此当我们创建 generator 对象函数时,我们会得到一个可以通过next()函数进行迭代或升级的对象。

除了这个基本功能之外,它们还有更多的方法,因此可以作为协同程序(PEP-342)工作。在这里,我们将探索生成器如何演变为协同程序以支持异步编程的基础,然后在下一节中进行更详细的讨论,在下一节中,我们将探索 Python 的新特性以及涵盖异步编程的语法。

PEP-342 中添加的支持协同程序的基本方法如下:

  • .close()
  • .throw(ex_type[, ex_value[, ex_traceback]])
  • .send(value)

Python 利用生成器来创建协同路由。因为发电机可以自然暂停,所以它们是一个方便的起点。但发电机还不够,因为它们最初被认为是,所以这些方法被添加。这是因为通常,仅仅能够挂起部分代码是不够的;您还需要与它进行通信(传递数据,并发出有关上下文更改的信号)。

通过更详细地研究每种方法,我们将能够了解更多关于 Python 中协同路由的内部内容。在此之后,我将介绍异步编程工作原理的另一个概述,但与第 2 章Python 代码中介绍的内容不同,这一部分将涉及我们刚刚学习的内部概念。

生成器接口的方法

在本节中,我们将探讨上述每种方法的作用、工作原理以及预期的使用方法。通过了解如何使用这些方法,我们将能够使用简单的协同程序。

稍后,我们将探索协同程序的更高级用法,以及如何委托给子生成器(协同程序)以重构代码,以及如何编排不同的协同程序。

关闭()

调用此方法时,生成器会收到GeneratorExit异常。如果它没有被处理,那么生成器将在不产生任何更多值的情况下完成,并且其迭代将停止。

此异常可用于处理完成状态。通常,如果我们的协同程序执行某种资源管理,我们希望捕获此异常并使用该控制块释放协同程序所持有的所有资源。这类似于使用上下文管理器或将代码放在异常控件的finally块中,但处理此异常会使其更加明确。

在下面的示例中,我们有一个协同程序,它使用一个数据库处理程序对象来保持与数据库的连接,并在数据库上运行查询,按固定长度的页面流式传输数据(而不是一次读取所有可用的数据):

def stream_db_records(db_handler):
    try:
        while True:
            yield db_handler.read_n_records(10)
    except GeneratorExit:
        db_handler.close() 

每次调用生成器时,都会返回从数据库处理程序获取的10行,但当我们决定显式完成迭代并调用close()时,我们还想关闭与数据库的连接:

>>> streamer = stream_db_records(DBHandler("testdb"))
>>> next(streamer)
[(0, 'row 0'), (1, 'row 1'), (2, 'row 2'), (3, 'row 3'), ...]
>>> next(streamer)
[(0, 'row 0'), (1, 'row 1'), (2, 'row 2'), (3, 'row 3'), ...]
>>> streamer.close()
INFO:...:closing connection to database 'testdb' 

需要时,在发电机上使用close()方法执行完成任务。

此方法旨在用于资源清理,因此当无法自动执行此操作时(例如,如果未使用上下文管理器),通常会将其用于手动释放资源。接下来,我们将了解如何将异常传递给生成器。

抛出(ex_ 类型[,ex_ 值[,ex_ 回溯]])

此方法将throw发电机当前暂停线路的异常。如果生成器处理发送的异常,将调用该特定的except子句中的代码;否则,异常将传播到调用方。

在这里,我们稍微修改了前面的示例,以显示在将此方法用于由协同例程处理的异常时与不使用此方法时的区别:

class CustomException(Exception):
    """A type of exception that is under control."""
def stream_data(db_handler):
    while True:
        try:
            yield db_handler.read_n_records(10)
        except CustomException as e:
            logger.info("controlled error %r, continuing", e)
        except Exception as e:
            logger.info("unhandled error %r, stopping", e)
            db_handler.close()
            break 

现在,接收一个CustomException是控制流的一部分,在这种情况下,生成器将记录一条信息性消息(当然,我们可以根据每个案例的业务逻辑对此进行调整),然后转到下一个yield语句,这是协程从数据库读取并返回数据的行。

此特定示例处理所有异常,但如果最后一个块(除了Exception:)不在那里,结果将是生成器在暂停生成器的行(同样是yield处)被提升,并且它将从那里传播到调用方:

>>> streamer = stream_data(DBHandler("testdb"))
>>> next(streamer)
[(0, 'row 0'), (1, 'row 1'), (2, 'row 2'), (3, 'row 3'), (4, 'row 4'), ...]
>>> next(streamer)
[(0, 'row 0'), (1, 'row 1'), (2, 'row 2'), (3, 'row 3'), (4, 'row 4'), ...]
>>> streamer.throw(CustomException)
WARNING:controlled error CustomException(), continuing
[(0, 'row 0'), (1, 'row 1'), (2, 'row 2'), (3, 'row 3'), (4, 'row 4'), ...]
>>> streamer.throw(RuntimeError)
ERROR:unhandled error RuntimeError(), stopping
INFO:closing connection to database 'testdb'
Traceback (most recent call last):
  ...
StopIteration 

当我们收到来自域的异常时,生成器继续。但是,当它接收到另一个意外的异常时,默认块捕获到我们关闭与数据库的连接并完成迭代的位置,这导致生成器停止。正如我们从提出的StopIteration中所看到的,这个生成器无法进一步迭代。

发送(值)

在前面的示例中,我们创建了一个简单的生成器,它从数据库中读取行,当我们希望完成其迭代时,该生成器释放了链接到数据库的资源。这是使用生成器提供的方法之一(close()的一个很好的例子,但我们可以做更多。

生成器的一个目标是从数据库中读取固定数量的行。

我们希望将该号码(10)参数化,以便我们可以在不同的通话中更改该号码。不幸的是,next()函数没有为我们提供相应的选项。但幸运的是,我们有send()

def stream_db_records(db_handler):
    retrieved_data = None
    previous_page_size = 10
    try:
        while True:
            page_size = yield retrieved_data
            if page_size is None:
                page_size = previous_page_size
            previous_page_size = page_size
            retrieved_data = db_handler.read_n_records(page_size)
    except GeneratorExit:
        db_handler.close() 

我们现在的想法是,我们已经使协同路由能够通过send()方法从调用方接收值。这个方法实际上是区分生成器和协同程序的方法,因为当使用它时,它意味着yield关键字将出现在语句的右侧,其返回值将被分配给其他对象。

在协同程序中,我们通常会发现yield关键字以以下形式使用:

receive = yield produced 

在这种情况下,yield将做两件事。它会将produced发送回调用者,调用者将在下一轮迭代中(例如,在调用next()之后)接收它,并在那里暂停。稍后,调用方将希望使用send()方法将值发送回协同路由。该值将成为yield语句的结果,在本例中分配给名为receive的变量。

将值发送到协程只有在yield语句暂停该协程,等待生成某些内容时才起作用。为了实现这一点,必须将协同程序提升到该状态。唯一的方法是通过调用next()来实现。这意味着在将任何内容发送到协同程序之前,必须通过next()方法至少提前一次。否则将导致异常情况:

>>> def coro():
...     y = yield
...
>>> c = coro()
>>> c.send(1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started generator
>>> 

在发送任何值之前,请始终记住通过调用next()来推进协同程序。

回到我们的例子。我们正在改变元素生成或流式处理的方式,使其能够接收预期从数据库读取的记录长度。

我们第一次调用next()时,生成器将前进到包含yield的行;它将向调用者提供一个值(None,如变量中所设置),并将挂起该值)。从那里,我们有两个选择。如果我们选择通过调用next()来推进生成器,则将使用10的默认值,并将照常进行。这是因为调用next()在技术上与send(None)相同,但这在if语句中有介绍,该语句将处理我们之前设置的值。

另一方面,如果我们决定通过send(<value>)提供一个显式值,这个值将成为yield语句的结果,该语句将被分配给包含要使用的页面长度的变量,该变量反过来将用于从数据库中读取。

连续调用将具有这种逻辑,但重要的一点是,现在我们可以动态地改变数据的长度,以便在迭代的中间读取任意点。

现在我们已经了解了前面的代码是如何工作的,大多数 Pythonistas 都希望它有一个简化的版本(毕竟,Python 也是关于简洁、简洁和紧凑的代码):

def stream_db_records(db_handler):
    retrieved_data = None
    page_size = 10
    try:
        while True:
            page_size = (yield retrieved_data) or page_size
            retrieved_data = db_handler.read_n_records(page_size)
    except GeneratorExit:
        db_handler.close() 

这个版本不仅更加紧凑,而且更好地说明了这个想法。yield周围的括号更清楚地表明它是一条语句(将其视为函数调用),我们使用它的结果将其与前面的值进行比较。

这就像我们期望的那样工作,但我们必须始终记住,在向它发送任何数据之前,先推进协同路由。如果我们忘记打第一个next(),我们会得到一个TypeError。出于我们的目的,可以忽略此调用,因为它不会返回我们将使用的任何内容。

如果我们能够在创建协同程序之后立即直接使用它,而不必记得在每次使用它时第一次调用next(),那就太好了。一些作者(PYCOOK)设计了一个有趣的装饰器来实现这一点。此装饰器的想法是推进协同程序,因此以下定义自动工作:

@prepare_coroutine
def auto_stream_db_records(db_handler):
    retrieved_data = None
    page_size = 10
    try:
        while True:
            page_size = (yield retrieved_data) or page_size
            retrieved_data = db_handler.read_n_records(page_size)
    except GeneratorExit:
        db_handler.close() 
>>> streamer = auto_stream_db_records(DBHandler("testdb"))
>>> len(streamer.send(5))
5 

请记住,这些是协同程序在 Python 中工作的基础。通过以下示例,您将了解在使用协同路由时 Python 中实际发生了什么。然而,在现代 Python 中,您通常不会自己编写这些类型的协同程序,因为有了新的语法(我们已经提到过,但我们将再次讨论,看看它们与我们刚才看到的想法有什么关系)。

在开始使用新的语法功能之前,我们需要探索协同程序在添加功能方面的最后一步,以弥补缺失的差距。之后,我们将能够理解异步编程中使用的每个关键字和语句背后的含义。

更高级的协同程序

到目前为止,我们对协程有了更好的理解,我们可以创建简单的协程来处理小任务。我们可以说,这些协同路由实际上只是更高级的生成器(没错,协同路由只是花哨的生成器),但是,如果我们真的想开始支持更复杂的场景,我们通常必须采用同时处理多个协同路由的设计,这需要更多的特性。

在处理许多协同程序时,我们会发现新的问题。随着应用程序的控制流变得越来越复杂,我们希望在堆栈上下传递值(以及异常),能够从我们可能在任何级别调用的子协程中捕获值,最后,安排多个协程朝着一个共同目标运行。

为了使事情更简单,发电机必须再次扩展。这就是 PEP-380 通过改变生成器的语义,使其能够返回值,并引入新的yield from结构来解决的问题。

在协程中返回值

正如本章开头的所介绍的,迭代是一种机制,在引发StopIteration异常之前,它多次调用next()一个 iterable 对象。

到目前为止,我们一直在探索生成器的迭代性质,我们一次生成一个值,一般来说,我们只关心在for循环的每一步生成的每个值。这是思考生成器的一种非常合乎逻辑的方式,但是协同程序有不同的想法;尽管它们在技术上是生成器,但它们并没有考虑到迭代的思想,而是以暂停代码执行直到稍后恢复为止为目标。

这是一个有趣的挑战;在设计协同路由时,我们通常更关心挂起状态而不是迭代(迭代协同路由是一种奇怪的情况)。挑战在于两者很容易混合。这是因为技术实现细节;Python 中对协同路由的支持是建立在生成器之上的。

如果我们想使用协程来处理某些信息并暂停其执行,那么将它们视为轻量级线程(或绿色线程,在其他平台中称为绿色线程)是有意义的。在这种情况下,如果它们可以返回值,就像调用任何其他常规函数一样,这是有意义的。

但我们要记住,生成器不是常规函数,因此在生成器中,构造value = generator()除了创建生成器对象之外,什么都不做。让生成器返回值的语义是什么?它必须在迭代完成之后进行。

当生成器返回一个值时,它的迭代立即停止(不能再进行迭代)。为了保留语义,仍然会引发StopIteration异常,并且要返回的值存储在exception对象中。打电话的人有责任接住电话。

在下面的示例中,我们创建了一个简单的生成器,它生成两个值,然后返回第三个值。请注意,为了获取该值,我们必须捕获异常,以及它是如何准确地存储在异常中名为value的属性下的:

>>> def generator():
...     yield 1
...     yield 2
...     return 3
... 
>>> value = generator()
>>> next(value)
1
>>> next(value)
2
>>> try:
...     next(value)
... except StopIteration as e:
...     print(f">>>>>> returned value: {e.value}")
... 
>>>>>> returned value: 3 

正如我们稍后将看到的,这个机制用于使协程返回值。在 PEP-380 之前,这没有任何意义,任何试图在生成器中使用return语句的尝试都被视为语法错误。但现在的想法是,当迭代结束时,我们希望返回一个最终值,提供它的方法是将其存储在迭代结束时引发的异常中(StopIteration。这可能不是最干净的方法,但它完全向后兼容,因为它不会更改生成器的接口。

授权到较小的协同程序中——“屈服于”语法

前面的特性很有趣,因为它为协程(生成器)打开了许多新的可能性,现在它们可以返回值。但是,如果没有适当的语法支持,这个特性本身就没有多大用处,因为用这种方法捕获返回值有点麻烦。

这是yield from语法的主要特征之一。除此之外(我们将详细介绍),它还可以收集子生成器返回的值。还记得我们说过在生成器中返回数据很好,但不幸的是,将语句写成value = generator()是行不通的?好吧,像value = yield from generator()那样写。

产量的最简单用途

在其最基本的形式中,新的yield from语法可用于将嵌套的for循环中的生成器链接到单个循环中,最终生成一个包含连续流中所有值的单个字符串。

一个典型的例子是从standard库中创建一个类似于itertools.chain()的函数。这是一个非常好的函数,因为它允许您传递任意数量的iterables,并将它们一起返回到一个流中。

天真的实现可能如下所示:

def chain(*iterables):
    for it in iterables:
        for value in it:
            yield value 

它接收一个可变数量的iterables,遍历所有这些值,并且由于每个值都是 iterable,因此它支持for... in..构造,因此我们有另一个for循环来获取每个特定 iterable 中的每个值,这是由调用方函数生成的。

这在多种情况下可能很有用,例如将生成器链接在一起,或者尝试迭代通常无法一次性比较的内容(例如带有元组的列表,等等)。

然而,yield from语法允许我们进一步避免嵌套循环,因为它能够直接从子生成器生成值。在这种情况下,我们可以像这样简化代码:

def chain(*iterables):
    for it in iterables:
        yield from it 

请注意,对于这两种实现,生成器的行为完全相同:

>>> list(chain("hello", ["world"], ("tuple", " of ", "values.")))
['h', 'e', 'l', 'l', 'o', 'world', 'tuple', ' of ', 'values.'] 

这意味着我们可以将yield from用于任何其他 iterable,它的工作方式就好像顶级生成器(即yield from正在使用的生成器)正在自己生成这些值一样。

这适用于任何 iterable,甚至生成器表达式也不例外。现在我们已经熟悉了它的语法,让我们看看如何编写一个简单的生成函数来生成一个数字的所有幂(例如,如果提供了all_powers(2, 3),它将必须生成2^02^1... 2^3

def all_powers(n, pow):
    yield from (n ** i for i in range(pow + 1)) 

虽然这稍微简化了语法,但节省一行for语句并不是一个很大的优势,也不能证明在语言中添加这样的更改是合理的。

事实上,这实际上只是一个副作用,yield from结构的真正存在理由是我们将在以下两部分中探讨的。

捕获子生成器返回的值

在下面的示例中,我们有一个生成器,它调用另外两个嵌套的生成器,生成序列中的值。这些嵌套生成器中的每一个都返回一个值,我们将看到顶级生成器如何有效地捕获返回值,因为它通过yield from调用内部生成器:

def sequence(name, start, end):
    logger.info("%s started at %i", name, start)
    yield from range(start, end)
    logger.info("%s finished at %i", name, end)
    return end
def main():
    step1 = yield from sequence("first", 0, 5)
    step2 = yield from sequence("second", step1, 10)
    return step1 + step2 

这是main中的代码在迭代时可能执行的情况:

>>> g = main()
>>> next(g)
INFO:generators_yieldfrom_2:first started at 0
0
>>> next(g)
1
>>> next(g)
2
>>> next(g)
3
>>> next(g)
4
>>> next(g)
INFO:generators_yieldfrom_2:first finished at 5
INFO:generators_yieldfrom_2:second started at 5
5
>>> next(g)
6
>>> next(g)
7
>>> next(g)
8
>>> next(g)
9
>>> next(g)
INFO:generators_yieldfrom_2:second finished at 10
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 15 

main的第一个行委托到内部生成器中,并生成值,直接从中提取。正如我们已经看到的,这并不是什么新鲜事。但是,请注意,sequence()生成器函数如何返回结束值(在第一行中分配给名为step1的变量),以及如何在该生成器的以下实例开始时正确使用该值。

最后,另一个生成器还返回第二个结束值(10,而主生成器则返回它们的总和(5+10=15,这是迭代停止后我们看到的值。

我们可以使用yield from捕获协同程序完成处理后的最后一个值。

通过这个示例和上一节中介绍的示例,您可以了解yield from构造在 Python 中的作用。yield from构造将获取生成器,并将其迭代转发到下游,但一旦完成,它将捕获其StopIteration异常,获取其值,并将该值返回给调用方函数。StopIteration异常的 value 属性成为该语句的结果。

这是一个强大的构造,因为结合下一节的主题(如何从子生成器发送和接收上下文信息),这意味着协同路由可以采用类似于线程的形式。

向子生成器发送数据和从子生成器接收数据

现在,我们将看到yield from语法的另一个很好的特性,可能是它的全部功能所在。正如我们在探索充当协同路由的生成器时已经介绍的,我们知道我们可以发送值并向它们抛出异常,在这种情况下,协同路由要么接收值进行内部处理,要么必须相应地处理异常。

如果我们现在有一个委托给其他人的协同程序(如前一个例子),我们也希望保留这个逻辑。必须手动执行此操作将非常复杂(如果我们没有让yield from自动处理此操作,您可以查看 PEP-380 中描述的代码)。

为了说明这一点,让我们保持与上一个示例相同的顶级生成器(main)未修改(调用其他内部生成器),但让我们修改内部生成器以使其能够接收值和处理异常。

代码可能不是惯用代码,只是为了说明该机制的工作原理:

def sequence(name, start, end):
    value = start
    logger.info("%s started at %i", name, value)
    while value < end:
        try:
            received = yield value
            logger.info("%s received %r", name, received)
            value += 1
        except CustomException as e:
            logger.info("%s is handling %s", name, e)
            received = yield "OK"
    return end 

现在,我们将调用main协程,不仅通过迭代它,还通过提供值和抛出异常来查看它们在序列中的处理方式:

>>> g = main()
>>> next(g)
INFO: first started at 0
0
>>> next(g)
INFO: first received None
1
>>> g.send("value for 1")
INFO: first received 'value for 1'
2
>>> g.throw(CustomException("controlled error"))
INFO: first is handling controlled error
'OK'
... # advance more times
INFO:second started at 5
5
>>> g.throw(CustomException("exception at second generator"))
INFO: second is handling exception at second generator
'OK' 

这个例子告诉我们很多不同的事情。请注意,我们从不向sequence发送值,而只向main发送值,即使如此,接收这些值的代码也是嵌套的生成器。尽管我们从未明确地向sequence发送任何内容,但它在yield from传递数据时接收数据。

main协程在内部调用另外两个协程,生成它们的值,并且它将在其中任何一个特定时间点暂停。当它在第一个停止时,我们可以看到日志告诉我们,是该协同路由实例接收了我们发送的值。当我们向它抛出异常时,也会发生同样的情况。当第一个协同路由完成时,它返回在名为step1的变量中分配的值,并作为第二个协同路由的输入传递,第二个协同路由也会这样做(它将相应地处理send()throw()调用)。

每个协同程序生成的值也会发生同样的情况。当我们处于任何给定步骤时,调用send()返回的值对应于子协同路由(当前暂停的main所产生的值)。当我们抛出一个正在处理的异常时,sequence协同路由产生值OK,该值被传播到被调用的协同路由(main,而该值将依次到达main's调用方。

正如预期的那样,这些方法以及yield from为我们提供了许多新功能(类似于线程的功能)。这为异步编程打开了大门,我们将在下一步进行探讨。

异步编程

通过到目前为止我们已经看到的构造,我们可以用 Python 创建异步程序。这意味着我们可以创建具有多个协程的程序,将它们安排为按特定顺序工作,并在对每个程序调用yield from后暂停它们时在它们之间切换。

我们可以从中获得的主要优势是以非阻塞方式并行 I/O 操作的可能性。我们需要的是一个低级生成器(通常由第三方库实现),它知道如何在挂起协同路由时处理实际的 I/O。这个想法是为了使协同程序暂停,以便我们的程序可以同时处理另一个任务。应用程序检索控件的方式是通过yield from语句,该语句将挂起并向调用者生成一个值(正如我们前面使用此语法更改程序控制流时看到的示例)。

这大概是异步编程在 Python 中工作了好几年的方式,直到决定需要更好的语法支持。

协同程序和生成器在技术上是相同的,这一事实会引起一些混乱。在句法上(和技术上),它们是相同的,但在语义上,它们是不同的。当我们想要实现高效的迭代时,我们创建生成器。我们通常创建协同路由,目标是运行非阻塞 I/O 操作。

虽然这种差异很明显,但 Python 的动态特性仍然允许开发人员混合这些不同类型的对象,最终在程序的后期阶段出现运行时错误。记住,在最简单和最基本的yield from语法形式中,我们对 iterable 对象使用了这种构造(我们创建了一种应用于字符串、列表等的chain函数)。这些对象都不是协程,而且仍然有效。然后,我们看到我们可以有多个协程,使用yield from发送值(或异常),并返回一些结果。这显然是两个非常不同的用例;但是,如果我们按照下面的语句写一些东西:

result = yield from iterable_or_awaitable() 

不清楚iterable_or_awaitable返回了什么。它可以是一个简单的 iterable,比如一个string,它可能仍然是语法正确的。或者,它可能是一个实际的协同程序。这一错误的代价将在很久以后的运行时支付。

因此,Python 中的键入系统必须进行扩展。在 Python3.5 之前,协同程序只是应用了@coroutine修饰符的生成器,它们将用yield from语法调用。现在,Python 解释器识别出一种特定类型的对象,即协同程序。

这一变化也预示着语法的变化。介绍了awaitasync def语法。前者旨在代替yield from使用,并且它仅适用于awaitable对象(协同路由恰好是这样的)。试图用不尊重awaitable接口的东西调用await会引发异常(这是一个很好的例子,说明了接口如何帮助实现更可靠的设计,防止运行时错误)。

async def是定义协同路由的新方法,取代了前面提到的 decorator,这实际上创建了一个对象,当调用该对象时,该对象将返回协同路由的实例。与调用生成器函数的方式相同,解释器将返回一个生成器对象,当您调用用async def定义的对象时,解释器将给您一个具有__await__方法的协同程序对象,因此可以在等待表达式中使用。

在不深入讨论 Python 中异步编程的所有细节和可能性的情况下,我们可以说,尽管有了新的语法和新的类型,但这与我们在本章中介绍的概念没有任何本质上的区别。

在 Python 中异步编程背后的思想是有一个event循环(通常是asyncio,因为它是standard库中包含的循环,但还有许多其他循环将同样工作)来管理一系列协同路由。这些协同路由属于事件循环,事件循环将根据其调度机制调用它们。当这些程序中的每一个运行时,它都会调用我们的代码(根据我们在编程的协同程序中定义的逻辑),当我们想要将控制权返回到事件循环时,我们会调用await <coroutine>,它将异步处理任务。当该操作保持运行时,事件循环将继续,并将发生另一个协同路由。

这个机制代表了 Python 中异步编程的基本原理。您可以认为,为协同程序(async def/await)添加的新语法只是一个 API,用于以事件循环调用的方式编写代码。默认情况下,该事件循环通常为asyncio,因为它是standard库中的一个,但是任何与 API 匹配的事件循环系统都可以工作。这意味着您可以使用类似于uvloop的库 https://github.com/MagicStack/uvlooptriohttps://github.com/python-trio/trio ),代码的工作原理与相同。您甚至可以注册自己的事件循环,并且它也应该可以运行相同的功能(前提是符合 API)。

在实践中,有更多的细节和边缘案例超出了本书的范围。然而,值得一提的是,这些概念与本章中介绍的概念相关,而且这个领域是另一个生成器证明是语言核心概念的地方,因为在它们之上构建了许多东西。

神奇的异步方法

我在前面的章节(希望能说服你)中提到,只要有可能,我们就可以利用 Python 中的神奇方法,使我们创建的抽象与语言的语法自然融合,从而获得更好、更紧凑、甚至更干净的代码。

但是,如果在这些方法中的任何一种上我们需要调用一个协程,会发生什么呢?如果我们必须在函数中调用await,这意味着函数本身必须是一个协程(用async def定义),否则将出现语法错误。

但是,对于当前的语法和魔术方法,这是如何工作的呢?没有。为了使用异步编程,我们需要新的语法和新的神奇方法。好消息是它们与之前的类似。

这里总结了新的魔术方法以及它们与新语法的关系。

| 概念 | 魔术 | 语法用法 | | 上下文管理器 | `__aenter__``__aexit__` | `async with async_cm() as x:`... | | 迭代 | `__aiter__``__anext__` | `async for e in aiter:`... |

表 7.2:异步语法及其神奇方法

PEP-492(中提到了这种新语法 https://www.python.org/dev/peps/pep-0492/ )。

异步上下文管理器

想法很简单:如果我们要使用上下文管理器,但需要在其上调用协同程序,我们不能使用正常的__enter____exit__方法,因为它们被定义为常规函数,因此我们需要使用新的__aenter____aexit__协同程序方法。我们不能把它仅仅称为 using with,而必须将async与一起使用。

contextlib模块中甚至有一个@asynccontextmanager修饰符,用于以与前面所示相同的方式创建异步上下文管理器。

异步上下文管理器的带语法的async工作方式类似:当输入上下文时,自动调用__aenter__协程,当退出时,__aexit__将触发。甚至可以使用语句将多个异步上下文管理器分组到同一个async中,但不可能将它们与常规上下文管理器混合使用。尝试使用语法为async的常规上下文管理器将以AttributeError失败。

我们来自第 2 章的示例Pythonic 代码,如果适用于异步编程,则类似于以下代码:

@contextlib.asynccontextmanager
async def db_management():
    try:
        await stop_database()
        yield
    finally:
        await start_database() 

此外,如果我们想要使用多个上下文管理器,我们可以这样做,例如:

@contextlib.asynccontextmanager
async def metrics_logger():
    yield await create_metrics_logger()

async def run_db_backup():
    async with db_management(), metrics_logger():
        print("Performing DB backup...") 

正如您所料,contextlib模块提供了抽象基类AbstractAsyncContextManager,需要实现__aenter____aexit__方法。

其他神奇的方法

其余的魔法方法会发生什么?它们都得到了异步的对应项吗?不,但我想指出一点:它不应该被需要。

请记住,实现干净的代码部分是为了确保您在代码中正确分配职责,并将内容放在适当的位置。举个例子,如果您正在考虑在__getattr__方法中调用一个协程,那么您的设计中可能存在一些错误,因为应该有一个更好的地方用于该协程。

我们等待的协同路由用于使部分代码并行运行,因此它们通常与正在管理的外部资源相关,而我们在其余神奇方法(__getitem____getattr__等)中使用的逻辑应该是面向对象的代码,或者可以仅根据该对象的内部表示来解析的代码。

出于同样的原因(以及遵循良好的设计实践),将__init__作为一个协同程序是不好的,因为我们通常需要轻量级的对象,我们可以安全地初始化这些对象而不会产生副作用。更好的是,我们已经介绍了使用依赖项注入的好处,因此这就是不需要异步初始化方法的更多原因:我们的对象应该使用已经初始化的依赖项。

上一个表的第二个案例,异步迭代,在本章中更感兴趣,因此我们将在下一节中探讨它。

异步迭代(asyncfor)的语法适用于任何异步迭代器,无论它是由我们创建的(我们将在下一节中看到如何创建),还是异步生成器(我们将在后面的一节中看到)。

异步迭代

与我们在本章开头看到的迭代器对象(即支持使用 Python 内置的for循环进行迭代的对象)的方式相同,我们也可以这样做,但采用异步方式。

想象一下,我们想要创建一个迭代器来抽象我们从外部源(如数据库)读取数据的方式,但是提取数据的部分本身是一个协程,因此我们不能像以前一样在已经熟悉的__next__操作中调用它。这就是为什么我们需要利用__anext__协同程序。

下面的示例以简单的方式说明了如何实现这一点。不考虑外部依赖性或任何其他意外复杂性,我们将重点关注使此类操作成为可能的方法,以便对其进行研究:

import asyncio
import random

async def coroutine():
    await asyncio.sleep(0.1)
    return random.randint(1, 10000)

class RecordStreamer:
    def __init__(self, max_rows=100) -> None:
        self._current_row = 0
        self._max_rows = max_rows

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._current_row < self._max_rows:
            row = (self._current_row, await coroutine())
            self._current_row += 1
            return row
        raise StopAsyncIteration 

第一种方法__aiter__用于指示对象是异步迭代器。就像在同步版本中一样,大多数情况下,返回 self 就足够了,因此它不需要是一个协程。

但另一方面,__anext__恰恰是我们的代码中异步逻辑所在的部分,因此对于初学者来说,这需要一个协同程序。在本例中,我们正在等待另一个协程,以便返回要返回的部分数据。

它还需要一个单独的异常来发出迭代结束的信号,在本例中称为StopAsyncIteration

这个异常以类似的方式工作,只是它是针对async for类循环的。遇到这种情况时,解释器将完成循环。

此类对象可按以下形式使用:

async for row in RecordStreamer(10):
    ... 

您可以清楚地看到,这与我们在本章开头探讨的同步版本类似。然而,一个重要的区别是,正如我们所预期的,next()函数在这个对象上不起作用(毕竟它没有实现__next__,因此将异步发电机推进一个位置需要不同的习惯用法。

将异步迭代器向前推进一个位置可以通过执行以下操作来实现:

await async_iterator.__anext__() 

但是,不支持更有趣的构造,比如我们之前看到的关于使用next()函数在生成器表达式上搜索满足特定条件的第一个值的构造,因为它们无法处理异步迭代器。

受前面习惯用法的启发,我们可以使用异步迭代创建生成器表达式,然后从中获取第一个值。更好的是,我们可以创建自己版本的此函数来与异步生成器一起使用,它可能如下所示:

NOT_SET = object()

async def anext(async_generator_expression, default=NOT_SET):
    try:
        return await async_generator_expression.__anext__()
    except StopAsyncIteration:
        if default is NOT_SET:
            raise
        return default 

从 Python3.8 开始,asyncio模块有一个很好的功能,允许我们直接从 REPL 与协同路由进行交互。这样,我们就可以以交互方式测试前面的代码的工作方式:

$ python -m asyncio
>>> streamer = RecordStreamer(10)
>>> await anext(streamer)
(0, 5017)
>>> await anext(streamer)
(1, 5257)
>>> await anext(streamer)
(2, 3507)
...
>>> await anext(streamer)
(9, 5440)
>>> await anext(streamer)
Traceback (most recent call last):
    ...
    raise StopAsyncIteration
StopAsyncIteration
>>> 

您会注意到,它在接口和行为方面都与原始的next()函数相似。

现在我们知道如何在异步编程中使用迭代,但我们可以做得更好。大多数情况下,我们只需要一个生成器,而不是一个完整的迭代器对象。生成器的优点是其语法使其更易于编写和理解,因此在下一节中,我将介绍如何为异步程序创建生成器。

异步发电机

在 Python 3.6 之前,上一节中探讨的功能是在 Python 中实现异步迭代的唯一方法。由于我们在前几节中探讨的协同程序和生成器的复杂性,在协同程序中尝试使用yield语句并没有完全定义,因此不允许(例如,yield会尝试挂起协同程序,还是为调用者生成值?)。

PEP-525(中引入了异步发电机 https://www.python.org/dev/peps/pep-0525/ )。

在协同程序中使用yield关键字的问题在本 PEP 中得到了解决,现在它是允许的,但具有不同和明确的含义。与我们看到的第一个协同路由示例不同,正确定义的协同路由(使用asyncdef)中的yield并不意味着暂停或暂停该协同路由的执行,而是为调用方生成一个值。这是一个异步生成器:与我们在本章开头看到的生成器相同,但可以以异步方式使用(这意味着它们可能在其定义中等待其他协同路由)。

与迭代器相比,异步生成器的主要优势在于常规生成器具有相同的优势;它们允许我们以更紧凑的方式实现同样的目标。

正如所承诺的,前面的示例在使用异步生成器编写时看起来更紧凑:

async def record_streamer(max_rows):
    current_row = 0
    while current_row < max_rows:
        row = (current_row, await coroutine())
        current_row += 1
        yield row 

除了async def/await结构外,结构相同,因此感觉更接近于常规发电机。此外,您必须记住更少的细节(关于需要实现的方法和必须触发的正确异常),因此我建议尽可能使用异步生成器而不是迭代器。

这就结束了我们的 Python 迭代和异步编程之旅。特别是,我们刚刚探讨的最后一个主题是它的顶峰,因为它涉及到我们在本章中所学的所有概念。

总结

Python 中到处都有生成器。自从很久以前在 Python 中诞生以来,它们被证明是一个伟大的补充,它使程序更加高效,迭代更加简单。

随着时间的推移,更复杂的任务需要添加到 Python 中,生成器再次帮助支持协同路由。

而且,虽然在 Python 中协同路由是生成器,但我们仍然不必忘记它们在语义上是不同的。生成器是以迭代的思想创建的,而协同程序的目标是异步编程(在任何给定时间暂停并恢复程序的一部分执行)。这种区别变得如此重要,以致于 Python 的语法(和类型系统)得以发展。

迭代和异步编程构成了 Python 编程的最后一个主要支柱。现在,是时候看看所有的东西是如何结合在一起的,并将我们在过去几章中探索的所有概念付诸行动。这意味着到现在为止,您已经完全了解了 Python 的功能。

现在是时候利用这个优势了,所以在接下来的章节中,我们将看到如何将这些概念付诸实施,与软件工程的更一般的想法相关,如测试、设计模式和体系结构。

在下一章中,我们将通过探索单元测试和重构来开始我们旅程的这一新部分。

工具书类

以下是您可以参考的信息列表: