## 可迭代对象

In [None]:
iterable = '可迭代对象'
for x in iterable:
    print(x)

In [None]:
# 常见的可送代对象
iterables = [
    "123",              # 字符串
    [1, 2, 3],          # 列表
    (1, 2, 3),          # 元组
    {1: 'a', 2: 'b'},   # 字典
    {1, 2, 3},          # 集合
]
for iterable in iterables:
    print(type(iterable))
    for x in iterable:
        print(x, end=', ')
    print('')


### 可迭代对象的接口


In [None]:
def common_attrs(*objs):  # *objs 多余参数存进名为objs的元组，**kwobjs 多余参数存进名为kwobjs的字典 使用函数时在变量前加*或**表示用元组或字典的方式解析参量
    """计算对象之间的共同属性"""
    assert len(objs) > 0
    attrs = set(dir(objs[0]))
    for obj in objs[1:]:
        attrs &= set(dir(obj))      # 取交集
    attrs -= set(dir(object))       # 剔除基础对象属性
    return attrs

In [None]:
# 计算可迭代对象属性
iterable_common_attrs = common_attrs(*iterables)

print(iterable_common_attrs)

In [None]:
# 文件对象也是可迭代对象
f = open('AsyncIO.ipynb','r')

# 加入到可迭代对象清单中
iterables.append(f)

# 继续求一次交集
iterable_common_attrs &= set(dir(f))
print(iterable_common_attrs)

`__iter__` 方法，对应的调用方法就是内置函数 `iter()`

In [None]:
for iterable in iterables:
    print(iter(iterable))

### 迭代器的接口

In [None]:
# 由可迭代对象得到相应的迭代器列表
iterators = [iter(iterable) for iterable in iterables]

# 计算属性
iterator_common_attrs = common_attrs(*iterators)

print(iterator_common_attrs)

「迭代器」有两个接口：`__iter__` & `__next__`

### 迭代器的用法

In [None]:
actions = ['点赞', '投币', '收藏']      # 可迭代对象

action_iterator = iter(actions)      # 第一步：构建迭代器

In [None]:
# 第二步：多次迭代
action = next(action_iterator)      # 运行3次
print(action)

In [None]:
# 第三步：迭代结束
action = next(action_iterator)      # 运行3次
print(action)

### 已知的迭代的三个步骤
1. 调用 `iter(iterable)` 来构建迭代器
2. （多次）调用 `next(iterator)` 来获取值
3. 最后捕获 `StopIteration` 异常来判断迭代结束

In [None]:
## 用 while 循环模拟 for 循环迭代

# 创建迭代器
iterator = iter(actions)        # 对应可迭代对象的 __iter__ 方法
while True:
    try:
        # 通过迭代器获取下一个对象
        print(next(iterator))   # 对应迭代器的 __next__ 方法
    except StopIteration:       # 捕获异常来判断结束
        # print('迭代结束')
        break

## 自定义迭代器
### 迭代器的基本功能：
- 初始化时要传入可迭代对象，这样才能知道去哪取数据
- 要初始化迭代进度
- 每次迭代时，即每次调用 `__next__()` 方法时：
  - 如果仍有元素可供迭代，则返回本轮迭代的元素，同时更新当前迭代进度
  - 如果已无元素可供返回，则迭代结束，抛出 `StopIteration` 异常


### 在添加一点额外的逻辑:
- 设置一个黑名单，如果当前元素在黑名单内，则跳过
- 将某些符合条件的数据 `*2` 之后再返回

In [None]:
BLACKLIST = ['白嫖','取关']


class SuzhiIterator:
    def __init__(self, action):
        self.actions = actions
        self.index = 0              # 初始化索引下标

    def __next__(self):
        while self.index < len(self.actions):
            action = self.actions[self.index]
            self.index += 1         # 更新索引下标
            if action in BLACKLIST:
                continue
            elif '币' in action:
                return action * 2
            else:
                return action
        raise StopIteration


actions = ['点赞', '投币', '取关']
sz_iterator = SuzhiIterator(actions)
while True:
    try:
        print(next(sz_iterator))
    except StopIteration:
        break

In [None]:
## for 循环需要的是「可迭代对象」而不是「迭代器」

for x in SuzhiIterator(actions):
    print(x)

In [None]:
## 一个没有意义的可迭代对象

class SuzhiActions:
    def __init__(self,actions):
        self.actions = actions

    def __iter__(self):
        return SuzhiIterator(self.actions)


for x in SuzhiActions(actions):
    print(x)

In [None]:
class SuzhiIterator:
    def __init__(self, action):
        self.actions = actions
        self.index = 0              # 初始化索引下标

    def __next__(self):
        while self.index < len(self.actions):
            action = self.actions[self.index]
            self.index += 1         # 更新索引下标
            if action in BLACKLIST:
                continue
            elif '币' in action:
                return action * 2
            else:
                return action
        raise StopIteration

    def __iter__(self):
        return self

In [None]:
## 「迭代器」也是「可迭代对象」

for x in SuzhiIterator(actions):
    print(x)

### 迭代器协议

在 Python 文档中明确指出了，迭代器必须同时实现 `__next__` 和 `__iter__` 两个方法，这称之为「**迭代器协议**」。

根据这个协议，迭代器**必须**是可迭代的，换言之，**「迭代器」是一种「可迭代对象」**。

缺少了 `__iter__` 方法的迭代器是不完整的，不符合迭代器协议的要求。

**所有**迭代器的 `__iter__` 方法都只要 `return self` 即可。

## 浅层的意义

