#### 第十八章 使用asyncio包处理并发
并发是指一次处理多件事。  
并行是指一次做多件事。  
二者不同，但是有联系。  
一个关于结构，一个关于执行。  
并发用于制定方案，用来解决可能（但未必）并行的问题。

##### 18.1 线程与协程对比

In [7]:
# 通过线程以动画形式显示文本式旋转指针
import threading
import itertools
import time
import sys

class Signal:
    go = True
    
def spin(msg, signal):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        time.sleep(.1)
        if not signal.go:
            break
    write(' ' * len(status) + '\x08' * len(status))

def slow_function():
    # 假装等等I/O一段时间
    time.sleep(3)
    return 42

def supervisor():
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()
    result = slow_function()
    signal.go = False
    spinner.join()
    return result

def main():
    result = supervisor()
    print('Answer:', result)

In [8]:
main()

spinner object: <Thread(Thread-7, initial)>
| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking- thinking\ thinking| thinking/ thinking          Answer: 42


- 适合 asyncio API 的协程在定义体中必须使用 yield from，而不能使用 yield。此外，适合 asyncio 的协程要由调用方驱动，并由调用方通过 yield from 调用；或者把协程传给 asyncio包中的某个函数，例如 asyncio.async(...)，从而驱动协程。最后，@asyncio.coroutine 装饰器应该应用在协程上。

In [2]:
# 通过协程以动画形式显示文本式旋转指针
import asyncio
import itertools
import sys

@asyncio.coroutine
def spin(msg):
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    write(' ' * len(status) + '\x08' * len(status))

@asyncio.coroutine
def slow_function():
    # 假装等等I/O一段时间
    yield from asyncio.sleep(3)
    return 42

@asyncio.coroutine
def supervisor():
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object:', spinner)
    result = yield from slow_function()
    spinner.cancel()
    return result

def main():
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(supervisor())
    loop.close()
    print('Answer:', result)

In [1]:
main()




##### 18.2 使用asyncio和aiohttp包下载

In [3]:
# 使用asyncion 和 aiohttp包实现的异步下载脚本
import asyncio
import os
import time

import aiohttp

POP20_CC = 'CN IN US ID BR PK NG BD RU JP \
            MX PH VN ET EG DE IR TR CD FR'.split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.ClientSession().get(url)
    image = yield from resp.read()
    return image


@asyncio.coroutine
def download_one(cc):
    image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    loop = asyncio.get_event_loop()
    to_do = [download_one(cc) for cc in sorted(cc_list)]
    wait_coro = asyncio.wait(to_do)
    res, _ = loop.run_until_complete(wait_coro)
    # loop.close()

    return len(res)

In [4]:
main(download_many)

RuntimeError: This event loop is already running

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55BA8>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B0317E28>, 3530557.843)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B03058D0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD559B0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B0317FA8>, 3530557.953)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B030F630>


FR IR 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55358>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B046E408>, 3530558.062)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02F9710>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD556D8>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B0317CA8>, 3530558.078)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02C87F0>


CN RU 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173B0106A90>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B046E0A8>, 3530558.546)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B031B390>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD557B8>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B0317D68>, 3530558.609)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02EF550>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55B00>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B046E228>, 3530558.656)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B030FEF0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55780>


NG TR MX 

Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B0317DC8>, 3530558.734)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02EF0F0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55AC8>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B046E348>, 3530558.765)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02F9B70>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55828>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE948>, 3530558.828)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02C8C50>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55978>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE948>, 3530558.875)]

CD DE BD ID VN 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55A58>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EED08>, 3530559.156)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02F9FD0>


EG 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD557F0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE1C8>, 3530560.515)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02EFE10>


BR 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55908>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE1C8>, 3530561.234)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173AFD55A20>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55390>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE708>, 3530561.25)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B030F1D0>


PK IN 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55940>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE0A8>, 3530561.968)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B02EF9B0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173B0106B70>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE408>, 3530562.14)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B031B7F0>


US PH 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD55860>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02EE5E8>, 3530564.875)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B0305470>


ET 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000173AFD559E8>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x00000173B02DFF48>, 3530567.109)]']
connector: <aiohttp.connector.TCPConnector object at 0x00000173B030FA90>


JP 

##### 18.4 改进asyncio下载脚本

In [5]:
# flags2_asyncio.py
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm

from flags2_common import main, HTTPStatus, Result, save_flag

# 默认设为较小值， 防止远程网站出错
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
    def __init__(self, country_code):
        self.country_code = country_code

@asyncio.coroutine
def get_flag(base_url, cc):
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    resp = yield from aiohttp.ClientSession.get(url)
    if resp.status == 200:
        image = yield from resp.read()
        return image
    elif resp.status == 404:
        raise web.HTTPNotFound()
    else:
        raise aiohttp.HttpProcessingError(
            code=resp.status, message=resp.reason,
            headers=resp.headers)

@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        loop.run_in_executor(None, save_flag,
                            image, cc.lower() + '.gif')
        # save_flag(image, cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'
    
    if verbose and msg:
        print(cc, msg)
    
    return Result(status, cc)

In [2]:
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)
    to_do = [download_one(cc, base_url, semaphore, verbose)
             for cc in sorted(cc_list)]

    to_do_iter = asyncio.as_completed(to_do)
    if not verbose:
        to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))
    for future in to_do_iter:
        try:
            res = yield from future
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            status = HTTPStatus.error
        else:
            status = res.status
        counter[status] += 1
    return counter

In [3]:
def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)
    loop.close()
    return counts

main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)

usage: ipykernel_launcher.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                             [-v]
                             [CC [CC ...]]
ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


##### 18.5 从回调到期物和协程

In [6]:
# Python 中的回调地狱：链式回调
def stage1(response1):
    request2 = step(response1)
    api_call2(request2, stage2)

def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)

def stage3(response3):
    step3(response3)

api_call1(request1, stage1)

NameError: name 'api_call1' is not defined

##### 18.6 使用asyncio包编写服务器