### 协程声明
- async
### async vs sync
- 所谓 Sync，是指操作一个接一个地执行，下一个操作必须等上一个操作完成后才能执行。
- 而 Async 是指不同操作间可以相互交替执行，如果其中的某个操作被 block 了，程序并不会等待，而是会找出可执行的操作继续执行。
### ayncio 工作原理
- Asyncio 和其他 Python 程序一样，是单线程的，它只有一个主线程，但是可以进行多个不同的任务（task），这里的任务，就是特殊的 future 对象。这些不同的任务，被一个叫做 event loop 的对象所控制。
- event loop 会维护两个任务列表，分别对应这两种状态；并且选取预备状态的一个任务（具体选取哪个任务，和其等待的时间长短、占用的资源等等相关），使其运行，一直到这个任务把控制权交还给 event loop 为止。(这里简化任务只有两种:一是预备状态；二是等待状态)
- 当任务把控制权交还给 event loop 时，event loop 会根据其是否完成，把任务放到预备或等待状态的列表，然后遍历等待状态列表的任务，查看他们是否完成。
    - 如果完成，则将其放到预备状态的列表；
    - 如果未完成，则继续放在等待状态的列表。
### 协程调用
- await
- asyncio.create_task()
- asyncio.run():一个好的编程规范是,asyncio.run(main())作为主程序入口函数,在运行周期内,只调用一次asyncio.run.
- 多个协程并发执行:
    - await [task]
    - await asyncio.gather(*tasks)

### 多线程 vs Asyncio
- 如果是 I/O bound，并且 I/O 操作很慢，需要很多任务 / 线程协同实现，那么使用 Asyncio 更合适。
- 如果是 I/O bound，但是 I/O 操作很快，只需要有限数量的任务 / 线程，那么使用多线程就可以了。
- 如果是 CPU bound，则需要使用多进程来提高程序运行效率。

In [7]:
import time

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    for url in urls:
        await crawl_page(url)

await main(['url_1', 'url_2', 'url_3', 'url_10'])

crawling url_1
OK url_1
crawling url_2
OK url_2
crawling url_3
OK url_3
crawling url_10
OK url_10


In [8]:
import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    print(crawl_page(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task

await main(['url_1', 'url_2', 'url_3', 'url_10'])

crawling url_1
<coroutine object crawl_page at 0x1067ce6c0>
crawling url_2
<coroutine object crawl_page at 0x1067ce6c0>
crawling url_3
<coroutine object crawl_page at 0x1067ce6c0>
crawling url_10
<coroutine object crawl_page at 0x1067ce6c0>


  print(crawl_page(url))


OK url_1
OK url_2
OK url_3
OK url_10


In [9]:
import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    await asyncio.gather(*tasks)

await main(['url_1', 'url_2', 'url_3', 'url_10'])

crawling url_1
crawling url_2
crawling url_3
crawling url_10
OK url_1
OK url_2
OK url_3
OK url_10


In [15]:
import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')


await main()

before await
worker_1 start
worker_2 start
worker_1 done
awaited worker_1
worker_2 done
awaited worker_2


### 协程应用
- 协程任务限定运行时间:超时取消
- 协程运行时错误处理

In [17]:
async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2/0

async def worker_3():
    await asyncio.sleep(1)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()

    result = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(result)

await main()

[1, ZeroDivisionError('division by zero'), 3]


In [20]:
import random

async def consumer(queue, id):
    while True:
        item = await queue.get()
        print('{} get a item val: {}'.format(id, item))
        await asyncio.sleep(1)

async def producer(queue, id):
    for i in range(5):
        item = random.randint(1, 10)
        await queue.put(item)
        print('{} put a item val: {}'.format(id, item))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()
    
    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))
    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()

    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

await main()

producer_1 put a item val: 6
producer_2 put a item val: 9
consumer_1 get a item val: 6
consumer_2 get a item val: 9
producer_1 put a item val: 2
producer_2 put a item val: 1
consumer_1 get a item val: 2
consumer_2 get a item val: 1
producer_1 put a item val: 7
producer_2 put a item val: 3
consumer_1 get a item val: 7
consumer_2 get a item val: 3
producer_1 put a item val: 7
producer_2 put a item val: 10
consumer_1 get a item val: 7
consumer_2 get a item val: 10
producer_1 put a item val: 10
producer_2 put a item val: 5
consumer_1 get a item val: 10
consumer_2 get a item val: 5


