# What Is a Coroutine

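A coroutine is a special software construct that lets a program pause and later resume execution without losing its current execution context. Unlike threads and processes, coroutines run within a single thread and achieve concurrency through a scheduling mechanism, which reduces context-switching overhead and improves the program's execution efficiency. Coroutines are typically used for I/O-bound tasks such as network requests and file reads and writes.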



In [None]:
import time

def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)
    print('OK {}'.format(url))

def main(urls):
    for url in urls:
        crawl_page(url)

%time main(['url_1', 'url_2', 'url_3', 'url_4'])



In [None]:
import asyncio
import nest_asyncio


# call apply()
nest_asyncio.apply()

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    for url in urls:
        await crawl_page(url)

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))



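`import asyncio` imports the library that contains most of the tools we need to work with coroutines.

The `async` keyword declares an asynchronous function, so `crawl_page` and `main` both become async functions. Calling an async function returns a coroutine object. For example, `print(crawl_page(''))` prints something like `<coroutine object crawl_page at 0x...>`, which tells you this is a Python coroutine object; the function body itself is not executed.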
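
To make the difference between calling an async function and running it concrete, here is a minimal sketch. The body of `crawl_page` is a stand-in that only returns a string, and the `is_coro` and `result` names are introduced just for illustration.

```python
import asyncio
import inspect

async def crawl_page(url):
    # Stand-in body; no real network access happens here.
    return f'OK {url}'

# Calling the async function only builds a coroutine object.
coro = crawl_page('url_1')
is_coro = inspect.iscoroutine(coro)
print(type(coro).__name__, is_coro)  # coroutine True

# The body runs only once the coroutine is driven by an event loop.
result = asyncio.run(coro)
print(result)  # OK url_1
```

This is written for a plain Python script. Inside a notebook an event loop is already running, so you would either apply `nest_asyncio` as in the cells above or `await` the coroutine directly.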


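## Running Coroutines

1. Through `await`. From the caller's point of view, `await` behaves like an ordinary Python call: the current coroutine is suspended at that point, the awaited coroutine runs, and execution continues once it returns. In the code above, `await asyncio.sleep(sleep_time)` pauses for the given number of seconds, and `await crawl_page(url)` runs the `crawl_page()` function.
2. Through `asyncio.create_task()`, which creates a task.
3. Through `asyncio.run`, which triggers execution. `asyncio.run` is available from Python 3.7 onward and makes Python's coroutine interface very simple. A good convention is to use `asyncio.run(main())` as the program's entry point and to call `asyncio.run` only once during the program's lifetime.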
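
The three mechanisms listed above can be put side by side in one short sketch. The `square` coroutine is a hypothetical example function, not part of the crawler code.

```python
import asyncio

async def square(x):
    await asyncio.sleep(0.01)
    return x * x

async def main():
    # 1. await a coroutine directly: main() is suspended until square(2) finishes.
    direct = await square(2)
    # 2. Wrap a coroutine in a task: it is scheduled on the event loop,
    #    and awaiting the task later collects its result.
    task = asyncio.create_task(square(3))
    from_task = await task
    return direct, from_task

# 3. asyncio.run() starts the event loop and drives main() to completion.
results = asyncio.run(main())
print(results)  # (4, 9)
```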

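## Why Do Both Runs Take the Same Time?

Because `await` behaves like a synchronous call here: the next `crawl_page(url)` call is not triggered until the current one has finished. (Synchronous means that once a call is made, it does not return until its result is available.)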

In [None]:
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def crawl_page(url):
    print(f'crawling {url}')
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print(f'OK {url}')
    
    
async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task
        

%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
    

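Tasks are created with `asyncio.create_task`. Once created, a task is scheduled to run shortly afterwards, so our code does not block at the point where the task is created. We then wait for all of them to finish with `for task in tasks: await task`.

Another approach is to use `asyncio.gather()` to wait on the tasks. `*tasks` unpacks the list into positional arguments; correspondingly, `**dict` unpacks a dictionary into keyword arguments.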
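
As a small illustration of the two unpacking forms and of how `asyncio.gather()` receives its arguments, consider the sketch below. The `describe` and `double` functions are hypothetical helpers made up for this example.

```python
import asyncio

def describe(a, b, c=0):
    return a + b + c

args = [1, 2]
kwargs = {'c': 3}
total = describe(*args, **kwargs)  # same as describe(1, 2, c=3)

async def double(x):
    await asyncio.sleep(0.01)
    return 2 * x

async def main():
    coros = [double(x) for x in (1, 2, 3)]
    # gather(*coros) is gather(coros[0], coros[1], coros[2]);
    # results come back in argument order.
    return await asyncio.gather(*coros)

doubled = asyncio.run(main())
print(total, doubled)  # 6 [2, 4, 6]
```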

In [None]:
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def crawl_page(url):
    print(f'crawling {url}')
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print(f'OK {url}')
    
    
async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls ]
    await asyncio.gather(*tasks)
    
%time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
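
The `%time` magic only works in IPython. As a rough way to compare the sequential and concurrent versions in a plain script, the sketch below uses `time.perf_counter`. The `unit` parameter and the `timed` helper are assumptions added here to keep the demo short; they are not part of the original cells.

```python
import asyncio
import time

async def crawl_page(url, unit=0.05):
    # Sleep time is scaled down by `unit` (hypothetical) so the demo runs quickly.
    await asyncio.sleep(int(url.split('_')[-1]) * unit)

async def sequential(urls):
    # Each await finishes before the next call starts.
    for url in urls:
        await crawl_page(url)

async def concurrent(urls):
    # All coroutines are scheduled together and overlap their sleeps.
    await asyncio.gather(*(crawl_page(url) for url in urls))

def timed(coro):
    start = time.perf_counter()
    asyncio.run(coro)
    return time.perf_counter() - start

urls = ['url_1', 'url_2', 'url_3', 'url_4']
seq_time = timed(sequential(urls))   # roughly (1 + 2 + 3 + 4) * unit
conc_time = timed(concurrent(urls))  # roughly 4 * unit, the longest single sleep
print(f'sequential {seq_time:.2f}s, concurrent {conc_time:.2f}s')
```

The sequential total is about the sum of the sleeps, while the concurrent total is about the longest one, which mirrors the 10-second versus 4-second difference in the cells above.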

# Demystifying the Coroutine Runtime



In [None]:
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')
    
    
async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')
    
async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')
    
%time asyncio.run(main())

In [1]:
import asyncio
import nest_asyncio

nest_asyncio.apply()

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')
    
    
async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')
    
async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')
    
%time asyncio.run(main())

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2
CPU times: total: 0 ns
Wall time: 2 s


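What happens, step by step:
1. `asyncio.run(main())`: the program enters `main()` and the event loop starts.
2. `task1` and `task2` are created and placed on the event loop to wait for their turn; execution reaches `print`, which outputs 'before await'.
3. `await task1` executes: the main task chooses to yield control, and the event scheduler starts scheduling worker_1.
4. worker_1 starts running and prints 'worker_1 start', then reaches `await asyncio.sleep(1)` and yields control; the event scheduler starts scheduling worker_2.
5. worker_2 starts running and prints 'worker_2 start', then reaches `await asyncio.sleep(2)` and yields control.
6. Everything up to this point should take somewhere between 1 ms and 10 ms, possibly even less. From this moment the event scheduler pauses, because no task is ready to run.
7. After one second, worker_1's sleep completes. The event scheduler hands control back to `task1`, which prints 'worker_1 done', finishes, and leaves the event loop.
8. `await task1` completes. The event scheduler hands control to the main task, which prints 'awaited worker_1' and then continues waiting at `await task2`.
9. Two seconds after the start, worker_2's sleep completes. The event scheduler hands control back to `task2`, which prints 'worker_2 done', finishes, and leaves the event loop.
10. The main task prints 'awaited worker_2'. All coroutine tasks are finished and the event loop ends.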
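
The ordering described in the steps above can be checked with a small sketch that records events in a list instead of printing them. The shortened delays and the generic `worker` function are assumptions made to keep the run fast.

```python
import asyncio

events = []

async def worker(name, delay):
    events.append(f'{name} start')
    await asyncio.sleep(delay)   # yields control back to the event loop
    events.append(f'{name} done')

async def main():
    task1 = asyncio.create_task(worker('worker_1', 0.05))
    task2 = asyncio.create_task(worker('worker_2', 0.10))
    events.append('before await')   # runs before either worker starts
    await task1                     # main yields; the workers get to run
    events.append('awaited worker_1')
    await task2
    events.append('awaited worker_2')

asyncio.run(main())
print(events)
```

The recorded list has the same order as the printed output of the cell above: 'before await' comes first because creating a task only schedules it, and the workers start running once `main` reaches its first `await`.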


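Next, let's go a step further. What if we want to limit how long certain coroutine tasks may run and cancel them once they time out? And going further still, what should we do when some coroutines raise errors while running?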
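
For the time-limit part of the question, one option from the standard library is `asyncio.wait_for`, which cancels the awaited coroutine when the timeout expires. The sketch below is an alternative to the manual `cancel()` approach used in the next cell, not a replacement for it; `slow_worker` is a hypothetical example coroutine.

```python
import asyncio

async def slow_worker():
    try:
        await asyncio.sleep(1)
        return 'finished'
    except asyncio.CancelledError:
        # wait_for cancels the inner coroutine when the timeout expires.
        print('slow_worker was cancelled')
        raise

async def main():
    try:
        return await asyncio.wait_for(slow_worker(), timeout=0.05)
    except asyncio.TimeoutError:
        return 'timed out'

outcome = asyncio.run(main())
print(outcome)  # timed out
```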

In [2]:
import asyncio

async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()

    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)

%time asyncio.run(main())



[1, ZeroDivisionError('division by zero'), CancelledError('')]
CPU times: total: 0 ns
Wall time: 2.01 s


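As you can see, worker_1 ran normally, worker_2 raised an error while running, and worker_3 took too long and was cancelled by us; all of this shows up in the final result `res`. Note the `return_exceptions=True` argument, though. Without it, the first exception is raised straight into the code awaiting `gather()`, which then needs a `try`/`except` to catch it, and the results of the other tasks are no longer collected in `res`. (The other tasks are not cancelled by `gather()` itself and keep running.) To avoid this, we simply set `return_exceptions` to `True`.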
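
To see what happens when `return_exceptions` is left at its default of `False`, here is a minimal sketch. The `fails` and `succeeds` coroutines are hypothetical examples, and the delays are shortened for speed.

```python
import asyncio

async def fails():
    await asyncio.sleep(0.01)
    return 1 / 0

async def succeeds():
    await asyncio.sleep(0.05)
    return 'ok'

async def main():
    bad = asyncio.create_task(fails())
    good = asyncio.create_task(succeeds())
    caught = None
    try:
        await asyncio.gather(bad, good)  # return_exceptions defaults to False
    except ZeroDivisionError as exc:
        caught = type(exc).__name__
    # gather() did not cancel the other task; it can still be awaited.
    return caught, good.cancelled(), await good

summary = asyncio.run(main())
print(summary)  # ('ZeroDivisionError', False, 'ok')
```

The exception reaches the caller as soon as `fails` raises, so the caller has to fetch the remaining results itself, which is the inconvenience that `return_exceptions=True` avoids.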

## Implementing the Classic Producer-Consumer Model with Coroutines

In [3]:
import asyncio
import random

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()

    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()
    
    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

%time asyncio.run(main())


producer_1 put a val: 5
producer_2 put a val: 4
consumer_1 get a val: 5
consumer_2 get a val: 4
producer_1 put a val: 10
producer_2 put a val: 8
consumer_2 get a val: 10
consumer_1 get a val: 8
producer_1 put a val: 4
producer_2 put a val: 8
consumer_1 get a val: 4
consumer_2 get a val: 8
producer_1 put a val: 4
producer_2 put a val: 6
consumer_2 get a val: 4
consumer_1 get a val: 6
producer_1 put a val: 2
producer_2 put a val: 9
consumer_1 get a val: 2
consumer_2 get a val: 9
CPU times: total: 0 ns
Wall time: 10 s
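
The cell above stops the consumers after a fixed `asyncio.sleep(10)`. A possible variation, sketched below under the assumption that the run should end as soon as all items are processed, uses `queue.task_done()` and `queue.join()` instead. The item names, counts, and the `consumed` list are made up for illustration.

```python
import asyncio

async def producer(queue, name, count):
    for i in range(count):
        await queue.put(f'{name}-{i}')
        await asyncio.sleep(0.01)

async def consumer(queue, consumed):
    while True:
        val = await queue.get()
        consumed.append(val)
        queue.task_done()  # mark this item as processed

async def main():
    queue = asyncio.Queue()
    consumed = []
    consumers = [asyncio.create_task(consumer(queue, consumed)) for _ in range(2)]
    # Wait until both producers have put all of their items.
    await asyncio.gather(producer(queue, 'p1', 3), producer(queue, 'p2', 3))
    await queue.join()     # returns once every item has been marked done
    for c in consumers:
        c.cancel()         # consumers loop forever, so cancel them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)
    return consumed

consumed = asyncio.run(main())
print(len(consumed))  # 6
```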


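# In Practice: A Crawler for Douban's Upcoming Movies

Task: the page https://movie.douban.com/cinema/later/beijing/ lists movies coming soon to theaters in Beijing. Use Python to get each movie's title, release date, and poster. The posters on this page are thumbnails, so we want to fetch the poster from each movie's own detail page instead.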



In [4]:
import requests
from bs4 import BeautifulSoup

def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = requests.get(url).content
    init_soup = BeautifulSoup(init_page, 'lxml')

    all_movies = init_soup.find('div', id="showing-soon")
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')

        movie_name = all_a_tag[1].text
        url_to_fetch = all_a_tag[1]['href']
        movie_date = all_li_tag[0].text

        response_item = requests.get(url_to_fetch).content
        soup_item = BeautifulSoup(response_item, 'lxml')
        img_tag = soup_item.find('img')

        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

%time main()



ProxyError: HTTPSConnectionPool(host='movie.douban.com', port=443): Max retries exceeded with url: /cinema/later/beijing/ (Caused by ProxyError('Cannot connect to proxy.', NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x0000021F3833AC50>: Failed to establish a new connection: [WinError 10061] No connection could be made because the target machine actively refused it')))

In [5]:
import asyncio
import aiohttp

from bs4 import BeautifulSoup

# Request headers sent with every request (placeholder User-Agent; adjust as needed)
header = {"User-Agent": "Mozilla/5.0"}

async def fetch_content(url):
    async with aiohttp.ClientSession(
        headers=header, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = await fetch_content(url)
    init_soup = BeautifulSoup(init_page, 'lxml')

    movie_names, urls_to_fetch, movie_dates = [], [], []

    all_movies = init_soup.find('div', id="showing-soon")
    for each_movie in all_movies.find_all('div', class_="item"):
        all_a_tag = each_movie.find_all('a')
        all_li_tag = each_movie.find_all('li')

        movie_names.append(all_a_tag[1].text)
        urls_to_fetch.append(all_a_tag[1]['href'])
        movie_dates.append(all_li_tag[0].text)

    # Fetch all detail pages concurrently; gather() keeps the results in order
    tasks = [fetch_content(url) for url in urls_to_fetch]
    pages = await asyncio.gather(*tasks)

    for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
        soup_item = BeautifulSoup(page, 'lxml')
        img_tag = soup_item.find('img')

        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

%time asyncio.run(main())