# 多进程

## 使用Unix/Linux的fork()系统调用。

- fork()系统调用非常特殊，普通函数的调用，调用一次返回一次，但是fork()函数调用一次，返回两次，因为操作系统自动把当前进程（称为父进程）复制了一份（称为子进程）， 然后分别在父进程和子进程内返回。

In [None]:
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

- 以上代码只在linux环境下运行
- 有了fork调用，一个进程在接到新任务时就可以复制出一个子进程来处理任务，常见的Apache服务器就是由父进程监听端口，每当有新的http请求时，就fork出子进程来处理新的http请求。

## multiprocessing模块：跨平台的多进程模块

In [2]:
from multiprocessing import Process
import os

### multiprocessing模块提供了一个Process类来表示一个进程对象

In [3]:
# 子进程要执行的代码
def run_proc(name):
    print('Run child process {}({})...'.format(name, os.getpid()))


if __name__ == '__main__':
    print('Parent process {}.'.format(os.getpid()))
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

Parent process 4196.
Child process will start.
Child process end.


- 在jupyter notebook中看不到子进程的输出结果，到命令行窗口试了下有结果
- 创建子进程的时候只需要传入一个执行函数和函数的参数，创建一个Process实例
- 用start()方法启动，创建进程比fork()还简单
- join()方法可以等待子进程结束后再继续往下运行，通常用于进程间的同步

### 使用Pool进程池的方式批量创建子进程，可以启动大量的子进程

In [None]:
from multiprocessing import Pool
import os, time, random

In [None]:
def long_time_task(name):
    print('Run task {} ({})...'.format(name, os.getpid()))
    start = time.time()
    time.sleep(1)
    end = time.time()
    print('Task {} runs {:0.2f} seconds.'.format(name, (end - start)))

if __name__ == '__main__':
    print('Parent process {}.'.format(os.getpid()))
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i, ))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

Parent process 4196.
Waiting for all subprocesses done...


![](1.png)

- 对Pool对象调用join()方法会等待所有子进程执行完毕
- 调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了
- p = Pool(4)语句规定了最多只能同时运行4个进程，因此task0/1/2/3先执行
- Pool的默认大小是CPU的核数

## 子进程（subprocess模块）

subprocess模块可以让我们非常方便地启动一个子进程，然后控制其输入和输出

In [1]:
import subprocess

In [2]:
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)

$ nslookup www.python.org
Exit code: 0


### 如果子进程还需要输入，可以通过communicate()方法输入

In [3]:
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('GBK'))
print('Exit code:', p.returncode)

$ nslookup
默认服务器:  promote.cache-dns.local
Address:  10.9.0.110

> > 服务器:  promote.cache-dns.local
Address:  10.9.0.110

python.org	MX preference = 50, mail exchanger = mail.python.org
> 
Exit code: 0


## 进程间通信

- python的multiprocessing模块包装了底层的机制，提供了Queue、Pipes等多种方式来交换数据

### 使用Queue完成进程间通信

In [4]:
from multiprocessing import Process, Queue
import os, time, random

写数据进程的代码

In [5]:
def write(q):
    print('Process to write: {}'.format(os.getpid()))
    for value in ['A', 'B', 'C']:
        print('Put {} to queue...'.format(value))
        q.put(value)
        time.sleep(random.random())

读数据进程的代码

In [6]:
def read(q):
    print('Process to read: {}'.format(os.getpid()))
    while True:
        value = q.get(True)
        print('Get {} from queue.'.format(value))

In [None]:
if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q, ))
    pr = Process(target=read, args=(q, ))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()

运行结果

![](1.png)

### 使用Pipes完成进程间通信

In [None]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # [42, None, 'hello']
    p.join()

- 两个管道的实例分别代表了管道的两端
- 每一端都可以发送或者写数据
- 同时读或者写，会让管道中的数据变成脏数据

### 使用shared memory共享内存完成进程间通信

In [None]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

- d代表了双精度浮点数
- i代表了有符号整数

### 使用server process服务器进程完成进程间通信

In [None]:
from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

- windows没有fork调用，因此，multiprocessing需要模拟出fork的效果，父进程所有的python对象都必须通过pickle序列化再传到子进程去，在windows下调用失败了，要先考虑是不是pickle失败了。

## 小结

- 在Unix/Linux下，可以使用fork()调用实现多进程
- 要实现跨平台的多进程，可以使用multiprocessing模块
- 进程间通信是通过Queue、Pipe、shared memory、server process(Manager)实现