# 课程笔记  
>《协程 & asyncio & 异步》 武沛齐  
课程官方地址  
https://www.bilibili.com/video/BV1dD4y127bD  

# 1. 协程  
* 不是计算机提供，程序员人为创造  
* 也可以称为微线程，一种用户态内上下文切换的技术  
* 通过一个线程实现代码块切换执行  

## 实现协程的几种方式  
* greenlet 早期模块  
* yield 关键字  
* asyncio 装饰器 v3.4  
* async，await 关键字 v3.5  

## greenlet 实现协程  

In [None]:
from greenlet import greenlet


def func1():
    print(1)  # 第 2 步，输出 1
    gr2.switch()  # 第 3 步，切换到 func2 函数
    print(2)  # 第 6 步，输出 2
    gr2.switch()  # 第 7 步，切换到 func2 函数，从上一次执行的位置继续向后执行


def func2():
    print(3)  # 第 4 步，输出 3
    gr1.switch()  # 第 5 步，切换到 func1 函数，从上一次执行的位置继续向后执行
    print(4)  # 第 8 步，输出 4


gr1 = greenlet(func1)
gr2 = greenlet(func2)

gr1.switch()  # 第 1 步，执行 func1 函数

## yield 关键字实现  

In [None]:
def func1():
    yield 1
    yield from func2()
    yield 2
    
def func2():
    yield 3
    yield 4

f1 = func1()
for item in f1:
    print(item)

## asyncio

* 在 python 3.4 之后  
* 遇到 IO 自动切换  

In [None]:
import nest_asyncio

# 因 jupyter 中已经运行了 event loop
# 为保证与 IDE 写法一致，用 nest_asyncio.apply() 封装 event_loop
# 当前语句运行后对整个 notebook 文件都有效
# 重启或关闭后，需要重新运行当前 cell 保证后面语句运行正常
nest_asyncio.apply()

In [None]:
import asyncio


@asyncio.coroutine
def func1():
    print(1)
    # 假设是网络 IO 请求，下载一张图片
    # 遇到 IO 耗时操作自动化切换到 tasks 中的其他任务
    yield from asyncio.sleep(2)
    print(2)
    
@asyncio.coroutine
def func2():
    print(3)
    # 假设是网络 IO 请求，下载一张图片
    # 遇到 IO 耗时操作自动化切换到 tasks 中的其他任务
    yield from asyncio.sleep(2)
    print(4)

