# 使用asyncio包处理并发

并发是指一次处理多件事。并行是指一次做多件事。二者不同，但是有联系。一个关于结构，一个关于执行。并发用于制定方案，用来解决可能（但未必）并行的问题。

因此，实际上大多数过程都是并发处理的，而不是并行处理。

本章介绍asyncio包，这个包使用事件循环驱动的协程实现并发



In [6]:
#!/usr/bin/env python3

# spinner_thread.py

# credits: Adapted from Michele Simionato's
# multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html

# BEGIN SPINNER_THREAD
import threading
import itertools
import time


def spin(msg, done):  # 这个函数会在单独的线程中运行
    for char in itertools.cycle('|/-\\'):  # 无限循环
        status = char + ' ' + msg
        print(status, flush=True, end='\r')
        if done.wait(.1):  # <5>
            break
    print(' ' * len(status), end='\r')

def slow_function():  # <7>
    # pretend waiting a long time for I/O
    time.sleep(3)  # <8>
    return 42


def supervisor():  # 这个函数设置从属线程，显示线程对象，运行耗时的计算，最后杀死线程。
    done = threading.Event()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', done))
    print('spinner object:', spinner)  # 显示从属线程对象。输出类似于<Thread(Thread-1,initial)>。
    spinner.start()  # 启动从属线程。
    result = slow_function()  # 运行slow_function函数，阻塞主线程。同时，从属线程以动画形式显示旋转指针。
    done.set()  # 呼叫set()会设置线程内部标记为真。所有等待它变为真的线程都被唤醒。 一旦标志为真，调用 wait() 的线程根本不会阻塞。
    spinner.join()  # 等待spinner线程结束
    return result


def main():
    result = supervisor()  # 运行supervisor函数
    print('Answer:', result)


if __name__ == '__main__':
    main()
# END SPINNER_THREAD


spinner object: <Thread(Thread-8, initial)>
Answer: 42 


In [6]:
import threading
import itertools
import time

def spin(msg, done):
    for loading in itertools.cycle('|/-\\'):
        print( loading + ' ' + msg ,flush = True, end='\r')
        if done.wait(0.1) :
            break

def slow_function(secs = 3.0):
    time.sleep(secs)
    answer = 42
    return answer


def supervisor():
    done = threading.Event()
    spiner = threading.Thread(target=spin, args=('thinking...', done))
    print('spinner object:', spiner) 
    spiner.start()
    result = slow_function(2.1) # 阻塞主线程
    done.set()
    spiner.join()
    return result

def main():
    r = supervisor()
    print('Answer is ' + str(r))

main()

spinner object: <Thread(Thread-11, initial)>
Answer is 42.


In [10]:
# 请运行spinner_asyncio.py，在ipython中无法运行asyncio


#!/usr/bin/env python3

# spinner_asyncio.py

# credits: Example by Luciano Ramalho inspired by
# Michele Simionato's multiprocessing example in the python-list:
# https://mail.python.org/pipermail/python-list/2009-February/538048.html

# BEGIN SPINNER_ASYNCIO
import asyncio
import itertools


async def spin(msg):  
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        print(status, flush=True, end='\r')
        try:
            await asyncio.sleep(.1) # 使用yield from asyncio.sleep(.1)代替time.sleep(.1)，这样的休眠不会阻塞事件循环。
        except asyncio.CancelledError: # 如果spin函数苏醒后抛出asyncio.CancelledError异常，其原因是发出了取消请求，因此退出循环。
            break
    print(' ' * len(status), end='\r')


async def slow_function():  # 现在，slow_function函数是协程，在用休眠假装进行I/O操作时，使用yield from继续执行事件循环。
    # pretend waiting a long time for I/O
    await asyncio.sleep(3)  # 这里把控制权交给主循环，在休眠结束后恢复这个协程。
    return 42


async def supervisor():  # 现在，supervisor函数也是协程，因此可以使用yield from驱动slow_function函数。
    spinner = asyncio.create_task(spin('thinking!'))  # 使用一个Task对象包装spin协程，并立即返回。
    print('spinner object:', spinner)  # spinner object: <Task pending name='Task-2' coro=<spin() >>
    result = await slow_function()  # 驱动slow_function（　）函数。结束后，获取返回值。同时，事件循环继续运行，因为slow_function函数最后使用 await asyncio.sleep(3)表达式把控制权交回给了主循环。
    spinner.cancel()  # Task对象可以取消；取消后会在协程当前暂停的yield处抛出asyncio.CancelledError异常。协程可以捕获这个异常，也可以延迟取消，甚至拒绝取消。
    return result


