# 第十八章 使用asyncio包处理并发

## 18.1 线程和协程对比

In [1]:
#spinner_thread.py
#退格符（\b）在Jupyter Notebook中无法回退覆盖已输出的内容，请把下列代码放到IDE或终端中运行
import threading
import itertools
import time
import sys

class Signal:
    """Mutable stop flag shared between supervisor() and the spinner thread.

    The spinner keeps animating while ``go`` is true; setting ``go`` to False
    on an instance asks the spinner holding that instance to stop.
    """

    go = True  # class-level default; assigning on an instance shadows it

def spin(msg, signal):
    """Animate a text spinner next to *msg* on stdout until ``signal.go`` is false.

    Each frame writes one of ``| / - \\`` plus *msg*, then backspaces over it
    so the next frame overwrites it in place. The flag is checked after each
    0.1 s pause. When the loop ends, the last frame is blanked out.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        line = ' '.join((next(frames), msg))
        out.write(line)
        out.flush()
        out.write('\x08' * len(line))
        time.sleep(0.1)
        if not signal.go:
            break
    width = len(line)
    out.write(' ' * width + '\b' * width)

def slow_function(delay=3):
    """Pretend to wait on I/O for *delay* seconds, then return 42.

    *delay* defaults to the original 3 seconds. Pass a smaller value
    (e.g. in tests) to skip the long wait. ``time.sleep`` blocks only the
    calling thread, so the spinner thread keeps animating meanwhile.
    """
    time.sleep(delay)  # stands in for a slow, blocking I/O call
    return 42

def supervisor():
    """Run slow_function() while a spinner thread animates; return its result.

    Python has no API to kill a thread, so the spinner is stopped by clearing
    ``signal.go`` and then joined. This happens in ``finally``, so the
    (non-daemon) spinner thread also stops if slow_function() raises.
    Otherwise it would keep spinning and the process could never exit.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=('thinking!', signal))
    print('spinner object:', spinner)
    spinner.start()  # start the helper thread
    try:
        return slow_function()
    finally:
        signal.go = False  # ask the spinner to stop...
        spinner.join()     # ...and wait until it has

def main():
    """Run the threaded spinner demo and print the answer it produces."""
    print('Answer:', supervisor())


main()

spinner object: <Thread(Thread-6, initial)>
| thinking!

/ thinking!



- thinking!

\ thinking!



| thinking!

/ thinking!

- thinking!

\ thinking!



| thinking!



/ thinking!



- thinking!



\ thinking!



| thinking!

/ thinking!

- thinking!



\ thinking!

| thinking!



/ thinking!

- thinking!

\ thinking!

| thinking!

/ thinking!

- thinking!

\ thinking!

| thinking!

/ thinking!

- thinking!

\ thinking!

| thinking!

/ thinking!

           Answer:

 42


In [2]:
#spinner_asyncio.py

import asyncio
import itertools
import sys

@asyncio.coroutine
def spin(msg):
    """Animate a text spinner next to *msg* until this coroutine's task is cancelled.

    Between frames it hands control back to the event loop via
    ``asyncio.sleep(0.1)``. Cancelling the task raises CancelledError at
    that point, which ends the animation, and the last frame is then
    blanked out.
    """
    out = sys.stdout
    frames = itertools.cycle('|/-\\')
    while True:
        line = ' '.join((next(frames), msg))
        out.write(line)
        out.flush()
        out.write('\x08' * len(line))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break
    width = len(line)
    out.write(' ' * width + '\b' * width)

@asyncio.coroutine
def slow_function(delay=3):
    """Pretend to wait on I/O for *delay* seconds without blocking, then return 42.

    ``yield from asyncio.sleep(...)`` returns control to the event loop, so
    the spinner task keeps running meanwhile. *delay* defaults to the
    original 3 seconds.
    """
    yield from asyncio.sleep(delay)
    return 42

