# 并行计算

## 引入

我们都知道，现代操作系统都是支持“多任务”的操作系统，多核CPU已经非常普及。而即使过去的单核CPU，也可以执行多任务。由于CPU执行代码都是顺序执行的，那么，单核CPU是怎么执行多任务的呢？答案就是操作系统轮流让各个任务交替执行，任务1执行0.01秒，切换到任务2，任务2执行0.01秒，再切换到任务3，执行0.01秒……这样反复执行下去。表面上看，每个任务都是交替执行的，但是，由于CPU的执行速度实在是太快了，我们感觉就像所有任务都在同时执行一样。真正的并行执行多任务只能在多核CPU上实现，但是，由于任务数量远远多于CPU的核心数量，所以，操作系统也会自动把很多任务轮流调度到每个核心上执行。

对于操作系统来说，一个任务就是一个**进程（Process）**，比如打开一个Word就启动了一个Word进程。在一个进程内部，要同时干多件事，就需要同时运行多个“子任务”，我们把进程内的这些“子任务”称为**线程（Thread）**。由于每个进程至少要干一件事，所以，一个进程至少有一个线程。

所以，多任务的实现可以分为2种方式：

- 多进程模式；
- 多线程模式；

同时执行多个任务通常各个任务之间并不是没有关联的，而是需要相互通信和协调，有时，任务1必须暂停等待任务2完成后才能继续执行，有时，任务3和任务4又不能同时执行，所以，多进程和多线程的程序涉及到同步、数据共享的问题，相比我们之前编写的单线程程序更复杂。

## 多线程 vs 多进程

既然都是处理多任务的方式，多线程和多进程有何优劣？实际运用中我们如何选择？

- **数据共享** 多线程和多进程最大的不同在于，多进程中，同一个变量，各自有一份拷贝存在于每个进程中，互不影响，而多线程中，所有变量都由所有线程共享，所以，任何一个变量都可以被任何一个线程修改，因此，线程之间共享数据最大的危险在于多个线程同时改一个变量，把内容给改乱了。

