# 使用future处理并发

In [None]:
# 在I/O密集型应用中，如果代码写得正确，那么不管使用哪种并发策略（使用线程或asyncio包），吞吐量都比依序执行的代码高很多。

In [1]:
import os
import time
import sys

import requests
from concurrent import futures

In [2]:
from time import sleep, strftime

In [None]:
# Sequential download script: the 20 country codes to fetch, the flag
# server's base URL, and the local directory the images are written to.
POP20_CC = [
    'CN', 'IN', 'US', 'ID', 'BR', 'PK', 'NG', 'BD', 'RU', 'JP',
    'MX', 'PH', 'VN', 'ET', 'EG', 'DE', 'IR', 'TR', 'CD', 'FR',
]
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = 'downloads/'

def save_flag(img, filename, dest_dir=None):
    """Write the raw bytes ``img`` to ``filename`` inside the download directory.

    Args:
        img: Bytes-like image content (the file is opened in ``'wb'`` mode).
        filename: Bare file name, e.g. ``'cn.gif'``.
        dest_dir: Target directory; defaults to the module-level ``DEST_DIR``.
            It is created if missing, so the first save on a fresh checkout
            does not fail with FileNotFoundError.
    """
    if dest_dir is None:
        dest_dir = DEST_DIR
    # exist_ok=True tolerates the directory already existing, e.g. when
    # another worker thread created it first.
    os.makedirs(dest_dir, exist_ok=True)
    path = os.path.join(dest_dir, filename)
    with open(path, 'wb') as fp:
        fp.write(img)

def get_flag(cc):
    """Fetch the flag image for country code ``cc`` and return its bytes.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status,
            rather than silently returning an error page as "image" bytes.
    """
    # The flags are GIF files (callers save them as '<cc>.gif'), so the
    # requested path must end in '.gif'.
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.content

def show(text):
    """Echo ``text`` followed by a space, flushing stdout so progress appears immediately."""
    print(text, end=' ', flush=True)

def download_many(cc_list):
    """Download each flag in ``cc_list`` one after another, in sorted order.

    Returns:
        The number of country codes in ``cc_list``.
    """
    for cc in sorted(cc_list):
        # Same variable name is used for fetch and save; the earlier
        # `imag`/`image` mismatch raised NameError on the first flag.
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')
    return len(cc_list)

def main(download_many):
    """Time one run of ``download_many`` over ``POP20_CC`` and print a summary.

    Args:
        download_many: Callable taking a list of country codes and returning
            how many flags it downloaded.
    """
    started = time.time()
    total = download_many(POP20_CC)
    duration = time.time() - started
    print('\n{} flags downloaded in {:.2f}s'.format(total, duration))
    
# Thread-pool download script.
# Upper bound on the pool size; download_many uses fewer threads for short lists.
MAX_WORKERS: int = 20
def download_one(cc):
    """Fetch, announce and save the flag for ``cc``; return ``cc`` unchanged.

    This is the per-item task handed to the thread pool.
    """
    data = get_flag(cc)
    show(cc)
    target_name = '{}.gif'.format(cc.lower())
    save_flag(data, target_name)
    return cc

def download_many(cc_list):
    """Download every flag in ``cc_list`` concurrently with a thread pool.

    Returns:
        The number of flags downloaded (0 for an empty ``cc_list``).
    """
    if not cc_list:
        # ThreadPoolExecutor(0) raises ValueError, so short-circuit here.
        return 0
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        # map() yields results in input order; consuming it below re-raises
        # any exception raised by download_one.
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

In [None]:
# future 在哪？future是concurrent.futures 和 asyncio包的重要组件
def download_many(cc_list):
    """Download the first five flags with a 3-worker pool, exposing the futures.

    Each ``submit`` call returns a Future immediately and is printed. The
    futures are then consumed with ``as_completed``, so results are printed
    in completion order rather than submission order.

    Returns:
        The number of results collected.
    """
    cc_list = cc_list[:5]  # keep the demo output short
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))

        results = []
        # as_completed() returns an iterator right away; each loop step then
        # blocks until the next future finishes, in completion order.
        for future in futures.as_completed(to_do):
            res = future.result()  # re-raises any exception from download_one
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
    return len(results)

In [None]:
# 目前测试的并发脚本都不能并行下载：使用concurrent.futures库实现的两个示例受GIL(Global Interpreter Lock，全局解释器锁)的限制
# 两个疑问：
# 既然python线程受GIL的限制，任何时候都只允许运行一个线程，那么flags_threadpool.py脚本的下载速度怎么会提升5倍
# flags_asyncio.py脚本和flags.py脚本都在单个线程中运行，前者为啥快5倍

In [None]:
# CPython解释器本身不是线程安全的，因此有GIL：GIL一次只允许一个线程执行Python字节码，所以一个Python进程通常不能同时使用多个CPU核心

