# 协程&Asyncio&异步编程
---
为什么要讲？
- 越来越多的学生来问async异步相关问题，并且这一部分的知识点不太容易学习（异步非阻塞、asyncio）。
- 异步相关话题和框架越来越多，例如：tornado、fastapi、django 3.xasgi、aiohttp都在异步->提升性能。

如何讲解？
- 第一部分：协程。
- 第二部分：asyncio模块进行异步编程。
- 第三部分：实战案例。

## 协程
---
协程不是计算机提供，程序员人为创造。

协程（Coroutine），也可以称为微线程，是一种用户态内的上下文切换技术。</br>
简而言之，其实就是通过一个线程实现代码块相互切换执行。例如：

In [3]:
def func1():
    print(1)
    ...
    print(2)


def func2():
    print(3)
    ...
    print(4)


func1()
func2()


1
2
3
4


实现协程有这么几种方法：
- greenlet,早期模块。
- yield关键字
- asyncio装饰器（py3.4）
- async、await关键字（py3.5）[推荐]

### greenlet实现协程


In [2]:
from greenlet import greenlet


def func1():
    print(1)  # * 第2步：输出1
    g2.switch()  # * 第3步：切换到func2
    print(2)  # * 第6步：输出2
    g2.switch()  # * 第7步：切换到func2


def func2():
    print(3)  # * 第4步：输出3
    g1.switch()  # * 第5步：切换到func1
    print(4)  # * 第8步：输出4


g1 = greenlet(func1)
g2 = greenlet(func2)

g1.switch()  # * 第1步，去执行func1函数


1
3
2
4


### 通过yield关键字实现协程

In [5]:
def func1():
    yield 1
    yield from func2()
    yield 2


def func2():
    yield 3
    yield 4


f1 = func1()
for item in f1:
    print(item)


1
3
4
2


### asyncio实现协程
在python3.4及之后的版本

In [7]:
%%writefile test_asyncio.py
import asyncio

@asyncio.coroutine
def func1():
    print(1)
    # 网络IO请求：下载一张图片
    yield from asyncio.sleep(2) #* 遇到IO耗时操作，自动切换到tasks中的其他任务
    print(2)

@asyncio.coroutine
def func2():
    print(3)
    # 网络IO请求：下载一张图片
    yield from asyncio.sleep(2) #* 遇到IO耗时操作，自动切换到tasks中的其他任务
    print(4)
    
tasks=[asyncio.ensure_future(func1()),asyncio.ensure_future(func2())]

loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

Writing test_asyncio.py


### async & await关键字实现协程
用asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型，然后在coroutine内部用yield from调用另一个coroutine实现异步操作。

为了简化并更好地标识异步IO，从Python 3.5开始引入了新的语法async和await，可以让coroutine的代码更简洁易读。

请注意，async和await是针对coroutine的新语法，要使用新的语法，只需要做两步简单的替换：
1. 把@asyncio.coroutine替换为async；
2. 把yield from替换为await；

In [8]:
%%writefile test_asyncio.py
import asyncio

async def func1():
    print(1)
    # 网络IO请求：下载一张图片
    await asyncio.sleep(2) #* 遇到IO耗时操作，自动切换到tasks中的其他任务
    print(2)

async def func2():
    print(3)
    # 网络IO请求：下载一张图片
    await asyncio.sleep(2) #* 遇到IO耗时操作，自动切换到tasks中的其他任务
    print(4)
    
tasks=[asyncio.ensure_future(func1()),asyncio.ensure_future(func2())]

loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

Overwriting test_asyncio.py


## 协程意义
---
在一个线程中如果遇到IO等待时间，线程不会傻傻等，利用空闲的时候再去干点其他事。

案例：去下载三张图片（网络IO）
- 普通方式（同步）

In [14]:
import requests


def download_image(url):
    print("开始下载", url)
    # 发送网络请求，下载图片
    response = requests.get(url)
    print("下载完成")
    # 图片保存到本地文件
    file_name = url.split("_")[-1]
    with open(file_name, "wb") as f:
        f.write(response.content)


if __name__ == "__main__":
    url_list = [
        "https://car2.autoimg.cn/cardfs/product/g30/M0B/07/4E/1400x0_1_q95_autohomecar__ChxknGNsfhSALeWKAAbD352xoh0016.jpg",
        "https://car3.autoimg.cn/cardfs/product/g30/M01/5A/C8/1400x0_1_q95_autohomecar__ChsFJ2NsfhWAYM11AAbaasHf4YU139.jpg",
        "https://car3.autoimg.cn/cardfs/product/g30/M01/07/4E/1400x0_1_q95_autohomecar__ChxknGNsfhSAQIchAAaXwNPGCb4158.jpg",
    ]

    for item in url_list:
        download_image(item)