![](https://ss0.bdstatic.com/70cFvHSh_Q1YnxGkpoWK1HF6hhy/it/u=1940967811,3885710679&fm=15&gp=0.jpg)

In [2]:
# 一个小例子
import time, threading
balance = 0   # 假定这是你的银行存款:

def change_it(n):
    # 先存后取，结果应该为0:
    global balance
    balance = balance + n
    balance = balance - n

def run_thread(n):
    for i in range(2000000):
        change_it(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

27


- **GIL的影响**  

其他语言，CPU 是多核时是支持多个线程同时执行。但在 Python 中，无论是单核还是多核，同时只能由一个线程在执行。其根源是**GIL**的存在。

GIL 的全称是 Global Interpreter Lock(全局解释器锁)，来源是 Python 设计之初的考虑，为了数据安全所做的决定。某个线程想要执行，必须先拿到 GIL，我们可以把 GIL 看作是“通行证”，并且在一个 Python 进程中，GIL 只有一个。拿不到通行证的线程，就不允许进入 CPU 执行。

多进程能够让程序绕过GIL锁，去并行地处理程序，并能够更充分地使用cpu。虽然它与threading模块本质不同，但是语法上非常相似。多进程库会为每个进程提供各自的解释器和GIL锁。

在多线程上会发生的问题（如数据混淆、死锁等）在多进程上并不会发生。这是因为在多进程上，不同的线程直接的存储不共享，因此也就不会发生同时不同空间同时更改同一内存空间这一情况。但这也带来了由于进程间数据交互和内存复制导致的额外开销。

#### 小结： 根据程序类型选择：

- 多进程：程序比较偏重于计算，需要经常使用 CPU 来运算（CPU密集型）。例如科学计算的程序，机器学习的程序等。
- 多线程：程序中有大量与数据交互/网络交互，需要频繁进行输入输出操作（I/O 密集型）。爬虫程序就是典型的 I/O 密集型程序。


## 多线程的实现

In [4]:
# 以一个简单的计算为例，如果使用单线程
import numpy as np
import pandas as pd
import time
import sys

def countsum(x):
    sumx = 0
    i = 1
    while i <= x:
        sumx += i
        i += 1
        time.sleep(1)
    print(x,sumx)
    print('Finish')
    
start = time.perf_counter()
countsum(x = 4)
countsum(x = 5)
t = (time.perf_counter() - start)
print(t)

4 10
Finish
5 15
Finish
9.005948300000455


In [5]:
import threading

if __name__ == '__main__':
    start = time.perf_counter()
    thread1 = threading.Thread(target = countsum, args = (4,))
    thread2 = threading.Thread(target = countsum, args = (5,)) # 创建多个线程
    thread1.start() 
    thread2.start() # 线程开始
# Join函数执行顺序是逐个执行每个线程，执行完毕后继续往下执行。
# 主线程结束后，子线程还在运行，join函数使得主线程等到子线程结束时才退出。
    thread1.join()
    thread2.join() 
    t = (time.perf_counter() - start)
    print(t)

4 10
Finish
5 15
Finish
5.014140100000077


### 加锁
如前所述，多线程之间由于数据共享可能存在变量被误修改的问题，我们可以采用加锁的方式规定线程之间的秩序。仍然采用前面的例子，此处进行改进后，输出一定是0。

In [11]:
import threading
balance = 0   # 假定这是你的银行存款:
lock = threading.Lock()
def change_it_with_lock(n):
    # 先存后取，结果应该为0:
    global balance
    lock.acquire()
    balance = balance + n
    balance = balance - n
    lock.release()
def run_thread(n):
    for i in range(2000000):
        change_it_with_lock(n)

t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

0


## 多进程的实现

In [None]:
from multiprocessing import  Process

if __name__ == '__main__':

    start = time.perf_counter()
    p1 = Process(target = countsum, args = (4,))
    p2 = Process(target = countsum, args = (4,))
    p1.start() 
    p2.start() # 进程开始
    p1.join()
    p2.join() 
    t = (time.perf_counter() - start)
    print(t)
    
# output：

# 4 10
# Finish
# 4 10
# Finish
# 4.9973042

# 可以看到实现语法与多线程非常相似，表面上只是将Thread改为Process

### 进程池：如果要启动大量的子进程，可以用进程池（pool）的方式批量创建子进程：

In [None]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid()) # os.getpid()获取当前进程id 
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

# Parent process 11304.
# Waiting for all subprocesses done...
# Run task 0 (22260)...
# Run task 1 (16992)...
# Run task 2 (23384)...
# Run task 3 (24160)...
# Task 1 runs 1.23 seconds.
# Run task 4 (16992)...
# Task 3 runs 1.76 seconds.
# Task 0 runs 2.06 seconds.
# Task 2 runs 2.23 seconds.
# Task 4 runs 1.67 seconds.
# All subprocesses done.

我们设置了4个进程，5个任务，那么第5个任务自动在有任务完成后才开始。

## 多进程间的通信与数据共享

进程是系统独立调度核分配系统资源（CPU、内存）的基本单位，进程之间是相互独立的，每启动一个新的进程相当于把数据进行了一次克隆，子进程里的数据修改无法影响到主进程中的数据，不同子进程之间的数据也不能共享，这是多进程在使用中与多线程最明显的区别。但是难道Python多进程中间难道就是孤立的吗？当然不是，python也提供了多种方法实现了多进程中间的通信和数据共享（可以修改一份数据）     
- 进程间通信：Queue、Pipe
- 数据共享：value、array、Manager

### 多进程间通信——Queue

In [None]:
from multiprocessing import Process, Queue
import time, os

def prodcut(q):
    print("开始生产.")
    for i in range(5):
        time.sleep(1)
        q.put('产品'+str(i))
        print("产品"+str(i)+"生产完成")

def consume(q):
    while True:
        prod = q.get()
        print("消费者：{}，消费产品:{}".format(os.getpid(), prod))
        time.sleep(1)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=prodcut, args=(q, ))  # 生产者
    c1 = Process(target=consume, args=(q, ))  # 消费者1
    c2 = Process(target=consume, args=(q, ))  # 消费者2
    p.start()
    c1.start()
    c2.start()
    p.join()  # 当生产者结束后，将两个消费则也结束
    c1.terminate()
    c2.terminate()

# 开始生产.
# 产品0生产完成
# 消费者：14600，消费产品:产品0
# 产品1生产完成
# 消费者：20748，消费产品:产品1
# 产品2生产完成
# 消费者：14600，消费产品:产品2
# 产品3生产完成
# 消费者：20748，消费产品:产品3
# 产品4生产完成
# 消费者：14600，消费产品:产品4