tasks = [asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

## async & await 关键字  

* python 3.5 之后  
* async 关键字替代 @asyncio.coroutine 装饰器  
* await 关键字替代 yield from  

In [None]:
import asyncio

async def func1():
    print(1)
    # 假设是网络 IO 请求，下载一张图片
    # 遇到 IO 耗时操作自动化切换到 tasks 中的其他任务
    await asyncio.sleep(2)
    print(2)
    
async def func2():
    print(3)
    # 假设是网络 IO 请求，下载一张图片
    # 遇到 IO 耗时操作自动化切换到 tasks 中的其他任务
    await asyncio.sleep(2)
    print(4)

tasks = [asyncio.ensure_future(func1()),
        asyncio.ensure_future(func2())
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

# 2. 协程的意义  

* 线程利用 IO 等待时间切换到其他程序  

## 案例 下载三张图片  

* 普通方式（同步）  

In [None]:
import requests

def download_image(url):
    print("开始下载", url)
    # 发送网络请求，下载图片
    response = requests.get(url)
    print("下载完成")
    # 图片保存到本地文件
    file_name = url.rsplit('/')[-1]
    print(file_name)
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)
        
if __name__ == '__main__':
    url_list = ['https://www.apache.org/img/support-apache.jpg',
               'https://www.python.org/static/img/python-logo.png',
               'https://www.linux.org/images/logo.png']
    
    for item in url_list:
        download_image(item)

* 协程方式（异步）  

In [None]:
import asyncio

import aiohttp


async def fetch(session, url):
    print("发送请求", url)
    async with session.get(url, verify_ssl=False) as response:
        content = await response.content.read()
        file_name = url.rsplit('/')[-1]
        with open(file_name, mode='wb') as file_object:
            file_object.write(content)
        print('下载完成', url)
        
async def main():
    async with aiohttp.ClientSession() as session:
        url_list = ['https://www.apache.org/img/support-apache.jpg',
               'https://www.python.org/static/img/python-logo.png',
               'https://www.linux.org/images/logo.png']
        
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        
        await asyncio.wait(tasks)
        
if __name__=='__main__':
    asyncio.run(main())

# 3. 异步编程  

## 3.1 事件循环  

* 理解成一个死循环，检测并执行某些代码  

```
#伪代码
任务列表 = [任务1, 任务2, 任务3, ...]

while True:
    可执行的任务列表, 已完成的任务列表 = 去任务列表里检查所有的任务，将“可执行”和“已完成”的任务返回
    
    for 就绪任务 in 已准备就绪的任务列表:
        执行已就绪的任务

    for 已完成的任务 in 已完成的任务列表:
        在任务列表中移除 已完成的任务
        
    如果 任务列表中的任务都已经完成，则终止循环

```  

```python
import asyncio

# 生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到任务列表
loop.run_until_complete(tasks)
```

## 3.2 快速上手  

* 协程函数：async 关键字修饰的函数  
* 协程对象：执行协程函数得到的对象  

```python
async def func():
    pass

result = func()
```  

* 执行协程函数，只创建协程对象，函数内部代码不会执行  
* 运行协程函数内部代码，必须将协程对象交给事件循环来处理  

### python 3.7 之前的写法

```python
import asyncio

async def func():
    print("hello world")

result = func()

# python 3.7 之前的写法
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
```

### python 3.7 之后的写法  

In [None]:
import asyncio


async def func():
    print("hello world")

result = func()

# python 3.7 及之后的写法
# 本质与 asyncio.get_event_loop() 一样，只是简化了写法
asyncio.run(result)

## 3.3 await()  

* await + 可等待对象(协程对象，Future，Task 对象-->IO 等待)  

### 示例 1  

In [None]:
import asyncio


async def func():
    print("hello world")
    response = await asyncio.sleep(2)
    print("结束", response)
    
asyncio.run(func())

### 示例 2  

In [None]:
import asyncio


async def others():
    print("IO start")
    await asyncio.sleep(2)
    print("IO end")
    return "IO 操作返回值"

async def func():
    print("执行协程函数内部代码")
    
    # 遇到 IO 挂起当前协程
    # IO 操作完成之后再继续往下执行
    # 协程挂起时，事件循环可以去执行其他协程
    response = await others()
    
    print("IO 请求结束，结果为：", response)
    
asyncio.run(func())

### 示例 3  

In [None]:
import asyncio


async def others():
    print("IO start")
    await asyncio.sleep(2)
    print("IO end")
    return "IO 操作返回值"

async def func():
    print("执行协程函数内部代码")
    
    # 遇到 IO 挂起当前协程
    # IO 操作完成之后再继续往下执行
    # 协程挂起时，事件循环可以去执行其他协程
    response1 = await others()
    print("IO1 请求结束，结果为：", response1)
    
    response2 = await others()
    print("IO2 请求结束，结果为：", response2)    
    
asyncio.run(func())

* await 等待对象的值得到结果后再继续往下走  

## 3.4 Task 对象  

* 在事件循环中添加多个任务  
* Task 用于并发调度协程  
* 通过 asyncio.create_task(协程对象) 的方式创建 Task 对象  
* 可以让协程加入事件循环中等待被调度执行  
* 除了使用 asyncio.create_task() 函数以外，还可以用低层级的 loop.create_task() 或 ensure_future() 函数  
* asyncio.create_task() 函数在 Python 3.7 被加入，3.7 之前可以改用低层级的 asyncio.ensure_future() 函数  

### 示例 1  

In [None]:
import asyncio


async def func(mark):
    print(1, mark)
    await asyncio.sleep(2)
    print(2, mark)
    return "返回值"

async def main():
    print("main 开始")
    
    # 创建 Task 对象，将当前执行 func 函数任务添加到事件循环
    task1 = asyncio.create_task(func('task1'))
    
    # 创建 Task 对象，将当前执行 func 函数任务添加到事件循环
    task2 = asyncio.create_task(func('task2'))
    
    print('main 结束')
    # 当执行某协程遇到 IO 操作时，会自动化切换到其他任务
    # 此处的 await 是等待相对应的协程全部执行完毕并获取结果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)
    
# 事件循环中有 3 个任务，main() 和 两个 task 中的 func()
asyncio.run(main())

### 示例 2  

In [None]:
import asyncio


async def func(mark):
    print(1, mark)
    await asyncio.sleep(2)
    print(2, mark)
    return "返回值"

async def main():
    print("main 开始")
    
    task_list = [asyncio.create_task(func('task1')),
                 asyncio.create_task(func('task2'))]
    
    print('main 结束')
    
    # done 执行完成的结果集合，pending 表示完成的状态
    done, pending = await asyncio.wait(task_list, timeout=None)
    print(done)
    
# 事件循环中有 3 个任务，main() 和 两个 task 中的 func()
asyncio.run(main())

In [None]:
import asyncio


async def func(mark):
    print(1, mark)
    await asyncio.sleep(2)
    print(2, mark)
    return "返回值"

task_list = [func('task1'), func('task2')]
    
done, pending = asyncio.run(asyncio.wait(task_list))
print(done)

## 3.5 asyncio.Future 对象  

* task 继承 Future，Task 对象内部 await 结果的处理基于 Future 对象  

### 示例 1  

```python
import asyncio

async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务（Future 对象）,这个任务什么都不干
    fut = loop.create_future()
    
    # 等待任务最终结果（Future 对象），没有结果则会一直等下去
    await fut
    
asyncio.run(main())
```

### 示例 2  

In [None]:
import asyncio

async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result('hello')
    
async def main():
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    
    # 创建一个任务（Future 对象），没绑定任何行为，则这个任务永远不知道什么时候结束
    fut = loop.create_future()
    
    # 创建一个任务（Task 对象），绑定了 set_after 函数，函数内部在 2s 之后，会给 fut 赋值
    # 手动设置 future 任务的最终结果，fut 就可以结束了
    await loop.create_task(set_after(fut))
    
    # 等待 Future 对象获取最终结果，否则一直等下去
    data = await fut
    print(data)
    
asyncio.run(main())

## 3.6 concurrent.futures.Future 对象  

### 使用线程池，进程池实现异步操作时用到的对象   

In [None]:
import time
from concurrent.futures.thread import ThreadPoolExecutor


def func(value):
    time.sleep(1)
    print(value)
    return 123


# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)

for i in range(10):
    fut = pool.submit(func, i)

* 两种编程方式结合时，两种 Future 对象可能会同时出现  
* 比如某个功能暂时不支持协程的写法又需要引入时  

In [None]:
import asyncio
import time


def func1():
    # 某个耗时的操作
    time.sleep(2)
    return "finished"


async def main():
    loop = asyncio.get_running_loop()

    # 1. Run in the default loop's executor(默认 ThreadPoolExecutor)
    # 第一步：内部先调用 ThreadPoolExecutor 的 submit 方法，去线程池中申请一个线程执行 func1 函数，返回一个 concurrent.futrues.Future 对象
    # 第二步：调用 asyncio.wrap_future 将 concurrent.futures.Future 对象包装为 asyncio.Future 对象
    # concurrent.futures.Future 对象不支持 await 语法，包装为 asyncio.Future 对象才能在协程中使用
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print("default thread pool", result)


asyncio.run(main())

## 案例 asyncio + 不支持异步的模块  

In [None]:
import asyncio

import requests


async def download_image(url):
    print("开始下载", url)

    loop = asyncio.get_event_loop()
    # requests 模块默认不支持异步操作，使用线程池来配合实现
    # 包装成 asyncio.Future 对象
    future = loop.run_in_executor(None, requests.get, url)

    # 发送网络请求，下载图片
    response = await future
    print("下载完成")
    # 图片保存到本地文件
    file_name = url.rsplit('/')[-1]
    print(file_name)
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)


