In [None]:
# 由于需要测试 niginx 环境，因此需要在本地环境运行相关的程序
!mkdir ch18
!mkdir ch18/downloads
!touch ch18/__init__.py

## 18.0 序论

- 并发是指一次处理多件事
- 并行是指一次做多件事
- `asyncio` 包，使用事件循环驱动的协程实现并发

## 18.1 线程和协程对比

###### 示例 18-1  spinner_thread.py: 通过线程以动画的形式显示文本式旋转指针5

In [4]:
%%writefile ch18/spinner.py
import itertools
import threading
import sys
import time


class Signal:
  """
  用于从外部控制协程
  """
  go = True


def spin(msg, signal):
  """
  在单独的线程中运行的函数
  """
  write, flush = sys.stdout.write, sys.stdout.flush
  for char in itertools.cycle('|/-\\'):  # 无限循环
    status = char + ' ' + msg
    write(status)
    flush()
    write('\x08' * len(status))  # 使用退格将光标移回之前的位置
    time.sleep(.1)
    if not signal.go:  # 如保 go 属性不再为 True，则退出循环
      break
  write(' ' * len(status) + '\x08' * len(status))  # 使用空格清除状态，将光标移回开头


def slow_function():
  """
  假设等待 IO 一段时间
  """
  time.sleep(3)
  return 42


def supervisor():
  """
  设置从属线程，显示线程对象，运行耗时计算，最后杀死线程
  """
  signal = Signal()
  spinner = threading.Thread(target=spin, args=('thinking!', signal))  # 设置从属线程
  print('spinner object:', spinner)  # 显示从属线程对象
  spinner.start()  # 启动从属线程
  result = slow_function()  # 阻塞主线程，从属线程以动画的形式显示旋转指针
  signal.go = False  # 改变 signal 的状态；此会终止 spin 函数中的 for 循环
  spinner.join()  # 等待 spinner 线程结束
  return result


def main():
  result = supervisor()
  print("Answer", result)


if __name__ == "__main__":
  main()

Overwriting ch18/spinner.py


In [5]:
!python ch18/spinner.py

spinner object: <Thread(Thread-1, initial)>
| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!           Answer 42


###### 示例 18-2 spinner_asyncio.py: 通过协程以动画形式显示文本式旋转指针

In [6]:
%%writefile ch18/spinner_asyncio.py
import asyncio
import itertools
import sys


async def spin(msg):  # 使用 async 和 await 代替 @async.coroutine 和 yield from
  write, flush = sys.stdout.write, sys.stdout.flush
  for char in itertools.cycle('|/-\\'):
    status = char + ' ' + msg
    write(status)
    flush()
    write('\x08' * len(status))
    try:
      await asyncio.sleep(.1)  # 如此休眠不会阻塞事件循环
    except asyncio.CancelledError:  # 如果抛出此错误，说明发出了取消请求，需退出循环
      break
  write(' ' * len(status) + '\x08' * len(status))


async def slow_function():
  """
  假装等待　I/O 一段时间
  """
  await asyncio.sleep(3)  # 把控制权交给主循环，在休眠结束后恢复这个协程
  return 42


async def supervisor():
  spinner = asyncio.Task(spin('thinking!'))  # 排定　spin 的运行时间，使用一个　Task　对象包装　spin　协程，并立即返回
  print('spinner object: ', spinner)  # 显示　Task 对象
  result = await slow_function()
  spinner.cancel()  #　Task 对象可以取消;取消后会在协程当前暂停的 yield 处抛出　asyncio.CancelledError 异常。协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消
  return result


def main():
  loop = asyncio.get_event_loop()  # 获取事件的循环引用
  result = loop.run_until_complete(supervisor())  # 驱动　supervisor 协程，让其运行完毕
  loop.close()
  print('Answer: ', result)


if __name__ == '__main__':
  main()


Overwriting ch18/spinner_asyncio.py


In [7]:
!python ch18/spinner_asyncio.py

spinner object:  <Task pending coro=<spin() running at ch18/spinner_asyncio.py:6>>
| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!- thinking!\ thinking!| thinking!/ thinking!           Answer:  42


### 两个示例中 supervisor 函数的对比

- 线程版的 `supervisor` 函数

```python
def supervisor():
  """
  设置从属线程，显示线程对象，运行耗时计算，最后杀死线程
  """
  signal = Signal()
  spinner = threading.Thread(target=spin, args=('thinking!', signal))  # 设置从属线程
  print('spinner object:', spinner)  # 显示从属线程对象
  spinner.start()  # 启动从属线程
  result = slow_function()  # 阻塞主线程，从属线程以动画的形式显示旋转指针
  signal.go = False  # 改变 signal 的状态；此会终止 spin 函数中的 for 循环
  spinner.join()  # 等待 spinner 线程结束
  return result
```

- 异步版的 `supervisor` 协程

```python
async def supervisor():
  spinner = asyncio.Task(spin('thinking!'))  # 排定　spin 的运行时间，使用一个　Task　对象包装　spin　协程，并立即返回
  print('spinner object: ', spinner)  # 显示　Task 对象
  result = await slow_function()
  spinner.cancel()  #　Task 对象可以取消;取消后会在协程当前暂停的 yield 处抛出　asyncio.CancelledError 异常。协程可以捕获这个异常,也可以延迟取消,甚至拒绝取消
  return result
```