### 多进程间通信——Pipe

In [None]:
from multiprocessing import Process, Pipe
def fun1(conn):
    conn.send('你好主进程')
    print('子进程接受消息：')
    print(conn.recv())
    conn.close()

if __name__ == '__main__':
    conn1, conn2 = Pipe() #pipe实例化生成一个双向管
    p = Process(target=fun1, args=(conn2,)) #conn2传给子进程
    p.start()
    print(conn1.recv())
    print('主进程发送消息：')
    conn1.send("你好子进程")
    p.join()

# 子进程接受消息：
# 你好主进程
# 主进程发送消息：
# 你好子进程

Pipe的两端都可以发送和接受，用于两个进程之间的通信。

### 多进程数据共享——value & array

Python进程间数据共享主要有两种方式，一种是共享内存，另一种是通过数据管理器(Manager)来实现。

In [None]:
# 不进行数据共享
from multiprocessing import Process, Lock
import time

num = 0

def add_one(lock):
    global num
    for i in range(3):
        lock.acquire()
        num += 1
        print(num)
        time.sleep(1)
        lock.release()

if __name__ == '__main__':
    lock = Lock()
    p1 = Process(target=add_one, args=(lock,))
    p2 = Process(target=add_one, args=(lock,))
    p1.start()
    p2.start()

# 1
# 1
# 2
# 2
# 3
# 3

由于进程间数据不能共享，每个进程只能独立修改值

In [None]:
# 用value进行数据共享
from multiprocessing import Process, Lock, Value
import time

def add_one(lock, num):
    for i in range(3):
        lock.acquire()
        num.value += 1
        print(num.value)
        time.sleep(1)
        lock.release()

if __name__ == '__main__':
    num = Value('i', 0) # 'i'表示整型，'d'表示浮点数
    lock = Lock()
    p1 = Process(target=add_one, args=(lock, num))
    p2 = Process(target=add_one, args=(lock, num))
    p1.start()
    p2.start()
# 1
# 2
# 3
# 4
# 5
# 6

In [None]:
# 用array进行数据共享
from multiprocessing.sharedctypes import Array
from multiprocessing import Process, Lock

def add_one(lock, arr):
    lock.acquire()
    for i in range(len(arr)):
        arr[i] += 1
    lock.release()
    print(arr[:])

if __name__ == '__main__':
    lock = Lock()
    arr = Array('i', range(10))
    print(arr[:])
    p1 = Process(target=add_one, args=(lock, arr))
    p2 = Process(target=add_one, args=(lock, arr))
    p1.start()
    p2.start()
    
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

### Manager（数据管理器）
Manager的作用是提供多进程共享的全局变量，Manager()方法会返回一个对象，该对象控制着一个服务进程，该进程中保存的对象运行其他进程使用代理进行操作。

Manager支持的类型有：list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。  
使用服务进程的管理器比使用共享内存对象更灵活，因为它们可以支持任意对象类型。此外，单个管理器可以通过网络由不同计算机上的进程共享。但是，它们比使用共享内存慢。

In [None]:
from multiprocessing import Process, Manager

def fun1(dic,lis,index):

    dic[index] = 'a'
    dic['2'] = 'b'    
    lis.append(index)    #[0,1,2,3,4,0,1,2,3,4,5,6,7,8,9]
    #print(l)

if __name__ == '__main__':
    with Manager() as manager:
        dic = manager.dict()#注意字典的声明方式，不能直接通过{}来定义
        l = manager.list(range(5))#[0,1,2,3,4]

        process_list = []
        for i in range(5):
            p = Process(target=fun1, args=(dic,l,i))
            p.start()
            process_list.append(p)

        for res in process_list:
            res.join()
        print(dic)
        print(l)

# {0: 'a', '2': 'b', 3: 'a', 1: 'a', 2: 'a', 4: 'a'}
# [0, 1, 2, 3, 4, 0, 3, 1, 2, 4]

可以看到主进程定义了一个字典和一个列表，在子进程中，可以添加和修改字典的内容，在列表中插入新的数据，实现进程间的数据共享，即可以共同修改同一份数据。