def mainfunc():
    result = asyncio.run(supervisor())  # 驱动supervisor协程，让它运行完毕；这个协程的返回值是这次调用的返回值。
    print('Answer:', result)


mainfunc()
# END SPINNER_ASYNCIO


Exception ignored in: <coroutine object supervisor at 0x7fe363743c40>
Traceback (most recent call last):
  File "<string>", line 1, in <lambda>
KeyError: '__import__'
Exception ignored in: <coroutine object supervisor at 0x7fe363743c40>
Traceback (most recent call last):
  File "<string>", line 1, in <lambda>
KeyError: '__import__'


RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
# 与更熟悉的threading模型相比，asyncio是如何编排并发作业的。

import asyncio
import itertools
import threading
import time


'''
没有API能从外部终止线程，因为线程随时可能被中断，导致系统处于无效状态。如果想终止任务，可以使用Task.cancel（　）
实例方法，在协程内部抛出CancelledError异常。协程可以在暂停的yield处捕获这个异常，处理终止请求。


supervisor协程必须在main函数中由loop.run_until_complete方法执行。 3.7之后已经不需要了
'''
async def spin(msg):
    for chars in itertools.cycle('|/-\\'):
        status = chars + ' ' + msg
        print(status, flush=True, end='\r')
        try:
            await asyncio.sleep(.1)
        except asyncio.CancelledError:
            break
    print(' ' * len(status))


def spin2(msg, done):
    for chars in itertools.cycle('|/-\\'):
        status = chars + ' ' + msg
        print(status, flush=True, end='\r')
        if done.wait(0.1) :
            break



'''
在线程版supervisor函数中，slow_function函数是普通的函数，直接由线程调用。在异步版supervisor函数中，slow_function函数是协程，由await驱动。
'''
async def slow_function(delay):
    await asyncio.sleep(delay)
    return 42

def slow_function2(delay):
    time.sleep(delay)
    return 32

'''
asyncio.Task对象差不多与threading.Thread对象等效。

“Task对象像是实现协作式多任务的库（例如gevent）中的绿色线程（green thread）”。

Task对象用于驱动协程，Thread对象用于调用可调用的对象。

获取的Task对象已经排定了运行时间（例如，由asyncio.async函数排定）；Thread实例则必须调用start方法，明确告知让它运行。


'''



async def supervisor():
    spinner = asyncio.create_task(spin('thinking...'))
    print('spinner : ' , spinner)
    result = await slow_function(3)
    spinner.cancel()
    return result


def supervisor2():
    done = threading.Event()
    spinner = threading.Thread(target=spin2, args=('thinking...' , done))
    spinner.start()
    result = slow_function2(3.0)
    done.set()
    spinner.join()
    return result



def main():
    t0 = time.time()
    result = asyncio.run(supervisor())
    print(result)
    t1 = time.time() - t0
    print('asyncio time count : {:.2f}'.format(t1))
    
    t0 = time.time()
    result = supervisor2()
    print(result)
    t1 = time.time() - t0
    print('threading time count : {:.2f}'.format(t1))
    


if __name__ == '__main__':
    main()

线程与协程之间的比较还有最后一点要说明：

如果使用线程做过重要的编程，你就知道写出程序有多么困难，因为调度程序任何时候都能中断线程。**必须记住保留锁**，去保护程序中的重要部分，防止多步操作在执行的过程中中断，防止数据处于无效状态。

而协程默认会做好全方位保护，以防止中断。我们必须显式产出才能让程序的余下部分运行。**对协程来说，无需保留锁，在多个线程之间同步操作，协程自身就会同步**，因为在任意时刻只有一个协程运行。想交出控制权时，可以使用yield或yieldfrom把控制权交还调度程序。这就是能够安全地取消协程的原因：按照定义，协程只能在暂停的yield处取消，因此可以处理CancelledError异常，执行清理操作。

## asyncio.Future：故意不阻塞

3.7之后加入await来代替yield from。

asyncio.Future类的.result（　）方法没有参数，因此不能指定超时时间。

此外，如果调用.result（　）方法时future还没运行完毕，那么.result（　）方法不会阻塞去等待结果，而是抛出asyncio.InvalidStateError异常。
然而，获取asyncio.Future对象的结果通常使用yield from，从中产出结果，如示例18-8所示。

使用yield from处理future，等待future运行完毕这一步无需我们关心，而且不会阻塞事件循环，因为在asyncio包中，yieldfrom的作用是把控制权还给事件循环。