- 主要区别
  - `asyncio.Task` 对象差不多与 `threading.Thread` 对象等效
    - `Task` 对象像是实现协作式多任务的库(如 gevent) 中的绿色线程(green thread)
  - `Task` 对象用于驱动协程，`Thread` 对象用于调用可调用对象
  - `Task` 对象已经排定了运行时间；`Thread` 实例则必须调用 `start` 方法，明确告知让它运行
  - 在线程中，`slow_function` 函数是普通的函数，直接由线程调用。在异步版 `supervisor` 函数中，`slow_function` 函数是协程，由  `await` 驱动
  - 没有 API 能从外部终止线程,因为如果线程随时可能被中断,可能导致系统处于无效状态。如果想终止 `Task`,可以使用 `Task.cancel()` 实例方法,在协程内部抛出 `CancelledError` 异常。协程可以在暂停的 `await` 处捕获这个异常,处理终止请求
  - `supervisor` 协程必须在 `main` 函数中由 `loop.run_until_complete` 方法执行

- 中断对线程和协程的影响
  - 线程
    - 因为调度程序在任何时候都能中断线程。所以必须记住保留锁，去保护程序中重要的部分，防止多少操作在执行的过程中中断，防目数据处于无效状态
  - 协程
    - 协程默认会做好全方位的保护，以防止中断
      - 协程必须显示产出才能让程序的余下部分运行
      - 无需保留锁，因为协程自身就会同步，在任意时刻只有一个协程运行
        - 想交出控制权时，可通过 `await` 语句把控制权交还给调度程序
      - 协程可以安全的被取消，因为协程只会在暂停的 `await` 处取消
        - 因此可以处理 `CancelledError` 异常，执行清理操作

### 18.1.1 `asyncio.Future`：故意不阻塞

- `ayncio.Task` 是 `asyncio.Future` 类的子类，用于包装协程
- `aysncio.Future.result()` 方法没有参数，也不会阻塞，如果调用其时， `future` 还没有运行完毕，则会抛出 `asyncio.InvalidStateError` 异常
- 可以用 `await` 来处理 `future`， `await` 会把控制权还给事件循环
  - `future` 中的延迟操作完成后， 事件会设置 `future` 的返回值，`await` 则在暂停的协程中生成返回值，恢复执行协程
- `asyncio.Future` 对象一般由 `await` 驱动，而不是靠调用 `.done`、`add_done_callback()`、 `.result()` 这些方法驱动。

## 18.2 使用 asyncio 和 aiohttp 包下载

- 在 `asyncio` 中，基本的操作流程为：
  - 在一个单线程程序中使用主循环依次激活队列里的协程
  - 各个协程向前执行几步，然后将控制权让给主循环
  - 主循环再激活队列里的下一个协程
- `asyncio` 包的 API 使用时，需要注意以下细节
  - 编写的协程链条始终通过最外层的委派生成器传给 `asyncio` 包 API 中的某个函数(如 `loop.run_until_complete()` 驱动)
  - 编写的协程链条最终通过 `await` 把职责委托给 `asyncio` 包中的某个协程函数或协程方法，或其它库中实现了高层协议的协程
    - 即，最内层的子生成器是库中真正执行 I/O 操作的函数，而不是自己编写的函数
- 使用 `asyncio` 包时，编写的异步代码中包含由 `asyncio` 本身驱动的协程（即委派生成器）， 而生成器最终把职责委派给了`asyncio` 或第三方库(如 `aiohttp` )中的协程
  - 这种处理方式相当于架起了管道，让 `asyncio` 事件循环(通过我们编写的协程)驱动执行低层异步 I/O 操作的库函数

 ###### 示例 18.5 flags_asyncio.py: 使用 asyncio 和 aiohttp 包实现的异步下载脚本

In [None]:
!pip install aiohttp requests

In [26]:
%%writefile ch18/flags_asyncio.py
import asyncio
import os
import time

import aiohttp  # 需要额外安装，因为其不在标准库中

from ch17.flags import BASE_URL, save_flag, show, POP20_CC

DEST_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)),'downloads/')


async def get_flag(session, cc):
  url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
  async with session.get(url) as resp: # 阻塞的操作通过协程实现，原书中的代码不再起作用，本处通过 session 客户端来解决
    return await resp.read()  # 读取响应内容是一条单独的异步操作


async def download_one(session, cc):
  image = await get_flag(session, cc)
  show(cc)
  # Python 在两个模块之间并不共享全局变量，需要以参数的形式传入
  save_flag(image, cc.lower() + '.gif', DEST_DIR)
  return cc


async def download_many(cc_list):
  async with aiohttp.ClientSession() as session:  # 参考：https://docs.aiohttp.org/en/stable/client_reference.html
    to_do = [download_one(session, cc) for cc in sorted(cc_list)]  # 构建一个生成器对象列表
    wait_coro = asyncio.wait(to_do)  # 一个协程，不是阻塞型函数，等待传给它的所有协程运行完毕后结束
    done, _ = await wait_coro
  return len(done)


def main():
  t0 = time.time()
  loop = asyncio.get_event_loop()  # 获取事件循环底层实现的引用
  count = loop.run_until_complete(download_many(POP20_CC))
  elapsed = time.time() - t0
  msg = '\n{} flags downloaded in {:.2f}s'
  print(msg.format(count, elapsed))


if __name__ == '__main__':
  main()

Overwriting ch18/flags_asyncio.py


In [27]:
!python -m ch18.flags_asyncio

FR BR MX ET TR JP CD VN DE PK RU US BD EG IN CN PH ID NG IR 
20 flags downloaded in 2.02s


