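# Concurrent Programming

- Concurrency
  - Concurrency does not mean that multiple operations (threads, tasks) run at the same instant. In Python, only one operation runs at any given moment; threads or tasks switch back and forth until all of them finish
  - Threads and tasks are two ways of deciding the switching order
    - threading: the operating system knows everything about each thread and decides on its own when to switch between them (preemptive switching)
      - The upside is that the code is easy to write; the programmer does not handle switching at all
      - A switch can happen in the middle of a single statement (for example `x += 1`), so race conditions arise easily
    - asyncio: the main program can switch tasks only when the running task signals that it may be switched out (cooperative switching), which avoids the race conditions caused by switching in the middle of a statement
  - Suited to I/O-heavy workloads, such as downloading many files from websites, where I/O time can far exceed CPU time
- Parallelism
  - Operations happen simultaneously, at the same instant. In Python this is what multiprocessing provides
  - multiprocessing: start n processes on n cores and run them at the same time
  - Suited to CPU-heavy workloads, such as parallel computation in MapReduce, where multiple machines or processors are typically used to speed things up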
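
As a rough illustration of cooperative switching, the sketch below records the order in which two asyncio tasks execute. The names `worker` and `events` are made up for this example. Steps 1 and 2 have no `await` between them, so no other task can run in between; the switch happens only at `await asyncio.sleep(0)`.

```python
import asyncio

events = []


async def worker(name):
    events.append(name + ' step 1')
    events.append(name + ' step 2')  # no await in between: cannot be interrupted here
    await asyncio.sleep(0)           # explicit point where control is handed back
    events.append(name + ' step 3')


async def main():
    await asyncio.gather(worker('A'), worker('B'))


asyncio.run(main())  # inside a notebook cell, use: await main()
print(events)
```

With threads, by contrast, the operating system could switch between step 1 and step 2 of the same worker.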

## Processes

In [None]:
import os
import time
import random
import subprocess
from multiprocessing import Process, Pool, Queue

print('Process (%s) start...' % os.getpid())
# Create a child process through the operating system with os.fork()
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))


def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))


# The multiprocessing module: a cross-platform way to work with multiple processes
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('tests',))
print('Child process will start.')
p.start()
# join() waits for the child process to finish before continuing; it is commonly used to synchronize processes
p.join()
print('Child process end.')


# Create child processes in bulk with a process pool
def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))


# Tasks 0, 1, 2, and 3 start immediately, while task 4 has to wait for one of them to finish (the pool has 4 workers)
# if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
    p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
# join() waits for all child processes to finish. close() must be called before join(); after close() no new tasks can be submitted.
p.join()
print('All subprocesses done.')

# Often the child process is not the program itself but an external command. After creating it, we also need to control its input and output
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

# When the child process needs input
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

### Inter-process communication


In [None]:
# Code run by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# Code run by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


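# The parent process creates the Queue and passes it to each child process:
print('Inter-process communication')
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# Start the writer child process pw:
pw.start()
# Start the reader child process pr:
pr.start()
# Wait for pw to finish:
pw.join()
# pr runs an infinite loop, so it cannot be joined; terminate it forcibly:
pr.terminate()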

### Process pool


In [None]:
import multiprocessing as mp


def job(x):
    return x * x


def multicore():
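    '''
    By default Pool starts as many worker processes as there are CPU cores; pass processes to choose the number yourself
    map() takes an iterable of arguments and returns a list of results
    apply_async() takes the arguments for a single call and returns a single result; iterate to get the effect of map()
    '''
    pool = mp.Pool(processes=2)
    res = pool.map(job, range(10))
    print(res)
    '''
    apply_async() submits one call, which runs in a single worker process. The arguments must be
    passed as an iterable (here a tuple), so a one-element tuple needs the trailing comma.
    Use get() to retrieve the return value.
    '''
    res = pool.apply_async(job, (2,))
    multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
    print(res.get())  # get a single result
    print([res.get() for res in multi_res])  # get multiple results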


# if __name__ == '__main__':
multicore()

### Shared memory


In [None]:
import multiprocessing as mp

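'''
Value stores data in shared memory.
'd' is a double-precision float; 'i' is a signed integer.
'''
value1 = mp.Value('i', 0)
value2 = mp.Value('d', 3.14)

'''
Array also lives in shared memory and lets processes share data.
Unlike a numpy array, it can only be one-dimensional.
As with Value, the data type must be specified, otherwise an error is raised.
'''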
array = mp.Array('i', [1, 2, 3, 4])

In [None]:
# Process lock
# Different processes competing for a resource (no lock yet)
import multiprocessing as mp
import time


def job(v, num):
    for _ in range(5):
        time.sleep(0.1)
        v.value += num
        print(v.value, end="\n")


def multicore():
    v = mp.Value('i', 0)  # define the shared variable
    p1 = mp.Process(target=job, args=(v, 1))
    p2 = mp.Process(target=job, args=(v, 3))  # use a different number to see how the two processes compete for the shared value
    p1.start()
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    multicore()

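### Multiprocessing

- For I/O-bound work, multiple processes do not improve efficiency. Because the number of CPUs is limited, they are often slower than the multithreaded version. Multiple processes pay off for CPU-bound work such as the computation in the next cell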

In [None]:
import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


numbers = [10000000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration {duration} seconds")

## Threads

In [None]:
# Single-threaded: simple and clear but inefficient; most of the program's time is spent waiting on I/O
import requests
import time


def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))


def download_all(sites):
    for site in sites:
        download_one(site)


def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))


# if __name__ == '__main__':
main()

In [None]:
#!/usr/bin/env python

# The low-level thread module of Python 2 is named _thread in Python 3
import _thread as thread
from time import sleep, ctime

loops = [4, 2]


def loop(nloop, nsec, lock):
    print('starting loop', nloop, 'at:', ctime())
    sleep(nsec)
    print('ending loop', nloop, 'done at:', ctime())
    lock.release()


def main():
    print('starting at:', ctime())
    locks = []
    nloops = range(len(loops))

    for i in nloops:
        lock = thread.allocate_lock()
        lock.acquire()
        locks.append(lock)

    for i in nloops:
        thread.start_new_thread(loop, (i, loops[i], locks[i]))

    for i in nloops:
        while locks[i].locked():
            pass

    print('all DONE at:', ctime())


main()

In [None]:
#!/usr/bin/env python

import threading
import time
from time import sleep, ctime

loops = [4, 2]


def loop(nloop, nsec):
    print('starting loop', nloop, 'at:', ctime())
    sleep(nsec)
    print('ending loop', nloop, 'done at:', ctime())


def main():
    print('starting at:', ctime())
    threads = []
    nloops = range(len(loops))

    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)

    for i in nloops:
        threads[i].start()

    for i in nloops:
        threads[i].join()

    print('all DONE at:', ctime())


# if __name__ == '__main__':
main()


# Code run by the new thread:
def loop1():
    print('thread %s is running...' % threading.current_thread().name)
    n = 0
    while n < 5:
        n = n + 1
        # current_thread() always returns the instance of the current thread
        print('thread %s >>> %s' % (threading.current_thread().name, n))
        time.sleep(1)
    print('thread %s ended.' % threading.current_thread().name)


# The main thread instance is named MainThread; a child thread's name is given at creation, here LoopThread
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop1, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)


# Lock:
# Assume this is a bank balance:
balance = 0
lock = threading.Lock()


def change_it(n):
    # Deposit first, then withdraw; the result should be 0:
    global balance
    balance = balance + n
    balance = balance - n


def run_thread(n):
    for i in range(100000):
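        # Acquire the lock first: when several threads call lock.acquire() at the same time, only one succeeds and continues; the others wait until they obtain the lock
        # The thread holding the lock must release it when done, otherwise the waiting threads wait forever and become dead threads. try...finally guarantees that the lock is released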
        lock.acquire()
        try:
            change_it(n)
        finally:
            # Always release the lock after the change:
            lock.release()


t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)


In [None]:
import threading

# Create a global ThreadLocal object:
local_school = threading.local()


def process_student():
    # Get the student associated with the current thread:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))


def process_thread(name):
    # Bind the student to the ThreadLocal object:
    local_school.student = name
    process_student()


t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()


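### Multithreading

- More threads is not always better, because creating, maintaining, and destroying threads has a cost. Setting the number too high can make the program slower. In practice, run some tests against the real workload to find the best thread count
- For CPU-bound tasks (tasks that consume a lot of CPU), multithreading does not help; use multiple processes instead
    - Threads in CPython work by switching between one another, and only one thread runs Python code at any given moment. Using several threads is therefore essentially no different from using one main thread; in many cases the extra cost of thread switching even makes the program slower
    - Multiple processes can execute tasks in parallel, which effectively improves running efficiency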
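
One way to look for a suitable thread count is simply to time the same workload with different pool sizes. The sketch below assumes an I/O-bound job and simulates it with `time.sleep`; `fake_download`, `time_with_workers`, and the chosen worker counts are hypothetical and should be replaced by the real workload.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(_):
    # Hypothetical I/O-bound job: sleeping stands in for network latency
    time.sleep(0.1)


def time_with_workers(max_workers, n_jobs=8):
    # Run n_jobs simulated downloads on a pool of the given size and time them
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        list(executor.map(fake_download, range(n_jobs)))
    return time.perf_counter() - start


timings = {workers: time_with_workers(workers) for workers in (1, 4, 8)}
for workers, seconds in timings.items():
    print('{} workers: {:.2f} seconds'.format(workers, seconds))
```

For a real workload the curve usually flattens, and may turn upward, once the pool grows past what the network or the remote server can sustain.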

In [None]:

import concurrent.futures
import requests
import threading
import time


def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))


def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_one, sites)


def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))


main()

### Resolving resource contention with a lock

In [None]:
import multiprocessing as mp
import time


def job(v, num, l):
    l.acquire()  # acquire the lock
    for _ in range(5):
        time.sleep(0.1)
        v.value += num
        print(v.value, end="\n")
    l.release()  # release the lock


def multicore():
    l = mp.Lock()  # create a process lock
    v = mp.Value('i', 0)  # define the shared variable
    p1 = mp.Process(target=job, args=(v, 1, l))  # the lock must be passed in
    p2 = mp.Process(target=job, args=(v, 3, l))  # use a different number to see how the processes take turns
    p1.start()
    p2.start()
    p1.join()
    p2.join()


# if __name__ == '__main__':
multicore()

## Processes vs. threads

In [None]:
# A first look at processes and threads
import multiprocessing as mp
import threading as td


def job(a, d):
    print('aaa')


t1 = td.Thread(target=job, args=(1, 2))
p1 = mp.Process(target=job, args=(1, 2,))

t1.start()
p1.start()
t1.join()
p1.join()

In [None]:
# Store the results in a Queue
import multiprocessing as mp


def job(q):
    res = 0
    for i in range(1000000):
        res += i + i ** 2 + i ** 3
    q.put(res)


# if __name__ == '__main__':
q = mp.Queue()
p1 = mp.Process(target=job, args=(q,))
p2 = mp.Process(target=job, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = q.get()
res2 = q.get()
print('multicore:', res1 + res2)

In [None]:
# Efficiency comparison
import multiprocessing as mp
import threading as td
import time


# Multi-core / multi-process
def multicore():
    q = mp.Queue()
    p1 = mp.Process(target=job, args=(q,))
    p2 = mp.Process(target=job, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = q.get()
    res2 = q.get()
    print('multicore:', res1 + res2)


# Multi-threaded
def multithread():
    q = mp.Queue()
    t1 = td.Thread(target=job, args=(q,))
    t2 = td.Thread(target=job, args=(q,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = q.get()
    res2 = q.get()
    print('multithread:', res1 + res2)


def normal():
    res = 0
    for _ in range(2):
        for i in range(1000000):
            res += i + i ** 2 + i ** 3
    print('normal:', res)


if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print('normal time:', st1 - st)
    multithread()
    st2 = time.time()
    print('multithread time:', st2 - st1)
    multicore()
    st3 = time.time()
    print('multicore time:', st3 - st2)

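## Coroutines

* The `async` keyword declares an asynchronous function
* Calling an asynchronous function returns a coroutine object; the function body does not run yet
* Ways to execute it
  - Call it with await: the effect is the same as a normal Python call. The caller blocks here, enters the awaited coroutine function, and continues only after it returns
  - Create a task with asyncio.create_task()
  - Trigger execution with asyncio.run
    - asyncio.run was added in Python 3.7. It makes the coroutine interface very simple, because you no longer need to define or manage the event loop yourself
    - A good convention is to use asyncio.run(main()) as the program's entry point and to call asyncio.run only once during the program's lifetime
* Differences between coroutines and multithreading
    - Coroutines run in a single thread
    - With coroutines, the user decides where to give up control and switch to the next task
* Combining async / await with create_task makes the code more concise and clear
* When writing coroutine code, keep a clear picture of the event loop in mind: know when the program needs to pause and wait for I/O, and when it should run straight through
    - Identify which step of a task blocks on I/O, make that step asynchronous, and mark it with await so that the task is suspended there and the event loop can run the next task
    - This keeps the CPU busy instead of idling while it waits for I/O. When the I/O of the suspended step completes, the thread gets the return value from await and continues with the remaining code
* Common problem
    - `RuntimeWarning: coroutine 'main' was never awaited self.tb = tb RuntimeWarning: Enable tracemalloc to get the object allocation traceback` appears when a coroutine function is called but the resulting coroutine object is never awaited or scheduled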
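
A minimal sketch of the difference between calling an asynchronous function and actually running it. The function `add` is made up for illustration. Calling it only builds a coroutine object; the body runs when the object is awaited or handed to `asyncio.run`.

```python
import asyncio


async def add(a, b):
    await asyncio.sleep(0)
    return a + b


coro = add(1, 2)  # nothing has run yet; this is only a coroutine object
is_coroutine = asyncio.iscoroutine(coro)
coro.close()      # discard it explicitly, otherwise Python warns "never awaited"

result = asyncio.run(add(1, 2))  # inside a notebook cell, use: result = await add(1, 2)
print(is_coroutine, result)
```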

In [None]:
# The most basic, synchronous version
import time


def crawl_page(url):
    print('crawling begin {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    time.sleep(sleep_time)

    print('OK {}'.format(url))


def main(urls):
    for url in urls:
        crawl_page(url)


%time main(['url_1', 'url_2', 'url_3', 'url_4'])

In [None]:
# Calling with await: synchronous behaviour written with the asynchronous API
import asyncio


async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))


async def main(urls):
    for url in urls:
        await crawl_page(url)


# %time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
await main(['url_1', 'url_2', 'url_3', 'url_4'])

In [None]:
# Switch to asyncio.create_task(): the total running time equals that of the slowest crawler
import asyncio

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    # for task in tasks:
    #     await task
    await asyncio.gather(*tasks)


# %time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
await main(['url_1', 'url_2', 'url_3', 'url_4'])

In [None]:
def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[CONSUMER] Consuming %s...' % n)
        r = '200 OK'


def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()


c = consumer()
produce(c)

In [None]:
import asyncio

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

# Get the event loop:
loop = asyncio.get_event_loop()
# Run the coroutine
loop.run_until_complete(hello())
loop.close()

In [None]:
import asyncio

from aiohttp import web

async def index(request):
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'))

def init():
    # Build the application and register the routes; recent aiohttp versions no longer take a loop argument
    app = web.Application()
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    return app


# web.run_app creates the event loop and serves until interrupted
web.run_app(init(), host='127.0.0.1', port=8000)

### Coroutine runtime

In [None]:
import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    print('before await')
    await worker_1()
    print('awaited worker_1')
    await worker_2()
    print('awaited worker_2')

# %time asyncio.run(main())
await main()

In [None]:
import asyncio

async def worker_1():
    print('worker_1 start')
    await asyncio.sleep(1)
    print('worker_1 done')

async def worker_2():
    print('worker_2 start')
    await asyncio.sleep(2)
    print('worker_2 done')

async def main():
    task1 = asyncio.create_task(worker_1())
    task2 = asyncio.create_task(worker_2())
    print('before await')
    await task1
    print('awaited worker_1')
    await task2
    print('awaited worker_2')

# %time asyncio.run(main())
await main()

### Limiting how long coroutine tasks may run


In [None]:
import asyncio

async def worker_1():
    await asyncio.sleep(1)
    return 1

async def worker_2():
    await asyncio.sleep(2)
    return 2 / 0

async def worker_3():
    await asyncio.sleep(3)
    return 3

async def main():
    task_1 = asyncio.create_task(worker_1())
    task_2 = asyncio.create_task(worker_2())
    task_3 = asyncio.create_task(worker_3())

    await asyncio.sleep(2)
    task_3.cancel()

    res = await asyncio.gather(task_1, task_2, task_3, return_exceptions=True)
    print(res)

# %time asyncio.run(main())
await main()
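
The cell above limits running time by sleeping and then cancelling a task by hand. As an alternative sketch, the standard library's `asyncio.wait_for` applies a timeout to a single awaitable and cancels it when the timeout expires. `slow_worker` and the 0.1-second timeout are made up for illustration.

```python
import asyncio


async def slow_worker():
    await asyncio.sleep(1)
    return 'finished'


async def main():
    try:
        # wait_for cancels slow_worker if it runs longer than the timeout
        return await asyncio.wait_for(slow_worker(), timeout=0.1)
    except asyncio.TimeoutError:
        return 'timed out'


outcome = asyncio.run(main())  # inside a notebook cell, use: outcome = await main()
print(outcome)
```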

### A classic producer-consumer model with coroutines


In [None]:
import asyncio
import random


async def consumer(queue, id):
    while True:
        val = await queue.get()
        print('{} get a val: {}'.format(id, val))
        await asyncio.sleep(1)


async def producer(queue, id):
    for i in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)


async def main():
    queue = asyncio.Queue()

    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))

    await asyncio.sleep(10)
    consumer_1.cancel()
    consumer_2.cancel()

    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

# %time asyncio.run(main())
await main()
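
The cell above stops the consumers after a fixed 10-second sleep. A possible variation, sketched here under the assumption that the number of items is known, is to let the queue itself report when all work is done, using `Queue.task_done()` and `Queue.join()`. The `seen` list and the item count are made up for illustration.

```python
import asyncio


async def producer(queue, n_items):
    for i in range(n_items):
        await queue.put(i)


async def consumer(queue, seen):
    while True:
        val = await queue.get()
        seen.append(val)
        queue.task_done()  # mark this item as fully processed


async def main():
    queue = asyncio.Queue()
    seen = []
    consumers = [asyncio.create_task(consumer(queue, seen)) for _ in range(2)]
    await producer(queue, 5)
    await queue.join()  # returns once every item has been marked done
    for task in consumers:
        task.cancel()   # consumers loop forever, so stop them explicitly
    await asyncio.gather(*consumers, return_exceptions=True)
    return seen


seen = asyncio.run(main())  # inside a notebook cell, use: seen = await main()
print(sorted(seen))
```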

### In practice: fetching the titles, dates, and posters of upcoming movies in Beijing from Douban


In [None]:
import requests
from bs4 import BeautifulSoup

def main():
    url = "https://movie.douban.com/cinema/later/beijing/"
    init_page = requests.get(url).content
    init_soup = BeautifulSoup(init_page, 'lxml')

    all_movies = init_soup.find('div', id="showing-soon")
    print(all_movies)
#     for each_movie in all_movies.find_all('div', class_="item"):
#         all_a_tag = each_movie.find_all('a')
#         all_li_tag = each_movie.find_all('li')

#         movie_name = all_a_tag[1].text
#         url_to_fetch = all_a_tag[1]['href']
#         movie_date = all_li_tag[0].text

#         response_item = requests.get(url_to_fetch).content
#         soup_item = BeautifulSoup(response_item, 'lxml')
#         img_tag = soup_item.find('img')

#         print('{} {} {}'.format(movie_name, movie_date, img_tag['src']))

# %time main()
main()

### Callbacks with coroutines


In [None]:
import asyncio


async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    return 'OK {}'.format(url)


async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        task.add_done_callback(lambda future: print('result: ', future.result()))
    await asyncio.gather(*tasks)

# %time asyncio.run(main(['url_1', 'url_2', 'url_3', 'url_4']))
await main(['url_1', 'url_2', 'url_3', 'url_4'])

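## Futures

* Future classes exist in both concurrent.futures and asyncio; in both, a future represents an operation whose result arrives later. Pending operations are wrapped in futures and queued; their state can be queried at any time, and the result or exception can be retrieved once the operation completes
* As a user you rarely create futures yourself; the library creates them for you. What you actually do is schedule their execution
* executor.submit(func) schedules func() for execution and returns the newly created future so that it can be queried later
* done() reports whether the operation has finished: True if it has, False if not
  * Note that done() is non-blocking and returns immediately. The related add_done_callback(fn) registers fn to be called once the future completes
* result() waits for the future to complete, then returns its result or raises its exception
* as_completed(fs) takes an iterable of futures fs and returns an iterator that yields each future as it completes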
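
The methods listed above can be tried out on a small thread pool. This is only a sketch: `square` and the `finished` list are made up for illustration, and a real job would do I/O instead of arithmetic.

```python
import concurrent.futures


def square(x):
    return x * x


finished = []

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() schedules the call and returns a future immediately
    futures = [executor.submit(square, x) for x in range(4)]
    for future in futures:
        # The callback runs once the future completes (or at once if it already has)
        future.add_done_callback(lambda f: finished.append(f.result()))
    # as_completed yields futures in completion order, not submission order
    results = [f.result() for f in concurrent.futures.as_completed(futures)]

all_done = all(f.done() for f in futures)
print(sorted(results), all_done)
```

Because completion order is not guaranteed, the results are sorted before printing.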

In [None]:
# TODO
# requests.get may raise ConnectionError, Timeout, HTTPError, and so on; every exception that requests raises explicitly inherits from requests.exceptions.RequestException
# executor.map(download_one, urls) may raise concurrent.futures.TimeoutError
# result() may raise TimeoutError or CancelledError
# as_completed() may raise TimeoutError

import concurrent.futures
import requests
import time


def download_one(url):
    resp = requests.get(url)
    print('Read {} from {}'.format(len(resp.content), url))


def download_all(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        to_do = []
        for site in sites:
            future = executor.submit(download_one, site)
            to_do.append(future)

        for future in concurrent.futures.as_completed(to_do):
            future.result()


def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    download_all(sites)
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))


# if __name__ == '__main__':
main()

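## Asyncio

* Limitations of multithreading
  - A running thread can be interrupted at almost any point, so race conditions may occur
  - Thread switching has a cost, so the number of threads cannot grow without bound
  - As a result, if the I/O load is very heavy, multithreading may well fall short of the efficiency and quality you need
* Sync vs. Async
    - Sync: operations run one after another; the next one cannot start until the previous one finishes
    - Async: operations can interleave. If one of them blocks, the program does not wait but finds another runnable operation and continues with it
* Asyncio
  * Like an ordinary Python program, it is single-threaded. Its one main thread can work on many different tasks; a task here is a special kind of future object. The tasks are controlled by an object called the event loop
  * As a simplification, assume a task has only two states. The event loop maintains one list of tasks per state. It picks a task from the ready list (which one depends on how long it has waited, the resources it holds, and so on) and runs it until the task hands control back to the event loop
    - Ready: the task is idle but can run at any time
    - Waiting: the task has started but is waiting for an external operation, such as I/O, to complete
  * When a task hands control back, the event loop puts it in the ready or waiting list depending on whether its operation has completed, then walks the waiting list to check whether those tasks' operations have completed
    - If so, the task moves to the ready list
    - If not, it stays in the waiting list
  * Once every task is back in the appropriate list, the next round of the loop begins
* In Asyncio a running task is not interrupted from outside; it gives up control only at an await. The race conditions caused by preemptive thread switching therefore do not occur, and thread safety is much less of a concern
* Drawbacks
    - The requests library is not compatible with Asyncio, but the aiohttp library is
    - With Asyncio you have more control over task scheduling, so you need to write code more carefully; otherwise mistakes come easily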
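
The ready and waiting lists described above can be sketched with a toy scheduler built on generators. This is only an illustration under simplifying assumptions: `worker`, `toy_event_loop`, and the polling sleep are made up here, and the real asyncio event loop is implemented differently.

```python
import time
from collections import deque


def worker(name, delay, log):
    log.append(name + ' start')
    yield delay  # hand control back and ask to be woken after `delay` seconds
    log.append(name + ' done')


def toy_event_loop(tasks):
    ready = deque(tasks)  # tasks that can run right now
    waiting = []          # (wake_time, task) pairs blocked on simulated I/O
    while ready or waiting:
        if ready:
            task = ready.popleft()
            try:
                delay = next(task)  # run until the task yields control
                waiting.append((time.monotonic() + delay, task))
            except StopIteration:
                pass                # the task has finished
        else:
            time.sleep(0.01)        # nothing is ready: wait briefly
        # Move tasks whose wait is over back to the ready list
        now = time.monotonic()
        ready.extend(task for wake, task in waiting if wake <= now)
        waiting = [(wake, task) for wake, task in waiting if wake > now]


log = []
toy_event_loop([worker('A', 0.2, log), worker('B', 0.1, log)])
print(log)
```

Both workers start before either one finishes, which is the interleaving that the two lists make possible.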

In [None]:
import asyncio
import aiohttp
import time


async def download_one(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print('Read {} from {}'.format(resp.content_length, url))


async def download_all(sites):
    tasks = [asyncio.create_task(download_one(site)) for site in sites]
    await asyncio.gather(*tasks)


def main():
    sites = [
        'https://en.wikipedia.org/wiki/Portal:Arts',
        'https://en.wikipedia.org/wiki/Portal:History',
        'https://en.wikipedia.org/wiki/Portal:Society',
        'https://en.wikipedia.org/wiki/Portal:Biography',
        'https://en.wikipedia.org/wiki/Portal:Mathematics',
        'https://en.wikipedia.org/wiki/Portal:Technology',
        'https://en.wikipedia.org/wiki/Portal:Geography',
        'https://en.wikipedia.org/wiki/Portal:Science',
        'https://en.wikipedia.org/wiki/Computer_science',
        'https://en.wikipedia.org/wiki/Python_(programming_language)',
        'https://en.wikipedia.org/wiki/Java_(programming_language)',
        'https://en.wikipedia.org/wiki/PHP',
        'https://en.wikipedia.org/wiki/Node.js',
        'https://en.wikipedia.org/wiki/The_C_Programming_Language',
        'https://en.wikipedia.org/wiki/Go_(programming_language)'
    ]
    start_time = time.perf_counter()
    asyncio.run(download_all(sites))
    end_time = time.perf_counter()
    print('Download {} sites in {} seconds'.format(len(sites), end_time - start_time))


if __name__ == '__main__':
    main()

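### Multithreading vs. Asyncio

* I/O bound
  * I/O is slow and many tasks / threads must cooperate: Asyncio is the better fit
  * I/O is fast and only a limited number of tasks / threads is needed: multithreading is enough
* CPU bound: use multiple processes to speed up the program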
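
The rule of thumb above can be written down as a small decision function. `choose_concurrency_model` and its two flags are hypothetical names used only to make the branching explicit.

```python
def choose_concurrency_model(io_bound, io_slow=False):
    # Hypothetical helper that encodes the rule of thumb, nothing more
    if io_bound:
        if io_slow:
            return 'asyncio'
        return 'multithreading'
    return 'multiprocessing'


print(choose_concurrency_model(io_bound=True, io_slow=True))
print(choose_concurrency_model(io_bound=True))
print(choose_concurrency_model(io_bound=False))
```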

In [None]:
import time


def cpu_bound(number):
    print(sum(i * i for i in range(number)))


def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)


def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))


# if __name__ == '__main__':
main()

In [None]:
import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


numbers = [10000000 + x for x in range(20)]

start_time = time.time()
find_sums(numbers)
duration = time.time() - start_time
print(f"Duration {duration} seconds")

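## GIL (Global Interpreter Lock)

* When a Python thread executes in the CPython interpreter, it first acquires the GIL, which keeps other threads from executing; threads then take turns. What the user sees is "pseudo-parallelism": Python threads interleave to imitate truly parallel threads
* CPython manages memory with reference counting. Every object created in a Python script has a reference count that records how many references point to it. When the count drops to 0, the memory is released automatically
* Without the GIL, if two Python threads referenced an object a at the same time, the reference count would be subject to a race condition and might end up increased by only 1, corrupting memory: when the first thread finishes it decrements the count, which may then reach the condition for freeing the memory, and when the second thread later tries to access a it no longer finds valid memory
* Why it was introduced
    - The designers wanted to avoid complex race-condition risks such as those in memory management
    - CPython relies heavily on C libraries, and most C libraries are not natively thread-safe (thread safety costs performance and adds complexity)
* check_interval: the CPython interpreter periodically checks on the GIL. At regular intervals it forces the current thread to release the GIL so that other threads get a chance to run. In early Python the interval was 100 ticks, roughly 100 bytecode instructions; since Python 3.2 it is time-based, 5 milliseconds by default
* The GIL exists mainly for the convenience of the authors of the CPython interpreter, not for programmers at the Python application level. As a Python user you still need tools such as locks to ensure thread safety
* To make full use of a multi-core CPU in Python, use multiple processes: each process has its own independent GIL, so they do not interfere with one another and can run truly in parallel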
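
In Python 3 the interval at which the interpreter asks the running thread to release the GIL can be inspected and changed through the `sys` module. The snippet below is a small sketch; the value 0.01 is an arbitrary choice for illustration.

```python
import sys

original = sys.getswitchinterval()  # in seconds; 0.005 unless it has been changed
sys.setswitchinterval(0.01)         # ask for a thread switch roughly every 10 ms
changed = sys.getswitchinterval()
sys.setswitchinterval(original)     # restore the previous setting
print(original, changed)
```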

In [2]:
# Single-threaded
def CountDown(n):
    while n > 0:
        n -= 1

In [3]:
# Multi-threaded
from threading import Thread

n = 100000000

t1 = Thread(target=CountDown, args=[n // 2])
t2 = Thread(target=CountDown, args=[n // 2])
t1.start()
t2.start()
t1.join()
t2.join()

In [None]:
import sys

a = []
b = a
sys.getrefcount(a)

In [None]:
/* Every Python thread is essentially wrapped in a loop like this */
for (;;) {
    if (--ticker < 0) {
        ticker = check_interval;

        /* Give another thread a chance */
        PyThread_release_lock(interpreter_lock);

        /* Other threads may run now */

        PyThread_acquire_lock(interpreter_lock, 1);
    }

    bytecode = *next_instr++;
    switch (bytecode) {
        /* execute the next instruction ... */
    }
}

### Thread safety

In [None]:

import threading

n = 0


def foo():
    # Unsynchronized read-modify-write of a shared global
    global n
    n += 1


threads = []
for i in range(100):
    t = threading.Thread(target=foo)
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()

print(n)

In [None]:
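# The single statement n += 1 is what makes foo unsafe: it compiles to several bytecode instructions (four in older CPython versions), and the thread can be interrupted between any of them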
import dis

dis.dis(foo)

In [None]:
n = 0
lock = threading.Lock()


def foo():
    global n
    with lock:
        n += 1

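### Working around the GIL

* Avoid CPython and use another implementation, such as Jython (a Python interpreter implemented in Java)
* Move performance-critical code into another language (usually C++)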