@asyncio.coroutine
def supervisor():
    """Run slow_function() while a spinner task animates; return its result.

    ``asyncio.ensure_future`` schedules spin() as a Task. The old alias
    ``asyncio.async`` is deprecated and is a SyntaxError from Python 3.7 on,
    where ``async`` became a keyword. Unlike threads, tasks can be stopped
    with ``cancel()``; doing it in ``finally`` also stops the spinner if
    slow_function() raises.
    """
    spinner = asyncio.ensure_future(spin('thinking!'))
    print('spinner object:', spinner)
    try:
        result = yield from slow_function()
    finally:
        spinner.cancel()
    return result

def main():
    """Drive supervisor() to completion on the event loop and print the answer."""
    loop = asyncio.get_event_loop()
    answer = loop.run_until_complete(supervisor())
    # NOTE(review): this closes the *default* event loop. Later calls to
    # asyncio.get_event_loop() in the same process (e.g. later notebook
    # cells) are likely to get this closed loop back — confirm.
    loop.close()
    print('Answer:', answer)


main()

spinner object: <Task pending coro=<spin() running at <ipython-input-2-234f6d27ad0c>:7>>
| thinking!

/ thinking!



- thinking!

\ thinking!



| thinking!

/ thinking!



- thinking!

\ thinking!



| thinking!

/ thinking!

- thinking!

\ thinking!



| thinking!



/ thinking!

- thinking!



\ thinking!

| thinking!



/ thinking!

- thinking!



\ thinking!

| thinking!



/ thinking!

- thinking!

\ thinking!

| thinking!

/ thinking!



- thinking!

\ thinking!



| thinking!

/ thinking!

           Answer: 42


In [3]:
# Print "222" followed by five backspace characters (\b is the same as \x08).
# How the backspaces render depends on the output device; the recorded
# notebook output shows "222" unchanged.
print('222' + '\b' * 5)

222


asyncio.Future类的.result()方法没有参数，因此不能指定超时时间。此外，如果调用.result()方法时期物还没运行完毕，那么.result()方法不会阻塞去等待结果，而是抛出asyncio.InvalidStateError异常

然而，获取asyncio.Future对象的结果通常使用yield from，从期物中产出结果。

使用yield from 处理期物，等待期物运行完毕这一步无需我们关心，而且不会阻塞事件循环，因为在asyncio包中，yield from的作用是将控制权还给事件循环。

总之，asyncio.Future类的目的是与yield from一起使用，即asyncio.Future对象是靠yield from驱动的。所以通常无需调用my_future.add_done_callback()，因为可以直接把期物运行结束后要执行的操作放在协程中yield from my_future表达式的后面；也无需调用.result()，因为yield from从期物中产出的值就是结果：result = yield from my_future。

## 18.2 使用asyncio和aiohttp包下载

In [5]:
import sys
# Prepend the directory holding local helper modules (presumably the flags /
# flags2_common modules imported below) to the import search path.
# NOTE(review): machine-specific absolute path — adjust for your checkout.
sys.path[:0] = [r'D:\WORKSPACE2\python35\python8.25\fluent_python']

In [6]:
import asyncio
import aiohttp
from flags import BASE_URL,save_flag,show,main

@asyncio.coroutine
def get_flag(cc):
    """Download and return the raw bytes of the GIF flag for country code *cc*.

    NOTE(review): the recorded output shows one "Unclosed client session"
    warning per request. ``aiohttp.request`` apparently creates a client
    session that this code never closes. Sharing one explicitly closed
    ``aiohttp.ClientSession`` would avoid it (the API differs across aiohttp
    versions — confirm).
    """
    code = cc.lower()
    url = '{}/{}/{}.gif'.format(BASE_URL, code, code)
    response = yield from aiohttp.request('GET', url)
    return (yield from response.read())

@asyncio.coroutine
def download_one(cc):
    """Fetch the flag for *cc*, report it, save it as ``<cc>.gif`` and return *cc*.

    While ``yield from get_flag(cc)`` is pending, this coroutine is suspended
    and control goes back to the event loop, so other downloads can proceed.
    """
    image = yield from get_flag(cc)
    show(cc)
    filename = cc.lower() + '.gif'
    save_flag(image, filename)
    return cc