- `asyncio.wait()` 的参数是一个由 `future` 或协程构成的可迭代对象； `wait` 会分别把各个协程包装进一个 `Task` 对象
  - `wait` 是协程函数，可以使用 await 进行驱动，其运行结束后返回一个元组，第一个元素是一系列结束的 `future`，第二个元素是一系列未结束的 `future`

## 18.3 避免阻塞型调用

- 表： 现代电脑从不同的存储介质中读取数据的延迟情况

| 存储介质  | CPU 周期      | 按比例换算为“人类时间” |
|-------|-------------|--------------|
| L1 缓存 | 3           | 3 秒          |
| L2 缓存 | 14          | 14 秒         |
| RAM   | 250         | 250秒         |
| 硬盘    | 41 000 000  | 1\.3 年       |
| 网络    | 240 000 000 | 7\.6 年       |


- 有两种方法能避免阻塞型调用中止整个应用程序进程
  - 在单独的线程中运行各个阻塞型操作
    - 在操作系统中，线程消耗的内存达兆字节。如果要处理几千个的连接，而每个链接都使用一个线程，负担不起
  - 把每个阻塞型操作转换为非阻塞的异步调用
    - 为了降低内存消耗，通常使用回调实现异步调用
      - 类似于硬件中断
      - 使用回调时，不等待响应，而是注册一个函数，在发生某件事时调用
        - 回调简单，消耗低
      - 只有异步应用程序底层的事件循环能依靠基础设置的中断、线程、轮询和后台进程等，确保多个并发请求能取得进展并最终完成，这样才能使用回调
    - 把生成器当做协程使用是异步编程的另一种方式
      - 对事件循环来说，调用回调与在暂停的协程上调用 `.send()` 方法的效果差不多
      - 各个暂停的协程需要消耗内存，但是比线程消耗的内存量级小，而且协程能避免可怕的“回调地狱“

## 18.4 改进 asyncio 下载脚本

### 18.4.1 使用 `asyncio.as_complete` 函数

###### flags2_asyncio.py:脚本的前半部分;余下的代码在示例 18-8 中

In [41]:
%%writefile ch18/flags2_asyncio.py
import asyncio
import collections
import os

import aiohttp
from aiohttp import web
import tqdm

from ch17.flags2_common import main, HTTPStatus, Result, save_flag

DEST_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)),'downloads/')
# 默认设为较小的值，防止远程网站出错
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
  """
  自定义异常，用于包装其他 HTTP 或网络异响，并获取 country_code，以便报告错误
  """
  def __init__(self, country_code):
    self.country_code = country_code

  
async def get_flag(session, base_url, cc): 
  url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
  async with session.get(url) as resp:
    if resp.status == 200:
      return await resp.read()
    elif resp.status == 404:
      raise web.HTTPNotFound()
    else:
      resp.raise_for_status()


async def download_one(session, cc, base_url, semaphore, verbose):
  """
  semaphore 是 asyncio.Semaphore 类的实例，Semaphore 类是同步装置，用于限制并发请求数量
  """
  try:
    with await semaphore:  # 在 await 中将 semaphore 当做上下文管理器使用，防止阻塞整个系统，如果 semaphore 超过所允许的最大值，则只有这个协程会阻塞
      image = await get_flag(session, base_url, cc)  # 退出此 with 语句后，semaphore 计数器会递减，解除阻塞可能等待同一个 semaphore 对象的其它协程实例
  except web.HTTPNotFound:
    status = HTTPStatus.not_found
  except Exception as exc:
    raise FetchError(cc) from exc  # raise X from Y, 链接原来的异常： https://www.python.org/dev/peps/pep-3134/
  else:
    save_flag(image, cc.lower() + '.gif', DEST_DIR)
    status = HTTPStatus.ok
    msg = 'OK'
  if verbose and msg:
    print(cc, msg)
  
  return Result(status, cc)

Overwriting ch18/flags2_asyncio.py


###### 示例 18-8 flags2_asyncio.py:接续示例 18-7

In [42]:
%%writefile -a ch18/flags2_asyncio.py



async def downloader_coro(cc_list, base_url, verbose, concur_req):
  """
  类似于 download_many，但是是协程函数，不能直接调用
  """
  counter = collections.Counter()
  semaphore = asyncio.Semaphore(concur_req)  # 设置最多允许激活的协程数目

  # 根据文档 https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request，一个程序最好只使用一个 Session
  # setting the client to tell the server to close the connection after each request https://github.com/aio-libs/aiohttp/issues/850#issuecomment-471663047
  async with aiohttp.ClientSession(headers={"Connection": "close"}) as session:  
    to_do = [download_one(session, cc, base_url, semaphore, verbose)
            for cc in sorted(cc_list)]  # 创建一个协程对象列表
    to_do_iter = asyncio.as_completed(to_do)  # 获取一个迭代器，此迭代器会在  funture 运行结束后返回 future
  
    if not verbose:
      to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # 将迭代器传给 tqdm，以显示进度条
    for future in to_do_iter:  # 迭代运行结束的 future
      try:
        res = await future  # 获取 future 的运行结果
      except FetchError as exc:
        country_code = exc.country_code  # 获取国家代码 
        try:
          error_msg = 'HTTP error {resp.status} - {resp.message}'
          error_msg = error_msg.format(resp=exc.__cause__)
        except Exception:  # 如果异常中没有消息，使用链接异常的类名作为消息
          error_msg = exc.__cause__.__class__.__name__  
        if verbose and error_msg:
          msg = '*** Error for {}: {}'
          print(msg.format(country_code, error_msg))
        status = HTTPStatus.error
      else:
        status = res.status
      counter[status] += 1
  return counter


