Python并发包含多线程、多进程和异步IO三块内容。我们需要掌握  
1. 用并发技术创建高性能和响应式的python应用
2. 开发多线程应用  
3. 开发能并行处理多任务的应用
4. 理解单线程里的并发模型

# Multithreading
## 多进程和多线程的差异
| Criteria | Process | Thread |
| --- | --- | --- |
| Memory sharing | Memory is not shared between processes | Memory is shared between threads within a process |
| Memory footprint | Large | Small |
| Best suited for | CPU-bound tasks | I/O-bound tasks |
| Starting time | Slower than a thread | Faster than a process |
| Interruptibility | Child processes can be terminated from outside | Threads cannot be killed from outside; they must be asked to stop |
## Threading


In [2]:
from time import sleep, perf_counter

def task():
    """Simulate a one-second blocking job, announcing when it starts and ends."""
    start_msg, end_msg = 'Starting a task...', 'done'
    print(start_msg)
    sleep(1)  # blocks only the calling thread
    print(end_msg)

# Run the job twice, one after the other, and time the whole run.
start_time = perf_counter()
for _ in range(2):
    task()
end_time = perf_counter()
print(f'It took {end_time-start_time: 0.2f} seconds')

Starting a task...
done
Starting a task...
done
It took  2.00 seconds


In [4]:
from threading import Thread 
# Usage pattern:
#   new_thread = Thread(target=fn, args=args_tuple)
#   new_thread.start()
#   new_thread.join()  # the calling (main) thread blocks here until new_thread finishes
start_time = perf_counter()
t1, t2 = Thread(target=task), Thread(target=task)
for worker in (t1, t2):
    worker.start()
# Wait for both workers; without join() the timing below would only
# measure how long it took to start the threads.
for worker in (t1, t2):
    worker.join()
end_time = perf_counter()
print(f'It took {end_time-start_time: 0.2f} seconds')

Starting a task...
Starting a task...
It took  0.00 seconds


done
done


上面这个程序一共有三个线程：Python解释器创建main线程，程序再创建另外两个线程。

In [5]:
def task(id):
    """Announce task *id*, block for one second, then report that it finished.

    sleep() blocks only the calling thread, which is why ten of these running
    in separate threads finish in about one second overall.
    """
    print(f'Starting the task {id}...')
    sleep(1)
    print(f'The task {id} completed')
    
start_time = perf_counter()
# One thread per task id 1..10, started one after another.
threads = [Thread(target=task, args=(i,)) for i in range(1, 11)]
for t in threads:
    t.start()

# Block until every worker is done before stopping the clock.
for t in threads:
    t.join()
end_time = perf_counter()
print(f'It took {end_time-start_time: 0.2f} seconds')

Starting the task 1...
Starting the task 2...
Starting the task 3...
Starting the task 4...
Starting the task 5...
Starting the task 6...
Starting the task 7...
Starting the task 8...
Starting the task 9...
Starting the task 10...
The task 1 completed
The task 2 completed
The task 3 completed
The task 4 completed
The task 5 completed
The task 6 completed
The task 7 completed
The task 8 completed
The task 9 completed
The task 10 completed
It took  1.01 seconds


In [None]:
# 示例： 替换多个文件里的字符串
from threading import Thread 
from time import perf_counter

def replace(filename, substr, new_substr):
    """Replace every occurrence of *substr* with *new_substr* in a text file, in place.

    ``~`` in *filename* is expanded to the user's home directory, because
    ``open()`` does not do this itself (paths such as ``'~/12.txt'`` would
    otherwise raise FileNotFoundError). The file is read and written as UTF-8
    with ``newline=''`` so the original line endings are preserved exactly.

    Args:
        filename: path of the file to rewrite; may start with ``~``.
        substr: text to search for.
        new_substr: replacement text.
    """
    import os.path  # local import: only this function needs expanduser

    print(f'Processing the file {filename}')
    path = os.path.expanduser(filename)
    with open(path, 'r', encoding='utf-8', newline='') as f:
        content = f.read()
    content = content.replace(substr, new_substr)
    with open(path, 'w', encoding='utf-8', newline='') as f:
        f.write(content)

def main():
    """Rewrite 'id' as 'ids' in several files concurrently, one thread per file."""
    # NOTE: open() does not expand '~' itself, so these paths only resolve if
    # replace() expands them (e.g. with os.path.expanduser).
    filenames = [
        '~/12.txt',
        '~/34.txt',
        '~/56.txt',
        '~/78.txt',
        '~/90.txt',
        '~/ab.txt',
        '~/cd.txt',
        '~/ef.txt',
    ]
    threads = []
    for filename in filenames:
        threads.append(Thread(target=replace, args=(filename, 'id', 'ids')))
    for thread in threads:
        thread.start()
    # Wait for every file to be processed before returning.
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    start_time = perf_counter()
    main()
    end_time = perf_counter()
    print(f'It took {end_time-start_time: 0.2f} seconds')

## 多线程例子
开发一个多线程程序，从Yahoo Finance站点上爬取股票价格。
```shell
pip install requests lxml
```

In [1]:
import threading 
import requests 
from lxml import html

class Stock(threading.Thread):
    """Thread that fetches one stock price from its Yahoo Finance quote page.

    After ``join()``, ``price`` holds the parsed price as a float, or stays
    ``None`` in any of these cases:
    - the response status is not 200;
    - the XPath matches nothing;
    - the matched text is not a number.
    If ``requests.get`` raises (e.g. on timeout), the exception ends ``run()``
    and ``price`` also stays ``None``.

    Args:
        symbol: ticker symbol, e.g. ``'MSFT'``.
        timeout: seconds passed to ``requests.get``. Without a timeout a
            stalled connection could block ``run()`` (and the caller's
            ``join()``) forever.
    """

    def __init__(self, symbol: str, timeout: float = 10.0):
        super().__init__()
        self.symbol = symbol
        self.url = f'https://finance.yahoo.com/quote/{symbol}'
        self.timeout = timeout
        self.price = None

    def run(self):
        response = requests.get(self.url, timeout=self.timeout)
        if response.status_code == 200:
            tree = html.fromstring(response.text)
            # NOTE(review): this XPath depends on Yahoo's current page layout
            # and may stop matching when the site changes - confirm.
            price_text = tree.xpath('//*[@id="quote-header-info"]/div[3]/div[1]/div[1]/fin-streamer[1]/text()')
            if price_text:
                try:
                    # Strip thousands separators such as '1,234.56'.
                    self.price = float(price_text[0].replace(',',''))
                except ValueError:
                    self.price = None

    def __str__(self):
        return f'{self.symbol}\t{self.price}'


In [None]:
symbols = ['MSFT', 'GOOGL', 'AAPL', 'META']
threads = [Stock(symbol) for symbol in symbols]
for t in threads:
    t.start()  # all fetches run concurrently

# join() each thread before reading its price; the output follows list order.
for t in threads:
    t.join()
    print(t)

## Threading Event
有时，为了在线程间通信，可以用互斥锁（mutual exclusion lock，即mutex）保护一个布尔变量。更优雅的方式是使用threading模块的Event类：一个线程调用event.set()发出信号，其他线程用event.wait()等待这个信号。

In [None]:
from threading import Event

event = Event()  # a new Event starts out unset
print(event.is_set())
# Other methods:
#   event.set()            -- set the flag and wake every thread blocked in wait()
#   event.clear()          -- reset the flag
#   event.wait()           -- block until the flag is set
#   event.wait(timeout=5)  -- block for at most 5 seconds

In [2]:
# Example
from threading import Thread, Event 
from time import sleep 

def task(event:Event, id:int):
    """Block until *event* is set, then report that thread *id* finished.

    Args:
        event: shared Event; the thread waits on it before finishing.
        id: label used only in the printed messages.
    """
    print(f'Thread {id} started. Waiting for the signal...')
    event.wait()
    print(f'Received signal. The thread {id} was completed')

def main():
    """Start two waiting workers, hold them for three seconds, then release both."""
    event = Event()
    workers = [Thread(target=task, args=(event, n)) for n in (1, 2)]
    for worker in workers:
        worker.start()

    print('Blocking the main thread for 3 seconds...')
    sleep(3)
    event.set()  # wakes every thread blocked in event.wait()


if __name__ == '__main__':
    main()

Thread 1 started. Waiting for the signal...
Thread 2 started. Waiting for the signal...
Blocking the main thread for 3 seconds...
Received signal. Thre thread 1 was completedReceived signal. Thre thread 2 was completed



## 结束线程


In [1]:
from threading import Thread, Event
from time import sleep 

def task(event: Event):
    """Do up to six one-second steps, stopping early once *event* is set.

    Prints 'The thread was stopped prematurely' when stopped early, otherwise
    'The thread was stopped maturely.' after all six steps.
    """
    for i in range(6):
        print(f'Running #{i+1}')
        # Event.wait(1) returns True as soon as the event is set (False after
        # the 1-second timeout). A stop request is therefore noticed
        # immediately instead of only after a full sleep(1).
        if event.wait(1):
            print('The thread was stopped prematurely')
            break
    else:
        print('The thread was stopped maturely.')

def main():
    """Run task() in a worker thread and signal it to stop after about three seconds."""
    stop_event = Event()
    worker = Thread(target=task, args=(stop_event,))
    worker.start()
    sleep(3)
    stop_event.set()


if __name__ == '__main__':
    main()

Running #1
Running #2
Running #3
The thread was stopped prematurely


In [2]:
class Worker(Thread):
    """Thread that does up to six one-second steps, stopping early once *event* is set.

    Extra positional and keyword arguments are forwarded to ``Thread.__init__``.
    """

    def __init__(self, event, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.event = event  # stop signal; main() passes a threading.Event

    def run(self):
        for i in range(6):
            print(f'Running #{i+1}')
            # Event.wait(1) returns True as soon as the event is set, so the
            # worker reacts to a stop request immediately, not after sleep(1).
            if self.event.wait(1):
                print('The thread was stopped prematurely')
                break
        else:
            print('The thread was stopped maturely')

def main():
    """Start a Worker and signal it to stop after about three seconds."""
    stop_event = Event()
    worker = Worker(stop_event)
    worker.start()
    sleep(3)
    stop_event.set()


if __name__ == '__main__':
    main()

Running #1
Running #2
Running #3
Running #4


Thre thread was stopped prematurely


## 守护线程
每个程序都至少有一个线程，main线程。通过threading模块可以创建多线程。为了让任务在后台执行，我们创建守护线程(daemon thread). 


In [None]:
from threading import Thread 
import time 

def show_timer():
    """Print the number of elapsed seconds once per second; never returns."""
    count = 0
    while True:
        time.sleep(1)
        count += 1
        print(f'Has been waiting for {count} second(s)...')

t = Thread(target=show_timer)  # daemon not given: started from the main thread, this is a non-daemon thread
t.start()

answer = input('Do you want to exit?\n')
# The interpreter waits for every non-daemon thread to finish before it exits.
# show_timer() loops forever, so the program keeps running after input() returns.


In [None]:
from threading import Thread 
import time 

def show_timer():
    """Print the number of elapsed seconds once per second; never returns."""
    count = 0
    while True:
        time.sleep(1)
        count += 1
        print(f'Has been waiting for {count} second(s)...')

t = Thread(target=show_timer)
t.daemon = True  # must be set before start()
t.start()

answer = input('Do you want to exit?\n')
# The interpreter does not wait for daemon threads: when the program exits,
# the daemon thread is stopped automatically.

## Thread-safe Queue
queue.Queue内部实现了所需的加锁逻辑，可用于在线程之间安全地交换数据。

In [None]:
from queue import Empty, Queue

q = Queue()  # Queue(maxsize=10) creates a bounded queue
# q.put(item)               # blocks until the queue has space available
# q.put(item, timeout=3)    # blocks at most 3 s, then raises queue.Full
# q.put_nowait(item)        # raises queue.Full immediately if the queue is full
# q.get()                   # blocks until an item is available - forever if nothing is ever put
# q.get(timeout=10)         # blocks at most 10 s, then raises queue.Empty
try:
    # q is empty here: a non-blocking get raises queue.Empty instead of hanging.
    item = q.get(block=False)
except Empty:
    item = None
size = q.qsize()  # approximate number of items; Queue has no size() method
# q.empty()  # q.full()   (also approximate when other threads are active)

In [None]:
# get() on an empty queue blocks forever, so put something first.
q.put('job')
item = q.get()
# process the item
q.task_done()  # mark the item as completed: one call per get()

q.join()  # wait until every item put on the queue has been marked done

In [1]:
import time 
from queue import Empty, Queue 
from threading import Thread 

def producer(queue):
    """Put the integers 1 to 5 on *queue*, pausing one second before each put."""
    for i in range(1, 6):
        print(f'Inserting item {i} into the queue')
        time.sleep(1)
        queue.put(i)

def consumer(queue):
    """Process items from *queue* forever, marking each one done.

    The loop never exits on its own; main() runs it in a daemon thread.
    """
    while True:
        # get() with the default block=True waits for an item and never
        # raises queue.Empty, so no Empty handler is needed here.
        item = queue.get()
        try:
            print(f'Processing item {item}')
            time.sleep(2)
        finally:
            # Always balance the get(); otherwise queue.join() in the caller
            # would wait forever if processing raised.
            queue.task_done()

def main():
    """Run one producer and one daemon consumer over a shared Queue until every item is processed."""
    queue = Queue()
    producer_thread = Thread(target=producer, args=(queue,))
    consumer_thread = Thread(target=consumer, args=(queue,), daemon=True)
    producer_thread.start()
    consumer_thread.start()

    # First wait until the producer has put every item, then until each
    # item has been marked done. The consumer is a daemon, so its endless
    # loop does not keep the program alive afterwards.
    producer_thread.join()
    queue.join()


if __name__ == '__main__':
    main()

Inserting item 1 into the queue
Inserting item 2 into the queueProcessing item 1

Inserting item 3 into the queue
Processing item 2
Inserting item 4 into the queue
Inserting item 5 into the queue
Processing item 3
Processing item 4
Processing item 5


## ThreadPoolExecutor
手动管理线程不太高效：频繁创建和销毁大量线程会浪费计算资源。更好的做法是复用线程——concurrent.futures.ThreadPoolExecutor维护一个线程池，通过submit()或map()把任务交给池中已有的线程执行。

# Multiprocessing
计算密集型（CPU-bound）任务用多进程：由于GIL，同一进程中同一时刻只有一个线程在执行Python字节码，多线程无法利用多核；而每个进程有各自的解释器和GIL，可以真正并行。

In [2]:
import time 

def task(n=100_000_000):
    """Busy-loop counting *n* down to zero: a pure-CPU workload for timing.

    Uses ``n > 0`` rather than truthiness, so a negative or fractional *n*
    ends immediately instead of looping forever.
    """
    while n > 0:
        n -= 1

if __name__ == '__main__':
    # Two CPU-bound runs back to back in a single process.
    start = time.perf_counter()
    for _ in range(2):
        task()
    finish = time.perf_counter()
    print(f'It took {finish-start: .2f} seconds to finish')

It took  6.84 seconds to finish


In [3]:
import multiprocessing

if __name__ == '__main__':
    # NOTE(review): with the 'spawn' start method (the default on Windows and
    # macOS), child processes re-import __main__, so a task() defined
    # interactively in a notebook may not be found there - confirm on the
    # target platform.
    start = time.perf_counter()
    processes = [multiprocessing.Process(target=task) for _ in range(2)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    finish = time.perf_counter()
    print(f'It took {finish-start: .2f} seconds to finish')

It took  3.48 seconds to finish


# Async I/O
用asyncio可以在单线程里实现异步。
## Event Loop
在单线程并发模型里，我们只需要一个线程。在遇到I/O密集型任务时，我们把它交给操作系统的event notification系统，然后我们仍可以运行其他代码。
## async await
协程：用async def定义协程函数；在协程里用await等待另一个可等待对象，此时当前协程暂停，把控制权交还给事件循环。

In [4]:
async def square(number):
    """Return *number* squared (this is a coroutine function)."""
    return number * number

result = square(10)  # calling a coroutine function only creates a coroutine object
print(result)
# The coroutine is never awaited: close it explicitly so Python does not emit
# "RuntimeWarning: coroutine 'square' was never awaited" when it is collected.
result.close()

<coroutine object square at 0x7f37c07b5d90>


上面并没有立即执行协程里的代码，而是返回一个协程对象。  
协程需要在事件循环(event loop)上执行。从Python 3.7起，可以用asyncio.run()自动创建事件循环、运行协程，并在结束后关闭该事件循环。

In [None]:
import asyncio


async def square(number):
    """Coroutine returning *number* multiplied by itself."""
    product = number * number
    return product


# asyncio.run() creates an event loop, runs the coroutine to completion and closes the loop.
result = asyncio.run(square(10))
print(result)

await会暂停当前协程（而不是整个程序），直到被等待的协程返回结果；暂停期间事件循环可以运行其他任务。注意await只能在协程（async def函数）里使用。

In [None]:
import asyncio


async def square(number: int) -> int:
    """Return *number* multiplied by itself."""
    return number*number


async def main():
    """Await square() twice in sequence, printing each result and then their sum."""
    results = []
    for value in (10, 5):
        results.append(await square(value))
        print(results[-1])
    x, y = results
    print(f'total={x+y}')


asyncio.run(main())

## asyncio.create_task()

In [None]:
import asyncio 
import time 

async def call_api(message, result=1000, delay=3):
    """Simulate an API call: print *message*, wait *delay* seconds, return *result*.

    asyncio.sleep suspends only this coroutine, so other tasks keep running.
    """
    print(message)
    await asyncio.sleep(delay)
    return result

# Awaiting each coroutine directly, e.g.
#     price = await call_api('Get stock price of GOOG...', 300)
# suspends main() until that call finishes, so two calls with the default
# 3-second delay take about 6 s in total. Wrapping them with create_task()
# schedules both on the event loop right away, so they run concurrently
# and finish in about 3 s.
async def main():
    """Fetch two (simulated) stock prices concurrently and report the elapsed time."""
    start = time.perf_counter()
    task1 = asyncio.create_task(call_api('Get stock price of GOOG...', 300))
    task2 = asyncio.create_task(call_api('Get stock price of AAPL...', 400))
    price = await task1
    print(price)
    price = await task2
    print(price)
    end = time.perf_counter()
    print(f'It took {end-start: .2f} seconds')


asyncio.run(main())

## 取消任务
如果任务运行时间太长，或永久运行，我们通常得取消它。

In [None]:
async def main():
    """Start a 5-second call_api task and cancel it straight away.

    main() does not yield between create_task() and cancel(), so the task is
    cancelled before its coroutine starts. Awaiting a cancelled task raises
    asyncio.CancelledError, which is caught and reported here. This cell only
    defines the coroutine; run it with asyncio.run(main()).
    """
    task = asyncio.create_task(call_api('Calling api...', result=2000, delay=5))
    still_running = not task.done()
    if still_running:
        print('Cancelling the task...')
        task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("task has been cancelled.")

## asyncio.wait_for()

In [None]:
async def main():
    """Give a 5-second call_api task at most MAX_TIMEOUT seconds.

    On timeout, wait_for() cancels the task and raises asyncio.TimeoutError.
    This cell only defines the coroutine; run it with asyncio.run(main()).
    """
    MAX_TIMEOUT = 3
    task = asyncio.create_task(call_api('Calling API...', result=2000, delay=5))
    try:
        await asyncio.wait_for(task, timeout=MAX_TIMEOUT)
    except asyncio.TimeoutError:
        print('The task was cancelled due to a timeout')

## shield task
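asyncio.shield() protects a task from cancellation. If the code awaiting it is cancelled (for example by a wait_for() timeout), only the shield wrapper is cancelled, and the inner task keeps running.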


In [None]:
async def main():
    """Wait up to MAX_TIMEOUT seconds for a task, then keep waiting without cancelling it.

    asyncio.shield() protects the inner task. When wait_for() times out, it
    cancels only the shield wrapper, so ``task`` keeps running and can still be
    awaited for its result. The result is printed on both paths. This cell only
    defines the coroutine; run it with asyncio.run(main()).
    """
    task = asyncio.create_task(call_api('Calling API...', result=2000, delay=5))
    MAX_TIMEOUT = 3
    try:
        result = await asyncio.wait_for(asyncio.shield(task), timeout=MAX_TIMEOUT)
    except asyncio.TimeoutError:
        print("The task took more than expected and will complete soon")
        result = await task
    print(result)

## asyncio.wait()

In [None]:
import asyncio
from asyncio import create_task


class APIError(Exception):
    """Raised by call_api() when asked to simulate a failing API call."""


async def call_api(message, result=100, delay=3, raise_exception=False):
    """Simulate an API call: print *message*, wait *delay* seconds, then return *result*.

    Raises:
        APIError: if *raise_exception* is true (after the delay).
    """
    print(message)
    await asyncio.sleep(delay)
    if not raise_exception:
        return result
    raise APIError


async def main():
    """Start three API calls and print each result as soon as its task finishes.

    With return_when=FIRST_COMPLETED, asyncio.wait() returns every task that is
    done at that moment, which may be more than one (e.g. equal delays). All of
    them are handled here. Popping just one would drop the others, because they
    are no longer in ``pending``.
    """
    task_1 = create_task(call_api('calling API 1...', result=1, delay=1))
    task_2 = create_task(call_api('calling API 2...', result=2, delay=2))
    task_3 = create_task(call_api('calling API 3...', result=3, delay=3))

    pending = {task_1, task_2, task_3}

    while pending:
        done, pending = await asyncio.wait(
            pending,
            return_when=asyncio.FIRST_COMPLETED
        )
        for finished in done:
            print(finished.result())


asyncio.run(main())

## Future
Coroutine、Task和Future对象都是可等待对象(awaitable)，可以用await等待其结果。注意Task一经create_task()创建就已被事件循环调度运行，await只是等待它完成。

## asyncio.gather()

In [None]:
import asyncio 

async def call_api(message, result, delay=3):
    """Simulate an API call: print *message*, wait *delay* seconds, return *result*."""
    print(message)
    await asyncio.sleep(delay)
    return result

async def main():
    """Run two API calls concurrently with gather() and print both results in call order."""
    calls = (
        call_api('Calling API 1 ...', 100, 1),
        call_api('Calling API 2 ...', 200, 2),
    )
    a, b = await asyncio.gather(*calls)
    print(a, b)


asyncio.run(main())

In [None]:
import asyncio


class APIError(Exception):
    """Exception carrying a human-readable message about a failed API call."""

    def __init__(self, message):
        # Passing the message to Exception.__init__ fills ``args``.
        # repr() and pickling rely on it; without it, unpickling would call
        # APIError() with no message and fail.
        super().__init__(message)
        self._message = message

    def __str__(self):
        return self._message


async def call_api_failed():
    """Simulate an API call that fails after three seconds.

    Raises:
        APIError: always, with the message 'API failed'.
    """
    delay = 3
    await asyncio.sleep(delay)
    raise APIError('API failed')


async def call_api(message, result, delay=3):
    """Simulate a successful API call: print *message*, wait *delay* seconds, return *result*."""
    print(message)
    await asyncio.sleep(delay)
    return result


async def main():
    """Gather two successful calls and one failing call, then print all three outcomes.

    With return_exceptions=True, gather() puts the raised APIError into the
    result list instead of propagating it.
    """
    outcomes = await asyncio.gather(
        call_api('Calling API 1 ...', 100, 1),
        call_api('Calling API 2 ...', 200, 2),
        call_api_failed(),
        return_exceptions=True,
    )
    a, b, c = outcomes
    print(a, b, c)


asyncio.run(main())