- 统一通过 `next()` 方法获取数据，可以屏蔽底层不同的数据读取方式，简化编程
- 容器类的数据结构只关心数据的静态存储，**每一次**迭代都需要额外的迭代器对象专门负责记录迭代过程中的**状态**信息

按照这个思路，我们很容易形成这样的认知：

迭代器就是为了让数据结构能够快捷地遍历而定义的**辅助**对象。

但是别忘了，如果只是在 `while` 循环中使用，是无需用到迭代器的 `__iter__` 方法的：
```
actions = ['点赞', '投币', '取关']
sz_iterator = SuzhiIterator(actions)
while True:
    try:
        print(next(sz_iterator))
    except StopIteration:
        break
```

In [None]:
for iterable in iterables:
    print(iter(iterable))

## 深层的意义
### 现在有两种可迭代对象：
- 容器类型的
  - 列表、元组、字典等
  - 只有 `__iter__` 接口
  - 静态的数据
  - 需要额外的迭代器支持
  - 支持多次迭代
- 迭代器类型
  - 文件、StringlO等
  - 同时实现 `__iter__` 和 `__next__` 接口
  - 动态的
  - 只能迭代一次

In [None]:
## 针对容器类可迭代对象的循环

for x in iterable:      # 背后操作：「可迭代对象」-> 「迭代器」
    pass

此种情况下，只有可迭代对象在前台露脸。

而迭代器是在背后使用默认的方式“悄悄”构建的，没有存在感，并且生命周期是和循环操作本身绑定在一起的。

而一旦迭代器实现了 `__iter__` 方法：

In [None]:
## 针对迭代器的循环

iterator = iter(iterable)       # 迭代器的构建过程远离循环存在

...

for x in iterator:              # 背后操作：「迭代器」->「迭代器」（self）
    pass

现在整个迭代过程只需要迭代器就够了！

迭代器不光是从后台走向了前台，而且直接让可迭代对象远离了循环！

现在，迭代器的构建是在明面上单独完成的，和当前循环操作**解耦**了。

于是乎：
- 一个可迭代对象可以构建出任意多个不同的迭代器
- 一种迭代器可以应用于任意多个可迭代对象（包括其它迭代器）

## 数据管道

如果迭代器不可迭代：

「for 循环」<-「迭代器（隐藏）」<-「可迭代对象」

如果迭代器可以任意的嵌套链接：

「for 循环」<-「迭代器」<-「迭代器」<-...<-「迭代器」<-「可迭代对象」

- 很多个迭代器串联起来，形成一个**处理数据的管道**，或者称为**数据流**
- 在这个管道中，每一次只通过一份数据，避免了一次性加载所有数据
- 迭代器也不仅仅只是按顺序返回数据那么简单了，它开始承担**处理数据**的责任
  - 例如，`SuzhiIterator` 实际实现了部分过滤器和放大器的功能。
- 当通过迭代器获取数据的时候，远离了数据存储，渐渐地开始**不关心数据到底是怎么存储的**。

## 附录：没有定义 `__iter__` 的可迭代对象

并不是所有的可迭代对象都必须定义有 `__iter__` 方法。

如果一个对象没有 `__iter__` 方法，但是定义了 `__getitem__` 方法，同样是可以迭
代的。

因此，不能通过检查 `__iter__` 方法来判断一个对象是否是可迭代的，而是应该直接使用  `iter()` 函数，如果不可迭代，则会抛出 `TypeError` 异常。

In [None]:
class DumbList:
    def __getitem__(self,index):
        if index > 5:
            raise StopIteration
            return index * 2


dl = DumbList()
for x in dl:
    print(x)

In [None]:
class NotIterable:
    pass


ni = NotIterable()
it = iter(ni)

## 什么是生成器
### yield 很特别
- 只能用在函数内
- 在函数内热河地方出现了 `yield` 关键字，哪怕永远无法被执行到，函数都会发生变异

In [None]:
def gen():
    print('hello')
    if 0:
        yield


g = gen()
print(g)

调用函数 `gen()` 没有像普通函数那样执行其中的代码，而是返回了一个 `generator` 对象。

In [None]:
print(type(gen))
print(type(g))

In [None]:
import inspect

# 是函数，也是生成器
print(inspect.isfunction(gen))
print(inspect.isgeneratorfunction(gen))

# 生成器函数不是 generator
print(inspect.isgenerator(gen))     # False
print(inspect.isgenerator(g))       # True

## 术语概念

通常，我们把含有 `yield` 的函数称之为「**生成器函数 generator function**]，把调用生成器函数返回的结果称为「**生成器 generator**」

但是根据 Python 官方文档：
> **generator 生成器**
>
> A function which returns a generator iterator.. —个返回生成器送代器的函数
>
> Usually refers to a generator function, but may refer to a generator iterator in some contexts. In cases where the intended meaning isnt clear, using the full terms avoids ambiguity 通常指的是生成器函数，但是在一定的语境下也可以指代生成器迭代器。为了避免使义，推荐使用完整的术语。

> **generator iterator 生成器迭代器**
>
> An object created by a generator function.
>
> —个由生成器西数创建的对象。

在不会产生岐义的情况下，提到「生成器」可能指的是函数，也可能是函数生成的对象，具体需要根据上下文判断。

为了区分，我一般将「生成器函数」的返回结果称为「生成器对象」。

这两者之间的关系将在稍后的技术内幕中详细解释。

### 生成器是迭代器

既然是迭代器，那么肯定要满足「迭代器协议」：
- `__iter__`，返回迭代器对象自身
- `__next__`，每次返回一个迭代数据，如果没有数据，则要抛出 `StopIteration` 异常

In [None]:
g = gen()
hasattr(g, '__next__') and hasattr(g, '__iter__')

In [None]:
g is iter(g)

## yield