注意，使用yield from处理future与使用add_done_callback方法处理协程的作用一样：延迟的操作结束后，事件循环不会触发回调对象，而是设置future的返回值；而yield from表达式则在暂停的协程中生成返回值，恢复执行协程。

## 从future、任务和协程中产出

在asyncio包中，future和协程关系紧密，因为可以使用yieldfrom从asyncio.Future对象中产出结果。

## 使用asyncio和aiohttp包下载

In [12]:
"""Download flags of countries (with error handling).

asyncio async/await version

"""
# BEGIN FLAGS2_ASYNCIO_TOP
import asyncio
import collections

import aiohttp
from aiohttp import web
import tqdm


"""Utilities for second set of flag examples.
"""

import os
import time
import sys
import string
import argparse
from collections import namedtuple
from enum import Enum


Result = namedtuple('Result', 'status data')

HTTPStatus = Enum('Status', 'ok not_found error')

POP20_CC = ('AA CN IN US ID BR PK NG BD RU JP NN'
            'MX PH VN ET EG DE IR TR CD FR').split()

DEFAULT_CONCUR_REQ = 1
MAX_CONCUR_REQ = 1

SERVERS = {
    'REMOTE': 'http://flupy.org/data/flags',
    'LOCAL':  'http://localhost:8001/flags',
    'DELAY':  'http://localhost:8002/flags',
    'ERROR':  'http://localhost:8003/flags',
}
DEFAULT_SERVER = 'LOCAL'

DEST_DIR = 'downloads/'
COUNTRY_CODES_FILE = 'country_codes.txt'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


def initial_report(cc_list, actual_req, server_label):
    if len(cc_list) <= 10:
        cc_msg = ', '.join(cc_list)
    else:
        cc_msg = 'from {} to {}'.format(cc_list[0], cc_list[-1])
    print('{} site: {}'.format(server_label, SERVERS[server_label]))
    msg = 'Searching for {} flag{}: {}'
    plural = 's' if len(cc_list) != 1 else ''
    print(msg.format(len(cc_list), plural, cc_msg))
    plural = 's' if actual_req != 1 else ''
    msg = '{} concurrent connection{} will be used.'
    print(msg.format(actual_req, plural))


def final_report(cc_list, counter, start_time):
    elapsed = time.time() - start_time
    print('-' * 20)
    msg = '{} flag{} downloaded.'
    plural = 's' if counter[HTTPStatus.ok] != 1 else ''
    print(msg.format(counter[HTTPStatus.ok], plural))
    if counter[HTTPStatus.not_found]:
        print(counter[HTTPStatus.not_found], 'not found.')
    if counter[HTTPStatus.error]:
        plural = 's' if counter[HTTPStatus.error] != 1 else ''
        print('{} error{}.'.format(counter[HTTPStatus.error], plural))
    print('Elapsed time: {:.2f}s'.format(elapsed))


def expand_cc_args(every_cc, all_cc, cc_args, limit):
    codes = set()
    A_Z = string.ascii_uppercase
    if every_cc:
        codes.update(a+b for a in A_Z for b in A_Z)
    elif all_cc:
        with open(COUNTRY_CODES_FILE) as fp:
            text = fp.read()
        codes.update(text.split())
    else:
        for cc in (c.upper() for c in cc_args):
            if len(cc) == 1 and cc in A_Z:
                codes.update(cc+c for c in A_Z)
            elif len(cc) == 2 and all(c in A_Z for c in cc):
                codes.add(cc)
            else:
                msg = 'each CC argument must be A to Z or AA to ZZ.'
                raise ValueError('*** Usage error: '+msg)
    return sorted(codes)[:limit]