Writing test.py


- 协程方式(异步)

In [1]:
%%writefile test_coroutine.py
import aiohttp
import asyncio


async def fetch(session, url):
    print("发送请求", url)
    async with session.get(url, verify_ssl=False) as resp:
        content = await resp.content.read()
        file_name = url.split("_")[-1]
        # 图片保存到本地文件
        print("下载完成")
        with open(file_name, "wb") as f:
            f.write(content)


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            "https://car2.autoimg.cn/cardfs/product/g30/M0B/07/4E/1400x0_1_q95_autohomecar__ChxknGNsfhSALeWKAAbD352xoh0016.jpg",
            "https://car3.autoimg.cn/cardfs/product/g30/M01/5A/C8/1400x0_1_q95_autohomecar__ChsFJ2NsfhWAYM11AAbaasHf4YU139.jpg",
            "https://car3.autoimg.cn/cardfs/product/g30/M01/07/4E/1400x0_1_q95_autohomecar__ChxknGNsfhSAQIchAAaXwNPGCb4158.jpg",
        ]

        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)


if __name__ == "__main__":
    # asyncio.run(main())
    loop=asyncio.get_event_loop()
    loop.run_until_complete(main())

Overwriting test_coroutine.py


## 异步编程
---
### 事件循环
理解成为一个死循环，去检测并执行某些代码。

- 伪代码

        任务列表 = [ 任务1, 任务2, 任务3, ]
        while True:

            可执行的任务列表，已完成的任务列表 = 去任务列表中检查所有的任务，将“可执行”和“已完成”的任务返回

            for 就绪任务 in 已准备就绪的任务列表:
                执行已就绪的任务

            for 已完成的任务 in 已完成的任务列表:
                在任务列表中移除 已完成的任务

            如果 任务列表 中的任务都已经完成，则终止循环

In [None]:
import asyncio

# * 去生成或获取一个事件循环
loop = asyncio.get_event_loop()

# * 将任务放到‘任务列表’
# * loop.run_until_complete(任务)

### 快速上手
协程函数(coroutine function)： 定义函数时候 async def 函数名。

协程对象(coroutine object)：执行协程函数() 得到的协程对象。

In [None]:
import asyncio


async def func():
    pass


result = func()

***注意：执行协程函数创建协程对象，函数内部代码不会执行***

如果想要运行协程函数内部代码，必须要将协程对象交给事件循环来处理。

In [6]:
# %%writefile test_coroutine.py
import asyncio


async def func():
    print("来执行我吧")


result = func()

# loop=asyncio.get_event_loop()
# loop.run_until_complete(result)

# asyncio.run(result) # python 3.7 之后的用法
await result  # jupyter 用法

来执行我吧


### await
await + 可等待对象（协程对象、Future、Task对象 -> IO等待）

示例1：

In [8]:
import asyncio


async def func():
    print("来呀")
    response = await asyncio.sleep(2)
    print("结束", response)


# asyncio.run(func())
await func()

来呀
结束 None


示例2：

In [9]:
import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"


async def func():
    print("执行协程函数内部代码")

    # * 遇到IO操作挂起当前协程（任务），等IO操作完成之后再继续往下执行。当前协程挂起时，事件循环可以去执行其他协程（任务）。
    response = await others()

    print("IO请求结束，结果为：", response)


# asyncio.run(func())

await func()

执行协程函数内部代码
start
end
IO请求结束，结果为： 返回值


示例3：

In [10]:
import asyncio


async def others():
    print("start")
    await asyncio.sleep(2)
    print("end")
    return "返回值"


async def func():
    print("执行协程函数内部代码")

    # * 遇到IO操作挂起当前协程（任务），等IO操作完成之后再继续往下执行。当前协程挂起时，事件循环可以去执行其他协程（任务）。
    response1 = await others()
    print("IO请求结束，结果为：", response1)

    response2 = await others()
    print("IO请求结束，结果为：", response2)


# asyncio.run(func())

await func()

执行协程函数内部代码
start
end
IO请求结束，结果为： 返回值
start
end
IO请求结束，结果为： 返回值


await就是等待对象的值得到结果之后再继续向下走

### Task对象

>任务 被用来“并行的”调度协程</br>
>当一个协程通过 asyncio.create_task() 等函数被封装为一个 任务，该协程会被自动调度执行:

白话：在事件循环中添加多个任务的

