# 加速爬虫: 异步加载 Asyncio


## 基本用法

In [1]:
import time


def job(t):
    """Simulate one blocking unit of work that lasts ``t`` seconds.

    ``time.sleep`` blocks the whole thread, so while this job waits no other
    job can make progress.
    """
    duration = t
    print('Start job ', duration)
    time.sleep(duration)  # blocking wait: nothing else runs meanwhile
    print('Job ', duration, ' takes ', duration, ' s')
    

def main():
    """Run job(1) and then job(2), one after the other (about 3 s in total)."""
    # A plain loop rather than a list comprehension: we only want the side
    # effects of job(), not a throw-away list of None values.
    for t in range(1, 3):
        job(t)
    
    
# perf_counter() is monotonic and high-resolution; time.time() follows the
# wall clock, which can jump, so it is not meant for measuring durations.
t1 = time.perf_counter()
main()
print("NO async total time : ", time.perf_counter() - t1)

Start job  1
Job  1  takes  1  s
Start job  2
Job  2  takes  2  s
NO async total time :  3.0008366107940674


从上面可以看出, 我们的 job 是按顺序执行的: 必须执行完 job 1 才能开始执行 job 2, 而 job 1 需要 1 秒, job 2 需要 2 秒, 所以总时间是 3 秒多. 如果改用 asyncio 的形式, job 1 在等待的时候 (比如等待一个网页下载完成, 下面用 `await asyncio.sleep(t)` 来模拟), 就可以把执行权切换给 job 2, 让它开始执行. 注意 `time.sleep(t)` 会阻塞整个线程, 在协程里必须换成 `asyncio.sleep(t)` 才能发生这种切换.
## Translate above to async
**Now, let's see the running time using asyncio**

In [2]:
# .py文件中运行不报错
import asyncio
import time
import nest_asyncio
nest_asyncio.apply()

async def job(t):
    """Coroutine version of job: wait ``t`` seconds without blocking the loop.

    At the ``await`` the event loop is free to run other tasks, so several
    jobs can be waiting at the same time.
    """
    label = t
    print('Start job ', label)
    await asyncio.sleep(label)  # yields control to the event loop while waiting
    print('Job ', label, ' takes ', label, ' s')


async def main(loop):
    """Schedule job(1) and job(2) on ``loop`` and wait until both finish."""
    # create_task only schedules each coroutine; they start running once this
    # coroutine yields control at the await below.
    pending = [loop.create_task(job(seconds)) for seconds in (1, 2)]
    await asyncio.wait(pending)  # returns after every task has completed

t1 = time.perf_counter()  # monotonic timer, unlike time.time()
# asyncio.get_event_loop() is deprecated for this use and raises RuntimeError
# on Python 3.14+ when no loop is set. Inside IPython a loop is already
# running (nest_asyncio.apply() makes re-entering it legal), so reuse it;
# in a plain script create a private loop and close it when done.
try:
    loop = asyncio.get_running_loop()
    owns_loop = False
except RuntimeError:
    loop = asyncio.new_event_loop()
    owns_loop = True
try:
    loop.run_until_complete(main(loop))
finally:
    if owns_loop:  # never close the notebook's own loop; IPython breaks if we do
        loop.close()
print("Async total time : ", time.perf_counter() - t1)


Start job  1
Start job  2
Job  1  takes  1  s
Job  2  takes  2  s
Async total time :  1.9878110885620117


从结果可以看出, 我们没有等待 job 1 的结束才开始 job 2, 而是 job 1 触发了 await 的时候就切换到了 job 2 了. 这时, job 1 和 job 2 同时在等待 await asyncio.sleep(t), 所以最终的程序完成时间, 取决于等待最长的 t, 也就是 2秒. 这和上面用普通形式的代码相比(3秒), 的确快了很多.
## aiohttp
接着我们来看看我们怎么用最一般的 requests 模块爬网页, 和我们怎么将 requests 替换成 aiohttp.


In [3]:
import requests

URL = 'https://mofanpy.com/'


def normal():
    """Fetch URL twice, one request after the other, printing the final URL.

    Each ``requests.get`` blocks until its response arrives, so the total
    time is roughly the sum of the two round trips.
    """
    for _ in range(2):
        # requests has no timeout by default and can hang forever on a stalled
        # server; the with-block closes the response and frees the connection.
        with requests.get(URL, timeout=10) as r:
            print(r.url)
    
# perf_counter() is monotonic and high-resolution; time.time() follows the
# wall clock, which can jump, so it is not meant for measuring durations.
t1 = time.perf_counter()
normal()
print("Normal total time:", time.perf_counter() - t1)

https://mofanpy.com/
https://mofanpy.com/
Normal total time: 0.3860809803009033


