<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Thread-Versus-Coroutine:-A-Comparison" data-toc-modified-id="Thread-Versus-Coroutine:-A-Comparison-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Thread Versus Coroutine: A Comparison</a></span></li></ul></div>

##### Thread Versus Coroutine: A Comparison

spinner with thread

In [4]:
import threading
import itertools
import time
import sys


class Signal: #1 Signal has a single mutable attribute, go, used to tell the thread from outside when to stop.
    go = True


def spin(msg, signal): #2 The second argument is an instance of Signal.
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'): #3 This is an infinite loop; see itertools.cycle.
        status = char + '' + msg
        write(status)
        flush()
        write('\x08' * len(status)) #4 The trick to do text-mode animation: move the cursor back with backspace characters (\x08).
                                    # However, the effect is not visible in Jupyter.
        time.sleep(.1)
        if not signal.go: #5 If the go attribute is no longer True, exit the loop.
            break
    write(' ' * len(status) + '\x08' * len(status)) #6 Clear the status line by overwriting with spaces and 
                                                    #  moving the cursor back to the beginning.


def slow_function(): #7 Simulate a time-consuming computation.
    time.sleep(3) #8 sleep() blocks the main thread and makes it release the GIL, so the secondary thread can run.
    return '高科技'


def supervisor(): #9 Create and start the secondary thread, run the slow computation, then tell the thread to stop.
    signal = Signal()
    # Create the thread object.
    spinner = threading.Thread(target=spin, args=('thinking', signal))
    print('spinner object:', spinner) #10
    spinner.start() #11
    result = slow_function() #12
    signal.go = False #13
    spinner.join() #14
    return result


def main():
    result = supervisor() #15
    print('Answer:', result)

In [5]:
main()

spinner object: <Thread(Thread-5, initial)>
|thinkin/thinkin-thinkin\thinkin|thinkin/thinkin-thinkin\thinkin|thinkin/thinkin-thinkin\thinkin|thinkin/thinkin-thinkin\thinkin|thinkin/thinkin-thinkin\thinkin|thinkin/thinkin-thinkin\thinkin|thinkin/thinkin-thinkin\thinkin|thinkin        Answer: 高科技
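
The Signal class above is a hand-rolled flag. As a hedged alternative (not what the notebook uses), the same stop request can be expressed with the standard library's threading.Event. In this sketch the `out` and `interval` parameters and the `delay` argument are assumptions added so the spinner can write to any stream and be exercised quickly; the short sleep stands in for slow_function().

```python
import io
import itertools
import threading
import time


def spin(msg, done, out, interval=0.1):
    """Write spinner frames to `out` until the `done` event is set."""
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        out.write(status)
        out.write('\x08' * len(status))
        # wait() doubles as the sleep: it returns True as soon as done is set.
        if done.wait(interval):
            break
    out.write(' ' * len(status) + '\x08' * len(status))


def supervisor(delay=0.3, out=None):
    out = out if out is not None else io.StringIO()
    done = threading.Event()
    spinner = threading.Thread(target=spin, args=('thinking', done, out))
    spinner.start()
    time.sleep(delay)   # stand-in for slow_function()
    done.set()          # ask the spinner to stop
    spinner.join()
    return out.getvalue()
```

Because Event.wait() returns as soon as the event is set, the spinner reacts to the stop request without finishing its current sleep interval.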


spinner with asyncio

In [7]:
import asyncio
import itertools
import sys


@asyncio.coroutine #1
def spin(msg): #2
    write, flush = sys.stdout.write, sys.stdout.flush
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        write(status)
        flush()
        write('\x08' * len(status))
        try:
            yield from asyncio.sleep(.1) #3
        except asyncio.CancelledError: #4
            break
    write(' ' * len(status) + '\x08' * len(status))


@asyncio.coroutine
def slow_function(): #5
    yield from asyncio.sleep(3) #6
    return '高科技'


@asyncio.coroutine
def supervisor(): #7
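    spinner = asyncio.ensure_future(spin('thinking')) #8 Spelled asyncio.async() in Python 3.4; renamed ensure_future in 3.4.4 (async is a keyword since 3.7).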
    print('spinner object:', spinner) #9
    result = yield from slow_function() #10
    spinner.cancel() #11
    return result


def main():
    loop = asyncio.get_event_loop() #12
    result = loop.run_until_complete(supervisor()) #13
    loop.close()
    print('Answer:', result)

<center>Summary</center>

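1. If you write a coroutine for asyncio in this style, it is best to add the @asyncio.coroutine decorator. (The decorator was deprecated in Python 3.8 and removed in 3.11 in favor of async def.)
2. No external signal object is needed here to control the coroutine.
3. Inside an asyncio coroutine, be careful with blocking operations: to avoid blocking the event loop, use yield from asyncio.sleep(.1) instead of time.sleep(.1).
4. If asyncio.CancelledError is raised, exit the loop; the coroutine then ends.
5. slow_function is now also a coroutine (because it originally performed a blocking, I/O-bound operation).
6. The yield from asyncio.sleep(3) expression hands the control flow to the main loop, 
which will resume this coroutine after the sleep delay.
7. supervisor is now also a coroutine function; internally it invokes the slow_function coroutine with yield from.
8. asyncio.async(…) (spelled asyncio.ensure_future(…) since Python 3.4.4) schedules the spin coroutine to run, wrapping it in a ***Task*** object, which is ***returned immediately***.
9. Print the Task object.
10. Drive slow_function() with yield from and obtain its return value. Meanwhile the event loop keeps running, because slow_function itself does yield from asyncio.sleep(3), which hands control straight back to the main loop.
11. Calling the Task object's cancel method raises asyncio.CancelledError inside the coroutine wrapped by the Task, **at the yield where it is currently suspended**. The coroutine may catch the exception, and delay or even refuse the cancellation.
12. Get a ***reference*** to the event loop.
13. Run the supervisor coroutine until it finishes and get its return value.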
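
For readers on a current Python, the notes above can be sketched with the native async/await syntax. This is a minimal sketch, not the notebook's code: it assumes Python 3.7 or later, and the `out` and `delay` parameters, as well as the placeholder return value 42, are additions so the spinner can be checked without a terminal.

```python
import asyncio
import io
import itertools


async def spin(msg, out):
    for char in itertools.cycle('|/-\\'):
        status = char + ' ' + msg
        out.write(status)
        out.write('\x08' * len(status))
        try:
            await asyncio.sleep(.1)        # hands control to the event loop
        except asyncio.CancelledError:     # raised here when the Task is cancelled
            break
    out.write(' ' * len(status) + '\x08' * len(status))


async def slow_function(delay):
    await asyncio.sleep(delay)             # pretend to wait for I/O
    return 42                              # placeholder result (assumption)


async def supervisor(delay=0.3, out=None):
    out = out if out is not None else io.StringIO()
    spinner = asyncio.create_task(spin('thinking', out))  # scheduled, returned at once
    result = await slow_function(delay)
    spinner.cancel()
    await spinner    # give spin() the chance to handle the cancellation and clear the line
    return result, out.getvalue()
```

From a script this would be driven with `asyncio.run(supervisor())`. Since spin() catches CancelledError and returns normally, awaiting the spinner afterwards does not raise.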

<center>Key point: how asyncio.Future differs from concurrent.futures.Future</center>

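1. A future represents the result of scheduling something for execution. In asyncio, BaseEventLoop.create_task() takes a coroutine object, schedules it to run, and immediately returns an asyncio.Task instance; Task is a subclass of Future.
2. The result() method of asyncio.Future takes no arguments, so you cannot specify a timeout. If the future is not done, it does not block the calling thread the way a concurrent.futures Future does; it raises
   asyncio.InvalidStateError instead.
3. If you use yield from instead of calling result(), the wait for completion is handled automatically and the result becomes the value of the yield from expression, which you can assign to a target on its left. The **event loop** is not blocked; the coroutine running yield from is suspended until the future is done.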
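
The difference between calling result() and waiting on the future can be seen in a small sketch. It uses await, the modern spelling of yield from, and assumes Python 3.7 or later; the 0.05 second timer and the value 'ready' are arbitrary choices for illustration.

```python
import asyncio


async def demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    early = None
    try:
        fut.result()                   # not done yet: raises instead of blocking
    except asyncio.InvalidStateError:
        early = 'InvalidStateError'

    # Ask the loop to complete the future a little later.
    loop.call_later(0.05, fut.set_result, 'ready')
    value = await fut                  # suspends demo(); the event loop keeps running
    return early, value, fut.result()  # result() is safe once the future is done
```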

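There are two ways to create a Task object:

1. asyncio.async(coro_or_future, *, loop=None), renamed asyncio.ensure_future in Python 3.4.4. If you pass a coroutine object, it calls loop.create_task() to create the Task. You can also specify the event loop; if you do not, asyncio.get_event_loop() is called to obtain the **current** event loop.
2. BaseEventLoop.create_task(coro) schedules the coroutine and immediately returns a Task object.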
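
Both routes can be compared side by side. The sketch below assumes Python 3.7 or later, where the first function is spelled asyncio.ensure_future; the `answer` coroutine is a hypothetical stand-in for real work.

```python
import asyncio


async def answer():
    await asyncio.sleep(0)
    return 42


async def demo():
    loop = asyncio.get_running_loop()
    t1 = asyncio.ensure_future(answer())     # wraps the coroutine object in a Task
    t2 = loop.create_task(answer())          # same effect, called on the loop itself
    same = asyncio.ensure_future(t1) is t1   # a Future or Task is passed through unchanged
    kinds = isinstance(t1, asyncio.Task), isinstance(t2, asyncio.Future)
    pending = not t1.done() and not t2.done()  # returned immediately, nothing has run yet
    results = await asyncio.gather(t1, t2)
    return same, kinds, pending, results
```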

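Below is the asyncio version of the flag downloader. Note:

1. It requires Python 3.7 or later.
2. It cannot be run inside a Jupyter notebook, where asyncio.run() cannot be called while the notebook's own event loop is running; run it from the command line.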

In [3]:
import os
import time
import sys

import asyncio  # 1
import aiohttp  # 2

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'


def save_flag(img, filename):
    os.makedirs(DEST_DIR, exist_ok=True)  # create the target directory on first use
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def get_flag(session, cc):  # 3 async def takes the place of the @asyncio.coroutine decorator
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    # 4 `async with` is needed because session.get() performs network I/O;
    #   it corresponds to resp = yield from aiohttp.request('GET', url) in the old API.
    async with session.get(url) as resp:
        return await resp.read()  # 5 Equivalent to: return (yield from resp.read())


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


async def download_one(session, cc):  # 6 download_one is also a coroutine function, because it awaits the get_flag coroutine
    # 7 Equivalent to: image = yield from get_flag(session, cc)
    image = await get_flag(session, cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def download_many(cc_list):
    async with aiohttp.ClientSession() as session:  # 8 aiohttp's asynchronous session
        res = await asyncio.gather(  # 9 No need to drive the event loop explicitly any more.
            *[asyncio.create_task(download_one(session, cc)) for cc in sorted(cc_list)]
        )
    return len(res)


def main():
    t0 = time.time()
    count = asyncio.run(download_many(POP20_CC))
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))

In [14]:
# main()
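
Since main() needs network access and the aiohttp package, the gather pattern can be tried offline with a simulated download. This is a sketch under stated assumptions: `fake_download`, its 0.1 second delay, and the `timed` helper are hypothetical stand-ins, with asyncio.sleep playing the role of the HTTP round trip.

```python
import asyncio
import time


async def fake_download(cc, delay=0.1):
    await asyncio.sleep(delay)   # stands in for the HTTP round trip
    return cc


async def download_many(cc_list):
    tasks = [asyncio.create_task(fake_download(cc)) for cc in sorted(cc_list)]
    return await asyncio.gather(*tasks)   # results come back in the order of the tasks


def timed(cc_list):
    t0 = time.perf_counter()
    res = asyncio.run(download_many(cc_list))
    return res, time.perf_counter() - t0
```

With ten country codes, the ten simulated waits overlap, so the total elapsed time stays close to a single delay rather than ten of them.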

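For comparison, here is the implementation with the older asyncio API. Note:

1. It still has to be run from the command line.
2. asyncio.wait is a coroutine function, so calling it returns a coroutine object (a generator object in this older style). wait accepts **timeout and return_when**. When that object is passed to loop.run_until_complete, run_until_complete
   returns a tuple: (done futures, pending futures). The default behavior is to wait until all futures are done.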
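
The (done, pending) pair and the return_when argument can be sketched as follows. The example uses the modern async/await spelling and wraps the coroutines in tasks first, since recent Python versions no longer accept bare coroutines in asyncio.wait; the `job` coroutine and its delays are hypothetical.

```python
import asyncio


async def job(name, delay):
    await asyncio.sleep(delay)
    return name


async def demo():
    tasks = [asyncio.create_task(job('fast', 0.01)),
             asyncio.create_task(job('slow', 0.5))]
    # Return as soon as any task finishes instead of waiting for all of them.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = sorted(t.result() for t in done)
    for t in pending:            # wait() does not cancel the leftovers itself
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return first, len(pending)
```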

In [17]:
import asyncio, aiohttp

@asyncio.coroutine
def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = yield from aiohttp.request('GET', url)
    image = yield from resp.read()
    return image

@asyncio.coroutine
def download_one(cc):
    image = yield from get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc

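def download_many(cc_list):
    """This version of download_many has to drive the event loop explicitly.
    """
    loop = asyncio.get_event_loop() #1 Get a reference to the current thread's event loop.
    to_do = [download_one(cc) for cc in sorted(cc_list)] #2 Build a list of coroutine (generator) objects.
    wait_coro = asyncio.wait(to_do) #3 Not a blocking call: wait is a coroutine function that accepts an iterable of futures or coroutines
                                    #  and completes when all of them are done.
    res, _ = loop.run_until_complete(wait_coro) #4 Run the event loop until wait_coro is done.
    loop.close() #5 Close the event loop.

    return len(res)  # number of completed downloads, reported by main()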

def main(download_many):  
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))