# 并发编程之Asyncio

在处理 I/O 操作时，使用多线程与普通的单线程相比，效率得到了极大的提高。那为什么还需要 Asyncio？

- 多线程运行过程容易被打断，因此有可能出现 race condition 的情况；
- 线程切换本身存在一定的损耗，线程数不能无限增加，因此，如果 I/O 操作非常 heavy，多线程很有可能满足不了高效率、高质量的需求。
- 。。。

## Sync VS Async

- Sync，是指操作一个接一个地执行，下一个操作必须等上一个操作完成后才能执行。
- Async 是指不同操作间可以相互交替执行，如果其中的某个操作被 block 了，程序并不会等待，而是会找出可执行的操作继续执行。

## Asyncio 工作原理

Asyncio 和其他 Python 程序一样，是单线程的，它只有一个主线程，但是可以进行多个不同的任务（task），这里的任务，就是特殊的 future 对象。这些不同的任务，被一个叫做 event loop 的对象所控制。

简化讲解这个问题，我们可以假设任务只有两个状态：
- 预备状态：是指任务目前空闲，但随时待命准备运行
- 等待状态：是指任务已经运行，但正在等待外部的操作完成，比如 I/O 操作。

在这种情况下，event loop 会维护两个任务列表，分别对应这两种状态；并且选取预备状态的一个任务（具体选取哪个任务，和其等待的时间长短、占用的资源等等相关），使其运行，一直到这个任务把控制权交还给 event loop 为止。当任务把控制权交还给 event loop 时，event loop 会根据其是否完成，把任务放到预备或等待状态的列表，然后遍历等待状态列表的任务，查看他们是否完成。

- 如果完成，则将其放到预备状态的列表；
- 如果未完成，则继续放在等待状态的列表。

而原先在预备状态列表的任务位置仍旧不变，因为它们还未运行。这样，当所有任务被重新放置在合适的列表后，新一轮的循环又开始了：event loop 继续从预备状态的列表中选取一个任务使其执行…如此周而复始，直到所有任务完成。

与 Future 多线程不同，对于 Asyncio 来说，它的任务在运行时不会被外部的一些因素打断，因此 Asyncio 内的操作不会出现 race condition 的情况，这样就不需要担心线程安全的问题了。

[参考文档](https://docs.python.org/3/library/asyncio-eventloop.html)

## 示例


In [1]:
# predefine URL
SITES = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]

In [2]:
import asyncio
import aiohttp
import copy
import time

async def download_one(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print('Read {} from {}'.format(resp.content_length, url))

async def download_all(sites):
    # asyncio.create_task 是 python 3.7 新增的，旧版本可以用 asyncio.ensure_future()
    tasks = [asyncio.create_task(download_one(site)) for site in sites]
    # 在 event loop 中运行 tasks 中的所有任务
    await asyncio.gather(*tasks)

async def main():
    sites = copy.copy(SITES)
    start_time = time.perf_counter()

    # 这种用法是 python 3.7 新增的，旧版本可以用下面的老式写法，拿到 event loop，然后运行
    # 这个在 jupyter 中会导致 event loop 的嵌套问题，使用 await 替代
    # asyncio.run(download_all(sites))
    # # 老式写法
    # loop = asyncio.get_event_loop()
    # try:
    #     loop.run_until_complete(download_all(sites))
    # finally:
    #     loop.close()
    await download_all(sites)

    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))
    
if __name__ == '__main__':
    await main()


Read 15839 from https://en.wikipedia.org/wiki/The_C_Programming_Language
Read 88641 from https://en.wikipedia.org/wiki/PHP
Read 69945 from https://en.wikipedia.org/wiki/Portal:Biography
Read 37025 from https://en.wikipedia.org/wiki/Node.js
Read 64974 from https://en.wikipedia.org/wiki/Java_(programming_language)
Read 61488 from https://en.wikipedia.org/wiki/Go_(programming_language)
Read 79710 from https://en.wikipedia.org/wiki/Portal:History
Read 33140 from https://en.wikipedia.org/wiki/Portal:Society
Read 40112 from https://en.wikipedia.org/wiki/Portal:Arts
Read 73550 from https://en.wikipedia.org/wiki/Computer_science
Read 80759 from https://en.wikipedia.org/wiki/Python_(programming_language)
Read None from https://en.wikipedia.org/wiki/Portal:Geography
Read None from https://en.wikipedia.org/wiki/Portal:Mathematics
Read None from https://en.wikipedia.org/wiki/Portal:Science
Read None from https://en.wikipedia.org/wiki/Portal:Technology
Download 15 sites in 0.7202079740000045 second

这里的 async 和 await 关键字是 Asyncio 的最新写法，表示这个语句 / 函数是 non-block 的，对应前面所讲的 event loop 的概念。如果任务执行的过程需要等待，则将其放入等待状态的列表中，然后继续执行预备状态列表里的任务。


## Asyncio 局限性

Future 多线程编程中，我们使用的是 requests 库，但 Asyncio 中我们使用 aiohttp 库，原因是 requests 库并不兼容 Asyncio，但是 aiohttp 库兼容。

Asyncio 软件库的兼容性问题，在 Python 3 的早期一直是个大问题，随着技术的发展，这个问题正逐步得到解决。

使用 Asyncio 时，在任务的调度方面有更大的自主权：