def download_many(cc_list, base_url, verbose, concur_req):
  loop = asyncio.get_event_loop()
  coro = downloader_coro(cc_list, base_url, verbose, concur_req)
  counts = loop.run_until_complete(coro)
  loop.close()

  return counts


if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)


Appending to ch18/flags2_asyncio.py


In [43]:
!python -m ch18.flags2_asyncio -s DELAY -al 100 -m 100

DELAY site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|█████████████████████████████████████████| 100/100 [00:02<00:00, 47.72it/s]
--------------------
100 flags downloaded.
Elapsed time: 2.10s


- `asynci.Semaphore` 对象维护着一个内部计时器
  - 若在对象上调用 `.acquire()` 协程方法，计数器则递减
  - 若在对象上调用 `.release()` 协程方法，则计数器递增
  - 计数器初始化可通过以下语句进行

    ```python
    semaphore = asyncio.Semaphore(concur_req)
    ```
  - 如果计数器大于零，则调用 `.acquire()` 方法时不会阻塞
  - 如果计数器为零，则 `.acquire()` 方法会阻塞调用这个方法的协程，直到其它协程在同一个 `Semaphore` 对象上调用 `.release()` 方法，让计数器递增
  - 实例使用时，可以将 `Semaphore` 当做上下文管理器使用，其会自动处理 `.acquire()` 和 `.release()` 方法
    ```python
    with await semaphore:
      image = await get_flag(base_url, cc)
    ```

- 由于 `asyncio.as_completed` 函数返回的 `future` 与传给 `as_complete` 函数的 `future` 可能不同。因此，不能再以 `future` 为键来获取国家代码
  - 因此自定义 `FetchError` 来包装网络异常，并关联相应的国家代码

### 18.4.2 使用 `Executor` 对象，防止阻塞事件循环

- 访问本地文件系统会阻塞
  - `save_flag` 函数阻塞了客户代码与 `asyncio` 函数共用的唯一线程
  - 因此，保存文件时，整个应用程序都会被冻结
  - 解决方法是使用事件循环对象的 `run_in_executor` 方法

###### 示例 18-9 flags2_asyncio_executor.py:使用默认的 `ThreadPoolExecutor` 对象运行 `save_flag` 函数

In [33]:
%%writefile ch18/flags2_asyncio_executor.py
import asyncio
import os
import concurrent
import sys

from aiohttp import web

from ch17.flags2_common import main, HTTPStatus, Result, save_flag
from ch18.flags2_asyncio import FetchError, get_flag, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ, downloader_coro

DEST_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)),'downloads/')


async def download_one(session, cc, base_url, semaphore, verbose):
  try:
    with (await semaphore):
      image = await get_flag(session, base_url, cc)
  except web.HTTPNotFound:
    status = HTTPStatus.not_found
    msg = 'not found'
  except Exception as exc:
    raise FetchError(cc) from exc
  else:
    loop = asyncio.get_event_loop()  # 获取事件循环的对象引用
    loop.run_in_executor(None,  # 第一个参数是 Executor 实例，如果为 None，则使用事件循环的默认值，也可传入 concurrent.futures 中的 ThreadPoolExecutor 与 ProcessPoolExecutor
                           save_flag, image, cc.lower() + '.gif', DEST_DIR)  # 余下的参数是可调用对象，以及可调用对象的位置参数
    status = HTTPStatus.ok
    msg = 'OK'
  if verbose and msg:
    print(cc, msg)

  return Result(status, cc)


def download_many(cc_list, base_url, verbose, concur_req):
  loop = asyncio.get_event_loop()
  executor = concurrent.futures.ThreadPoolExecutor(max_workers=12)  # 需要按此处方式进行设置，不然会报错
  loop.set_default_executor(executor)  # 设置默认的 executor

  coro = downloader_coro(cc_list, base_url, verbose, concur_req)
  counts = loop.run_until_complete(coro)


  executor.shutdown(wait=True)
  loop.close()
  return counts


# 用新定义的函数替换相关模块中的函数
sys.modules['ch18.flags2_asyncio'].download_one = download_one


if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)


Overwriting ch18/flags2_asyncio_executor.py


- 为了达到明显的测试效果，可以在 `save_flag` 中加入 `time.sleep()` 延时模块来模拟阻塞

In [34]:
!python -m ch18.flags2_asyncio_executor -s DELAY -al 100 -m 100

DELAY site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|█████████████████████████████████████████| 100/100 [00:02<00:00, 47.65it/s]
--------------------
100 flags downloaded.
Elapsed time: 2.10s


 ## 18.5 从回调到 future 和协程             

- "回调地狱"
  - 如果一个操作需要依赖之前操作的结果，那就得嵌套回调
  - 如果要连续做 3 次异步调用，就需要嵌套 3 层回调

###### 示例 18-10 JavaScript 中的回调地狱：嵌套匿名函数，也称为灾难金字塔

```javascript
api_call1(request1, function (response1) {
  // 第一步
  var request2 = step1(response1);
  api_call2(request2, function (response2) {
    // 第二步
    var request3 = step2(response2);
    api_call3(request3, function (response3) {
      // 第三步
      step3(response3);
    });
  });
});
```