### 语句 or 表达式？
- `yield` 语句
  - PEP 252-Simple Generators
  - Python 2.2
- `yield` 表达式
  - PEP 342-Coroutines via Enhanced Generators
  - Python 2.5

### `yield` 对函数做了什么

`yield` 关键字最根本的作用是改变了函数的性质：
1. 调用生成器函数不是直接执行其中的代码，而是返回一个对象。
2. 生成器函数内的代码，需要通过生成器对象来执行。
从这一点上说，生成器函数的作用和**类(class)**是差不多的。

生成器对象就是迭代器，所以它的运行方式和迭代器是一致的：
- 通过 `next()` 函数来调用
- 每次 `next()` 都会在遇到 `yield` 后返回结果（作为 `next()` 的返回值）
- 如果函数运行结束（即遇到 `return`）则抛出 `StopIteration` 异常

### 简单示例1

In [None]:
# 定义一个生成器函数
def gen_666(meet_yield):
    print('hello')
    if meet_yield:  # 是否遇到 yield 由参数控制
        print('yield !')
        yield 666
        print('back!')
    print('bye.')
    return 'result'

In [None]:
g1 = gen_666(False)     # 不会遇到 yield
x1 = next(g1)
print(x1)

In [None]:
g2 = gen_666(True)      # 会遇到 yield
x2 = next(g2)
print(x2)
next(g2)

### 在循环中使用 `yield`
只遭遇一次 `yield` 语句的生成器就是只能迭代一次的迭代器，通常没什么实用价值。

要想能迭代多次，可以在函数内多次使用 `yield` 语句：

In [None]:
def gen_func() :
    print('--- yield 1 ---')
    yield 1
    print('--- yield 2 ---')
    yield 2
    print('--- yield 3 ---')
    yield 3


for x in gen_func():
    print(x)

相对应地，`yield` 一般也是搭配循环使用的：

In [None]:
# itertools.count 示例
def count(start=0, step=1):
# count(10) --> 10 11 12 13 14
# count(2.5, 0.5) -> 2.5 3.0 3.5
    n =  start
    while True:
        yield n
        n += step

### 生成器的4个状态
- 当调用生成器函数得到生成器对象时
  - 此时的生成器对象可以理解为处于**初始**状态
- 通过 `next()` 调用生成器对象，对应的生成器函数代码开始运行
  - 此时生成器对象处于**运行中**状态
- 如果遇到了 `yield` 语句，`next()` 返回时
  - `yield` 语句右边的对象作为 `next()` 的返回值
  - 生成器在 `yield` 语句所在的位置**暂停**，当再次使用 `next()` 时继续从该位置继续运行
- 如果执行到函数结束，则抛出 `StopIteration` 异常
  - 不管是使用了 `return` 语句显式地返回值，或者默认返回 `None` 值，返回值都只能作为异常的值一并抛出
  - 此时的生成器对象处于**结束**的状态
  - 对于已经结束的生成器对象再次调用 `next()`，直接抛出 `StopIteration` 异常，并且不含返回值

### 用 `yield` 重构迭代器

**和 `class` 定义的迭代器进行对比和**

| 动作  | `class` 实现的迭代器  | `yield` 实现的迭代器  |
|---|---|---|
| 定义迭代器  | `class Iterator:`<br>`def __init__ (self, *args):`  |  `def iter_fun(*args):` |
| 构建迭代器  | `Iterator(args)`  |  `iter_fun(args)` |
| `next(iterator)`  | `def __next__(self): return value`  | `yield value`  |
| `StopIteration`  | `raise StopIteration`  |  `return` |
| `iter(iterator)`  | `def __iter__(self): return self`  |  自动实现 |

### 生成器的三种应用场景：
- 定义一个容器类的可迭代对象，为该对象实现 `__iter__` 接口
- 定义一个处理其它可迭代对象的迭代器
- 定义一个不依赖数据存储的数据生成器

**为数据类实现 `__iter__` 接口**

In [None]:
# 迭代器类
class MyCustomDataIterator:
    def __init__(self, data):
        self.data = data
        self. index = 1

    def __iter__(self):
        return self

    def __next__(self):
        self. index += 1
        if self. index < self. data. size:
            return self.data.get_value (self. index)
        else:
            raise StopIteration
# 可迭代数据类
class MyCustomData:
    # 其余部分代码不重要都略过
    ...
    @property
    def size (self): # 假设可以得到数据的大小
        return self.size

    def get_value (self, index): # 假设可通过索引/按顺序得到数据
        return index

    def __iter__(self):
        return MyCustomDataIterator(self) # 构建迭代器

In [None]:
## 使用 yield 重构代码
class MyCustomData:
    # 其余部分代码不重要都略过
    ...
    @property
    def size (self): # 假设可以得到数据的大小
        return self.size

    def get_value (self, index): # 假设可通过索引/按顺序得到数据
        return index

    def __iter__(self):
        index = -1      # 注意，必须是局部变量
        while index < 2:        # 设定迭代完成条件
            index += 1
            yield self.get_value(index)


mydata = MyCustomData()     # 注意：mydata 是可迭代对象，但不是迭代器

**实现有处理数据能力的迭代器**

In [None]:
BLACKLIST = ['白嫖', '取关']


class SuzhiIterator:
    def __init__(self, actions):
        self. actions = actions
        self. index = 0             # 初始化索引下标

    def __next__(self):
        while self.index < len (self. actions):
            action = self. actions[self. index]
            self.index += 1         # 更新索引下标
            if action in BLACKLIST:
                continue
            elif '币' in action:
                return action * 2
            else:
                return action
        raise StopIteration

    def __iter__(self):
        return self