In [None]:
# 一个Python线程在等待网络响应时，阻塞型I/O函数会释放GIL，让另一个线程运行。
# David Beazley：“Python 线程擅长什么都不做”（Python threads are great at doing nothing），即擅长等待I/O。

In [None]:
# ProcessPoolExecutor 类把工作分配给多个Python进程处理，因此如果需要做CPU密集型处理，使用这个模块能绕开GIL，利用所有可用的CPU核心
# 最佳线程数取决于做的是什么事，以及可以用的内存有多少。
# 最佳进程数一般不要超过CPU数量

In [7]:
def display(*args):
    """Print ``args`` space-separated, prefixed by the local time as ``[HH:MM:SS]``."""
    stamp = strftime('[%H:%M:%S]')
    print(stamp, end=' ')
    print(*args)

def loiter(n):
    """Sleep ``n`` seconds, announcing start and finish, then return ``n * 10``.

    Messages are indented by ``n`` tabs so concurrent calls appear in
    separate columns of the log.
    """
    indent = '\t' * n
    display('{}loiter({}): doing nothing for {}s...'.format(indent, n, n))
    sleep(n)
    display('{}loiter({}): done.'.format(indent, n))
    return 10 * n

def test():
    """Show that ``Executor.map`` returns a lazy iterator immediately.

    Five ``loiter`` calls go to a 3-worker pool. The results iterator is
    displayed before any result is read. Each result is then fetched in
    input order, blocking until that particular call has finished.
    """
    display('Script starting.')
    # The `with` block shuts the pool down (joining its threads) once all
    # results are consumed, instead of leaving the executor running.
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        results = executor.map(loiter, range(5))
        display('results:', results)
        display('Waiting for individual results:')
        for i, result in enumerate(results):
            display('result {}: {}'.format(i, result))

In [8]:
# Run the Executor.map demo defined above; its printed log follows.
test()

[14:22:10] Script starting.
[14:22:10] loiter(0): doing nothing for 0s...
[14:22:10] loiter(0): done.
[14:22:10] 	loiter(1): doing nothing for 1s...
[14:22:10] 		loiter(2): doing nothing for 2s...[14:22:10]
[14:22:10] results: <generator object Executor.map.<locals>.result_iterator at 0x110eb8f48>
[14:22:10] Waiting for individual results:
[14:22:10] result 0: 0
 			loiter(3): doing nothing for 3s...
[14:22:11] 	loiter(1): done.
[14:22:11] 				loiter(4): doing nothing for 4s...
[14:22:11] result 1: 10
[14:22:12] 		loiter(2): done.
[14:22:12] result 2: 20
[14:22:13] 			loiter(3): done.
[14:22:13] result 3: 30
[14:22:15] 				loiter(4): done.
[14:22:15] result 4: 40


In [9]:
# 更高级的工具 threading 和 multiprocessing模块
# 解决协作进程遇到的最大挑战: 在进程之间传递数据

In [1]:
import time, threading
import random

In [None]:
# 数据共享，线程同步
# Shared state for the lock demo: `var` is a module-level counter mutated by
# several threads, and `lock` serializes each change_it() call on it.
var = 0
lock = threading.Lock()

def change_it(n):
    """Add ``n`` to the global ``var`` and immediately subtract it again.

    Unsynchronized threads interleaving these two read-modify-write steps
    could leave ``var`` != 0; callers should hold ``lock``.
    """
    global var
    var = var + n
    var = var - n

def run_thread(n):
    """Call ``change_it(n)`` three times, holding ``lock`` around each call."""
    for _ in range(3):
        # `with` releases the lock even if change_it raises; a bare
        # acquire()/release() pair would leave it held and deadlock others.
        with lock:
            change_it(n)
# Two workers mutate the shared counter concurrently. `args` must be a tuple,
# hence the trailing comma in the one-element form.
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
for worker in (t1, t2):
    worker.start()
for worker in (t1, t2):
    worker.join()

In [None]:
# 线程阻塞
# threading.Event.clear()：把内部标志设为False，此后调用wait()的线程会阻塞
# threading.Event.set()：把内部标志设为True，唤醒所有在wait()中阻塞的线程，此后wait()不再阻塞

In [4]:
def light():
    """Drive the traffic light forever, one tick per second, through ``event``.

    Every 20-tick cycle is 10 green ticks (event set, cars may drive),
    3 yellow ticks, then 7 red ticks (event cleared, so cars calling
    ``event.wait()`` block). Relies on the module-level ``threading.Event``
    named ``event``.
    """
    count = 0
    while True:
        if count < 10:
            if not event.is_set():
                event.set()  # green light: release any waiting cars
            print('\033[42;1m---green light on---\033[0m')
        elif count < 13:
            print('\033[43;1m---yellow light on---\033[0m')
        else:
            if event.is_set():
                event.clear()  # red light
            print('\033[41;1m---red light on---\033[0m')
        time.sleep(1)
        # Wrap with modulo so every cycle has exactly 20 visible ticks; a
        # separate reset branch would waste a silent tick and skip green tick 0.
        count = (count + 1) % 20