if __name__ == '__main__':
    url_list = ['https://www.apache.org/img/support-apache.jpg',
                'https://www.python.org/static/img/python-logo.png',
                'https://www.linux.org/images/logo.png']

    tasks = [download_image(url) for url in url_list]

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

## 3.7 异步迭代器  

In [None]:
import asyncio


class Reader(object):
    """自定义异步迭代器，同时也是异步可迭代对象"""

    def __init__(self):
        self.count = 0

    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 10:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val == None:
            raise StopAsyncIteration
        return val


async def func():
    obj = Reader()
    # async for 必须在 async 函数内执行
    async for item in obj:
        print(item)


asyncio.run(func())

## 3.8 异步上下文管理器

* 通过定义 __aenter__() 和 __aexit__() 方法对 async with 语句中的环境进行控制  

In [None]:
import asyncio

class AsyncContextManager:
    def __init__(self):
        #self.conn = conn
        print()
        
    async def do_something(self):
        # 异步操作数据库
        return '数据库返回结果'
    
    async def __aenter__(self):
        # 异步连接数据库
#         self.conn = await asyncio.sleep(1)
        return self
    
    async def __aexit__(self, exc_tye, exc, tb):
        # 异步关闭数据库连接
        await asyncio.sleep(1)

async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)
        
asyncio.run(func())

# 4. uvloop  

* asyncio 事件循环的替代方案  
* 事件循环比默认 asyncio 事件循环更高  

```python
import asyncio
import uvloop

# uvloop 代替默认的 事件循环，其他代码写法与默认一致
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
```  

* uvicorn 内部使用的就是 uvloop  

# 5. 实战案例  

## 5.1 异步 redis  

* 使用 python 代码操作 redis 时，连接、操作、断开都是 IO 操作  

### 示例 1  