In [None]:
## 使用 yield 重构代码

BLACKLIST = ['白嫖', '取关']
def suzhi(actions):
    for action in actions:
        if action in BLACKLIST:
            continue
        elif '币' in action:
            yield action * 2
        else:
            yield action


actions = ['点赞', '投币', '收藏']
for x in suzhi (actions) :
    print (x)

**实现一个数据生成器**

In [None]:
# 倒数计时器
class DownCounter:
    def __init__(self, start):
        self. start = start
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self. start > 0:
            self. start -= 1
            return self.start
        else:
            raise StopIteration()

In [None]:
## 使用 yield 重构代码
def countdown (start):
    while start > 0:
        start -= 1
        yield start

for x in countdown (5):
    print (x)

## 生成器的技术内幕
**请耐心看完，对理解协程非常重要！**
- 生成器函数和普通函数之间的区别
- 生成器对象和生成器函数之间的关系
- 生成器函数可以「暂停」执行的秘密

## 函数的运行机制

### 函数对象和代码对象
每当定义了一个函数之后，就得到了一个「**函数对象**」：

In [None]:
def func():
    x = 1
    print(x)

函数中的代码是保存在「**代码对象 (Code object)**」中的：

In [None]:
func.__code__

代码对象随着函数对象一起创建，是函数对象的一个重要属性。
> Code objects represent byte-compiled executable Python code, or bytecode.

代码对象中重要的属性以 `co_` 开头：

In [None]:
func_code = func.__code__

for attr in dir(func_code):
    if attr.startswith('co_'):
        print(f'{attr}\t: {getattr(func_code, attr)}')

### 函数运行帧
函数对象和代码对象保存了函数的基本信息，当函数运行的时候，还需要一个对象来保存运行时的状态。

这个对象就是 「**帧对象(Frame object)**」。

> Frame objects represent execution frames.

每一次调用函数，都会自动创建一个帧对象，记录当次运行的状态。

In [None]:
import inspect


def foo():
    # 获取到函数的运行帧并返回
    return inspect.currentframe()

f1 = foo()      # 由于被变量所引用，所以帧不会被垃圾回收
f1

In [None]:
# 再调用一次，得到两一个帧
f2 = foo()

## 函数对象，代码对象和帧对象之间的关系
import objgraph
objgraph.show_backrefs(foo.__code__, max_depth=2)

帧对象中重要的属性以 `f_` 开头：
- `f_code`：执行的代码对象
- `f_back`：指向上一个帧，也就是调用者的帧
- `f_1ocals`：局部变量
- `f_globals`：全局变量
- `f_lineno`：当前对应的行号

### 函数运行栈
当一个函数中调用了另一个函数，此时前一个函数还没有结束，所以这两个函数的帧对象是同时存在的。

比如，我们的程序一般都始于一个 `main` 函数，然后又调用其它函数，以此类推。

因此，一个程序的运行期，同时存在很多个帧对象。

函数之间的调用关系是先**执行的后退出**，所以帧对象之间的关系也是**先入后出**，正好以**栈**的形式保存。

因此，函数的运行帧又称为**栈帧**。

注意：一个线程只有一个函数运行栈。

In [None]:
## 展示发生函数调用时的栈
def foo():
    return inspect.currentframe()


def bar():
    return foo()        # 返回 foo 函数运行时的帧对象


f1 = bar()
objgraph.show_refs(f1, max_depth=3, too_many=2)

### 生成器函数有何不同
生成器函数仍然是函数对象，当然也包括了代码对象。

In [None]:
def gen_foo():
    for _ in range(10):
        yield inspect.currentframe()        #  每一次迭代都返回当前帧


objgraph.show_refs(gen_foo, max_depth=1 , too_many=1)

调用生成器函数不会直接运行（也就是说，不像普通函数那样创建帧对象并且压入函数栈），而是得到一个生成器
对象。

那么秘密自然是藏在生成器对象里：

In [None]:
gf = gen_foo()
objgraph.show_refs(gf, max_depth=2, too_many=2)

当每次使用 `next()` 对生成器进行迭代时，都用这个帧对象（`gi_frame`）来保存状态：

In [None]:
## 演示目的：展示生成器迭代的过程中都是同一个 frame 对象

gf = gen_foo()

# 存为变量，不然迭代结束后该属性会清空
gi_frame = gf.gi_frame

# 保存所有迭代结果
frames = list(gf)

print(gf.gi_frame)      # None

for f in frames:
    print(f is gi_frame)

生成器的 frame 对象在暂停状态下看不到调用的关系图：

In [None]:
gf = gen_foo()

# f = next(gf)

objgraph.show_refs(gf.gi_frame, max_depth=1, too_many=1)

想要在生成器运行的过程中观察运行栈的关系图也是可以的：

In [None]:
def gen_frame_graph():
    for _ in range(10):
        # 在运行时把图形生成
        graph = objgraph.show_refs(inspect.currentframe(), max_depth=3)
        yield graph


gfg = gen_frame_graph()

# 定义两个不同名的函数是为了方便观察栈的变化

def func_a(g):
    return next(g)


def func_b(g):
    return next(g)


func_a(gfg)

In [None]:
func_b(gfg)

综上，我们可以总结出：

- 生成器函数并不直接运行，而是借助于生成器对象来间接运行

- 创建生成器对象的同时创建了帧对象，并且由生成器对象保持引用

- 每次使用 `next()` 调用生成器时，就是将生成器引用的帧对象入栈

- 当 `next()` 返回时，也就是代码遇到 yield 暂停的时候，就是将帧出栈

- 直到迭代结束，帧最后一次出栈，并且被销毁