def process_args(default_concur_req):
    server_options = ', '.join(sorted(SERVERS))
    parser = argparse.ArgumentParser(
                description='Download flags for country codes. '
                'Default: top 20 countries by population.')
    parser.add_argument('cc', metavar='CC', nargs='*',
                help='country code or 1st letter (eg. B for BA...BZ)')
    parser.add_argument('-a', '--all', action='store_true',
                help='get all available flags (AD to ZW)')
    parser.add_argument('-e', '--every', action='store_true',
                help='get flags for every possible code (AA...ZZ)')
    parser.add_argument('-l', '--limit', metavar='N', type=int,
                help='limit to N first codes', default=sys.maxsize)
    parser.add_argument('-m', '--max_req', metavar='CONCURRENT', type=int,
                default=default_concur_req,
                help='maximum concurrent requests (default={})'
                      .format(default_concur_req))
    parser.add_argument('-s', '--server', metavar='LABEL',
                default=DEFAULT_SERVER,
                help='Server to hit; one of {} (default={})'
                      .format(server_options, DEFAULT_SERVER))
    parser.add_argument('-v', '--verbose', action='store_true',
                help='output detailed progress info')
    args = parser.parse_args()
    if args.max_req < 1:
        print('*** Usage error: --max_req CONCURRENT must be >= 1')
        parser.print_usage()
        sys.exit(1)
    if args.limit < 1:
        print('*** Usage error: --limit N must be >= 1')
        parser.print_usage()
        sys.exit(1)
    args.server = args.server.upper()
    if args.server not in SERVERS:
        print('*** Usage error: --server LABEL must be one of',
              server_options)
        parser.print_usage()
        sys.exit(1)
    try:
        cc_list = expand_cc_args(args.every, args.all, args.cc, args.limit)
    except ValueError as exc:
        print(exc.args[0])
        parser.print_usage()
        sys.exit(1)

    if not cc_list:
        cc_list = sorted(POP20_CC)
    return args, cc_list


def main(download_many, default_concur_req, max_concur_req):
    # args = 
    cc_list = sorted(POP20_CC)
    actual_req = min(5, max_concur_req, len(cc_list))
    initial_report(cc_list, actual_req, 'REMOTE')
    base_url = SERVERS['REMOTE']
    t0 = time.time()
    counter = download_many(cc_list, base_url, False, actual_req)
    assert sum(counter.values()) == len(cc_list), \
        'some downloads are unaccounted for'
    final_report(cc_list, counter, t0)





# default set low to avoid errors from remote site, such as
# 503 - Service Temporarily Unavailable
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):  # <1>
    def __init__(self, country_code):
        self.country_code = country_code


async def get_flag(session, base_url, cc): # <2>
    url = '{}/{cc}/{cc}.gif'.format(base_url, cc=cc.lower())
    async with session.get(url) as resp:
        if resp.status == 200:
            return await resp.read()
        elif resp.status == 404:
            raise web.HTTPNotFound()
        else:
            raise aiohttp.HttpProcessingError(
                code=resp.status, message=resp.reason,
                headers=resp.headers)


async def download_one(session, cc, base_url, semaphore, verbose):  # <3>
    try:
        async with semaphore:  # <4>
            image = await get_flag(session, base_url, cc)  # <5>
    except web.HTTPNotFound:  # <6>
        status = HTTPStatus.not_found
        msg = 'not found'
    except Exception as exc:
        raise FetchError(cc) from exc  # <7>
    else:
        save_flag(image, cc.lower() + '.gif')  # <8>
        status = HTTPStatus.ok
        msg = 'OK'

    if verbose and msg:
        print(cc, msg)

    return Result(status, cc)
# END FLAGS2_ASYNCIO_TOP

# BEGIN FLAGS2_ASYNCIO_DOWNLOAD_MANY
async def downloader_coro(cc_list, base_url, verbose, concur_req):  # <1>
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(concur_req)  # <2>
    async with aiohttp.ClientSession() as session:  # <8>
        to_do = [download_one(session, cc, base_url, semaphore, verbose)
                for cc in sorted(cc_list)]  # <3>

        to_do_iter = asyncio.as_completed(to_do)  # <4>
        if not verbose:
            to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list))  # <5>
        for future in to_do_iter:  # <6>
            try:
                res = await future  # <7>
            except FetchError as exc:  # <8>
                country_code = exc.country_code  # <9>
                try:
                    error_msg = exc.__cause__.args[0]  # <10>
                except IndexError:
                    error_msg = exc.__cause__.__class__.__name__  # <11>
                if verbose and error_msg:
                    msg = '*** Error for {}: {}'
                    print(msg.format(country_code, error_msg))
                status = HTTPStatus.error
            else:
                status = res.status

            counter[status] += 1  # <12>

    return counter  # <13>


def download_many(cc_list, base_url, verbose, concur_req):
    loop = asyncio.get_event_loop()
    coro = downloader_coro(cc_list, base_url, verbose, concur_req)
    counts = loop.run_until_complete(coro)  # <14>
    loop.close()  # <15>

    return counts


if __name__ == '__main__':
    main(download_many, DEFAULT_CONCUR_REQ, MAX_CONCUR_REQ)
# END FLAGS2_ASYNCIO_DOWNLOAD_MANY


REMOTE site: http://flupy.org/data/flags
Searching for 21 flags: from AA to VN
5 concurrent connections will be used.


RuntimeError: This event loop is already running