Tasks用于并发调度协程，通过使用高层级的 asyncio.create_task(协程对象) 函数来创建 Task 对象，这样可以让协程加入事件循环中等待被调度执行。</br>
也可用低层级的 loop.create_task() 或 ensure_future() 函数。不建议手动实例化 Task 对象。

***注意：asyncio.create_task()函数是在Python3.7中被加入。在Python3.7之前，可以改用低层级的asyncio.ensure_future()函数。***

示例1：

In [13]:
import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main 开始")

    # * 创建Task对象，将当前执行func函数任务添加到事件循环
    task1 = asyncio.create_task(func())

    # * 创建Task对象，将当前执行func函数任务添加到事件循环
    task2 = asyncio.create_task(func())

    print("main结束")

    # * 当执行某协程遇到IO操作时，会自动化切换执行其他任务。
    # * 此处的await是等待相对应的协程全都执行完毕并获取结果。
    ret1 = await task1
    print("获取到ret1")
    ret2 = await task2
    print("获取到ret2")
    print(ret1, ret2)


await main()
# asyncio.run(main())

main 开始
main结束
1
1
2
2
获取到ret1
获取到ret2
返回值 返回值


示例2：

In [15]:
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

async def main():
    print("main 开始")

    # * 创建Task对象，将当前执行func函数任务添加到事件循环
    task_list = [asyncio.create_task(func(),name='n1'), asyncio.create_task(func(),name='n2')]

    print("main结束")

    #* pending是还没有完成的，超时的任务。
    done, pending = await asyncio.wait(task_list,timeout=None)
    print(done)


await main()
# asyncio.run(main())

main 开始
main结束
1
1
2
2
{<Task finished name='n1' coro=<func() done, defined at C:\Users\15515\AppData\Local\Temp\ipykernel_8240\692307068.py:3> result='返回值'>, <Task finished name='n2' coro=<func() done, defined at C:\Users\15515\AppData\Local\Temp\ipykernel_8240\692307068.py:3> result='返回值'>}


示例3：

In [18]:
%%writefile test_coroutine.py
import asyncio

async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"

#! 会报错，因为没有创建事件循环。RuntimeError: no running event loop
#! sys:1: RuntimeWarning: coroutine 'func' was never awaited
# task_list = [asyncio.create_task(func(),name='n1'), asyncio.create_task(func(),name='n2')]

task_list = [func(),func()]

done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

Overwriting test_coroutine.py


### asyncio的Future对象

>Future 是一种特殊的 低层级 可等待对象，表示一个异步操作的 最终结果。</br>
>当一个 Future 对象 被等待，这意味着协程将保持等待直到该 Future 对象在其他地方操作完毕。</br>
>在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。</br>
>通常情况下 没有必要 在应用层级的代码中创建 Future 对象。</br>
>Future 对象有时会由库和某些 asyncio API 暴露给用户，用作可等待对象:</br>

Task继承Future，Task对象内部await结果的处理基于Future对象来的。

示例1：

In [1]:
import asyncio

async def main():
    # 获取当前事件循环
    loop=asyncio.get_running_loop()

    #* 创建一个任务（Future对象），这个任务什么都不干。
    fut = loop.create_future()

    #* 等待任务最终结果（Future对象），没有结果则会一直等下去。
    await fut
    
await main()

示例2：

In [1]:
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('666')

async def main():
    # 获取当前事件循环
    loop=asyncio.get_running_loop()

    #* 创建一个任务（Future对象），没绑定任何行为，则这个任务永远不知道什么时候结束。
    fut = loop.create_future()
    
    #* 创建一个任务（Task对象），绑定了set_after()函数，这个函数2s后，会给fut赋值。
    #* 即手动设置future任务的最终结果，那么fut就可以结束了。
    await loop.create_task(set_after(fut))

    await fut
    
await main()

### concurrent.futures.Future对象
使用线程池、进程池实现异步操作时用到的对象。

In [3]:
%%writefile test_future.py
import time
from concurrent.futures import Future,ThreadPoolExecutor,ProcessPoolExecutor

def func(value):
    time.sleep(1)
    print(value)

# 创建线程池
pool=ThreadPoolExecutor(max_workers=5)

# 创建进程池
# pool=ProcessPoolExecutor(max_workers=5)

for i in range(10):
    fut=pool.submit(func,i)
    print(fut)
    

Writing test_future.py


以后写代码可能会存在交叉使用。例如：crm项目80%都是基于协程异步编程+MySQL(不支持)【线程、进程做异步编程】。

In [4]:
import time
import asyncio
import concurrent.futures
def func1():
    #某个耗时操作
    time.sleep(2)
    return "SB"