## 同步和异步
### 普通函数：
- 调用函数：构建帧对象并入栈
- 函数执行结束：帧对象出栈并销毀



In [None]:
## 普通函数只能实现同步方式运行多任务

def sync_task_runner():
    task_a()
    task_b()        # 只有等 task_a 执行完毕才可能开始执行

### 生成器函数：
- 创建生成器：构建帧对象
- （多次）通过 `next` 触发执行：帧入栈
- （多次）遇到 `yield`：帧出栈（保留）
- 迭代结束：帧出栈并销毁

In [None]:
## 生成器函数让异步运行任务变得可能

def async_task():
    # step 1 
    yield
    # step 2
    yield
    # step 3
    yield


# 任务队列
all_tasks=[]

# 创建两个任务
async_task_a = async_task()
async_task_b = async_task()

# 都加入到队列
all_tasks.append(async_task_a)
all_tasks.append(async_task_b)


def async_task_runner():
    """一个最简陋的一部任务调度器"""
    for task in all_tasks:
        next(task)

## 从生成器到协程
现在，我们可以更好地理解所谓的 `generator iterator` 是什么了：
**「生成器对象」是一个用来迭代执行「生成器函数」的迭代器**

- 数据的迭代器：针对一个包含很多元素的数据集，逐个返回其中的元素
- 生成器迭代器：针对一个包含很多代码的函数，分段执行其中代码
是不是有异曲同工之妙？

**让一个函数可以多次迭代运行其中的代码**才是生成器对象最最根本的作用，而不仅是字面意思上的*生成数据*的东
西。

迭代产出数据只是迭代执行代码的自然结果而已。

当用生成器来实现迭代器的时候，我们关注的是重点是：`yield <value>` 返回出来的数据。

如果把焦点集中到「被迭代执行的代码」上，就能对生成器有个全新的视角，那就是「**协程**」。

## 协程（yield版）

### 基于生成器的协程
> Generator-based Coroutines
> 
> Deprecated since version 3.8, will be removed in version 3.11: Use `async def` instead.

按照 Python 文档的描述，所谓的「基于生成器的协程」指的是用 `yield from` 创建的生成器，并且还要搭配 `asyncio.coroutine` 装饰器来使用。


### 对比 generaotr 和 coroutine

In [4]:
def generator_func():
    yield


gen = generator_func()

print(gen)
print(sorted(set(dir(gen)) - set(dir(object))))

<generator object generator_func at 0x7ff4f8b73f40>
['__del__', '__iter__', '__name__', '__next__', '__qualname__', 'close', 'gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw']


In [5]:
async def coroutine_func():
    await coroutine_func()


coro = coroutine_func()

print(coro)
print(sorted(set(dir(coro)) - set(dir(object))))

<coroutine object coroutine_func at 0x7ff4f8b73610>
['__await__', '__del__', '__name__', '__qualname__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_origin', 'cr_running', 'send', 'throw']


### 生成器的增强点
- `yield` 语句
  - PEP 252-Simple Generators
  - Python 2.2
- `yield` 表达式
  - = PEP 342-Coroutines via Enhanced Generators
  - Python 2.5

### `yield` 表达式
PEP 342-Coroutines via Enhanced Generators把 `yield` 关键字升级成了表达式。