- `api_call1`、 `api_call2` 和 `api_call3` 库函数，用于异步获取结果
  - 如 `api_call1` 从数据库中获取结果， `api_call2` 从 Web 服务器中获取结果
  - 在 JavaScript 中，回调通过匿名函数来实现
  - `step1`、`step2` 和 `setp3` 是应用程序中的常规函数，用为处理回调收到的响应

###### 示例 18-11 Python 中的回调地狱:链式回调

```python
def stage1(response1):
  request2 = step1(response1)
  api_call2(request2, stage2)


def stage2(response2):
  request3 = step2(response2)
  api_call3(request3, stage3)

def stage3(response3):
  step3(response3)

  
api_call1(request1, stage1)
```

- 按照回调的方式组织的代码，难以阅读，也更难编写
- 每个函数做一部分工作，设置下一个回调，然后返回，让事件循环继续运行
  - 如此，所有的本地上下文都会丢失
  - 如果要在 `stage2` 中使用 `request2`，则必须依靠闭包或把它储存在外部数据结构中，以便在处理过程的不同阶段使用
- 如果在处理 `api_call2(request2, stage2)` 调用时抛出了 I/O 异常，则这个异常无法在 `stage1` 函数中捕获，因为 `api_call2` 是异步调用
  - 在基于回调的 API 中，为了解决这个问题，需要为每个异步调用注册两个回调
    - 一个用于处理操作成功时返回的结果
    - 一个用于处理错误
  - 即一旦涉及到错误处理，回调地狱的危害程序就会迅速增大

###### 示例 18-12 使用协程和 yield from 结构做异步编程,无需使用回调

```python
async def three_stages(request1):
  response1 = await api_call1(request1)
  # 第一步
  request2 = step1(response1)
  response2 = await api_call2(request2)
  # 第二步
  request3 = step2(response2)
  response3 = await api_call3(request3)
  # 第三步
  step3(response3)


loop.create_task(three_stages(request1))  # 必须显式调度执行
```

- 操作的 3 个步骤依次写在同一个函数中，后续处理便于使用前一步的结果；而且提供了上下文，能通过异常来报告错误
- 在故障处理方面，可以将异步调用 `api_call1`, `api_call2` 和 `api_call3` 的 `await` 表达式放在 `try/except` 块中进行异常处理
- 只要有 `async` 和 `await`，函数就会变为协程，而协程不能直接调用，而需要通过 `loop.create_task()` 来驱动

### 每次下载可以发起多个请求

- 下载每面国旗时发起两个请求
  - 一个请求用于获取国旗
  - 另一个请求用于获取图像所在目录里的 `metadata.json` 文件，以从中获取国家名称

###### 示例 18-13 flags3_asyncio.py:再定义几个协程,把职责委托出去,每次下载国旗时发起两次请求

In [37]:
%%writefile ch18/flags3_asyncio.py

import asyncio
import concurrent
import os
import sys

import aiohttp
from aiohttp import web

from ch17.flags2_common import main, HTTPStatus, Result, save_flag
from ch18.flags2_asyncio import FetchError, downloader_coro, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ
from ch18.flags2_asyncio_executor import download_many

DEST_DIR = os.path.join(os.path.dirname(os.path.realpath(__file__)),'downloads/')


async def http_get(session, url):
  async with session.get(url) as resp:
    if resp.status == 200:
      ctype = resp.headers.get('Content-type', '').lower()
      if 'json' in ctype or url.endswith('json'):
        data = await resp.json()  # 在响应上调用 .json() 方法，解析响应，返回一个 Python 数据结构 －－ 这里是一个字典
      else:
        data = await resp.read()  # 读取原始字节
      return data
    elif resp.status == 404:
      raise web.HTTPNotFound()
    else:
      resp.raise_for_status()



async def get_country(session, base_url, cc):
  url = '{}/{cc}/metadata.json'.format(base_url, cc=cc.lower())
  metadata = await http_get(session, url)
  return metadata['country']


async def get_flag(session, base_url, cc):
  url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
  return await http_get(session, url)


async def download_one(session, cc, base_url, semaphore, verbose):
  try:
    async with semaphore:
      image = await get_flag(session, base_url, cc)
    async with semaphore:
      country = await get_country(session, base_url, cc)
  except web.HTTPNotFound:
    status = HTTPStatus.not_found
    msg = 'not found'
  except Exception as exc:
    raise FetchError(cc) from exc
  else:
    country = country.replace(' ', '_')
    filename = '{}-{}.gif'.format(country, cc)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, save_flag, image, filename, DEST_DIR)
    status = HTTPStatus.ok
    msg = 'OK'

  if verbose and msg:
    print(cc, msg)

  return Result(status, cc)


# 用新定义的函数替换相关模块中的函数
sys.modules['ch18.flags2_asyncio'].download_one = download_one


if __name__ == '__main__':
  main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

Overwriting ch18/flags3_asyncio.py


In [45]:
!python -m ch18.flags3_asyncio -s DELAY -al 100 -m 100

DELAY site: http://localhost:8002/flags
Searching for 100 flags: from AD to LK
100 concurrent connections will be used.
100%|█████████████████████████████████████████| 100/100 [00:04<00:00, 24.07it/s]
--------------------
100 flags downloaded.
Elapsed time: 4.16s


 ## 18.6 使用 `asyncio` 包编写服务器

### 18.6.1 使用 `asyncio` 包编写 TCP 服务器

- `charfinder` 模块读取 Python 内建的 Unicode 数据库,为每个字符名称中的每个单词建立索引,然后倒排索引,存进一个字典。

###### charfinder.py

In [None]:
%%writefile ch18/charfinder.py
#!/usr/bin/env python3

