In [1]:
import time
from typing import (
    List,
)
import asyncio

In [9]:
import nest_asyncio
nest_asyncio.apply()

### 模拟爬虫的操作

### 同步测试

In [2]:
def crawl_page(url: str) -> None:
    """完成抓取"""
    print(f"crawling {url}")
    sleep_time = int(url.rsplit("_", maxsplit=1)[-1])
    time.sleep(sleep_time)
    print(f"抓取{url}完成")

def main(urls: List[str]) -> None:
    """批量抓取"""
    for url in urls:
        crawl_page(url)

In [3]:
%time main(['url_1', "url_2", "url_3", "url_4"])

crawling url_1
抓取url_1完成
crawling url_2
抓取url_2完成
crawling url_3
抓取url_3完成
crawling url_4
抓取url_4完成
Wall time: 10 s


**一共10s，这是同步的正常情况。我们看看异步的情况。**

In [4]:
async def crawl_page(url: str) -> None:
    """声明异步类型的抓取函数"""
    print(f"crawling {url}")
    sleep_time = int(url.rsplit("_", maxsplit=1)[-1])
    await asyncio.sleep(sleep_time)
    print(f"抓取{url}完成")

async def asy_main(urls: List[str]) -> None:
    """声明异步批量抓取"""
    for url in urls:
        await crawl_page(url)

In [10]:
%time asyncio.run(asy_main(['url_1', "url_2", "url_3", "url_4"]))

crawling url_1
抓取url_1完成
crawling url_2
抓取url_2完成
crawling url_3
抓取url_3完成
crawling url_4
抓取url_4完成
Wall time: 10 s


但是实际花费时间还是10s，那么很明显，这里并没有异步执行。为什么呢？  
那么肯定是循环 `await的问题` 。  
`await` 会阻塞等待执行的完成，但是在 `async` 装饰后的函数中（此时变成了一个携程对象），你又不得不使用`await`，那么我们怎么让他异步执行呢？  
我们用 `create_task` 来实现。 

In [14]:
async def crawl_page(url: str) -> None:
    """声明异步类型的抓取函数"""
    print(f"crawling {url}")
    sleep_time = int(url.rsplit("_", maxsplit=1)[-1])  # type: int
    await asyncio.sleep(sleep_time)
    print(f"抓取{url}完成")

async def asy_main(urls: List[str]) -> None:
    """声明异步批量抓取"""
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task  # 等待完成

In [16]:
%time asyncio.run(asy_main(['url_1', "url_2", "url_3", "url_4"]))

crawling url_1
crawling url_2
crawling url_3
crawling url_4
抓取url_1完成
抓取url_2完成
抓取url_3完成
抓取url_4完成
Wall time: 4 s


**学过线程或者进程的同学应该觉得很熟悉，因为进程或者线程需要初始化一个任务对象，然后再运行。所以其实这里都是相通的， 不过异步创建就会运行，就不用手动运行了。**  
经过改造之后，我们发现运行速度变快了。只用了最长的运行时间，即 4s.  
是不是很方便就做出来了！没错，python的异步就是为了成为新的宠儿，才变得这么简单强大！

**当然, for 循环运行任务看起来不太优雅，我们有另一种写法。**

In [19]:
async def new_asy_main(urls: List[str]) -> None:
    """声明异步批量抓取"""
    tasks = (asyncio.create_task(crawl_page(url)) for url in urls)
    asyncio.gather(*tasks)

In [20]:
%time asyncio.run(new_asy_main(['url_1', "url_2", "url_3", "url_4"]))

Wall time: 1e+03 µs
crawling url_1
crawling url_2
crawling url_3
crawling url_4
抓取url_1完成
抓取url_2完成
抓取url_3完成
抓取url_4完成


这里我们把 `tasks` 优化成为了一个生成器，然后调用 `asyncio.gather` 解包 `tasks` ，这样写起来更加清晰易懂。

异步详解
---

In [25]:
import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())  # 创建携程任务 并执行
    task2 = asyncio.create_task(worker_2())  # 创建携程任务 并执行
    print('before await')
    await task1  # 等待task1 完成
    print('awaited worker_1')
    await task2  # 等待task2 完成
    print('awaited worker_2')

%time asyncio.run(main())

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2
Wall time: 1.99 s


In [28]:
import asyncio
import random

async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()

    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()
    
    res = await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)
    print(res)

%time asyncio.run(main())


producer_1 put a val: 3
producer_2 put a val: 2
consumer_1 get a val: 3
consumer_2 get a val: 2
producer_1 put a val: 10
producer_2 put a val: 3
consumer_2 get a val: 10
consumer_1 get a val: 3
producer_1 put a val: 1
producer_2 put a val: 6
consumer_1 get a val: 1
consumer_2 get a val: 6
producer_1 put a val: 1
producer_2 put a val: 7
consumer_1 get a val: 1
consumer_2 get a val: 7
producer_1 put a val: 8
producer_2 put a val: 8
consumer_1 get a val: 8
consumer_2 get a val: 8
[CancelledError(), CancelledError(), None, None]
Wall time: 10 s


### pint
  - ####  add_done_callback 回调。