用 requests 用久了以后, 这样的代码真是信手拈来. 很好, 我们打开莫烦 Python 的首页两次只花了 0.38 秒. 然后我们再用 aiohttp 来实现一样的功能, 结果 asyncio 的方式只用了约 0.1 秒! 大获全胜. (具体耗时取决于网络状况, 每次运行都会略有不同.)

In [4]:
# .py文件中运行不报错
import nest_asyncio
nest_asyncio.apply()
import asyncio

import aiohttp
import time
URL = 'https://mofanpy.com/'


async def job(session):
    """Request URL through ``session`` and return the final URL as a string.

    ``async with`` releases the response (and its pooled connection) back to
    the session as soon as we have read what we need.
    """
    async with session.get(URL) as response:
        return str(response.url)


async def main(loop):
    """Fetch URL twice concurrently through one shared ClientSession and
    print the list of final URLs."""
    async with aiohttp.ClientSession() as session:
        tasks = [loop.create_task(job(session)) for _ in range(2)]
        # asyncio.wait returns (done, pending); with the default
        # return_when=ALL_COMPLETED, pending is empty once it returns.
        finished, _ = await asyncio.wait(tasks)
        # result() gives each job's return value (or re-raises its exception).
        # `finished` is a set, so the order is not the creation order.
        all_results = [task.result() for task in finished]
        print(all_results)


t1 = time.perf_counter()  # monotonic timer, unlike time.time()
# asyncio.get_event_loop() is deprecated for this use and raises RuntimeError
# on Python 3.14+ when no loop is set. Reuse the running loop in IPython
# (nest_asyncio makes re-entry legal); otherwise create and close our own.
try:
    loop = asyncio.get_running_loop()
    owns_loop = False
except RuntimeError:
    loop = asyncio.new_event_loop()
    owns_loop = True
try:
    loop.run_until_complete(main(loop))
finally:
    if owns_loop:  # never close the notebook's own loop; IPython breaks if we do
        loop.close()
print("Async total time:", time.perf_counter() - t1)

['https://mofanpy.com/', 'https://mofanpy.com/']
Async total time: 0.09995436668395996


我们刚刚创建了一个 Session, 这是官方推荐的方式; 不过我觉得也可以直接用 `aiohttp.request()` 的形式, 细节请参考官方说明. 如果要获取网页返回的结果, 我们可以在 job() 中 return 一个结果, 然后通过 `finished, unfinished = await asyncio.wait(tasks)` 收集结果. `asyncio.wait` 会分别返回已完成和未完成的任务集合; 由于它默认要等所有任务都完成才返回, 这里我们只关心已完成的那部分. 每个任务真正的返回值需要调用它的 `result()` 方法取出. 注意 finished 是一个集合, 结果的顺序不一定和创建任务的顺序相同.
## 和多进程分布式爬虫对比
**async**

In [None]:
import aiohttp
import asyncio
import time
from bs4 import BeautifulSoup
from urllib.request import urljoin
import re
import multiprocessing as mp

base_url = "https://mofanpy.com/"
# base_url = "http://127.0.0.1:4000/"

# DON'T OVER CRAWL THE WEBSITE OR YOU MAY NEVER VISIT AGAIN:
# only the local mirror may be crawled without a page limit.
restricted_crawl = base_url != "http://127.0.0.1:4000/"

seen = set()          # URLs that have already been downloaded
unseen = {base_url}   # frontier: URLs still waiting to be downloaded


def parse(html):
    """Extract ``(title, page_urls, url)`` from one downloaded HTML page.

    title     -- stripped text of the first <h1>, or None if there is none
    page_urls -- set of absolute URLs, joined onto the global ``base_url``,
                 for every <a href> whose value starts and ends with '/'
    url       -- content of the og:url <meta> tag, or None if it is missing

    Missing elements yield None instead of raising: the results are collected
    from a process pool with ``.get()``, which would re-raise the error and
    abort the whole crawl because of one unusual page.
    """
    soup = BeautifulSoup(html, 'html.parser')
    links = soup.find_all('a', {"href": re.compile('^/.+?/$')})
    heading = soup.find('h1')
    title = heading.get_text().strip() if heading is not None else None
    page_urls = {urljoin(base_url, link['href']) for link in links}
    og_url = soup.find('meta', {'property': "og:url"})
    url = og_url.get('content') if og_url is not None else None
    return title, page_urls, url


async def crawl(url, session):
    """Download ``url`` through the aiohttp ``session`` and return its body text.

    ``async with`` releases the response's connection back to the session's
    pool once the body has been read.
    """
    async with session.get(url) as r:
        html = await r.text()
    await asyncio.sleep(0.1)        # small pause after each download
    return html