"""
Unicode character finder utility:
find characters based on words in their official names.

This can be used from the command line, just pass words as arguments.

Here is the ``main`` function which makes it happen::

    >>> main('rook')  # doctest: +NORMALIZE_WHITESPACE
    U+2656  ♖  WHITE CHESS ROOK
    U+265C  ♜  BLACK CHESS ROOK
    (2 matches for 'rook')
    >>> main('rook', 'black')  # doctest: +NORMALIZE_WHITESPACE
    U+265C  ♜  BLACK CHESS ROOK
    (1 match for 'rook black')
    >>> main('white bishop')  # doctest: +NORMALIZE_WHITESPACE
    U+2657  ♗   WHITE CHESS BISHOP
    (1 match for 'white bishop')
    >>> main("jabberwocky's vest")
    (No match for "jabberwocky's vest")


For exploring words that occur in the character names, there is the
``word_report`` function::

    >>> index = UnicodeNameIndex(sample_chars)
    >>> index.word_report()
        3 SIGN
        2 A
        2 EURO
        2 LATIN
        2 LETTER
        1 CAPITAL
        1 CURRENCY
        1 DOLLAR
        1 SMALL
    >>> index = UnicodeNameIndex()
    >>> index.word_report(10)
    75821 CJK
    75761 IDEOGRAPH
    74656 UNIFIED
    13196 SYLLABLE
    11735 HANGUL
     7616 LETTER
     2232 WITH
     2180 SIGN
     2122 SMALL
     1709 CAPITAL

Note: characters with names starting with 'CJK UNIFIED IDEOGRAPH'
are indexed with those three words only, excluding the hexadecimal
codepoint at the end of the name.

"""

import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple

RE_WORD = re.compile(r'\w+')
RE_UNICODE_NAME = re.compile('^[A-Z0-9 -]+$')
RE_CODEPOINT = re.compile(r'U\+([0-9A-F]{4,6})')

INDEX_NAME = 'charfinder_index.pickle'
MINIMUM_SAVE_LEN = 10000
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'

sample_chars = [
    '$',  # DOLLAR SIGN
    'A',  # LATIN CAPITAL LETTER A
    'a',  # LATIN SMALL LETTER A
    '\u20a0',  # EURO-CURRENCY SIGN
    '\u20ac',  # EURO SIGN
]

CharDescription = namedtuple('CharDescription', 'code_str char name')

QueryResult = namedtuple('QueryResult', 'count items')


def tokenize(text):
    """return iterable of uppercased words"""
    for match in RE_WORD.finditer(text):
        yield match.group().upper()


def query_type(text):
    text_upper = text.upper()
    if 'U+' in text_upper:
        return 'CODEPOINT'
    elif RE_UNICODE_NAME.match(text_upper):
        return 'NAME'
    else:
        return 'CHARACTERS'