def car(n):
    """Simulate car ``n`` forever: pause 3-9 s, then drive on green or wait on red.

    Reads the module-level ``threading.Event`` named ``event``, which
    ``light()`` sets for green and clears for red.
    """
    while True:
        time.sleep(random.randrange(3, 10))
        # is_set() replaces the isSet() alias, deprecated since Python 3.10.
        if event.is_set():
            print("car [%s] is running..." % n)
        else:
            print('car [%s] is waiting for the red light...' % n)
            event.wait()  # block until the light turns green (event set)

In [5]:
# Shared flag: set means green light, cleared means red light.
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
# One thread per car, all watching the same event.
car_list = ['BMW', 'AUDI', 'SANTANA']
for i in car_list:
    t = threading.Thread(target=car, args=(i,))
    t.start()

[42;1m---green light on---[0m
[42;1m---green light on---[0m
[42;1m---green light on---[0m
[42;1m---green light on---[0m
[42;1m---green light on---[0mcar [AUDI] is running...

[42;1m---green light on---[0m
[42;1m---green light on---[0m
car [AUDI] is running...
[42;1m---green light on---[0m
[42;1m---green light on---[0m
car [BMW] is running...
car [SANTANA] is running...
[42;1m---green light on---[0m
[43;1m---yellow light on---[0m
[43;1m---yellow light on---[0m
[43;1m---yellow light on---[0m
[41;1m---red light on---[0m
car [AUDI] is waiting for the red light...
[41;1m---red light on---[0m
[41;1m---red light on---[0m
car [SANTANA] is waiting for the red light...
[41;1m---red light on---[0m
car [BMW] is waiting for the red light...
[41;1m---red light on---[0m
[41;1m---red light on---[0m
[41;1m---red light on---[0m
[42;1m---green light on---[0m
[42;1m---green light on---[0m


In [2]:
def run(delay=10):
    """Print the current thread's name, then sleep ``delay`` seconds.

    The default 10 s pause keeps each worker alive long enough for
    ``threading.active_count()`` to include it.
    """
    thread = threading.current_thread()
    # Thread.name replaces getName(), deprecated since Python 3.10.
    print('%s is running...' % thread.name)
    time.sleep(delay)

# Start ten workers, printing the number of live threads before each start.
workers = []
for i in range(10):
    print('The current number of threads is: %s' % threading.active_count())
    thread_alive = threading.Thread(target=run, name='Thread-***%s***' % i)
    thread_alive.start()
    workers.append(thread_alive)
# Join every worker, not only the last one, so "done" really prints last.
for thread_alive in workers:
    thread_alive.join()
print('\n%s thread is done...' % threading.current_thread().name)

The current number of threads is: 5
Thread-***0*** is running...
The current number of threads is: 6
Thread-***1*** is running...
The current number of threads is: 7
Thread-***2*** is running...
The current number of threads is: 8
Thread-***3*** is running...The current number of threads is: 9

Thread-***4*** is running...The current number of threads is: 10

Thread-***5*** is running...The current number of threads is: 11
Thread-***6*** is running...
The current number of threads is: 12

Thread-***7*** is running...The current number of threads is: 13
Thread-***8*** is running...
The current number of threads is: 14

Thread-***9*** is running...

MainThread thread is done...


In [6]:
def run(n):
    """Print a separator line and the calling thread's identifier.

    ``n`` is unused; it exists only so the function can be a ``Thread``
    target with ``args=(i,)``. ``threading.get_ident()`` is Python's thread
    identifier, which can be reused after a thread exits. It is not a
    process id, despite the "Pid" label.
    """
    ident = threading.get_ident()
    print('-' * 30)
    print('Pid is :%s' % ident)

# Give the main thread a custom name; Thread.name replaces the setName()/
# getName() pair, deprecated since Python 3.10.
threading.main_thread().name = 'Chengd---python'
workers = []
for i in range(3):
    thread_alive = threading.Thread(target=run, args=(i,))
    thread_alive.start()
    workers.append(thread_alive)
# Join every worker so the final message prints after all of them finish.
for thread_alive in workers:
    thread_alive.join()
print('\n%s thread is done...' % threading.current_thread().name)

------------------------------
Pid is :123145498144768------------------------------

Pid is :123145503399936
------------------------------
Pid is :123145498144768

Chengd---python thread is done...


In [None]:
# multiprocessing