async def main():
    loop = asyncio.get_running_loop()
    
    # 1.Run in the default loop's executor(默认ThreadPoolExecutor)
    # 第一步：内部会先调用ThreadPoolExecutor的submit方法去线程池中申请一个线程去执行func1函数，并返
    # 回一个concurrent.futures.Future对象
    # 第二步：调用asyncio.wrap_future将concurrent.futures.Future对象包装为asycio.Future对象。
    # 因为concurrent.futures.Future对象不支持await语法，所以需要包装为asycio.Future对象才能使用。
    fut = loop.run_in_executor(None,func1)
    result = await fut
    print('default thread pool',result)
    
    # 2.Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor()as pool:
    #   result = await loop.run_in_executor(
    #   pool,func1)
    #   print('custom thread pool',result)
    
    # 3.Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor()as pool:
    #     result = await loop.run_in_executor(
    #     pool,func1)
    #     print('custom process pool',result)
        
# asyncio.run(main())
await main()

default thread pool SB


案例：asyncio+不支持异步的模块

In [6]:
%%writefile test_coroutine.py
import asyncio
import requests


async def download_image(ur1):
    # 发送网络请求，下载图片（遇到网络下载图片的IO请求，自动化切换到其他任务）
    print("开始下载：", ur1)

    loop = asyncio.get_running_loop()
    # requests模块默认不支持异步操作，所以就使用线程池来配合实现了。
    future = loop.run_in_executor(None, requests.get, ur1)

    response = await future
    print("下载完成")
    # 图片保存到本地文件
    file_name = ur1.rsplit("_")[-1]
    with open(file_name, mode="wb") as file_object:
        file_object.write(response.content)


if __name__ == "__main__":
    url_list = [
        "https://car2.autoimg.cn/cardfs/product/g30/M0B/07/4E/1400x0_1_q95_autohomecar__ChxknGNsfhSALeWKAAbD352xoh0016.jpg",
        "https://car3.autoimg.cn/cardfs/product/g30/M01/5A/C8/1400x0_1_q95_autohomecar__ChsFJ2NsfhWAYM11AAbaasHf4YU139.jpg",
        "https://car3.autoimg.cn/cardfs/product/g30/M01/07/4E/1400x0_1_q95_autohomecar__ChxknGNsfhSAQIchAAaXwNPGCb4158.jpg",
    ]

    tasks = [download_image(ur1) for ur1 in url_list]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

Overwriting test_coroutine.py


### 异步迭代器

什么是异步迭代器？

实现了__aiter__()和__anext__()方法的对象。__anext__必须返回一个awaitable对象。async for会
处理异步迭代器的__anext__()方法所返回的可等待对象，直到其引发一个StopAsyncIteration异常。由
PEP492引入。

什么是异步可迭代对象？

可在async for语句中被使用的对象。必须通过它的__aiter__()方法返回一个asynchronous iterator。由
PEP492引入。

In [7]:
import asyncio

class Reader(object):
    """ 自定义异步送代器（同时也是异步可送代对象）"""
 
    def __init__(self):
        self.count = 0
        
    async def readline(self):
        # awaitasyncio.sleep(1)
        self.count += 1
        return None if self.count ==100 else self.count
    
    def __aiter__(self):
        return self
        
    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val


async def func():
    obj= Reader()
    async for item in obj:
        print(item)
        
# asyncio.run(func())
await func()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99


### 异步上下文管理器
此种对象通过定义__aenter__()和__aexit__()方法来对async with 语句中的环境进行控制。由PEP 492引入。

In [None]:
import asyncio

class AsyncContextManager():
    def __init__(self,conn):
        self.conn=conn
    
    
    async def do_something(self):
        # 异步操作数据库
        return 666
    
    
    async def __aenter__(self):
        # 异步连接数据库    
        self.conn = await asyncio.sleep(1)
        return self
    
    
    async def __aexit__(self,exception_type,exception_value,exception_traceback):
        # 异步关闭数据库连接
        await asyncio.sleep(1)
        

async def main():
    async with AsyncContextManager("mysql连接对象") as f:
        result = await f.do_something()
        print(result)

# asyncio.run(main())
await main()

## uvloop
是asyncio的事件循环替代方案，不支持windows系统。事件循环>默认asyncio的事件循环。

In [None]:
%%writefile test_uvloop.py
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 编写asyncio的代码，与之前写的一致。

#* 内部的事件循环自动会变为uvloop
asyncio.run(...)

注意：一个asgi->uvicorn内部使用的就是uvloop

## 实战案例

### 异步操作redis