所谓**表达式(Expression）**，意味着它可以被解析成一个**值**。



In [None]:
# 赋值给变量
x = yield

# 计算后赋值给变量
y =yield + 1

# yield 可以作为函数入参，但是需要使用（）括起来
print((yield))

### `yield` 表达式如何获取到值？

In [None]:
def show_yield_value():
    x = yield
    print(f'x is {x}')


g = show_yield_value()
next(g)     # 1st time

next(g)     # 2nd time

使用 `next()` 函数来驱动生成器的时候，`yield` 表达式的值总是为 `None`。

为生成器增加一个 `send()` 方法，该方法可以接受一个入参。

`send` 方法顾名思义，将该参数**发送**给生成器，使生成器恢复运行的同时，将该入参作为 **`yield` 表达式的值**。

In [None]:
def show_yield_value():
    print("开始")
    x = yield
    print(f'x is {x}')

    
g = show_yield_value()
# g.send('hello') # 第1次只能是 None
g.send(None)

g.send('hello')     # after 1st time

### 关于 prime
对于刚创建好的生成器，总是需要在第一次的时候 `send(None)` 值，使其运行到 `yield` 的地方暂停，这个步骤术语称为 prime。
> 这里 prime 做动词解的意思：PREPARE SOMEBODY to prepare someone for a situation so that they know what to do <- 使准备好〔应付某个情况〕

> prime 在《流畅的Python》中文版被翻译为「预激」，感觉有点过于专业物口，其实这个步骤后续并没有那么重要（后面会解释），所以我就直接称为/激活」，大家明白是什么意思就好。

### `yield` 表达式的优先级

In [None]:
def add_yield_value():
    # x = yield + 1     # is equivalente to: x = (yield 1)
    x = (yield) + 1
    print(f'x is {x}')


g = add_yield_value()
g.send(None)        # prime

In [None]:
g.send(1)

### `send()` 用法总结
- `send` 是生成器对象的方法
- 对于生成器对象 `g`，`next(g)` 等价于 `g.send(None)`
- 只有当生成器处在「暂停」状态时，才能传入非None的值
- send 方法是为了协程而增加的 API，所以：
  - 如果将生成器视作协程，就应该只用 `send` 方法
  - 如果视作迭代器，就仍用 `next`
所以，后面我们统一都使用 `g.send(None)` 的方式，而不再采用 `next(g)` 的方式。

### 一个极简的 echo

In [None]:
def gen_echo():
    while True:
        print((yield))      # double parentheses

echo = gen_echo()
echo.send(None)

In [None]:
一键三连 = ('点赞', '投币', '收藏')
for action in 一键三连:
    echo.send(action)

In [None]:
echo.send(StopIteration('stop'))        # sending StopIeration into generator can not stop it

### 使用 `close()` 结束生成器

当生成器作为迭代器来用的时候，它的生命周期取决于有多少元素可以迭代。

而当作协程来用的时候，通常可以视作是在执行一个任务，我们希望任务的终止能够变得可控。

新增的 `c1ose` 方法就是用来结束一个协程：

In [None]:
echo.close()

In [None]:
echo.send('hi')

由于 `echo` 协程的内容非常简单，所以可以直接结束。
如果协程的代码比较复杂，它可能需要在结束的时候做一些善后处理，比如释放资源等。
类似于 `StopIteration` 的实现机制，结束协程也是靠异常来实现的：

In [None]:
def gen_echo_v2():
    while True:
        try:
            x = yield
        except GeneratorExit:       # 协程和生成器使用同一种异常结束
            print('exit. bye!')
            return      # 必须要结束
        else:
            print(x)

In [None]:
echo_v2 = gen_echo_v2()
echo_v2.send(None)      # prime 需要激活才能捕捉到异常
echo_v2.close()         # 手动退出

除了显式地调用 `close` 方法，如果生成器对象被垃圾回收，也会自动调用 `close`:

In [None]:
echo_v2 = gen_echo_v2()
echo_v2.send(None)      # prime 需要激活才能捕捉到异常

# del echo_v2
echo_v2 = 123       # 改变对象或者删除生成器对象

### 使用 `throw()` 将异常抛给 `yield`

In [None]:
def gen_echo_v3():
    while True:
        try:
            x = yield
        except GeneratorExit:
            print('exit. bye!')
            return      # 退出循环，避免再一次进入 try
        except KeyboardInterrupt:
            print('按下了 Ctrl-C')
        else:
            print(x)

In [None]:
echo_v3 = gen_echo_v3()
echo_v3.send(None)      # prime

In [None]:
echo_v3.throw(KeyboardInterrupt)        # 没有退出循环，停在了下一个 yield 处

In [None]:
echo_v3.send(2)

In [None]:
echo_v3.throw(RuntimeError)     # 没有 except 的异常,也会造成退出循环

In [None]:
echo_v3.send(2)

### 总结协程的几个功能点

- 例子最初来源：https://mail.python.org/pipermaiL/python-ideas/2009-ApriL/003841.htmL

- 《流畅的 Python》中有一个改写的版本；

- 这里同时借鉴了两者的写法

In [None]:
def coro_averager():
    """计算移动平均值"""
    count = 0
    total = 0
    avg = None
    while True:
        try:
            val = yield avg
        except GeneratorExit:
            return total, count, avg
        else:
            total += val
            count += 1
            avg = total / count

1. 在 `yield` 的位置产出数据
2. 在 `yield` 的位置暂停
3. 在 `yield` 的位置恢复，并接受新的参数
4. 在 `yield` 的位置传入结束信号
5. 在 `yield` 的位置传入其它异常

### 生成器的3种模式
来自「仁慈的独裁者」的权威解释

Python 之父 Guido 在一封邮件里总结道，生成器有3种模式：

> There's the traditional "pull" style (terators), "push" style (like the averaging example), and then there are "tasks"
> 
> --Guido van Rossum 讨论 yield from 的邮件
- pull：特点在于能不断向外产出数据，也就是迭代器
- push：特点在于能不断向内发送数据，比如上一章中的计算移动平均值的例子，是非常早期的协程概念
- task：任务式（是 Asynclo 里的协程）

**pull vs push**


In [None]:
#＃ PuLL式，也就是生成器的数据流
from graphviz import Digraph
pull式的数据流 = Digraph(graph_attr={'rankdir': 'LR'}, node_attr={'shape': 'box'})
# dot.node('可迭代对象')
pull式的数据流.node('A', '迭代器')
pull式的数据流.node('B', '迭代器')
pull式的数据流.node('C', '迭代器')
pull式的数据流.node('D', '迭代器')
# dot.node('for循环')
pull式的数据流.node('a', '...', {'shape': 'plaintext'})
pull式的数据流.edges([('可迭代对象', 'A'), ('AB'), ('Ba'), ('aC'), ('CD'), ('D', 'for循环')])
pull式的数据流

In [None]:
# push 式，也就是经典协程的数据流
push式的数据流1 = Digraph(graph_attr={'rankdir': 'LR'}, node_attr={'shape': 'box'})
push式的数据流1.node('A', '协程')
push式的数据流1.node('B', '协程')
push式的数据流1.node('C', '协程')
push式的数据流1.node('D', '协程')
push式的数据流1.node('a', '...', shape='plaintext')
push式的数据流1.node('b', '...', {'shape': 'plaintext'})
push式的数据流1.edges([('数据源头', 'A'), ('AB'), ('Ba'), ('aC'), ('CD'), ('Db')])
push式的数据流1

In [None]:
## push 式，也就是经典协程的数据流
push式的数据流2 = Digraph(graph_attr={'rankdir': 'LR'}, node_attr={'shape': 'box'})
push式的数据流2.node('A', '协程')
push式的数据流2.node('B', '协程')
push式的数据流2.node('C', '协程')
push式的数据流2.node('D', '协程')
push式的数据流2.node('E', '协程')
push式的数据流2.edges([('数据源头', 'A'), ('AB'), ('AC'), ('AD'), ('BE'), ('CE'), ('DE')])
push式的数据流2

**push vs task**

In [1]:
# 这是 push 式生成器
def coro_averager():
    """计算移动平均值"""
    count = 0
    total = 0
    avg = None
    while True:
        try:
            val = yield avg
        except GeneratorExit:
            return total, count, avg
        else:
            total += val
            count += 1
            avg = total / count

In [None]:
# 这是 task 式生成器
def coro_some_task(n):
    print('doint step 1.')
    yield "1秒钟后回来"     # 出栈前能够传出去的最后消息
    # 1秒钟时间到，继续
    print('doing step 2')
    data = yield "data 可用了再叫我"
    # data 准备好了，继续吧
    return 'done'

**data vs event**

一句话解释：

- pull/push 都是受**数据驱动**的
- task 是受**事件驱动**的

In [None]:
# PuLL风格生成器的伪代码
def pull_style():
    while still_have_data:
        yield data

In [None]:
# push风格生成器的伪代码
def push_style():
    while still_have_data:
        input_data = yield output_data

In [None]:
# task式生成器，也就是协程
def task_style():
    ??? = yield ???

**什么是「event」?**

事件(event)是一个抽象的概念，就是指一件事情发生了。

例如：

要休息3秒钟后继续执行 滴答，滴答，滴答**3秒时间到**，这就是一个事件。

再例如：

要读取网络数据

socket.recv(1024)

如果 socket 还没接受到数据，此时这个调用就会阻塞在这里，直到有数据可读。**socket变得可读**，这就是一个事件。

**event 是如何运作的？**

事件通常都是通过**回调函数 (callback）**来处理的。

In [None]:
#＃设置回调的函数大概是下面这样子

#3 秒后调用 func
call_later(3, func)

# 当 socket 可读时调用 read
register(sock, selectors.EVENT_READ, read)

**灵魂发问**
- 为什么要**让出**（`yield`）执行权（也就是出栈）？
  - 遇到什么样的事件需要 `yield`?
  - 在出栈前该如何设置事件（回调）？
- 凭什么能**恢复**执行（也就是入栈）？
  - 是谁促成了事件的发生？
  - 是谁（感知到了事件的发生）让出栈的协程再次入栈（也就是说，谁来调用 send）？
PS:大家不都说有了协程不用写回调函数了嘛，这又是怎么回事？

## 协程（`yield from` 版）

### `yield from` 的入门例子

绝大多数教程，包括《流畅的Python》这本书中，关于 `yield from` 的基础例子都非常糟糕。

基本上都是以一个 `for` 循环开场。

然后各种新术语安排上，什么委派生成器，子生成器等等。。

一通操作下来让人满脸问号jpg，费这么大周折难道就是为了少写一层循环？

In [None]:
## yield from 的用法很简单
RESULT = yield from EXPR

In [None]:
##以下代码是 RESULT = yield from EXPR
#＃ 的等价写法
#＃ 来自：https://peps.python.org/pep-0380/

_i = iter(EXPR)
try:
    _y = next(_i)
except StopIteration as _e:
    _r = _e.value
else:
    while 1:
        try:
            _s = yield _y
        except GeneratorExit as _e:
            try:
                _m = _i.close
            except AttributeError:
                pass
            else:
                _m()
            raise _e
        except BaseException as _e:
            _x = sys.exc_info()
            try:
                _m = _i.throw
            except AttributeError:
                raise _e
            else:
                try:
                    _y = _m(*_x)
                except StopIteration as _e:
                    _r = _e.value
                    break
        else:
            try:
                if _s is None:
                    _y = next(_i)
                else:
                    _y = _i.send(_s)
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r

In [None]:
## 部分简化后的结果

_i = iter(EXPR)     # __iter__ -> __await__
try:
    _y = _i.send(None) # prime
except StopIteration as _e: # 直接就结束了，一次 yield 都没遇上
    _r = _e.value
else:
    while 1: # 不遇到 StopIteration 不算完
        try:
            _s = yield _y # 照原样 yield 出去，并接受 send 传入的值
        except GeneratorExit as _e: # 处理 close
            _i.close()
            raise _e
        except BaseException as _e: # 处理其它异常
            _x = sys.exc_info()
            try:
                _y = _i.throw(*_x)
            except StopIteration as _e:
                _r = _e.value
                break
        else:
            try:
                _y = _i.send(_s) # 接受到的值原样再 send 下去
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r # StopIteration 带出来的值就是结果

In [None]:
## 已知总是会 send(None)，继续简化后的结果
## 再去掉异常处理部分代码

_i = iter(EXPR)     # __iter__ -> __await__
while 1:
    try:
        _y = _i.send(None)      # 总是 None,  也就无所谓 prime
    except StopIteration as _e: # 直接就结束了，一次 yield 都没遇上
        _r = _e.value
        break
    else:
        yield _y # 照原样 yield 出去，并接受 send 传入的值


RESULT = _r # StopIteration 带出来的值就是结果

### 定义一个任务

**一个同步模式的简单任务**

In [1]:
def one_task():
    """一个任务"""
    print(f'begin task')
    ...     # other steps
    print(f'    begin big_step:')

    big_result = big_step()     # <---

    print(f'    end big_step with {big_result}')
    ...     # other steps

    print(f'end task')


def big_step():
    ...     # other steps
    print(f'        begin small_step:')

    small_result = small_step()     # <---

    print(f'        end small_step with {small_result}')
    ...     # other steps
    return small_result * 1000


def small_step():
    print('         hard working...')
    return 123      # done


# run task
one_task()

begin task
    begin big_step:
        begin small_step:
         hard working...
        end small_step with 123
    end big_step with 123000
end task


**遇到阻塞了**

In [2]:
from time import sleep


def small_step():
    print('         have a rest, come back soon')
    sleep(2)
    print('         hard working...')
    return 123


one_task()

begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task


**听说 `yield` 变协程可以不阻塞**

In [3]:
from time import sleep


def small_step():
    print('         have a rest, come back soon')
    yield sleep(2)
    print('         hard working...')
    return 123



In [4]:

one_task()

begin task
    begin big_step:
        begin small_step:
        end small_step with <generator object small_step at 0x10dbe0a50>


TypeError: unsupported operand type(s) for *: 'generator' and 'int'

In [5]:
def big_step():
    ...     # other steps
    print(f'        begin small_step:')

    # small_result = small_step()     # <---
    small_coro = small_step()
    while True:
        try:
            x = small_coro.send(None)
        except StopIteration as e:
            small_result = e.value
            break
        else:
            pass        # deal with x

    print(f'        end small_step with {small_result}')
    ...     # other steps
    return small_result * 1000


one_task()

begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task


**将阻塞从下游传到上游**

In [6]:
from time import sleep
import time


def small_step():
    print('         have a rest, come back soon')
    t1 = time.time()
    yield sleep,2
    assert time.time() - t1 > 2, "睡眠时间不足"
    print('         hard working...')
    return 123

one_task()

begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon


AssertionError: 睡眠时间不足

In [7]:
def big_step():
    ...     # other steps
    print(f'        begin small_step:')

    # small_result = small_step()     # <---
    small_coro = small_step()
    while True:
        try:
            x = small_coro.send(None)
        except StopIteration as e:
            small_result = e.value
            break
        else:
            yield x

    print(f'        end small_step with {small_result}')
    ...     # other steps
    return small_result * 1000



In [8]:

one_task()

begin task
    begin big_step:
    end big_step with <generator object big_step at 0x10deab220>
end task


In [9]:
def one_task():
    """一个任务"""
    print(f'begin task')
    ...     # other steps
    print(f'    begin big_step:')

    # big_result = big_step()     # <---
    big_coro = big_step()

    while True:
        try:
            x = big_coro.send(None)
        except StopIteration as e:
            big_result = e.value
            break
        else:
            func, arg = x
            func(arg)

    print(f'    end big_step with {big_result}')
    ...     # other steps

    print(f'end task')


In [10]:

one_task()

begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task


**阶段总结1**
- 协程白己并不能消除阻塞
- 协程具有传染性
- 协程通过 `yield` 把阻塞换个方式传递给了上游
- 最终阻銮仍然斋要被解决

**`yield from`来帮忙**

In [11]:
def big_step():
    ...     # other steps
    print(f'        begin small_step:')

    small_result = yield from small_step()

    print(f'        end small_step with {small_result}')

    ...     # other steps
    return small_result * 1000



In [12]:
one_task()

begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task


**阶段总结2**

为了方便讨论：

我们将最未端的遇到阳寒而不得不主动 vield 的协程称之为「主动协程

中间接受到下游的传导而不得不跟随者 yield 的协程称之为「被动协程」
- 「主动协程|是最先出栈的位置
- 「被动协程」可能有很多层
- `yield from` 大大简化了「被动协程」的编码

**`yield from`一统江湖**

In [16]:
from time import sleep
import time


def small_step():
    print('         have a rest, come back soon')
    t1 = time.time()
    yield from YieldFromaAble((sleep,2))
    assert time.time() - t1 > 2, "睡眠时间不足"
    print('         hard working...')
    return 123


In [15]:
class YieldFromaAble:
    def __init__(self, obj) -> None:
        self.value = obj

    def __iter__(self):
        yield self.value


In [17]:
one_task()


begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task


**阶段总结3**

通过一个`YieldFromable`对象，将最末端的`yield`进行封装，把协程的调用方式统一成了`yield from`。

**将任务彻底协程化**

In [18]:
def one_task():
    """一个任务"""
    print(f'begin task')
    ...     # other steps
    print(f'    begin big_step:')

    big_result = yield from big_step()

    print(f'    end big_step with {big_result}')
    ...     # other steps

    print(f'end task')


In [19]:
one_task()

<generator object one_task at 0x10e156570>

**一个通用的任务驱动器**

In [22]:
class YieldFromaAble:
    def __init__(self, obj) -> None:
        self.value = obj

    def __iter__(self):
        yield self


In [25]:
class Task:
    def __init__(self, coro) -> None:
        self.coro = coro

    def run(self):
        print('----------')
        while True:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                result = e.value
                break
            else:
                assert isinstance(x, YieldFromaAble)
                func, arg = x.value
                func(arg)
        print('----------')

In [26]:
t = Task(one_task())
t.run()

----------
begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task
----------


**完成了整个任务的协程化改造**

In [27]:
def one_task():
    """一个任务"""
    print(f'begin task')
    ...     # other steps
    print(f'    begin big_step:')

    big_result = yield from big_step()

    print(f'    end big_step with {big_result}')
    ...     # other steps

    print(f'end task')


In [28]:
def big_step():
    ...     # other steps
    print(f'        begin small_step:')

    small_result = yield from small_step()

    print(f'        end small_step with {small_result}')

    ...     # other steps
    return small_result * 1000



In [29]:
from time import sleep
import time


def small_step():
    print('         have a rest, come back soon')
    t1 = time.time()
    yield from YieldFromaAble((sleep,2))
    assert time.time() - t1 > 2, "睡眠时间不足"
    print('         hard working...')
    return 123


In [30]:
t = Task(one_task())
t.run()

----------
begin task
    begin big_step:
        begin small_step:
         have a rest, come back soon
         hard working...
        end small_step with 123
    end big_step with 123000
end task
----------
