## Concurrency with futures


In [11]:
# example 1
# 普通下载
# 并发
# 异步
# 多线程等

import os
import time
import datetime
import requests
import sys
import bs4

# 下载煎蛋妹子图为例
# 普通实例

PATH = 'pics/'
def download_pic(url):
    name = url.split('/')[-1]
    with open(PATH+name, 'wb')as f:
        f.write(requests.get('https:'+url).content)
        
    return name
        
def get_pic_urls(url):
    
    req = requests.get(url)
    soup = bs4.BeautifulSoup(req.text, 'lxml')
    soup_pic_urls = soup.find_all('a', class_='view_img_link')
    return [i['href'] for i in soup_pic_urls]

def down():
    url = 'http://jandan.net/ooxx/page-231#comments'
    for u in get_pic_urls(url):
        download_pic(u)
        
def main():
    start = datetime.datetime.now()
    down()
    end = datetime.datetime.now()
    print((end-start).seconds)
    
main()

27


In [5]:
# 在 100m 光纤的速度下，运行 main 函数花了 12 秒

In [9]:
# 并发代码

from concurrent import futures

MAX_WORKS = 20

def downoad_concurrent(cc_lst):
    workers = min(MAX_WORKS, len(cc_lst))
    with futures.ThreadPoolExecutor(workers) as exe:
        exe.map(download_pic, cc_lst)
        

def main_cc():
    url = 'http://jandan.net/ooxx/page-231#comments'
    start = datetime.datetime.now()
    downoad_concurrent(get_pic_urls(url))
    end = datetime.datetime.now()
    print((end-start).seconds)
    
main_cc()

10


In [10]:
# 只下载一页数据貌似不能体现并发的强大
# 毕竟 100m 光纤

In [18]:
# 并发代码 二
from concurrent import futures

def download_cc_2(lst):
    global futures
    with futures.ThreadPoolExecutor(max_workers=25) as exe:
        to_do = []
        for cc in lst:
            future = exe.submit(download_pic, cc)
            to_do.append(future)
            msg = 'scheduled for {}: {}'
            print(msg.format(cc, future))
            
    results = []
    for futures in futures.as_completed(to_do):
        res = future.result()
        msg = '{} result: {!r}'
        print(msg.format(future, res))
        results.append(msg)
        
    return len(results)

def main_cc_2():
    url = 'http://jandan.net/ooxx/page-231#comments'
    start = datetime.datetime.now()
    download_cc_2(get_pic_urls(url))
    end = datetime.datetime.now()
    print((end-start).seconds)
    
main_cc_2()

scheduled for //wx3.sinaimg.cn/large/6a4e1623gy1fkvqsjph0lj21ka2cfaw4.jpg: <Future at 0x7f7697af1898 state=running>
scheduled for //wx2.sinaimg.cn/large/6a4e1623gy1fkvqrv4tiij21kw1kwx2u.jpg: <Future at 0x7f7697af2358 state=running>
scheduled for //wx3.sinaimg.cn/large/6a4e1623gy1fkvqrqkdoqj21kw2dc4fc.jpg: <Future at 0x7f7697c3ba90 state=running>
scheduled for //wx2.sinaimg.cn/large/6a4e1623gy1fkvqrn7j02j2104104jx9.jpg: <Future at 0x7f7697c3ba20 state=running>
scheduled for //wx3.sinaimg.cn/large/6a4e1623gy1fkvqrkzp10j20jg0t6dkt.jpg: <Future at 0x7f7697c39c50 state=running>
scheduled for //wx2.sinaimg.cn/large/6a4e1623gy1fkvqqzdc2jj20jg0t6170.jpg: <Future at 0x7f7697c2de48 state=running>
scheduled for //wx3.sinaimg.cn/large/6a4e1623gy1fkvqqwhrbkj21kw2dcar0.jpg: <Future at 0x7f7697c2dc18 state=running>
scheduled for //wx2.sinaimg.cn/large/6a4e1623gy1fkvqqrnqkdj21hc1hc7jl.jpg: <Future at 0x7f7697c2d5f8 state=running>
scheduled for //wx1.sinaimg.cn/large/6a4e1623gy1fkvqqn47lej20jg0t5djy.jp

# 171029 Experimenting with Executor.map


In [5]:
from time import sleep, strftime
from concurrent import futures

def display(*args):
    print(strftime('[%H: %M: %S]'), end='')
    print(*args)
    
def loiter(n):
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}):done'
    display(msg.format('\t'*n, n))
    return n * 10

def main():
    display('script starting...')
    executor = futures.ThreadPoolExecutor(max_workers=3)
    results = executor.map(loiter, range(5))
    display('results: ', results)
    display('waiting for individual results:')
    for i, result in enumerate(results):
        display('results {}: {}'.format(i, result))
        
main()

[09: 59: 04][09: 59: 04][09: 59: 04]script starting...
	loiter(1): doing nothing for 1s...
[09: 59: 04][09: 59: 04]results: loiter(0): doing nothing for 0s... 
<generator object Executor.map.<locals>.result_iterator at 0x7fa713f6c150>[09: 59: 04]
[09: 59: 04]loiter(0):done
waiting for individual results:[09: 59: 04]
			loiter(3): doing nothing for 3s...
[09: 59: 04]results 0: 0
		loiter(2): doing nothing for 2s...
[09: 59: 05]	loiter(1):done
[09: 59: 05][09: 59: 05]results 1: 10
				loiter(4): doing nothing for 4s...
[09: 59: 06]		loiter(2):done
[09: 59: 06]results 2: 20
[09: 59: 07]			loiter(3):done
[09: 59: 07]results 3: 30
[09: 59: 09][09: 59: 09]results 4: 40				loiter(4):done



注意：

- futures.ThreadPoolExecutor 接受 max_workers 参数控制并发数量
- futures.ProcessPoolExecutor 没有 max_workers 参数，因为该参数与cpu 数量有关
- futures.ThreadPoolExecutor.map 有返回，但会随着单个并发的释放而放出


In [10]:
from tqdm import tqdm
import time

for i in tqdm(range(100)):
    time.sleep(.01)

100%|██████████| 100/100 [00:01<00:00, 96.88it/s]