def download_many(cc_list):
    """Download all flags in *cc_list* concurrently; return the number of finished tasks.

    Each download_one() coroutine is wrapped in a Task with
    ``asyncio.ensure_future``. Passing bare coroutines to ``asyncio.wait`` is
    deprecated since Python 3.8 and rejected since 3.11. ``asyncio.wait``
    also raises ValueError for an empty collection, so an empty *cc_list*
    returns 0 immediately.

    NOTE(review): the count includes tasks that finished by raising, and
    closing the default event loop leaves later get_event_loop() callers in
    this process with a closed loop.
    """
    if not cc_list:
        return 0
    loop = asyncio.get_event_loop()
    to_do = [asyncio.ensure_future(download_one(cc), loop=loop)
             for cc in sorted(cc_list)]
    done, _ = loop.run_until_complete(asyncio.wait(to_do))
    loop.close()
    return len(done)


main(download_many)

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9E4940>




FR

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F17F0>




CN

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9D6278>




BR

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA0AB38>




JP

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9D6940>




EG

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA0AFD0>




TR

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA0A208>




BD

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA02D30>




RU

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9E4E80>




NG

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA02400>




DE

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F1C88>




ID

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9D6E80>




MX

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F85F8>




Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA0A6A0>




Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F8160>




Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F8F28>




CD

 

ET

 

PH

 

PK

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95FA02898>




IR

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9E4400>




US

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F1358>




VN

 

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000002E95F9F8A90>




IN

 


20 flags downloaded in 1.31s




## 18.4 改进asyncio下载脚本

In [7]:
import asyncio
import collections
import aiohttp
from aiohttp import web
import tqdm
from flags2_common import main,HTTPStatus,Result,save_flag
import contextlib

DEFAULT_CONCUR_REQ=5
MAX_CONCUR_REQ=1000

class FetchError(Exception):
    """Raised when downloading the flag for one country fails.

    In this cell it is raised as ``raise FetchError(cc) from exc``, so the
    underlying error is available as ``__cause__``.
    """

    def __init__(self, country_code):
        # Forward the code to Exception so it appears in args, str() and repr()
        # instead of an empty message.
        super().__init__(country_code)
        self.country_code = country_code
@asyncio.coroutine
def get_flag(base_url, cc): # <2>
    """Return the GIF bytes of the flag for *cc* served under *base_url*.

    A 404 raises ``web.HTTPNotFound``. Any other non-200 status raises
    ``aiohttp.HttpProcessingError`` carrying the status, reason and headers.
    The response is closed on every path.

    NOTE(review): ``aiohttp.HttpProcessingError`` may not be exported by newer
    aiohttp releases — confirm against the installed version.
    """
    code = cc.lower()
    url = '{}/{}/{}.gif'.format(base_url, code, code)
    resp = yield from aiohttp.request('GET', url)
    with contextlib.closing(resp):
        status = resp.status
        if status == 404:
            raise web.HTTPNotFound()
        if status != 200:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)
        return (yield from resp.read())


@asyncio.coroutine
def download_one(cc, base_url, semaphore, verbose):  # <3>
    """Download and save one flag, returning ``Result(status, cc)``.

    The download runs while holding *semaphore*, which caps concurrency.
    - A 404 gives ``HTTPStatus.not_found``.
    - Any other failure while fetching is re-raised as ``FetchError(cc)``,
      chained to the original exception.
    - On success the image is saved as ``<cc>.gif``.
    """
    try:
        with (yield from semaphore):  # <4>
            image = yield from get_flag(base_url, cc)  # <5>
    except web.HTTPNotFound:  # <6>
        status, msg = HTTPStatus.not_found, 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc  # <7>
    else:
        save_flag(image, cc.lower() + '.gif')  # <8>
        status, msg = HTTPStatus.ok, 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)