class UnicodeNameIndex:

    def __init__(self, chars=None):
        self.load(chars)

    def load(self, chars=None):
        self.index = None
        if chars is None:
            try:
                with open(INDEX_NAME, 'rb') as fp:
                    self.index = pickle.load(fp)
            except OSError:
                pass
        if self.index is None:
            self.build_index(chars)
        if len(self.index) > MINIMUM_SAVE_LEN:
            try:
                self.save()
            except OSError as exc:
                warnings.warn('Could not save {!r}: {}'
                              .format(INDEX_NAME, exc))

    def save(self):
        with open(INDEX_NAME, 'wb') as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        if chars is None:
            chars = (chr(i) for i in range(32, sys.maxunicode))
        index = {}
        for char in chars:
            try:
                name = unicodedata.name(char)
            except ValueError:
                continue
            if name.startswith(CJK_UNI_PREFIX):
                name = CJK_UNI_PREFIX
            elif name.startswith(CJK_CMP_PREFIX):
                name = CJK_CMP_PREFIX

            for word in tokenize(name):
                index.setdefault(word, set()).add(char)

        self.index = index

    def word_rank(self, top=None):
        res = [(len(self.index[key]), key) for key in self.index]
        res.sort(key=lambda item: (-item[0], item[1]))
        if top is not None:
            res = res[:top]
        return res

    def word_report(self, top=None):
        for postings, key in self.word_rank(top):
            print('{:5} {}'.format(postings, key))

    def find_chars(self, query, start=0, stop=None):
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            chars = self.index.get(word)
            if chars is None:  # shorcut: no such word
                result_sets = []
                break
            result_sets.append(chars)

        if not result_sets:
            return QueryResult(0, ())

        result = functools.reduce(set.intersection, result_sets)
        result = sorted(result)  # must sort to support start, stop
        result_iter = itertools.islice(result, start, stop)
        return QueryResult(len(result),
                           (char for char in result_iter))

    def describe(self, char):
        code_str = 'U+{:04X}'.format(ord(char))
        name = unicodedata.name(char)
        return CharDescription(code_str, char, name)

    def find_descriptions(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe(char)

    def get_descriptions(self, chars):
        for char in chars:
            yield self.describe(char)

    def describe_str(self, char):
        return '{:7}\t{}\t{}'.format(*self.describe(char))

    def find_description_strs(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe_str(char)

    @staticmethod  # not an instance method due to concurrency
    def status(query, counter):
        if counter == 0:
            msg = 'No match'
        elif counter == 1:
            msg = '1 match'
        else:
            msg = '{} matches'.format(counter)
        return '{} for {!r}'.format(msg, query)


def main(*args):
    index = UnicodeNameIndex()
    query = ' '.join(args)
    n = 0
    for n, line in enumerate(index.find_description_strs(query), 1):
        print(line)
    print('({})'.format(index.status(query, n)))

if __name__ == '__main__':
    if len(sys.argv) > 1:
        main(*sys.argv[1:])
    else:
        print('Usage: {} word1 [word2]...'.format(sys.argv[0]))


Overwriting ch18/charfinder.py


In [None]:
!python ch18/charfinder.py sun

U+2600 	☀	BLACK SUN WITH RAYS
U+2609 	☉	SUN
U+263C 	☼	WHITE SUN WITH RAYS
U+26C5 	⛅	SUN BEHIND CLOUD
U+2E9C 	⺜	CJK RADICAL SUN
U+2F47 	⽇	KANGXI RADICAL SUN
U+3230 	㈰	PARENTHESIZED IDEOGRAPH SUN
U+3290 	㊐	CIRCLED IDEOGRAPH SUN
U+C21C 	순	HANGUL SYLLABLE SUN
U+1F31E	🌞	SUN WITH FACE
U+1F323	🌣	WHITE SUN
U+1F324	🌤	WHITE SUN WITH SMALL CLOUD
U+1F325	🌥	WHITE SUN BEHIND CLOUD
U+1F326	🌦	WHITE SUN BEHIND CLOUD WITH RAIN
(14 matches for 'sun')


###### 示例 18-14 tcp_charfinder.py:使用 `asyncio.start_server` 函数实现的简易 TCP 服务器;这个模块余下的代码在示例 18-15 中

In [46]:
%%writefile ch18/tcp_charfinder.py
import asyncio
import sys

from charfinder import UnicodeNameIndex  # 用于构建名称索引，提供查询方法

CRLF = b'\r\n'
PROMPT = b'?> '

index = UnicodeNameIndex()  # 实例化 UnicodeNameIndex 类时，会使用 charfinder_index.pickle 文件(如果存在)，或者构建这个文件，因此第一次运行时要等几秒钟服务器才会启动


async def handle_queries(reader, writer):  # 这个协程要传给 asyncio.start_server 函数，接收的两个参数是 asyncio.StreamReader 对象和 asyncio.StreamWriter 对象
  while True:  # 循环处理会话，直到从客户端接收到控制字符后退出
    writer.write(PROMPT)  # StreamWriter.write 方法不是协程，只是普通函数；这行代码发送 ?> 提示符
    await writer.drain()  # StreamWriter.drain 方法刷新 writer 缓冲，是协程
    data = await reader.readline()  # StreamReader.readline 方法是协程，返回一个 bytes 对象
    try:
      query = data.decode().strip()  # 默认编码 utf8
    except UnicodeDecodeError:
      query = '\x00'
    client = writer.get_extra_info('peername')  # 返回与套接子连接的远程地址
    print('Received from {}: {!r}'.format(client, query))  # 在服务器控制台中记录查询
    if query:
      if ord(query[:1]) < 32:
        break  # 如果收到控制字符或空字符，退出循环
      lines = list(index.find_description_strs(query))  # 返回一个生成器，产出包含 Unicode 码位、真正的字符串和字符名称的字符串，此处从生成器中构建了一个列表
      if lines:
        writer.writelines(line.encode() + CRLF for line in lines)  # 使用默认的 UTF-8 编码把 lines 转换为 bytes 对象，并在每一行添加回车符和换行符；此处的参数是一个生成器表达式
      writer.write(index.status(query, len(lines)).encode() + CRLF)  # 输出状态

      await writer.drain()  # 刷新输出缓存
      print('Sent {} results'.format(len(lines)))  #  在服务器控制台中记录响应

  print('Close the client socket')  # 在服务器的控制台中记录会话结束
  writer.close()  # 关闭 StreamWriter 流

Overwriting ch18/tcp_charfinder.py


###### 示例 18-15 tcp_charfinder.py(接续示例 18-14):`main` 函数创建并销毁事件循环和套接字服务器

In [47]:
%%writefile -a ch18/tcp_charfinder.py



def main(address='127.0.0.1', port=2323):
  port = int(port)
  loop = asyncio.get_event_loop()
  server_coro = asyncio.start_server(handle_queries, address, port, loop=loop)  # 运行结束后，返回的协程对象返回一个 asyncio.Server 实例，即一个 TCP 服务器
  server = loop.run_until_complete(server_coro)  # 驱动 server_coro 协程，启动服务器
  host = server.sockets[0].getsockname()  # 获取服务器的第一个套接字端口
  print('Serving on {}. Hit CTRL-C to stop.'.format(host))  # 在服务器的控制台显示出来
  try:
    loop.run_forever()  # 运行事件循环，main() 函数在这里阻塞，直到在服务器的控制台中按 CTRL-C 才会关闭
  except KeyboardInterrupt:
    pass

  print('Server shutting down.')
  server.close()  # 关闭服务器
  loop.run_until_complete(server.wait_closed())  # server.wait_close() 方法返回一个协程
  loop.close()


if __name__ == '__main__':
  main(*sys.argv[1:])


Appending to ch18/tcp_charfinder.py


In [48]:
!python ch18/tcp_charfinder.py

Serving on ('127.0.0.1', 2323). Hit CTRL-C to stop.
Received from ('127.0.0.1', 34740): 'sun'
Sent 14 results
Received from ('127.0.0.1', 34738): 'sun'
Sent 14 results
Received from ('127.0.0.1', 34738): 'big'
Sent 16 results
Received from ('127.0.0.1', 34740): 'good'
Sent 1 results
Received from ('127.0.0.1', 34740): 'bad'
Sent 20 results
Received from ('127.0.0.1', 34738): 'haha'
Sent 0 results
Received from ('127.0.0.1', 34738): '\x00'
Close the client socket
Received from ('127.0.0.1', 34740): '\x00'
Close the client socket
^C
Server shutting down.


 ### 18.6.2 使用 `aiohttp` 包编写 Web 服务器 

###### 示例 18-14, 18-15  http_charfinder.py

- 模板文件

In [49]:
%%writefile ch18/http_charfinder.html
<!DOCTYPE html>
<html lang="en">
  <head>
    <meta charset="utf-8">
    <title>Charfinder</title>
  </head>
  <body>
    Examples: {links}
    <p>
      <form action="/">
        <input type="search" name="query" value="{query}">
        <input type="submit" value="find"> {message}
      </form>
    </p>
    <table>
      {result}
    </table>
  </body>
</html>

Overwriting ch18/http_charfinder.html


In [52]:
%%writefile ch18/http_charfinder.py
import sys
import asyncio
from aiohttp import web

from charfinder import UnicodeNameIndex

TEMPLATE_NAME = 'ch18/http_charfinder.html'
CONTENT_TYPE = 'text/html;'
SAMPLE_WORDS = ('bismillah chess cat circled Malayalam digit'
                ' Roman face Ethiopic black mark symbol dot'
                ' operator Braille hexagram').split()

ROW_TPL = '<tr><td>{descr.code_str}</td><th>{descr.char}</th><td>{descr.name}</td></tr>'
LINK_TPL = '<a href="/?query={0}" title="find &quot;{0}&quot;">{0}</a>'
LINKS_HTML = ', '.join(LINK_TPL.format(word) for word in
                       sorted(SAMPLE_WORDS, key=str.upper))


index = UnicodeNameIndex()
with open(TEMPLATE_NAME) as tpl:
    template = tpl.read()
template = template.replace('{links}', LINKS_HTML)  # 渲染模板中开头的链接


def home(request):  # 路由处理函数，参数是一个 aiohttp.web.Request 实例
  query = request.query.get('query', '').strip()  # 获取查询字符，去掉首尾空白
  if query:  # 如果有查询字符串,从索引(index)中找到结果,使用 HTML 表格中的行渲染结果,把结果赋值给 res 变量,再把状态消息赋值给 msg 变量
    descriptions = list(index.find_descriptions(query))
    res = '\n'.join(ROW_TPL.format(descr = descr)
                    for descr in descriptions)
    msg = index.status(query, len(descriptions))
  else:
    descriptions = []
    res = ''
    msg = 'Enter words describing characters.'

  html = template.format(query=query, result=res, message=msg)  # 渲染 HTML 页面
  print('Sending {} results'.format(len(descriptions)))  # 在服务器的控制台中记录响应
  return web.Response(content_type=CONTENT_TYPE, text=html, charset='utf8')  # 构建 response 对象，将其返回


async def init(loop, address, port):  # init 协程产出一个服务器，交给事件循环驱动
  app = web.Application(loop=loop)  # aiohttp.web.Application 类表示 WEB 应用
  app.router.add_route('GET', '/', home)  # 通过路由把 URL 模式映射到处理函数上，如果 home 是普通的函数，则在内部会将其转换为协程
  handler = app.make_handler()  # 返回一个 aiohttp.web.RequestHandler  实例，根据 app 对象设置路由处理 HTPP 请求
  server = await loop.create_server(handler, address, port)  # create_server 方法创建服务器，以 handler 为协议处理程序，并把服务器绑定在指定的地址(address)和端口(port)上
  return server.sockets[0].getsockname(), server  # 返回第一个服务器套接字的地址和端口和 server


def main(address='127.0.0.1', port=7878):
  port = int(port)
  loop = asyncio.get_event_loop()
  host, server= loop.run_until_complete(init(loop, address, port))  # 运行 init 函数，启动服务器，获取服务器的地址和端口; 只有驱动协程，协程才能做事
  print('Serving on {}. Hit CTRL-C to stop.'.format(host))
  try:
    loop.run_forever()  # 运行事件循环，main 函数会在此处阻塞
  except KeyboardInterrupt:  # 按 CTRL-C 键
    print('Server shutting down.')
  loop.run_until_complete(server.wait_closed())  # 关闭服务器
  print('Server shutting down.')
  loop.close()  # 关闭事件循环


if __name__ == '__main__':
  main(*sys.argv[1:])

Overwriting ch18/http_charfinder.py


In [53]:
!python ch18/http_charfinder.py

Serving on ('127.0.0.1', 7878). Hit CTRL-C to stop.
Sending 114 results
Sending 14 results
Sending 6 results
^C
Server shutting down.


### 18.6.3 更好的支持并发的智能客户端

- 视图函数如果要完全考虑异步，则应该异步访问数据库
- 除了防止阻塞调用之外，高并发的系统还必须把复杂的工作分为多步，以保持敏捷，避免响应时间过长
  - 常用的方法是实现分页
  - 一般需配合 AJAX 或 WebSocket 使用
  - 实现分批发送所需的大多数代码都在浏览器这一端，因此大型互联网公司大量依赖客户端代码构建服务
    - 智能的异步客户端能更好的使用服务器资源
- 为了更好的为智能客户端服务，需要全方位支持异步编程的框架