In [None]:
import asyncio
import aioredis

async def execute(address, password):
    print('开始执行', address)
    # 网络 IO 操作，创建 redis 连接
    # 此时如果有其他任务，线程会切换到其他任务
    redis = await aioredis.create_redis(address, password = password)
    
    # 网络 IO 操作，在 redis 中设置哈希值 car，内部设三个键值对
    # redis = {car: {key1:1, key2:2, kye3:3}}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
    # 网络 IO 操作，去 redis 中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    
    redis.close()
    
    # 网络 IO 操作，关闭 redis 连接
    await redis.wait_closed()
    
    print("结束", address)
    
asyncio.run(execute('redis://127.0.0.1:6379', "test"))

### 示例2  

In [None]:
import asyncio
import aioredis

async def execute(address, password):
    print('开始执行', address)
    # 网络 IO 操作，创建 redis 连接
    # 此时如果有其他任务，线程会切换到其他任务
    redis = await aioredis.create_redis(address, password = password)
    
    # 网络 IO 操作，在 redis 中设置哈希值 car，内部设三个键值对
    # redis = {car: {key1:1, key2:2, kye3:3}}
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    
    # 网络 IO 操作，去 redis 中获取值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    
    redis.close()
    
    # 网络 IO 操作，关闭 redis 连接
    await redis.wait_closed()
    
    print("结束", address)
    
task_list = [execute('redis://127.0.0.1:6379', 'test'),
            execute('redis://127.0.0.1:6379', 'test')]

asyncio.run(asyncio.wait(task_list))

## 5.2 异步 MySQL  

In [None]:
import asyncio
import aiomysql

async def execute():
    # 网络 IO 操作，连接 MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='root', db='mysql')
    
    # 网络 IO 操作，创建 cursor
    cur = await conn.cursor()
    
    # 网络 IO 操作，执行 SQL
    await cur.execute('SELECT Host, User FROM user')
    
    # 网络 IO 操作，获取 SQL 结果
    result = await cur.fetchall()
    print(result)
    
    # 网络 IO 操作，关闭连接
    await cur.close()
    conn.close()
    
asyncio.run(execute())

In [None]:
import asyncio
import aiomysql

async def execute():
    # 网络 IO 操作，连接 MySQL
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='root', db='mysql')
    
    # 网络 IO 操作，创建 cursor
    cur = await conn.cursor()
    
    # 网络 IO 操作，执行 SQL
    await cur.execute('SELECT Host, User FROM user')
    
    # 网络 IO 操作，获取 SQL 结果
    result = await cur.fetchall()
    print(result)
    
    # 网络 IO 操作，关闭连接
    await cur.close()
    conn.close()

task_list=[execute(), execute()]
asyncio.run(asyncio.wait(task_list))

## 5.3 FastAPI 框架  

### 示例 1  

```python
import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get("/")
def index():
    """普通操作接口"""
    # 某个 IO 操作 10s
    return {"message", "Hello world"}


if __name__ == '__main__':
    uvicorn.run("fastapi_example1:app", host="127.0.0.1", port=5000, log_level="info")
```

In [None]:
# 执行示例 1 的 py 文件
%run fastapi_example1.py

### 示例 2  

```python
import asyncio

import aioredis
import uvicorn
from aioredis import Redis
from fastapi import FastAPI

app = FastAPI()

# 创建 redis 连接池
REDIS_POOL = aioredis.ConnectionsPool('redis://127.0.0.1:6379', password='test', minsize=1, maxsize=10)


@app.get("/")
def index():
    """普通操作接口"""
    # 某个 IO 操作 10s
    return {"message", "Hello world"}


@app.get('/red')
async def red():
    """异步操作接口"""

    print("请求来了")

    await asyncio.sleep(3)

    # 连接池获取一个连接
    conn = await REDIS_POOL.acquire()
    redis = Redis(conn)

    # 设置值
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)

    REDIS_POOL.release(conn)

    return result


if __name__ == '__main__':
    uvicorn.run("fastapi_example2:app", host="127.0.0.1", port=5000, log_level="info")

```

In [None]:
%run fastapi_example2.py

## 5.4 爬虫  

In [None]:
import asyncio

import aiohttp


async def fetch(session, url):
    print("发送请求", url)
    async with session.get(url, verify_ssl=False) as response:
        text = await response.text()
        print("得到结果", url, len(text))
        return text


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://python.org',
            'https://www.baidu.com',
            'https://www.linux.org'
        ]

        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]

        done, pending = await asyncio.wait(tasks)
        print(done)

if __name__ == '__main__':
    asyncio.run(main())


# 总结  

* 最大的意义：通过一个线程 IO 等待时间去做其他事情  