async def main(loop):
    """Crawl breadth-first from ``base_url``.

    Each round downloads all ``unseen`` URLs concurrently with aiohttp, then
    parses the pages in a process pool. The module-level ``seen``/``unseen``
    sets are updated in place. When ``restricted_crawl`` is true the crawl
    stops once more than 20 URLs have been seen, to avoid over-crawling a
    public site.
    """
    # The with-block terminates the worker processes when the crawl ends.
    with mp.Pool(8) as pool:               # pool size only slightly affects run time
        async with aiohttp.ClientSession() as session:
            count = 1
            while unseen:
                if restricted_crawl and len(seen) > 20:
                    break
                print('\nAsync Crawling...')
                tasks = [loop.create_task(crawl(url, session)) for url in unseen]
                finished, _ = await asyncio.wait(tasks)
                htmls = [f.result() for f in finished]

                print('\nDistributed Parsing...')
                # NOTE: .get() blocks the event loop while workers parse; that is
                # acceptable here because no downloads are in flight.
                parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]
                results = [j.get() for j in parse_jobs]

                print('\nAnalysing...')
                seen.update(unseen)
                unseen.clear()
                for title, page_urls, url in results:
                    # print(count, title, url)
                    unseen.update(page_urls - seen)
                    count += 1

if __name__ == "__main__":
    t1 = time.perf_counter()  # monotonic timer, unlike time.time()
    # asyncio.get_event_loop() is deprecated for this use and raises
    # RuntimeError on Python 3.14+ when no loop is set. In IPython reuse the
    # kernel's running loop (assumes nest_asyncio.apply() was called earlier
    # in the session); in a plain script create and close our own loop.
    try:
        loop = asyncio.get_running_loop()
        owns_loop = False
    except RuntimeError:
        loop = asyncio.new_event_loop()
        owns_loop = True
    try:
        loop.run_until_complete(main(loop))
    finally:
        if owns_loop:  # never close the notebook's own loop
            loop.close()
    print("Async total time: ", time.perf_counter() - t1)


Async Crawling...

Distributed Parsing...


**多进程**

In [None]:
from urllib.request import urlopen, urljoin
from bs4 import BeautifulSoup
import multiprocessing as mp
import re
import time


def crawl(url):
    """Download ``url`` and return its body decoded as UTF-8 text.

    The response is closed by the with-block so its socket is released, and a
    timeout keeps a stalled server from hanging a pool worker forever.
    """
    with urlopen(url, timeout=10) as response:
        html = response.read().decode()
    time.sleep(0.1)             # small pause after each download
    return html


def parse(html):
    """Extract ``(title, page_urls, url)`` from one downloaded HTML page.

    title     -- stripped text of the first <h1>, or None if there is none
    page_urls -- set of absolute URLs, joined onto the global ``base_url``,
                 for every <a href> whose value starts and ends with '/'
    url       -- content of the og:url <meta> tag, or None if it is missing

    Missing elements yield None instead of raising: the results are collected
    from a process pool with ``.get()``, which would re-raise the error and
    abort the whole crawl because of one unusual page.
    """
    soup = BeautifulSoup(html, 'html.parser')
    links = soup.find_all('a', {"href": re.compile('^/.+?/$')})
    heading = soup.find('h1')
    title = heading.get_text().strip() if heading is not None else None
    page_urls = {urljoin(base_url, link['href']) for link in links}
    og_url = soup.find('meta', {'property': "og:url"})
    url = og_url.get('content') if og_url is not None else None
    return title, page_urls, url


# Defined at module level, not under the __main__ guard: with the "spawn" or
# "forkserver" start methods the pool workers re-import this module without
# running the guarded block, and parse() looks base_url up as a global.
base_url = 'https://mofanpy.com/'
# base_url = "http://127.0.0.1:4000/"


if __name__ == '__main__':
    # DON'T OVER CRAWL THE WEBSITE OR YOU MAY NEVER VISIT AGAIN
    restricted_crawl = base_url != "http://127.0.0.1:4000/"

    unseen = {base_url}
    seen = set()

    # The with-block terminates the worker processes when the crawl ends.
    with mp.Pool(8) as pool:                    # number strongly affects run time
        count, t1 = 1, time.perf_counter()
        while unseen:                           # still got some URLs to visit
            if restricted_crawl and len(seen) > 20:
                break
            print('\nDistributed Crawling...')
            crawl_jobs = [pool.apply_async(crawl, args=(url,)) for url in unseen]
            htmls = [j.get() for j in crawl_jobs]                   # download pages
            htmls = [h for h in htmls if h is not None]             # drop empty results

            print('\nDistributed Parsing...')
            parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]
            results = [j.get() for j in parse_jobs]                 # parse html

            print('\nAnalysing...')
            seen.update(unseen)
            unseen.clear()

            for title, page_urls, url in results:
                # print(count, title, url)
                count += 1
                unseen.update(page_urls - seen)

        print(f'Total time: {time.perf_counter() - t1:.1f} s')