# END FLAGS2_ASYNCIO_TOP

# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
@asyncio.coroutine
def downloader_coro(cc_list, base_url, verbose, concur_req):  # <1>
    """Download every flag in *cc_list* and return a Counter of result statuses.

    At most *concur_req* downloads run at once. Results are consumed in
    completion order, behind a tqdm progress bar when not *verbose*. A
    FetchError is counted as ``HTTPStatus.error``; its cause's first arg
    (or class name) is printed when *verbose*.
    """
    tally = collections.Counter()
    limiter = asyncio.Semaphore(concur_req)  # <2>
    coros = [download_one(code, base_url, limiter, verbose)
             for code in sorted(cc_list)]  # <3>

    finished = asyncio.as_completed(coros)  # <4>
    if not verbose:
        finished = tqdm.tqdm(finished, total=len(cc_list))  # <5>
    for next_done in finished:  # <6>
        try:
            outcome = yield from next_done  # <7>
        except FetchError as err:  # <8>
            code = err.country_code  # <9>
            cause = err.__cause__
            try:
                error_msg = cause.args[0]  # <10>
            except IndexError:
                error_msg = type(cause).__name__  # <11>
            if verbose and error_msg:
                print('*** Error for {}: {}'.format(code, error_msg))
            status = HTTPStatus.error
        else:
            status = outcome.status

        tally[status] += 1  # <12>

    return tally  # <13>


def download_many(cc_list, base_url, verbose, concur_req):
    """Run downloader_coro() to completion and return its Counter of statuses.

    NOTE(review): this closes the default event loop, so later
    get_event_loop() calls in the same process likely get a closed loop back.
    """
    loop = asyncio.get_event_loop()
    counts = loop.run_until_complete(
        downloader_coro(cc_list, base_url, verbose, concur_req))  # <14>
    loop.close()  # <15>
    return counts


# NOTE(review): in Jupyter, main()'s command-line parsing (presumably argparse)
# sees the kernel's own "-f <connection file>" argument and exits with
# SystemExit: 2, as the recorded output shows. Run this cell as a script.
main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
        

usage: ipykernel_launcher.py [-h] [-a] [-e] [-l N] [-m CONCURRENT] [-s LABEL]
                             [-v]
                             [CC [CC ...]]


ipykernel_launcher.py: error: unrecognized arguments: -f


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


在flags2_threadpool.py中，save_flag函数会阻塞运行download_one函数的线程，但被阻塞的只是众多工作线程中的一个。阻塞型I/O调用在背后会释放GIL，因此其他线程可以继续运行。但是在flags2_asyncio.py中，save_flag函数阻塞了客户代码与asyncio事件循环共用的唯一线程，因此保存文件时，整个应用程序都会冻结。这个问题的解决方法是使用事件循环对象的run_in_executor方法，把save_flag交给线程池执行。

In [None]:
@asyncio.coroutine
def download_one(cc,base_url,semaphore,verbose):
    """Download one flag and save it off the event-loop thread; return ``Result(status, cc)``.

    A 404 gives ``HTTPStatus.not_found``. Any other fetch failure is
    re-raised as ``FetchError(cc)``, chained to the original exception.
    On success, the blocking save_flag() runs in the loop's default
    executor, so the event loop is not frozen while the file is written.
    """
    try:
        with (yield from semaphore):
            image = yield from get_flag(base_url, cc)
    except web.HTTPNotFound:
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        loop = asyncio.get_event_loop()
        # Wait on the executor future instead of dropping it. The flag then
        # counts as OK only after the file is written, and errors from
        # save_flag() propagate as they do in the synchronous version rather
        # than being silently lost.
        yield from loop.run_in_executor(None, save_flag, image,
                                        cc.lower() + '.gif')
        status = HTTPStatus.ok
        msg = 'OK'
    if verbose and msg:
        print(cc, msg)  # was print(cc.msg): str has no attribute 'msg'
    return Result(status, cc)