In [22]:
import aiohttp
import bs4

async def fetch_content(url):
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()

async def crawl_movie(url):
    init_page = await fetch_content(url)
    init_soup = bs4.BeautifulSoup(init_page, 'lxml')

    movie_names, urls_to_fetch, movie_dates = [], [], []
    all_movies = init_soup.find('div', id="showing-soon")
    for movie in all_movies.find_all('div', class_='item'):
        all_a_tag = movie.find_all('a')
        all_li_tag = movie.find_all('li')
        # eg:<a href="http://example.com/1">Link 1</a>
        movie_name = all_a_tag[1].text
        url_to_fetch = all_a_tag[1]['href']
        movie_date = all_li_tag[0].text

        movie_names.append(movie_name)
        urls_to_fetch.append(url_to_fetch)
        movie_dates.append(movie_date)

        
    tasks = [fetch_content(url) for url in urls_to_fetch]
    pages = await asyncio.gather(*tasks)

    for movie_name, movie_date, page in zip(movie_names, movie_dates, pages):
        soup_item = bs4.BeautifulSoup(page, 'lxml')
        img_tag = soup_item.find("img")
        print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

async def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    await crawl_movie(url)

await main()


在他乡 01月22日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2917516542.jpg
射雕英雄传：侠之大者 01月29日 https://img1.doubanio.com/view/photo/s_ratio_poster/public/p2917502509.jpg
封神第二部：战火西岐 01月29日 https://img3.doubanio.com/view/photo/s_ratio_poster/public/p2917559392.jpg
哪吒之魔童闹海 01月29日 https://img2.doubanio.com/view/photo/s_ratio_poster/public/p2916323291.jpg
蛟龙行动 01月29日 https://img9.doubanio.com/view/photo/s_ratio_poster/public/p2917596815.jpg
唐探1900 01月29日 https://img9.doubanio.com/view/photo/s_ratio_poster/public/p2917556416.jpg
熊出没·重启未来 01月29日 https://img9.doubanio.com/view/photo/s_ratio_poster/public/p2917469446.jpg
花样年华 02月14日 https://img2.doubanio.com/view/photo/s_ratio_poster/public/p2917498801.jpg
美国队长4 02月14日 https://img1.doubanio.com/view/photo/s_ratio_poster/public/p2915154169.jpg
我们的命中注定 02月14日 https://img1.doubanio.com/view/photo/s_ratio_poster/public/p2911292290.jpg
真爱营业 02月14日 https://img2.doubanio.com/view/photo/s_ratio_poster/public/p2916519681.jpg
多幸运遇见你 02月14日 https

In [3]:
import time
def cpu_bound(number):
    print(sum(i * i for i in range(number)))

def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)

def main():
    start_time = time.perf_counter()  
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))
    
if __name__ == '__main__':
    main()

333333283333335000000
333333383333335000000
333333483333355000001
333333583333395000005
333333683333455000014
333333783333535000030
333333883333635000055
333333983333755000091
333334083333895000140
333334183334055000204
333334283334235000285
333334383334435000385
333334483334655000506
333334583334895000650
333334683335155000819
333334783335435001015
333334883335735001240
333334983336055001496
333335083336395001785
333335183336755002109
Calculation takes 10.589922042097896 seconds


In [2]:
import time
import multiprocessing
def cpu_bound(number):
    print(sum(i * i for i in range(number)))

def calculate_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)

def main():
    start_time = time.perf_counter()  
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))
    
if __name__ == '__main__':
    main()

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.12/multiprocessing/pool.py", line 114, in worker
    task = get()
           ^^^^^
  File "/opt/anaconda3/lib/python3.12/multiprocessing/queues.py", line 389, in get
    return _ForkingPickler.loads(res)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'cpu_bound' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
Process SpawnPoolWorker-2:
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/anaconda3/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/o

KeyboardInterrupt: 