Python中的多进程
-
Unix和Linux操作系统上提供了 `fork()`系统调用来创建进程，调用`fork()`函数的是父进程，创建出的是子进程，子进程是父进程的一个拷贝，但是子进程拥有自己的PID。

普通的函数调用，调用一次，返回一次，但是fork()调用一次，返回两次，因为操作系统自动把当前进程（称为父进程）复制了一份（称为子进程），然后分别在父进程和子进程内返回。

子进程永远返回0，而父进程返回子进程的ID。这样做的理由是，一个父进程可以fork出很多子进程，所以，父进程要记下每个子进程的ID，而子进程只需要调用getppid()就可以拿到父进程的ID。

Python的os模块封装了常见的系统调用，其中就包括fork，可以在Python程序中轻松创建子进程：

In [None]:
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

通过结果可以看出来`fork`函数返回了两次

Python的os模块提供了`fork()`函数。由于Windows系统没有`fork()`调用，因此要实现跨平台的多进程编程，可以使用`multiprocessing`模块的Process类来创建子进程，而且该模块还提供了更高级的封装，例如批量启动进程的进程池（Pool）、用于进程间通信的队列（Queue）和管道（Pipe）等。

### multiprocessing

In [None]:
# 单进程
from random import randint
from time import time,sleep

def download_task(filename):
    print('开始下载%s...' % filename)
    time_need = randint(2, 5)
    sleep(time_need)
    print(f'{filename}下载完成! 耗费了{time_need}秒')

def main():
    start = time()
    download_task('PHP从入门到放弃')
    download_task('Python从新手到大师')
    end = time()
    print('总共花费了%.2f秒' % (end - start))

if __name__ == '__main__':
    main()

In [None]:
# 多进程
from multiprocessing import Process
from os import getpid
from random import randint
from time import time,sleep

def download_task(filename):
    print('启动下载进程，进程号[%d].' % getpid())
    print('开始下载%s...' % filename)
    time_need = randint(2, 5)
    sleep(time_need)
    print(f'{filename}下载完成! 耗费了{time_need}秒')

def main():
    start = time()
    p1 = Process(target=download_task, args=('PHP从入门到放弃',))
    p1.start()
    p2 = Process(target=download_task, args=('Python从新手到大师',))
    p2.start()
    p1.join()
    p2.join()
    end = time()
    print('总共花费了%.2f秒' % (end - start))

if __name__ == '__main__':
    main()

创建子进程时，只需要传入一个执行函数和函数的参数，创建一个Process实例，用`start()`方法启动，这样创建进程比`fork()`还要简单。

`join()`方法可以等待子进程结束后再继续往下运行，通常用于进程间的同步。

在上面的代码中，我们通过Process类创建了进程对象，通过target参数我们传入一个函数来表示进程启动后要执行的代码，后面的args是一个元组，它代表了传递给函数的参数。Process对象的start方法用来启动进程，而join方法表示等待进程执行结束。运行上面的代码可以明显发现两个下载任务“同时”启动了，而且程序的执行时间将大大缩短，不再是两个任务的时间总和。下面是程序的一次执行结果。

除了multiprocessing模块我们也可以使用subprocess模块中的类和函数来创建和启动子进程，然后通过管道来和子进程通信。 

### 进程间通信（Queue）

下面我们看下如何实现两个进程间的通信。我们启动两个进程，一个输出Ping，一个输出Pong，两个进程输出的Ping和Pong加起来一共10个。听起来很简单吧，但是如果这样写可是错的哦。

In [None]:
from multiprocessing import Process
from time import sleep

counter = 0

def sub_task(string):
    global counter
    while counter < 10:
        print(string, end='', flush=True)
        counter += 1
        sleep(0.5)

        
def main():
    p1 = Process(target=sub_task, args=('Ping', ))
    p1.start()
    p2 = Process(target=sub_task, args=('Pong', ))
    p2.start()
    p1.join()
    p2.join()


if __name__ == '__main__':
    main()

看起来没毛病，但是最后的结果是Ping和Pong各输出了10个，Why？当我们在程序中创建进程的时候，子进程复制了父进程及其所有的数据结构，每个子进程有自己独立的内存空间，这也就意味着两个子进程中各有一个counter变量，所以结果也就可想而知了。要解决这个问题比较简单的办法是使用multiprocessing模块中的Queue类，它是可以被多个进程共享的队列，底层是通过管道和信号量（semaphore）机制来实现的，大家可以根据下面的例子来修改下。

Process之间肯定是需要通信的，操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制，提供了`Queue`、`Pipes`等多种方式来交换数据。

我们以Queue为例，在父进程中创建两个子进程，一个往Queue里写数据，一个从Queue里读数据：

In [None]:
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue，并传给各个子进程：
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw，写入:
    pw.start()
    # 启动子进程pr，读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr.join()
    # pr进程里是死循环，无法等待其结束，只能强行终止:
    pr.terminate()


在Unix/Linux下，multiprocessing模块封装了fork()调用，使我们不需要关注fork()的细节。由于Windows没有fork调用，因此，multiprocessing需要“模拟”出fork的效果，父进程所有Python对象都必须通过pickle序列化再传到子进程去，所以，如果multiprocessing在Windows下调用失败了，要先考虑是不是pickle失败了。

### Pool
如果要启动大量的子进程，可以用进程池的方式批量创建子进程：

In [1]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)  # [0,1)
    # time.sleep(random.randint(3,5))
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Parent process 91673.
Run task 1 (91731)...Run task 2 (91732)...Run task 0 (91730)...Run task 3 (91733)...



Task 0 runs 0.10 seconds.
Run task 4 (91730)...
Waiting for all subprocesses done...
Task 4 runs 0.22 seconds.
Task 1 runs 1.53 seconds.
Task 2 runs 1.82 seconds.
Task 3 runs 2.68 seconds.
All subprocesses done.


代码解读（很好的解读）：

对`Pool`对象调用`join()`方法会等待所有子进程执行完毕，调用`join()`之前必须先调用`close()`，调用`close()`之后就不能继续添加新的`Process`了。

请注意输出的结果，task `0`，`1`，`2`，`3`是立刻执行的，而task `4`要等待前面某个task完成后才执行，这是因为`Pool`的默认大小是`4`(电脑的CPU数量)，因此，最多同时执行4个进程。这是`Pool`有意设计的限制，**并不是操作系统的限制**。如果改成：
```
p = Pool(5)
```
就可以同时跑5个进程。

由于Pool的默认大小是CPU的核数，如果你不幸拥有8核CPU，你要提交至少9个子进程才能看到上面的等待效果。

> 查看电脑进程数的代码：

In [None]:
import multiprocessing

print(multiprocessing.cpu_count())