- 如果需要 await 一系列的操作，就得使用 asyncio.gather()
- 如果只是单个的 future，或许只用 asyncio.wait() 就可以了
- 使用 future，只能让它 run_until_complete() 或者 run_forever() 

## 多线程 还是 Asyncio

遵循以下伪代码的规范

```python
if io_bound:
    if io_slow:
        print('Use Asyncio')
    else:
        print('Use multi-threading')
else if cpu_bound:
    print('Use multi-processing')
```

multi-processing ? python 的多进程模式

使用模块 multiprocessing 或者 concurrent.futures。

```python
import concurrent.futures
import multiprocessing

with ProcessPoolExecutor(max_workers=nprocs) as executor:
    executor.map(func, nums)

# 等同于
with multiprocessing.Pool(nprocs) as pool:
    pool.map(func, nums)
```

## 思考题

输入一个列表，对于列表中的每个元素，我想计算 0 到这个元素的所有整数的平方和。

根据以下常规版本，写出多进程版本，并比较性能。

In [3]:
import time

def cpu_bound(number):
    print(sum(i * i for i in range(number)))

def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))

if __name__ == '__main__':
    main()

333333283333335000000
333333383333335000000
333333483333355000001
333333583333395000005
333333683333455000014
333333783333535000030
333333883333635000055
333333983333755000091
333334083333895000140
333334183334055000204
333334283334235000285
333334383334435000385
333334483334655000506
333334583334895000650
333334683335155000819
333334783335435001015
333334883335735001240
333334983336055001496
333335083336395001785
333335183336755002109
Calculation takes 16.041871466999964 seconds


In [4]:
import concurrent.futures
import time
import multiprocessing

def cpu_bound(number):
    return sum(i * i for i in range(number))

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # can adjust chunksize by no. cores
        # check no. cores: sysctl -n hw.ncpu
        for number in executor.map(cpu_bound, numbers, chunksize=2):
            print(number)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))

if __name__ == '__main__':
    main()


333333283333335000000
333333383333335000000
333333483333355000001
333333583333395000005
333333683333455000014
333333783333535000030
333333883333635000055
333333983333755000091
333334083333895000140
333334183334055000204
333334283334235000285
333334383334435000385
333334483334655000506
333334583334895000650
333334683335155000819
333334783335435001015
333334883335735001240
333334983336055001496
333335083336395001785
333335183336755002109
Calculation takes 3.5311830840000766 seconds


In [1]:
import multiprocessing
import time

def cpu_bound(number):
    x = sum(i * i for i in range(number))
    print(x)
    return x

def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)

if __name__ == "__main__":
    start_time = time.time()
    numbers = [10000000 + x for x in range(20)]
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"Duration {duration} seconds")

333333583333395000005333333783333535000030

333333283333335000000
333333983333755000091
333334383334435000385333333483333355000001333333683333455000014333334183334055000204



333334083333895000140333333883333635000055

333333383333335000000
333334283334235000285
333334583334895000650333334683335155000819

333334483334655000506
333334783335435001015
333334883335735001240
333335183336755002109
333335083336395001785
333334983336055001496
Duration 4.05443263053894 seconds


## 补充学习

找到一个杜克大学的资料，参考 [Multi-Core Parallelism](http://people.duke.edu/~ccc14/sta-663-2016/19B_Threads_Processses_Concurrency.html)

- 进程状态、取消和增加回调方法


In [9]:
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import time
import os

counter = 0

def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)
    return counter

njobs = 24
res = []
with ProcessPoolExecutor(max_workers=4) as pool:

    for i in range(njobs):
        res.append(pool.submit(f2, *np.random.rand(1)))
        if i % 2 == 0:
            res[i].add_done_callback(lambda future: print("Process done!"))

    res[4].cancel()
    if res[4].cancelled():
        print("Process 4 cancelled")

    for i, x in enumerate(res):
        while x.running():
            
            print("Running")
            time.sleep(1)
        if not x.cancelled():
            print(x.result())


8771587716 8771787718 0.6039777834995945  0.6646650435847195
0.94583302084681460.26480101039519066
87715

87716 8771887717  0.4483436414806864 0.5945795387355
0.055936314337816430.9696934783214501
87715

 8771687717877180.9043808081329642   0.64520606972594520.7700363825163905
0.6201562351776333

87715
8771887716 87717  0.056204267705176836 0.4621033859146042
0.19086556865263870.466501294374000887715


 8771887716877170.6093156205745859   
0.24549128000360710.688678585394620.0910545226392349787715


 87718877170.12255726112350152  
0.349989158822429540.5751596134811675

Process done!
Process 4 cancelled
Running
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
Process done!
1
1
1
1
2
2
2
2
3
3
3
3
4
4
4
4
5
5
5
5
6
6
6


- 调用多参数方法


In [16]:
def f(a, b):
    return a + b

def f_(args):
    return f(*args)

xs = np.arange(24)
print(xs)
print(np.array_split(xs, xs.shape[0]/2))
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, np.array_split(xs, xs.shape[0]//2))
list(res)


[ 0  1  2  3  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]
[array([0, 1]), array([2, 3]), array([4, 5]), array([6, 7]), array([8, 9]), array([10, 11]), array([12, 13]), array([14, 15]), array([16, 17]), array([18, 19]), array([20, 21]), array([22, 23])]


[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]